In [1]:
import hashlib
import math
import os
import random
import shutil

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Root directory for every generated dataset; the generation cells write under f'{path}/...'.
path = './ds'
# exist_ok avoids the check-then-create race and makes the cell safe to re-run.
os.makedirs(path, exist_ok=True)

In [None]:
%%time

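# Dataset α: ten-million scale | 7 columns | basic arithmetic, joint statistics
# Generate, with a Python script, three sample sets of 10,000,000 rows of random float32 values.
# Each sample set has one ID column (the MD5 hash, as a 32-character hex digest, of each integer 1 to 10000000)
# and two feature columns (sample 1: X1, X2; sample 2: X3, X4; sample 3: X5, X6).
# All three samples share the same ID column; X1-X6 are random floats in the range 1.0-1000.0.
# X1, X3, X5 are used for basic arithmetic; X2, X4, X6 are used for joint statistics.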

def md5(i: int):
    """Return the MD5 hex digest (32 lowercase hex characters) of ``str(i)`` encoded as UTF-8."""
    digest = hashlib.md5(str(i).encode('utf-8'))
    return digest.hexdigest()

len_alpha = 10000000
# One ID column shared by all three samples: the MD5 digest of each integer 1..len_alpha.
# Hash the comprehension's own loop variable; a name from outside the comprehension would be
# undefined on a fresh kernel or a stale counter left over from another cell.
ID_alpha = [md5(n) for n in range(1, len_alpha + 1)]
for x in [0, 1, 2]:
    out_dir = f'{path}/ds_alpha{x+1}'
    # write_to_dataset adds a new uniquely named file on every call; clear the directory
    # first so re-running the cell replaces the dataset instead of appending a duplicate copy.
    shutil.rmtree(out_dir, ignore_errors=True)
    df = pd.DataFrame({'ID': ID_alpha,
                       # Features are float32 values in the range 1.0-1000.0, as the spec requires.
                       f'X{x * 2 + 1}': (np.random.random((len_alpha,)) * 999 + 1).astype(np.float32),
                       f'X{x * 2 + 2}': (np.random.random((len_alpha,)) * 999 + 1).astype(np.float32)
                      })
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(table, root_path=out_dir)

In [None]:
%%time

# 数据集β: 亿级别 | 4 | PIR
# 数据集由脚本随机生成，每条样本包含ID、注册日期、年龄、消费金额四个特征。
# 其中ID为18位十进制的随机整数，
# 注册日期为2000-01-01到2020-12-31范围的随机年月日数值，
# 年龄为15到80之间的随机整数，
# 消费金额为0.00-1000000.00范围内的随机浮点数(小数点后保留2位)。
# 从被查询数据集中随机抽取10000个ID值，作为高效率查询（百级不可区分度）的待查询ID。
# 从被查询数据集中随机抽取1个ID值，作为高隐匿性查询（百万级不可区分度）的待查询ID。

len_beta = 100000000
batch_size = 1000000
# Spec: 10,000 query IDs in total, sampled evenly across the batches (100 per batch here).
query_ratio = 10000 / len_beta
batch = math.ceil(len_beta / batch_size)
rng_beta = np.random.default_rng()

# Loop-invariant date setup. The day count includes both endpoints so that 2020-12-31
# itself is a possible registration date.
date_start = np.datetime64('2000-01-01')
n_days = int((np.datetime64('2020-12-31') - date_start).astype(np.int64)) + 1

# write_to_dataset adds a new uniquely named file on every call (which the batch loop relies
# on), so clear the outputs once up front; otherwise re-running the cell duplicates the data.
for out_dir in (f'{path}/ds_beta', f'{path}/ds_beta_query'):
    shutil.rmtree(out_dir, ignore_errors=True)

for i in range(0, batch):
    print(f"{i}/{batch}", end=": ")
    # 18 ASCII digits per row, with a leading digit of 1-9 so each ID is a genuine 18-digit
    # integer; each row of 18 bytes is then reinterpreted as one fixed-width byte string.
    id_digits = rng_beta.integers(ord('0'), ord('9') + 1, size=(batch_size, 18), dtype=np.uint8)
    id_digits[:, 0] = rng_beta.integers(ord('1'), ord('9') + 1, size=batch_size, dtype=np.uint8)
    ID_beta = id_digits.view(dtype='|S18').ravel()
    Date_beta = date_start + rng_beta.integers(0, n_days, size=batch_size).astype('timedelta64[D]')
    df = pd.DataFrame({'ID': ID_beta,
                       'Date': Date_beta,
                       # Ages 15-80 inclusive.
                       'Age': rng_beta.integers(15, 80, size=batch_size, endpoint=True),
                       # Amounts in 0.00-1000000.00, kept to 2 decimal places.
                       'Amount': np.round(rng_beta.random(size=batch_size) * 1000000, 2)
                       })
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(table, root_path=f'{path}/ds_beta')
    print("ds_beta", end=",")
    # Sample without replacement so the query set contains distinct IDs.
    query_ids = rng_beta.choice(ID_beta, size=math.ceil(batch_size * query_ratio), replace=False)
    df_query = pd.DataFrame({'ID': query_ids})
    table_query = pa.Table.from_pandas(df_query)
    pq.write_to_dataset(table_query, root_path=f'{path}/ds_beta_query')
    print("ds_beta_query")
# NOTE(review): the spec also calls for a single query ID for the high-anonymity
# (million-level indistinguishability) scenario; this cell does not produce it - TODO.

In [None]:
%%time 

# 数据集γ: 亿级别 | 1 | PSI
# 求交数据集由脚本随机生成，每个样本包含一个ID信息，ID为18位十进制的随机整数。不同场景各数据方的数据集规模不同。
# a）两方平衡场景：数据方A、数据方B的数据总量均为一亿行，两方数据的相交率为50%，即五千万条相同ID。
# b）两方非平衡场景：数据方A数据总量为一亿行、数据方B的数据总量为十万行，两方数据的相交率为50%，即五万条相同ID。

len_gamma = 100000000
batch_size = 1000000
join_rate = 0.5
query_ratio = 100000 / len_gamma
rng = np.random.default_rng()
batch = math.ceil(len_gamma / batch_size)


def _random_ids(n: int) -> np.ndarray:
    """Return ``n`` random 18-digit decimal IDs as a 1-D array of 18-byte strings (dtype S18).

    The leading digit is drawn from 1-9 so every ID is a genuine 18-digit integer.
    """
    digits = rng.integers(ord('0'), ord('9') + 1, size=(n, 18), dtype=np.uint8)
    digits[:, 0] = rng.integers(ord('1'), ord('9') + 1, size=n, dtype=np.uint8)
    # Reinterpret each row of 18 ASCII bytes as a single fixed-width byte string.
    return digits.view(dtype='|S18').ravel()


def _make_psi_query(ids: np.ndarray, size: int) -> np.ndarray:
    """Build a shuffled PSI query set of ``size`` IDs, ``ceil(size * join_rate)`` of them taken from ``ids``.

    Shared IDs are drawn without replacement so the intersection really contains that many
    distinct IDs. The remainder are fresh random IDs; a chance collision with ``ids`` is
    possible but negligible (18-digit space).
    """
    n_common = math.ceil(size * join_rate)
    query = np.concatenate((rng.choice(ids, size=n_common, replace=False),
                            _random_ids(size - n_common)))
    rng.shuffle(query)
    return query


# write_to_dataset adds a new uniquely named file on every call (which the batch loop relies
# on), so clear the outputs once up front; otherwise re-running the cell duplicates the data.
for out_name in ('ds_gamma', 'ds_gamma_query_a', 'ds_gamma_query_b'):
    shutil.rmtree(f'{path}/{out_name}', ignore_errors=True)

batch_size_query_b = math.ceil(batch_size * query_ratio)
for i in range(0, batch):
    print(f"{i}/{batch}", end=": ")
    ID_gamma = _random_ids(batch_size)
    df = pd.DataFrame({'ID': ID_gamma})
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(table, root_path=f'{path}/ds_gamma')
    print("ds_gamma", end=", ")
    # query a) balanced two-party case: B has as many rows as A, half of them shared.
    ID_gamma_query_a = _make_psi_query(ID_gamma, batch_size)
    df_query_a = pd.DataFrame({'ID': ID_gamma_query_a})
    table_query_a = pa.Table.from_pandas(df_query_a)
    pq.write_to_dataset(table_query_a, root_path=f'{path}/ds_gamma_query_a')
    print("ds_gamma_query_a", end=", ")
    # query b) unbalanced case: B has 100,000 rows in total, half of them shared.
    ID_gamma_query_b = _make_psi_query(ID_gamma, batch_size_query_b)
    df_query_b = pd.DataFrame({'ID': ID_gamma_query_b})
    table_query_b = pa.Table.from_pandas(df_query_b)
    pq.write_to_dataset(table_query_b, root_path=f'{path}/ds_gamma_query_b')
    print("ds_gamma_query_b")

In [4]:
%%time

# epsilon：400,000 / 100,000 | 2000抽取900 | 特征工程、建模、预测
# 采用epsilon数据集，数据集已完成归一化、标准化。40万行样本作为训练集，10万行样本作为测试集。
# 原始数据集包含2000个特征，随机抽取900个特征用于计算。
# 数据方数量为两个，每个数据方都持有450个特征，只有一个数据方持有标签信息。

# catboost is needed only for this dataset, so it stays a local import rather than a top-level one.
from catboost.datasets import epsilon
epsilon_train, epsilon_test = epsilon()
# Use string column labels so they match the string field names in the Arrow schemas below.
epsilon_test.columns = epsilon_test.columns.astype(str)
epsilon_train.columns = epsilon_train.columns.astype(str)
assert (epsilon_test.columns == epsilon_train.columns).all()
# Column '0' is treated as the label; every other column is a candidate feature.
columns = list(epsilon_train.columns[1:])
assert len(columns) >= 900, "need at least 900 features to split 450/450 between the parties"
# Unseeded shuffle: the selected features differ between runs. The chosen names are
# recorded in the Parquet schemas written below.
random.shuffle(columns)
# Party 1 holds the label plus the first 450 shuffled features; party 2 holds the next 450.
table1_schema = pa.schema([pa.field('0', pa.int8())] + [pa.field(col, pa.float32()) for col in columns[0:450]])
table2_schema = pa.schema([pa.field(col, pa.float32()) for col in columns[450:900]])

# write_to_dataset adds a new uniquely named file on every call; clear the outputs first
# so re-running the cell replaces the datasets instead of appending duplicates.
for out_name in ('ds_epsilon1_test', 'ds_epsilon2_test', 'ds_epsilon1_train', 'ds_epsilon2_train'):
    shutil.rmtree(f'{path}/{out_name}', ignore_errors=True)

table1_test = pa.Table.from_pandas(epsilon_test, schema=table1_schema)
pq.write_to_dataset(table1_test, root_path=f'{path}/ds_epsilon1_test')
table2_test = pa.Table.from_pandas(epsilon_test, schema=table2_schema)
pq.write_to_dataset(table2_test, root_path=f'{path}/ds_epsilon2_test')

table1_train = pa.Table.from_pandas(epsilon_train, schema=table1_schema)
pq.write_to_dataset(table1_train, root_path=f'{path}/ds_epsilon1_train')
table2_train = pa.Table.from_pandas(epsilon_train, schema=table2_schema)
pq.write_to_dataset(table2_train, root_path=f'{path}/ds_epsilon2_train')

CPU times: user 29.5 s, sys: 7.56 s, total: 37.1 s
Wall time: 1min 49s
