## Machine learning model for a truck fleet simulator

In [1]:
import findspark

# Make the local Spark installation's `pyspark` package importable by adding it to
# sys.path; with no spark_home given, findspark locates the installation on its own.
findspark.init(spark_home=None)

In [2]:
import os

# Kafka source connector for Structured Streaming. It must be on the submit arguments
# before the SparkSession (and therefore the JVM) is created in the next cell.
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1"
os.environ["PYSPARK_SUBMIT_ARGS"] = f'--packages "{KAFKA_PACKAGE}" pyspark-shell'

In [3]:
from pyspark.sql.session import SparkSession

# Create (or reuse) the Spark session. The packages listed in PYSPARK_SUBMIT_ARGS are
# resolved here, when the session starts (see the ivy log below).
APP_NAME = "Truck Fleet - Streaming - Structured Streaming"
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .getOrCreate()
)



:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4e70a19a-f263-4488-993d-4f695d03a691;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central

## Streaming dataframe

In [4]:
# Kafka source settings: subscribe to the `geo_event` topic on the local broker and
# start from the newest offsets, so only messages produced after start-up are read.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "geo_event",
    "startingOffsets": "latest",
    "kafka.group.id": "IE",
}

events_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

In [5]:
from pyspark.sql.functions import split, col
from pyspark.sql.types import TimestampType

# Parse each pipe-delimited Kafka record into the three columns used downstream.
# - The Kafka `value` column is binary, so decode it to a string explicitly before
#   splitting instead of relying on implicit type coercion.
# - `split` takes a Java regex, so the pipe must be escaped. A raw string avoids
#   Python's invalid-escape-sequence warning for '\|'.
# NOTE(review): assumes the producer writes the timestamp at index 0, the driver
# name at index 4 and the event/violation type at index 7 — confirm against the
# simulator's record format.
topDriverViolations_df = (
    events_df
    .select(split(col("value").cast("string"), r"\|").alias("fields"))
    .withColumn("timestamp_str", col("fields").getItem(0))
    .withColumn("eventTime", col("timestamp_str").cast(TimestampType()))
    .withColumn("driverName", col("fields").getItem(4))
    .withColumn("violation", col("fields").getItem(7))
    .select("eventTime", "driverName", "violation")
)
topDriverViolations_df

DataFrame[eventTime: timestamp, driverName: string, violation: string]

In [6]:
# from pyspark.sql.functions import when, col

# # Assuming 'violation' is the column to check, and 'topDriverViolations' is your DataFrame
# # We create a new column 'is_violation' where we mark with 1 if the condition is met (violation != 'Normal'), otherwise 0
# topDriverViolations = topDriverViolations_df

In [7]:
from pyspark.sql.functions import year, month, dayofmonth, when, col

# Regex used to flag drivers whose name contains Adam, John or Michael.
namePattern = ".*(Adam|John|Michael).*"

# Calendar parts of the event timestamp.
calendar_cols = [
    year(col("eventTime")).alias("year"),
    month(col("eventTime")).alias("month"),
    dayofmonth(col("eventTime")).alias("day"),
]

# 0/1 indicator columns (a null input falls through to 0):
#   is_Adam_John_Michael - driver name matches namePattern
#   is_violation         - event type is anything other than 'Normal'
#                          (not part of the feature vector assembled later;
#                          presumably the training label)
flag_cols = [
    when(col("driverName").rlike(namePattern), 1).otherwise(0).alias("is_Adam_John_Michael"),
    when(col('violation') != 'Normal', 1).otherwise(0).alias('is_violation'),
]

topDriverViolations = topDriverViolations_df.select("*", *calendar_cols, *flag_cols)

In our business case we only care about what is happening right now: even 5 minutes of lateness is too much for us, so we do not want to consider events that arrive late. We therefore set a watermark that tolerates at most 10 seconds of event-time delay:

In [8]:
# Tolerate at most 10 seconds of event-time lateness on `eventTime`.
# NOTE(review): Spark uses the watermark to drop late rows and evict state only in
# stateful operators (aggregations, deduplication, stream joins). This query is a
# stateless select + model transform, so the watermark most likely does not filter
# anything here — confirm, and add an explicit filter if late events must be dropped.
topDriverViolations = topDriverViolations.withWatermark(
    eventTime="eventTime",
    delayThreshold="10 seconds",
)

## Assemble the feature vector for the Spark ML model


In [9]:
from pyspark.ml.feature import VectorAssembler

# Input columns, in order, packed into the single `features` vector fed to the model.
# NOTE(review): names and order must match what the saved model was trained on — confirm.
FEATURE_COLUMNS = ["year", "month", "day", "is_Adam_John_Michael"]

assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol="features")

# Streaming frame with the assembled vector plus the event-time columns kept alongside it.
topDriverViolationsWithFeatures = (
    assembler
    .transform(topDriverViolations)
    .select("features", "eventTime", "year", "month", "day")
)


## Make predictions

In [10]:

from pyspark.ml.classification import LogisticRegressionModel

# HDFS location of the fitted logistic-regression model
# (presumably trained and saved by a separate notebook).
modelPath = "hdfs://localhost:9000/datalake/raw/truckfleet/logistic_regression_model"

# The confirmation message is printed only once load() has returned.
loadedModel = LogisticRegressionModel.load(modelPath)
print("Model successfully loaded.")


[Stage 2:>                                                          (0 + 1) / 1]

Model successfully loaded.


                                                                                

In [11]:
# Apply the loaded model to make predictions
predictions_df = loadedModel.transform(topDriverViolationsWithFeatures)

# Replace the 'prediction' column with "violation" or "no violation" based on its value
predictions_df = predictions_df.withColumn(
    'prediction', 
    when(col('prediction') == 1, 'violation').otherwise('no violation')
)

In [12]:
# Select only "features", "probability", and "prediction" columns for display
predictionsToDisplay = predictions_df.select("features", "probability", "prediction")

# Write the selected columns to console in real-time
query = predictionsToDisplay \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()


-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----------+----------+
|features|probability|prediction|
+--------+-----------+----------+
+--------+-----------+----------+



[Stage 4:>                                                          (0 + 1) / 1]

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
+--------------------+--------------------+------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no vi

[Stage 7:>                                                          (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no vi

[Stage 8:>                                                          (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
+--------------------+--------------------+------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------------------+--------------------+------------+
|      

[Stage 10:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
+--------------------+--------------------+------------+



[Stage 11:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no vi

[Stage 13:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no v

[Stage 14:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
+--------------------+--------------------+------------+

-------------------------------------------
Batch: 12
-------------------------------------------
+----

[Stage 16:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 13
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
+--------------------+--------------------+------------+

-------------------------------------------
Batch: 14
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[202

[Stage 20:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 17
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
+--------------------+--------------------+------------+



[Stage 21:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 18
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no v

[Stage 22:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 19
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no v

[Stage 23:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 20
-------------------------------------------
+--------------------+--------------------+------------+
|            features|         probability|  prediction|
+--------------------+--------------------+------------+
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.27224640359727...|   violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no violation|
|[2024.0,3.0,13.0,...|[0.93987354252975...|no v

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# query = topDriverViolations \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()

# query.awaitTermination()

In [None]:
# # Stop Spark session
# spark.stop()

In [None]:
# Print the schema of the scored streaming DataFrame (note that 'prediction' was
# overwritten with a string label).
# NOTE: when run top to bottom, this cell is only reached after the blocking
# awaitTermination() cell above returns or is interrupted.
predictions_df.printSchema()