# Benchmarking

In [1]:
# benchmakring on 30% of data
# loop crashes in between configs due to no memory left so manual "cleaning" from terminal was done in between thsoe
# so csv was overwritten

In [2]:
#spark.srop()

In [3]:
from pyspark.sql.functions import col, lit, when, collect_list
from pyspark.sql import Window
import pyspark.sql.functions as F
import struct
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from statistics import mean
from itertools import product
from collections import defaultdict
from matplotlib.patches import Rectangle
import random
from pyspark.sql import SparkSession
import os
import sys
import matplotlib.pyplot as plt

In [4]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Functions

In [5]:
def unpack(file_content):
    word_size = 8
    words = (file_content[i:i+word_size] for i in range(0, len(file_content), word_size))
    unpacked = []
    for word in words:
        word = struct.unpack('<q', word)[0]
        if (word >> 61) & 0x7 == 2:
            unpacked.append([
                (word >> 58) & 0x7,
                (word >> 49) & 0x1FF,
                (word >> 17) & 0xFFFFFFFF,
                (word >> 5) & 0xFFF,
                (word >> 0) & 0x1F
            ])
    return unpacked

In [6]:
def best_fit_rdd(points):

    points = list(points)
    if not points or len(points) < 2:
        return None

    x_lefts = [p[0] for p in points]
    x_rights = [p[1] for p in points]
    z_vals = np.array([p[2] for p in points])

    try:
        combinations = list(product(*zip(x_lefts, x_rights)))
    except Exception:
        return None

    best_m, best_b = None, None
    min_residuals = float("inf")

    for combo in combinations:
        x_vals = np.array(combo)
        A = np.vstack([x_vals, np.ones(len(x_vals))]).T
        m, b = np.linalg.lstsq(A, z_vals, rcond=None)[0]
        residuals = np.linalg.norm(z_vals - (m * x_vals + b))

        if residuals < min_residuals:
            min_residuals = residuals
            best_m, best_b = m, b
            combin = combo

    return (best_m, best_b, min_residuals, combin)

In [7]:
def remove_outliers_b(xs: list, threshold: float = 4*42, k: int = 2):
    xs = np.array(xs, dtype=float)
    differences = np.abs(xs[:, None] - xs)
    count_exceeding = np.sum(differences > threshold, axis=1)
    return count_exceeding < k

In [8]:
def linear_fit(points):
    X = [p[0] for p in points]
    Z = [p[1] for p in points]
    m, b = np.polyfit(X, Z, 1)
    return m, b, X, Z

## Configurations

In [9]:
# executors_list = [2,3,4,6]
# max_cores_list = [2,4,6,12]
# partitions_list = [2,4,8,12]

# executors_list = [1]
# max_cores_list = [2,6,12]
# partitions_list = [2,4,8,12]

executors_list = [6]
max_cores_list = [12]
partitions_list = [2,4,8,12]

results = []

## Loop

In [None]:
for executors in executors_list:
    for max_cores in max_cores_list:
        for partitions in partitions_list:

            unpack_times = []
            local_times = []
            global_times = []

            for run in range(3):
                spark = SparkSession.builder \
                    .master("spark://master:7077") \
                    .appName("Group 14") \
                    .config("spark.jars.packages", "org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4") \
                    .config("spark.driver.host", "10.67.22.87") \
                    .config("spark.dynamicAllocation.enabled", "false") \
                    .config("spark.cores.max", max_cores) \
                    .config("spark.executor.instances", executors) \
                    .config("spark.default.parallelism", partitions) \
                    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
                    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false") \
                    .config("spark.hadoop.fs.s3a.access.key", "4e0262c6d36a4fb4a627bbdfd4e28e8a") \
                    .config("spark.hadoop.fs.s3a.secret.key", "e009cac9320e4b5cbd99e21c28ef03cd") \
                    .config("spark.hadoop.fs.s3a.endpoint", "https://cloud-areapd.pd.infn.it:5210") \
                    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                    .config("spark.hadoop.fs.s3a.metadatastore.impl", "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore") \
                    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
                    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
                    .config("com.amazonaws.sdk.disableCertChecking", "true") \
                    .getOrCreate()

                sc = spark.sparkContext

                # data loading
                random.seed(42)
                all_files = sc.binaryFiles("s3a://mapd-minidt-batch/data_*.dat").keys().collect()
                subset_size = int(0.3 * len(all_files))
                random.shuffle(all_files)
                subset_files = all_files[:subset_size]
                file_path_str = ",".join(subset_files)
                

###################################### u n p a c k #######################################################
                
                t0 = time.time()
                rdd = sc.binaryFiles(file_path_str).flatMap(lambda f: unpack(f[1]))
                rdd.count()
                t_unpack = round(time.time() - t0, 3)

                df = rdd.toDF(['FPGA','CHAN','ORBIT','BX','TDC'])

                df = df.withColumn('CHAMBER', when((df.FPGA==0) & (df.CHAN < 63), 0)\
                                   .when((df.FPGA==0) & (df.CHAN > 63) & (df.CHAN < 128), 1)\
                                   .when((df.FPGA==1) & (df.CHAN < 63), 2)\
                                   .when((df.FPGA==1) & (df.CHAN > 63) & (df.CHAN < 128), 3)\
                                   .when((df.FPGA==1) & (df.CHAN ==128), 4)).na.drop()
                
                shift_chamber = {
                    0: {'z': 219.8}, 
                    1: {'z': 977.3},
                    2: {'z': 1035.6},
                    3: {'z': 1819.8}
                }
                
                df = df.withColumn('shift_z', when(col('CHAMBER') == 0, shift_chamber[0]['z'])
                                   .when(col('CHAMBER') == 1, shift_chamber[1]['z'])
                                   .when(col('CHAMBER') == 2, shift_chamber[2]['z'])
                                   .when(col('CHAMBER') == 3, shift_chamber[3]['z'])
                                   .otherwise(None))
                
                timecorrections = {0: 95.0 - 1.1, 1: 95.0 + 6.4, 2: 95.0 + 0.5, 3: 95.0 - 2.6}
                            
                df = df.withColumn('ABSOLUTETIME',
                    25 * (col('ORBIT') * 3564 + col('BX') + col('TDC') / 30)
                ).withColumn('T_HIT_ns',
                    col('ABSOLUTETIME') + when(col('CHAMBER') == 0, timecorrections[0])
                                           .when(col('CHAMBER') == 1, timecorrections[1])
                                           .when(col('CHAMBER') == 2, timecorrections[2])
                                           .when(col('CHAMBER') == 3, timecorrections[3])
                                           .otherwise(0)
                )

                #scintillator hits, determines the start time of muon through detector
                scintillator_hits = df.filter((col("CHAMBER") == 4)) \
                                      .select("ORBIT", "T_HIT_ns") \
                                      .withColumnRenamed("T_HIT_ns", "t0")
                
                scintillator_counts = scintillator_hits.groupBy("ORBIT").count() \
                                                       .withColumnRenamed("count", "hit_count") 


                #dfnew: joining scintillator hits
                dfnew = df.join(scintillator_hits, on="ORBIT", how="inner")

                # Compute hit counts
                hit_counts = dfnew.groupBy("ORBIT", "CHAMBER").count()
                total_hits_per_orbit = dfnew.groupBy("ORBIT").count().withColumnRenamed("count", "total_hits")
                
                # Filter ORBITs with hits in CHAMBERs 0, 2, and 3
                orbits_with_ch0 = hit_counts.filter(F.col("CHAMBER") == 0).select("ORBIT").distinct()
                orbits_with_ch2 = hit_counts.filter(F.col("CHAMBER") == 2).select("ORBIT").distinct()
                orbits_with_ch3 = hit_counts.filter(F.col("CHAMBER") == 3).select("ORBIT").distinct()
                
                valid_orbits = orbits_with_ch0.intersect(orbits_with_ch2).intersect(orbits_with_ch3)
                
                # Filter ORBITs with < 15 hits
                valid_orbits_with_hits = valid_orbits.join(total_hits_per_orbit, "ORBIT") \
                                                     .filter(F.col("total_hits") < 15) \
                                                     .select("ORBIT")
                
                
                #filter keeping only oarbit
                df_filtered = dfnew.join(valid_orbits_with_hits.select("ORBIT"), on="ORBIT", how="inner")

                # Calculate x positions and handle ambiguity
                width = 42  # mm
                height = 13  # mm
                v_drift = 53.8 # Î¼m

                # X_HIT
                dfnew = df_filtered.withColumn(
                        "X_HIT",
                        (F.col("T_HIT_ns") - F.col("t0")) * (F.lit(v_drift) / 1000) # Convert um to mm, remove negative values
                    )
                
                dfnew = dfnew.filter(F.col("X_HIT").isNotNull()) #removing rows where X_HIT is null
                
                # x-centre of each channel
                dfnew = dfnew.withColumn(
                    'X_CENTRE',
                    when(
                        ((col('CHAMBER').isin(0, 2)) & (col('CHAN') % 4).isin(0, 1)),
                        42 * F.floor(col('CHAN') / 4) + 21
                    )
                    .when(
                        ((col('CHAMBER').isin(0, 2)) & (col('CHAN') % 4).isin(2, 3)),
                        42 * F.floor(col('CHAN') / 4) + 42
                    )
                    .when(
                        ((col('CHAMBER').isin(1, 3)) & (col('CHAN') % 4).isin(0, 1)),
                        42 * F.floor((col('CHAN') - 64) / 4) + 21
                    )
                    .when(
                        ((col('CHAMBER').isin(1, 3)) & (col('CHAN') % 4).isin(2, 3)),
                        42 * F.floor((col('CHAN') - 64) / 4) + 42
                    )
                )
                
                # X_LEFT and X_RIGHT based on X_CENTRE
                dfnew = dfnew.withColumn('X_LEFT', col('X_CENTRE') - col('X_HIT'))
                dfnew = dfnew.withColumn('X_RIGHT', col('X_CENTRE') + col('X_HIT'))
                
                # Adding Z coordinates for each channel in the local track
                dfnew = dfnew.withColumn(
                    'Z_LOCAL',
                    when((col('CHAN') % 4 == 0), 19.5)
                    .when((col('CHAN') % 4 == 1), -6.5)
                    .when((col('CHAN') % 4 == 2), 6.5)
                    .when((col('CHAN') % 4 == 3), -19.5)
                )
                
                dfnew = dfnew.withColumn(
                    'Z_GLOBAL',
                    col('Z_LOCAL') + col('shift_z')
                )
                

                df_ch = (
                    dfnew
                    .select("ORBIT", "CHAMBER", "X_LEFT", "X_RIGHT", "Z_GLOBAL")
                    .filter("X_LEFT IS NOT NULL AND X_RIGHT IS NOT NULL AND Z_GLOBAL IS NOT NULL")
                )
                
                # Convert to RDD with ((ORBIT, CHAMBER), [X_LEFT, X_RIGHT, Z_LOCAL])
                first_rdd = df_ch.rdd.map(lambda row: ((row["ORBIT"], row["CHAMBER"]),
                                                 [row["X_LEFT"], row["X_RIGHT"], row["Z_GLOBAL"]]))
                
                # Combine by key to group the values into parallel arrays
                grouped_rdd = first_rdd.combineByKey(
                    lambda x: [[x[0]], [x[1]], [x[2]]],  # Create initial lists
                    lambda acc, x: [acc[0] + [x[0]], acc[1] + [x[1]], acc[2] + [x[2]]],  # Merge in same partition
                    lambda acc1, acc2: [acc1[0] + acc2[0], acc1[1] + acc2[1], acc1[2] + acc2[2]]  # Merge across partitions
                )
                

###################################### l o c a l #######################################################

                t0_local = time.time()

                cleaned_rdd = grouped_rdd.mapValues(lambda vals: (
                    lambda x_left, x_right, z_global: (
                        # compute mask using X_CENTRE = (X_LEFT + X_RIGHT)/2 for better geometry filtering
                        lambda mask: [
                            list(np.array(x_left)[mask]),
                            list(np.array(x_right)[mask]),
                            list(np.array(z_global)[mask])
                        ]
                    )(remove_outliers_b((np.array(x_left) + np.array(x_right)) / 2))
                )(vals[0], vals[1], vals[2]))
                
                rdd = cleaned_rdd.map(lambda kv: (
                    (kv[0][1], kv[0][0]),  # key = (CHAMBER, ORBIT)
                    (
                        [kv[1][0], kv[1][1], kv[1][2]],  # cleaned lists
                        best_fit_rdd(zip(kv[1][0], kv[1][1], kv[1][2]))  # fit function
                    )
                ))
                
                # keep only orbits with 3+ remaining hits
                rdd = rdd.filter(lambda x: len(x[1][0][0]) > 2)

                rdd.count() 

                t_local = round(time.time() - t0_local, 3)

                
            
###################################### g l o b a l #######################################################
                
                t0_global = time.time()

                global_fit_rdd = rdd.filter(lambda x: x[0][0] in [0, 2, 3])
                global_fit_rdd = global_fit_rdd.map(lambda x: (
                    x[0][1],  # ORBIT is the key
                    list(zip(x[1][1][3], x[1][0][2]))  # (X_hit, Z) tuples
                ))
                global_fit_rdd = global_fit_rdd.reduceByKey(lambda a, b: a + b)
                global_fit_rdd = global_fit_rdd.filter(lambda x: len(x[1]) >= 3)
                global_rdd = global_fit_rdd.mapValues(linear_fit)
                
                global_rdd.count()  
                
                t_global = round(time.time() - t0_global, 3)

                
###################################### per-run times #######################################################
                
                unpack_times.append(t_unpack)
                local_times.append(t_local)
                global_times.append(t_global)

                # Cleanup + time between runs
                os.system("rm -f /path/to/logs/*.log")
                os.system("rm -rf /tmp/spark-*")
                spark.stop()
                time.sleep(10)

            
###################################### avergages of 3 runs #######################################################
            
            avg_unpack = sum(unpack_times) / len(unpack_times)
            avg_local = sum(local_times) / len(local_times)
            avg_global = sum(global_times) / len(global_times)

            results.append({
                "executors": executors,
                "cores_per_executor": max_cores,
                "partitions": partitions,
                "unpack_time_avg_s": avg_unpack,
                "local_time_avg_s": avg_local,
                "global_time_avg_s": avg_global,
                "unpack_runs": unpack_times,
                "local_runs": local_times,
                "global_runs": global_times
            })


            print(f"Done | Executors: {executors}, Cores: {max_cores}, Partitions: {partitions} | "
                 f"Avg Unpack: {avg_unpack:.3f}s | Avg Local: {avg_local:.3f}s | Avg Global: {avg_global:.3f}s")



###################################### e x p o r t #######################################################


pd.DataFrame(results).to_csv("5.csv", index=False)
print("donenenenen")



:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/qasim/.ivy2/cache
The jars for the packages stored in: /home/qasim/.ivy2/jars
org.apache.hadoop#hadoop-common added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a5f41757-1d48-49bc-927c-b794ae9b2aa5;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-common;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-protobuf_3_7;1.1.1 in central
	found org.apache.hadoop#hadoop-annotations;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found com.google.guava#guava;27.0-jre in central
	found com.google.guava#failureaccess;1.0 in central
	found com.google.guava#listenablefuture;9999.0-empty-to-avoid-conflict-with-guava in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found org.checkerframework#checker-qual;2.5.2 in central
	found com.google.j2objc#j2objc-annotations;1.1 in central
	found org.codehaus.

## Plotting