In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create Spark Session
spark = SparkSession.builder \
    .appName("Transport Predictive Analytics") \
    .getOrCreate()

print("Spark Session Created!")


Spark Session Created!


In [2]:
# ======================
# FILE PATHS
# ======================

DISRUPTION_PATH = r"C:\Users\sujee\Desktop\DATASCIENCE123\disruptions_data_catalogue.csv"
FARES_PATH = r"C:\Users\sujee\Desktop\DATASCIENCE123\fares_data_catalogue.csv"
LOCATION_PATH = r"C:\Users\sujee\Desktop\DATASCIENCE123\location_data_catalogue.csv"
TIMETABLE_PATH = r"C:\Users\sujee\Desktop\DATASCIENCE123\timetables_data_catalogue.csv"


In [3]:
# ======================
# LOAD DATASETS USING PYSPARK
# ======================

disruptions_df = spark.read.csv(DISRUPTION_PATH, header=True, inferSchema=True)
fares_df = spark.read.csv(FARES_PATH, header=True, inferSchema=True)
location_df = spark.read.csv(LOCATION_PATH, header=True, inferSchema=True)
timetable_df = spark.read.csv(TIMETABLE_PATH, header=True, inferSchema=True)

print("Datasets Loaded Successfully!")


Datasets Loaded Successfully!


In [4]:
print(type(disruptions_df))
print(type(fares_df))
print(type(location_df))
print(type(timetable_df))


<class 'pyspark.sql.classic.dataframe.DataFrame'>
<class 'pyspark.sql.classic.dataframe.DataFrame'>
<class 'pyspark.sql.classic.dataframe.DataFrame'>
<class 'pyspark.sql.classic.dataframe.DataFrame'>


In [5]:
print("Disruptions Data:")
disruptions_df.show(5)

print("Fares Data:")
fares_df.show(5)

print("Location Data:")
location_df.show(5)

print("Timetable Data:")
timetable_df.show(5)


Disruptions Data:
+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-------+--------------+------------------+-----------------+--------------+
|   Organisation|                  ID|      Validity start|        Validity end|   Publication start|     Publication end|    Reason|Planned|Modes affected|Operators affected|Services affected|Stops affected|
+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+-------+--------------+------------------+-----------------+--------------+
|West of England|18224249-29cf-4f6...|Sun Sep 01 2024 0...|                NULL|Sun Sep 01 2024 0...|                NULL| roadworks|   true|           bus|              NULL|             NULL|          NULL|
|           WYCA|2b9e1d8f-b0ee-43a...|Mon Nov 04 2024 0...|                NULL|Mon Nov 04 2024 0...|                NULL| roadworks|   true|     

In [6]:
# Remove duplicates
disruptions_df = disruptions_df.dropDuplicates()
fares_df = fares_df.dropDuplicates()
location_df = location_df.dropDuplicates()
timetable_df = timetable_df.dropDuplicates()

print("Duplicates Removed!")


Duplicates Removed!


In [7]:
disruptions_df = disruptions_df.fillna("Unknown")
fares_df = fares_df.fillna("Unknown")
location_df = location_df.fillna("Unknown")
timetable_df = timetable_df.fillna("Unknown")

print("Missing Values Handled!")


Missing Values Handled!


In [8]:
print("Data processing complete!")

print("Disruptions Count:", disruptions_df.count())
print("Fares Count:", fares_df.count())
print("Location Count:", location_df.count())
print("Timetable Count:", timetable_df.count())

print("Preview Timetable:")
timetable_df.show(5)


Data processing complete!
Disruptions Count: 332
Fares Count: 227857
Location Count: 459
Timetable Count: 14069
Preview Timetable:
+----------------+-------------+------------------+----------------+----------+------------+---------------+--------------------+--------------------+-----------+----------------------------------------+-----------------------------------+---------------------------------+-----------------------------------------+-------------------+-----------------+--------------------+----------------------+--------------------------+------------------+-------------------+-------------------+-------------------------------+-----------------------------+----------+--------------------+---------------+--------------------+--------------------+------------------+------------------+-----------------------+----------------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+---------------+-------

In [9]:
print("Timetable Columns:", timetable_df.columns)
print("Disruptions Columns:", disruptions_df.columns)
print("Location Columns:", location_df.columns)
print("Fares Columns:", fares_df.columns)


Timetable Columns: ['XML:Service Code', 'XML:Line Name', 'Requires Attention', 'Published Status', 'OTC Status', 'Scope Status', 'Seasonal Status', 'Timeliness Status', 'Organisation Name', 'Data set ID', 'Date OTC variation needs to be published', 'Date for complete 42 day look ahead', 'Date when data is over 1 year old', 'Date seasonal service should be published', 'Seasonal Start Date', 'Seasonal End Date', 'XML:Filename', 'XML:Last Modified Date', 'XML:National Operator Code', 'XML:Licence Number', 'XML:Public Use Flag', 'XML:Revision Number', 'XML:Operating Period Start Date', 'XML:Operating Period End Date', 'OTC:Origin', 'OTC:Destination', 'OTC:Operator ID', 'OTC:Operator Name', 'OTC:Address', 'OTC:Licence Number', 'OTC:Licence Status', 'OTC:Registration Number', 'OTC:Service Type Description', 'OTC:Variation Number', 'OTC:Service Number', 'OTC:Start Point', 'OTC:Finish Point', 'OTC:Via', 'OTC:Granted Date', 'OTC:Expiry Date', 'OTC:Effective Date', 'OTC:Received Date', 'OTC:Serv

In [10]:
timetable_disruptions_df = timetable_df.join(
    disruptions_df,
    timetable_df["Organisation Name"] == disruptions_df["Organisation"],
    "left"
)

timetable_disruptions_df.show(5)


+----------------+-------------+------------------+----------------+----------+------------+---------------+--------------------+--------------------+-----------+----------------------------------------+-----------------------------------+---------------------------------+-----------------------------------------+-------------------+-----------------+--------------------+----------------------+--------------------------+------------------+-------------------+-------------------+-------------------------------+-----------------------------+----------+--------------------+---------------+--------------------+--------------------+------------------+------------------+-----------------------+----------------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+---------------+------------------+-----------------+------------------------------+----------------+-------------------------+------------+----+---------

In [11]:
import os
os.environ["HADOOP_HOME"] = "C:/hadoop"
os.environ["hadoop.home.dir"] = "C:/hadoop"


In [12]:
timetable_disruptions_df.toPandas().to_csv(
    r"C:\Users\sujee\Desktop\joined_timetable_disruptions.csv",
    index=False
)

print("Saved to Desktop!")


Saved to Desktop!


In [13]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [14]:
ml_df = timetable_df.select(
    "Requires Attention",
    "Published Status",
    "Scope Status"
).dropna()

ml_df.show(5)


+------------------+----------------+------------+
|Requires Attention|Published Status|Scope Status|
+------------------+----------------+------------+
|                No|       Published|    In Scope|
|                No|       Published|Out of Scope|
|               Yes|       Published|    In Scope|
|               Yes|     Unpublished|    In Scope|
|                No|       Published|    In Scope|
+------------------+----------------+------------+
only showing top 5 rows


In [15]:
indexer1 = StringIndexer(inputCol="Published Status", outputCol="Published_Status_Index")
indexer2 = StringIndexer(inputCol="Scope Status", outputCol="Scope_Status_Index")
indexer3 = StringIndexer(inputCol="Requires Attention", outputCol="label")

ml_df = indexer1.fit(ml_df).transform(ml_df)
ml_df = indexer2.fit(ml_df).transform(ml_df)
ml_df = indexer3.fit(ml_df).transform(ml_df)

ml_df.show(5)


+------------------+----------------+------------+----------------------+------------------+-----+
|Requires Attention|Published Status|Scope Status|Published_Status_Index|Scope_Status_Index|label|
+------------------+----------------+------------+----------------------+------------------+-----+
|                No|       Published|    In Scope|                   0.0|               0.0|  0.0|
|                No|       Published|Out of Scope|                   0.0|               1.0|  0.0|
|               Yes|       Published|    In Scope|                   0.0|               0.0|  1.0|
|               Yes|     Unpublished|    In Scope|                   1.0|               0.0|  1.0|
|                No|       Published|    In Scope|                   0.0|               0.0|  0.0|
+------------------+----------------+------------+----------------------+------------------+-----+
only showing top 5 rows


In [16]:
assembler = VectorAssembler(
    inputCols=[
        "Published_Status_Index",
        "Scope_Status_Index"
    ],
    outputCol="features"
)

ml_df = assembler.transform(ml_df)

ml_df.select("features", "label").show(5)


+---------+-----+
| features|label|
+---------+-----+
|[0.0,0.0]|  0.0|
|[0.0,1.0]|  0.0|
|[0.0,0.0]|  1.0|
|[1.0,0.0]|  1.0|
|[0.0,0.0]|  0.0|
+---------+-----+
only showing top 5 rows


In [17]:
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

print("Train count:", train_df.count())
print("Test count:", test_df.count())


Train count: 11355
Test count: 2714


In [18]:
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label"
)

model = lr.fit(train_df)

print("Model trained!")


Model trained!


In [19]:
predictions = model.transform(test_df)

predictions.select(
    "features",
    "label",
    "prediction",
    "probability"
).show(10)


+---------+-----+----------+--------------------+
| features|label|prediction|         probability|
+---------+-----+----------+--------------------+
|[5.0,2.0]|125.0|      19.0|[3.51531431029350...|
|[5.0,2.0]|138.0|      19.0|[3.51531431029350...|
|[2.0,2.0]| 79.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 82.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 85.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 11.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 90.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 25.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 98.0|       2.0|[7.01654532835557...|
|[2.0,2.0]| 29.0|       2.0|[7.01654532835557...|
+---------+-----+----------+--------------------+
only showing top 10 rows


In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)

print("Model Accuracy:", accuracy)


Model Accuracy: 0.8868828297715549


In [21]:
disruptions_df.groupBy("Organisation").count().show()


+------------------+-----+
|      Organisation|count|
+------------------+-----+
|              WYCA|   47|
|North Lincolnshire|    2|
|              TfGM|  130|
|             SYMCA|   34|
|      Merseytravel|   18|
|          Cornwall|    8|
|   West of England|   93|
+------------------+-----+



In [22]:
peak_df = timetable_df.groupBy("Organisation Name").count()

peak_df.orderBy("count", ascending=False).show(10)


+--------------------+-----+
|   Organisation Name|count|
+--------------------+-----+
|Organisation not ...| 2046|
|          Stagecoach| 1671|
|             Unknown| 1272|
|            Go-Ahead| 1119|
|           First Bus|  869|
|       Arriva UK Bus|  684|
|Transport for Gre...|  631|
|     Centrebus Group|  232|
|National Express ...|  228|
|            Transdev|  208|
+--------------------+-----+
only showing top 10 rows


In [23]:
print("========== TASK 4 PIPELINE START ==========")




In [24]:
print(model)


LogisticRegressionModel: uid=LogisticRegression_c44d4ead6f21, numClasses=239, numFeatures=2


In [25]:
predictions = model.transform(ml_df)

predictions.show(5)


+------------------+----------------+------------+----------------------+------------------+-----+---------+--------------------+--------------------+----------+
|Requires Attention|Published Status|Scope Status|Published_Status_Index|Scope_Status_Index|label| features|       rawPrediction|         probability|prediction|
+------------------+----------------+------------+----------------------+------------------+-----+---------+--------------------+--------------------+----------+
|                No|       Published|    In Scope|                   0.0|               0.0|  0.0|[0.0,0.0]|[158.607317836934...|[0.92517121744603...|       0.0|
|                No|       Published|Out of Scope|                   0.0|               1.0|  0.0|[0.0,1.0]|[91.1374142166726...|[1.0,3.6690323615...|       0.0|
|               Yes|       Published|    In Scope|                   0.0|               0.0|  1.0|[0.0,0.0]|[158.607317836934...|[0.92517121744603...|       0.0|
|               Yes|     Unp

In [26]:
predictions.select(
    "Requires Attention",
    "prediction",
    "probability"
).show(10)


+--------------------+----------+--------------------+
|  Requires Attention|prediction|         probability|
+--------------------+----------+--------------------+
|                  No|       0.0|[0.92517121744603...|
|                  No|       0.0|[1.0,3.6690323615...|
|                 Yes|       0.0|[0.92517121744603...|
|                 Yes|       1.0|[0.13486245044922...|
|                  No|       0.0|[0.92517121744603...|
|                  No|       0.0|[0.92517121744603...|
|North East Combin...|       2.0|[7.01654532835557...|
|                  No|       0.0|[0.92517121744603...|
|                  No|       0.0|[1.0,3.6690323615...|
|West Yorkshire Co...|       2.0|[7.01654532835557...|
+--------------------+----------+--------------------+
only showing top 10 rows


In [27]:
predictions.groupBy("prediction").count().show()


+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|11671|
|       1.0| 1123|
|     114.0|    6|
|      78.0|    1|
|     130.0|    1|
|      72.0|    6|
|      19.0|   30|
|       2.0| 1190|
|      15.0|   30|
|      76.0|    6|
|      77.0|    2|
|     140.0|    1|
|     132.0|    2|
+----------+-----+



In [29]:

import os

os.makedirs("pipeline_output", exist_ok=True)


In [35]:
predictions.toPandas().to_csv(
    "C:/Users/sujee/Desktop/predictions.csv",
    index=False
)

print("Saved to Desktop!")



Saved to Desktop!


In [36]:
df = pd.read_csv("pipeline_output/predictions.csv")
