In [1]:
%timeit
import numpy as np
import pandas as pd
import dask.dataframe as dd
import glob

from functools import partial

In [2]:
# Column names col_1 .. col_20, passed to read_csv as the CSV field names.
headers = ['col_%d' % i for i in range(1, 21)]

In [3]:
# Shared read_csv options: name all 20 columns, but load only the first three.
csv_config = {
    'delimiter': ',',
    'names': headers,
    'usecols': headers[:3],
    'quotechar': '"',
    'escapechar': '\\',
}

## Pandas

In [None]:
%%time
df = pd.concat(map(partial(pd.read_csv, **csv_config), glob.glob('sample_1000/*.csv')))

In [5]:
# (rows, columns) of the concatenated frame, before rows with missing values are dropped.
df.shape

(5000000, 3)

In [6]:
# Discard every row that has a missing value in any of the loaded columns.
df = df.dropna()

In [7]:
# (rows, columns) after dropna; compare with the earlier shape to see how many rows were removed.
df.shape

(4839039, 3)

In [8]:
%%time
grouped = df.groupby('col_3')['col_1', 'col_2'].agg(set)
# crashes with 19mn



CPU times: user 3min 9s, sys: 2.03 s, total: 3min 11s
Wall time: 3min 11s


In [10]:
%%time
# col_3 is the index of `grouped` here (it was the groupby key), so this orders
# the rows by that index level.
grouped = grouped.sort_values('col_3')

CPU times: user 7.4 s, sys: 172 ms, total: 7.58 s
Wall time: 7.55 s


In [11]:
# Preview the first groups: the index is col_3 and each cell holds a Python set.
grouped.head()

Unnamed: 0_level_0,col_1,col_2
col_3,Unnamed: 1_level_1,Unnamed: 2_level_1
http://www..com,"{1703936, 1966082, 1310725, 8, 2752521, 393228...","{cc577541954, 80485323e740ccc, d6a06c69ee92b, ..."
http://www.0.com,"{1417221, 737286, 1998858, 573454, 1851408, 28...","{e8b0c39f5e08ea2b97e9f83, 72440929acb610462121..."
http://www.00.com,"{1101830, 57353, 1654800, 606227, 610327, 2574...","{d30ac45e1e72c18f173b107b6d, 86ba10596cfa42d9c..."
http://www.000.com,"{2657856, 2576769, 990342, 1772040, 526985, 17...","{27c50abebbdddedc186, 413f48d22fde95200e41a, c..."
http://www.0000.com,{2402429},{648ca91dc6fa2}


In [9]:
# One row per distinct col_3 value; the columns are the col_1 and col_2 sets.
grouped.shape

(4509197, 2)

In [12]:
%%time
grouped.reset_index(inplace=True)

CPU times: user 138 ms, sys: 16.1 ms, total: 154 ms
Wall time: 154 ms


In [13]:
# Preview again: after the index reset, col_3 is a regular column next to the two set columns.
grouped.head()

Unnamed: 0,col_3,col_1,col_2
0,http://www..com,"{1703936, 1966082, 1310725, 8, 2752521, 393228...","{cc577541954, 80485323e740ccc, d6a06c69ee92b, ..."
1,http://www.0.com,"{1417221, 737286, 1998858, 573454, 1851408, 28...","{e8b0c39f5e08ea2b97e9f83, 72440929acb610462121..."
2,http://www.00.com,"{1101830, 57353, 1654800, 606227, 610327, 2574...","{d30ac45e1e72c18f173b107b6d, 86ba10596cfa42d9c..."
3,http://www.000.com,"{2657856, 2576769, 990342, 1772040, 526985, 17...","{27c50abebbdddedc186, 413f48d22fde95200e41a, c..."
4,http://www.0000.com,{2402429},{648ca91dc6fa2}


In [14]:
# Same number of rows as before the index reset, with col_3 now counted as a column.
grouped.shape

(4509197, 3)

## Test example with 1 file

In [None]:
# Single-file Dask run, used to check the custom aggregation against plain pandas.
sample = dd.read_csv('sample_400/file-0100.csv', **csv_config).dropna()


def _distinct_per_group(series_groupby):
    """Chunk step: reduce each group to a list of its distinct values."""
    return series_groupby.apply(lambda values: list(set(values)))


def _concat_partial_lists(series_groupby):
    """Agg step: regroup the partial lists by key and join them (sum concatenates lists)."""
    return series_groupby.obj.groupby(level=0).sum()


def _lists_to_sets(combined):
    """Finalize step: turn each joined list into a set, removing repeated values."""
    return combined.apply(lambda merged: set(merged))


collect_set = dd.Aggregation(
    name="collect_set",
    chunk=_distinct_per_group,
    agg=_concat_partial_lists,
    finalize=_lists_to_sets,
)
set_aggregations = {'col_1': collect_set, 'col_2': collect_set}
dd_result = sample.groupby('col_3').agg(set_aggregations).compute()

In [None]:
# Load the same single file with pandas as the reference for the Dask result.
# NOTE(review): this reads from sample_400/, which no visible cell creates — confirm it exists.
sample = (
    pd.read_csv('sample_400/file-0100.csv', **csv_config)
    .dropna()
)

In [None]:
# Reference result: per col_3 value, the set of col_1 values and the set of col_2 values.
set_per_column = {column: set for column in ('col_1', 'col_2')}
pd_result = sample.groupby('col_3').agg(set_per_column)

In [None]:
# Number of groups (rows) and aggregated columns in the pandas reference result.
pd_result.shape

In [None]:
# Same check for the Dask result; it is expected to match pd_result.shape.
dd_result.shape

In [None]:
# Check that the Dask custom aggregation reproduces the pandas result.
# DataFrame.equals is sensitive to row order, so both frames are sorted by their
# col_3 index first; otherwise identical groups in a different order compare unequal.
pd_result.sort_index().equals(dd_result.sort_index())

## Dask

In [5]:
# Dask frame over every CSV in sample_1000/, with incomplete rows removed.
# NOTE(review): the variable is called sample_100 but the glob reads sample_1000/.
sample_100 = (
    dd.read_csv('sample_1000/*.csv', **csv_config)
    .dropna()
)

In [None]:
%%time
# Orders the Dask frame by col_3 before grouping.
# NOTE(review): the groupby in the following cell already passes sort=True, so this
# pre-sort may be unnecessary — confirm before paying for it.
sample_100 = sample_100.sort_values('col_3')

In [9]:
%%time
collect_set = dd.Aggregation(
    name="collect_set",
    chunk=lambda s: s.apply(lambda x: list(set(x))),
    agg=lambda s: s.obj.groupby(level=0).sum(),
    finalize=lambda s: s.apply(lambda final: set(final)),
)
result = sample_100.groupby('col_3', sort=True).agg({'col_1':collect_set, 'col_2':collect_set})

CPU times: user 56.9 ms, sys: 4.48 ms, total: 61.3 ms
Wall time: 60.6 ms


In [10]:
%%time
# Runs the aggregation defined in the previous cell and rebinds `result` to the computed output.
# NOTE(review): because the name is reused, re-running only this cell will likely fail
# (the computed result presumably has no .compute()) — rebuild `result` first.
result = result.compute()

CPU times: user 4min 48s, sys: 7.83 s, total: 4min 55s
Wall time: 4min 34s


In [11]:
# Number of groups (rows) and aggregated columns produced by the Dask run.
result.shape

(4509197, 2)

In [12]:
# Show only the groups whose col_1 set holds more than one value.
has_several_col_1 = result['col_1'].str.len() > 1
result[has_several_col_1].head()

Unnamed: 0_level_0,col_1,col_2
col_3,Unnamed: 1_level_1,Unnamed: 2_level_1
http://www..com,"{1703936, 1966082, 1310725, 8, 2752521, 393228...","{583ad8ab098842, 3e4, 66f9e22d81f34f108c5ad788..."
http://www.0.com,"{1417221, 737286, 1998858, 573454, 1851408, 28...","{e35, 20d4b11a6, 1d1963, 81265490de82eb0d50770..."
http://www.00.com,"{1101830, 57353, 1654800, 606227, 610327, 2574...","{85, ccd5cc79011c1db262, e4958fe70fc4e, d7c66e..."
http://www.000.com,"{2657856, 2576769, 990342, 1792104, 1772040, 5...","{332804b8ade6ea17986bad1f7, 5ce196ae19, 039acd..."
http://www.0001.com,"{1274699, 1892934, 938423}","{93f067721c8331f71dbd91d1f30f, 5, 9d53}"


In [13]:
%%time
result.reset_index(inplace=True)

CPU times: user 87.7 ms, sys: 20.2 ms, total: 108 ms
Wall time: 140 ms


In [14]:
# Preview the Dask result after the index reset: col_3 is a regular column next to the two set columns.
result.head()

Unnamed: 0,col_3,col_1,col_2
0,http://www..com,"{1703936, 1966082, 1310725, 8, 2752521, 393228...","{583ad8ab098842, 3e4, 66f9e22d81f34f108c5ad788..."
1,http://www.0.com,"{1417221, 737286, 1998858, 573454, 1851408, 28...","{e35, 20d4b11a6, 1d1963, 81265490de82eb0d50770..."
2,http://www.00.com,"{1101830, 57353, 1654800, 606227, 610327, 2574...","{85, ccd5cc79011c1db262, e4958fe70fc4e, d7c66e..."
3,http://www.000.com,"{2657856, 2576769, 990342, 1792104, 1772040, 5...","{332804b8ade6ea17986bad1f7, 5ce196ae19, 039acd..."
4,http://www.0000.com,{2402429},{648ca91dc6fa2}


In [5]:
import shutil

In [10]:
# Copy file-0000.csv .. file-0999.csv from ../data into sample_1000/,
# printing the file number after every 100th copy as a progress marker.
for file_no in range(1000):
    source_path = '../data/file-%04d.csv' % file_no
    shutil.copy(source_path, 'sample_1000/')
    if file_no % 100 == 0:
        print(file_no)
print('done..')

0
100
200
300
400
500
600
700
800
900
done..
