In [None]:
# Second version. This version is more efficient than the previous one 
# because the amount of joined data is reduced.
from pyspark.streaming import StreamingContext

In [None]:
# Path of the text file holding the historical stock prices. It is loaded
# with sc.textFile and every line is parsed as a comma-separated record.
historicalInputFile = "data/Ex64/data/historicalData.txt"

In [None]:
# Load the historical data as a non-streaming RDD with one element per input line.
# The per-stock maximum and minimum prices are derived from this RDD in later cells.
# NOTE(review): sc is not defined in this notebook; presumably it is the
# SparkContext injected by the PySpark kernel - confirm.
historicalDataRDD = sc.textFile(historicalInputFile)

In [None]:
# Return one pair (stockId, (price, price) )  for each input record
def extractStockIdPricePrice(line):
    """Turn one comma-separated record into a (stockId, (price, price)) pair.

    The second field of the record is used as the stockId and the third
    field as the price. The price is converted to float and emitted twice so
    that the value can later be reduced into a (max, min) pair.

    Args:
        line: text record whose fields are separated by commas.

    Returns:
        A tuple (stockId, (price, price)) with price as a float.

    Raises:
        IndexError: if the record has fewer than three fields.
        ValueError: if the third field cannot be converted to float.
    """
    tokens = line.split(",")
    priceValue = float(tokens[2])
    return (tokens[1], (priceValue, priceValue))


# Parse every historical line into a (stockId, (price, price)) pair, ready to be
# reduced into per-stock (max, min) values.
stockIdPriceHistoricalRDD = historicalDataRDD.map(extractStockIdPricePrice)

In [None]:
# Historical extremes per stock: fold the (price, price) pairs of each stockId
# into a single (maxPrice, minPrice) pair. The result is cached because it is
# joined again with the content of every streaming batch.
stockIdPriceHistoricalMaxMinRDD = (
    stockIdPriceHistoricalRDD
    .reduceByKey(lambda left, right: (max(left[0], right[0]), min(left[1], right[1])))
    .cache()
)

In [None]:
# Create a Spark Streaming Context object with a batch interval of 10 seconds
# (a 60-second interval is kept below as a commented-out alternative).
#ssc = StreamingContext(sc, 60)
ssc = StreamingContext(sc, 10)

In [None]:
# Create a (Receiver) DStream that connects to localhost:9999 and exposes the
# text received on that socket; each element is later parsed as one
# comma-separated price record.
pricesDStream = ssc.socketTextStream("localhost", 9999)

In [None]:
# For every input batch, parse the incoming records and collapse them into a
# single (maxPrice, minPrice) pair per stockId.
stockIdPriceDStream = (
    pricesDStream
    .map(extractStockIdPricePrice)
    .reduceByKey(lambda left, right: (max(left[0], right[0]), min(left[1], right[1])))
)

In [None]:
# Attach the historical extremes to the extremes of each batch.
# transform gives access to the RDD holding the current batch, which is joined
# with the non-streaming RDD stockIdPriceHistoricalMaxMinRDD. Each resulting
# element is (stockId, ((batchMax, batchMin), (historicalMax, historicalMin))).
stockIdPriceMaxMinDStream = stockIdPriceDStream.transform(
    lambda currentBatchRDD: currentBatchRDD.join(stockIdPriceHistoricalMaxMinRDD)
)

In [None]:
# Select only stocks with stream max price > maximum historical price 
# or stream min price < minimum historical price
def anomalyValue(pair):
    """Tell whether a stock left its historical price range in the batch.

    Args:
        pair: a tuple (stockId, ((batchMax, batchMin),
            (historicalMax, historicalMin))).

    Returns:
        True when the batch maximum is strictly greater than the historical
        maximum or the batch minimum is strictly lower than the historical
        minimum, False otherwise.
    """
    batchExtremes = pair[1][0]
    historicalExtremes = pair[1][1]

    return (
        batchExtremes[0] > historicalExtremes[0]
        or batchExtremes[1] < historicalExtremes[1]
    )

    

# Keep only the stocks flagged by anomalyValue in the current batch.
selectedStockPricesDStream = stockIdPriceMaxMinDStream.filter(anomalyValue)

In [None]:
# Keep only the stockIds of the selected stocks.
# DStreams do not provide keys(), so it is applied to the RDD of each batch
# through transform (a map over the pairs is the alternative).
selectStockIdsDStream = selectedStockPricesDStream.transform(
    lambda currentBatchRDD: currentBatchRDD.keys()
)

In [None]:
# Output action: print the stockIds selected in each batch
# (pprint shows only the first elements of every batch).
selectStockIdsDStream.pprint()

In [None]:
# Start the computation: the streaming pipeline defined in the previous cells
# begins to process the incoming batches.
ssc.start()

In [None]:
# Run this application for at most 90 seconds, then stop the streaming
# context while keeping the underlying SparkContext alive for later cells.
# The stop is placed in a finally clause so that the streaming context is
# also stopped when the wait is interrupted or raises (e.g. kernel interrupt),
# instead of being left running in the background.
try:
    ssc.awaitTerminationOrTimeout(90)
finally:
    ssc.stop(stopSparkContext=False)

In [None]:
# Manual stop cell: repeats the stop issued at the end of the previous cell,
# leaving the SparkContext alive.
# NOTE(review): presumably kept so the streaming context can be stopped by hand
# when the previous cell is interrupted - confirm it is still needed.
ssc.stop(stopSparkContext=False)