# Arrow, Pandas, Polars and PySpark - Simple Comparison
In this notebook, we take a closer look at the concepts behind these tools and learn which one is best suited to which scenario.
Credits:
- https://amanjaiswalofficial.medium.com/apache-arrow-making-spark-even-faster-3ae-8ca8e1a67dc7
- https://www.datacamp.com/de/tutorial/apache-arrow

In [1]:
# required imports
import time
from time import perf_counter
import pandas as pd
import numpy as np
import polars as pl
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
RUN_SPARK = True

## Generate Test Data
### Let's first generate some test data
You can adjust the size of the dataset by changing `NUM_ROWS`.

In [None]:
NUM_ROWS = 10000000  # can be adjusted

# Generate the dataset
np.random.seed(42)
id = np.arange(1, NUM_ROWS+1)
x = np.random.randn(NUM_ROWS) * 10 + 50
y = np.random.randint(1, 5, size=NUM_ROWS)
target = 2*x + y + np.random.randn(NUM_ROWS)*5

df_pd = pd.DataFrame({
    'id': id,
    'x': x,
    'category': y,
    'target': target
})

# Write to the same path that all later cells read from
df_pd.to_csv('large_dataset.csv', index=False)

In [2]:
tmpdf = pl.read_csv('large_dataset.csv')
tmpdf.head()

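shape: (5, 4)
┌─────┬───────────┬──────────┬────────────┐
│ id  ┆ x         ┆ category ┆ target     │
│ --- ┆ ---       ┆ ---      ┆ ---        │
│ i64 ┆ f64       ┆ i64      ┆ f64        │
╞═════╪═══════════╪══════════╪════════════╡
│ 1   ┆ 54.967142 ┆ 2        ┆ 121.51567  │
│ 2   ┆ 48.617357 ┆ 1        ┆ 89.778238  │
│ 3   ┆ 56.476885 ┆ 3        ┆ 108.760504 │
│ 4   ┆ 65.230299 ┆ 3        ┆ 126.704923 │
│ 5   ┆ 47.658466 ┆ 3        ┆ 101.613006 │
└─────┴───────────┴──────────┴────────────┘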


In [3]:
# Timer helper: runs func once, prints the elapsed wall-clock time and returns func's result
def timed(name, func):
    start = perf_counter()
    result = func()
    end = perf_counter()
    print(f"{name}: {end-start:.4f} s")
    return result
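
A single run can be distorted by warm-up effects such as caching or lazy imports. The following is a minimal sketch of a repeated-measurement variant; `timed_best_of` and its `repeats` parameter are hypothetical names that are not part of the original notebook.

```python
from time import perf_counter

def timed_best_of(name, func, repeats=3):
    """Run func `repeats` times, print the fastest run and return the last result."""
    durations = []
    result = None
    for _ in range(repeats):
        start = perf_counter()
        result = func()
        durations.append(perf_counter() - start)
    print(f"{name}: best of {repeats} = {min(durations):.4f} s")
    return result

# Example call with a cheap pure-Python workload
total = timed_best_of("sum", lambda: sum(range(100_000)))
```

Reporting the minimum rather than the mean is one common choice, because it is the least affected by background noise on the machine.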

## Environment & Spark session setup

This cell creates the SparkSession and disables Arrow-based conversion, so that the first measurements serve as the baseline without Arrow.

In [4]:
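spark = SparkSession.builder.appName("Runtime_Comparison").getOrCreate()
# Start with Arrow disabled to get the baseline timings.
# Since Spark 3.0 this key is deprecated in favor of "spark.sql.execution.arrow.pyspark.enabled".
spark.conf.set("spark.sql.execution.arrow.enabled", "false")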

## Arrow

"Apache Arrow is a multi-language toolbox for building high performance applications that process and transport large data sets. It is designed to both improve the performance of analytical algorithms and the efficiency of moving data from one system or programming language to another.

A critical component of Apache Arrow is its in-memory columnar format, a standardized, language-agnostic specification for representing structured, table-like datasets in-memory. This data format has a rich data type system (included nested and user-defined data types) designed to support the needs of analytic database systems, data frame libraries, and more". [https://arrow.apache.org/overview/]

### Why Arrow
- Arrow uses its own in-memory format, in which memory buffers are organized in columns and batches.
- This makes vectorised processing possible: operations run efficiently on entire columns at once.

### Shared Memory Model
- Arrow avoids traditional serialization
- Instead, it relies on a shared memory model in which multiple processes can directly access the same data without copying or converting it.

### Pandas <-> Spark: conversion runtime comparison

In [5]:
spark_df = spark.read.csv("large_dataset.csv", header=True, inferSchema=True)

In [7]:
pandas_df = timed("Without Arrow to pandas df", lambda: spark_df.toPandas())
spark_df = timed("Without Arrow to spark df", lambda: spark.createDataFrame(pandas_df))

Without Arrow to pandas df: 32.2962 s
Without Arrow to spark df: 181.2396 s


In [13]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
#spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1000)
#pandas_df = timed("With Arrow to pandas df", lambda: spark_df.toPandas())
spark_df = timed("With Arrow to spark df", lambda: spark.createDataFrame(pandas_df))

With Arrow to spark df: 1.1740 s


### Pandas <-> Polars: conversion runtime comparison

In [18]:
# Note: the Spark Arrow flag only affects Spark <-> pandas conversion;
# it has no effect on the Polars calls below.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
polars_df = timed("Without Arrow to polars df", lambda: pl.from_pandas(pandas_df))
pandas_df = timed("Without Arrow to pandas df", lambda: polars_df.to_pandas())

Without Arrow to polars df: 0.1204 s
Without Arrow to pandas df: 0.2680 s


In [19]:
# Same conversion as in the previous cell, now with the Spark flag enabled.
# The faster timings in this second run are likely due to warm-up rather than the flag.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
polars_df = timed("With Arrow to polars df", lambda: pl.from_pandas(pandas_df))
pandas_df = timed("With Arrow to pandas df", lambda: polars_df.to_pandas())

With Arrow to polars df: 0.0490 s
With Arrow to pandas df: 0.0590 s


### Spark <-> Polars: conversion runtime comparison
#### An intermediate `toPandas` step is needed, so enable PySpark's Arrow support

In [20]:
# You can implement the conversion steps here

# `read`, `fillna` and `groupBy` Comparison

## Pandas: Runtime

### Duration of Pandas `read_csv`

In [23]:
pandas_df = timed("Pandas read", lambda:pd.read_csv("large_dataset.csv"))

Pandas read: 2.7614 s


In [24]:
#df = pd.DataFrame({'id': id, 'x': x, 'category': y, 'target': target})

print("--- pandas ---")
pandas_df = timed('pandas_fillna', lambda: pandas_df.fillna(0))
pandas_df_group = timed('pandas_groupby', lambda: pandas_df.groupby('category')['target'].mean())
print(pandas_df_group)

--- pandas ---
pandas_fillna: 0.0559 s
pandas_groupby: 0.1506 s
category
1    100.995639
2    102.010160
3    103.013624
4    103.974664
Name: target, dtype: float64


## Polars: Runtime

In [25]:
def polars_groupby(polars_df):
    return polars_df.select([
        pl.col("category"),
        pl.col("target").mean().over("category").alias("mean_target")
    ]).unique(subset="category")

### Duration of Polars `read_csv`

In [26]:
polars_df = timed("Polars read", lambda:pl.read_csv("large_dataset.csv"))

Polars read: 0.5195 s


In [27]:
print("--- polars ---")
polars_df = timed('polars_fillna', lambda: polars_df.fill_null(0))
polars_df_group = timed('polars_groupby', lambda:polars_groupby(polars_df))
print(polars_df_group)

--- polars ---
polars_fillna: 0.1026 s
polars_groupby: 0.2109 s
shape: (4, 2)
┌──────────┬─────────────┐
│ category ┆ mean_target │
│ ---      ┆ ---         │
│ i64      ┆ f64         │
╞══════════╪═════════════╡
│ 3        ┆ 103.013624  │
│ 4        ┆ 103.974664  │
│ 1        ┆ 100.995639  │
│ 2        ┆ 102.01016   │
└──────────┴─────────────┘


## PySpark: Runtime

### Duration of Spark `read.csv`

In [29]:
spark_df = timed("Spark read", lambda: spark.read.csv("large_dataset.csv", header=True, inferSchema=True))

Spark read: 18.3132 s


In [30]:
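print("--- pyspark ---")
# getOrCreate() returns the session created earlier rather than starting a new one.
spark = SparkSession.builder.master('local[*]').appName('SimpleCompare').getOrCreate()
# na.fill and groupBy/agg are lazy transformations: the two timings below cover
# building the query plan only; the actual computation runs in show().
spark_df = timed('spark_fillna', lambda: spark_df.na.fill(0))
spark_df_group = timed('spark_groupby', lambda: spark_df.groupBy('category').agg(F.mean('target').alias('mean')))
spark_df_group.show(5)
spark.stop()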

--- pyspark ---
spark_fillna: 0.1062 s
spark_groupby: 0.1753 s
+--------+------------------+
|category|              mean|
+--------+------------------+
|       1|100.99563886783933|
|       3|103.01362398979641|
|       4|103.97466350434654|
|       2|102.01016019808266|
+--------+------------------+
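
Because Spark evaluates transformations lazily, the `spark_fillna` and `spark_groupby` timings are not directly comparable with the pandas and Polars numbers. A minimal sketch of a timer that includes execution is shown below; `timed_collect` is a hypothetical helper and assumes the object returned by `build_df` offers a `collect()` method, as Spark DataFrames do.

```python
from time import perf_counter

def timed_collect(name, build_df):
    """Time plan construction plus execution by calling collect() on the result."""
    start = perf_counter()
    rows = build_df().collect()  # collect() is an action and forces the computation
    print(f"{name}: {perf_counter() - start:.4f} s")
    return rows

# Usage with the notebook's objects (requires a running Spark session):
# rows = timed_collect(
#     "spark_groupby",
#     lambda: spark_df.groupBy("category").agg(F.mean("target").alias("mean")),
# )
```

Collecting is reasonable here because the grouped result has only four rows; for large results a different action would be a better fit.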

