In [11]:
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as f 

def spark_run(file_name):
    SCHEMA = StructType([StructField('key', IntegerType(), False),
                         StructField('value1', IntegerType(), False),
                         StructField('value2', IntegerType(), False),
                         StructField('value3', IntegerType(), False),])
    spark = SparkSession.builder \
        .appName("Spark Benchmarking") \
        .getOrCreate()
    file_vals = spark\
        .read\
        .load(file_name, format="csv", header='true', schema=SCHEMA)\
    file_vals["Ratio1"] = file_vals.apply(lambda x : (x["value1"] - x["value2"])/x["value3"], axis=1, meta='int')
    file_vals["Ratio2"] = file_vals.apply(lambda x : (x["value1"] * x["value2"])/x["value3"], axis=1, meta='int')
    file_vals = file_vals.groupby("key").agg({'Ratio1' : 'mean', 'Ratio2' : 'mean'})
    file_vals.csv(f'./output/{file_name}.csv')
   
    print(file_vals.head(5))

    del spark
    del file_vals
    
%timeit spark_run('./data/sample_file_9.csv')

[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375), Row(key=496, avg(value)=425.34375), Row(key=833, avg(value)=439.0625)]
[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375), Row(key=496, avg(value)=425.34375), Row(key=833, avg(value)=439.0625)]
[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375), Row(key=496, avg(value)=425.34375), Row(key=833, avg(value)=439.0625)]
[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375), Row(key=496, avg(value)=425.34375), Row(key=833, avg(value)=439.0625)]
[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375), Row(key=496, avg(value)=425.34375), Row(key=833, avg(value)=439.0625)]
[Row(key=148, avg(value)=413.5625), Row(key=463, avg(value)=450.0625), Row(key=471, avg(value)=464.484375

In [2]:
import pandas as pd
import glob
def pandas_run(file_name):
    file_vals = pd.read_csv(file_name, sep=",")
    file_vals.columns = ["key", "value1", "value2", "value3"]
    file_vals["Ratio1"] = file_vals.apply(lambda x : (x["value1"] - x["value2"])/x["value3"], axis=1, meta='int')
    file_vals["Ratio2"] = file_vals.apply(lambda x : (x["value1"] * x["value2"])/x["value3"], axis=1, meta='int')
    file_vals = file_vals.groupby("key").agg({'Ratio1' : 'mean', 'Ratio2' : 'mean'})
    file_vals.to_csv(f'./output/{file_name}.csv')
    del file_vals
  
for filename in glob.glob('./data/*.csv'):
    print(filename)
    %timeit pandas_run(filename)

./data/sample_file_1.csv
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
Name: value, dtype: float64
402 ms ± 11.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
./data/sample_file_2.csv
0    359.75
1    639.50
2    342.50
3    545.75
4    525.25
Name: value, dtype: float64
0    359.75
1    639.50
2    342.50
3    545.75
4    525.25
Name: value, dtype: float64
0    359.75
1    639.50
2    342.50
3    5

In [1]:
from pyspark.sql import SparkSession
SparkSession.builder \
        .appName("Spark Benchmarking") \
        .master('local[16]')\
        .config("spark.driver.memory", '10g')\
        .getOrCreate()

import databricks.koalas as ks
import glob
def koalas_run(file_name):
    file_vals = ks.read_csv(file_name, sep=",", header=0)
    file_vals = file_vals.groupby("key").mean()
    print(file_vals.head(5))
    del file_vals
  
for filename in glob.glob('./data/*.csv'):
    print(filename)
    %timeit koalas_run(filename)



./data/sample_file_1.csv
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
1.88 s ± 182 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
./data/sample_file_2.csv
      value
key        
0    359.75
1    639.50
2    342.50
3    545.75
4    525.25
      value
key        
0    359.75
1    639.50
2    342.50
3    545.75
4    525.25
      value
key        
0    359.75
1    639.50
2    342.50
3    545.75
4    525.25
      value
key

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions  as f
SparkSession.builder \
        .appName("Spark Benchmarking") \
        .master('local[16]')\
        .config("spark.driver.memory", '10g')\
        .getOrCreate()

import databricks.koalas as ks
import glob
def koalas_run(file_name):
    file_vals = ks.read_csv(file_name, sep=",", header=0)
    file_vals = file_vals\
        .groupby("key")\
        .mean()
    print(file_vals.head(5))
    del file_vals
  
for filename in glob.glob('./data/*.csv'):
    print(filename)
    %timeit koalas_run(filename)


./data/sample_file_1.csv
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
     value
key       
0    662.0
1    785.5
2    417.5
3    534.5
4    381.0
