In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, window, to_json

In [2]:
# Extra JVM packages handed to spark-submit through PYSPARK_SUBMIT_ARGS:
# the Kafka source/sink for Structured Streaming, the DataStax Spark-Cassandra
# connector and the Cassandra driver. The value is "--packages", a
# comma-separated coordinate list, then "pyspark-shell".
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    ','.join([
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1',
        'com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta',
        'com.datastax.cassandra:cassandra-driver-core:3.9.0',
    ]),
    'pyspark-shell',
])

In [3]:
# Kafka connection settings: broker address (host:port) and the topic
# the stream subscribes to.
bootstrap_servers, topic = "broker:9094", "coins"

In [4]:
# Cassandra contact point written as "host:port"; the Spark session cell
# configures the connector with this same host and port.
cassandra_connection = "cassandra:9042"

In [5]:
# Take the Cassandra host and port from the single `cassandra_connection`
# setting ("host:port") instead of repeating them here, so the two cannot
# drift apart. If no port is given, fall back to 9042, the value that was
# previously hard-coded.
cassandra_host, _, cassandra_port = cassandra_connection.partition(':')

# Create (or reuse) the Spark session with the Cassandra connector pointed at
# that host and port.
spark = (
    SparkSession.builder
    .appName('StructuredStreamingKafka')
    .config("spark.cassandra.connection.host", cassandra_host)
    .config("spark.cassandra.connection.port", cassandra_port or '9042')
    .getOrCreate()
)


In [6]:
def readFromCassandra(dataFrame, table, keyspace):
    """Read a Cassandra table through the Spark connector and print its rows.

    Args:
        dataFrame: Not used; kept so existing positional callers keep working.
        table: Name of the Cassandra table to load.
        keyspace: Keyspace that contains ``table``.

    Returns:
        None. The loaded DataFrame is only displayed with ``show()``.
    """
    # Honour the caller's table/keyspace rather than a fixed
    # coins.coins_data pair, which made both arguments dead.
    cassandra_reader = spark.read.format("org.apache.spark.sql.cassandra")
    cassandra_reader.options(table=table, keyspace=keyspace).load().show()

def writeToCassandraHistoricalData(dataFrame, batch_id):
    """Append one micro-batch to the Cassandra table ``coins.coins_historical``.

    Has the ``(DataFrame, batch id)`` signature used with ``foreachBatch``.

    Args:
        dataFrame: Batch DataFrame whose rows are written.
        batch_id: Accepted but not used.
    """
    cassandra_writer = dataFrame.write.format("org.apache.spark.sql.cassandra")
    cassandra_writer = cassandra_writer.mode('append')
    cassandra_writer = cassandra_writer.options(table='coins_historical', keyspace='coins')
    cassandra_writer.save()
    
def writeToCassandraAssetsData(dataFrame, batch_id):
    """Append one micro-batch to the Cassandra table ``coins.coins_assets``.

    Has the ``(DataFrame, batch id)`` signature used with ``foreachBatch``.

    Args:
        dataFrame: Batch DataFrame whose rows are written.
        batch_id: Accepted but not used.
    """
    cassandra_writer = dataFrame.write.format("org.apache.spark.sql.cassandra")
    cassandra_writer = cassandra_writer.mode('append')
    cassandra_writer = cassandra_writer.options(table='coins_assets', keyspace='coins')
    cassandra_writer.save()

In [7]:
# Streaming source: subscribe to the configured Kafka topic on the configured
# broker. "failOnDataLoss" is passed as "false" to the Kafka source.
kafkaRawDf = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', bootstrap_servers)
    .option('subscribe', topic)
    .option("failOnDataLoss", "false")
    .load()
)

In [8]:
# Keep only the Kafka record key and value, both cast to strings.
kafkaKeyValueDf = kafkaRawDf.selectExpr(
    'CAST(key as String)',
    'CAST(value as String)',
)

In [9]:
# Schema used to parse the JSON document in each Kafka message value.
# Listed as (field name, Spark type); every field is nullable.
coin_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('24hVolume', StringType()),
        ('btcPrice', StringType()),
        ('change', StringType()),
        ('coinrankingUrl', StringType()),
        ('color', StringType()),
        ('iconUrl', StringType()),
        ('listedAt', LongType()),
        ('lowVolume', BooleanType()),
        ('marketCap', DoubleType()),
        ('name', StringType()),
        ('price', StringType()),
        ('rank', LongType()),
        ('sparkline', ArrayType(StringType())),
        ('symbol', StringType()),
        ('tier', LongType()),
        ('uuid', StringType()),
    )
])

In [10]:
# Parse the JSON message value once into a `coin` struct column; the three
# frames below are all projections of this single parsed frame.
parsedCoinsDf = kafkaKeyValueDf.withColumn('coin', from_json('value', coin_schema))

# Descriptive per-coin fields destined for the assets table. The camelCase
# columns are renamed to lower case (presumably to match the Cassandra column
# names; confirm against the table definition).
coinsAssetsToCassandara = (
    parsedCoinsDf
    .selectExpr('coin.uuid', 'coin.name', 'coin.symbol', 'coin.coinrankingUrl', 'coin.color', 'coin.iconUrl')
    .withColumnRenamed('coinrankingUrl', 'coinrankingurl')
    .withColumnRenamed('iconUrl', 'iconurl')
)

# Price/volume fields destined for the historical table; `24hVolume` is
# renamed to `dailyvolume`, the others are lower-cased.
# Parenthesised chain: no trailing line continuation is left dangling after
# the last rename.
coinsHistoricalToCassandara = (
    parsedCoinsDf
    .selectExpr('coin.uuid', 'coin.name', 'coin.price', 'coin.btcPrice', 'coin.24hVolume', 'coin.marketCap', 'coin.tier')
    .withColumnRenamed('btcPrice', 'btcprice')
    .withColumnRenamed('24hVolume', 'dailyvolume')
    .withColumnRenamed('marketCap', 'marketcap')
)

# Every parsed coin field as a top-level column.
coinsDf = parsedCoinsDf.selectExpr('coin.*')

In [11]:
# Start the four streaming queries.

# 1) Asset fields -> Cassandra, one micro-batch at a time via foreachBatch.
cassandraAssetsDataQuery = (
    coinsAssetsToCassandara.writeStream
    .foreachBatch(writeToCassandraAssetsData)
    .outputMode("update")
    .start()
)

# 2) Historical price fields -> Cassandra, same mechanism.
cassandraHistoricalDataQuery = (
    coinsHistoricalToCassandara.writeStream
    .foreachBatch(writeToCassandraHistoricalData)
    .outputMode("update")
    .start()
)

# 3) All parsed coin columns -> console sink, for debugging.
consoleQueryDebugging = (
    coinsDf.writeStream
    .format("console")
    .start()
)

# 4) A subset of the coin columns re-serialised as JSON into the Kafka
#    topic "coins-api"; this is the only query given a checkpoint location.
kafkaRealTimeQuery = (
    coinsDf
    .selectExpr("to_json(struct(24hVolume, change, marketCap, price, rank, name, symbol, sparkline)) AS value")
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", "coins-api")
    .option("checkpointLocation", "/tmp/context-kafka/checkpoint")
    .start()
)


In [None]:
# Keep the cell blocked on the streaming queries, waiting on them one after
# another in this order.
# NOTE(review): each wait only returns once that particular query has ended,
# so a failure in a later query is not reported while an earlier one is still
# running. Consider spark.streams.awaitAnyTermination() if that matters.
for streaming_query in (
    cassandraAssetsDataQuery,
    cassandraHistoricalDataQuery,
    consoleQueryDebugging,
    kafkaRealTimeQuery,
):
    streaming_query.awaitTermination()