In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.avro.functions import from_avro
import findspark
from utils.schemas import get_trade_schema
from utils.spark import write_stream_to_console


In [2]:
import os

# Maven coordinates of the extra Spark connectors this notebook needs
# (Kafka source and Avro functions; Scala 2.12 builds, version 3.1.2).
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
    "org.apache.spark:spark-avro_2.12:3.1.2",
]
packages_str = ','.join(packages)

# Hand the connector list to spark-submit through the environment. This has to
# be set before the first SparkSession is created in this kernel, because the
# JVM is launched only once.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {packages_str} pyspark-shell'

# Build (or reuse) the session against the standalone master.
spark = (
    SparkSession.builder
    .appName('trades-5m-snapshots')
    # "spark.jars.packages" is the documented property for Maven coordinates;
    # the previously used "spark.packages" key is not a Spark setting.
    .config("spark.jars.packages", packages_str)
    .master("spark://127.0.0.1:7077")
    .getOrCreate()
)
# Alternatives that were tried earlier and left disabled:
#   findspark.add_packages(packages)
#   .config("spark.driver.host", "localhost")

# Kafka bootstrap server used by the streaming reads in later cells.
broker_url = "127.0.0.1:9092"



:: loading settings :: url = jar:file:/usr/local/Caskroom/miniconda/base/envs/personal/lib/python3.7/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/mrot/.ivy2/cache
The jars for the packages stored in: /Users/mrot/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4b397953-5ed1-48b4-978c-0a2931e1ab22;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.apache.spark#spark-avro_2.12;3.1.2 in central
:: resolution report :: resolve 406ms :: artifacts dl 9ms
	:: modules in use:
	com

In [3]:
# Schema registry endpoint (defined here; not referenced again in this cell).
schema_registry_url = 'http://localhost:7070'
# Avro schema passed to from_avro to decode the Kafka message payload.
trades_schema = get_trade_schema()

# Streaming read of the 'trades-rx' topic, starting from the earliest offsets
# and capped at 20 offsets per trigger.
trades_rx_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_url)
    .option("subscribe", 'trades-rx')
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 20)
    .load()
)

# Decode the binary 'value' column with the Avro schema, then promote the
# fields of the decoded struct to top-level columns.
trades_rx_decoded = trades_rx_raw.select(
    from_avro('value', trades_schema).alias('value')
)
trades_rx_dstream = trades_rx_decoded.selectExpr('value.*')

In [4]:
# Print the column names and types of the decoded trades stream as a sanity
# check before any sink is attached.
trades_rx_dstream.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- price: float (nullable = true)
 |-- quantity: float (nullable = true)
 |-- buyer: string (nullable = true)
 |-- seller: string (nullable = true)
 |-- time: timestamp (nullable = true)



In [5]:
# Project helper imported from utils.spark; its implementation is not visible here.
# NOTE(review): presumably starts a console-sink streaming query for debugging — confirm
# whether it blocks and whether the returned query (if any) needs to be stopped.
write_stream_to_console(trades_rx_dstream)

21/10/20 21:29:09 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/z7/5594v_fd6q337t3912lvx6rw0000gn/T/temporary-65d65c74-7273-49a4-bc12-c20236a78b00. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [6]:


# Count trades per 10-second window keyed on the 'time' column.
# Only the resulting schema is inspected here; no streaming query is started.
trades_per_window = trades_rx_dstream.groupBy(F.window("time", "10 seconds")).count()
trades_per_window.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [7]:
# Continuously append the decoded trades to CSV files under 'output/'.
# The checkpoint directory lets the query resume from its last committed
# offsets on restart.
# NOTE(review): "failOnDataLoss" is documented as a Kafka *source* option; set on
# the writer it most likely has no effect. Consider moving it to the readStream
# definition if tolerating data loss is intended — confirm.
trades_csv_query = (
    trades_rx_dstream.writeStream
    .outputMode('append')
    .format('csv')
    .option("checkpointLocation", "/tmp/kafka-checkpoint-0")
    .option("failOnDataLoss", "false")
    .start('output/')
)

try:
    # Blocks the cell until the query terminates or the kernel is interrupted.
    trades_csv_query.awaitTermination()
finally:
    # Interrupting the kernel only unblocks Python; without an explicit stop the
    # query would keep running in the JVM and hold on to the checkpoint.
    trades_csv_query.stop()


21/10/20 21:27:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.20.0.3 executor 0): org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition trades-rx-0 could be determined



KeyboardInterrupt: 