In [1]:
import numpy as np
import functools
import sys
print(sys.version)
from timeit import Timer

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import polars as pl
import duckdb
from datafusion import SessionContext

# Shared query engines used by the SQL-based benchmark functions in this notebook.
ctx = SessionContext()  # DataFusion session; tables are registered on it before being queried
con = duckdb.connect()  # DuckDB connection opened with default arguments


def generate_data(number_of_rows, seed=None):
    """Build a synthetic sales dataset as a mapping of column name to values.

    Parameters
    ----------
    number_of_rows : int
        Number of rows to generate. ``0`` yields empty columns.
    seed : int or None, optional
        Seed forwarded to ``np.random.default_rng``. The default ``None``
        keeps the previous behaviour (a fresh, non-reproducible generator);
        pass an integer to obtain the same dataset on every run so that
        benchmark results can be compared across executions.

    Returns
    -------
    dict
        Keys ``order_id`` (a ``range`` running from 1 to ``number_of_rows``),
        ``region``, ``sales_person``, ``product`` (NumPy arrays of strings
        drawn from fixed lists) and ``sales_income`` (NumPy array of integers
        in the closed interval [1, 5000]).
    """
    rng = np.random.default_rng(seed)

    # Draw order matters for reproducibility with a fixed seed: keep it stable.
    region = rng.choice(
        ["North", "South", "East", "West"], size=number_of_rows
    )
    sales_person = rng.choice(
        ["Armstrong", "Aldrin", "Collins"], size=number_of_rows
    )
    product = rng.choice(
        ["Helmet", "Oxygen", "Boots", "Gloves"], size=number_of_rows
    )
    # The upper bound of rng.integers is exclusive, hence 5001 for a max of 5000.
    sales_income = rng.integers(1, 5001, size=number_of_rows)

    return {
        "order_id": range(1, number_of_rows + 1),
        "region": region,
        "sales_person": sales_person,
        "product": product,
        "sales_income": sales_income,
    }

3.13.5 | packaged by Anaconda, Inc. | (main, Jun 12 2025, 16:37:03) [MSC v.1929 64 bit (AMD64)]


In [2]:
def create_pandas_dataframe(data_file):
    """Load the Parquet file at ``data_file`` into a pandas DataFrame."""
    frame = pd.read_parquet(data_file)
    return frame

def create_pandas_dataframe_with_pyarrow(data_file):
    """Read ``data_file`` with pyarrow and convert it to a pandas DataFrame.

    The conversion passes ``types_mapper=pd.ArrowDtype`` so the resulting
    columns use Arrow-backed pandas dtypes.
    """
    arrow_table = pq.read_table(data_file)
    return arrow_table.to_pandas(types_mapper=pd.ArrowDtype)

def create_pyarrow_dataframe(data_file):
    """Read the Parquet file at ``data_file`` and return the pyarrow result."""
    arrow_table = pq.read_table(data_file)
    return arrow_table

def create_polars_dataframe(data_file):
    """Eagerly read the Parquet file at ``data_file`` with polars."""
    frame = pl.read_parquet(data_file)
    return frame

def create_polars_lazyframe(data_file):
    """Return the result of ``pl.scan_parquet`` for the file at ``data_file``."""
    lazy_frame = pl.scan_parquet(data_file)
    return lazy_frame

In [None]:
def analyze_pandas_dataframe(pandas_df):
    """Sum ``sales_income`` per (region, product, sales_person) group.

    Returns the pandas result of the grouped sum, indexed by the three
    grouping columns.
    """
    group_keys = ["region", "product", "sales_person"]
    grouped = pandas_df.groupby(group_keys)
    return grouped["sales_income"].sum()

def analyze_pyarrow_dataframe(pyarrow_df):
    """Group ``pyarrow_df`` by region, product and sales_person and sum ``sales_income``."""
    group_keys = ["region", "product", "sales_person"]
    grouped = pyarrow_df.group_by(group_keys)
    return grouped.aggregate([("sales_income", "sum")])

def analyze_datafusion_dataframe():
    """Run the grouped-sum query against table ``t`` on the shared ``ctx``.

    Returns the query result converted with ``to_arrow_table()``.
    """
    query = """
        SELECT region, product, sales_person, SUM(sales_income) AS total_sales
        FROM t
        GROUP BY region, product, sales_person
        """
    result = ctx.sql(query)
    return result.to_arrow_table()

def analyze_datafusion_parquet():
    """Run the grouped-sum query against table ``t2`` on the shared ``ctx``.

    Returns the query result converted with ``to_arrow_table()``.
    """
    query = """
        SELECT region, product, sales_person, SUM(sales_income) AS total_sales
        FROM t2
        GROUP BY region, product, sales_person
        """
    result = ctx.sql(query)
    return result.to_arrow_table()

def analyze_duckdb_dataframe():
    """Run the grouped-sum query against ``t`` on the shared DuckDB connection ``con``.

    Returns the result fetched with ``fetch_arrow_table()``.
    """
    query = """
        SELECT region, product, sales_person, SUM(sales_income) AS total_sales
        FROM t
        GROUP BY region, product, sales_person
        """
    cursor = con.execute(query)
    return cursor.fetch_arrow_table()

def analyze_duckdb_parquet(data_file):
    """Run the grouped-sum query directly against a Parquet file with DuckDB.

    Parameters
    ----------
    data_file : str or path-like
        Path of the Parquet file; it is placed in the ``FROM`` clause as a
        double-quoted identifier.

    Returns
    -------
    The result of ``con.execute(...).fetch_arrow_table()``.
    """
    # A double-quoted SQL identifier ends at the first '"', so a path that
    # contains one would truncate the identifier and corrupt the statement.
    # Doubling the character is the SQL escape; paths without quotes produce
    # exactly the same statement as before.
    quoted_path = str(data_file).replace('"', '""')
    sql = f"""
        SELECT region, product, sales_person, SUM(sales_income) AS total_sales
        FROM "{quoted_path}"
        GROUP BY region, product, sales_person
        """
    return con.execute(sql).fetch_arrow_table()


def analyze_polars_dataframe(polars_df):
    """Sum ``sales_income`` per (region, product, sales_person) group with polars.

    The frame is first turned into a lazy query via ``.lazy()`` and then
    collected with ``engine="streaming"``.
    """
    group_keys = ["region", "product", "sales_person"]
    lazy_query = polars_df.lazy().group_by(group_keys).agg(
        total_sales=pl.col("sales_income").sum()
    )
    return lazy_query.collect(engine="streaming")

def analyze_polars_lazyframe(polars_lf):
    """Sum ``sales_income`` per (region, product, sales_person) group on a lazy frame.

    The query is collected with ``engine="streaming"``.
    """
    group_keys = ["region", "product", "sales_person"]
    lazy_query = polars_lf.group_by(group_keys).agg(
        total_sales=pl.col("sales_income").sum()
    )
    return lazy_query.collect(engine="streaming")


# Number of rows in the benchmark dataset.
# NOTE(review): the outputs recorded in this notebook report 10,000,000 rows,
# so they appear to predate this value -- re-run all cells to refresh them.
n = 50_000_000

data_file = "data.parquet"  # relative path of the Parquet file shared by every benchmark
test_data = generate_data(n)
table = pa.table(test_data)
pq.write_table(table, data_file)  # written once here; the loaders and file-based queries read it back

In [4]:
# Time one load of the Parquet file with each library.
print("Creating DataFrames...")

creation_benchmarks = [
    (f"Pandas dataframe creation time for {n:,} rows:", create_pandas_dataframe),
    (f"\nPandas dataframe with pyarrow creation time for {n:,} rows:", create_pandas_dataframe_with_pyarrow),
    (f"\nPyarrow dataframe creation time for {n:,} rows:", create_pyarrow_dataframe),
    (f"\nPolars dataframe creation time for {n:,} rows:", create_polars_dataframe),
    (f"\nPolars lazyframe creation time for {n:,} rows:", create_polars_lazyframe),
]
for heading, loader in creation_benchmarks:
    print(heading)
    # timeit(1): a single execution, reported in seconds.
    print(Timer(functools.partial(loader, data_file)).timeit(1))

Creating DataFrames...
Pandas dataframe creation time for 10,000,000 rows:
1.4561255999142304

Pandas dataframe with pyarrow creation time for 10,000,000 rows:
0.33366909995675087

Pyarrow dataframe creation time for 10,000,000 rows:
0.24838220002129674

Polars dataframe creation time for 10,000,000 rows:
0.19922349997796118

Polars lazyframe creation time for 10,000,000 rows:
8.080003317445517e-05


# 1. Data analysis
--------------

In [5]:
print("-" * 50)

# Build every input up front so the timings further down in this cell only
# measure the analysis calls.
pandas_df = create_pandas_dataframe(data_file)
pandas_df_with_pyarrow = create_pandas_dataframe_with_pyarrow(data_file)
pyarrow_df = create_pyarrow_dataframe(data_file)
# Remove an existing "t" before registering it again (presumably so this cell
# can be re-run against the same session).
if ctx.table_exist("t"):
    ctx.deregister_table("t")
ctx.from_arrow(pyarrow_df, "t")  # DataFusion "t": the in-memory Arrow table

# Same pattern for "t2", which is registered from the Parquet file itself.
if ctx.table_exist("t2"):
    ctx.deregister_table("t2")
ctx.register_parquet("t2", data_file)
con.register("t", pyarrow_df)  # DuckDB "t": the same in-memory Arrow table
polars_df = create_polars_dataframe(data_file)
polars_lf = create_polars_lazyframe(data_file)

# Each entry pairs the heading to print with the zero-argument callable to time.
analysis_benchmarks = [
    (
        f"Pandas dataframe analysis time for {n:,} rows:",
        functools.partial(analyze_pandas_dataframe, pandas_df),
    ),
    (
        f"\nPandas dataframe with pyarrow analysis time for {n:,} rows:",
        functools.partial(analyze_pandas_dataframe, pandas_df_with_pyarrow),
    ),
    (
        f"\nPyarrow dataframe analysis time for {n:,} rows:",
        functools.partial(analyze_pyarrow_dataframe, pyarrow_df),
    ),
    (
        f"\nDatafusion dataframe analysis time for {n:,} rows:",
        analyze_datafusion_dataframe,
    ),
    (
        f"\nDatafusion parquet analysis time for {n:,} rows:",
        analyze_datafusion_parquet,
    ),
    (
        f"\nDuckdb dataframe analysis time for {n:,} rows:",
        analyze_duckdb_dataframe,
    ),
    (
        f"\nDuckdb parquet analysis time for {n:,} rows:",
        functools.partial(analyze_duckdb_parquet, data_file),
    ),
    (
        f"\nPolars dataframe analysis time for {n:,} rows:",
        functools.partial(analyze_polars_dataframe, polars_df),
    ),
    (
        f"\nPolars lazyframe analysis time for {n:,} rows:",
        functools.partial(analyze_polars_lazyframe, polars_lf),
    ),
]
for heading, task in analysis_benchmarks:
    print(heading)
    # timeit(1): a single execution, reported in seconds.
    print(Timer(task).timeit(1))

--------------------------------------------------
Pandas dataframe analysis time for 10,000,000 rows:
1.9524870000313967

Pandas dataframe with pyarrow analysis time for 10,000,000 rows:
1.367561100050807

Pyarrow dataframe analysis time for 10,000,000 rows:
0.042230599909089506

Datafusion dataframe analysis time for 10,000,000 rows:
0.23641319992020726

Datafusion parquet analysis time for 10,000,000 rows:
0.1275084000080824

Duckdb dataframe analysis time for 10,000,000 rows:
0.08285230002366006

Duckdb parquet analysis time for 10,000,000 rows:
0.059257599990814924

Polars dataframe analysis time for 10,000,000 rows:
0.1406298999208957

Polars lazyframe analysis time for 10,000,000 rows:
0.14792229991871864


# 2. Serialization/Deserialization
------------------

In [6]:
import pickle
import pandas as pd

# Load the three in-memory inputs that the serialization and deserialization
# benchmarks in the following cells operate on.
pandas_df = create_pandas_dataframe(data_file)
pandas_df_with_pyarrow = create_pandas_dataframe_with_pyarrow(data_file)
pyarrow_df = create_pyarrow_dataframe(data_file)

In [7]:
def serialize_pandas_df(pandas_df):
    """Pickle ``pandas_df`` with ``pickle.HIGHEST_PROTOCOL`` and return the bytes."""
    payload = pickle.dumps(pandas_df, protocol=pickle.HIGHEST_PROTOCOL)
    return payload

def serialize_pyarrow_df(pyarrow_df):
    """Write ``pyarrow_df`` in the Arrow IPC file format and return it as ``bytes``."""
    stream = pa.BufferOutputStream()
    with pa.ipc.new_file(stream, pyarrow_df.schema) as ipc_writer:
        ipc_writer.write_table(pyarrow_df)

    # Read the buffer only after the writer context has exited.
    return stream.getvalue().to_pybytes()

In [8]:
# Each entry pairs the heading to print with the zero-argument callable to time.
serialize_benchmarks = [
    (
        f"Pandas dataframe serialize time for {n:,} rows:",
        functools.partial(serialize_pandas_df, pandas_df),
    ),
    (
        f"\nPandas dataframe with pyarrow serialize time for {n:,} rows:",
        functools.partial(serialize_pandas_df, pandas_df_with_pyarrow),
    ),
    (
        f"\nPyarrow dataframe serialize time for {n:,} rows:",
        functools.partial(serialize_pyarrow_df, pyarrow_df),
    ),
]
for heading, task in serialize_benchmarks:
    print(heading)
    # timeit(1): a single execution, reported in seconds.
    print(Timer(task).timeit(1))

Pandas dataframe serialize time for 10,000,000 rows:
0.9356666000094265

Pandas dataframe with pyarrow serialize time for 10,000,000 rows:
0.6746197000611573

Pyarrow dataframe serialize time for 10,000,000 rows:
0.6774355999659747


In [9]:
# Keep the serialized payloads: they are the inputs of the deserialization benchmarks.
pandas_df_bytes = serialize_pandas_df(pandas_df)
pandas_df_with_pyarrow_bytes = serialize_pandas_df(pandas_df_with_pyarrow)
pyarrow_df_bytes = serialize_pyarrow_df(pyarrow_df)

# Report the size of each payload in bytes.
for size_line in (
    f"pandas bytes              : {len(pandas_df_bytes)}",
    f"pandas with pyarrow bytes : {len(pandas_df_with_pyarrow_bytes)}",
    f"pyarrow_df_bytes bytes    : {len(pyarrow_df_bytes)}",
):
    print(size_line)

pandas bytes              : 220069131
pandas with pyarrow bytes : 455835403
pyarrow_df_bytes bytes    : 455874210


In [10]:
def deserialize_pandas_df(pandas_df_bytes):
    """Unpickle ``pandas_df_bytes`` and return the resulting object.

    Note: ``pickle.loads`` can execute arbitrary code, so only pass bytes
    produced by a trusted source.
    """
    restored = pickle.loads(pandas_df_bytes)
    return restored

def deserialize_pyarrow_df(pyarrow_df_bytes):
    """Open ``pyarrow_df_bytes`` as an Arrow IPC file and return ``read_all()``."""
    with pa.ipc.open_file(pyarrow_df_bytes) as ipc_reader:
        restored = ipc_reader.read_all()
    return restored

In [11]:
# Each entry pairs the heading to print with the zero-argument callable to time.
deserialize_benchmarks = [
    (
        f"Pandas dataframe deserialize time for {n:,} rows:",
        functools.partial(deserialize_pandas_df, pandas_df_bytes),
    ),
    (
        f"\nPandas dataframe with pyarrow deserialize time for {n:,} rows:",
        functools.partial(deserialize_pandas_df, pandas_df_with_pyarrow_bytes),
    ),
    (
        f"\nPyarrow dataframe deserialize time for {n:,} rows:",
        functools.partial(deserialize_pyarrow_df, pyarrow_df_bytes),
    ),
]
for heading, task in deserialize_benchmarks:
    print(heading)
    # timeit(1): a single execution, reported in seconds.
    print(Timer(task).timeit(1))

Pandas dataframe deserialize time for 10,000,000 rows:
1.672693199943751

Pandas dataframe with pyarrow deserialize time for 10,000,000 rows:
0.1634343999903649

Pyarrow dataframe deserialize time for 10,000,000 rows:
0.00563320005312562
