In [1]:
# Notebook based on https://towardsdatascience.com/building-a-real-time-prediction-pipeline-using-spark-structured-streaming-and-microservices-626dc20899eb
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import uuid
import random
from confluent_kafka import Producer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests

In [None]:
# kafka producer variables

simple_messages = [
'I love this pony',
'This restaurant is great',
'The weather is bad today',
'I will go to the beach this weekend',
'She likes to swim',
'Apple is a great company'
]

bootstrap_servers='kafka:9092'
topic='test'
msg_count=5

In [None]:
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {}'.format(msg.topic()))

def confluent_kafka_producer():

    p = Producer({'bootstrap.servers': bootstrap_servers})
    for data in simple_messages:
        
        record_key = str(uuid.uuid4())
        record_value = json.dumps({'data': data})
        
        p.produce(topic, key=record_key, value=record_value, on_delivery=delivery_report)
        p.poll(0)

    p.flush()
    print('we\'ve sent {count} messages to {brokers}'.format(count=len(simple_messages), brokers=bootstrap_servers))

In [None]:
confluent_kafka_producer()

Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
we've sent 6 messages to kafka:9092


In [3]:
spark = SparkSession \
    .builder \
    .appName('RealtimeKafkaML') \
    .getOrCreate()

df_raw = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', "broker:29092") \
  .option("startingOffsets", "earliest") \
  .option('subscribe', "alt_liquidations") \
  .load()

df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

schema = StructType([StructField('ticker', StringType()),\
                            StructField('amount', FloatType()),\
                            StructField('side', StringType()),\
                            StructField('price', FloatType()),
                             StructField('timestamp', StringType()),
                    ])

In [4]:
total = df_json.select(from_json(df_json.json, schema).alias('data')) \
        .groupBy("data.ticker") \
        .agg(sum("data.amount").alias("total_liq")) \
        .sort(desc("total_liq")) \
        .writeStream \
        .format("console") \
        .outputMode("complete").start()

In [7]:
total.stop()

In [None]:
# read a small batch of data from kafka and display to the console

schema = StructType([StructField('data', StringType())])

df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

In [None]:
# Test service
import requests
import json

data_jsons = '{"data":"' + simple_messages[1] + '"}'
print(data_jsons)
result = requests.post('http://127.0.0.1:9000/predict', json=json.loads(data_jsons))
print(json.dumps(result.json()))

#vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [None]:
def apply_sentiment_analysis(data):
    import requests
    import json
    
    result = requests.post('http://localhost:9000/predict', json=json.loads(data))
    return json.dumps(result.json())

def apply_sum(data):
    import requests
    import json
    
    result = requests.post('http://localhost:9000/sum', json=json.loads(data))
    return json.dumps(result.json())

vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())
sum_udf = udf(lambda data: apply_sum(data), StringType())

In [None]:
schema_input = StructType([StructField('data', StringType())])
schema_predict_output = StructType([StructField('neg', StringType()),\
                            StructField('pos', StringType()),\
                            StructField('neu', StringType()),\
                            StructField('compound', StringType())])

schema_sum_output = StructType([StructField('count', StringType())])

df_json.select(from_json(df_json.json, schema_input).alias('sentence'),\
               from_json(vader_udf(df_json.json), schema_predict_output).alias('response'))\
  .select('sentence.data', 'response.*') \
  .distinct() \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

AssertionError: 

In [5]:
spark.stop()

In [10]:
def post_sink():
    crypto_sink = {
        "name": "questdb-sink-crypto",
            "config": {
                "connector.class":"io.questdb.kafka.QuestDBSinkConnector",
                "tasks.max":"1",
                "topics": "topic_ETH, topic_BTC, topic_SOL",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": "false",
                "value.converter.schemas.enable": "false",
                "host": "questdb",
                "timestamp.field.name": "timestamp",
                "symbols":"currency"
            }
        }

    result = requests.post('http://127.0.0.1:8083/connectors', json=crypto_sink)
    print(result)

In [19]:
from cryptofeed import FeedHandler
from cryptofeed.backends.kafka import BookKafka, TradeKafka
from cryptofeed.defines import L2_BOOK, TRADES, L3_BOOK
from cryptofeed.exchanges import Coinbase, Binance, Bitfinex


def produuucer():
    f = FeedHandler()
    HOSTNAME = 'broker:29092'
    cbs = {TRADES: TradeKafka(bootstrap_servers=HOSTNAME), L2_BOOK: BookKafka(bootstrap_servers=HOSTNAME)}

    # Add trade and lv 2 bitcoin data to Feed
    f.add_feed(Bitfinex(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-USD'], callbacks=cbs))
    # f.add_feed(Binance(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-BUSD'], callbacks=cbs))
    
    # Example of how to extract level 3 order book data
    # f.add_feed(Coinbase(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-USD'], callbacks=cbs))

    f.run()

In [20]:
produuucer()

2023-07-14 00:31:29,415 : ERROR : FH: Unhandled RuntimeError('This event loop is already running') - shutting down
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/cryptofeed/feedhandler.py", line 151, in run
    loop.run_forever()
  File "/opt/conda/lib/python3.11/asyncio/base_events.py", line 596, in run_forever
    self._check_running()
  File "/opt/conda/lib/python3.11/asyncio/base_events.py", line 588, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
2023-07-14 00:31:29,415 : ERROR : FH: Unhandled RuntimeError('This event loop is already running') - shutting down
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/cryptofeed/feedhandler.py", line 151, in run
    loop.run_forever()
  File "/opt/conda/lib/python3.11/asyncio/base_events.py", line 596, in run_forever
    self._check_running()
  File "/opt/conda/lib/python3.11/asyncio/base_even

RuntimeError: This event loop is already running