In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [3]:
credentials_location = '/home/vamsi/Desktop/data-engg/Yelp_project/terraform/keys/mycreds.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [4]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

25/02/19 12:22:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [6]:
from pyspark.sql import types

In [7]:
schema = types.StructType([
    types.StructField('Wave', types.StringType(), True), 
    types.StructField('SiteID', types.StringType(), True), 
    types.StructField('Date', types.StringType(), True), 
    types.StructField('Weather', types.StringType(), True), 
    types.StructField('Time', types.StringType(), True), 
    types.StructField('Day', types.StringType(), True), 
    types.StructField('Round', types.StringType(), True), 
    types.StructField('Direction', types.StringType(), True), 
    types.StructField('Path', types.StringType(), True), 
    types.StructField('Mode', types.StringType(), True), 
    types.StructField('Count', types.LongType(), True)
])

In [8]:
df_2023 = spark.read.csv('gs://yelp-de-project-451206-bucket/cyclingdata_2023/*', header=True, schema=schema)

In [11]:
from pyspark.sql import functions as F

In [12]:
df_2023 = df_2023.withColumn('Year', F.lit('2023'))

In [9]:
df_2024 = spark.read.csv('gs://yelp-de-project-451206-bucket/cyclingdata_2024/*', header=True, schema=schema)

In [14]:
df_2024 = df_2024.withColumn('Year', F.lit('2024'))

In [15]:
df_2024.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------+------+----------+-------+--------+-------+-----+----------+-----------+-----------+-----+----+
|          Wave|SiteID|      Date|Weather|    Time|    Day|Round| Direction|       Path|       Mode|Count|Year|
+--------------+------+----------+-------+--------+-------+-----+----------+-----------+-----------+-----+----+
|2024 W1 spring|ML0001|16/07/2024|    Dry|06:00:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    0|2024|
|2024 W1 spring|ML0001|16/07/2024|    Dry|06:15:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    0|2024|
|2024 W1 spring|ML0001|16/07/2024|    Dry|06:30:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    0|2024|
|2024 W1 spring|ML0001|16/07/2024|    Dry|06:45:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    0|2024|
|2024 W1 spring|ML0001|16/07/2024|    Dry|07:00:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    0|2024|
|2024 W1 spring|ML0001|16/07/2024|    Dry|07:15:00|Weekday|    A|Northbound|Carriageway|Cargo bikes|    

                                                                                

In [17]:
df_all = df_2023.unionAll(df_2024)

In [20]:
df_all.count()

                                                                                

4705344

In [21]:
df_all.createOrReplaceTempView("cycling_data")

In [22]:
df_result = spark.sql('''
SELECT
    Year,
    SiteID,
    Weather,
    Direction,
    Path,
    Mode,
    SUM(Count) AS Total_Count,  -- Total traffic count
    AVG(Count) AS Avg_Count,    -- Average traffic count per record
    MAX(Count) AS Max_Count,    -- Maximum traffic count recorded
    MIN(Count) AS Min_Count     -- Minimum traffic count recorded
FROM cycling_data
GROUP BY Year, SiteID, Weather, Direction, Path, Mode
ORDER BY Year, SiteID
''')

In [23]:
df_result.show()

[Stage 9:>                                                          (0 + 4) / 4]

+----+------+-------+----------+--------------------+-------------------+-----------+---------+---------+---------+
|Year|SiteID|Weather| Direction|                Path|               Mode|Total_Count|Avg_Count|Max_Count|Min_Count|
+----+------+-------+----------+--------------------+-------------------+-----------+---------+---------+---------+
|2023|ML0001|    Dry|Southbound|Cycle lane - Sout...|        Pedestrians|          2|  0.03125|        1|        0|
|2023|ML0001|    Dry|Southbound|Pavement - Southb...|        Pedestrians|       1333|20.828125|       51|        2|
|2023|ML0001|    Dry|Southbound|Cycle lane - Sout...|         E-scooters|         21| 0.328125|        3|        0|
|2023|ML0001|    Dry|Southbound|Cycle lane - Nort...|        Cargo bikes|          0|      0.0|        0|        0|
|2023|ML0001|    Dry|Southbound|         Carriageway|         E-scooters|          1| 0.015625|        1|        0|
|2023|ML0001|    Dry|Northbound|Pavement - Southb...|Conventional cycles

                                                                                