## CONSUMER

This code implements the consumer, the component that processes the real-time data received from the Kafka topic. It combines Spark for stream processing with MongoDB for storing the processed records, and uses an HTTP request to enrich the data with additional information about the stations.

First, the script makes an HTTP request to a JSON endpoint (the Capital Bikeshare GBFS station_information feed) to retrieve the full list of bike-sharing stations, which it loads into a pandas DataFrame. Messages are then read from the topic with the Spark Structured Streaming APIs, and each micro-batch (a Spark DataFrame whose message column holds the Kafka value as a string) is passed to the save_to_mongodb function.
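Because the Kafka value is cast to a string, each row carries a JSON document that must be decoded before use. The sketch below assumes, as the consumer code does, that the producer publishes GBFS station_status-style fields (station_id, num_bikes_available, num_ebikes_available); the helper name parse_status_message and the sample payload are hypothetical.

```python
import json
from typing import Optional


def parse_status_message(raw: str) -> Optional[dict]:
    """Decode one Kafka message value (a JSON string) into a dict.

    Hypothetical helper: it assumes the producer sends GBFS station_status
    fields and returns None for malformed or incomplete messages instead of
    failing the whole micro-batch.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    required = ("station_id", "num_bikes_available", "num_ebikes_available")
    if not isinstance(message, dict) or any(key not in message for key in required):
        return None
    return message


# Hypothetical sample payload, shaped like the fields the consumer reads
sample = '{"station_id": "station-001", "num_bikes_available": 5, "num_ebikes_available": 2}'
parsed = parse_status_message(sample)
print(parsed["num_bikes_available"] + parsed["num_ebikes_available"])  # prints 7
```

Inside a batch handler, rows for which the helper returns None could simply be skipped.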

A key function, get_station_info, looks up the details of a station by its ID in the previously obtained stations DataFrame. This step is what enriches each Kafka message with contextual information such as the station's capacity, short name, and full name.
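Filtering the pandas DataFrame with a boolean mask scans every station on each lookup. Since the station list is fetched only once, one possible alternative (not what the consumer does) is to index it by station_id in a dictionary and enrich each message from that. The fields capacity, short_name and name come from the station_information feed used by the script; the helper names build_station_index and enrich, and the sample data, are hypothetical.

```python
from typing import Optional


def build_station_index(stations: list[dict]) -> dict[str, dict]:
    """Map station_id -> station record for constant-time lookups."""
    return {station["station_id"]: station for station in stations}


def enrich(message: dict, index: dict[str, dict]) -> Optional[dict]:
    """Attach static station details to a status message.

    Returns None when the station is unknown, mirroring get_station_info.
    """
    info = index.get(message["station_id"])
    if info is None:
        return None
    total = message["num_bikes_available"] + message["num_ebikes_available"]
    return {
        "station_id": message["station_id"],
        "short_name": info["short_name"],
        "name": info["name"],
        "capacity": info["capacity"],
        "total_bikes_available": total,
        # Same approximation as the consumer: capacity minus bikes currently available
        "departures": info["capacity"] - total,
    }


# Hypothetical stations, shaped like data2['data']['stations']
stations = [
    {"station_id": "station-001", "short_name": "31000", "name": "Example St & 1st Ave", "capacity": 15},
]
index = build_station_index(stations)
doc = enrich({"station_id": "station-001", "num_bikes_available": 5, "num_ebikes_available": 2}, index)
print(doc["departures"])  # prints 8
```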

To keep storage under control, the script also defines a function, ensure_ttl_index, which creates a TTL (Time-To-Live) index on the MongoDB collection. With this index, MongoDB automatically deletes documents a few minutes after they are inserted, which limits disk usage and keeps only recent data in the collection.
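A MongoDB TTL index makes a document eligible for deletion once the indexed date plus expireAfterSeconds is in the past, and the background TTL monitor runs periodically (about every 60 seconds), so removal is not instantaneous. Because the consumer both sets expire_at two minutes ahead and creates the index with expireAfterSeconds=120, documents become eligible for deletion roughly four minutes after insertion. The arithmetic can be sketched as follows; the function name ttl_eligible_at and the sample timestamp are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def ttl_eligible_at(indexed_value: datetime, expire_after_seconds: int) -> datetime:
    """Earliest time a TTL index lets MongoDB delete the document.

    The TTL monitor runs periodically, so the actual deletion can happen
    somewhat later than the returned time.
    """
    return indexed_value + timedelta(seconds=expire_after_seconds)


inserted = datetime(2024, 3, 2, 17, 52, 0, tzinfo=timezone.utc)
expire_at = inserted + timedelta(minutes=2)  # as in save_to_mongodb

print(ttl_eligible_at(expire_at, 120) - inserted)  # prints 0:04:00
print(ttl_eligible_at(expire_at, 0) - inserted)    # prints 0:02:00
```

With expireAfterSeconds=0, expire_at itself becomes the expiry time. Changing the option on an existing index requires either dropping and recreating it or using the collMod command.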

Finally, the streaming query runs indefinitely, until it is forcibly terminated or an exception occurs. In that case the error is reported and the Spark session is stopped to release its resources.
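The behaviour described above amounts to "run until stopped, report the error, always release the session". A minimal sketch of that pattern as a context manager follows. The names stopped_on_exit and FakeSession are hypothetical, and the fake class only stands in for a SparkSession, whose stop() method releases the session's resources.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class Stoppable(Protocol):
    def stop(self) -> None: ...


@contextmanager
def stopped_on_exit(session: Stoppable) -> Iterator[Stoppable]:
    """Yield the session, report any Exception, and always call stop()."""
    try:
        yield session
    except Exception as ex:  # KeyboardInterrupt is not caught, but finally still runs
        print(f"ERROR: {ex}")
    finally:
        session.stop()


class FakeSession:
    """Hypothetical stand-in for SparkSession, used only for illustration."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


session = FakeSession()
with stopped_on_exit(session):
    raise RuntimeError("stream failed")  # e.g. an error surfaced by awaitTermination()
print(session.stopped)  # prints True
```

With PySpark, the body of the with block would start the query and call query.awaitTermination().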

In [None]:
import pandas as pd
import findspark
import json
import requests
from pyspark.sql import SparkSession
from pymongo import MongoClient
from datetime import datetime, timedelta
import pytz

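# Retrieve the static information about the stations (capacity, names, ...)
url2 = 'https://gbfs.capitalbikeshare.com/gbfs/en/station_information.json'
response2 = requests.get(url2, timeout=10)
response2.raise_for_status()  # fail fast on HTTP errors instead of parsing an error page
data2 = response2.json()
df_stations = pd.DataFrame(data2['data']['stations'])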

def get_station_info(station_id):
    station_info = df_stations[df_stations['station_id'] == station_id]
    if not station_info.empty:
        return station_info.iloc[0].to_dict()
    return None

def ensure_ttl_index(collection, field="expire_at", ttl_seconds=120):
    """
    Ensure that a TTL index exists on the collection for the given field.
    If no such index exists, create it with the given TTL.
    """
    indexes = collection.index_information()
    # Match both the indexed field and the TTL value, not just the TTL value
    if not any(
        index.get("key") == [(field, 1)] and index.get("expireAfterSeconds") == ttl_seconds
        for index in indexes.values()
    ):
        collection.create_index([(field, 1)], expireAfterSeconds=ttl_seconds)

def save_to_mongodb(batch_df, batch_id):
    # The context manager closes the client even if the batch raises an error
    with MongoClient(mongo_uri) as client:
        db = client[mongo_db]
        collection = db[mongo_collection]

        # Note: MongoDB deletes a document once expire_at + expireAfterSeconds has passed;
        # since expire_at is already set 2 minutes ahead, documents live about 4 minutes.
        ensure_ttl_index(collection, "expire_at", 120)  # 120 seconds = 2 minutes

        documents = []

        # Only the stations listed in station_id.txt (one ID per line) are stored
        with open('station_id.txt', 'r') as f:
            station_ids = {line.strip() for line in f}

        # Timezone used for the local (Washington, D.C.) date/time fields
        fuso_orario_washington = pytz.timezone("America/New_York")

        for row in batch_df.collect():
            data = row.asDict()
            message = json.loads(data["message"])
            station_id = message["station_id"]
            if station_id in station_ids:
                # Local Washington time in the date/time formats stored in the metadata
                timestamp2 = datetime.now(fuso_orario_washington)
                dt_iso_wdc = timestamp2.strftime("%Y-%m-%d %H:%M:%S")
                data_washington = timestamp2.strftime("%Y-%m-%d")
                ora_washington = timestamp2.strftime("%H")

                # Expiration timestamp for the TTL index (naive datetime, treated as UTC by PyMongo)
                expire_at = datetime.utcnow() + timedelta(minutes=2)

                station_info = get_station_info(station_id)

                if station_info:
                    capacity = station_info["capacity"]
                    short_name = station_info["short_name"]
                    name = station_info["name"]
                    total_bikes_available = message["num_bikes_available"] + message["num_ebikes_available"]

                    # Departures, approximated as capacity minus the bikes currently available
                    departures = capacity - total_bikes_available

                    # Build the document with the TTL field, the capacity and the computed departures
                    values = {
                        "metadata": {
                            "station_id": station_id,
                            "short_name": short_name,
                            "name": name,
                            "capacity": capacity,
                            "dt_iso_wdc": dt_iso_wdc,
                            "date": data_washington,
                            "ora": ora_washington
                        },
                        "timestamp": datetime.utcnow(),
                        "expire_at": expire_at,
                        "total_bikes_available": total_bikes_available,
                        "departures": departures
                    }
                    documents.append(values)

        if documents:
            collection.insert_many(documents)

if __name__ == "__main__":
    findspark.init()
    mongo_uri = "mongodb://mongoadmin:secret@localhost:27017/"
    mongo_db = "sensor_data"
    mongo_collection = "sensor_data"

    spark = None
    try:
        spark = SparkSession \
            .builder \
            .appName("Bike_Share_Data_Processing") \
            .config("spark.mongodb.input.uri", mongo_uri) \
            .config("spark.mongodb.output.uri", mongo_uri) \
            .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
            .getOrCreate()

        sc = spark.sparkContext
        sc.setLogLevel('ERROR')

        # Read the Kafka topic as a stream, keeping each value as a JSON string
        df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "sensor_data") \
            .option("startingOffsets", "latest") \
            .load() \
            .selectExpr("CAST(value AS STRING) as message")

        # Hand each micro-batch to save_to_mongodb
        query = df \
            .writeStream \
            .foreachBatch(save_to_mongodb) \
            .outputMode("append") \
            .start()

        # Block until the query is stopped or fails
        query.awaitTermination()
    except Exception as ex:
        print(f"ERROR: {str(ex)}")
    finally:
        # Stop the Spark session, if it was created, to release its resources
        if spark is not None:
            spark.stop()


24/03/02 17:52:19 WARN Utils: Your hostname, MacBook-Pro-di-Giuseppe.local resolves to a loopback address: 127.0.0.1; using 192.168.200.186 instead (on interface en0)
24/03/02 17:52:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/panda/mambaforge/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/panda/.ivy2/cache
The jars for the packages stored in: /Users/panda/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-918536bf-3846-47cf-b400-b76c747812d5;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spar