In [82]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
print(pyspark_version)

kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .getOrCreate()

3.5.0


In [2]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest") \
    .load()

In [39]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

Row(lpep_pickup_datetime='2019-10-01 00:26:02', lpep_dropoff_datetime='2019-10-01 00:39:58', PULocationID=112, DOLocationID=196, passenger_count=1.0, trip_distance=5.88, tip_amount=0.0, timestamp=datetime.datetime(2024, 3, 27, 12, 1, 36, 624000))


In [40]:
query.stop()

In [3]:
from pyspark.sql import types

schema = types.StructType() \
    .add("lpep_pickup_datetime", types.StringType()) \
    .add("lpep_dropoff_datetime", types.StringType()) \
    .add("PULocationID", types.IntegerType()) \
    .add("DOLocationID", types.IntegerType()) \
    .add("passenger_count", types.DoubleType()) \
    .add("trip_distance", types.DoubleType()) \
    .add("tip_amount", types.DoubleType())

In [4]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [5]:
from pyspark.sql.functions import current_timestamp, col, window

green_stream = green_stream \
  .withColumn("timestamp", current_timestamp())

In [75]:
popular_destinations = green_stream \
    .groupBy("DOLocationID") \
    .count() \
    .orderBy("count", ascending=False)

In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .format("console") \
    .option("truncate", "false") \
    .start()

In [None]:
query.awaitTermination(30)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Program Files\Spark\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\OTkachuk\AppData\Local\anaconda3\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Program Files\Spark\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 538, in send_command
    logge

In [None]:
query.status

In [65]:
queryPeek = green_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime='5 seconds') \
    .start()

In [46]:
queryPeek.status

{'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [45]:
queryPeek.awaitTermination(30)

False

In [67]:
queryPeek.stop()

In [77]:
queryMemory = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .option("truncate", "false") \
    .queryName("queryMemory") \
    .start()

In [61]:
queryMemory = green_stream \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .option("truncate", "false") \
    .queryName("queryMemory") \
    .start()

In [78]:
from IPython.display import display, clear_output
import time

while True:
    clear_output(wait=True)
    display(queryMemory.status)
    display(spark.sql('SELECT * FROM queryMemory').show())
    time.sleep(1)

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+------------+-----+
|DOLocationID|count|
+------------+-----+
+------------+-----+



None

KeyboardInterrupt: 

In [76]:
queryMemory.stop()