In [1]:
import json
import time
import pandas as pd 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize ``data`` to a JSON document and return it as UTF-8 bytes."""
    payload = json.dumps(data)
    return payload.encode('utf-8')

server = 'redpanda-1:29092'

# Message values are passed through json_serializer before being sent.
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

# Check if the producer is connected (queried once; the earlier duplicate call
# whose result was discarded has been removed).
connected = producer.bootstrap_connected()
print("Producer connected:", connected)

Producer connected: True


In [None]:
# Load the October 2019 green-taxi trip file (gzip-compressed CSV) into pandas.
df_green = pd.read_csv(
    'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz',
    compression='gzip',
)


In [None]:
# Preview the first five rows of the loaded frame.
df_green.head(5)

In [None]:
topic_name = 'green-trips'

# Only the first five rows are sent.
df_green = df_green.head(5)

# Columns copied from each CSV row into the Kafka message, in this order.
message_fields = (
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
)

t0 = time.time()

for row in df_green.itertuples(index=False):
    # Build the message directly from the wanted columns instead of first
    # materialising a dict of every column in the row.
    message = {field: getattr(row, field) for field in message_fields}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")

# send() is asynchronous: block until every buffered record has been
# delivered so the timing below covers the actual transmission.
producer.flush()

t1 = time.time()

# Elapsed time is end minus start (the original subtracted in the wrong
# order and always reported a negative duration).
print(f'took {(t1 - t0):.2f} seconds')
    

In [None]:
# Show the filesystem path the pyspark package was imported from.
import pyspark
pyspark.__file__

In [27]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__

# Maven coordinates (comma-separated) for the Kafka source, pinned to the
# installed PySpark version and the Scala 2.12 build.
# spark-token-provider-kafka-0-10 is listed explicitly because it is the
# artifact that provides org.apache.spark.kafka010.KafkaConfigUpdater, the
# class reported missing when the streaming query below fails.
# NOTE(review): spark.jars.packages is only honoured when the session is first
# created; if a session already exists in this kernel, restart it before
# running this cell.
kafka_jar_package = ",".join([
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}",
    f"org.apache.spark:spark-token-provider-kafka-0-10_2.12:{pyspark_version}",
])

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)


In [28]:
# Streaming reader over the 'green-trips' topic, starting from the earliest
# available offsets.
green_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "redpanda-1:29092")
    .option("subscribe", "green-trips")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Cast the binary key/value columns to strings. The result is not assigned,
# so this cell only displays the resulting DataFrame; green_stream is unchanged.
green_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [15]:
# Print the streaming DataFrame's column names and types.
print(green_stream)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]


In [30]:
def peek(mini_batch, batch_id):
    """Print the first row of ``mini_batch``; print nothing if it is empty.

    ``batch_id`` is accepted to match the ``foreachBatch`` callback signature
    and is not used.
    """
    rows = mini_batch.take(1)
    if not rows:
        return
    print(rows[0])
        
# Run the peek callback on each micro-batch for a bounded time, then stop the
# query. Without the stop, this stream keeps running and its only handle is
# lost when `query` is reassigned further down.
query = green_stream.writeStream.foreachBatch(peek).start()
try:
    query.awaitTermination(10)  # waits for up to 10 seconds
finally:
    query.stop()

In [31]:
from pyspark.sql import types
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

# Schema of the JSON message value: one field per key written by the producer.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [32]:
# Decode the Kafka value as a JSON string, parse it with `schema`, and flatten
# the parsed struct into top-level columns.
df_parsed = (
    green_stream
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

In [33]:
# Stream the parsed rows into an in-memory table named "green_trips_view".
query = (
    df_parsed.writeStream
    .queryName("green_trips_view")
    .outputMode("append")
    .format("memory")
    .start()
)

try:
    # Wait for the streaming to initialize and some data to be processed
    query.awaitTermination(10)  # waits for 10 seconds, adjust the timing as needed

    # Now you can query the in-memory table to get a single record
    single_record = spark.sql("SELECT * FROM green_trips_view LIMIT 1").collect()

    # Show the result
    print(single_record)
finally:
    # Stop the query even when waiting or reading the table raises, so a
    # failed run does not leave the stream registered.
    query.stop()

StreamingQueryException: [STREAM_FAILED] Query [id = b3a02bae-4e3f-453b-acbe-81ccc630e01d, runId = c90029d1-7f79-4ea7-971c-95cf3214636e] terminated with exception: org/apache/spark/kafka010/KafkaConfigUpdater