In [1]:
import numpy as np
import pandas as pd

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import Row

In [3]:
APP_NAME = "gender_read_dataset_spark"

config = {
    'spark.driver.memory': '200G',
    'spark.sql.shuffle.partitions': '1000',
    'spark.ui.port': '4041',
    'spark.local.dir': '/mnt/data/kireev/spark.local_dir/',
}

spark = SparkSession.builder
for k, v in config.items():
    spark = spark.config(k, v)
spark = spark.appName(APP_NAME).master('local[16]').getOrCreate()

In [4]:
spark

# Dataset Preparation

In [5]:
N_CLIENTS = 5e6
N_TRX_PER_CLIENT = 250
N_CAT_FIELDS = 6
N_AMOUNTS = 1

In [6]:
df = spark.range(0, N_CLIENTS * N_TRX_PER_CLIENT).repartition(1000)

In [7]:
df = df.withColumn('client_id', (F.rand() * N_CLIENTS).cast('int'))

In [8]:
df = df.withColumn('event_time', (F.rand() * 365).cast('float'))

In [9]:
for i in range(N_CAT_FIELDS):
    df = df.withColumn(f'cat_{i}', F.abs((F.pow(F.randn(), 2) * 300)).cast('int') + 1)

In [10]:
for i in range(N_AMOUNTS):
    df = df.withColumn(f'amnt_{i}', (F.randn() * 1000).cast('int') * 100)

In [11]:
df = df.drop('id')

In [12]:
def collect_lists(df, col_id):
    col_list = [col for col in df.columns if col != col_id]
    df = df \
        .withColumn('trx_count', F.count(F.lit(1)).over(Window.partitionBy(col_id))) \
        .withColumn('_rn', F.row_number().over(Window.partitionBy(col_id).orderBy('event_time')))

    w = Window.partitionBy(col_id).orderBy('_rn')
    for col in col_list:
        df = df.withColumn(col, F.collect_list(col).over(w)) \

    df = df.filter('_rn = trx_count').drop('_rn')
    return df

df = collect_lists(df, 'client_id')

In [13]:
%%time
df.repartition(50).write \
    .option('compression', 'uncompressed') \
    .mode('overwrite') \
    .parquet('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.uncompressed.parquet')

CPU times: user 422 ms, sys: 233 ms, total: 656 ms
Wall time: 42min 6s


In [14]:
df = spark.read.parquet('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.uncompressed.parquet')

In [15]:
%%time
df.write \
    .option('compression', 'snappy') \
    .mode('overwrite') \
    .parquet('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.snappy.parquet')

CPU times: user 15.4 ms, sys: 13.5 ms, total: 28.9 ms
Wall time: 1min 57s


In [16]:
%%time
df.write \
    .option('compression', 'gzip') \
    .mode('overwrite') \
    .parquet('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.gzip.parquet')

CPU times: user 29.9 ms, sys: 8.6 ms, total: 38.5 ms
Wall time: 2min 52s


In [17]:
def to_array(x):
    if type(x) is not list:
        return x
    
    x = np.array(x)
    if x.dtype.kind == 'i':
        x = x.astype(np.int32)      
    if x.dtype.kind == 'f':
        x = x.astype(np.float32)      
    
    return x

In [18]:
df = df.rdd \
    .map(lambda x: {k: to_array(v) for k, v in x.asDict().items()})

In [19]:
!rm -r /mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.sparkpickle

In [20]:
%%time
df.saveAsPickleFile('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.sparkpickle')

CPU times: user 33.9 ms, sys: 13.8 ms, total: 47.7 ms
Wall time: 2min 47s


In [21]:
!du -sh /mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx*

17G	/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.gzip.parquet
19G	/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.snappy.parquet
40G	/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.sparkpickle
19G	/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/df_trx.uncompressed.parquet


# Read results

In [1]:
import pandas as pd

In [27]:
df = pd.read_json('/mnt/data/kireev/pycharm_1/dltranz/experiments/pyarrow_datasets/results.json', lines=True)

In [28]:
df

Unnamed: 0,data_path,n_process,cpu_count,return_data,engine,output_file,elapsed
0,/mnt/data/kireev/pycharm_1/dltranz/experiments...,-1,0,False,sparkpickle,results.json,171
1,/mnt/data/kireev/pycharm_1/dltranz/experiments...,0,0,False,sparkpickle,results.json,146
2,/mnt/data/kireev/pycharm_1/dltranz/experiments...,1,0,False,sparkpickle,results.json,143
3,/mnt/data/kireev/pycharm_1/dltranz/experiments...,2,0,False,sparkpickle,results.json,73
4,/mnt/data/kireev/pycharm_1/dltranz/experiments...,4,0,False,sparkpickle,results.json,36
...,...,...,...,...,...,...,...
115,/mnt/data/kireev/pycharm_1/dltranz/experiments...,16,8,True,pyarrow,results.json,251
116,/mnt/data/kireev/pycharm_1/dltranz/experiments...,18,8,True,pyarrow,results.json,242
117,/mnt/data/kireev/pycharm_1/dltranz/experiments...,20,8,True,pyarrow,results.json,243
118,/mnt/data/kireev/pycharm_1/dltranz/experiments...,22,8,True,pyarrow,results.json,233


In [29]:
df_read = df[lambda x: ~x['return_data']].copy()
df_load = df[lambda x: x['return_data']].copy()

In [30]:
df_read \
    .drop(columns=['data_path', 'output_file', 'return_data']) \
    .set_index(['n_process', 'cpu_count', 'engine']).unstack(level=[2, 1])

Unnamed: 0_level_0,elapsed,elapsed,elapsed,elapsed,elapsed,elapsed
engine,sparkpickle,pyarrow,pyarrow,pyarrow,pyarrow,pyarrow
cpu_count,0,0,2,4,6,8
n_process,Unnamed: 1_level_3,Unnamed: 2_level_3,Unnamed: 3_level_3,Unnamed: 4_level_3,Unnamed: 5_level_3,Unnamed: 6_level_3
-1,171,135,79,50,50,39
0,146,123,73,49,47,37
1,143,115,69,46,44,34
2,73,58,35,23,23,18
4,36,30,18,12,12,9
6,24,20,12,9,8,9
8,18,16,10,7,8,8
10,15,12,7,6,7,7
12,13,12,7,6,6,7
14,11,9,6,6,6,7


In [31]:
df_load \
    .drop(columns=['data_path', 'output_file', 'return_data']) \
    .set_index(['n_process', 'cpu_count', 'engine']).unstack(level=[2, 1])

Unnamed: 0_level_0,elapsed,elapsed
engine,sparkpickle,pyarrow
cpu_count,0,8
n_process,Unnamed: 1_level_3,Unnamed: 2_level_3
-1,176,38
0,199,39
1,475,419
2,248,274
4,227,232
6,245,227
8,246,261
10,255,237
12,221,240
14,226,247
