In [118]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, month, dayofmonth, hour, regexp_replace

In [120]:
# Intialize spark session
spark = SparkSession.builder \
    .appName("New Traffic Data Analysis on Cycling") \
    .getOrCreate()

24/04/20 21:29:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [121]:
# Load the Datasets
df1 = spark.read.csv("gs://cycling_raw_data/1_Monitoring_locations.csv", header=True, inferSchema=True)
df2 = spark.read.csv("gs://cycling_raw_data/2014_Jan_March.csv", header=True, inferSchema=True)

In [122]:
# Data preprocessing
# Before merging try to understand the dataset schema and dtypes
# Checking the schema to understand the data types
df1.printSchema()
df2.printSchema()

root
 |-- Site ID: string (nullable = true)
 |-- Location description: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Functional area for monitoring: string (nullable = true)
 |-- Road type: string (nullable = true)
 |-- Is it on the strategic CIO panel?: integer (nullable = true)
 |-- Old site ID (legacy): string (nullable = true)
 |-- Easting (UK Grid): double (nullable = true)
 |-- Northing (UK Grid): double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)

root
 |-- Year: string (nullable = true)
 |-- UnqID: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weather: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Day: string (nullable = true)
 |-- Round: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- Path: string (nullable = true)
 |-- Mode: string (nullable = true)
 |-- Count: integer (nullable = true)



In [123]:
# Convert date formats if necessary (assuming Date columns are in different formats)
from pyspark.sql.functions import to_date, date_format
df2 = df2.withColumn("Date", to_date(df2.Date, 'dd/MM/yyyy'))

In [124]:
# Print unique values for each column
for column in df2.columns:
    print(f"Unique values in {column}:")
    df2.select(column).distinct().show()

Unique values in Year:
+-----------------+
|             Year|
+-----------------+
|2014 Q1 (Jan-Mar)|
+-----------------+

Unique values in UnqID:
+------+
| UnqID|
+------+
|ML0172|
|ML0195|
|ML0082|
|ML0081|
|ML0170|
|ML0003|
|ML0156|
|ML0006|
|ML0189|
|ML0199|
|ML0184|
|ML0127|
|ML0011|
|ML0110|
|ML0070|
|ML0176|
|ML0126|
|ML0135|
|ML0125|
|ML0050|
+------+
only showing top 20 rows

Unique values in Date:
+----------+
|      Date|
+----------+
|2014-02-13|
|2014-01-21|
|2014-02-26|
|2014-03-27|
|2014-01-23|
|2014-02-10|
|2014-03-24|
|2014-02-24|
|2014-02-03|
|2014-01-29|
|2014-01-20|
|2014-03-28|
|2014-03-31|
|2014-01-28|
|2014-01-31|
|2014-01-27|
|2014-01-22|
|2014-02-25|
|2014-02-27|
|2014-02-04|
+----------+
only showing top 20 rows

Unique values in Weather:
+------------------+
|           Weather|
+------------------+
|        Almost Dry|
|           Showery|
|         Dry - Wet|
|      Rain Stopped|
|         Rain Damp|
|      Light Shower|
|        Cold/ Rain|
|        Dry 

In [125]:
# 1. Extract the year from the "Year" column
df2 = df2.withColumn("Year", regexp_replace(col("Year"), " Q[1-4] \\(.*\\)", ""))
df2.show(5)

+----+------+----------+-------+-------------------+-------+-----+----------+----+--------------+-----+
|Year| UnqID|      Date|Weather|               Time|    Day|Round|       Dir|Path|          Mode|Count|
+----+------+----------+-------+-------------------+-------+-----+----------+----+--------------+-----+
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:00:00|Weekday|    A|Northbound| n/a|Private cycles|    0|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:15:00|Weekday|    A|Northbound| n/a|Private cycles|   15|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:30:00|Weekday|    A|Northbound| n/a|Private cycles|   35|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:45:00|Weekday|    A|Northbound| n/a|Private cycles|   59|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 07:00:00|Weekday|    A|Northbound| n/a|Private cycles|   73|
+----+------+----------+-------+-------------------+-------+-----+----------+----+--------------+-----+
only showing top 5 rows



In [126]:
# 2. Create new "Month" and "Day" columns from the "Date" column
df2 = df2.withColumn("Month", month(to_date(col("Date"), "dd/MM/yyyy"))) \
         .withColumn("Day", dayofmonth(to_date(col("Date"), "dd/MM/yyyy")))
df2.show(5)

+----+------+----------+-------+-------------------+---+-----+----------+----+--------------+-----+-----+
|Year| UnqID|      Date|Weather|               Time|Day|Round|       Dir|Path|          Mode|Count|Month|
+----+------+----------+-------+-------------------+---+-----+----------+----+--------------+-----+-----+
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:00:00| 24|    A|Northbound| n/a|Private cycles|    0|    1|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:15:00| 24|    A|Northbound| n/a|Private cycles|   15|    1|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:30:00| 24|    A|Northbound| n/a|Private cycles|   35|    1|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:45:00| 24|    A|Northbound| n/a|Private cycles|   59|    1|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 07:00:00| 24|    A|Northbound| n/a|Private cycles|   73|    1|
+----+------+----------+-------+-------------------+---+-----+----------+----+--------------+-----+-----+
only showing top 5 rows



In [127]:
# Convert the numeric month to its corresponding calendar month name
df2 = df2.withColumn("Month", date_format(to_date(col("Date"), "dd/MM/yyyy"), "MMMM"))

# Replace the numeric day of the month with the day of the week
df2 = df2.withColumn("Day", date_format(to_date(col("Date"), "dd/MM/yyyy"), "EEEE"))
df2.show(5)

+----+------+----------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
|Year| UnqID|      Date|Weather|               Time|   Day|Round|       Dir|Path|          Mode|Count|  Month|
+----+------+----------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:00:00|Friday|    A|Northbound| n/a|Private cycles|    0|January|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:15:00|Friday|    A|Northbound| n/a|Private cycles|   15|January|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:30:00|Friday|    A|Northbound| n/a|Private cycles|   35|January|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 06:45:00|Friday|    A|Northbound| n/a|Private cycles|   59|January|
|2014|ML0001|2014-01-24|    Dry|2024-04-20 07:00:00|Friday|    A|Northbound| n/a|Private cycles|   73|January|
+----+------+----------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
o

In [128]:
# 3. Delete the original "Date" column
df2 = df2.drop("Date")

In [129]:
df2.show(5)

+----+------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
|Year| UnqID|Weather|               Time|   Day|Round|       Dir|Path|          Mode|Count|  Month|
+----+------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
|2014|ML0001|    Dry|2024-04-20 06:00:00|Friday|    A|Northbound| n/a|Private cycles|    0|January|
|2014|ML0001|    Dry|2024-04-20 06:15:00|Friday|    A|Northbound| n/a|Private cycles|   15|January|
|2014|ML0001|    Dry|2024-04-20 06:30:00|Friday|    A|Northbound| n/a|Private cycles|   35|January|
|2014|ML0001|    Dry|2024-04-20 06:45:00|Friday|    A|Northbound| n/a|Private cycles|   59|January|
|2014|ML0001|    Dry|2024-04-20 07:00:00|Friday|    A|Northbound| n/a|Private cycles|   73|January|
+----+------+-------+-------------------+------+-----+----------+----+--------------+-----+-------+
only showing top 5 rows



In [130]:
# 4. Modify the "Time" column to label time ranges
df2 = df2.withColumn("Time", 
    when((hour(col("Time")) >= 3) & (hour(col("Time")) < 11), "Morning")
    .when((hour(col("Time")) >= 11) & (hour(col("Time")) < 15), "Afternoon")
    .when((hour(col("Time")) >= 15) & (hour(col("Time")) < 18), "Evening")
    .otherwise("Night"))

# Show the transformed DataFrame (for verification)
df2.select("Time").show(5)

+-------+
|   Time|
+-------+
|Morning|
|Morning|
|Morning|
|Morning|
|Morning|
+-------+
only showing top 5 rows



In [131]:
# 6. Analyze the "Round" column
df2 = df2.withColumn("Round", 
    when(col("Round").rlike("[A-I]"), "Weekday")
    .otherwise("Weekend"))
df2.select("Round").show(5)

+-------+
|  Round|
+-------+
|Weekday|
|Weekday|
|Weekday|
|Weekday|
|Weekday|
+-------+
only showing top 5 rows



In [132]:
# 7. Apply transformations to the "Weather" column
df2 = df2.withColumn("Weather", 
    when(col("Weather").rlike("(?i)^v.? wet$"), "Wet")
    .when(col("Weather").rlike("(?i)^wet$"), "Wet")
    .otherwise(col("Weather")))

In [133]:
# Count of each distinct value in the "Weather" column
weather_counts = df2.groupBy("Weather").count()

# Show the counts for each weather type
weather_counts.show()

# Get the total count of distinct "Weather" values
total_distinct_weather = weather_counts.count()

# Print the total count of distinct weather values
print(f"Total distinct weather values: {total_distinct_weather}")

+------------------+-----+
|           Weather|count|
+------------------+-----+
|        Almost Dry|    4|
|           Showery|    8|
|         Dry - Wet|   64|
|      Rain Stopped|    4|
|         Rain Damp|   94|
|      Light Shower|    4|
|        Cold/ Rain|  256|
|        Dry - Rain|   30|
|               Dry|30082|
|         Wet - Dry|  144|
|          Rain Dry|    4|
|Wet Intermittently|   52|
|              Damp|  194|
|      Cloudy/ Rain|  128|
|       Windy/ Rain|  128|
|              Fine| 1152|
|        Light Rain|   68|
|       Damp - Rain|   34|
|            S. Wet|   64|
|          Very Wet|   64|
+------------------+-----+
only showing top 20 rows

Total distinct weather values: 27


In [134]:
df2.select("Weather").distinct().show()

+------------------+
|           Weather|
+------------------+
|        Almost Dry|
|           Showery|
|         Dry - Wet|
|      Rain Stopped|
|         Rain Damp|
|      Light Shower|
|        Cold/ Rain|
|        Dry - Rain|
|               Dry|
|         Wet - Dry|
|          Rain Dry|
|Wet Intermittently|
|              Damp|
|      Cloudy/ Rain|
|       Windy/ Rain|
|              Fine|
|        Light Rain|
|       Damp - Rain|
|            S. Wet|
|          Very Wet|
+------------------+
only showing top 20 rows



In [135]:
# Count the occurrences of "wet" values
wet_count = df2.filter(col("Weather").like("%Wet%")).count()

# Count the occurrences of "Very wet" values
very_wet_count = df2.filter(col("Weather") == "Very Wet").count()

# Print the counts
print(f"Count of 'wet' values: {wet_count}")
print(f"Count of 'Very wet' values: {very_wet_count}")

Count of 'wet' values: 16768
Count of 'Very wet' values: 64


In [136]:
# Replace "Wet" and "Very Wet" with "Wet" in the "Weather" column
df2 = df2.withColumn("Weather", 
                     when(col("Weather") == "Very Wet", "Wet")
                     .when(col("Weather") == "Wet", "Wet")
                     .otherwise(col("Weather")))

In [137]:
wet_count = df2.filter(col("Weather") == "Wet").count()
# Print the counts
print(f"Count of 'wet' values: {wet_count}")

Count of 'wet' values: 16156


In [138]:
# Simplify weather descriptions
df2 = df2.withColumn("Weather", 
                   when(col("Weather").isin("Almost Dry", "Dry - Wet", "Dry - Rain", "Wet - Dry", "Rain Dry", "Dry"), "Dry")
                   .when(col("Weather").isin("Rain Stopped", "Rain Damp", "Light Shower", "Cold/ Rain", "Wet Intermittently", "Very Wet", "S. Wet", "Light Rain", "Damp - Rain", "Cloudy/ Rain", "Windy/ Rain"), "Wet")
                   .when(col("Weather").isin("Showery", "Fine"), "Moderate")
                   .otherwise("Other"))

In [139]:
df2.select("Weather").distinct().show(5)

+--------+
| Weather|
+--------+
|   Other|
|     Dry|
|Moderate|
|     Wet|
+--------+



In [140]:
df2.show(5)

+----+------+-------+-------+------+-------+----------+----+--------------+-----+-------+
|Year| UnqID|Weather|   Time|   Day|  Round|       Dir|Path|          Mode|Count|  Month|
+----+------+-------+-------+------+-------+----------+----+--------------+-----+-------+
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound| n/a|Private cycles|    0|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound| n/a|Private cycles|   15|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound| n/a|Private cycles|   35|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound| n/a|Private cycles|   59|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound| n/a|Private cycles|   73|January|
+----+------+-------+-------+------+-------+----------+----+--------------+-----+-------+
only showing top 5 rows



In [141]:
# 8. Handle "n/a" values in the "Path" column
df2 = df2.withColumn("Path", 
    when(col("Path") == "n/a", "Not Available")
    .otherwise(col("Path")))

In [143]:
# Show the transformed DataFrame (for verification)
df2.show(10)

+----+------+-------+-------+------+-------+----------+-------------+--------------+-----+-------+
|Year| UnqID|Weather|   Time|   Day|  Round|       Dir|         Path|          Mode|Count|  Month|
+----+------+-------+-------+------+-------+----------+-------------+--------------+-----+-------+
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|    0|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|   15|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|   35|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|   59|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|   73|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|   89|January|
|2014|ML0001|    Dry|Morning|Friday|Weekday|Northbound|Not Available|Private cycles|  151|January|
|2014|ML00

In [144]:
df1.show(10)

+-------+--------------------+--------------+------------------------------+---------+---------------------------------+--------------------+-----------------+------------------+---------+------------+
|Site ID|Location description|       Borough|Functional area for monitoring|Road type|Is it on the strategic CIO panel?|Old site ID (legacy)|Easting (UK Grid)|Northing (UK Grid)| Latitude|   Longitude|
+-------+--------------------+--------------+------------------------------+---------+---------------------------------+--------------------+-----------------+------------------+---------+------------+
| ML0001|Millbank (south o...|   Westminster|                    01 Central|01 A Road|                                1|            CENCY001|        530251.49|         178742.45|51.492628| -0.12520362|
| ML0002|         Bishopsgate|City of London|                    01 Central|01 A Road|                                2|            CENCY002|        533362.68|         181824.45|51.519599| -0.

In [145]:
# Print unique values for each column
for column in df1.columns:
    print(f"Unique values in {column}:")
    df1.select(column).distinct().show()

Unique values in Site ID:
+-------+
|Site ID|
+-------+
| ML0172|
| ML1270|
| ML0720|
| ML0411|
| ML1447|
| ML0646|
| ML1972|
| ML0682|
| ML0733|
| ML1756|
| ML0829|
| ML1159|
| ML1608|
| ML1863|
| ML0365|
| ML0456|
| ML1360|
| ML1471|
| ML1556|
| ML0877|
+-------+
only showing top 20 rows

Unique values in Location description:
+--------------------+
|Location description|
+--------------------+
|      Battersea Rise|
| Harleyford Road (2)|
|Grand Union Canal...|
|Tavistock Place j...|
|        Norwood Road|
|      Northcote Road|
|      Somervell Road|
|      Birdbrook Road|
|       Avenue Elmers|
|Thames Path (Gree...|
|Southwark Bridge ...|
|        Darfield Way|
|         Rope Street|
|Pendennis Road / ...|
|       Emmanuel Road|
|        Rutland Gate|
|         Milton Park|
|      Erpingham Road|
|     Woodstock Drive|
|       Mowbrays Road|
+--------------------+
only showing top 20 rows

Unique values in Borough:
+--------------------+
|             Borough|
+------------------

In [146]:
# 1. Fill "0" value in "Road Type" column with mode value
# Calculate the mode of the "Road type" column
road_type_mode = df1.filter(col("Road type") != "0") \
                    .groupBy("Road type") \
                    .count() \
                    .orderBy("count", ascending=False) \
                    .first()["Road type"]

In [147]:
# Replace "0" with the mode value
df1 = df1.withColumn("Road type", when(col("Road type") == "0", road_type_mode).otherwise(col("Road type")))

In [149]:
# 2. Shorten column names
df1 = df1.withColumnRenamed("Location description", "Location") \
         .withColumnRenamed("Functional area for monitoring", "FunctionalArea") \
         .withColumnRenamed("Is it on the strategic CIO panel?", "StrategicCIO") \
         .withColumnRenamed("Old site ID (legacy)", "OldSiteID") \
         .withColumnRenamed("Easting (UK Grid)", "Easting") \
         .withColumnRenamed("Northing (UK Grid)", "Northing")

In [150]:
df1.show(5)

+-------+--------------------+--------------+--------------+---------+------------+---------+---------+---------+---------+------------+
|Site ID|            Location|       Borough|FunctionalArea|Road type|StrategicCIO|OldSiteID|  Easting| Northing| Latitude|   Longitude|
+-------+--------------------+--------------+--------------+---------+------------+---------+---------+---------+---------+------------+
| ML0001|Millbank (south o...|   Westminster|    01 Central|01 A Road|           1| CENCY001|530251.49|178742.45|51.492628| -0.12520362|
| ML0002|         Bishopsgate|City of London|    01 Central|01 A Road|           2| CENCY002|533362.68|181824.45|51.519599| -0.07925389|
| ML0003|    Southwark Bridge|     Southwark|    01 Central|01 A Road|           3| CENCY003|532334.06|180520.37|51.508123|-0.094550618|
| ML0004|Southwark Bridge ...|     Southwark|    01 Central|01 A Road|           4| CENCY004| 532052.5|179677.64|51.500613|-0.098926959|
| ML0005|       Tooley Street|     Southw

In [151]:
# Select the required columns from df1
df1_selected = df1.select(
    "Site ID",
    "Borough",
    "Road type",
    "Latitude",
    "Longitude",
    "StrategicCIO"
)

# Select the required columns from df2
df2_selected = df2.select(
    "UnqID",
    "Time",
    "Count",
    "Day",
    "Month"
)

# Show the selected columns from df1 and df2 for verification
df1_selected.show()
df2_selected.show()

+-------+--------------+---------+---------+------------+------------+
|Site ID|       Borough|Road type| Latitude|   Longitude|StrategicCIO|
+-------+--------------+---------+---------+------------+------------+
| ML0001|   Westminster|01 A Road|51.492628| -0.12520362|           1|
| ML0002|City of London|01 A Road|51.519599| -0.07925389|           2|
| ML0003|     Southwark|01 A Road|51.508123|-0.094550618|           3|
| ML0004|     Southwark|01 A Road|51.500613|-0.098926959|           4|
| ML0005|     Southwark|01 A Road|  51.5052|-0.084629047|           5|
| ML0006|     Southwark|01 A Road|51.504068|-0.095742803|           6|
| ML0007|       Lambeth|01 A Road|  51.4915| -0.12213838|           7|
| ML0008|   Westminster|01 A Road|51.509779| -0.12311615|           8|
| ML0009|   Westminster|01 A Road|51.510077| -0.12824866|           9|
| ML0010|       Lambeth|01 A Road|51.499213| -0.11696597|          10|
| ML0011|   Westminster|01 A Road|51.504092| -0.12623169|          11|
| ML00

In [154]:
# Merge the two dataframes based on the Site ID and UnqID columns
final_df = df1_selected.join(df2_selected, df1_selected["Site ID"] == df2_selected["UnqID"])

# Drop the Site ID column after merging
final_df = final_df.drop("Site ID")

# Show the merged dataframe
final_df.show(5)

+-----------+---------+---------+-----------+------------+------+-------+-----+------+-------+
|    Borough|Road type| Latitude|  Longitude|StrategicCIO| UnqID|   Time|Count|   Day|  Month|
+-----------+---------+---------+-----------+------------+------+-------+-----+------+-------+
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|    0|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   15|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   35|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   59|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   73|Friday|January|
+-----------+---------+---------+-----------+------------+------+-------+-----+------+-------+
only showing top 5 rows



In [155]:
final_df.show()

+-----------+---------+---------+-----------+------------+------+-------+-----+------+-------+
|    Borough|Road type| Latitude|  Longitude|StrategicCIO| UnqID|   Time|Count|   Day|  Month|
+-----------+---------+---------+-----------+------------+------+-------+-----+------+-------+
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|    0|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   15|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   35|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   59|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   73|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|   89|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|           1|ML0001|Morning|  151|Friday|January|
|Westminster|01 A Road|51.492628|-0.12520362|     