In [1]:
import sys
from pyunicorn.timeseries import VisibilityGraph
import numpy as np

sys.path.append("/Users/apple/Documents/Pycharm/QushiDataPlatform/airflow/dags")
import networkx as nx
import matplotlib.pyplot as plt
from ge.classify import read_node_label, Classifier
from ge import Struc2Vec
from postprocess.datakit.DtLoader import DtLoader

Read Edge & Nodes

Prediction

In [2]:
import numpy as np
import h5py
from tqdm import tqdm
import multiprocessing
from functools import partial

FILE_PATH = "./data/vg_data.hdf5"


# define enum for different embedding methods, including struc2vec and CI
class EmbeddingMethod:
    STRUC2VEC = "graph_s2v"
    CI = "graph_CI"


def process_graph_vec(args):
    data, period, embed_size, t, n = args
    graph_vec = struc2vec_from_arr_to_graph_vec(data[t - period : t, n], embed_size)
    assert graph_vec is not None
    return graph_vec


def struc2vec_from_arr_to_graph_vec(arr, embed_size=32, window_size=5):
    # 示例时间序列

    # 创建可见性图对象
    vg = VisibilityGraph(arr)

    # 计算可见性关系
    visibility_matrix = vg.visibility_relations()

    G = nx.from_numpy_array(visibility_matrix)

    model = Struc2Vec(
        G,
        walk_length=10,
        num_walks=80,
        workers=1,
        verbose=0,
    )

    model.train(embed_size=embed_size, window_size=window_size)

    embeddings = np.matrix([model.w2v_model.wv[word] for word in model.graph.nodes()])

    return embeddings


class FeatureData:
    """

    :data , shape = (F, T, N), F: number of features, T: number of time points, N: number of stocks
        data[... , ...] = [open, high, low, close, volume, amount, pct_chg]

    :cld_info = [tradedays, tradeassets]
        tradedays: list of tradedays
        tradeassets: list of tradeassets, int

    :period, int, the number of time points to be considered

    """

    def __init__(self, data: list, cld_info: list, period=20, embed_size=32):
        self.data = data
        self.cld_info = cld_info
        self.period = period
        self.embed_size = embed_size

        for i in range(len(data)):
            assert data[i].shape[1] == data[0].shape[1]

        close = self.data[0]
        self.labels = ((close[1:] - close[:-1]) > 0).astype(int)
        self.myset = []
        T, N = self.data[0].shape

        for t in range(T - 1):
            # skip the last period
            if t < self.period:
                continue
            else:
                for n in range(N):
                    raw = self.data[0][t - self.period : t, n]
                    label = self.labels[t, n]
                    if np.isnan(raw).any() or np.isnan(label):
                        continue
                    else:
                        self.myset.append((t, n))
        print(
            f"Valid: {len(self.myset)} samples , kept {(len(self.myset) / (T*N)):.2%} <- Timesteps: {T}, Stocks: {N}, Total: {T*N} samples"
        )

    def dump_label(self):
        length = len(self.myset)

        with h5py.File(FILE_PATH, "w") as f:
            y = f.create_dataset("label", shape=(length,), dtype="int32")
            for idx, (t, n) in enumerate(self.myset):
                y[idx] = self.labels[t, n]

    def dump_raw(self):
        G = len(self.data)
        length = len(self.myset)
        with h5py.File(FILE_PATH, "w") as f:
            x_raw = f.create_dataset(
                "raw", shape=(length, G, self.period), dtype="float32"
            )
            for i in range(G):
                for idx, (t, n) in enumerate(self.myset):
                    x_raw[idx, i, ...] = self.data[i][t - self.period : t, n]

    def dump_s2v(self):
        G = len(self.data)
        length = len(self.myset)
        # # Single process
        
        # with h5py.File(FILE_PATH, "w") as f:
        #     x_s2v = f.create_dataset(
        #         "graph_s2v",
        #         shape=(length, G, self.period, self.embed_size),
        #         dtype="float32",
            # )
            # for i in range(G):
            #     for idx, (t, n) in enumerate(tqdm(self.myset, desc=f"Struc2Vec_{i}")):
            #         graph_vec = struc2vec_from_arr_to_graph_vec(
            #             self.data[i][t - self.period : t, n], self.embed_size
            #         )
            #         assert graph_vec is not None
            #         x_s2v[idx, i, ...] = graph_vec
           
        ## Multi process
        with h5py.File(FILE_PATH, "w") as f:
            x_s2v = f.create_dataset(
                "graph_s2v",
                shape=(length, G, self.period, self.embed_size),
                dtype="float32",
            )
            
        for i in range(G):
            arg_list = [(idx, t, n, i, self.period, self.embed_size, self.data) for idx, (t, n) in enumerate(self.myset)]

            with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
                job_partial = partial(job_s2v,  data=self.data)
                results = pool.starmap(job_partial, arg_list)

                
def job_s2v(idx, t, n, i, period, embed_size, data):
    with h5py.File(FILE_PATH, "a") as f:
        x_s2v = f["graph_s2v"]
        graph_vec = struc2vec_from_arr_to_graph_vec(data[i][t - period : t, n], embed_size)
        assert graph_vec is not None
        x_s2v[idx, i, ...] = graph_vec

In [3]:
dl = DtLoader("stock")
data = [
    dl.close.iloc[-100:, :2].values,
    dl.open.iloc[-100:, :2].values,
    # dl.high.iloc[-100:, :2].values,
    # dl.low.iloc[-100:, :2].values,
    # dl.volume.iloc[-100:, :2].values,
    # dl.amount.iloc[-100:, :2].values,
    # dl.vwap.iloc[-100:, :2].values,
]
cld_info = [dl.tradedays, dl.tradeassets]
del dl

In [4]:
dl = DtLoader("stock")


In [8]:
dl.tradedays[0].strftime("%Y%m%d")

'20060104'

In [4]:
feature_data = FeatureData(data=data, cld_info=cld_info, period=20, embed_size=32)

Valid: 158 samples , kept 79.00% <- Timesteps: 100, Stocks: 2, Total: 200 samples


In [5]:
feature_data.dump_label()
feature_data.dump_raw()
feature_data.dump_s2v()

Process SpawnPoolWorker-2:
Process SpawnPoolWorker-1:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'job_s2v' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/Library/Developer/Comm

KeyboardInterrupt: 

In [357]:
# Multi Process
feature_data.dump_label()
feature_data.dump_raw()
feature_data.dump_s2v()

TypeError: h5py objects cannot be pickled

In [100]:
from torch.utils.data import Dataset, DataLoader
import torch


class VisualGraphLearningDataset(Dataset):
    """
    All data:
        1. Raw data
        2. Graph embedding by struc2vec
        3. Graph embedding by CI [todo]
        4. labels


    """

    def __init__(self, s2v_file_path, s2v_data_name):
        self.struc2vec_file_path = s2v_file_path
        self.s2v_data_name = s2v_data_name
        with h5py.File(s2v_file_path, "r") as file:
            self.tradedays = file[s2v_data_name].shape[0]
            self.tradeassets = file[s2v_data_name].shape[1]

    def __len__(self):
        return self.tradedays

    def __getitem__(self, dt, symbol):
        with h5py.File(self.s2v_file_path, "r") as file:
            data = file[self.s2v_data_name][dt, symbol, ...]
            return torch.from_numpy(data)


# 使用 DataLoader
hdf5_dataset = HDF5Dataset("output_data.hdf5", "output")
data_loader = DataLoader(hdf5_dataset, batch_size=64, shuffle=True)

# 使用 DataLoader 迭代数据
for batch_tensor in data_loader:
    # 对 batch_tensor 进行操作
    print(batch_tensor.shape)

torch.Size([64, 2, 1, 20, 32])
torch.Size([36, 2, 1, 20, 32])


In [12]:
import os

In [16]:
# loop through paths
path = './data/temp'

# loop through files
for filename in os.listdir(path):
    if filename.endswith(".hdf5"):
        print(filename) 
        filepath = os.path.join(path, filename)
        with h5py.File(filepath, 'r') as source_file:
            # 遍历源文件中的数据集
            for dataset_name, dataset in source_file.items():
                # 获取并显示数据集的形状
                shape = dataset.shape
                print(f"Shape of dataset '{dataset_name}' in file {filename}: {shape}")

In [18]:
FILE_PATH = "./data/vg_data.hdf5"

with h5py.File(FILE_PATH, "r") as f:
    temp_dataset = f["s2v"]
    print(temp_dataset.shape)

(158, 2, 20, 32)
