In [4]:
from pyspark.sql import SparkSession

# Create the Spark session (or pick up one that already exists), pointing
# "spark.jars" at the PostgreSQL JDBC driver jar used for the database read.
spark = (
    SparkSession.builder
    .config("spark.jars", "/opt/spark/postgresql-42.7.8.jar")
    .getOrCreate()
)

# Ask Spark to log at ERROR level only.
spark.sparkContext.setLogLevel("ERROR")

In [5]:
import os

from pyspark.sql.functions import col, lit, sqrt, when

# Connection settings for the source table.
# The user and password may be supplied through the PGUSER / PGPASSWORD
# environment variables so real secrets do not have to live in the notebook;
# the fallbacks reproduce the original local-development values.
JDBC_URL = "jdbc:postgresql://host.docker.internal:5432/postgres"
jdbc_properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}

# Keep the untouched table under its own name so this cell can be re-run
# without stacking transformations on an already-transformed frame.
raw_df = spark.read.jdbc(url=JDBC_URL, table="raw_data", properties=jdbc_properties)

# Radicand of M_phys: E_sum^2 - px_sum^2 - py_sum^2 - pz_sum^2.
# NOTE(review): this has the form of a squared invariant mass of the summed
# four-momenta; confirm units/conventions against the data source.
mass_squared = (
    col("E_sum") ** 2
    - col("px_sum") ** 2
    - col("py_sum") ** 2
    - col("pz_sum") ** 2
)

# Floating-point cancellation can leave the radicand slightly below zero, and
# sqrt of a negative number is NaN. Clamp only strictly negative values to 0;
# null and NaN radicands fall through to `otherwise` and propagate unchanged.
mass_squared_clamped = when(mass_squared < 0, lit(0.0)).otherwise(mass_squared)

df = (
    raw_df
    # Component-wise sums of the two per-row (E, px, py, pz) sets.
    .withColumn("E_sum", col("E1") + col("E2"))
    .withColumn("px_sum", col("px1") + col("px2"))
    .withColumn("py_sum", col("py1") + col("py2"))
    .withColumn("pz_sum", col("pz1") + col("pz2"))
    .withColumn("M_phys", sqrt(mass_squared_clamped))
)


In [6]:
# Input columns for the model: the derived "M_phys" column first, then the
# E / px / py / pz / Q columns with suffix 1, then the same set with suffix 2.
feature_cols = ["M_phys"] + [
    f"{prefix}{suffix}"
    for suffix in (1, 2)
    for prefix in ("E", "px", "py", "pz", "Q")
]


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Pack the selected input columns into the single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Gradient-boosted-tree regressor that reads "features" and is trained
# against the "M" column; the seed is fixed at 42.
gbt_params = {
    "featuresCol": "features",
    "labelCol": "M",
    "maxDepth": 6,
    "maxIter": 120,
    "stepSize": 0.05,
    "subsamplingRate": 0.8,
    "seed": 42,
}
gbt = GBTRegressor(**gbt_params)

# Chain both stages so one fit/transform call runs assembly and regression.
pipeline = Pipeline(stages=[assembler, gbt])


In [8]:
# Seeded random split with weights 0.8 / 0.2; the first part is used for fitting.
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=42)

# Fit every pipeline stage on the training part.
model = pipeline.fit(train_df)


25/12/29 08:59:41 INFO CodeGenerator: Code generated in 536.874419 ms
25/12/29 08:59:42 INFO Instrumentation: [4818250d] Stage class: GBTRegressor
25/12/29 08:59:42 INFO Instrumentation: [4818250d] Stage uid: GBTRegressor_b216377dd644
25/12/29 08:59:42 INFO CodeGenerator: Code generated in 122.394206 ms
25/12/29 08:59:42 INFO Instrumentation: [4818250d] training: numPartitions=1 storageLevel=StorageLevel(1 replicas)
25/12/29 08:59:42 INFO Instrumentation: [4818250d] {"seed":42,"stepSize":0.05,"subsamplingRate":0.8,"featuresCol":"features","maxDepth":6,"labelCol":"M","maxIter":120}
25/12/29 08:59:42 INFO SparkContext: Starting job: take at DecisionTreeMetadata.scala:119
25/12/29 08:59:42 INFO DAGScheduler: Got job 0 (take at DecisionTreeMetadata.scala:119) with 1 output partitions
25/12/29 08:59:42 INFO DAGScheduler: Final stage: ResultStage 0 (take at DecisionTreeMetadata.scala:119)
25/12/29 08:59:42 INFO DAGScheduler: Parents of final stage: List()
25/12/29 08:59:42 INFO DAGScheduler:

In [9]:
# Apply the fitted pipeline to the test split, then compare the
# "prediction" column against the "M" column using the RMSE metric.
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="M", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")


25/12/29 09:02:46 INFO MemoryStore: Block broadcast_2165 stored as values in memory (estimated size 2.3 MiB, free 429.7 MiB)
25/12/29 09:02:46 INFO BlockManagerInfo: Removed broadcast_2164_piece0 on 5d662929a363:44727 in memory (size: 5.4 KiB, free: 433.5 MiB)
25/12/29 09:02:46 INFO BlockManagerInfo: Removed broadcast_2163_piece0 on 5d662929a363:44727 in memory (size: 970.4 KiB, free: 434.4 MiB)
25/12/29 09:02:46 INFO BlockManager: Removing RDD 2993
25/12/29 09:02:46 INFO MemoryStore: Block broadcast_2165_piece0 stored as bytes in memory (estimated size 933.8 KiB, free 431.2 MiB)
25/12/29 09:02:46 INFO BlockManagerInfo: Added broadcast_2165_piece0 in memory on 5d662929a363:44727 (size: 933.8 KiB, free: 433.5 MiB)
25/12/29 09:02:46 INFO SparkContext: Created broadcast 2165 from broadcast at GBTRegressor.scala:272
25/12/29 09:02:46 INFO CodeGenerator: Code generated in 70.321334 ms
25/12/29 09:02:46 INFO SparkContext: Starting job: treeAggregate at Statistics.scala:58
25/12/29 09:02:46 I

RMSE: 1.2473842844677658


25/12/29 09:02:50 INFO Executor: Finished task 0.0 in stage 1444.0 (TID 1444). 2888 bytes result sent to driver
25/12/29 09:02:50 INFO TaskSetManager: Finished task 0.0 in stage 1444.0 (TID 1444) in 3421 ms on 5d662929a363 (executor driver) (1/1)
25/12/29 09:02:50 INFO TaskSchedulerImpl: Removed TaskSet 1444.0, whose tasks have all completed, from pool 
25/12/29 09:02:50 INFO DAGScheduler: ResultStage 1444 (treeAggregate at Statistics.scala:58) finished in 3.432 s
25/12/29 09:02:50 INFO DAGScheduler: Job 723 is finished. Cancelling potential speculative or zombie tasks for this job
25/12/29 09:02:50 INFO TaskSchedulerImpl: Killing all running tasks in stage 1444: Stage finished
25/12/29 09:02:50 INFO DAGScheduler: Job 723 finished: treeAggregate at Statistics.scala:58, took 3.434872 s
                                                                                

In [13]:
# Apply the fitted pipeline to the test split, then compare the
# "prediction" column against the "M" column using the R2 metric.
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(metricName="r2", labelCol="M", predictionCol="prediction")
r2 = evaluator.evaluate(predictions)
print(f"R2: {r2}")

25/12/29 09:27:10 INFO MemoryStore: Block broadcast_2171 stored as values in memory (estimated size 2.3 MiB, free 422.6 MiB)
25/12/29 09:27:10 INFO BlockManagerInfo: Removed broadcast_2165_piece0 on 5d662929a363:44727 in memory (size: 933.8 KiB, free: 432.6 MiB)
25/12/29 09:27:10 INFO BlockManagerInfo: Removed broadcast_2167_piece0 on 5d662929a363:44727 in memory (size: 933.8 KiB, free: 433.5 MiB)
25/12/29 09:27:10 INFO MemoryStore: Block broadcast_2171_piece0 stored as bytes in memory (estimated size 933.8 KiB, free 428.1 MiB)
25/12/29 09:27:10 INFO BlockManagerInfo: Added broadcast_2171_piece0 in memory on 5d662929a363:44727 (size: 933.8 KiB, free: 432.6 MiB)
25/12/29 09:27:10 INFO SparkContext: Created broadcast 2171 from broadcast at GBTRegressor.scala:272
25/12/29 09:27:10 INFO SparkContext: Starting job: treeAggregate at Statistics.scala:58
25/12/29 09:27:10 INFO DAGScheduler: Got job 726 (treeAggregate at Statistics.scala:58) with 1 output partitions
25/12/29 09:27:10 INFO DAGSc

R2: 0.9976113037074862


25/12/29 09:27:13 INFO Executor: Finished task 0.0 in stage 1447.0 (TID 1447). 2845 bytes result sent to driver
25/12/29 09:27:13 INFO TaskSetManager: Finished task 0.0 in stage 1447.0 (TID 1447) in 3101 ms on 5d662929a363 (executor driver) (1/1)
25/12/29 09:27:13 INFO TaskSchedulerImpl: Removed TaskSet 1447.0, whose tasks have all completed, from pool 
25/12/29 09:27:13 INFO DAGScheduler: ResultStage 1447 (treeAggregate at Statistics.scala:58) finished in 3.135 s
25/12/29 09:27:13 INFO DAGScheduler: Job 726 is finished. Cancelling potential speculative or zombie tasks for this job
25/12/29 09:27:13 INFO TaskSchedulerImpl: Killing all running tasks in stage 1447: Stage finished
25/12/29 09:27:13 INFO DAGScheduler: Job 726 finished: treeAggregate at Statistics.scala:58, took 3.137795 s
                                                                                

In [11]:
# Apply the fitted pipeline to the test split, then compare the
# "prediction" column against the "M" column using the MAE metric.
predictions = model.transform(test_df)
evaluator = RegressionEvaluator(metricName="mae", labelCol="M", predictionCol="prediction")
mae = evaluator.evaluate(predictions)
print(f"MAE: {mae}")

25/12/29 09:06:18 INFO MemoryStore: Block broadcast_2169 stored as values in memory (estimated size 2.3 MiB, free 425.6 MiB)
25/12/29 09:06:18 INFO MemoryStore: Block broadcast_2169_piece0 stored as bytes in memory (estimated size 933.8 KiB, free 424.7 MiB)
25/12/29 09:06:18 INFO BlockManagerInfo: Added broadcast_2169_piece0 in memory on 5d662929a363:44727 (size: 933.8 KiB, free: 431.6 MiB)
25/12/29 09:06:18 INFO SparkContext: Created broadcast 2169 from broadcast at GBTRegressor.scala:272
25/12/29 09:06:18 INFO SparkContext: Starting job: treeAggregate at Statistics.scala:58
25/12/29 09:06:18 INFO DAGScheduler: Got job 725 (treeAggregate at Statistics.scala:58) with 1 output partitions
25/12/29 09:06:18 INFO DAGScheduler: Final stage: ResultStage 1446 (treeAggregate at Statistics.scala:58)
25/12/29 09:06:18 INFO DAGScheduler: Parents of final stage: List()
25/12/29 09:06:18 INFO DAGScheduler: Missing parents: List()
25/12/29 09:06:18 INFO DAGScheduler: Submitting ResultStage 1446 (Map

MAE: 0.7453835375180048


25/12/29 09:06:20 INFO Executor: Finished task 0.0 in stage 1446.0 (TID 1446). 2845 bytes result sent to driver
25/12/29 09:06:20 INFO TaskSetManager: Finished task 0.0 in stage 1446.0 (TID 1446) in 2617 ms on 5d662929a363 (executor driver) (1/1)
25/12/29 09:06:20 INFO TaskSchedulerImpl: Removed TaskSet 1446.0, whose tasks have all completed, from pool 
25/12/29 09:06:20 INFO DAGScheduler: ResultStage 1446 (treeAggregate at Statistics.scala:58) finished in 2.624 s
25/12/29 09:06:20 INFO DAGScheduler: Job 725 is finished. Cancelling potential speculative or zombie tasks for this job
25/12/29 09:06:20 INFO TaskSchedulerImpl: Killing all running tasks in stage 1446: Stage finished
25/12/29 09:06:20 INFO DAGScheduler: Job 725 finished: treeAggregate at Statistics.scala:58, took 2.625793 s
25/12/29 09:08:27 INFO BlockManager: Removing RDD 1440                          
25/12/29 09:08:27 INFO BlockManager: Removing RDD 840
25/12/29 09:08:27 INFO BlockManager: Removing RDD 1490
25/12/29 09:08

In [15]:
# Write the three metrics to results.txt as one line of text.
# Each metric is written in the same "NAME: value" form, and the encoding is
# pinned so the file content does not depend on the platform default.
# Requires rmse, mae and r2 to have been computed by the evaluation cells.
with open("results.txt", "w", encoding="utf-8") as f:
    f.write(f"RMSE: {rmse}, MAE: {mae}, R2: {r2}")