# all2graph Basic Tutorial

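all2graph is a deep learning framework built on graph neural networks. Its core ideas are:<br>
1. Turn the "entities", "events" and "attributes" in the data into nodes of a graph<br>
2. Turn the "ownership relations", "association relations" and "temporal relations" in the data into edges of the graph<br>
3. Replace traditional feature engineering with graph attention operators<br>
Together, these three properties let all2graph cover most of the data found in business applications and model it in a general, uniform way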

The all2graph modeling workflow has four main steps<br>
1. Data preprocessing<br>
2. Graph structure parsing<br>
3. Model construction<br>
4. Training

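This tutorial walks through a cryptocurrency price prediction case to show how to use all2graph<br>
Data download:<br>
https://www.kaggle.com/c/g-research-crypto-forecasting<br>
Note: the data processing in this tutorial is only meant to demonstrate the main features; it is not best practice for forecasting on this dataset!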

## 1. Data preprocessing
all2graph uses JSON as its main data format<br>
First, all the data belonging to one sample must be combined into a single JSON string
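As a minimal illustration using only the standard library, one sample could look like the dict below: top-level fields such as `timestamp`, `Asset_ID` and `Target`, plus a `history` field that holds a JSON string with one record per past minute. The two rows are the first two rows of `train_df` shown further down (only a few columns kept); this is a sketch of the target shape, not part of the all2graph API.

```python
import json

# First two one-minute rows for Bitcoin Cash (values copied from train_df, columns trimmed)
rows = [
    {"timestamp": 1514764860, "Count": 40.0, "Close": 2374.59, "Asset_Name": "Bitcoin Cash"},
    {"timestamp": 1514764920, "Count": 53.0, "Close": 2372.286667, "Asset_Name": "Bitcoin Cash"},
]

current = rows[-1]  # the row the sample is built for
sample = {
    "timestamp": current["timestamp"],
    "Asset_ID": 2,
    "Target": -0.004079,
    # the whole history window is serialized into one JSON string,
    # with the absolute timestamp replaced by the seconds elapsed until the current row
    "history": json.dumps([
        {**{k: v for k, v in r.items() if k != "timestamp"},
         "time_diff": current["timestamp"] - r["timestamp"]}
        for r in rows
    ]),
}
print(sample["history"])
```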

In [1]:
import os
from datetime import datetime as dtt

import all2graph as ag
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

Using backend: pytorch


In [2]:
# change to the folder where you extracted the Kaggle competition data
os.chdir('/Users/chenxiaotian/Downloads/g-research-crypto-forecasting/')

In [3]:
asset_detail_df = pd.read_csv('asset_details.csv')
asset_detail_df

|   | Asset_ID | Weight | Asset_Name |
|---|---|---|---|
| 0 | 2 | 2.397895 | Bitcoin Cash |
| 1 | 0 | 4.304065 | Binance Coin |
| 2 | 1 | 6.779922 | Bitcoin |
| 3 | 5 | 1.386294 | EOS.IO |
| 4 | 7 | 2.079442 | Ethereum Classic |
| 5 | 6 | 5.894403 | Ethereum |
| 6 | 9 | 2.397895 | Litecoin |
| 7 | 11 | 1.609438 | Monero |
| 8 | 13 | 1.791759 | TRON |
| 9 | 12 | 2.079442 | Stellar |


In [4]:
train_df = pd.read_csv('train.csv', nrows=10000)
train_df = train_df.merge(asset_detail_df, on='Asset_ID')
train_df

|   | timestamp | Asset_ID | Count | Open | High | Low | Close | Volume | VWAP | Target | Weight | Asset_Name |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1514764860 | 2 | 40.0 | 2376.580000 | 2399.50 | 2357.14 | 2374.590000 | 19.233005 | 2373.116392 | -0.004218 | 2.397895 | Bitcoin Cash |
| 1 | 1514764920 | 2 | 53.0 | 2374.553333 | 2400.90 | 2354.20 | 2372.286667 | 24.050259 | 2371.434498 | -0.004079 | 2.397895 | Bitcoin Cash |
| 2 | 1514764980 | 2 | 61.0 | 2371.633333 | 2401.90 | 2353.70 | 2372.063333 | 42.676438 | 2375.442755 | -0.002892 | 2.397895 | Bitcoin Cash |
| 3 | 1514765040 | 2 | 95.0 | 2376.060000 | 2406.40 | 2344.00 | 2370.566667 | 37.820918 | 2371.096152 | -0.003718 | 2.397895 | Bitcoin Cash |
| 4 | 1514765100 | 2 | 33.0 | 2372.656667 | 2404.60 | 2343.40 | 2370.173333 | 8.519679 | 2370.345730 | -0.002171 | 2.397895 | Bitcoin Cash |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 9995 | 1514842500 | 11 | 6.0 | 340.320000 | 340.50 | 339.51 | 340.180000 | 8.915547 | 339.896497 | -0.010872 | 1.609438 | Monero |
| 9996 | 1514842560 | 11 | 6.0 | 342.655000 | 345.85 | 339.38 | 342.615000 | 6.252500 | 342.649362 | 0.005154 | 1.609438 | Monero |
| 9997 | 1514842620 | 11 | 5.0 | 339.380000 | 339.38 | 338.51 | 338.550000 | 2.960035 | 338.659290 | -0.007088 | 1.609438 | Monero |
| 9998 | 1514842680 | 11 | 8.0 | 342.085000 | 344.76 | 338.40 | 341.410000 | 7.420612 | 341.437057 | 0.003703 | 1.609438 | Monero |


In [5]:
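def foo(df, n):
    """Build one sample per row of a single-asset df, using up to n preceding rows as history."""
    # within one asset, timestamps must be unique and strictly increasing
    assert df.timestamp.unique().shape[0] == df.shape[0]
    assert (df.timestamp.diff().dropna() > 0).all()
    
    output = []
    # use enumerate for the positional index: the labels returned by iterrows come from
    # train_df and do not match iloc positions inside a group
    for i, (_, row) in enumerate(tqdm(df.iterrows(), total=df.shape[0], ascii=True)):
        sample = dict(row[['timestamp', 'Asset_ID', 'Target']])
        # the current row plus up to n preceding rows
        sub_df = df.iloc[max(i-n, 0):(i+1)].copy()
        # seconds between each history record and the current row
        sub_df['time_diff'] = row['timestamp'] - sub_df['timestamp']

        # serialize the history window into a single JSON string
        sample['history'] = sub_df[
            ['time_diff', 'Count', 'Open', 'High', 'Low', 'Close', 'Volume', 'VWAP', 'Asset_Name']
        ].to_json(orient='records')
        output.append(sample)
        
    return output


def processing(df, n):
    output = []
    # build samples asset by asset so that histories never mix different assets
    for _, group in df.groupby('Asset_Name'):
        output += foo(group, n)
    return pd.DataFrame(output)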
    

In [6]:
pro_train_df = processing(train_df, 60)

100%|##########| 1273/1273 [00:02<00:00, 487.95it/s]
100%|##########| 1300/1300 [00:02<00:00, 489.34it/s]
100%|##########| 1300/1300 [00:02<00:00, 472.06it/s]
100%|##########| 1154/1154 [00:02<00:00, 492.22it/s]
100%|##########| 1299/1299 [00:02<00:00, 491.40it/s]
100%|##########| 1274/1274 [00:02<00:00, 476.99it/s]
100%|##########| 1299/1299 [00:02<00:00, 495.45it/s]
100%|##########| 1101/1101 [00:02<00:00, 482.78it/s]


In [7]:
pro_train_df, pro_valid_df = train_test_split(pro_train_df, test_size=0.2)

In [8]:
pro_train_df.to_csv('pro_train.csv', index=False)
pro_valid_df.to_csv('pro_valid.csv', index=False)

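all2graph is designed to process tens of GB or even several TB of raw data on a single machine<br>
Smaller machines may not have enough memory to load the complete dataset, so the data needs to be stored in shards<br>
all2graph provides a sharding function that you can call<br>
You can also write your own sharding code for your scenario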
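If you write your own sharding code, a minimal pandas sketch could look like the following, assuming you only need fixed-size chunks saved as zip-compressed CSV files plus a meta table that maps each sample to its shard. `split_csv_sketch` is a hypothetical helper, not all2graph's `split_csv`; it relies only on `pandas.read_csv(chunksize=...)` and zip compression inferred from the `.zip` suffix.

```python
import os

import pandas as pd


def split_csv_sketch(src, dst, chunksize, meta_cols):
    """Write src in chunks of `chunksize` rows to dst/<i>.zip and return a meta table."""
    os.makedirs(dst, exist_ok=True)
    metas = []
    # read_csv with chunksize yields DataFrames one at a time, so the full file is never in memory
    for i, chunk in enumerate(pd.read_csv(src, chunksize=chunksize)):
        path = os.path.join(dst, f"{i}.zip")
        chunk.to_csv(path, index=False)  # zip compression inferred from the file suffix
        meta = chunk[meta_cols].copy()
        meta["path"] = path  # remember which shard each sample lives in
        metas.append(meta)
    return pd.concat(metas, ignore_index=True)
```

Keeping the meta columns in a separate small table lets later steps shuffle or filter samples without opening every shard.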

In [9]:
train_meta_df = ag.split_csv(
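    src='pro_train.csv', # raw data: a DataFrame, a path, or a list of paths
    dst='pro_train', # output folder for the shards
    chunksize=100, # rows per shard; tune it for your machine: too large slows down later training and increases memory use, too small produces many small files
    meta_cols=['timestamp', 'Asset_ID', 'Target'] # see below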
)
valid_meta_df = ag.split_csv(
    src='pro_valid.csv',
    dst='pro_valid',
    chunksize=100,
    meta_cols=['timestamp', 'Asset_ID', 'Target']
)

80it [00:01, 67.15it/s, spliting csv]
20it [00:00, 70.05it/s, spliting csv]


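meta_df records, for every sample, the shard file it was written to (`path`) together with the columns listed in `meta_cols`; the 8,000 training samples at `chunksize=100` give the 80 shards counted by the progress bar above<br>
We will use it later when building the dataset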

In [10]:
train_meta_df = pd.read_csv('pro_train_meta.csv')
valid_meta_df = pd.read_csv('pro_valid_meta.csv')

In [11]:
train_meta_df

|   | timestamp | Asset_ID | Target | path |
|---|---|---|---|---|
| 0 | 1514828280 | 2 | -0.004210 | pro_train/0.zip |
| 1 | 1514839380 | 7 | -0.008874 | pro_train/0.zip |
| 2 | 1514810820 | 11 |  | pro_train/0.zip |
| 3 | 1514782500 | 7 | 0.027493 | pro_train/0.zip |
| 4 | 1514792880 | 6 | -0.000477 | pro_train/0.zip |
| ... | ... | ... | ... | ... |
| 7995 | 1514785740 | 11 | 0.006936 | pro_train/79.zip |
| 7996 | 1514803980 | 1 | -0.018706 | pro_train/79.zip |
| 7997 | 1514822640 | 6 | -0.007311 | pro_train/79.zip |
| 7998 | 1514781660 | 5 | 0.002253 | pro_train/79.zip |


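## 2. Graph structure parsing
Building, by hand, a neural network that matches a given graph structure is a tedious and lengthy process<br>
all2graph provides a component that parses the graph structure automatically, generating the corresponding graph from the structure of the input JSON<br>
Note: JSON is itself a tree. The basic principle of all2graph is to start from this JSON tree and add extra edges to it, which turns it into a graph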
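A toy sketch of this idea (not all2graph's actual parser): every JSON value becomes a node labeled with its key, each child gets an edge from its parent (the tree), and consecutive elements of a list get one extra edge from the earlier element to the later one. `json_to_graph`, the labeling of list elements with the list's key, and the choice of extra edge are all assumptions for illustration; the numbers are the Open/Close values of the first two rows of `train_df`.

```python
import json


def json_to_graph(obj):
    """Toy conversion of a JSON value into parallel (keys, values, src, dst) lists."""
    keys, values, src, dst = [], [], [], []

    def add_node(key, value, parent):
        idx = len(keys)
        keys.append(key)
        values.append(value)
        if parent is not None:
            src.append(parent)  # tree edge: parent -> child
            dst.append(idx)
        return idx

    def visit(key, value, parent):
        idx = add_node(key, value, parent)
        if isinstance(value, dict):
            for k, v in value.items():
                visit(k, v, idx)
        elif isinstance(value, list):
            prev = None
            for item in value:
                child = visit(key, item, idx)  # list elements reuse the list's key
                if prev is not None:
                    src.append(prev)  # extra edge: previous list element -> next one
                    dst.append(child)
                prev = child
        return idx

    visit("root", obj, None)
    return keys, values, src, dst


history = json.loads('[{"Open": 2376.58, "Close": 2374.59}, {"Open": 2374.553333, "Close": 2372.286667}]')
keys, values, src, dst = json_to_graph({"history": history})
print(keys)                # ['root', 'history', 'history', 'Open', 'Close', 'history', 'Open', 'Close']
print(list(zip(src, dst)))  # the last pair (2, 5) is the extra, non-tree edge
```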

### 2.1 DataParser
Parsing the raw data into a graph is done by a DataParser object<br>
all2graph implements a JsonParser class for parsing JSON into graphs

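The input of JsonParser is a pandas DataFrame, which must contain two columns:<br>
&emsp; json_col: the column containing the JSON data<br>
&emsp; time_col: the column containing the observation timestamp of each sample<br>
Notes:<br>
&emsp; Because all2graph includes time-series processing, it needs a sample timestamp field to normalize the times in sequences<br>
&emsp; If your data has no timestamp information, you can fill the time_col column entirely with None, but the column itself must exist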
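For data without timestamps, the requirement above can be met with one pandas assignment. The column names `history` and `timestamp` mirror this tutorial, and the JSON strings are placeholders.

```python
import pandas as pd

df = pd.DataFrame({"history": ['[{"Open": 1.0}]', '[{"Open": 2.0}]']})
# No observation time available: keep the column, but fill it entirely with None
df["timestamp"] = None
print(df)
```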

In [12]:
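data_parser = ag.json.JsonParser(
    # required: the column containing the JSON data
    json_col='history', 
    # required: the column containing the sample observation timestamp
    time_col='timestamp', 
    # required if the values of time_col are not empty: the timestamp format, e.g. "%Y-%m-%d"
    # (here timestamp holds Unix seconds and time_format is left as None)
    time_format=None,
    # int: how many preceding nodes each node inside a list skip-connects to; 0 means all, -1 means none, default 1
    # larger values usually give higher model accuracy but consume more memory
    list_dst_degree=1, 
    # int: how many following nodes each node inside a list skip-connects to; 0 means all, -1 means none, default 1
    # larger values usually give higher model accuracy but consume more memory
    r_list_inner_degree=1
)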
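To see why a larger degree costs more memory, here is a pure-Python sketch of one reading of the degree parameters above: each of the `n` elements of a list is linked to up to `degree` preceding elements, with 0 meaning all and -1 meaning none. `list_skip_edges` is hypothetical, and the exact edges all2graph creates (including their direction) are an assumption here.

```python
def list_skip_edges(n, degree):
    """Edges (src, dst) linking each of n list elements to up to `degree` preceding elements.

    degree == 0 links to all preceding elements; degree == -1 creates no links.
    """
    edges = []
    if degree == -1:
        return edges
    for i in range(n):
        start = 0 if degree == 0 else max(0, i - degree)
        for j in range(start, i):
            edges.append((j, i))  # earlier element j -> later element i
    return edges


print(list_skip_edges(4, 1))       # [(0, 1), (1, 2), (2, 3)]
print(len(list_skip_edges(4, 0)))  # 6
```

With degree 1 the edge count grows linearly with the list length, while degree 0 grows quadratically: for the 61-record history windows built earlier, that is 60 edges versus 61 * 60 / 2 = 1830.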

In [13]:
graph, *_ = data_parser.parse(pro_train_df.iloc[[0]])

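graph is the intermediate structure produced by parsing the raw data; it is a RawGraph object<br>
graph stores its data as "node attributes" and "edges"<br>
The main node attributes are<br>
&emsp; key: the type of the node<br>
&emsp; value: the data of the node, i.e. the raw data of the JSON node it represents<br>
Edges record the indices of their start and end nodes<br>
&emsp; src: index of the start node<br>
&emsp; dst: index of the end node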
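Because edges are stored as two parallel index lists, they can be read with plain Python: edge i runs from node `src[i]` to node `dst[i]`. This sketch takes the ten edges printed by `graph.src[:10], graph.dst[:10]` in this section and groups them into per-node lists of incoming neighbours; it uses no all2graph API.

```python
from collections import defaultdict

# First ten edges of the RawGraph, as printed by graph.src[:10], graph.dst[:10]
src = [0, 1, 1, 2, 2, 3, 3, 4, 4, 5]
dst = [0, 1, 0, 2, 1, 3, 1, 4, 1, 5]

# Edge i goes from node src[i] to node dst[i]; collect the incoming neighbours of each node
in_neighbors = defaultdict(list)
for s, d in zip(src, dst):
    in_neighbors[d].append(s)

print(dict(in_neighbors))  # {0: [0, 1], 1: [1, 2, 3, 4], 2: [2], 3: [3], 4: [4], 5: [5]}
```

In this excerpt every node has a self-loop, node 1 points to node 0, and nodes 2, 3 and 4 each point to node 1.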

In [14]:
graph

RawGraph(num_nodes=611, num_edges=1341, num_components=1, num_keys=10, num_types=2)

In [15]:
graph.key[:10]

['readout',
 'readout',
 'time_diff',
 'Count',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume',
 'VWAP']

In [16]:
graph.value[:10]

[[{'time_diff': 3600,
   'Count': 39.0,
   'Open': 2286.075,
   'High': 2297.85,
   'Low': 2270.7,
   'Close': 2282.11,
   'Volume': 14.22393094,
   'VWAP': 2283.1510413329,
   'Asset_Name': 'Bitcoin Cash'},
  {'time_diff': 3540,
   'Count': 35.0,
   'Open': 2302.4366666667,
   'High': 2343.1,
   'Low': 2271.1,
   'Close': 2305.13,
   'Volume': 9.80240477,
   'VWAP': 2304.133322459,
   'Asset_Name': 'Bitcoin Cash'},
  {'time_diff': 3480,
   'Count': 31.0,
   'Open': 2287.84,
   'High': 2293.99,
   'Low': 2278.0,
   'Close': 2290.24,
   'Volume': 11.72394998,
   'VWAP': 2287.5306179712,
   'Asset_Name': 'Bitcoin Cash'},
  {'time_diff': 3420,
   'Count': 32.0,
   'Open': 2308.1266666667,
   'High': 2343.0,
   'Low': 2285.5,
   'Close': 2307.5666666667,
   'Volume': 9.57904064,
   'VWAP': 2308.6340278365,
   'Asset_Name': 'Bitcoin Cash'},
  {'time_diff': 3360,
   'Count': 39.0,
   'Open': 2290.495,
   'High': 2294.0,
   'Low': 2284.5,
   'Close': 2290.3,
   'Volume': 32.3633004,
   'VWAP'

In [17]:
graph.src[:10], graph.dst[:10]

([0, 1, 1, 2, 2, 3, 3, 4, 4, 5], [0, 1, 0, 2, 1, 3, 1, 4, 1, 5])

### 2.2 Factory
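Factory is a utility class that combines graph parsing, model generation and other functions, with multiprocessing optimizations<br>
The Factory constructor takes only two basic parameters<br>
&emsp; data_parser: the DataParser described above<br>
&emsp; targets: the names of the target columns in the DataFrame (passed inside raw_graph_parser_config)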

In [18]:
factory = ag.Factory(
    data_parser=data_parser,
    raw_graph_parser_config=dict(
        targets=['Target']
    )
)

factory.analyse analyzes the dataset and stores the results as metadata

In [19]:
factory.analyse(
    'pro_train', # data source: a DataFrame, a csv path, or a list of csv paths
    processes=None # number of processes; None means use all CPUs
)

125it [00:03, 34.93it/s, reading csv]
100%|##########| 8/8 [00:00<00:00, 29.57it/s, reducing meta numbers]
100%|##########| 8/8 [00:00<00:00, 62022.98it/s, reducing meta numbers phase 2]
100%|##########| 2/2 [00:00<00:00, 92.18it/s, reducing meta string]
100%|##########| 2/2 [00:00<00:00, 85.17it/s, reducing meta string phase 2]
100%|##########| 10/10 [00:00<00:00, 84.99it/s, reducing meta name]
100%|##########| 10/10 [00:00<00:00, 83.23it/s, reducing meta name phase 2]

Building prefix dict from the default dictionary ...
Loading model from cache /var/folders/2n/078_l_0n3532fx4p3_99_qpm0000gp/T/jieba.cache





Loading model cost 0.835 seconds.
Prefix dict has been built successfully.


MetaInfo(num_strings=MetaString(num_strings=2), num_numbers=8, num_keys=10, num_etypes=19)

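## 3. Model construction
factory builds the model automatically from the results of analyse; algorithm engineers only need to specify a few basic model parameters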
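The tutorial does not say how all2graph splits `d_model` across heads. Assuming it follows the usual multi-head attention convention, `d_model` must be divisible by `nhead` and each head works on `d_model // nhead` dimensions. The sketch below checks this with PyTorch's own `torch.nn.MultiheadAttention`, which is not necessarily the layer all2graph uses internally.

```python
import torch

d_model, nhead = 8, 2
assert d_model % nhead == 0, "each head gets d_model // nhead dimensions"
print("dimensions per head:", d_model // nhead)  # 4

attn = torch.nn.MultiheadAttention(embed_dim=d_model, num_heads=nhead)
x = torch.randn(5, 3, d_model)  # (sequence length, batch, d_model), the default layout
out, weights = attn(x, x, x)    # self-attention: query, key and value are the same tensor
print(out.shape)      # torch.Size([5, 3, 8])
print(weights.shape)  # torch.Size([3, 5, 5]), averaged over heads
```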

In [20]:
model = factory.produce_model(
    d_model=8, # number of neurons in each hidden layer
    nhead=2, # number of heads in multi-head attention
    num_layers=[1, 1, 1, 1, 1], # number of hidden layers; this list means 5 layers
)
if torch.cuda.is_available():
    model.cuda()

## 4. Model training

factory.produce_dataloader wraps dataset creation<br>
It returns a native PyTorch DataLoader

In [21]:
train_data = factory.produce_dataloader(
    meta_df=train_meta_df,
    batch_size=64,
    num_workers=1,
    shuffle=True
)

valid_data = factory.produce_dataloader(
    meta_df=valid_meta_df,
    batch_size=64,
    num_workers=1,
    shuffle=True
)

In [22]:
graph, label = next(iter(train_data))
pred = model(graph)

In [23]:
graph

MetaGraph(num_nodes=308, num_edges=688,
      ndata_schemes={'key': Scheme(shape=(), dtype=torch.int64), 'value': Scheme(shape=(), dtype=torch.int64), 'meta_symbol': Scheme(shape=(), dtype=torch.int64), 'meta_component_id': Scheme(shape=(), dtype=torch.int64)}
      edata_schemes={'key': Scheme(shape=(), dtype=torch.int64)})
Graph(num_nodes=5938, num_edges=12890,
      ndata_schemes={'number': Scheme(shape=(), dtype=torch.float32), 'value': Scheme(shape=(), dtype=torch.int64), 'symbol': Scheme(shape=(), dtype=torch.int64), 'meta_node_id': Scheme(shape=(), dtype=torch.int64)}
      edata_schemes={'meta_edge_id': Scheme(shape=(), dtype=torch.int64)})

In [24]:
label

{'Target': tensor([-0.0037, -0.0137, -0.0082,  0.0132, -0.0059,  0.0023, -0.0075,  0.0252,
          0.0130,  0.0055,  0.0053, -0.0051,  0.0068,     nan,     nan, -0.0025,
          0.0092, -0.0043,  0.0043, -0.0046,  0.0270,  0.0095,  0.0013, -0.0047,
          0.0049, -0.0125,  0.0163,  0.0028,  0.0067, -0.0011, -0.0060,  0.0012,
          0.0045, -0.0018,     nan, -0.0070, -0.0006, -0.0153, -0.0046, -0.0120,
          0.0104,  0.0099, -0.0035, -0.0006, -0.0062,  0.0094,  0.0242,  0.0041,
          0.0012,  0.0095, -0.0153, -0.0012, -0.0032,  0.0151,     nan,  0.0015,
             nan, -0.0032,  0.0022, -0.0065,  0.0031,  0.0153,  0.0125, -0.0046])}

In [25]:
pred

{'Target': tensor([1.5462, 1.6428, 1.3823, 1.9953, 0.8573, 1.0657, 0.8622, 0.9225, 1.7220,
         1.5560, 1.5631, 1.5864, 1.7521, 2.5729, 0.9609, 1.5824, 2.2937, 1.5597,
         1.6078, 1.7403, 1.1783, 0.9150, 1.7370, 1.7636, 0.6980, 1.1600, 0.8712,
         1.3299, 0.2327, 1.8028, 1.2067, 0.8947, 0.8894, 1.9864, 0.9813, 1.1451,
         0.3856, 1.1669, 1.4622, 1.8546, 1.7670, 1.7056, 1.7248, 1.4474, 1.5528,
         1.0430, 1.0806, 0.6433, 0.0383, 1.2131, 2.2127, 0.7957, 1.3353, 1.3373,
         2.3722, 1.8274, 0.5005, 1.1609, 0.4528, 1.3635, 1.1308, 0.7516, 1.1741,
         0.9423], grad_fn=<SelectBackward>)}

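To support multi-task learning, all2graph stores both labels and predictions as dicts, which makes computing losses and metrics inconvenient<br>
Therefore all2graph provides wrapper classes that turn ordinary losses and metrics into ones that accept dict inputs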
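The label dict printed earlier contains `nan` entries (samples whose `Target` is missing), yet `loss(pred, label)` in the next cells returns a finite value, whereas plain `torch.nn.L1Loss` would return `nan`, so the wrapper must handle missing labels somehow. A minimal sketch of such a wrapper, assuming NaN masking and averaging over keys; `DictLossSketch` is hypothetical and is not all2graph's `DictLoss`.

```python
import torch


class DictLossSketch(torch.nn.Module):
    """Apply a torch loss to every key of a pred/label dict, skipping NaN labels (hypothetical)."""

    def __init__(self, loss_fn):
        super().__init__()
        self.loss_fn = loss_fn

    def forward(self, pred, label):
        losses = []
        for key, target in label.items():
            mask = ~torch.isnan(target)  # keep only samples that actually have a label
            losses.append(self.loss_fn(pred[key][mask], target[mask]))
        return torch.stack(losses).mean()  # average over tasks


sketch_loss = DictLossSketch(torch.nn.L1Loss())
toy_pred = {"Target": torch.tensor([1.0, 2.0, 3.0])}
toy_label = {"Target": torch.tensor([0.0, float("nan"), 1.0])}
print(sketch_loss(toy_pred, toy_label))  # tensor(1.5000): mean of |1 - 0| and |3 - 1|
```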

In [26]:
loss = ag.nn.DictLoss(torch.nn.L1Loss())
metric = ag.Metric(r2_score, label_first=True)

In [27]:
loss(pred, label)

tensor(1.2967, grad_fn=<DivBackward0>)

In [28]:
metric(ag.nn.to_numpy(label), ag.nn.to_numpy(pred))

{'Target': -21381.876353712098}

all2graph provides a Trainer class for model training

In [29]:
def get_metric(x):
    return x['r2_score']['Target']

early_stop = ag.nn.EarlyStop(rounds=5, higher=True, json_path=get_metric)
trainer = ag.nn.Trainer(
    module=model,
    loss=loss,
    data=train_data,
    valid_data=[valid_data],
    metrics={'r2_score': metric},
    check_point='check_point',
    early_stop=early_stop
)
trainer.fit(1)

epoch 1 train: 100%|##########| 125/125 [00:56<00:00,  2.23it/s, loss=0.331]
epoch 1 val 0: 100%|##########| 32/32 [00:04<00:00,  7.30it/s]
epoch 1 train metrics: {"r2_score": {"Target": -2109.217}}
epoch 1 val 0 metrics: {"r2_score": {"Target": -6.946}}
save at "check_point.0.2.2/20220112140934.484777/1.all2graph.trainer"
current_epoch=1, current_metric=-6.946, best_epoch=1, best_metric=-6.946
save at "check_point.0.2.2/20220112140934.484777/1.all2graph.trainer"
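One reading of `EarlyStop(rounds=5, higher=True, json_path=get_metric)` above: pull the monitored value out of the nested metrics dict with `get_metric`, treat higher as better, and stop once it has not improved for `rounds` epochs. A pure-Python sketch of that patience logic under this reading; `EarlyStopSketch` is hypothetical (not all2graph's class) and the metric values are made-up inputs.

```python
class EarlyStopSketch:
    """Signal a stop after `rounds` epochs without improvement (hypothetical helper)."""

    def __init__(self, rounds, higher=True, json_path=lambda x: x):
        self.rounds = rounds
        self.higher = higher
        self.json_path = json_path  # extracts the scalar to monitor from a nested metrics dict
        self.best_metric = None
        self.best_epoch = None

    def __call__(self, epoch, metrics):
        """Record this epoch's metrics; return True if training should stop."""
        value = self.json_path(metrics)
        improved = self.best_metric is None or (
            value > self.best_metric if self.higher else value < self.best_metric
        )
        if improved:
            self.best_metric, self.best_epoch = value, epoch
        return epoch - self.best_epoch >= self.rounds


stopper = EarlyStopSketch(rounds=2, higher=True, json_path=lambda x: x["r2_score"]["Target"])
stopped_at = None
for epoch, r2 in enumerate([-6.9, -3.0, -3.5, -4.0, -1.0], start=1):  # made-up validation r2
    if stopper(epoch, {"r2_score": {"Target": r2}}):
        stopped_at = epoch
        break
print(stopped_at, stopper.best_epoch)  # 4 2
```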


The trained model is saved automatically so that it can be reloaded

In [30]:
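# reload the whole Trainer object; since PyTorch 2.6, torch.load defaults to weights_only=True,
# so on those versions pass weights_only=False to unpickle a full object like this one
trainer = torch.load('check_point.0.2.2/20220112140934.484777/1.all2graph.trainer')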

The Trainer.build_predictor method returns a model that can be deployed directly

In [31]:
predictor = trainer.build_predictor().eval()

In [32]:
with torch.no_grad():
    print(predictor(pro_train_df.iloc[[0]])['Target_output'])

tensor([0.0269])


In [33]:
predictor.predict(pro_train_df.iloc[:5])

1it [00:00, 28.81it/s, predicting]


|   | timestamp | Asset_ID | Target | Target_output |
|---|---|---|---|---|
| 3630 | 1514828280 | 2 | -0.00421 | 0.026941 |
| 7543 | 1514839380 | 7 | -0.008874 | 0.016322 |
| 9538 | 1514810820 | 11 |  | 0.016322 |
| 6611 | 1514782500 | 7 | 0.027493 | 0.016322 |
| 5494 | 1514792880 | 6 | -0.000477 | 0.016322 |
