In [1]:
# Defining new environment variable 'PYSPARK_SUBMIT_ARGS' 
import os, sys
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 --jars /usr/local/jar_files/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar pyspark-shell'

In [2]:
# Import findspark 
import findspark

# Initialize and provide path
findspark.init("/usr/share/spark/spark-2.3.2-bin-hadoop2.7/")

In [3]:
##   Importing Spark Libraries
#    Spark
from pyspark import SparkContext
#    Spark Streaming
from pyspark.streaming import StreamingContext
#    Kafka
from pyspark.streaming.kafka import KafkaUtils
#    KafkaConsumer
from kafka import KafkaConsumer

##   Additional Libraries
#    Calculation methods
from pyspark.sql import functions as F
from  pyspark.sql.functions import abs
from  pyspark.sql.functions import log
from pyspark.sql.types import DateType
from pyspark.sql.functions import stddev
from pyspark.sql.functions import sqrt
from scipy.stats import norm
import math
import numpy as np
#     User defined function, in order to use (norm.cdf) from scipy.stats
from pyspark.sql.functions import udf


In [15]:
# Import SparkSession
from pyspark.sql import SparkSession

#    Build the SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("master_consumer") \
    .config("spark.mongodb.input.uri", "mongodb://localhost/stocks_db.stock_coll") \
    .config("spark.mongodb.output.uri", "mongodb://localhost/stocks_db.stock_coll") \
    .getOrCreate()
   
#    Create SparkContext
sc = spark.sparkContext
#    Set SparkContext Interval to 10
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 10)

In [5]:
#    Import Spark SQL and Row
from pyspark.sql import Row,SQLContext

#    Function that defines SQLContext
def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

In [6]:
#    PyMongo for MongoDB
import pymongo

def write_mongo(df):
    try:
        # Append the input df to the database
        df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()
    except:
        e = sys.exc_info()[0]
        print("Mongo Error: %s" % e)

In [14]:
stockData = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://localhost/stocks_db.stockhist_coll").load()
stockData  = stockData.drop("close", "high", "low", "open", "volumne", "frequency", "intraperiod")
stockData = stockData.withColumn("date", stockData["date"].cast(DateType()))
stockData.createOrReplaceTempView("AAPL")
stockData = spark.sql("SELECT * FROM AAPL WHERE date BETWEEN '2019-01-01' AND '2020-01-01'")
stockData = stockData.withColumn("volatility", abs(stockData["adj_close"]/stockData["prev_close"]-1))
stockData.show(truncate=False)
meanVol = stockData.select(stddev("volatility")*math.sqrt(252)).take(1)[0][0]

+--------------------------+---------+--------+--------+--------+-----------+----------+----------+-----------+---------------------+
|_id                       |adj_close|adj_high|adj_low |adj_open|adj_volume |date      |prev_close|volume     |volatility           |
+--------------------------+---------+--------+--------+--------+-----------+----------+----------+-----------+---------------------+
|[5e3d35671bc8b17f7ef416e4]|293.65   |293.68  |289.52  |289.93  |2.5247625E7|2019-12-31|291.52    |2.5247625E7|0.0073065312843030306|
|[5e3d35671bc8b17f7ef416e5]|291.52   |292.69  |285.22  |289.46  |3.6059614E7|2019-12-30|289.8     |3.6059614E7|0.005935127674258078 |
|[5e3d35671bc8b17f7ef416e6]|289.8    |293.97  |288.12  |291.12  |3.6592936E7|2019-12-27|289.91    |3.6592936E7|3.794280983754339E-4 |
|[5e3d35671bc8b17f7ef416e7]|289.91   |289.98  |284.7   |284.82  |2.3334004E7|2019-12-26|284.27    |2.3334004E7|0.019840292679494942 |
|[5e3d35671bc8b17f7ef416e8]|284.27   |284.89  |282.9197|284.69

In [8]:
##    Defining external variables
#     Expiration time in months
T = 1/12
#     Square root of expiration time
T_s = (1/12)**0.5
#     Risk-free rate (constant  ->  source: https://www.statista.com/statistics/885892/average-risk-free-rate-austria/)
r = 0.013
#     Annualized historical volatility to the power of 2 (Assumption 252 Trading day) 
#     set: 02-07-2020 till 01-01-2019
meanVolPow = meanVol**2
# Euler's number to the power of the negative risk-free rate times the expiration time
eu_exp = np.exp(-r * T)


def getOptionPrices(time, rdd):
    
    try:
        # Get SparkSQLContext from the current context
        sql_context = get_sql_context_instance(rdd.context)
        
        # Convert RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row( updated_on=w['updated_on'], ticker=w['ticker'], last_price=w['last_price'], prev_price=w["prev_price"])) 
        # Create a DF of the Row RDD
        df = sql_context.createDataFrame(row_rdd)
        
        # Insert d1 and d2 for:
        #     out-of-the-money option (OTM)
        df = df.withColumn("d1_otm", (log(df["last_price"]/(df["last_price"]+20))+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d1_otm", (log(df["last_price"]/(df["last_price"]+20))+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d2_otm", (df["d1_otm"]-(meanVol * T_s)))
        #     at-the-money option (ATM)
        df = df.withColumn("d1_atm", (log(df["last_price"]/df["last_price"])+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d1_atm", (log(df["last_price"]/df["last_price"])+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d2_atm", (df["d1_atm"]-(meanVol * T_s)))
        #     in-the-money option (ITM)
        df = df.withColumn("d1_itm", (log(df["last_price"]/(df["last_price"]-20))+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d1_itm", (log(df["last_price"]/(df["last_price"]-20))+(r + 0.5 * meanVolPow)*T) / (meanVol * T_s))
        df = df.withColumn("d2_itm", (df["d1_itm"]-(meanVol * T_s)))
        
        # Creating "User Defined Function" in Spark, applying the cumulative distributed function for normal distributions on input variable x
        norm_cdf = udf(lambda x: float(norm.cdf(x)))
        
        # Calculating probabilities for d1 and d2 (cumulative distribution function)
        #     out-of-the-money option (OTM)
        df = df.withColumn("d1_otm_cdf", norm_cdf(df['d1_otm']))
        df = df.withColumn("d2_otm_cdf", norm_cdf(df['d2_otm']))
        
        #     at-the-money option (ATM)
        df = df.withColumn("d1_atm_cdf", norm_cdf(df['d1_atm']))
        df = df.withColumn("d2_atm_cdf", norm_cdf(df['d2_atm']))
                
        #     in-the-money option (ITM)
        df = df.withColumn("d1_itm_cdf", norm_cdf(df['d1_itm']))
        df = df.withColumn("d2_itm_cdf", norm_cdf(df['d2_itm']))
        
        # Calculating option prices with Black-Scholes-Model
        df = df.withColumn("optionPrice_otm", df["last_price"] * df["d1_otm_cdf"] - (df["last_price"]+20) * eu_exp * df["d2_otm_cdf"])
        df = df.withColumn("optionPrice_atm", df["last_price"]*df["d1_atm_cdf"] - (df["last_price"]) * eu_exp * df["d2_atm_cdf"])
        df = df.withColumn("optionPrice_itm", df["last_price"]*df["d1_itm_cdf"] - (df["last_price"]-20) * eu_exp * df["d2_itm_cdf"])
        
        # Register table for SQL usage
        df.registerTempTable("stocks")
        
        ## Select whole data
        #stocks_df = sql_context.sql("select * from stocks")
        
        ## Select relevant data only
        stocks_df = sql_context.sql("select ticker, updated_on, last_price, prev_price, optionPrice_otm, \
                                     optionPrice_atm, optionPrice_itm from stocks")
        
        #  show df
        stocks_df.show(truncate=False)

        # write the resulting df to databse with pymongo and MongoDB Spark Connector
        write_mongo(stocks_df)
        
    # print errors in the output, in order to not stop the notebook
    except:
        e = sys.exc_info()[0]
        print("Process Error: %s" % e)

In [9]:
#    Defining Kafka Stream
kafkaStream1 = KafkaUtils.createStream(ssc, 'localhost:2181', groupId='spark-streaming-consumer', topics={'data1': 1})

In [None]:
#    Input stream from producer with (lambda :  json.loads)
parsed_hist = kafkaStream1.map(lambda v: json.loads(v[1]))
#    Call previously defined function "getOptionPrices"
parsed_hist.foreachRDD(getOptionPrices)
#    Starting stream processing
ssc.start()
ssc.awaitTermination()

+------+------------------------+----------+----------+------------------+------------------+------------------+
|ticker|updated_on              |last_price|prev_price|optionPrice_otm   |optionPrice_atm   |optionPrice_itm   |
+------+------------------------+----------+----------+------------------+------------------+------------------+
|AAPL  |2020-02-07T20:16:44.463Z|318.89    |318.975   |1.5005144896275198|7.551152711650815 |21.469603546277938|
|AAPL  |2020-02-07T20:16:55.340Z|318.89    |318.89    |1.5005144896275198|7.551152711650815 |21.469603546277938|
|AAPL  |2020-02-07T20:17:06.221Z|318.745   |318.89    |1.498516501580518 |7.547719185534646 |21.467705103309413|
|AAPL  |2020-02-07T20:17:17.734Z|318.705   |318.685   |1.4979655009836819|7.5467720059163526|21.467181576451082|
|AAPL  |2020-02-07T20:17:29.469Z|318.705   |318.705   |1.4979655009836819|7.5467720059163526|21.467181576451082|
|TSLA  |2020-02-07T20:17:56.607Z|744.96    |744.0     |9.526912343562657 |17.64027321042181 |29.