In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .appName("Data_Transformation") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.jars", "gcs-connector-hadoop3-latest.jar,spark-bigquery-latest_2.12.jar") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/chenchen/.gc/my-creds.json") \
    .getOrCreate()

25/03/22 09:22:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
carpark_info = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://de-zoomcamp-project-453801-terra-bucket/carpark_info/CarparkInformation.csv") 

                                                                                

In [5]:
carpark_info.show(5)

+---+-----------+--------------------+----------+----------+--------------------+----------------------+------------------+--------------------+-------------+--------------+-------------+-----------------+
|_id|car_park_no|             address|   x_coord|   y_coord|       car_park_type|type_of_parking_system|short_term_parking|        free_parking|night_parking|car_park_decks|gantry_height|car_park_basement|
+---+-----------+--------------------+----------+----------+--------------------+----------------------+------------------+--------------------+-------------+--------------+-------------+-----------------+
|  1|        ACB|BLK 270/271 ALBER...|30314.7936|31490.4942|   BASEMENT CAR PARK|    ELECTRONIC PARKING|         WHOLE DAY|                  NO|          YES|             1|          1.8|                Y|
|  2|        ACM|BLK 98A ALJUNIED ...|33758.4143|33695.5198|MULTI-STOREY CAR ...|    ELECTRONIC PARKING|         WHOLE DAY|SUN & PH FR 7AM-1...|          YES|             5|   

In [7]:
carpark_details = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://de-zoomcamp-project-453801-terra-bucket/carpark_data/*.csv") 

                                                                                

In [8]:
carpark_details.show(5) 

+---------------+-------------+-------------------+--------------+-------------------+-------------------+
|info_total_lots|info_lot_type|info_lots_available|carpark_number|    update_datetime|          timestamp|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+
|            105|            C|                  0|          HE12|2023-01-01 08:58:22|2023-01-01 00:59:27|
|            583|            C|                435|           HLM|2023-01-01 08:58:34|2023-01-01 00:59:27|
|            329|            C|                 83|           RHM|2023-01-01 08:58:22|2023-01-01 00:59:27|
|             97|            C|                 59|          BM29|2023-01-01 08:58:39|2023-01-01 00:59:27|
|             96|            C|                 35|           Q81|2023-01-01 08:58:18|2023-01-01 00:59:27|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import col, expr, to_timestamp, date_format, trunc, lit, avg 
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

carpark_details = carpark_details.withColumn("update_datetime", to_timestamp(col("update_datetime")))\
                                 .withColumn("timestamp", to_timestamp(col("timestamp")))
carpark_details.show(5) 

+---------------+-------------+-------------------+--------------+-------------------+-------------------+
|info_total_lots|info_lot_type|info_lots_available|carpark_number|    update_datetime|          timestamp|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+
|            105|            C|                  0|          HE12|2023-01-01 08:58:22|2023-01-01 00:59:27|
|            583|            C|                435|           HLM|2023-01-01 08:58:34|2023-01-01 00:59:27|
|            329|            C|                 83|           RHM|2023-01-01 08:58:22|2023-01-01 00:59:27|
|             97|            C|                 59|          BM29|2023-01-01 08:58:39|2023-01-01 00:59:27|
|             96|            C|                 35|           Q81|2023-01-01 08:58:18|2023-01-01 00:59:27|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+
only showing top 5 rows



In [11]:
# Join carpark_info with carpark_details
carpark_df = carpark_details.join(carpark_info, carpark_details.carpark_number == carpark_info.car_park_no, "left")
carpark_df.show(5) 

+---------------+-------------+-------------------+--------------+-------------------+-------------------+----+-----------+-------+-------+-------+-------------+----------------------+------------------+------------+-------------+--------------+-------------+-----------------+
|info_total_lots|info_lot_type|info_lots_available|carpark_number|    update_datetime|          timestamp| _id|car_park_no|address|x_coord|y_coord|car_park_type|type_of_parking_system|short_term_parking|free_parking|night_parking|car_park_decks|gantry_height|car_park_basement|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+----+-----------+-------+-------+-------+-------------+----------------------+------------------+------------+-------------+--------------+-------------+-----------------+
|            105|            C|                  0|          HE12|2023-01-01 08:58:22|2023-01-01 00:59:27|null|       null|   null|   null|   null|         null|     

In [12]:
# Compute utilization rate
carpark_df = carpark_df.withColumn(
    "utilization_rate",
    expr("(info_total_lots - info_lots_available) / info_total_lots * 100")
)

In [17]:
carpark_df = carpark_df.withColumn("year_month", date_format(trunc(col("timestamp"), "MM"), "yyyy-MM-01"))\
                               .withColumn("day_of_week", date_format(col("timestamp"), "EEEE"))



In [18]:
carpark_df.show(5)

+---------------+-------------+-------------------+--------------+-------------------+-------------------+----+-----------+-------+-------+-------+-------------+----------------------+------------------+------------+-------------+--------------+-------------+-----------------+------------------+----+-----------+-------+----------+
|info_total_lots|info_lot_type|info_lots_available|carpark_number|    update_datetime|          timestamp| _id|car_park_no|address|x_coord|y_coord|car_park_type|type_of_parking_system|short_term_parking|free_parking|night_parking|car_park_decks|gantry_height|car_park_basement|  utilization_rate|hour|day_of_week|  month|year_month|
+---------------+-------------+-------------------+--------------+-------------------+-------------------+----+-----------+-------+-------+-------+-------------+----------------------+------------------+------------+-------------+--------------+-------------+-----------------+------------------+----+-----------+-------+----------+
|

In [22]:
# Compute average utilization rate per region and time period
utilization_by_region = carpark_df.groupBy("car_park_type","carpark_number", "year_month", "day_of_week")\
                                      .agg(avg("utilization_rate").alias("avg_utilization"))

# Show results
utilization_by_region.show(5)



+--------------------+--------------+----------+-----------+-----------------+
|       car_park_type|carpark_number|year_month|day_of_week|  avg_utilization|
+--------------------+--------------+----------+-----------+-----------------+
|    SURFACE CAR PARK|           A27|2023-01-01|     Sunday| 49.9601593625498|
|    SURFACE CAR PARK|           A33|2023-01-01|  Wednesday|            100.0|
|MULTI-STOREY CAR ...|           A76|2023-01-01|   Thursday|18.30684596577017|
|    SURFACE CAR PARK|           A29|2023-02-01|  Wednesday|37.93103448275862|
|    SURFACE CAR PARK|           A28|2023-02-01|   Saturday|85.41666666666667|
+--------------------+--------------+----------+-----------+-----------------+
only showing top 5 rows



                                                                                

In [25]:
from pyspark.sql.functions import to_date

# Convert year_month to DATE type
utilization_by_region = utilization_by_region.withColumn("year_month", to_date(col("year_month"), "yyyy-MM-dd"))


utilization_by_region.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "de-zoomcamp-project-453801-terra-bucket") \
    .option("table", "de-zoomcamp-project-453801.demo_dataset.utilization_by_region") \
    .option("partitionField", "year_month") \
    .mode("overwrite") \
    .save()

                                                                                

In [26]:
carpark_details.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "de-zoomcamp-project-453801-terra-bucket") \
    .option("table", "de-zoomcamp-project-453801.demo_dataset.carpark_details") \
    .option("partitionField", "timestamp") \
    .mode("overwrite") \
    .save()

                                                                                

In [27]:
carpark_df.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "de-zoomcamp-project-453801-terra-bucket") \
    .option("table", "de-zoomcamp-project-453801.demo_dataset.carpark_data") \
    .option("partitionField", "timestamp") \
    .mode("overwrite") \
    .save()

                                                                                