In [None]:
import json
import time 

from kafka import KafkaProducer

def json_serializer(data):
    """Serialize a JSON-compatible object to UTF-8 encoded bytes.

    Passed to the producer as ``value_serializer`` so message values can be
    handed to ``send`` as plain Python objects.
    """
    payload = json.dumps(data)
    return payload.encode('utf-8')

# Address of the Kafka broker used for bootstrapping the producer.
server = 'localhost:9092'

# Producer whose message values are JSON-encoded by json_serializer.
producer = KafkaProducer(bootstrap_servers=[server], value_serializer=json_serializer)

# Last expression of the cell: shows whether the bootstrap connection is up.
producer.bootstrap_connected()

In [None]:
# Smoke test: publish ten small messages, timing the send loop and the
# final flush separately.
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    # send() is asynchronous: it buffers the record and returns immediately.
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)

t1 = time.time()
print(f'sending messages took {(t1 - t0):.2f} seconds')


# flush() blocks until every buffered record has actually been sent.
producer.flush()

t2 = time.time()
print(f'flushing took {(t2 - t1):.2f} seconds')


In [None]:
import pandas as pd
# Green taxi trip records for 2019-10, read straight from the release archive.
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz'
df = pd.read_csv(url)

# Columns kept for the rest of the notebook.
green_columns = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]
df_green = df[green_columns]

In [None]:
# Preview the first rows of the trimmed trips frame.
df_green.head()

In [None]:
# Number of rows in the trimmed trips frame.
len(df_green)

In [None]:
# Publish every trip row to Kafka and time the complete delivery.
t0 = time.time()

topic_name = 'green-trips-3'

for row in df_green.itertuples(index=False):
    # Each row is a namedtuple; _asdict() maps column name -> value.
    row_dict = row._asdict()
    producer.send(topic_name, value=row_dict)

# send() only buffers records. Flush before stopping the clock so that all
# rows have really been delivered and the timing covers the delivery.
producer.flush()
t1 = time.time()

print(f"took {t1-t0} seconds")
 

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Pin the Kafka connector to the same version as the installed PySpark.
# NOTE(review): the "_2.12" suffix assumes a Scala 2.12 build of Spark - confirm.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

# Local session using all available cores, with the Kafka connector package.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GreenTripsConsumer3")
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [None]:
# Streaming source: read the "green-trips-3" topic from its earliest offset.
green3_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "green-trips-3")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
def peek(mini_batch, batch_id):
    """Print the first row of a micro-batch, or nothing if it is empty.

    The signature matches what ``foreachBatch`` calls with; ``batch_id``
    is accepted but not used.
    """
    sample = mini_batch.take(1)
    if not sample:
        return
    print(sample[0])

# Start a streaming query that hands every micro-batch of the raw stream to peek.
query = (
    green3_stream.writeStream
    .foreachBatch(peek)
    .start()
)

In [None]:
# Stop the raw-stream preview query.
query.stop()

In [None]:
from pyspark.sql import types

# Schema of the JSON payload carried in each Kafka message value.
schema = types.StructType([
    types.StructField("lpep_pickup_datetime", types.StringType()),
    types.StructField("lpep_dropoff_datetime", types.StringType()),
    types.StructField("PULocationID", types.IntegerType()),
    types.StructField("DOLocationID", types.IntegerType()),
    types.StructField("passenger_count", types.DoubleType()),
    types.StructField("trip_distance", types.DoubleType()),
    types.StructField("tip_amount", types.DoubleType()),
])

In [None]:
from pyspark.sql import functions as F

# Parse the Kafka message value (JSON text) into typed columns.
# Derive from the raw `green3_stream` source: `green_stream` does not exist
# before this cell, and building from the raw stream keeps the cell
# re-runnable.
green_stream = (
    green3_stream
    .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data"))
    .select("data.*")
)

In [None]:
# Preview the parsed stream: print the first row of every micro-batch.
query2 = (
    green_stream.writeStream
    .foreachBatch(peek)
    .start()
)

In [None]:
# Stop the parsed-stream preview, which was started as `query2`
# (`query` is the earlier raw-stream preview).
query2.stop()

In [None]:
# Display the streaming DataFrame's repr to inspect its columns.
green_stream

In [None]:
from pyspark.sql import functions as F
# Add a "timestamp" column from current_timestamp().
# NOTE(review): this looks like processing time, not the trip's event time - confirm.
green_stream = (
    green_stream
    .withColumn("timestamp", F.current_timestamp())
)

In [None]:
# Display the streaming DataFrame again, now with the "timestamp" column.
green_stream

In [None]:
# Trips per drop-off location in 5-minute windows, most frequent first.
# count() already names its output column "count". The former
# `.alias("count")` aliased the DataFrame, not the column, so it is dropped.
popular_dest = (
    green_stream
    .groupBy(F.window("timestamp", "5 minutes"), "DOLocationID")
    .count()
    .orderBy(F.desc("count"))
)

In [None]:
# Write the full aggregation result to the console on every trigger.
query = (
    popular_dest.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Blocks this cell until the query terminates.
query.awaitTermination()

In [None]:
# Stop the console aggregation query.
query.stop()