In [1]:
# Import pandas dask and numpy
import pandas as pd
import dask.dataframe as dd
import numpy as np
import time
import modin.pandas as mpd
import ray

In [2]:
# Import the data
start_dask = time.time()
df = dd.read_csv('./data/UserAchievements.csv')
print("Dask dataframe: ", df)
dask_time = time.time() - start_dask
print("Dask time: ", dask_time)

Dask dataframe:  Dask DataFrame Structure:
                   Id UserId AchievementType   Tier TierAchievementDate Points CurrentRanking HighestRanking TotalGold TotalSilver TotalBronze
npartitions=49                                                                                                                                
                int64  int64          object  int64              object  int64        float64        float64     int64       int64       int64
                  ...    ...             ...    ...                 ...    ...            ...            ...       ...         ...         ...
...               ...    ...             ...    ...                 ...    ...            ...            ...       ...         ...         ...
                  ...    ...             ...    ...                 ...    ...            ...            ...       ...         ...         ...
                  ...    ...             ...    ...                 ...    ...            ...      

In [3]:
# Import the data with pandas
start_pandas = time.time()
df = pd.read_csv('./data/UserAchievements.csv')
print("Pandas dataframe: ", df)
pandas_time = time.time() - start_pandas
print("Pandas time: ", pandas_time)

Pandas dataframe:                   Id    UserId AchievementType  Tier TierAchievementDate  \
0           3739822         1      Discussion     1          11/06/2019   
1           3916402         1    Competitions     1          11/06/2019   
2           3739823       368    Competitions     1          07/15/2016   
3           3739824       368         Scripts     2          09/21/2016   
4           3739825       368      Discussion     2          08/30/2016   
...             ...       ...             ...   ...                 ...   
42871159  173261487  11652940    Competitions     0          09/20/2022   
42871160  173261488  11652941        Datasets     0          09/20/2022   
42871161  173261489  11652941      Discussion     0          09/20/2022   
42871162  173261490  11652941         Scripts     0          09/20/2022   
42871163  173261491  11652941    Competitions     0          09/20/2022   

          Points  CurrentRanking  HighestRanking  TotalGold  TotalSilver  \
0   

In [4]:
# Import the data with ray
start_ray = time.time()
ray.init()

2022-10-01 21:42:43,201	INFO worker.py:1518 -- Started a local Ray instance.


0,1
Python version:,3.7.7
Ray version:,2.0.0


In [5]:
df = ray.data.read_csv('./data/UserAchievements.csv')
print("Ray dataframe: ", df)
ray_time = time.time() - start_ray
print("Ray time: ", ray_time)



Ray dataframe:  Dataset(num_blocks=1, num_rows=42871164, schema={Id: int64, UserId: int64, AchievementType: string, Tier: int64, TierAchievementDate: string, Points: int64, CurrentRanking: int64, HighestRanking: int64, TotalGold: int64, TotalSilver: int64, TotalBronze: int64})
Ray time:  53.74173331260681


In [6]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [7]:
%%writefile file.yaml
file_type: csv
dataset_name: UserAchievments
file_name: UserAchievments
table_name: UserAchievments
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Id
    - UserId
    - AchievementType
    - Tier
    - TierAchievementDate
    - Points
    - CurrentRanking
    - HighestRanking
    - TotalGold
    

Overwriting file.yaml


In [8]:
# Read the config file
import testutility as tu
config = tu.read_config_file('file.yaml')

In [9]:
config['file_name']

'UserAchievments'

In [10]:
config

{'file_type': 'csv',
 'dataset_name': 'UserAchievments',
 'file_name': 'UserAchievments',
 'table_name': 'UserAchievments',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Id',
  'UserId',
  'AchievementType',
  'Tier',
  'TierAchievementDate',
  'Points',
  'CurrentRanking',
  'HighestRanking',
  'TotalGold']}

In [11]:
# read data with config file
df = pd.read_csv('./data/UserAchievements.csv', delimiter = config['inbound_delimiter'])

In [12]:
#validate column names
tu.col_header_val(df,config)

column name and column length validation failed
Following File columns are not in the YAML file ['totalsilver', 'totalbronze']
Following YAML columns are not in the file uploaded []


0

In [13]:
print("Data frame Column: ",df.columns)
print("columns from YAML config file: ",config['columns'])

Data frame Column:  Index(['id', 'userid', 'achievementtype', 'tier', 'tierachievementdate',
       'points', 'currentranking', 'highestranking', 'totalgold',
       'totalsilver', 'totalbronze'],
      dtype='object')
columns from YAML config file:  ['Id', 'UserId', 'AchievementType', 'Tier', 'TierAchievementDate', 'Points', 'CurrentRanking', 'HighestRanking', 'TotalGold']


In [14]:
# add time variables to dictionary variable
time_dict = {'dask_time': dask_time, 'pandas_time': pandas_time, 'ray_time': ray_time}

# lowest time
min_time = min(time_dict, key=time_dict.get)
# print the lowest time and the time it took
print("The lowest time is: ", min_time, " and it took: ", time_dict[min_time])
maximum_time = max(time_dict, key=time_dict.get)
print("The maximum time is: ", maximum_time, " and it took: ", time_dict[maximum_time])

The lowest time is:  dask_time  and it took:  0.02496337890625
The maximum time is:  ray_time  and it took:  53.74173331260681


In [15]:
#print the time it took for each
print("Dask time: ", dask_time)
print("Pandas time: ", pandas_time)
print("Ray time: ", ray_time)

Dask time:  0.02496337890625
Pandas time:  36.32085371017456
Ray time:  53.74173331260681
