<a href="https://colab.research.google.com/github/joanadecaa1/data_processing/blob/main/spark_streaming/examples/example_3_api_json.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setting up PySpark

In [13]:
%pip install pyspark



In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('Test streaming').getOrCreate()
sc = spark.sparkContext

In [15]:
!rm -rf /content/landing
!rm -rf /content/bronze
!mkdir -p /content/landing

## Simulate producer:
- extract data from API
- store data as json in the lake
- run task async

In [16]:
import requests
from pyspark.sql.types import *
import json
import datetime
import asyncio

async def ingest_from_api(url: str, table: str, schema: StructType = None):
  response = requests.get(url)
  timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  if response.status_code == 200: # este número é quando funciona. se aparecer 400 quer dizer que deu erro
    data = response.json()
    with open(f"/content/landing/{table}_{int(timestamp)}.json", "w") as f:
        json.dump(data, f)

async def producer(loop: int, interval_time: int): #este é o loop para extrair os dados
  for i in range(loop):
    await ingest_from_api("https://api.carrismetropolitana.pt/vehicles", "vehicles")
    await ingest_from_api("https://api.carrismetropolitana.pt/lines", "lines")
    await asyncio.sleep(interval_time)  #isto serve para não bloquear

async def main():
  asyncio.create_task(producer(10, 30)) #vou chamar 10 vezes cada uma das apis e com 30 segundos de intervalo entre cada chamadada

await main()

- Read from /content/landing as streaming
- store data in memory (for testing)
- store data in the bronze layer

In [17]:
from pyspark.sql.types import *

vehicle_schema = StructType([StructField('bearing', IntegerType(), True),
                             StructField('block_id', StringType(), True),
                             StructField('current_status', StringType(), True),
                             StructField('id', StringType(), True),
                             StructField('lat', FloatType(), True),
                             StructField('line_id', StringType(), True),
                             StructField('lon', FloatType(), True),
                             StructField('pattern_id', StringType(), True),
                             StructField('route_id', StringType(), True),
                             StructField('schedule_relationship', StringType(), True),
                             StructField('shift_id', StringType(), True),
                             StructField('speed', FloatType(), True),
                             StructField('stop_id', StringType(), True),
                             StructField('timestamp', TimestampType(), True),
                             StructField('trip_id', StringType(), True)])

stream = spark.readStream.format("json").schema(vehicle_schema).load("/content/landing/vehicles*")

dedup = stream.dropDuplicates()

In [18]:
dedup.printSchema()

root
 |-- bearing: integer (nullable = true)
 |-- block_id: string (nullable = true)
 |-- current_status: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- line_id: string (nullable = true)
 |-- lon: float (nullable = true)
 |-- pattern_id: string (nullable = true)
 |-- route_id: string (nullable = true)
 |-- schedule_relationship: string (nullable = true)
 |-- shift_id: string (nullable = true)
 |-- speed: float (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- trip_id: string (nullable = true)



In [25]:
dedup.isStreaming

True

In [26]:
# using memory for testing
try:
  if query.isActive:
    query.stop()
except:
  pass

query = (dedup.writeStream.format("memory").option("queryName", "vehicles").start())

In [34]:
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [35]:
query.isActive

True

In [36]:
query.lastProgress

{'id': '7f1edb04-0fd6-4f2d-8ff3-a5d3e018a5ef',
 'runId': '4b57cb95-6da9-4ec8-9967-dd20426e8281',
 'name': 'vehicles',
 'timestamp': '2024-11-30T09:59:57.066Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 5, 'triggerExecution': 5},
 'stateOperators': [{'operatorName': 'dedupe',
   'numRowsTotal': 3689,
   'numRowsUpdated': 0,
   'allUpdatesTimeMs': 3399,
   'numRowsRemoved': 0,
   'allRemovalsTimeMs': 0,
   'commitTimeMs': 5442,
   'memoryUsedBytes': 1615296,
   'numRowsDroppedByWatermark': 0,
   'numShufflePartitions': 200,
   'numStateStoreInstances': 200,
   'customMetrics': {'loadedMapCacheHitCount': 0,
    'loadedMapCacheMissCount': 0,
    'numDroppedDuplicateRows': 821,
    'stateOnCurrentVersionSizeBytes': 1586496}}],
 'sources': [{'description': 'FileStreamSource[file:/content/landing/vehicles*]',
   'startOffset': {'logOffset': 0},
   'endOffset': {'logOffset': 0},
   'latestOffset': None,
   'nu

In [37]:
query.recentProgress

[{'id': '7f1edb04-0fd6-4f2d-8ff3-a5d3e018a5ef',
  'runId': '4b57cb95-6da9-4ec8-9967-dd20426e8281',
  'name': 'vehicles',
  'timestamp': '2024-11-30T09:56:03.788Z',
  'batchId': 0,
  'numInputRows': 4510,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 342.4969623329283,
  'durationMs': {'addBatch': 12926,
   'commitOffsets': 81,
   'getBatch': 24,
   'latestOffset': 57,
   'queryPlanning': 30,
   'triggerExecution': 13168,
   'walCommit': 48},
  'stateOperators': [{'operatorName': 'dedupe',
    'numRowsTotal': 3689,
    'numRowsUpdated': 3689,
    'allUpdatesTimeMs': 3399,
    'numRowsRemoved': 0,
    'allRemovalsTimeMs': 0,
    'commitTimeMs': 5442,
    'memoryUsedBytes': 1615296,
    'numRowsDroppedByWatermark': 0,
    'numShufflePartitions': 200,
    'numStateStoreInstances': 200,
    'customMetrics': {'loadedMapCacheHitCount': 0,
     'loadedMapCacheMissCount': 0,
     'numDroppedDuplicateRows': 821,
     'stateOnCurrentVersionSizeBytes': 1586496}}],
  'sources': [{'descri

In [32]:
ls /content/landing/


lines_20241130095048.json  lines_20241130095420.json     vehicles_20241130095249.json
lines_20241130095050.json  lines_20241130095451.json     vehicles_20241130095319.json
lines_20241130095118.json  lines_20241130095521.json     vehicles_20241130095350.json
lines_20241130095148.json  vehicles_20241130095047.json  vehicles_20241130095420.json
lines_20241130095219.json  vehicles_20241130095050.json  vehicles_20241130095451.json
lines_20241130095249.json  vehicles_20241130095118.json  vehicles_20241130095521.json
lines_20241130095320.json  vehicles_20241130095148.json
lines_20241130095350.json  vehicles_20241130095219.json


In [29]:
spark.sql("select * from vehicles").show()

+-------+--------------+--------------+-------+---------+-------+---------+----------+--------+---------------------+-----------+---------+-------+-------------------+--------------------+
|bearing|      block_id|current_status|     id|      lat|line_id|      lon|pattern_id|route_id|schedule_relationship|   shift_id|    speed|stop_id|          timestamp|             trip_id|
+-------+--------------+--------------+-------+---------+-------+---------+----------+--------+---------------------+-----------+---------+-------+-------------------+--------------------+
|      0|       4703-21|    STOPPED_AT|42|2504| 38.79252|   2750|-9.173625|  2750_0_2|  2750_0|            SCHEDULED|       4703|      0.0| 110577|2024-11-30 09:50:55|2750_0_2|2|1|0940...|
|      4|ESC_SAB_ES1051| IN_TRANSIT_TO|43|2369|38.569008|   3105|-9.086332|  3105_0_1|  3105_0|            SCHEDULED|     ES1052|7.7777777| 140403|2024-11-30 09:50:58|3105_0_1_0930_095...|
|    174|ESC_SAB_ES1018| IN_TRANSIT_TO|43|2364| 38.6129

In [38]:
!rm -rf /content/bronze

In [39]:
from pyspark.sql.functions import *

# watermark is necessary because of the aggregation
transformed = stream.withWatermark("timestamp", "60 seconds")
agg = (transformed
       .groupBy(window(transformed.timestamp, "5 minutes"), col("current_status"))
       .agg(min(transformed.timestamp).alias("init_timestamp"), count("*").alias("count")))

def insert_vehicles(df, batch_id):
  #df2 = df.groupBy("window").pivot("current_status").sum("count")
  df.write.format("parquet").mode("append").save("/content/bronze/vehicles")

# using memory for testing
query2 = (agg
          .writeStream
          .outputMode("append")
          .foreachBatch(insert_vehicles)
          .option("checkpointLocation", "/content/bronze/checkpoint")
          .trigger(processingTime='20 seconds')
          .start())

In [41]:
query2.status

{'message': 'No new data but cleaning up state',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [42]:
spark.read.format("parquet").load("/content/bronze/vehicles/*").show(100, False)

+------------------------------------------+--------------+-------------------+-----+
|window                                    |current_status|init_timestamp     |count|
+------------------------------------------+--------------+-------------------+-----+
|{2024-11-30 09:45:00, 2024-11-30 09:50:00}|IN_TRANSIT_TO |2024-11-30 09:49:21|38   |
|{2024-11-30 09:45:00, 2024-11-30 09:50:00}|INCOMING_AT   |2024-11-30 09:49:14|5    |
|{2024-11-30 09:45:00, 2024-11-30 09:50:00}|STOPPED_AT    |2024-11-30 09:49:18|40   |
+------------------------------------------+--------------+-------------------+-----+



## Report
- show vehicles by status in 5-min window time
- one line per window time

In [43]:
def pivot_data(df: DataFrame):
  result = df.orderBy("init_timestamp").groupBy("window").pivot("current_status").sum("count")
  result.show(100, False)

df = spark.read.format("parquet").load("/content/bronze/vehicles/*")
pivot_data(df)

+------------------------------------------+-----------+-------------+----------+
|window                                    |INCOMING_AT|IN_TRANSIT_TO|STOPPED_AT|
+------------------------------------------+-----------+-------------+----------+
|{2024-11-30 09:45:00, 2024-11-30 09:50:00}|5          |38           |40        |
+------------------------------------------+-----------+-------------+----------+



In [None]:
query2.stop()

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 120, in call
    raise e
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/utils.py", line 117, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "<ipython-input-48-9446b2336b0e>", line 11, in insert_vehicles
    df.write.format("parquet").mode("append").save("/content/bronze/vehicles")
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/readwriter.py", line 1463, in save
    self._jwrite.save(path)
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.10/dist-packages/pysp