# Pandas Alternatives
##### “What is the average revenue per user by treatment group?”
- Step 1: Load data
- Step 2: Group + aggregate
- Step 3: Convert to pandas
- Step 4: Plot
- Step 5: One sentence takeaway

In [1]:
import numpy as np
import pandas as pd

np.random.seed(42)

n = 10_000
df = pd.DataFrame({
    "user_id": np.arange(n),
    "treatment": np.random.choice(["control", "test"], size=n),
    "revenue": np.random.gamma(shape=2, scale=10, size=n)
})

## Pandas

In [2]:
summary_pd = (
    df
    .groupby("treatment", as_index=False)
    .agg(avg_revenue=("revenue", "mean"))
)
print("Pandas Summary:")
print(summary_pd)

Pandas Summary:
  treatment  avg_revenue
0   control    20.083176
1      test    20.063683


Normal, everyday pandas. This is the baseline the other libraries are compared against.
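Steps 4 and 5 from the outline (plot, one-sentence takeaway) never get their own cell, so here is a minimal sketch of what they could look like for the pandas summary. It rebuilds the same synthetic data so it runs on its own. The choice of matplotlib, the figure size, and the wording of the takeaway are my assumptions, not something the notebook prescribes.

```python
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Rebuild the same synthetic data as the first cell so this runs on its own
np.random.seed(42)
n = 10_000
df = pd.DataFrame({
    "user_id": np.arange(n),
    "treatment": np.random.choice(["control", "test"], size=n),
    "revenue": np.random.gamma(shape=2, scale=10, size=n),
})

summary_pd = (
    df
    .groupby("treatment", as_index=False)
    .agg(avg_revenue=("revenue", "mean"))
)

# Step 4: one bar per treatment group
fig, ax = plt.subplots(figsize=(4, 3))
ax.bar(summary_pd["treatment"], summary_pd["avg_revenue"])
ax.set_xlabel("treatment")
ax.set_ylabel("avg revenue per user")
ax.set_title("Average revenue by treatment group")
fig.tight_layout()

# Step 5: build the one-sentence takeaway from the computed values
means = summary_pd.set_index("treatment")["avg_revenue"]
diff = means["test"] - means["control"]
takeaway = (
    f"Test averages {means['test']:.2f} per user vs {means['control']:.2f} "
    f"for control, a difference of {diff:+.2f}."
)
print(takeaway)
```

Any of the other libraries can feed the same plot once their result is converted to a pandas DataFrame (step 3).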

## Polars

In [3]:
import polars as pl

pl_df = pl.from_pandas(df)

summary_pl = (
    pl_df
    .group_by(pl.col("treatment"))
    .agg(
        pl.col("revenue").mean().alias("avg_revenue")
    )
)

print("Polars Summary:")
print(summary_pl)

Polars Summary:
shape: (2, 2)
┌───────────┬─────────────┐
│ treatment ┆ avg_revenue │
│ ---       ┆ ---         │
│ str       ┆ f64         │
╞═══════════╪═════════════╡
│ control   ┆ 20.083176   │
│ test      ┆ 20.063683   │
└───────────┴─────────────┘


Feels like PySpark: column expressions are built with `pl.col(...)` and handed to `.agg()`.

## DuckDB

In [4]:
import duckdb

summary_duck = duckdb.sql("""
    SELECT
        treatment,
        AVG(revenue) AS avg_revenue
    FROM df
    GROUP BY treatment
""").df()
print("DuckDB Summary:")
print(summary_duck)

DuckDB Summary:
  treatment  avg_revenue
0      test    20.063683
1   control    20.083176


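This is real SQL that runs directly against the pandas DataFrame: DuckDB finds `df` by its variable name. With no `ORDER BY`, row order is not guaranteed, which is why `test` comes first here.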

## Dask

In [5]:
import dask.dataframe as dd

ddf = dd.from_pandas(df, npartitions=4)

summary_dask = (
    ddf
    .groupby("treatment")
    .revenue
    .mean()
    .compute()
    .reset_index(name="avg_revenue")
)
print("Dask Summary:")
print(summary_dask)


Dask Summary:
  treatment  avg_revenue
0   control    20.083176
1      test    20.063683


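Probably the least intuitive: nothing runs until `.compute()`, and the result comes back as a Series that needs `reset_index` to match the other summaries.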

### The remaining libraries hit installation issues and I lost interest in pushing further, but here's a summary

- Pandas for exploration
- Polars for speed on one machine
- DuckDB for local SQL analytics on Parquet
- Dask or Modin when scaling pandas code
- Spark only when the data requires a cluster
- PyArrow sits underneath for efficient IO

## PySpark

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.getOrCreate()

spark_df = spark.createDataFrame(df)

summary_spark = (
    spark_df
    .groupBy("treatment")
    .agg(avg("revenue").alias("avg_revenue"))
    .toPandas()
)
print("Spark Summary:")
print(summary_spark)

Py4JJavaError: An error occurred while calling o63.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 0.0 failed 1 times, most recent failure: Lost task 9.0 in stage 0.0 (TID 9) (DESKTOP-2OHFJED.lan executor driver): java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:247)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:154)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:158)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:309)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:72)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:499)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:158)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
	... 35 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:3122)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3122)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3114)
	at scala.collection.immutable.List.foreach(List.scala:323)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3114)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1303)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1303)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3397)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3328)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3317)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
Caused by: java.io.IOException: Cannot run program "python3": CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:247)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:154)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:158)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:309)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:72)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:107)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified
	at java.base/java.lang.ProcessImpl.create(Native Method)
	at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:499)
	at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:158)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1110)
	... 35 more
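
The `Cannot run program "python3"` line suggests Spark tried to launch its Python workers with an executable called `python3`, which many Windows installs do not have on the PATH. A common workaround is to point PySpark at the interpreter that is already running the notebook, before the session is created. This is a sketch of that workaround, not something verified on this machine; the helper name is made up.

```python
import os
import sys


def point_spark_at_current_python() -> str:
    """Tell PySpark to start workers with the interpreter running this code.

    Hypothetical helper: it only sets the two environment variables that
    PySpark reads when it launches Python processes.
    """
    os.environ["PYSPARK_PYTHON"] = sys.executable
    os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
    return sys.executable


python_path = point_spark_at_current_python()
print("PySpark will use:", python_path)
```

The variables have to be set before `SparkSession.builder.getOrCreate()` runs, so if a session already exists, call `spark.stop()` (or restart the kernel) and then re-run the PySpark cell.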


## Modin

In [7]:
import modin.pandas as mpd

mdf = mpd.DataFrame(df)

summary_modin = (
    mdf
    .groupby("treatment", as_index=False)
    .agg(avg_revenue=("revenue", "mean"))
)


ImportError: Please refer to installation documentation page to install an engine
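
The error means Modin itself imported fine but found no execution engine. Modin needs one of Ray, Dask (with `distributed`), or unidist installed alongside it. Below is a small sketch that checks which of those backends are importable in the current environment; the helper name is made up, and the mapping of engines to module names reflects my understanding of Modin's extras rather than anything in the notebook.

```python
import importlib.util


def available_modin_engines() -> list[str]:
    """Return the Modin engines whose backing package can be imported here."""
    # engine name -> module that must be importable (assumed mapping)
    candidates = {"ray": "ray", "dask": "distributed", "unidist": "unidist"}
    return [
        engine
        for engine, module in candidates.items()
        if importlib.util.find_spec(module) is not None
    ]


engines = available_modin_engines()
if engines:
    print("Engine backends found:", engines)
else:
    print('No engine found; try: pip install "modin[dask]" or "modin[ray]"')
```

Since Dask is already in this environment, `pip install "modin[dask]"` is probably the shortest path. The engine can then be chosen with the `MODIN_ENGINE` environment variable before importing `modin.pandas`.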

## Vaex

In [8]:
import vaex
vdf = vaex.from_pandas(df)
summary_vaex = vdf.groupby("treatment", agg=vaex.agg.mean("revenue")).to_pandas_df()
summary_vaex.rename(columns={"revenue_mean": "avg_revenue"}, inplace=True)
print("Vaex Summary:")
print(summary_vaex)

ModuleNotFoundError: No module named 'vaex'

## PyArrow

In [9]:
import pyarrow as pa

table = pa.Table.from_pandas(df)

df_roundtrip = table.to_pandas()

In [10]:
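# pyarrow.compute has no group_by; grouping is a method on the Table itself
# (available in pyarrow 7.0 and later)
grouped = table.group_by("treatment").aggregate([("revenue", "mean")])

# The aggregate column comes back as "revenue_mean"; rename it to match
# the other summaries
summary_arrow = grouped.to_pandas().rename(columns={"revenue_mean": "avg_revenue"})
print("PyArrow Summary:")
print(summary_arrow)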