# 将数据按照ID进行分别存储

在同时使用multiprocessing 和 polars的时候，**千万注意不能将lazyframe数据放入多个进程中，会导致死锁**。LazyFrame 是惰性执行的，而 multiprocessing 会复制主进程的内存结构（使用 pickle 序列化对象）。但是：LazyFrame 是不可序列化的对象（不能 pickling），传入子进程后可能会触发隐式的强制 .collect() 或失败重算，导致每个进程都重新加载所有数据。

## 生成测试数据

In [None]:
import pandas as pd

column_names = ["ID", "time", "ID2", 'longitude', 'latitude', 'geohash', 'epoch0', 'epoch1']
df = pd.read_csv('./Data/Input/data.csv', sep=',', nrows=4096, header=None, names=column_names)

chunk_size = 1000
for i, chunk in enumerate(df.groupby(df.index // chunk_size)):
    filename = './Data/Input/test/data_chunk_{}.csv'.format(i)
    chunk[1].to_csv(filename, index=False)

## 按ID分别存储所有数据

原始的数据的主键是时间，也就是说按照时间来记录所有用户的轨迹。

现在需要先按照ID来进行分组，然后再按照时间进行排序。

In [None]:
import polars as pl
import os
from pathlib import Path
import csv

def group_and_save_by_id(data_dir, 
                         log_file,
                         id_col="ID", 
                         output_dir="output_by_id", 
                         column_names=None):
    data_dir = Path(data_dir)
    os.makedirs(output_dir, exist_ok=True)

    # 获取所有文件路径
    file_list = [str(f) for f in data_dir.iterdir() if f.is_file()]
    print(f"found {len(file_list)} files, loading...")

    # 读取并合并所有 CSV 文件，强制指定列名（如果列名为空）
    lazy_frames = [
        pl.read_csv(f, 
                    has_header=False, 
                    new_columns=column_names, 
                    schema_overrides={id_col: pl.Utf8}).lazy()
        for f in file_list
    ]
    full_lf = pl.concat(lazy_frames)
    # df = full_lf.collect()
    
    # 获取唯一 ID
    unique_ids = (
        full_lf.select(pl.col(id_col).cast(pl.Utf8).unique())
        .collect()
        .get_column(id_col)
        .to_list()
    )

    # 遍历分组并保存
    for uid in unique_ids:
        df = full_lf.filter(pl.col(id_col) == uid).collect()
        out_path = f"{output_dir}/{uid}.csv"
        df.write_csv(out_path)
        print(f"{uid} save {df.shape[0]} rows.")
        
        # 写入日志。
        # 判断是否需要写入表头。
        write_header_flag = not os.path.exists(log_file) or os.stat(log_file).st_size == 0
        with open(log_file, mode='a', newline='', encoding='utf-8') as log_f:
            if write_header_flag:
                writer.writerow(["ID", "NumberofRow"])
            writer = csv.writer(log_f)
            writer.writerow([uid, df.shape[0]])
            

    print("completed.")


column_names = ["ID", "time", "ID2", 'longitude', 'latitude', 'geohash', 'epoch0', 'epoch1']
group_and_save_by_id(
    data_dir="./Data/Input/",
    log_file='./Data/Output/log.csv',
    id_col="ID",
    output_dir="./Data/Output/",
    column_names=column_names
)

found 6 files, loading...


## 将用户ID映射为整数

由于用户ID为字符串，在放入tensor的时候必须为数值，所以需要将用户ID转换为整数，并且保留对应关系。

只额吉将保存的log.csv中的保存顺序作为对应关系。

In [7]:
import pandas as pd

log = pd.read_csv('./Data/Geolife Trajectories 1.4/log.csv', header=0)
log['IDMap'] = log.index
log.to_csv('./Data/Geolife Trajectories 1.4/log.csv', index=False)

### 将所有用户的停留点数据中的用户ID进行映射