In [1]:
import boto3
import pandas as pd
import numpy as np
from io import StringIO
from scipy.stats import entropy
from datetime import datetime

# Bucket that holds the benchmark CSV objects used throughout this notebook.
S3_BUCKET = 'dmm-microbench'

# Credentials are intentionally NOT passed here. boto3 resolves them through
# its default chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
# variables, the shared ~/.aws/credentials file, or an attached IAM role).
# Access keys must never be written into a notebook; any key that has been
# committed to version control should be treated as leaked and rotated.
s3 = boto3.client('s3')

def download_s3_file(file_name, destination_file_name):
    """Copy one object from ``S3_BUCKET`` to a local path.

    Args:
        file_name: Key of the object inside ``S3_BUCKET``.
        destination_file_name: Local path the object is written to.
    """
    # Positional order of download_file is (Bucket, Key, Filename).
    s3.download_file(S3_BUCKET, file_name, destination_file_name)

def get_content(file_name, expression):
    """Run an S3 Select SQL query against a CSV object in ``S3_BUCKET``.

    Args:
        file_name: Key of the CSV object to query.
        expression: SQL expression passed to S3 Select.

    Returns:
        The raw ``select_object_content`` response from the module-level
        ``s3`` client.
    """
    request = {
        'Bucket': S3_BUCKET,
        'Key': file_name,
        'ExpressionType': 'SQL',
        'Expression': expression,
        # Input is declared as CSV with FileHeaderInfo "Use"; output is CSV
        # with default settings.
        'InputSerialization': {'CSV': {"FileHeaderInfo": "Use"}},
        'OutputSerialization': {'CSV': {}},
    }
    return s3.select_object_content(**request)


def convert_data_to_df(data, record_header):
    """Assemble an S3 Select style response into a pandas DataFrame.

    Args:
        data: Mapping whose ``'Payload'`` entry is an iterable of events.
            Events containing a ``'Records'`` key contribute the bytes found
            at ``event['Records']['Payload']``; all other events are skipped.
        record_header: Iterable of ``bytes`` chunks placed in front of the
            record bytes (presumably the CSV header row -- confirm against
            callers). It is read but never modified.

    Returns:
        pandas.DataFrame parsed from the concatenated CSV text.

    Raises:
        UnicodeDecodeError: If the combined bytes are not valid UTF-8.
    """
    # Work on a copy so the caller's list is not grown on every call.
    chunks = list(record_header)
    chunks.extend(
        event['Records']['Payload']
        for event in data['Payload']
        if 'Records' in event
    )

    # Join the raw bytes BEFORE decoding: a multi-byte UTF-8 character may be
    # split across two chunks, and decoding chunk-by-chunk would then fail.
    csv_content = b''.join(chunks).decode('utf-8').replace("\r", "")
    csv_pd = pd.read_csv(StringIO(csv_content))

    # DataFrame.info() writes to a buffer and returns None, so capture its
    # text explicitly instead of interpolating the return value.
    info_buffer = StringIO()
    csv_pd.info(memory_usage='deep', buf=info_buffer)

    print('\n##################################')
    print(f"Length of dataframe: {len(csv_pd)}")
    print(f"Memory usage of dataframe: \n {info_buffer.getvalue()}")
    print('\n##################################')

    return csv_pd

def read_dask_csv(file_name):
    """Open a CSV file (or glob pattern) as a dask DataFrame.

    Args:
        file_name: Path or glob pattern handed to ``dask.dataframe.read_csv``.

    Returns:
        The dask DataFrame produced by ``dd.read_csv(file_name)``.
    """
    import dask.dataframe as dd
    # The result must be returned; without it the function yields None.
    return dd.read_csv(file_name)


In [6]:
%%time
# Fetch the 2019 yellow-taxi CSVs for months 01 through 08, storing each one
# locally under the same name as its S3 key.
for month in range(1, 9):
    csv_name = f"yellow_tripdata_2019-0{month}.csv"
    download_s3_file(csv_name, csv_name)

CPU times: user 16.5 s, sys: 20.1 s, total: 36.5 s
Wall time: 34.2 s


In [2]:
%%time
import dask.dataframe as dd

# Alternative input: the downloaded taxi CSVs, matched with a glob pattern.
# dask_df = dd.read_csv("yellow_tripdata_2019-*.csv", assume_missing=True)

# Active input: a single local CSV file.
dask_df = dd.read_csv("2Mn_200Cols.csv", assume_missing=True)

CPU times: user 352 ms, sys: 32 ms, total: 384 ms
Wall time: 576 ms


In [9]:
# Columns of the taxi data set selected for histogramming.
columns = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
]

In [7]:
# Local-file variant: take every column of the loaded dask frame.
# Note that this rebinds `columns`, replacing any list assigned earlier.
columns = dask_df.columns.tolist()

In [None]:
%%time
import dask.array as da

# Print a 100-bin histogram (bin edges, then counts) for each selected column.
for col in columns:
    try:
        col_data = dask_df[col]
        # Named col_min/col_max rather than min/max: at notebook scope those
        # names would overwrite the builtins for every later cell.
        col_min, col_max = dd.compute(col_data.min(), col_data.max())
        h, bins = da.histogram(col_data, bins=100, range=[col_min, col_max])
        print(bins, h.compute())
    except Exception as e:
        # Best-effort: a column that cannot be histogrammed is reported and
        # skipped. The column name is included so the failure is traceable.
        print(f"{col}: {e}")

[1.   1.03 1.06 1.09 1.12 1.15 1.18 1.21 1.24 1.27 1.3  1.33 1.36 1.39
 1.42 1.45 1.48 1.51 1.54 1.57 1.6  1.63 1.66 1.69 1.72 1.75 1.78 1.81
 1.84 1.87 1.9  1.93 1.96 1.99 2.02 2.05 2.08 2.11 2.14 2.17 2.2  2.23
 2.26 2.29 2.32 2.35 2.38 2.41 2.44 2.47 2.5  2.53 2.56 2.59 2.62 2.65
 2.68 2.71 2.74 2.77 2.8  2.83 2.86 2.89 2.92 2.95 2.98 3.01 3.04 3.07
 3.1  3.13 3.16 3.19 3.22 3.25 3.28 3.31 3.34 3.37 3.4  3.43 3.46 3.49
 3.52 3.55 3.58 3.61 3.64 3.67 3.7  3.73 3.76 3.79 3.82 3.85 3.88 3.91
 3.94 3.97 4.  ] [ 788593       0       0       0       0       0       0       0       0
       0       0       0       0       0       0       0       0       0
       0       0       0       0       0       0       0       0       0
       0       0       0       0       0       0 1184134       0       0
       0       0       0       0       0       0       0       0       0
       0       0       0       0       0       0       0       0       0
       0       0       0       0       0       0

In [20]:
## Compute histograms for categorical columns

In [2]:
# Columns treated as categorical: they get value counts, not numeric bins.
categorical_columns = [
    "store_and_fwd_flag",
    "payment_type",
]

In [None]:
%%time
# Materialise the frame with rows containing missing values removed; the
# enclosing %%time magic reports the cost. The dask method is `dropna` --
# there is no `dropnan`, and calling it raises AttributeError.
# The trailing semicolon keeps the full frame from being echoed as output.
dask_df.dropna().compute();

  return func(*(_execute_task(a, cache) for a in args))


In [None]:
%%time
# For each categorical column, print its distinct values together with the
# per-value frequencies; both lazy results go through one dd.compute call.
for col in categorical_columns:
    series = dask_df[col]
    distinct_and_counts = dd.compute(series.unique(), series.value_counts())
    print(distinct_and_counts)