## step 0a: get the data

In [24]:
import shutil
import time

import pandas as pd

In [25]:
# pandas parses the whole CSV eagerly, so df holds every row in memory
df = pd.read_csv(filepath_or_buffer='test/groupby-N_1000000_K_100_file_0.csv')

In [26]:
# show the first five rows of the pandas frame
df.head(n=5)

Unnamed: 0,id1,id2,id3,id4,id5,id6,v1,v2,v3
0,id085,id094,id0000004473,94,26,7809,4,9,47.169958
1,id002,id002,id0000001226,32,84,3769,4,8,22.157261
2,id082,id013,id0000000309,31,24,1281,3,11,86.690457
3,id082,id055,id0000008012,23,61,2802,2,12,52.400937
4,id032,id039,id0000002823,63,34,3895,4,14,72.752869


In [27]:
import dask.dataframe as dd

# Dask builds a lazy frame: it only samples the file to infer dtypes and
# defers the full parse until something is computed
dff = dd.read_csv(
    'test/groupby-N_1000000_K_100_file_0.csv',
)

In [28]:
# show the first five rows (computed from the first partition)
dff.head(n=5)

Unnamed: 0,id1,id2,id3,id4,id5,id6,v1,v2,v3
0,id085,id094,id0000004473,94,26,7809,4,9,47.169958
1,id002,id002,id0000001226,32,84,3769,4,8,22.157261
2,id082,id013,id0000000309,31,24,1281,3,11,86.690457
3,id082,id055,id0000008012,23,61,2802,2,12,52.400937
4,id032,id039,id0000002823,63,34,3895,4,14,72.752869


## step 0b: create a timing function

In [29]:
def perform_test(df, by='id1', column='v1', repeat=1):
    """Time a groupby-sum on a pandas or Dask DataFrame.

    Runs ``df.groupby(by)[column].sum()``. For Dask it also calls
    ``.compute()``, so the lazy graph actually executes, including any file
    reads it depends on. The aggregated result is discarded; only the
    elapsed time is returned.

    Parameters
    ----------
    df : pandas.DataFrame or dask.dataframe.DataFrame
        Data to aggregate.
    by : str, default 'id1'
        Column to group on.
    column : str, default 'v1'
        Column to sum within each group.
    repeat : int, default 1
        Number of timed runs. The fastest run is returned, which damps
        one-off noise (caches, background load) in single measurements.

    Returns
    -------
    float
        Elapsed seconds of the fastest run.

    Raises
    ------
    ValueError
        If ``df`` is neither a pandas nor a Dask DataFrame, or ``repeat < 1``.
    """
    if repeat < 1:
        raise ValueError("repeat must be at least 1.")

    if isinstance(df, pd.DataFrame):
        def run():
            df.groupby(by)[column].sum()
    elif isinstance(df, dd.DataFrame):
        def run():
            # Dask is lazy: .compute() is what actually reads and aggregates.
            df.groupby(by)[column].sum().compute()
    else:
        raise ValueError("Invalid dataframe type. Expected pandas or dask dataframe.")

    timings = []
    for _ in range(repeat):
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # with system clock adjustments and may be coarse on some platforms.
        start = time.perf_counter()
        run()
        timings.append(time.perf_counter() - start)
    return min(timings)

In [30]:
# time the groupby on the in-memory pandas frame
perform_test(df=df)

0.1230018138885498

In [31]:
# time the same groupby on the lazy Dask frame (includes reading the CSV)
perform_test(df=dff)

1.1186625957489014

# Exercise 1: speed up the Dask DataFrame groupby
## step 1: setting a dask baseline

In [33]:
# Baseline: type all the columns except v1 as generic "object" columns
dtypes = {
    "id1": "object",
    "id2": "object",
    "id3": "object",
    "id4": "object",
    "id5": "object",
    "id6": "object",
    "v1": "int64",
    "v2": "object",
    "v3": "object",
}

# load the data with these dtypes; Dask only samples the file here and does
# the full parse when perform_test calls .compute()
dff = dd.read_csv('test/groupby-N_1000000_K_100_file_0.csv', dtype=dtypes)

# time the baseline so later steps have a number to compare against
execution_time = perform_test(dff)
print("Execution time:", execution_time)

## step 2: avoid object columns

In [34]:
# Give every column a concrete dtype instead of "object"
dtypes = {
    "id1": "string",
    "id2": "string",
    "id3": "string",
    "id4": "int64",
    "id5": "int64",
    "id6": "int64",
    "v1": "int64",
    "v2": "int64",
    "v3": "float64",
}

dff = dd.read_csv('test/groupby-N_1000000_K_100_file_0.csv', dtype=dtypes)

# time it so the effect of dropping object columns is visible
execution_time = perform_test(dff)
print("Execution time:", execution_time)

## step 3: using multiple files

In [38]:
# Repartition into chunks of roughly 100MB each; every partition is later
# written out as its own file
dff_repartitioned = dff.repartition(
    partition_size='100MB',
)

In [39]:
# Write one CSV per partition. The reader below globs every *.csv in this
# folder, so first remove files left over from an earlier run (e.g. one with
# a different partition count); otherwise they would be read as duplicate rows.
shutil.rmtree('test/groupby_partitioned', ignore_errors=True)
dff_repartitioned.to_csv('test/groupby_partitioned/*.csv', index=False)

['c:\\Users\\Mahdiye\\Desktop\\programming2_git\\Programming_2\\test\\groupby_partitioned\\0.csv',
 'c:\\Users\\Mahdiye\\Desktop\\programming2_git\\Programming_2\\test\\groupby_partitioned\\1.csv',
 'c:\\Users\\Mahdiye\\Desktop\\programming2_git\\Programming_2\\test\\groupby_partitioned\\2.csv']

In [41]:
# Read every partition file back as one Dask frame, reusing the typed dtypes
dff_multi = dd.read_csv(
    'test/groupby_partitioned/*.csv',
    dtype=dtypes,
)

In [43]:
# Rerun perform_test on the multi-file CSV data
execution_time = perform_test(dff_multi)
print(f"Execution time: {execution_time}")

Execution time: 1.4560320377349854


## step 4a: Parquet instead of CSV

In [46]:
# Requires pyarrow; if it is missing, run `%pip install pyarrow` (%pip targets this kernel's environment)

In [47]:
# Write the repartitioned data to uncompressed Parquet files.
# overwrite=True clears the target folder first, so a re-run (or the Snappy
# write in step 4b, which targets the same folder) cannot leave stale part
# files behind for read_parquet to pick up.
dff_repartitioned.to_parquet('test/groupby_partitioned.parquet',
                             compression=None,
                             engine='pyarrow',
                             overwrite=True)

In [48]:
# Load the uncompressed Parquet dataset back lazily via pyarrow
dff_multi_parquet = dd.read_parquet(
    'test/groupby_partitioned.parquet',
    engine='pyarrow',
)

In [50]:
# Rerun perform_test on the Parquet data
execution_time = perform_test(dff_multi_parquet)
print(f"Execution time: {execution_time}")

Execution time: 0.6108603477478027


## step 4b: use Snappy compression

In [52]:
# No extra install needed: pyarrow bundles the Snappy codec, so python-snappy is not required for engine='pyarrow'

In [54]:
# Write the repartitioned data to Parquet files with Snappy compression.
# NOTE: this reuses the step-4a path, so it replaces the uncompressed files.
# overwrite=True clears the folder first so no uncompressed part files linger
# and get mixed into the Snappy dataset when it is read back.
dff_repartitioned.to_parquet('test/groupby_partitioned.parquet',
                             compression='snappy',
                             engine='pyarrow',
                             overwrite=True)

In [55]:
# Read the Snappy-compressed Parquet dataset back
dff_multi_snappy = dd.read_parquet(
    'test/groupby_partitioned.parquet',
    engine='pyarrow',
)

In [56]:
# time the groupby on the Snappy-compressed Parquet data
execution_time = perform_test(dff_multi_snappy)
print(f"Execution time: {execution_time}")

Execution time: 0.5629994869232178


## step 5: column pruning

In [57]:
# Column pruning: the groupby only touches id1 and v1, so read just those
df_pruned = dd.read_parquet(
    'test/groupby_partitioned.parquet',
    engine='pyarrow',
    columns=['id1', 'v1'],
)

In [58]:
# time the groupby on the two-column (pruned) dataset
execution_time = perform_test(df_pruned)
print(f"Execution time: {execution_time}")

Execution time: 0.6834335327148438


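## step 6: comparison with pandas

Note: every timing in this notebook comes from a single run, so small differences between steps (e.g. step 5 vs. step 4b) may just be run-to-run noise.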

In [59]:
# Reload the CSV eagerly with pandas for a head-to-head comparison.
# Note: pandas pays the file-read cost here, outside the timer, whereas the
# lazy Dask timings above include reading the data inside .compute().
df_pandas = pd.read_csv(filepath_or_buffer='test/groupby-N_1000000_K_100_file_0.csv')
execution_time_pandas = perform_test(df_pandas)
print(f"Pandas Execution time: {execution_time_pandas}")

Pandas Execution time: 0.07899141311645508
