In [2]:
import os
import sys
import gc
from time import time, sleep, strftime, localtime

import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import vaex
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg
# pandas on ray has moved to Modin
# import ray.dataframe as rpd



In [16]:
# data based on https://www.kaggle.com/c/ieee-fraud-detection/data
folder = "/home/vaclav/Data/Kaggle/EEE-CIS_Fraud_Detection"
files = ["train_transaction.csv", "train_identity.csv"]
paths = [os.path.join(folder, f) for f in files]

In [8]:
stats = {}

# Pandas

In [9]:
pd.__version__

'1.1.4'

In [10]:
stats["pandas"] = {}
s = stats["pandas"]

ts = time()
df = pd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = pd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts

ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts

ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"].agg(["mean","sum"])
te = time()
s["aggregation"] = te-ts

ts = time()
dff.sort_values(by=["card1","addr1","D9"], inplace=True)
dff.sort_values(by=["addr1","D9","card1"], inplace=True)
dff.sort_values(by=["D9","card1","addr1"], inplace=True)
te = time()
s["sorting"] = te-ts

In [11]:
pd.DataFrame(stats)

Unnamed: 0,pandas
aggregation,0.066574
load_identity,0.550774
load_transactions,18.682878
merge,2.118241
sorting,1.177349


In [12]:
dff.to_pickle("data/dff.pkl")

In [13]:
# Because julia groups by including N\A, let's just check that number of groups matches
grp = dff[["isFraud","ProductCD","card4","card6","id_15","id_31","TransactionAmt"]].fillna("~U~")\
.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"].agg(["mean","sum"])
grp.shape

(4553, 2)

In [14]:
def clean(wait_time: int=15):
    """Cleans created DataFrames and call the garbage collector to actions. Wait for 15s by default"""
    df, df2, dff, grp = None, None, None, None
    gc.collect()
    sleep(wait_time)
    return None

In [15]:
clean()


In [16]:
def list_variables_memory_usage() -> dict:
    """Memory of existing local variables"""
    local_vars = list(locals().items())
    return {var: sys.getsizeof(obj) for var, obj in local_vars}

# Dask
When to use dask - https://docs.dask.org/en/latest/dataframe.html#common-uses-and-anti-uses

In [17]:
stats["dask"] = {}
s = stats["dask"]

ts = time()
df = dd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = dd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts

ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts

# the difference is that we call compute method, which runs all the computations at this point
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"]\
    .agg(["mean","sum"])\
    .compute()
te = time()
s["aggregation"] = te-ts

# parallel soring is tricky that is why there are only work arounds in dask. 
ts = time()
dff.set_index("card1").compute()
te = time()
s["sorting"] = te-ts


In [18]:
clean()

In [19]:
pd.DataFrame(stats)

Unnamed: 0,pandas,dask
load_transactions,18.682878,0.059999
load_identity,0.550774,0.015863
merge,2.118241,0.05824
aggregation,0.066574,17.411429
sorting,1.177349,49.431917


In [20]:
stats["dask_indexed"] = {}
s = stats["dask_indexed"]

ts = time()
df = dd.read_csv(paths[0]).set_index("TransactionID")
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = dd.read_csv(paths[1]).set_index("TransactionID")
te = time()
s["load_identity"] = te-ts

ts = time()
dff = df.merge(df2, left_index=True, right_index=True)
te = time()
s["merge"] = te-ts

# the difference is that we call compute method, which runs all the computations at this point
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"]\
    .agg(["mean","sum"])\
    .compute()
te = time()
s["aggregation"] = te-ts

# parallel soring is tricky that is why there are only work arounds in dask. 
ts = time()
dff.set_index("card1").compute()
te = time()
s["sorting"] = te-ts


In [21]:
clean()
pd.DataFrame(stats)

Unnamed: 0,pandas,dask,dask_indexed
load_transactions,18.682878,0.059999,10.035912
load_identity,0.550774,0.015863,0.569751
merge,2.118241,0.05824,0.049744
aggregation,0.066574,17.411429,15.891529
sorting,1.177349,49.431917,53.414108


# Vaex

In [22]:
vaex.__version__

{'vaex-core': '2.0.3',
 'vaex-viz': '0.4.0',
 'vaex-hdf5': '0.6.0',
 'vaex-server': '0.3.1',
 'vaex-astro': '0.7.0',
 'vaex-jupyter': '0.5.2',
 'vaex-ml': '0.9.0',
 'vaex-arrow': '0.5.1'}

In [23]:
tool = "vaex"
stats[tool] = {}
s = stats[tool]


ts = time()
df = vaex.open(paths[0])
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = vaex.open(paths[1])
te = time()
s["load_identity"] = te-ts


In [24]:
ts = time()
dff = df.join(df2, on="TransactionID")
te = time()
s["merge"] = te-ts

In [25]:
# the difference is that we call compute method, which runs all the computations at this point
ts = time()
grp = dff.groupby([dff["isFraud"],dff["ProductCD"],dff["card4"],dff["card6"],dff["id_15"],dff["id_31"]], 
                  agg=[vaex.agg.mean('TransactionAmt'), vaex.agg.sum('TransactionAmt')])
te = time()
s["aggregation"] = te-ts


In [26]:
# the difference is that we call compute method, which runs all the computations at this point
ts = time()
dff_s = dff.sort(by=["card1","addr1","D9"])
dff_s = dff.sort(by=["addr1","D9","card1"])
dff_s = dff.sort(by=["D9","card1","addr1"])
te = time()
s["sorting"] = te-ts

In [27]:
pd.DataFrame(stats)

Unnamed: 0,pandas,dask,dask_indexed,vaex
load_transactions,18.682878,0.059999,10.035912,17.666177
load_identity,0.550774,0.015863,0.569751,0.884425
merge,2.118241,0.05824,0.049744,0.14327
aggregation,0.066574,17.411429,15.891529,0.494523
sorting,1.177349,49.431917,53.414108,1.108287


In [28]:
clean()

# PySpark
Java must be installed. (e.g. Ubuntu 18 - https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-on-ubuntu-18-04)

In [29]:
from pyspark import SparkContext
sc = SparkContext()
sc.version
sc.stop()

In [30]:
# Create my_spark
my_spark = SparkSession.builder \
    .master("local") \
    .appName("Pandas Alternative") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [31]:
tool = "spark"
stats[tool] = {}
s = stats[tool]


ts = time()
df = my_spark.read.csv(paths[0],inferSchema = True,header= True) 
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = my_spark.read.csv(paths[1],inferSchema = True,header= True) 
te = time()
s["load_identity"] = te-ts

In [32]:

ts = time()
dff = df.join(df2, "TransactionID")
te = time()
s["merge"] = te-ts

In [33]:
# the difference is that we call collect method, which runs all the computations at this point
#ts = time()
#grp = dff.groupby([dff["isFraud"],dff["ProductCD"],dff["card4"],dff["card6"],dff["id_15"],dff["id_31"]]) \
#        .agg(avg("TransactionAmt"), sum("TransactionAmt"))\
#        .collect()
#te = time()
#s["aggregation"] = te-ts
#s["all"] = te-tss

In [34]:
# the difference is that we call collect method, which runs all the computations at this point
ts = time()
grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"]) \
        .agg(avg("TransactionAmt"), sum("TransactionAmt"))\
        .collect()
te = time()
s["aggregation"] = te-ts


In [39]:
ts = time()
dff.orderBy("card1","addr1","D9").collect()
# alternatively
# dff.sort("card1","addr1","D9").collect()
te = time()
s["sorting"] = te-ts

In [40]:
stats_df = pd.DataFrame(stats)
stats_df.loc['Total'] = stats_df.sum()
stats_df

Unnamed: 0,pandas,dask,dask_indexed,vaex,spark
load_transactions,18.682878,0.059999,10.035912,17.666177,31.344823
load_identity,0.550774,0.015863,0.569751,0.884425,1.417775
merge,2.118241,0.05824,0.049744,0.14327,0.24481
aggregation,0.066574,17.411429,15.891529,0.494523,16.870172
sorting,1.177349,49.431917,53.414108,1.108287,90.440744
Total,22.595816,66.977449,79.961045,20.296683,140.318324


# Modin

In [31]:
mpd.__version__

'0.8.2'

In [32]:
tool = "modin"
stats[tool] = {}
s = stats[tool]


ts = time()
df = mpd.read_csv(paths[0])
te = time()
s["load_transactions"] = te-ts

ts = time()
df2 = mpd.read_csv(paths[1])
te = time()
s["load_identity"] = te-ts

ts = time()
dff = df.merge(df2, on="TransactionID")
te = time()
s["merge"] = te-ts

# modin defaults to pandas for multiple column aggregation and then fails on KeyError, though the key is available
ts = time()
try:
    grp = dff.groupby(["isFraud","ProductCD","card4","card6","id_15","id_31"])["TransactionAmt"].agg(["mean","sum"])
except Exception as e:
    print(e)
te = time()
s["aggregation"] = te-ts


'isFraud'


To request implementation, send an email to feature_requests@modin.org.


In [33]:
pd.DataFrame(stats)

Unnamed: 0,pandas,dask,dask_indexed,vaex,spark,modin
load_transactions,18.840576,0.056465,10.54713,16.921098,30.63642,13.85153
load_identity,0.464806,0.016233,0.814531,0.738712,1.918626,0.509824
merge,1.954993,0.054572,0.045397,0.115304,0.347089,8.27643
aggregation,0.061256,12.940852,13.947035,0.412613,14.489696,0.152401
sorting,1.124379,36.934965,51.42109,1.124599,,


In [34]:
clean()

In [35]:
pd.DataFrame(stats).to_csv("statistics.csv")

# Performance test

In [3]:
import psutil
import numpy as np

In [4]:
# use psutil to check for system resources
psutil.cpu_percent(), psutil.virtual_memory().percent

(29.1, 18.8)

In [5]:
def system_resources(n, pause, cpu_threshold = 0.5, mem_threshold = 0.5):
    
    cpu_m = []
    mem_m = []
    cpu_treshold_breached = False
    mem_treshold_breached = False
    
    for i in range(n):
        cpu_m.append(psutil.cpu_percent())
        mem_m.append(psutil.virtual_memory().percent)
        sleep(pause)
    cpu_m = np.mean(cpu_m)
    mem_m = np.mean(mem_m)
    
    if cpu_m / 100 > cpu_threshold:
        cpu_treshold_breached = True
        
    if mem_m / 100 > mem_threshold:
        mem_treshold_breached = True
    
    return {"cpu": cpu_m, "memory": mem_m, "cpu_threshold": cpu_treshold_breached, "mem_threshold": mem_treshold_breached }

In [6]:
system_resources(3, 1)

{'cpu': 13.133333333333333,
 'memory': 18.799999999999997,
 'cpu_threshold': False,
 'mem_threshold': False}

In [7]:
class Events:
    
    def __init__(self, path):
        self.file = open(path, 'a', encoding='utf-8')
        
    def log(self, time, tool, operation, duration):
        print("|".join([strftime('%Y-%m-%d %H:%M:%S', localtime(te)),tool,operation,str(duration)])+"\n")
        print(self.file)
        self.file.write("|".join([strftime('%Y-%m-%d %H:%M:%S', localtime(te)),tool,operation,str(duration)])+"\n")
        
    def close(self):
        self.file.close()

In [8]:
logger = Events("example.log")

In [76]:
ts = time()
df = pd.read_csv(paths[1])
te = time()
logger.log(te, "pandas", "load",  te-ts)

2021-01-14 00:18:56|pandas|load|0.48686718940734863

<_io.TextIOWrapper name='example.log' mode='a' encoding='utf-8'>


In [74]:
logger.file.write("|".join([strftime('%Y-%m-%d %H:%M:%S', localtime(te)),"pandas","load",str(ts-te)])+"\n")

52

In [11]:
te = time()


'2021-01-13 23:38:59'

In [11]:
import logging
logging.basicConfig(filename='/home/vaclav/Notebooks/Medium/DataFrames/lexample.log', encoding='utf-8', level=logging.DEBUG)

In [12]:
logging.info('This is an info message')

INFO:MainThread:root:This is an info message
