<a href="https://colab.research.google.com/github/vaniamv/final-project-edit/blob/main/full_streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Aplication in Real Time to Read Carris API - group 1 - ETL approach

This notebook documents the steps to implement a data pipeline leveraging Google Cloud Platform (GCP), following an ETL (Extract, Transform, Load) approach. The pipeline processes data in three stages:

Streaming Ingestion and Transformation (Extract and Transform):
Data is ingested in real-time from a bucket that gets the vehicles endpoint of  Carris API. During ingestion, transformations are applied directly to the data stream, such as cleaning, enrichment, and standardization, ensuring that only processed and structured data flows through the pipeline.

Loading Transformed Data:
The pre-processed data is then stored in a silver layer bucket on GCP. This layer serves as a structured repository, optimized for downstream analytical queries and consumption.

By prioritizing the ETL approach, this pipeline ensures that the data is transformed as it is ingested, minimizing the need for post-processing and enabling faster delivery of structured and actionable insights.

---



---



1. Authentication to Google Cloud Platform (GCP)



In [None]:
# autentication to gcloud with login

!gcloud auth application-default login

Go to the following link in your browser, and complete the sign-in prompts:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fapplicationdefaultauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=bkX1TEEMxAk1ufk0TNMXLpxTBzMkvh&prompt=consent&token_usage=remote&access_type=offline&code_challenge=Th6LJRA5MLhPhEsNsIjFgdVCpHXQVdLv9MzdbZcQTCU&code_challenge_method=S256

Once finished, enter the verification code provided in your browser: 4/0ASVgi3JmNVNq5TW3c2Urfe3hULxlpQj8BuzpruL0OpAeiyGFxDPccJhNjLv--IhlCKhZxA

Credentials saved to file: [/content/.config/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).
Ca

In [None]:
# download connector and save it local

!wget https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.7/gcs-connector-hadoop3-2.2.7-shaded.jar -P /usr/local/lib/

--2025-01-22 13:58:52--  https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.7/gcs-connector-hadoop3-2.2.7-shaded.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 33831577 (32M) [application/java-archive]
Saving to: ‘/usr/local/lib/gcs-connector-hadoop3-2.2.7-shaded.jar’


2025-01-22 13:58:52 (172 MB/s) - ‘/usr/local/lib/gcs-connector-hadoop3-2.2.7-shaded.jar’ saved [33831577/33831577]





---


---
2. Initialize SparkSession and set up the access to GSC

In [None]:
# import libraries

import os
from pyspark.sql import SparkSession

#spark session
spark = SparkSession.builder \
    .appName('GCS_Spark') \
    .config('spark.jars', '/usr/local/lib/gcs-connector-hadoop3-2.2.7-shaded.jar') \
    .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem') \
    .getOrCreate()

# save credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '/content/.config/application_default_credentials.json'

# Config PySpark to access the GCS
spark._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", '/content/.config/application_default_credentials.json')



---


---
3. Set up the source schema and initialize the readStream

In [None]:
from pyspark.sql.types import *

# create schema
vehicle_schema = StructType([StructField('bearing', IntegerType(), True),
                             StructField('block_id', StringType(), True),
                             StructField('current_status', StringType(), True),
                             StructField('id', StringType(), True),
                             StructField('lat', FloatType(), True),
                             StructField('line_id', StringType(), True),
                             StructField('lon', FloatType(), True),
                             StructField('pattern_id', StringType(), True),
                             StructField('route_id', StringType(), True),
                             StructField('schedule_relationship', StringType(), True),
                             StructField('shift_id', StringType(), True),
                             StructField('speed', FloatType(), True),
                             StructField('stop_id', StringType(), True),
                             StructField('timestamp', TimestampType(), True),
                             StructField('trip_id', StringType(), True)])


#readStreaming
stream = spark.readStream.format("json").schema(vehicle_schema).load("gs://edit-de-project-streaming-data/carris-vehicles")



---


---
4. Write the stream in a bronze layer (landing zone)
* Purpose: Raw data ingestion layer.
* Data Characteristics: Raw, unprocessed, and schema-on-read where feasible.
* Data Storage: Store data exactly as ingested (in this case JSON format).
* Operations: Minimal transformation; only schema enforcement and deduplication.

In [None]:
df_stops = spark.read.option("header", "true").csv('gs://edit-data-eng-project-group1/LandingZone/GTFS/stops.txt')
df_stops = df_stops.select('stop_id','stop_lat','stop_lon')
df_stops = df_stops.withColumn("stop_lat", df_stops["stop_lat"].cast("float"))
df_stops = df_stops.withColumn("stop_lon", df_stops["stop_lon"].cast("float"))

In [None]:
#select columns
transform = stream.select('id', 'speed', 'timestamp','line_id','route_id','stop_id','lat', 'lon')
# join tables
transform = transform.join(df_stops, on='stop_id', how='left')

In [None]:

from pyspark.sql.functions import col, lag,coalesce, current_timestamp, window
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F

# watermark is necessarary because of the aggregation
transformed = transform.withWatermark("timestamp", "60 seconds")

windowed_transform = transformed.groupBy("id", "stop_id", F.window("timestamp", "2 minutes")).agg(
    F.first(col("lat")).alias("previous_lat"),
    F.first(col("lon")).alias("previous_lon"),
    F.last(col("lat")).alias("lat"),
    F.last(col("lon")).alias("lon"),
    F.last(col("stop_lat")).alias("stop_lat"),
    F.last(col("stop_lon")).alias("stop_lon")
    )

AnalysisException: [MISSING_AGGREGATION] The non-aggregating expression "lat" is based on columns which are not participating in the GROUP BY clause.
Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(lat)" if you do not care which of the values within a group is returned.;
Project [id#12376, stop_id#12385, window#12611-T60000ms, previous_lat#12623, previous_lon#12625, lat#12627, lon#12629, stop_lat#12631, stop_lon#12633]
+- Project [id#12376, stop_id#12385, window#12611-T60000ms, lat#12377, timestamp#12386-T60000ms, lon#12379, stop_lat#12584, stop_lon#12588, previous_lat#12623, previous_lon#12625, lat#12627, lon#12629, stop_lat#12631, stop_lon#12633, previous_lat#12623, previous_lon#12625, lat#12627, lon#12629, stop_lat#12631, stop_lon#12633]
   +- Window [first(lat#12377, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS previous_lat#12623, first(lon#12379, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS previous_lon#12625, last(lat#12377, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS lat#12627, last(lon#12379, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS lon#12629, last(stop_lat#12584, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS stop_lat#12631, last(stop_lon#12588, false) windowspecdefinition(id#12376, timestamp#12386-T60000ms ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS stop_lon#12633], [id#12376], [timestamp#12386-T60000ms ASC NULLS FIRST]
      +- Aggregate [id#12376, stop_id#12385, window#12634-T60000ms], [id#12376, stop_id#12385, window#12634-T60000ms AS window#12611-T60000ms, lat#12377, timestamp#12386-T60000ms, lon#12379, stop_lat#12584, stop_lon#12588]
         +- Project [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) + 120000000) ELSE ((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) + 120000000) ELSE ((precisetimestampconversion(timestamp#12386-T60000ms, TimestampType, LongType) - 0) % 120000000) END) - 0) + 120000000), LongType, TimestampType))) AS window#12634-T60000ms, stop_id#12385, id#12376, speed#12384, timestamp#12386-T60000ms, line_id#12378, route_id#12381, lat#12377, lon#12379, stop_lat#12584, stop_lon#12588]
            +- Filter isnotnull(timestamp#12386-T60000ms)
               +- EventTimeWatermark timestamp#12386: timestamp, 1 minutes
                  +- Project [stop_id#12385, id#12376, speed#12384, timestamp#12386, line_id#12378, route_id#12381, lat#12377, lon#12379, stop_lat#12584, stop_lon#12588]
                     +- Join LeftOuter, (stop_id#12385 = stop_id#12420)
                        :- Project [id#12376, speed#12384, timestamp#12386, line_id#12378, route_id#12381, stop_id#12385, lat#12377, lon#12379]
                        :  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@2e3288b6,json,List(),Some(StructType(StructField(bearing,IntegerType,true),StructField(block_id,StringType,true),StructField(current_status,StringType,true),StructField(id,StringType,true),StructField(lat,FloatType,true),StructField(line_id,StringType,true),StructField(lon,FloatType,true),StructField(pattern_id,StringType,true),StructField(route_id,StringType,true),StructField(schedule_relationship,StringType,true),StructField(shift_id,StringType,true),StructField(speed,FloatType,true),StructField(stop_id,StringType,true),StructField(timestamp,TimestampType,true),StructField(trip_id,StringType,true))),List(),None,Map(path -> gs://edit-de-project-streaming-data/carris-vehicles),None), FileSource[gs://edit-de-project-streaming-data/carris-vehicles], [bearing#12373, block_id#12374, current_status#12375, id#12376, lat#12377, line_id#12378, lon#12379, pattern_id#12380, route_id#12381, schedule_relationship#12382, shift_id#12383, speed#12384, stop_id#12385, timestamp#12386, trip_id#12387]
                        +- Project [stop_id#12420, stop_lat#12584, cast(stop_lon#12425 as float) AS stop_lon#12588]
                           +- Project [stop_id#12420, cast(stop_lat#12424 as float) AS stop_lat#12584, stop_lon#12425]
                              +- Project [stop_id#12420, stop_lat#12424, stop_lon#12425]
                                 +- Relation [stop_id#12420,stop_name#12421,stop_name_new#12422,stop_short_name#12423,stop_lat#12424,stop_lon#12425,operational_status#12426,areas#12427,region_id#12428,region_name#12429,district_id#12430,district_name#12431,municipality_id#12432,municipality_name#12433,parish_id#12434,parish_name#12435,locality#12436,jurisdiction#12437,stop_code#12438,tts_stop_name#12439,platform_code#12440,parent_station#12441,location_type#12442,stop_url#12443,... 56 more fields] csv


In [None]:
from pyspark.sql.functions import col, sqrt, pow

windowed_transform = windowed_transform.withColumn("distance",
      sqrt(
        pow(col("lat") - col("previous_lat"), 2) + pow(col("lon") - col("previous_lon"), 2)
    )* 110 # each degree is ~100km
                                                   )

windowed_transform = windowed_transform.withColumn("distance_to_stop",      sqrt(
        pow(col("lat") - col("stop_lat"), 2) + pow(col("lon") - col("stop_lon"), 2)
    )* 110 # each degree is ~110km
                                                   )

In [None]:
agg = windowed_transform.withColumn('speed', col('distance')/(2/60))

agg = agg.withColumn('time_to_stop', (col('distance_to_stop')/col('speed') * 3600))

agg = agg.withColumn(
    'time_to_stop',
    F.from_unixtime(
        F.unix_timestamp(F.lit('00:00:00'), 'HH:mm:ss') + col('time_to_stop'),
        'HH:mm:ss'
    ))

5. Write Stream

In [None]:
# select folder
folder = 'stream/vehicles6'
gc_folder = 'gs://edit-de-project-streaming-data/datalake/stream/vehicles'


# Output function for each windowed batch
def insert_windowed_vehicles(df, batch_id):
    print(f"Batch ID: {batch_id}")
    df.write.format("parquet").mode("append").save(f"{folder}")


# Write the streaming query with watermark and window
windowed_query = (agg
                  .writeStream
                  .outputMode("append")
                  .foreachBatch(insert_windowed_vehicles)
                  .option('checkpointLocation', f'{folder}/checkpoint')
                  .trigger(processingTime='10 seconds')
                  .start()
)

windowed_query.awaitTermination(30)

False

In [None]:
windowed_query.isActive

True

In [None]:
windowed_query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [None]:
windowed_query.recentProgress

[]

In [None]:
windowed_query.stop()

4. Transformations

In [None]:
windowed_query.recentProgress

[]

In [None]:
# Define the path to the Parquet files
parquet_path = "stream/vehicles5"

# Read the Parquet files into a DataFrame
parquet_df = spark.read.parquet(parquet_path)

# Show the first few rows
parquet_df.show(truncate=False)

# Print the schema to understand the data structure
parquet_df.printSchema()

+--------+-------+------------------------------------------+------------+------------+---------+---------+---------+---------+--------------------+--------------------+------------------+------------+
|id      |stop_id|window                                    |previous_lat|previous_lon|lat      |lon      |stop_lat |stop_lon |distance            |distance_to_stop    |speed             |time_to_stop|
+--------+-------+------------------------------------------+------------+------------+---------+---------+---------+---------+--------------------+--------------------+------------------+------------+
|43|2339 |140349 |{2025-01-22 13:18:00, 2025-01-22 13:20:00}|38.637917   |-9.154437   |38.63827 |-9.154015|38.63906 |-9.152929|0.060684412994725434|0.14763666595197703 |1.8205323898417631|00:04:51    |
|41|1423 |171629 |{2025-01-21 13:42:00, 2025-01-21 13:44:00}|38.904125   |-9.416783   |38.904636|-9.414741|38.9048  |-9.413896|0.2316330739949299  |0.09468030391511913 |6.948992219847898 |00:0