### Dask client initialization

In [41]:
from dask.distributed import Client
# Start a local Dask cluster and connect to it: 6 workers with 2 threads each.
# memory_limit is applied per worker (see the dask.distributed LocalCluster docs).
client = Client(
    n_workers=6,
    threads_per_worker=2,
    memory_limit='2GB',
)

In [None]:
# TODO: Create a cluster on AWS
# from dask_cloudprovider.aws import EC2Cluster
#
# aws_key_pair_name = "YOUR_AWS_KEY_PAIR_NAME"
# cluster = EC2Cluster(instance_type="t3.micro", n_workers=1, key_name=aws_key_pair_name, region="us-east-1")
# client = Client(cluster)

In [15]:
# Show the URL of the Dask diagnostics dashboard for the active client.
print("Dashboard:", client.dashboard_link)

Dashboard: http://127.0.0.1:8787/status


### Load csv data from s3

In [None]:
import dask.dataframe as dd
import s3fs

# Bucket/prefix holding the source CSV files (no "s3://" scheme; it is
# prepended when each file is read below).
base_s3_path_dir = "synthea-open-data/coherent/unzipped/csv/"

# anon=True: list the bucket without AWS credentials.
fs = s3fs.S3FileSystem(anon=True)
all_s3_files = fs.ls(base_s3_path_dir)
csv_files = [path for path in all_s3_files if path.endswith('.csv')]

no_of_files = len(csv_files)

# Per-file column dtypes forced to 'object' when reading with Dask.
# The same files loaded in pandas without any overrides.
DTYPE_OVERRIDES = {
    'allergies.csv': {'STOP': 'object'},
    'patients.csv': {'SUFFIX': 'object'},
    'observations.csv': {'VALUE': 'object'},
    'organizations.csv': {'PHONE': 'object', 'ZIP': 'object'},
    'procedures.csv': {'REASONDESCRIPTION': 'object'},
    'providers.csv': {'PHONE': 'object', 'ZIP': 'object'}}

# enumerate() supplies the 1-based progress counter directly instead of
# searching the list for the current path on every iteration.
for position, filepath in enumerate(csv_files, start=1):
    filename = filepath.split("/")[-1]
    print(f"\nFile: {filename} - {position} out of {no_of_files}")

    # Files without an entry are read with Dask's inferred dtypes.
    file_dtypes = DTYPE_OVERRIDES.get(filename, {})
    ddf = dd.read_csv(f"s3://{filepath}", storage_options={'anon': True}, dtype=file_dtypes)

    # Swap only the trailing extension; every entry in csv_files ends with '.csv'.
    output_file = "parquet_files/" + filename[:-len('.csv')] + '.parquet'
    ddf.to_parquet(output_file, write_index=False, overwrite=True)

In [127]:
import time
from datetime import datetime

import pandas as pd


def process_data(library,
                 obs_path="parquet_files/observations.parquet",
                 patients_path="parquet_files/patients.parquet"):
    """Run the same filter / join / aggregate pipeline with pandas or Dask and time it.

    Steps:
      1. Read the observations and patients Parquet datasets.
      2. Keep observations whose CODE is one of ``wanted_codes`` and coerce
         VALUE to numeric (unparseable values become NaN).
      3. Derive AGE as the difference between the current calendar year and
         the birth year (not an exact age), then keep female patients with
         AGE > 40.
      4. Inner-join observations to those patients and average VALUE per
         (PATIENT, CODE).

    Parameters
    ----------
    library : str
        ``"dask"`` selects dask.dataframe; any other value uses pandas.
    obs_path : str, optional
        Observations dataset. Defaults to the name the CSV -> Parquet
        conversion cell produces for ``observations.csv``.
    patients_path : str, optional
        Patients dataset.

    Returns
    -------
    tuple
        ``(result, seconds)`` where ``result`` is a pandas Series of mean
        VALUE indexed by (PATIENT, CODE) and ``seconds`` is the elapsed
        wall-clock time rounded to 2 decimals.
    """
    if library == "dask":
        # Imported here so this cell does not rely on `dd` having been
        # imported by a different cell; done before the timer starts.
        import dask.dataframe as frames
    else:
        frames = pd

    start = time.perf_counter()

    df_obs = frames.read_parquet(obs_path)
    df_pat = frames.read_parquet(patients_path)

    # Presumably LOINC codes (height, weight, diastolic / systolic blood
    # pressure) -- TODO confirm against the dataset documentation.
    wanted_codes = ["8302-2", "29463-7", "8462-4", "8480-6"]
    df_obs = df_obs[df_obs["CODE"].isin(wanted_codes)]
    # .assign builds a new frame rather than writing into the filtered
    # slice, which avoids pandas' SettingWithCopyWarning.
    df_obs = df_obs.assign(VALUE=frames.to_numeric(df_obs["VALUE"], errors="coerce"))

    birthdate = frames.to_datetime(df_pat["BIRTHDATE"], errors="coerce")
    df_pat = df_pat.assign(AGE=datetime.now().year - birthdate.dt.year)
    df_pat = df_pat[(df_pat["GENDER"] == "F") & (df_pat["AGE"] > 40)][["Id", "AGE"]]

    df_merged = df_obs.merge(df_pat, left_on="PATIENT", right_on="Id", how="inner")

    result = df_merged.groupby(["PATIENT", "CODE"])["VALUE"].mean()

    if library == "dask":
        # Dask is lazy: nothing above has executed until compute() is called.
        result = result.compute()

    elapsed = time.perf_counter() - start

    return result, round(elapsed, 2)

# Run the pipeline once per library (pandas first, then dask); each call
# returns the aggregated result and the elapsed seconds.
pandas_result, pandas_time = process_data("pandas")
dask_result, dask_time = process_data("dask")
print(f"Pandas processing time: {pandas_time} seconds")
print(f"Dask processing time: {dask_time} seconds")

Pandas processing time: 1.92 seconds
Dask processing time: 0.45 seconds


In [110]:
import os
import time
import pandas as pd
import dask.dataframe as dd

def count_total_rows(library, directory="parquet_files"):
    """Count the rows of every Parquet dataset in ``directory`` and time the scan.

    Parameters
    ----------
    library : str
        ``"dask"`` reads with dask.dataframe; any other value uses pandas.
    directory : str, optional
        Folder whose ``*.parquet`` entries (files or dataset directories)
        are counted. Other entries, such as notebook checkpoint folders or
        OS metadata files, are skipped.

    Returns
    -------
    tuple
        ``(total_rows, seconds)``: the summed row count and the elapsed
        wall-clock time rounded to 2 decimals. Listing the directory is not
        included in the timing.
    """
    reader = dd.read_parquet if library == 'dask' else pd.read_parquet

    # Sorted so the datasets are always visited in the same order.
    all_files = sorted(
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.endswith(".parquet")
    )

    start = time.perf_counter()
    total_rows = 0
    for path in all_files:
        # len() on a Dask DataFrame triggers the computation of its row count.
        total_rows += len(reader(path))
    elapsed = time.perf_counter() - start

    return total_rows, round(elapsed, 2)

# Time the row count once per library (pandas first, then dask); each call
# returns the total row count and the elapsed seconds.
pandas_total, pandas_time = count_total_rows("pandas")
dask_total, dask_time = count_total_rows("dask")
print(f"Pandas processing time: {pandas_time} seconds")
print(f"Dask processing time: {dask_time} seconds")

Pandas processing time: 3.75 seconds
Dask processing time: 0.28 seconds


In [119]:
# Close the Dask client once the benchmarks are done.
client.close()
# Only needed when an explicit cluster object (e.g. an EC2Cluster) was created:
# cluster.close()