# Importing Spark and its machine learning packages

In [2]:
import findspark
findspark.init()
import pyspark

In [None]:
# Importing required modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Create and initialize Spark session
spark = SparkSession.builder.appName("Regression in Apache Spark") \
.config("spark.some.config.option", "some-value").getOrCreate()

# Loading flights data

In [5]:
# Create flights db schema
flightSchema = StructType([
    StructField("DayofMonth", IntegerType(), False),
    StructField("DayOfWeek", IntegerType(), False),
    StructField("Carrier", StringType(), False),
    StructField("OriginAirportID", IntegerType(), False),
    StructField("DestAirportID", IntegerType(), False),
    StructField("DepDelay", IntegerType(), False),
    StructField("ArrDelay", IntegerType(), False)
])

# Read csv data from file into DataFrame
df = spark.read.csv('flights.csv', schema=flightSchema, header=True)
df.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

# Preparing data

In [6]:
# Selecting relevant data columns
data = df.select("DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", "DepDelay", "ArrDelay")
data.show()

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|        19|        5|          11433|        13303|      -3|       1|
|        19|        5|          14869|        12478|       0|      -8|
|        19|        5|          14057|        14869|      -4|     -15|
|        19|        5|          15016|        11433|      28|      24|
|        19|        5|          11193|        12892|      -6|     -11|
|        19|        5|          10397|        15016|      -1|     -19|
|        19|        5|          15016|        10397|       0|      -1|
|        19|        5|          10397|        14869|      15|      24|
|        19|        5|          10397|        10423|      33|      34|
|        19|        5|          11278|        10397|     323|     322|
|        19|        5|          14107|        13487|      -7|     -13|
|     

# Splitting training and testing data

In [7]:
# Divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1891281 ; Testing data rows: 810937


# Preparing the training data

In [None]:
# Define an assembler to take into account all the features without the arrival delay column which is the label to predict
assembler = VectorAssembler(inputCols = ["DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID",
                                         "DepDelay"], outputCol="features")

# Transforming the data into a single feature column using the assembler
trainingDataFinal = assembler.transform(trainingData).select(col("features"), (col("ArrDelay").cast("Int").alias("label")))
trainingDataFinal.show(truncate=False)

+-------------------------------+-----+
|features                       |label|
+-------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0] |-11  |
|[1.0,1.0,10140.0,11292.0,0.0]  |-5   |
|[1.0,1.0,10140.0,11292.0,2.0]  |-1   |
|[1.0,1.0,10140.0,11298.0,-10.0]|-13  |
|[1.0,1.0,10140.0,11298.0,-6.0] |-25  |
|[1.0,1.0,10140.0,11298.0,-4.0] |-1   |
|[1.0,1.0,10140.0,11298.0,-2.0] |-2   |
|[1.0,1.0,10140.0,11298.0,-1.0] |-9   |
|[1.0,1.0,10140.0,11298.0,0.0]  |-11  |
|[1.0,1.0,10140.0,11298.0,87.0] |68   |
|[1.0,1.0,10140.0,12266.0,-5.0] |-16  |
|[1.0,1.0,10140.0,12266.0,27.0] |21   |
|[1.0,1.0,10140.0,13487.0,-3.0] |-9   |
|[1.0,1.0,10299.0,12173.0,-1.0] |-13  |
|[1.0,1.0,10299.0,13487.0,-7.0] |-36  |
|[1.0,1.0,10299.0,13487.0,-6.0] |-36  |
|[1.0,1.0,10299.0,13830.0,27.0] |16   |
|[1.0,1.0,10299.0,14057.0,-11.0]|0    |
|[1.0,1.0,10299.0,14057.0,-10.0]|-7   |
|[1.0,1.0,10299.0,14747.0,-13.0]|-10  |
+-------------------------------+-----+
only showing top 20 rows



# Train the regression model

In [11]:
# Define the Linear Regression model with a max iterations of 10 and a regularization parameter of 0.3
model = LinearRegression(labelCol="label",featuresCol="features", maxIter=10, regParam=0.3)

# Train the Linear Regression model
model = model.fit(trainingDataFinal)
print("Regression model training completed!")

Regression model training completed!


# Prepare the testing data

In [12]:
# Call the previous training assembler
testingDataFinal = assembler.transform(testingData).select(col("features"), (col("ArrDelay")).cast("Int").alias("trueLabel"))
testingDataFinal.show(truncate=False)

+-------------------------------+---------+
|features                       |trueLabel|
+-------------------------------+---------+
|[1.0,1.0,10140.0,10397.0,-2.0] |-17      |
|[1.0,1.0,10140.0,11298.0,-10.0]|-19      |
|[1.0,1.0,10140.0,12266.0,838.0]|812      |
|[1.0,1.0,10299.0,13930.0,-4.0] |-18      |
|[1.0,1.0,10299.0,14747.0,-7.0] |-12      |
|[1.0,1.0,10397.0,10423.0,-1.0] |2        |
|[1.0,1.0,10397.0,10423.0,0.0]  |-11      |
|[1.0,1.0,10397.0,10423.0,1.0]  |3        |
|[1.0,1.0,10397.0,10529.0,-1.0] |-15      |
|[1.0,1.0,10397.0,10529.0,39.0] |17       |
|[1.0,1.0,10397.0,10693.0,-4.0] |-20      |
|[1.0,1.0,10397.0,10693.0,-4.0] |-10      |
|[1.0,1.0,10397.0,10693.0,-3.0] |-13      |
|[1.0,1.0,10397.0,10721.0,-4.0] |-19      |
|[1.0,1.0,10397.0,10721.0,5.0]  |-7       |
|[1.0,1.0,10397.0,10721.0,22.0] |-10      |
|[1.0,1.0,10397.0,10721.0,71.0] |49       |
|[1.0,1.0,10397.0,10792.0,-5.0] |-13      |
|[1.0,1.0,10397.0,10792.0,4.0]  |-10      |
|[1.0,1.0,10397.0,10821.0,-10.0]

# Predict testing data using the trained model

In [13]:
# Generate the model flights arrival delay predictions for the testing data
prediction = model.transform(testingDataFinal)

# Take only the relevant columns
predictionFinal = prediction.select("features", "prediction", "trueLabel")
predictionFinal.show()

+--------------------+-------------------+---------+
|            features|         prediction|trueLabel|
+--------------------+-------------------+---------+
|[1.0,1.0,10140.0,...| -5.585694348357363|      -17|
|[1.0,1.0,10140.0,...|-13.766879700247753|      -19|
|[1.0,1.0,10140.0,...|  831.2440991347188|      812|
|[1.0,1.0,10299.0,...|  -8.35969389527107|      -18|
|[1.0,1.0,10299.0,...|-11.537865204565225|      -12|
|[1.0,1.0,10397.0,...| -4.542773580928234|        2|
|[1.0,1.0,10397.0,...|-3.5460357560337874|      -11|
|[1.0,1.0,10397.0,...|-2.5492979311393404|        3|
|[1.0,1.0,10397.0,...|  -4.56715978713233|      -15|
|[1.0,1.0,10397.0,...| 35.302353208645535|       17|
|[1.0,1.0,10397.0,...|  -7.59510286386729|      -20|
|[1.0,1.0,10397.0,...|  -7.59510286386729|      -10|
|[1.0,1.0,10397.0,...| -6.598365038972844|      -13|
|[1.0,1.0,10397.0,...| -7.601544503241957|      -19|
|[1.0,1.0,10397.0,...| 1.3690959208080624|       -7|
|[1.0,1.0,10397.0,...| 18.313638944013654|    

# Measure the accuracy of the trained model

In [None]:
# Import the RegressionEvaluator for the model's accuracy
from pyspark.ml.evaluation import RegressionEvaluator

# Define the evaluator by passing the 'trueLabel' column and the prediction column, and the desired evaluation metric (root mean squared error in this case)
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")

# Compute RMSE using the defined evaluator
rmse = evaluator.evaluate(predictionFinal)
print("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 13.200130843107127


# Predict a single flight arrival delai 

In [None]:
# Take 1 row from the DataFrame (as a list)
Onerowtest = testingData.limit(1).collect()

# Convert the one row list to RDD then to DataFrame
OnerowtestDF = spark.createDataFrame(Onerowtest, schema=testingData.schema)

# Transform the data into the feature format previously used via assembler
OnerowtestDF_TF = assembler.transform(OnerowtestDF).select(col("features"),(col("ArrDelay")).cast("Int").alias("trueLabel"))

# Predict the data using the trained model
Onerowprediction = model.transform(OnerowtestDF_TF)
OnerowpredictionFinal = Onerowprediction.select("features", "prediction", "trueLabel")

OnerowpredictionFinal.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 116) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more


# Displaying 5 rows from training data

In [None]:
trainingData.show(5)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|         1|        1|          10140|        10397|      -4|     -11|
|         1|        1|          10140|        10397|      -2|     -18|
|         1|        1|          10140|        10397|      -2|     -17|
|         1|        1|          10140|        10397|       0|     -12|
|         1|        1|          10140|        10821|       8|      -9|
+----------+---------+---------------+-------------+--------+--------+
only showing top 5 rows



# Displaying 4 rows from testing data

In [16]:
testingData.show(4)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|         1|        1|          10140|        10397|       0|      -9|
|         1|        1|          10140|        10821|       4|       4|
|         1|        1|          10140|        11259|      -5|     -23|
|         1|        1|          10140|        11259|      -5|     -14|
+----------+---------+---------------+-------------+--------+--------+
only showing top 4 rows



# Displaying coefficients and intercept

In [None]:
# Print the number of features
print("Number of features: " + str(model.numFeatures))

# Coefficients: DayofMonth, DayOfWeek, OriginAirportID, DestAirportID, DepDelay
print("Coefficients: " + str(model.coefficients))

# Intercept (constant)
print("Intercept: " + str(model.intercept))

Jumlah fitur: 5
Koefisien: [0.010972239517679812,-0.1415044049108317,0.00019352715667810667,-0.00023086025472954724,0.998130533411174]
Konstanta: -2.982229741541443


# Reducing the number of variables to only two

In [None]:
# Splitting data randomly, 70% for training and 30% for testing
dataTerpisahkan = data.randomSplit([0.7, 0.3])
trainingData2 = dataTerpisahkan[0] // training data at index 0
testingData2 = dataTerpisahkan[1] // testing data at index 1
train_rows2 = trainingData.count()
test_rows2 = testingData.count()

print ("Training data count:", train_rows2, "| Testing data count:", test_rows2)

Jumlah data training: 1890299 | Jumlah data testing: 811919


In [None]:
# Limiting variables to only two
# Defining assembler using only 'OriginAirportID' and 'DestAirportID'
assembler2 = VectorAssembler(inputCols = ["OriginAirportID", "DestAirportID"], outputCol="features")

# Transforming data into a single feature column using the defined assembler
trainingDataFinal2 = assembler2.transform(trainingData2).select(col("features"), (col("ArrDelay").cast("Int").alias("label")))
trainingDataFinal2.show(truncate=False, n=3)

+-----------------+-----+
|features         |label|
+-----------------+-----+
|[10140.0,10397.0]|-11  |
|[10140.0,10397.0]|-18  |
|[10140.0,10397.0]|-17  |
+-----------------+-----+
only showing top 3 rows



In [None]:
model2 = algoritma.fit(trainingDataFinal2)
print("Regression model training completed!")

Model regresi selesai ditraining!


In [None]:
# Transforming testing data using the previously defined assembler
testingDataFinal2 = assembler2.transform(testingData2).select(col("features"), (col("ArrDelay")).cast("Int").alias("trueLabel"))
testingDataFinal2.show(truncate=False, n=2)

+-----------------+---------+
|features         |trueLabel|
+-----------------+---------+
|[10140.0,10397.0]|-9       |
|[10140.0,11259.0]|-11      |
+-----------------+---------+
only showing top 2 rows



In [None]:
# Predicting testing data using our trained model
prediksiMentah2 = model2.transform(testingDataFinal2)

# Selecting only the relevant columns
prediksiFinal2 = prediksiMentah2.select("features", "prediction", "trueLabel")

# Displaying 3 prediction results
prediksiFinal2.show(3)

+-----------------+------------------+---------+
|         features|        prediction|trueLabel|
+-----------------+------------------+---------+
|[10140.0,10397.0]|7.5309839440462465|       -9|
|[10140.0,11259.0]| 7.443073537441167|      -11|
|[10140.0,11259.0]| 7.443073537441167|        5|
+-----------------+------------------+---------+
only showing top 3 rows



In [None]:
# Importing module to evaluate regression accuracy
from pyspark.ml.evaluation import RegressionEvaluator

# Defining evaluator by providing 'trueLabel' and prediction columns and the desired metric (rmse)
evaluator2 = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")

# Computing RMSE using the defined evaluator
rmse = evaluator2.evaluate(prediksiFinal2)
print("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 38.6106863771816
