## 4. Real-Time Processing with Spark Structured Streaming
Launch a Spark session that reads from the Kafka topic, parses each JSON message, and derives the features used by the model. Run this code in a new Jupyter cell (or a separate notebook):

In [1]:
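```python
# Install once if needed. Spark reads Kafka through the spark-sql-kafka JAR configured below,
# so only the Python clients for HTTP and Redis are required in this section.
# pip install pyspark requests redis
```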

In [2]:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, when, unix_timestamp, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import requests
import redis
import json

# 🔥 Initialize Spark with Kafka support
# (the connector version should match the installed Spark version and its Scala build: 3.5.0 / 2.12 here)
spark = SparkSession.builder \
    .appName("NYC_Taxi_Streaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# ✅ Define the schema of the Kafka JSON messages (includes `taxi_type`)
schema = StructType([
    StructField("vendorid", IntegerType()),
    StructField("pickup_datetime", StringType()),
    StructField("dropoff_datetime", StringType()),
    StructField("passenger_count", IntegerType()),
    StructField("trip_distance", DoubleType()),
    StructField("fare_amount", DoubleType()),
    StructField("extra", DoubleType()),
    StructField("mta_tax", DoubleType()),
    StructField("tip_amount", DoubleType()),
    StructField("tolls_amount", DoubleType()),
    StructField("improvement_surcharge", DoubleType()),
    StructField("total_amount", DoubleType()),
    StructField("table_name", StringType()),
    StructField("taxi_type", StringType())  # ✅ Added taxi_type
])

# ✅ Read data from Kafka
# ("earliest" replays the topic from the start; it applies only when the query starts without a checkpoint)
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "nyc_taxi_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# ✅ Parse the Kafka message value (bytes) as JSON; fields missing from a message become null
parsed_df = stream_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# ✅ Convert `pickup_datetime` and `dropoff_datetime` to timestamps
parsed_df = parsed_df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")) \
                     .withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))

# ✅ Compute trip duration (in minutes); the expression is defined once and reused below
trip_duration_expr = (unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))) / 60

parsed_df = parsed_df.withColumn(
    "trip_duration",
    when(trip_duration_expr > 0, trip_duration_expr).otherwise(0.0)  # ✅ Non-positive or unparsable durations become 0.0
)

# ✅ Compute tip ratio, toll ratio, and fare per mile / per passenger
# (each falls back to 0.0 when the denominator is not positive, instead of producing null)
parsed_df = parsed_df.withColumn(
    "tip_ratio", when(col("total_amount") > 0, col("tip_amount") / col("total_amount")).otherwise(0.0)
).withColumn(
    "toll_ratio", when(col("total_amount") > 0, col("tolls_amount") / col("total_amount")).otherwise(0.0)
).withColumn(
    "fare_per_mile", when(col("trip_distance") > 0, col("fare_amount") / col("trip_distance")).otherwise(0.0)
).withColumn(
    "fare_per_passenger", when(col("passenger_count") > 0, col("fare_amount") / col("passenger_count")).otherwise(0.0)
)

# ✅ Compute duration per mile and duration per passenger (both require a positive trip_duration)
parsed_df = parsed_df.withColumn(
    "duration_per_mile",
    when((col("trip_distance") > 0) & (col("trip_duration") > 0), col("trip_duration") / col("trip_distance")).otherwise(0.0)
).withColumn(
    "duration_per_passenger",
    when((col("passenger_count") > 0) & (col("trip_duration") > 0), col("trip_duration") / col("passenger_count")).otherwise(0.0)
)
```



```
25/03/22 21:40:59 WARN Utils: Your hostname, Sherines-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.101 instead (on interface en0)
25/03/22 21:40:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/sherinechally/.ivy2/cache
The jars for the packages stored in: /Users/sherinechally/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5f12a508-8efc-4ca7-ad80-bd774fa17858;1.0
	confs: [default]
:: loading settings :: url = jar:file:/opt/anaconda3/envs/py39/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 208ms :: artifacts dl 5ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#h
```
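For debugging the producer side, it can help to check a raw message against the schema before Spark sees it. The sketch below is a plain-Python approximation, not Spark's parser: it maps `IntegerType`/`DoubleType`/`StringType` to `int`/`float`/`str` (an assumption of this example) and reports keys that are missing (which `from_json` turns into null) or have an unexpected JSON type. The sample record, including its `table_name` and `taxi_type` values, is hypothetical.

```python
import json

# Field names and expected Python types, mirroring the Spark schema above
# (IntegerType -> int, DoubleType -> float, StringType -> str)
SCHEMA_FIELDS = {
    "vendorid": int,
    "pickup_datetime": str,
    "dropoff_datetime": str,
    "passenger_count": int,
    "trip_distance": float,
    "fare_amount": float,
    "extra": float,
    "mta_tax": float,
    "tip_amount": float,
    "tolls_amount": float,
    "improvement_surcharge": float,
    "total_amount": float,
    "table_name": str,
    "taxi_type": str,
}


def check_message(raw: str) -> list:
    """Return the problems found in one Kafka message value (an empty list if it looks fine)."""
    record = json.loads(raw)
    problems = []
    for name, expected in SCHEMA_FIELDS.items():
        if name not in record or record[name] is None:
            problems.append(f"{name}: missing (from_json would yield null)")
            continue
        value = record[name]
        # A JSON integer is acceptable where a double is expected (bool is excluded)
        if expected is float and isinstance(value, int) and not isinstance(value, bool):
            continue
        if not isinstance(value, expected) or isinstance(value, bool):
            problems.append(f"{name}: expected {expected.__name__}, got {type(value).__name__}")
    return problems


# Hypothetical message, values invented for illustration
sample = json.dumps({
    "vendorid": 2,
    "pickup_datetime": "2019-01-01 00:10:00",
    "dropoff_datetime": "2019-01-01 00:25:00",
    "passenger_count": 1,
    "trip_distance": 2.5,
    "fare_amount": 12.0,
    "extra": 0.5,
    "mta_tax": 0.5,
    "tip_amount": 2.0,
    "tolls_amount": 0.0,
    "improvement_surcharge": 0.3,
    "total_amount": 15.3,
    "table_name": "yellow_tripdata",
    "taxi_type": "yellow",
})
print(check_message(sample))  # []
```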

In [3]:
```python
print("🚀 Available Columns in parsed_df:")
print(parsed_df.columns)  # ✅ Confirm the derived columns were added
```


```
🚀 Available Columns in parsed_df:
['vendorid', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'table_name', 'taxi_type', 'trip_duration', 'tip_ratio', 'toll_ratio', 'fare_per_mile', 'fare_per_passenger', 'duration_per_mile', 'duration_per_passenger']
```
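The derived columns follow simple guarded formulas: each ratio falls back to 0.0 when its denominator is not positive, and non-positive durations become 0.0. As a reference for checking the arithmetic by hand or in a unit test, the function below restates those `withColumn` expressions for a single record in plain Python. It is a sketch, not part of the streaming job; it assumes the same `yyyy-MM-dd HH:mm:ss` timestamp format and that the numeric fields are present. The names `safe_div` and `derive_features` and the example record are hypothetical.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of Spark's "yyyy-MM-dd HH:mm:ss"


def safe_div(numerator: float, denominator: float) -> float:
    """Mirror when(denominator > 0, numerator / denominator).otherwise(0.0)."""
    return numerator / denominator if denominator is not None and denominator > 0 else 0.0


def derive_features(rec: dict) -> dict:
    """Compute the derived columns for one trip record (plain-Python reference)."""
    pickup = datetime.strptime(rec["pickup_datetime"], TS_FORMAT)
    dropoff = datetime.strptime(rec["dropoff_datetime"], TS_FORMAT)
    minutes = (dropoff - pickup).total_seconds() / 60
    trip_duration = minutes if minutes > 0 else 0.0  # non-positive durations become 0.0

    out = {
        "trip_duration": trip_duration,
        "tip_ratio": safe_div(rec["tip_amount"], rec["total_amount"]),
        "toll_ratio": safe_div(rec["tolls_amount"], rec["total_amount"]),
        "fare_per_mile": safe_div(rec["fare_amount"], rec["trip_distance"]),
        "fare_per_passenger": safe_div(rec["fare_amount"], rec["passenger_count"]),
    }
    # Duration features also require a positive trip_duration, as in the Spark code
    out["duration_per_mile"] = safe_div(trip_duration, rec["trip_distance"]) if trip_duration > 0 else 0.0
    out["duration_per_passenger"] = safe_div(trip_duration, rec["passenger_count"]) if trip_duration > 0 else 0.0
    return out


# Hypothetical trip: 15 minutes, 2.5 miles, 2 passengers
example = {
    "pickup_datetime": "2019-01-01 00:10:00",
    "dropoff_datetime": "2019-01-01 00:25:00",
    "passenger_count": 2,
    "trip_distance": 2.5,
    "fare_amount": 12.0,
    "tip_amount": 2.0,
    "tolls_amount": 0.0,
    "total_amount": 16.0,
}
feats = derive_features(example)
print(feats["trip_duration"], feats["tip_ratio"], feats["fare_per_mile"])  # 15.0 0.125 4.8
```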


## 4.1 Calling the Predictive Model API and Storing Predictions in Redis
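This step assumes a Flask API is already running at `http://localhost:5000/predict` and returns both a fare and a duration prediction. Spark's `foreachBatch` passes each micro-batch to `send_to_model`, which calls the API once per row and stores each prediction in Redis.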
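Because the per-row work inside `send_to_model` is plain Python, it can be factored into small functions and tested without Spark, Redis, or the Flask service. In the sketch below, `process_rows`, `trip_key`, `post`, and `store` are hypothetical names; `post` stands in for the HTTP call and `store` for `r.set`. The key scheme is a swapped-in choice, not the one used in the cell below: adding `pickup_datetime` keeps two trips with the same `vendorid` and `trip_distance` from overwriting each other, and because the key is derived from the record itself, replaying the same message rewrites the same entry. Two genuinely identical trips would still share a key.

```python
import json
from typing import Callable, Iterable, Optional

# Feature names sent to the model API, matching the payload built in send_to_model
FEATURES = [
    "trip_distance", "passenger_count", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "total_amount", "tip_ratio",
    "toll_ratio", "fare_per_mile", "fare_per_passenger",
    "duration_per_mile", "duration_per_passenger",
]
REQUIRED_KEYS = ("fare_prediction", "duration_prediction")


def trip_key(row: dict) -> str:
    """Deterministic key: replaying the same record overwrites the same entry."""
    return f"trip:{row['vendorid']}:{row['pickup_datetime']}:{row['trip_distance']}"


def process_rows(
    rows: Iterable[dict],
    post: Callable[[dict], Optional[dict]],
    store: Callable[[str, str], None],
) -> int:
    """Send each row's features to `post`, store valid predictions via `store`, return the count stored."""
    stored = 0
    for row in rows:
        payload = {name: row[name] for name in FEATURES}
        try:
            prediction = post(payload)
        except Exception as exc:  # e.g. network errors, timeouts, invalid JSON
            print(f"❌ Error processing row: {exc}")
            continue
        if not prediction or any(k not in prediction for k in REQUIRED_KEYS):
            print(f"❌ Invalid API response: {prediction}")
            continue
        store(trip_key(row), json.dumps(prediction))
        stored += 1
    return stored


# Possible wiring inside foreachBatch (sketch, requires the running API and Redis):
# def send_to_model(batch_df, batch_id):
#     def post(payload):
#         resp = requests.post("http://localhost:5000/predict", json=payload, timeout=10)
#         resp.raise_for_status()
#         return resp.json()
#     process_rows((row.asDict() for row in batch_df.toLocalIterator()), post, r.set)

# Local check with fakes instead of the Flask API and Redis
fake_db = {}
rows = [
    {"vendorid": 2, "pickup_datetime": "2019-01-01 00:10:00", **{f: 1.0 for f in FEATURES}},
    {"vendorid": 2, "pickup_datetime": "2019-01-01 00:20:00", **{f: 1.0 for f in FEATURES}},
]
count = process_rows(rows, lambda payload: {"fare_prediction": 10.0, "duration_prediction": 5.0}, fake_db.__setitem__)
print(count, sorted(fake_db))  # 2 ['trip:2:2019-01-01 00:10:00:1.0', 'trip:2:2019-01-01 00:20:00:1.0']
```

With the original `trip:{vendorid}:{trip_distance}` key, both fake rows above would map to `trip:2:1.0` and only one prediction would survive.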

In [4]:
```python
import json

import redis
import requests

# Redis client; decode_responses=True returns str instead of bytes
r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def send_to_model(batch_df, batch_id):
    """Process one micro-batch: send each row's features to the Flask API and store predictions in Redis."""

    # toLocalIterator() brings rows to the driver one partition at a time;
    # rows are then sent sequentially, one HTTP request per row
    for row in batch_df.toLocalIterator():
        try:
            payload = {
                "trip_distance": row.trip_distance,
                "passenger_count": row.passenger_count,
                "extra": row.extra,
                "mta_tax": row.mta_tax,
                "tip_amount": row.tip_amount,
                "tolls_amount": row.tolls_amount,
                "improvement_surcharge": row.improvement_surcharge,
                "total_amount": row.total_amount,
                "tip_ratio": row.tip_ratio,
                "toll_ratio": row.toll_ratio,
                "fare_per_mile": row.fare_per_mile,
                "fare_per_passenger": row.fare_per_passenger,
                "duration_per_mile": row.duration_per_mile,
                "duration_per_passenger": row.duration_per_passenger
            }

            # A timeout keeps a hung API call from stalling the stream indefinitely
            response = requests.post("http://localhost:5000/predict", json=payload, timeout=10)

            if response.status_code != 200:
                print(f"❌ API Error {response.status_code}: {response.text}")
                print(f"📤 Sent Payload: {json.dumps(payload, indent=2)}")  # ✅ Debugging
                continue

            prediction = response.json()

            # ✅ Ensure both prediction keys exist
            if "fare_prediction" not in prediction or "duration_prediction" not in prediction:
                print(f"❌ Invalid API response: {response.text}")
                continue

            # Note: this key is not unique; trips with the same vendor and distance
            # overwrite each other (e.g. trip:2:0.0 appears twice in the output below)
            key = f"trip:{row.vendorid}:{row.trip_distance}"
            r.set(key, json.dumps(prediction))
            print(f"✅ Stored prediction: {key} -> {prediction}")

        except Exception as e:
            print(f"❌ Error processing row: {e}")


# ✅ Apply the function to each micro-batch of the stream
query = parsed_df.writeStream.foreachBatch(send_to_model).start()
query.awaitTermination()  # blocks this cell until the query stops or fails
```


```
25/03/22 21:41:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/4n/68l65j717mxb8994g_2bc9_00000gn/T/temporary-4f67aa93-637a-40dc-a01c-b4245ded4365. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/22 21:41:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/22 21:41:01 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 0:>                                                          (0 + 1) / 1]

✅ Stored prediction: trip:2:0.98 -> {'duration_prediction': 0.715811550617218, 'fare_prediction': 10.063321113586426}
✅ Stored prediction: trip:2:5.99 -> {'duration_prediction': -0.06264559179544449, 'fare_prediction': 23.094005584716797}
✅ Stored prediction: trip:2:1.52 -> {'duration_prediction': 2.4805853366851807, 'fare_prediction': 9.90581226348877}
✅ Stored prediction: trip:2:0.0 -> {'duration_prediction': -0.36556288599967957, 'fare_prediction': -0.3430016040802002}
✅ Stored prediction: trip:2:0.0 -> {'duration_prediction': -0.18354009091854095, 'fare_prediction': 1.287048101425171}
✅ Stored prediction: trip:2:2.03 -> {'duration_prediction': 6.108088970184326, 'fare_prediction': 12.468304634094238}
✅ Stored prediction: trip:2:1.01 -> {'duration_prediction': 2.559443712234497, 'fare_prediction': 7.696784496307373}
✅ Stored prediction: trip:2:7.82 -> {'duration_prediction': -2.0602962970733643, 'fare_prediction': 35.03217697143555}
✅ Stored prediction: trip:2:0.48

25/03/22 21:41:13 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

✅ Stored prediction: trip:2:1.69 -> {'duration_prediction': 5.033759117126465, 'fare_prediction': 9.423654556274414}
✅ Stored prediction: trip:2:2.95 -> {'duration_prediction': 15.271123886108398, 'fare_prediction': 16.888212203979492}
✅ Stored prediction: trip:2:6.24 -> {'duration_prediction': 13.851920127868652, 'fare_prediction': 26.931556701660156}
✅ Stored prediction: trip:2:0.71 -> {'duration_prediction': 3.955986976623535, 'fare_prediction': 6.215360641479492}
✅ Stored prediction: trip:2:0.0 -> {'duration_prediction': 2.584059000015259, 'fare_prediction': 3.7049508094787598}
✅ Stored prediction: trip:1:0.0 -> {'duration_prediction': 26.126041412353516, 'fare_prediction': 37.340110778808594}
✅ Stored prediction: trip:2:11.79 -> {'duration_prediction': 29.412700653076172, 'fare_prediction': 45.13197326660156}
✅ Stored prediction: trip:1:2.8 -> {'duration_prediction': 14.47610855102539, 'fare_prediction': 17.027189254760742}
✅ Stored prediction: trip:2:0.89 -> {'d

25/03/23 03:59:13 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 135290 ms exceeds timeout 120000 ms
25/03/23 03:59:13 WARN SparkContext: Killing executors is not supported by current scheduler.
25/03/23 03:59:17 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o

Py4JError: An error occurred while calling o122.awaitTermination
```
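To inspect what the stream wrote, the stored entries can be read back with redis-py's `scan_iter` (which walks keys incrementally with `SCAN` rather than blocking the server the way `KEYS` can) and `get`. The helper name `load_predictions` is hypothetical, and the commented usage assumes the same local Redis instance and `trip:*` key prefix used above. The offline check substitutes a small stand-in object whose `scan_iter` only emulates a trailing-`*` prefix match; the sample value is invented.

```python
import json
from types import SimpleNamespace
from typing import Dict


def load_predictions(client, pattern: str = "trip:*") -> Dict[str, dict]:
    """Collect stored predictions whose keys match `pattern` into a dict."""
    results = {}
    for key in client.scan_iter(match=pattern):
        raw = client.get(key)
        if raw is not None:  # the key may be deleted or expire between SCAN and GET
            results[key] = json.loads(raw)
    return results


# Usage against the local Redis instance (requires a running server):
# import redis
# r = redis.Redis(host="localhost", port=6379, decode_responses=True)
# preds = load_predictions(r)
# print(len(preds), "predictions stored")

# Offline check with a stand-in exposing the same two methods
# (prefix matching here only emulates patterns ending in "*")
sample_data = {
    "trip:2:0.98": json.dumps({"fare_prediction": 10.0, "duration_prediction": 1.0}),
    "other": "{}",
}
fake = SimpleNamespace(
    scan_iter=lambda match: (k for k in sample_data if k.startswith(match.rstrip("*"))),
    get=sample_data.get,
)
print(load_predictions(fake))
```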