In [None]:
pip install polars pandas duckdb pyspark faker deltalake memory_profiler pyarrow

In [None]:
import polars as pl
import pandas as pd
import duckdb
from pyspark.sql import SparkSession
from faker import Faker
import numpy as np
import os
import time
import psutil
from memory_profiler import memory_usage

# Initialize Spark (Single Node)
spark = SparkSession.builder \
    .appName("BigDataLab2") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [None]:
def generate_data(num_records=1_000_000, output_path="social_media_data.parquet"):
    fake = Faker()

    print(f"Generating {num_records} records...")

    # Generate data using numpy for speed where possible
    data = {
        "post_id": [fake.uuid4() for _ in range(num_records)],
        "user_id": np.random.randint(1, 100_000, num_records),
        "timestamp": pd.date_range(start="2023-01-01", periods=num_records, freq="s").to_numpy().astype("datetime64[us]"),
        "likes": np.random.randint(0, 10_000, num_records),
        "views": np.random.randint(0, 1_000_000, num_records),
        "category": np.random.choice(["Tech", "Health", "Travel", "Food", "Fashion", "Politics", "Sports"], num_records),
        "tags": [np.random.choice(["#viral", "#new", "#trending", "#hot", "#update"], size=np.random.randint(1, 4)).tolist() for _ in range(num_records)],
        "location": np.random.choice(["USA", "UK", "DE", "PL", "FR", "JP", "BR"], num_records),
        "device": np.random.choice(["Mobile", "Desktop", "Tablet"], num_records),
        "latency": np.random.uniform(10.0, 500.0, num_records),
        "error_rate": np.random.beta(1, 10, num_records),
        "content": [fake.sentence() for _ in range(min(num_records, 1000))] * (num_records // 1000 + 1)
    }

    # Trim to exact size
    data["content"] = data["content"][:num_records]

    df = pd.DataFrame(data)

    print("Writing to Parquet...")
    df.to_parquet(output_path, engine="pyarrow")
    print(f"Data saved to {output_path}")

# Generate 5 million records
generate_data(num_records=5_000_000)

# Zadanie 1.

In [None]:
import numpy as np
import pandas as pd
import polars as pl
import duckdb
from pyspark.sql import functions as F
from pyspark.sql import Window
from memory_profiler import memory_usage
import time

# Small table for JOIN (user statistics)
user_stats = pd.DataFrame({
    "user_id": np.arange(1, 10_001),
    "user_rank": np.random.randint(1, 100, 10_000)
})

In [None]:
df_pd = pd.read_parquet("social_media_data.parquet")
df_pl = pl.read_parquet("social_media_data.parquet")
df_spark = spark.read.parquet("social_media_data.parquet")

In [None]:
def benchmark(func):
    mem_before = memory_usage()[0]
    t0 = time.time()

    result = func()

    t1 = time.time()
    mem_after = memory_usage()[0]

    return {
        "time_sec": t1 - t0,
        "memory_mb": mem_after - mem_before,
        "result": result
    }


## Query A

In [None]:
def pandas_query_A():
    return df_pd.groupby("category")[["likes", "views"]].mean()

def polars_query_A():
    return (
        df_pl
        .group_by("category")
        .agg([
            pl.col("likes").mean().alias("mean_likes"),
            pl.col("views").mean().alias("mean_views")
        ])
    )

def duckdb_query_A():
    return duckdb.sql("""
        SELECT category, AVG(likes), AVG(views)
        FROM 'social_media_data.parquet'
        GROUP BY category
    """).df()

def spark_query_A():
    return df_spark.groupBy("category").agg(
        F.avg("likes"), F.avg("views")
    ).collect()

print("Pandas A:", benchmark(pandas_query_A))
print("Polars A:", benchmark(polars_query_A))
print("DuckDB A:", benchmark(duckdb_query_A))
print("Spark A:", benchmark(spark_query_A))

Query A. Agregacja średnich wartości

W ramach pierwszego zapytania przeprowadzono agregację średnich wartości liczby polubień oraz wyświetleń dla każdej kategorii postów.

Wyniki:
1. Pandas: czas wykonania ~0.54 s, przyrost pamięci ~13 MB.
2. Polars: czas wykonania ~0.15 s, przyrost pamięci ~0.04 MB.
3. DuckDB: czas wykonania ~0.12 s, przyrost pamięci ~2.7 MB.
4. Spark: czas wykonania ~1.39 s, przyrost pamięci niedokładny.

Najszybszym silnikiem w tym zadaniu był DuckDB, a Spark był najwolniejszy w przetwarzaniu lokalnym. Różnice w przyroście pamięci wynikają głównie ze sposobu działania bibliotek.
Ogólnie wszystkie silniki zwróciły te same wartości średnich, co potwierdza poprawność agregacji.

## Query B.

In [None]:
def pandas_query_B():
    return (
        df_pd
        .sort_values(["category", "views"], ascending=[True, False])
        .groupby("category")
        .head(3)
    )

def polars_query_B():
    return (
        df_pl
        .sort(["category", "views"], descending=[False, True])
        .group_by("category")
        .head(3)
    )

def duckdb_query_B():
    return duckdb.sql("""
        SELECT *
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (PARTITION BY category ORDER BY views DESC) AS rn
            FROM 'social_media_data.parquet'
        )
        WHERE rn <= 3
    """).df()


def spark_query_B():
    w = Window.partitionBy("category").orderBy(F.col("views").desc())
    return (
        df_spark
        .withColumn("rn", F.row_number().over(w))
        .filter(F.col("rn") <= 3)
        .collect()
    )
print("Pandas B:", benchmark(pandas_query_B))
print("Polars B:", benchmark(polars_query_B))
print("DuckDB B:", benchmark(duckdb_query_B))
print("Spark B:", benchmark(spark_query_B))

Query B. Top-N postów według liczby wyświetleń w każdej kategorii

W ramach drugiego zapytania przeprowadzono wybór trzech postów o najwyższej liczbie wyświetleń dla każdej kategorii.

Wyniki:
1. Pandas: czas wykonania ~7.51 s, przyrost pamięci ~0 MB.
2. Polars: czas wykonania ~7.16 s, przyrost pamięci ~1144 MB.
3. DuckDB: czas wykonania ~17.28 s, przyrost pamięci ~170 MB.
4. Spark: czas wykonania ~20.15 s, przyrost pamięci ~2 MB.

Najszybszym silnikiem w tym zadaniu był Polars, choć przyrost pamięci był znaczący, co wynika z operacji w pamięci dla dużych zbiorów danych.
Najwolniejszym silnikiem w przetwarzaniu lokalnym był Spark.

Wszystkie silniki zwróciły te same 21 rekordów (7 kategorii × top-3 posty)

## Query C

In [None]:
def pandas_query_C():
    df_joined = df_pd.merge(user_stats, on="user_id", how="inner")
    return df_joined[df_joined["user_rank"] > 50]

df_pl_small = pl.from_pandas(user_stats)

def polars_query_C():
    return (
        df_pl.join(df_pl_small, on="user_id", how="inner")
             .filter(pl.col("user_rank") > 50)
    )


def duckdb_query_C():
    duckdb.register("user_stats_df", user_stats)
    return duckdb.sql("""
        SELECT *
        FROM 'social_media_data.parquet' AS p
        JOIN user_stats_df u USING(user_id)
        WHERE user_rank > 50
    """).df()

df_spark_small = spark.createDataFrame(user_stats)

def spark_query_C():
    return (
        df_spark.join(df_spark_small, "user_id", "inner")
        .filter(F.col("user_rank") > 50)
        .collect()
    )

print("Pandas C:", benchmark(pandas_query_C))
print("Polars C:", benchmark(polars_query_C))
print("DuckDB C:", benchmark(duckdb_query_C))
print("Spark C:", benchmark(spark_query_C))


Query C.

Wyniki uzyskane przy użyciu różnych silników:

1. Pandas: czas wykonania ~1.49 s, przyrost pamięci ~0 MB.
2. Polars: czas wykonania ~1.12 s, przyrost pamięci ~150 MB.
3. DuckDB: czas wykonania ~3.29 s, przyrost pamięci ~130 MB.
4. Spark: czas wykonania ~36 s, przyrost pamięci ~100 MB

Najszybszym silnikiem w tym zadaniu był Polars, a Spark był najwolniejszy.

## Scalability Test (Polars & DuckDB)

In [None]:
import os
import time
import duckdb
import polars as pl
import pandas as pd
import matplotlib.pyplot as plt


In [None]:
def time_run(func, runs=3):
    times = []
    for _ in range(runs):
        start = time.time()
        func()
        times.append(time.time() - start)
    return min(times)

duckdb_queries = {
    "A": duckdb_query_A,
    "B": duckdb_query_B,
    "C": duckdb_query_C
}

duckdb_threads = [1, 2, 4, 8]
duckdb_results = []

for qname, qfunc in duckdb_queries.items():
    print(f"\nDuckDB — Query {qname}")
    for n in duckdb_threads:
        duckdb.sql(f"PRAGMA threads={n}")
        t = time_run(qfunc)
        duckdb_results.append({
            "engine": "DuckDB",
            "query": qname,
            "threads": n,
            "time": t
        })
        print(f"Threads={n}: {t:.4f} s")

duckdb_df = pd.DataFrame(duckdb_results)



## DuckDB scalability benchmark

In [None]:
duckdb_queries = {
    "A": duckdb_query_A,
    "B": duckdb_query_B,
    "C": duckdb_query_C
}

duckdb_threads = [1, 2, 4, 8]
duckdb_results = []

for qname, qfunc in duckdb_queries.items():
    print(f"\nDuckDB — Query {qname}")
    for n in duckdb_threads:
        duckdb.sql(f"PRAGMA threads={n}")
        t = time_run(qfunc)
        duckdb_results.append({
            "engine": "DuckDB",
            "query": qname,
            "threads": n,
            "time": t
        })
        print(f"Threads={n}: {t:.4f} s")

duckdb_df = pd.DataFrame(duckdb_results)


## Polars scalability benchmark

In [None]:
import os
import time
import polars as pl
import pandas as pd

def time_run(func, runs=3):
    times = []
    for _ in range(runs):
        start = time.time()
        func()
        times.append(time.time() - start)
    return min(times)

polars_results = []

for n in [1, 2, 4, 8]:
    print(f"\nPolars — threads={n}")
    os.environ["POLARS_MAX_THREADS"] = str(n)

    df_pl_test = pl.read_parquet("social_media_data.parquet")
    df_pl_small_test = pl.from_pandas(user_stats)

    def polars_query_A_test():
        return (
            df_pl_test
            .group_by("category")
            .agg([
                pl.col("likes").mean(),
                pl.col("views").mean()
            ])
        )

    def polars_query_B_test():
        return (
            df_pl_test
            .sort(["category", "views"], descending=[False, True])
            .group_by("category")
            .head(3)
        )

    def polars_query_C_test():
        return (
            df_pl_test
            .join(df_pl_small_test, on="user_id", how="inner")
            .filter(pl.col("user_rank") > 50)
        )

    for name, fn in zip(
        ["A", "B", "C"],
        [polars_query_A_test, polars_query_B_test, polars_query_C_test]
    ):
        t = time_run(fn)
        polars_results.append({
            "engine": "Polars",
            "query": name,
            "threads": n,
            "time": t
        })
        print(f"Query {name}: {t:.4f} s")

polars_df = pd.DataFrame(polars_results)
polars_df


## Combine + compute speedup

In [None]:
results_df = pd.concat([duckdb_df, polars_df], ignore_index=True)

def compute_speedup(df):
    df = df.copy()
    for (engine, query), g in df.groupby(["engine", "query"]):
        base = g[g["threads"] == 1]["time"].iloc[0]
        df.loc[g.index, "speedup"] = base / g["time"]
    return df

results_df = compute_speedup(results_df)

for q in ["A", "B", "C"]:
    plt.figure(figsize=(6,4))
    sub = results_df[results_df["query"] == q]
    for engine in sub["engine"].unique():
        d = sub[sub["engine"] == engine]
        plt.plot(d["threads"], d["speedup"], marker="o", label=engine)
    plt.title(f"Speedup — Query {q}")
    plt.xlabel("Threads")
    plt.ylabel("Speedup")
    plt.legend()
    plt.grid(True)
    plt.show()


Sprawdzono jak zmiana liczby wątków wpływa na czas zapytań.

![QueryA](Speedup-queryA.png)![QueryB](Speedup-queryB.png)![QueryC](Speedup-queryC.png)

**DuckDB**

Query A: Brak reakcji na wątki (ok 0.10 s) zapytanie jest zbyt proste
Query B i C: Najlepiej działają przy 2 wątkach.
DuckDB błyskawicznie się nasyca i w tym konkretnym przypadku słabo wykorzystuje dużą liczbę rdzeni.


**Polars** poradził sobie nieco lepiej:

Query A: Podobnie jak w DuckDB, prawie bez zmian (ok 0.15 s).
Query B: Czas spadł z 6.88 s na 6.43 s przy 8 wątkach. Mały zysk, ale jest.
Query C: Stabilnie do 4 wątków, przy 8 nastąpiło spowolnienie.


Polars wydaje się odrobinę lepiej radzi sobie z bardziej złożonymi operacjami, podczas gdy DuckDB jest stabilny, ale mało podatny na skalowanie.

# Zadanie 3

In [None]:
import polars as pl
import time
from memory_profiler import memory_usage

file_path = "social_media_data.parquet"
filter_expr = pl.col("views") > 1000

def benchmark_polars(func):
    mem_usage = []
    def wrapper():
        return func()

    start_time = time.time()
    mem_usage = memory_usage((wrapper, ), max_iterations=1)
    end_time = time.time()

    result = wrapper()
    elapsed_time = end_time - start_time
    peak_memory = max(mem_usage) - min(mem_usage)

    return {"time_sec": elapsed_time, "peak_memory_MB": peak_memory, "rows": len(result)}

# Eager Execution
def polars_eager():
    df = pl.read_parquet(file_path)
    filtered = df.filter(filter_expr)
    return filtered

# Lazy Execution
def polars_lazy():
    df_lazy = pl.scan_parquet(file_path)
    filtered_lazy = df_lazy.filter(filter_expr).collect()
    return filtered_lazy

# Streaming Execution
def polars_streaming():
    df_lazy = pl.scan_parquet(file_path)
    filtered_stream = df_lazy.filter(filter_expr).collect(streaming=True)
    return filtered_stream

results = []

for mode, func in [("Eager", polars_eager), ("Lazy", polars_lazy), ("Streaming", polars_streaming)]:
    print(f"Running Polars {mode} execution...")
    bench = benchmark_polars(func)
    bench["mode"] = mode
    results.append(bench)
    print(f"{mode}: time={bench['time_sec']:.4f}s, peak_memory={bench['peak_memory_MB']:.2f} MB, rows={bench['rows']}")

import pandas as pd
df_results = pd.DataFrame(results)[["mode", "time_sec", "peak_memory_MB", "rows"]]
print("\nPolars Execution Modes Benchmark:")
print(df_results)


Polars Execution Modes Benchmark:

```
        mode  time_sec  peak_memory_MB     rows
0      Eager  3.278074      386.316406  4995129
1       Lazy  2.165080      345.730469  4995129
2  Streaming  2.144784      331.070312  4995129
```
**Eager execution** wyląda najsłabiej. Próba nie tylko trwa najdłużej, ale też najbardziej obciąża komputer. Przy większych plikach szybko doprowadzi do wyczerpaniu pamięci.

**Lazy execution** to dobry skok wydajności. Polars lepiej zarządza zasobami, co skróciło czas o ponad sekundę.

**Streaming execution** jest najbardziej ekonomiczny pod kątem pamięci.

Tryby Lazy i Streaming są szybsze i bezpieczniejsze dla systemu. Jeśli jednak zbiór danych przekroczy możliwości jednej maszyny to naturalnym krokiem będzie przejście na Apache Spark i przetwarzanie rozproszone.