In [1]:
import sys, os, socket
# Report which interpreter, working directory and host this kernel runs on,
# so the benchmark output can be matched to its environment later.
print(
    f"PY: {sys.executable}",
    f"CWD: {os.getcwd()}",
    f"HOSTNAME: {socket.gethostname()}",
    sep="\n",
)

PY: /opt/conda/bin/python
CWD: /home/jovyan/work/notebooks
HOSTNAME: spark-jupyter-local


In [2]:
# Shell escape: print the version of the Java runtime found on the PATH.
!java -version

openjdk version "17.0.8.1" 2023-08-24
OpenJDK Runtime Environment (build 17.0.8.1+1-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.8.1+1-Ubuntu-0ubuntu122.04, mixed mode, sharing)


In [3]:
import time
import gc
import pyspark
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark import SparkContext

# --- Tear down whatever a previous run of this cell left behind -------------
# Stop the SparkSession bound to the global name `spark`, if there is one.
try:
    if 'spark' in globals() and spark is not None:
        spark.stop()
        del spark
except Exception as e:
    print("Fehler beim Stoppen:", e)

# Also stop a SparkContext that is still active without a session handle.
try:
    sc = SparkContext._active_spark_context
    if sc is not None:
        sc.stop()
except Exception as e:
    print("Fehler beim Stoppen:", e)

gc.collect()

# --- Benchmark configuration ------------------------------------------------
MODE = "local"      # one of the keys of _MODE_SETTINGS below
TAG = "-var_fts"    # suffix appended to the name of the result CSV

DRIVER_CORES = 2
DRIVER_MEMORY = "4g"
EXECUTOR_CORES = 4
EXECUTOR_MEMORY = 8     # in GiB; the "g" suffix is appended where it is used
NUM_PARTITIONS = 3 * EXECUTOR_CORES

# Settings shared by every mode.
_COMMON_SETTINGS = {
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.ui.port": "4040",
    "spark.default.parallelism": NUM_PARTITIONS,
    "spark.sql.shuffle.partitions": NUM_PARTITIONS,
}

# Per mode: (application name, master URL, mode-specific settings).
_MODE_SETTINGS = {
    # Local mode gives the driver the executor memory budget instead of
    # DRIVER_MEMORY.
    # NOTE(review): spark.driver.memory presumably only takes effect when the
    # JVM is first launched; restart the kernel after changing it - confirm.
    "local": (
        "OLS-Baseline",
        f"local[{EXECUTOR_CORES}]",
        {
            "spark.driver.host": "spark-jupyter-local",
            "spark.driver.memory": f"{EXECUTOR_MEMORY}g",
        },
    ),
    # One executor that receives all cores and all executor memory.
    "single": (
        "OLS-Single",
        "spark://spark-master:7077",
        {
            "spark.driver.host": "spark-jupyter",
            "spark.executor.cores": EXECUTOR_CORES,
            "spark.executor.instances": 1,
            "spark.executor.memory": f"{EXECUTOR_MEMORY}g",
            "spark.driver.memory": DRIVER_MEMORY,
        },
    ),
    # Two executors that split the same core and memory budget in half.
    "multi": (
        "OLS-Multi",
        "spark://spark-master:7077",
        {
            "spark.driver.host": "spark-jupyter",
            "spark.executor.cores": EXECUTOR_CORES // 2,
            "spark.executor.instances": 2,
            "spark.executor.memory": f"{EXECUTOR_MEMORY // 2}g",
            "spark.driver.memory": DRIVER_MEMORY,
        },
    ),
}

# Fail loudly on a typo in MODE. Without this check no branch would run and
# the cell would die later with a NameError on `spark`.
if MODE not in _MODE_SETTINGS:
    raise ValueError(
        f"Unknown MODE {MODE!r}; expected one of {sorted(_MODE_SETTINGS)}"
    )

_app_name, _master_url, _mode_conf = _MODE_SETTINGS[MODE]
_builder = SparkSession.builder.appName(_app_name).master(_master_url)
for _key, _value in {**_COMMON_SETTINGS, **_mode_conf}.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()


print("PySpark:", pyspark.__version__)
print("Spark UI:", spark.sparkContext.uiWebUrl)
print("Default parallelism:", spark.conf.get("spark.default.parallelism"))
print("Anzahl Partitionen:", spark.conf.get("spark.sql.shuffle.partitions"))
print("Driver Memory:", spark.conf.get("spark.driver.memory"))

# Executor settings are only configured in the cluster modes. Look each one
# up with a default so that an unset key is skipped, while any other failure
# still surfaces instead of being swallowed.
for _label, _key in (
    ("Anzahl Worker", "spark.executor.instances"),
    ("CPU-Kerne pro Worker", "spark.executor.cores"),
    ("Memory pro Worker", "spark.executor.memory"),
):
    _value = spark.conf.get(_key, None)
    if _value is not None:
        print(f"{_label}:", _value)

PySpark: 3.5.0
Spark UI: http://spark-jupyter-local:4040
Default parallelism: 12
Anzahl Partitionen: 12
Driver Memory: 8g


In [4]:
from pyspark.sql import functions as F
from pyspark.ml.functions import array_to_vector

def gen_spark_data_fast(n:int, d:int, seed:int, parts:int):
    """Build a synthetic linear-regression dataset as a Spark DataFrame.

    Each row holds `d` standard-normal features and a label
    ``y = sum_j (j + 1) * x_j + 0.1 * noise``, i.e. the true coefficients are
    1.0, 2.0, ..., d.

    Uses the module-level `spark` session.

    Args:
        n: Number of rows to generate.
        d: Number of feature columns.
        seed: Base seed from which all column seeds are derived.
        parts: Number of partitions the row ids are repartitioned into.

    Returns:
        A DataFrame with a vector column ``features`` and a double column ``y``.
    """
    # Repartition early so that no wide shuffle is needed later on.
    df = spark.range(n).repartition(parts)

    # Spark initialises every randn() generator with `seed + partition index`.
    # With column seeds only 1 apart, column j in partition p would replay the
    # random stream of column j+1 in partition p-1. Spacing the seeds by the
    # partition count keeps every (column, partition) stream distinct.
    seed_stride = max(int(parts), 1)

    # Features as an array of d random values.
    feats_arr = F.array(*[F.randn(seed + j * seed_stride) for j in range(d)])

    # Coefficients beta = [1.0, 2.0, ..., d]
    beta = F.array(*[F.lit(float(j + 1)) for j in range(d)])

    # Multiply element-wise and sum up: dot = sum_j feats_arr[j] * beta[j]
    prod = F.zip_with(feats_arr, beta, lambda x, b: x * b)
    dot  = F.aggregate(prod, F.lit(0.0), lambda acc, v: acc + v)

    # The noise term gets its own seed slot after the d feature columns.
    y = dot + 0.1 * F.randn(seed + d * seed_stride)

    # For MLlib: array -> vector
    feats_vec = array_to_vector(feats_arr)
    return df.select(feats_vec.alias("features"), y.alias("y"))

def human_readable_size(num_bytes: int) -> str:
    """
    Format a size given in bytes as a readable string (B, KB, MB or GB).

    Values below 1024 are printed as-is with the unit "B". Larger values are
    divided by the matching power of 1024 and shown with two decimals. GB is
    the largest unit, so anything from 1024**3 upwards is reported in GB.
    """
    kib = 1024
    mib = kib * 1024
    gib = mib * 1024

    if num_bytes < kib:
        return f"{num_bytes} B"
    if num_bytes < mib:
        return f"{num_bytes / kib:.2f} KB"
    if num_bytes < gib:
        return f"{num_bytes / mib:.2f} MB"
    return f"{num_bytes / gib:.2f} GB"

def warm_up(spark):
    """Run one throw-away generate/count/fit cycle before the measured runs.

    Generates a cached 1,000,000 x 200 dataset, materialises it with a count
    and fits one LinearRegression with the settings used by the benchmarks.
    The cached data is released afterwards so that it does not occupy storage
    memory while the real measurements run.

    Args:
        spark: Not used by the body; the dataset is generated through
            `gen_spark_data_fast`, which is called without it. Kept so that
            existing callers continue to work.
    """
    tmp = gen_spark_data_fast(1_000_000, 200, 1, parts=NUM_PARTITIONS).cache()
    try:
        _ = tmp.agg(F.count("*")).collect()
        _ = LinearRegression(featuresCol="features", labelCol="y", regParam=0.0, elasticNetParam=0.0, solver="normal").fit(tmp)
    finally:
        # blocking=True: wait until the blocks are actually removed, so the
        # first benchmark does not start while the cache is still being freed.
        tmp.unpersist(blocking=True)

In [5]:
from sparkmeasure import StageMetrics
import pandas as pd

def g(k, metrics):
    """Look up metric `k` in the aggregated stage metrics.

    Calls `metrics.aggregate_stagemetrics()` and reads key `k` from the
    result. A missing key falls back to 0 and is returned as a float; a value
    that is present but None yields the integer 0.
    """
    aggregated = metrics.aggregate_stagemetrics()
    value = aggregated.get(k, 0)
    if value is None:
        return 0
    return float(value)

def get_benchmark(i, model, metrics, mode):
    """Collect one benchmark result row from a fitted model and its metrics.

    Args:
        i: Value stored under "n" in the row (the size that was varied).
        model: Fitted model. Its `summary` attribute, if present, supplies
            r2, rootMeanSquaredError, meanAbsoluteError and explainedVariance.
        metrics: Object whose `aggregate_stagemetrics()` returns a mapping
            from metric name to value.
        mode: Label stored under "mode" in the row.

    Returns:
        A dict with context, model quality, timing, overhead and data-movement
        entries. Quality entries are None when the model has no `summary`
        attribute; the percentage entries are None when their denominator is 0.
    """
    # Aggregate once and read all keys from the same result, instead of
    # repeating the aggregation for each of the 13 metrics below.
    aggregated = metrics.aggregate_stagemetrics()

    def metric(key):
        # Missing key -> 0.0, explicit None -> 0, anything else -> float.
        value = aggregated.get(key, 0)
        return 0 if value is None else float(value)

    # Read the summary defensively so a model without one still yields a row.
    summary = getattr(model, "summary", None)

    elapsed_ms              = metric("elapsedTime")
    executorRun_ms          = metric("executorRunTime")
    cpu_ms                  = metric("executorCpuTime")
    jvm_gc_ms               = metric("jvmGCTime")
    scheduler_delay_ms      = metric("schedulerDelay")
    task_deser_ms           = metric("taskDeserializationTime")
    result_ser_ms           = metric("resultSerializationTime")
    shuffle_read_bytes      = metric("shuffleReadBytes")
    shuffle_write_bytes     = metric("shuffleWriteBytes")
    spilled_mem_bytes       = metric("memoryBytesSpilled")
    spilled_disk_bytes      = metric("diskBytesSpilled")
    bytes_read              = metric("bytesRead")
    bytes_written           = metric("bytesWritten")

    # Derived overheads, clamped at zero so they never go negative.
    wall_overhead_ms    = max(0.0, elapsed_ms - executorRun_ms)
    nonCPU_overhead_ms  = max(0.0, executorRun_ms - cpu_ms)
    sched_ser_gc_ms     = scheduler_delay_ms + task_deser_ms + result_ser_ms + jvm_gc_ms

    row = {
        # Context
        "n": i,
        "mode": mode,

        # Model quality (if available)
        "r2": getattr(summary, "r2", None),
        "rmse": getattr(summary, "rootMeanSquaredError", None),
        "mae": getattr(summary, "meanAbsoluteError", None),
        "explainedVariance": getattr(summary, "explainedVariance", None),

        # Time / totals
        "elapsed_ms": elapsed_ms,
        "executorRun_ms": executorRun_ms,
        "cpu_ms": cpu_ms,
        "gc_ms": jvm_gc_ms,

        # Overheads
        "wall_overhead_ms": wall_overhead_ms,
        "nonCPU_overhead_ms": nonCPU_overhead_ms,
        "sched_ser_gc_ms": sched_ser_gc_ms,

        # Overhead shares (only when the denominator is non-zero)
        "wall_overhead_pct": (wall_overhead_ms / elapsed_ms) if elapsed_ms else None,
        "nonCPU_overhead_pct": (nonCPU_overhead_ms / executorRun_ms) if executorRun_ms else None,

        # Data movement
        "bytesRead": bytes_read,
        "bytesWritten": bytes_written,
        "shuffleRead": shuffle_read_bytes,
        "shuffleWrite": shuffle_write_bytes,
        "spilled": (spilled_mem_bytes + spilled_disk_bytes),
    }
    return row

In [6]:
def _run_benchmark_case(num_rows, num_features, label):
    """Generate one dataset, time an OLS fit on it and record the outcome.

    Appends to the module-level lists `gen_times`, `fit_times` and `results`,
    and reads the module-level `SEED`, `NUM_PARTITIONS`, `MODE` and `spark`.

    Args:
        num_rows: Number of data points to generate.
        num_features: Number of feature columns to generate.
        label: Value stored as "n" in the result row (the size being varied).
    """
    print(f"Datensatz vom Shape {num_rows} x {num_features} generieren. Geschätze Größe: {human_readable_size(num_rows * num_features * 8)}")
    t0 = time.perf_counter()
    sdf = gen_spark_data_fast(num_rows, num_features, SEED, NUM_PARTITIONS).cache()
    try:
        # The count materialises the cache, so generation time is measured
        # here and not as part of the fit below.
        _ = sdf.agg(F.count("*")).collect()
        t1 = time.perf_counter()
        print(f"gen: {t1 - t0:.2f}s, m={num_rows}, n={num_features}, seed={SEED}")
        gen_times.append(round(t1 - t0, 2))

        lr = LinearRegression(featuresCol="features", labelCol="y", regParam=0.0, elasticNetParam=0.0, solver="normal")
        metrics = StageMetrics(spark)

        t0 = time.perf_counter()
        metrics.begin()

        model = lr.fit(sdf)

        metrics.end()
        t1 = time.perf_counter()

        res = get_benchmark(i=label, model=model, metrics=metrics, mode=MODE)
        results.append(res)

        print(f"fit: {t1 - t0:.2f}s, R2={model.summary.r2:.4f}\n")
        fit_times.append(round(t1 - t0, 2))
    finally:
        # Release the cached dataset only after the model summary has been
        # read above. Otherwise every iteration would leave its data in the
        # cache and later measurements would run under growing memory pressure.
        sdf.unpersist(blocking=True)


def run_variable_datapoints(datapoints, features):
    """Benchmark the OLS fit for a varying number of rows.

    Args:
        datapoints: Iterable of row counts to benchmark, one run per value.
        features: Fixed number of feature columns used for every run.
    """
    for m in datapoints:
        _run_benchmark_case(num_rows=m, num_features=features, label=m)


def run_variable_features(features, datapoints):
    """Benchmark the OLS fit for a varying number of feature columns.

    Args:
        features: Iterable of feature counts to benchmark, one run per value.
        datapoints: Fixed number of rows used for every run.
    """
    for n in features:
        _run_benchmark_case(num_rows=datapoints, num_features=n, label=n)

In [7]:
# --- Experiment parameters --------------------------------------------------
SEED = 42
DATAPOINTS_FIX = 100000
DATAPOINTS_VAR = range(1_000_000, 16_000_000, 1_000_000) # fine
FEATURES_FIX = 10
FEATURES_VAR = [8, 16, 32, 64, 128, 256, 512, 1_024, 2_048]#, 4_096]
# NS = [1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000, 500_000, 1_000_000, 2_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000]

# Result accumulators filled by the run_variable_* functions.
gen_times = []
fit_times = []
results = []

# Print the active configuration. Executor settings are only set in the
# cluster modes, so each key is looked up with a default and skipped when it
# is unset. Any other failure surfaces instead of being swallowed.
print("Mode:", MODE)
for _label, _key in (
    ("Driver Memory", "spark.driver.memory"),
    ("Anzahl Worker", "spark.executor.instances"),
    ("CPU-Kerne pro Worker", "spark.executor.cores"),
    ("Memory pro Worker", "spark.executor.memory"),
):
    _value = spark.conf.get(_key, None)
    if _value is not None:
        print(f"{_label}:", _value)
print("\n")


warm_up(spark)
# Switch the two calls below to sweep the number of rows instead of features.
# run_variable_datapoints(DATAPOINTS_VAR, FEATURES_FIX)
run_variable_features(FEATURES_VAR, DATAPOINTS_FIX)

print("Zeiten für Generierung:", gen_times)
print("Zeiten für Training:", fit_times)

# Create the output directory first, so a missing ../stats does not throw
# away the results of the whole benchmark run at the very last step.
_stats_dir = os.path.join("..", "stats")
os.makedirs(_stats_dir, exist_ok=True)
pd.DataFrame(results).to_csv(os.path.join(_stats_dir, f"{MODE}{TAG}.csv"), index=False)
print(f"{MODE}{TAG}.csv gespeichert")

Mode: local
Driver Memory: 8g
Datensatz vom Shape 100000 x 8 generieren. Geschätze Größe: 6.10 MB
gen: 0.44s, m=100000, n=8, seed=42
fit: 0.37s, R2=1.0000

Datensatz vom Shape 100000 x 16 generieren. Geschätze Größe: 12.21 MB
gen: 0.46s, m=100000, n=16, seed=42
fit: 0.36s, R2=1.0000

Datensatz vom Shape 100000 x 32 generieren. Geschätze Größe: 24.41 MB
gen: 0.52s, m=100000, n=32, seed=42
fit: 0.34s, R2=1.0000

Datensatz vom Shape 100000 x 64 generieren. Geschätze Größe: 48.83 MB
gen: 0.73s, m=100000, n=64, seed=42
fit: 0.33s, R2=1.0000

Datensatz vom Shape 100000 x 128 generieren. Geschätze Größe: 97.66 MB
gen: 1.19s, m=100000, n=128, seed=42
fit: 0.46s, R2=1.0000

Datensatz vom Shape 100000 x 256 generieren. Geschätze Größe: 195.31 MB
gen: 1.96s, m=100000, n=256, seed=42
fit: 0.74s, R2=1.0000

Datensatz vom Shape 100000 x 512 generieren. Geschätze Größe: 390.62 MB
gen: 3.45s, m=100000, n=512, seed=42
fit: 2.25s, R2=1.0000

Datensatz vom Shape 100000 x 1024 generieren. Geschätze Größe: