# 1. Performance Testing Notebook

This notebook is designed for conducting performance tests and analyzing the execution time of different operations (both in Pandas and PySpark).

In [None]:
import pandas as pd
import utils
import test_functions as test

from pyspark.sql import SparkSession

## Load Data

### Load Pandas Dataframes 
execution time: 37 seconds

In [None]:
# Small-step frames: one CSV per row count, 10,000 to 100,000 in steps of 10,000.
small_step_pd = [
    pd.read_csv(f"data/small/pandas_test_{row_count}_rows.csv")
    for row_count in range(10_000, 100_001, 10_000)
]

# Large-step frames: one CSV per row count, 50,000 to 1,000,000 in steps of 50,000.
large_step_pd = [
    pd.read_csv(f"data/large/pandas_test_{row_count}_rows.csv")
    for row_count in range(50_000, 1_000_001, 50_000)
]

### Load Spark Dataframes 
execution time: 28 seconds

In [None]:
# Get (or reuse) the Spark session that is handed to every benchmark call below.
spark = (
    SparkSession.builder
    .appName("Performance Analysis")
    .getOrCreate()
)

In [None]:
# Small-step Spark frames, read from the same CSV files as the pandas frames
# (first line is the header, column types are inferred).
small_step_spark = [
    spark.read.csv(
        f"data/small/pandas_test_{row_count}_rows.csv",
        header=True,
        inferSchema=True,
        sep=",",
    )
    for row_count in range(10_000, 100_001, 10_000)
]

# Large-step Spark frames, 50,000 to 1,000,000 rows in steps of 50,000.
large_step_spark = [
    spark.read.csv(
        f"data/large/pandas_test_{row_count}_rows.csv",
        header=True,
        inferSchema=True,
    )
    for row_count in range(50_000, 1_000_001, 50_000)
]

## Run Tests

#### Test 1: Write Dataframe to CSV

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.write_data on every small-step frame; "write_pd"/"write_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 5 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.write_data,
    "write_pd",
    "write_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    5,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Only a few of the large frames are processed per run because of memory
# allocation issues; adjust the index range to suit your own machine.
for i in range(17, 20):
    df_pd, df_spark = large_step_pd[i], large_step_spark[i]
    print(df_pd.shape)
    average_pandas_time, average_pyspark_time, _, _ = utils.test_iterations(
        test.write_data, 5, spark, df_pd, df_spark
    )
    # Pick the statistics row whose dimensions match the frame just timed.
    same_rows = time_statistics_large["row_count"] == df_pd.shape[0]
    same_cols = time_statistics_large["column_count"] == df_pd.shape[1]
    time_statistics_large.loc[same_rows & same_cols, ["write_pd", "write_spark"]] = [
        average_pandas_time,
        average_pyspark_time,
    ]

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 2: Load Dataframe from CSV

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.load_data on every small-step frame; "read_pd"/"read_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 5 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.load_data,
    "read_pd",
    "read_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    5,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.load_data on every large-step frame; "read_pd"/"read_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 5 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.load_data,
    "read_pd",
    "read_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    5,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 3: Drop NaN Values

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.drop_nan on every small-step frame; "drop_na_pd"/"drop_na_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.drop_nan,
    "drop_na_pd",
    "drop_na_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.drop_nan on every large-step frame; "drop_na_pd"/"drop_na_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.drop_nan,
    "drop_na_pd",
    "drop_na_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 4: Fill NaN Values

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.fill_nan on every small-step frame; "fill_na_pd"/"fill_na_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.fill_nan,
    "fill_na_pd",
    "fill_na_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.fill_nan on every large-step frame; "fill_na_pd"/"fill_na_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 50 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.fill_nan,
    "fill_na_pd",
    "fill_na_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    50,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 5: Groupby

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.group_df on every small-step frame; "group_pd"/"group_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.group_df,
    "group_pd",
    "group_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.group_df on every large-step frame; "group_pd"/"group_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.group_df,
    "group_pd",
    "group_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 6: GroupBy and Sum

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.group_sum_df on every small-step frame;
# "group_sum_pd"/"group_sum_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 50 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.group_sum_df,
    "group_sum_pd",
    "group_sum_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    50,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.group_sum_df on every large-step frame;
# "group_sum_pd"/"group_sum_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 30 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.group_sum_df,
    "group_sum_pd",
    "group_sum_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    30,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 7: GroupBy and Count

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.group_count_df on every small-step frame;
# "group_count_pd"/"group_count_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 50 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.group_count_df,
    "group_count_pd",
    "group_count_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    50,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.group_count_df on every large-step frame;
# "group_count_pd"/"group_count_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 30 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.group_count_df,
    "group_count_pd",
    "group_count_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    30,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 8: Filter by Column Value (under 0)

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.filter_less_0 on every small-step frame;
# "filter_less_0_pd"/"filter_less_0_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.filter_less_0,
    "filter_less_0_pd",
    "filter_less_0_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.filter_less_0 on every large-step frame;
# "filter_less_0_pd"/"filter_less_0_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 50 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.filter_less_0,
    "filter_less_0_pd",
    "filter_less_0_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    50,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 9: Filter by Column Value (under 10)

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.filter_less_10 on every small-step frame;
# "filter_less_10_pd"/"filter_less_10_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 100 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.filter_less_10,
    "filter_less_10_pd",
    "filter_less_10_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.filter_less_10 on every large-step frame;
# "filter_less_10_pd"/"filter_less_10_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 50 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.filter_less_10,
    "filter_less_10_pd",
    "filter_less_10_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    50,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 10: Join Dataframes

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.join_df on every small-step frame; "join_pd"/"join_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 5 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.join_df,
    "join_pd",
    "join_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    5,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)


In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.join_df on every large-step frame; "join_pd"/"join_spark"
# are passed as the column names (presumably where the timings are stored).
# The trailing 3 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.join_df,
    "join_pd",
    "join_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    3,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 11: Multiplication (Built-In)

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.multiply_build_in on every small-step frame;
# "mul_build_pd"/"mul_build_spark" are passed as the column names
# (presumably where the timings are stored).
# `spark` is passed explicitly in the seventh position, as in every other
# utils.test_run call in this notebook; without it, 100000 would be taken as
# the Spark session and the final argument would be missing.
# The trailing 100000 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.multiply_build_in,
    "mul_build_pd",
    "mul_build_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    100000,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.multiply_build_in on every large-step frame;
# "mul_build_pd"/"mul_build_spark" are passed as the column names
# (presumably where the timings are stored).
# `spark` is passed explicitly in the seventh position, as in every other
# utils.test_run call in this notebook; without it, 100000 would be taken as
# the Spark session and the final argument would be missing.
# The trailing 100000 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.multiply_build_in,
    "mul_build_pd",
    "mul_build_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    100000,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 12: Multiplication (Column)

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.multiply_by_selection on every small-step frame;
# "mul_col_pd"/"mul_col_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 1000 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.multiply_by_selection,
    "mul_col_pd",
    "mul_col_spark",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    1000,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk before adding new results.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Benchmark test.multiply_by_selection on every large-step frame;
# "mul_col_pd"/"mul_col_spark" are passed as the column names
# (presumably where the timings are stored).
# The trailing 500 is presumably the repetition count - confirm in utils.test_run.
time_statistics_large = utils.test_run(
    test.multiply_by_selection,
    "mul_col_pd",
    "mul_col_spark",
    time_statistics_large,
    large_step_pd,
    large_step_spark,
    spark,
    500,
)

# Write the table back so the following cells reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)

#### Test 13: Convert Dataframe

In [None]:
# Refresh the small-step timing table from disk before adding new results.
time_statistics_small = pd.read_csv("data/time_statistics_small.csv")

# Benchmark test.convert_df on every small-step frame;
# "pd_to_spark"/"pyspark_to_pd" are passed as the column names
# (presumably where the timings are stored).
# The trailing 5 is presumably the repetition count - confirm in utils.test_run.
time_statistics_small = utils.test_run(
    test.convert_df,
    "pd_to_spark",
    "pyspark_to_pd",
    time_statistics_small,
    small_step_pd,
    small_step_spark,
    spark,
    5,
)

# Write the table back so the following cells reload the updated values.
time_statistics_small.to_csv("data/time_statistics_small.csv", index=False)

In [None]:
# Refresh the large-step timing table from disk.
time_statistics_large = pd.read_csv("data/time_statistics_large.csv")

# Only a few of the large frames are processed per run because of memory
# allocation issues.
for i in range(18, 20):
    df_pd, df_spark = large_step_pd[i], large_step_spark[i]
    print(df_pd.shape)
    average_pandas_time, average_pyspark_time, _, _ = utils.test_iterations(
        test.convert_df, 5, spark, df_pd, df_spark
    )
    # Pick the statistics row whose dimensions match the frame just timed.
    same_rows = time_statistics_large["row_count"] == df_pd.shape[0]
    same_cols = time_statistics_large["column_count"] == df_pd.shape[1]
    time_statistics_large.loc[same_rows & same_cols, ["pd_to_spark", "pyspark_to_pd"]] = [
        average_pandas_time,
        average_pyspark_time,
    ]

# Write the table back so later runs reload the updated values.
time_statistics_large.to_csv("data/time_statistics_large.csv", index=False)