In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [2]:
# Schema used to decode the JSON payload of each Kafka message.
# Every field is nullable; the two datetime columns are kept as plain strings.
schema = types.StructType([
    types.StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ('lpep_pickup_datetime', types.StringType),
        ('lpep_dropoff_datetime', types.StringType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('passenger_count', types.DoubleType),
        ('trip_distance', types.DoubleType),
        ('tip_amount', types.DoubleType),
    )
])

In [3]:
# Subset of CSV columns that is loaded and later published to Kafka.
columns_to_use = [
    'lpep_pickup_datetime',
    'lpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'tip_amount',
]

In [4]:
# Build the Maven coordinate of the Spark-Kafka connector so that its version
# tracks the installed PySpark version.
# NOTE(review): the "_2.12" suffix is hard-coded; presumably it has to match the
# Scala version of the Spark distribution in use — confirm.
pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

In [6]:
# Local Spark session (all cores) with the Kafka connector package attached.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('GreenTripsConsumer')
    .config("spark.jars.packages", kafka_jar_package)
    .getOrCreate()
)

In [7]:
# Load only the columns of interest from the gzipped trip-data CSV.
df = pd.read_csv(
    'data/green_tripdata_2019-10.csv.gz',
    usecols=columns_to_use,
)

In [8]:
# Preview the first rows of the loaded trip data.
df.head()

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,tip_amount
0,2019-10-01 00:26:02,2019-10-01 00:39:58,112,196,1.0,5.88,0.0
1,2019-10-01 00:18:11,2019-10-01 00:22:38,43,263,1.0,0.8,0.0
2,2019-10-01 00:09:31,2019-10-01 00:24:47,255,228,2.0,7.5,0.0
3,2019-10-01 00:37:40,2019-10-01 00:41:49,181,181,1.0,0.9,0.0
4,2019-10-01 00:08:13,2019-10-01 00:17:56,97,188,1.0,2.52,2.26


In [9]:
from kafka import KafkaProducer
import json
import time

In [10]:
# Address of the Kafka broker that the producer below uses as its bootstrap server.
server = 'localhost:9092'
def json_serializer(data):
    """Serialize ``data`` to a JSON document and return it as UTF-8 bytes."""
    json_text = json.dumps(data)
    return json_text.encode('utf-8')

# Kafka producer whose message values are JSON-encoded by json_serializer.
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer,
)
    

In [11]:
# Confirm that df is a pandas DataFrame (the producer loop below iterates it with itertuples).
type(df)

pandas.core.frame.DataFrame

In [25]:
# Publish every row of the trip data to Kafka as one JSON message and time it.
green_trips_topic = 'green-trips'

time_0 = time.time()

for row in df.itertuples(index=False):
    # itertuples yields namedtuples; _asdict() gives the column -> value mapping.
    row_dict = row._asdict()
    producer.send(green_trips_topic, value=row_dict)

# send() only enqueues the record for asynchronous delivery. Block until all
# buffered records have actually been sent so that the measured time covers
# delivery and nothing is left sitting in the producer's buffer.
producer.flush()

time_1 = time.time()

print(f'it took {(time_1-time_0):.1f} seconds')

it took 84.5 seconds


In [13]:
# Streaming source: subscribe to the green-trips topic and read it from the
# earliest available offset.
# The checkpointLocation option was removed from the reader: it is a
# writeStream (sink-side) option and had no effect here, since Spark still
# created a temporary checkpoint for every query started from this stream.
# Set it on the writeStream of a query if a persistent checkpoint is needed.
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [15]:
# Show the columns of the raw Kafka source; the message payload is the binary `value` column.
green_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
def peek(mini_batch, batch_id):
    """Print a micro-batch and its first row, for use with ``foreachBatch``.

    Args:
        mini_batch: The batch object; it is printed as-is and ``take(1)`` is
            called on it to fetch at most one row.
        batch_id: Identifier of the batch (accepted but not used).
    """
    print(mini_batch)

    head = mini_batch.take(1)
    if not head:
        # Empty batch: nothing to show.
        print('no first row')
        return

    print(head[0])

In [142]:
# Start a streaming query that passes every micro-batch of the stream to peek().
query = (
    green_stream
    .writeStream
    .foreachBatch(peek)
    .start()
)

24/03/14 11:43:17 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-718efeb1-bc4b-4b7c-9434-2688c78935da. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 11:43:17 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [16]:
# Decode the Kafka `value` bytes as a JSON string, parse it with `schema`, and
# flatten the resulting struct into top-level columns.
# Note: this rebinds green_stream, so the cell is meant to run once per stream
# definition (after it runs, the stream no longer has a `value` column).
parsed_payload = F.from_json(F.col("value").cast('STRING'), schema).alias("data")
green_stream = green_stream.select(parsed_payload).select("data.*")

In [19]:
# Start a streaming query that passes every micro-batch of the stream to peek().
query = (
    green_stream
    .writeStream
    .foreachBatch(peek)
    .start()
)

24/03/14 11:49:36 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7a253f70-2e2f-43b7-88f3-0024b526c7ea. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 11:49:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


DataFrame[lpep_pickup_datetime: string, lpep_dropoff_datetime: string, PULocationID: int, DOLocationID: int, passenger_count: double, trip_distance: double, tip_amount: double]
Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0)


In [21]:
# Stop the streaming query currently bound to `query`.
query.stop()

In [24]:
# Stamp every record with the current (processing-time) timestamp.
green_stream_modified = green_stream.withColumn('timestamp', F.current_timestamp())

# Count trips per 5-minute window and drop-off location, most frequent first.
popular_destinations = (
    green_stream_modified
    .groupBy([F.window(F.col('timestamp'), '5 minutes'), F.col('DOLocationID')])
    .count()
    .sort(F.desc(F.col('count')))
)

In [26]:
# Print the full (complete-mode) aggregation result to the console after each
# micro-batch, without truncating column values.
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

try:
    # Blocks until the query terminates or the cell is interrupted.
    query.awaitTermination()
finally:
    # Always stop the query (e.g. on KeyboardInterrupt) so it does not keep
    # running in the background after this cell exits.
    query.stop()

24/03/14 12:10:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0653c63b-bf56-4e14-ab6f-c2d1d28f4fd2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/14 12:10:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------------+-----+
|window                                    |DOLocationID|count|
+------------------------------------------+------------+-----+
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|74          |70964|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|42          |63768|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|41          |56244|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|75          |51360|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|129         |47720|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|7           |46132|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|166         |43380|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|236         |31652|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|223         |30168|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|238         |29272|
|{2024-03-14 12:10:00, 2024-03-14 12:15:00}|82          |29168|
|{2024-

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/taras/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/taras/spark/spark-3.3.2-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/taras/.pyenv/versions/3.10.13/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 