In [1]:
# ---- Parameters -------------------------------------------------------------
# Streaming
batchDuration = 5        # micro-batch length, in seconds
n = 3                    # regressor columns per sample, intercept included (length of beta)

# Model 1: initial covariance V_(0) = aVariance1 * I (aVariance1 > 0), forgetting factor mu1
aVariance1, mu1 = 1, 1.0
# Model 2: initial covariance V_(0) = aVariance2 * I (aVariance2 > 0), forgetting factor mu2
aVariance2, mu2 = 1, 0.9

In [2]:
import time
import re, ast
import numpy as np
import os
# Extra spark-submit options for the JVM pyspark launches (must be set before the
# SparkContext is created): Spark UI on port 4040, plus the Kafka 0.8 streaming and
# Cassandra connector packages (Scala 2.11 builds).
os.environ.update(PYSPARK_SUBMIT_ARGS='--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell')

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Spark configuration for a local run.
conf = (SparkConf()
        .setAppName("Streaming test")
        .setMaster("local[2]")  # "local[2]" to run locally with 2 cores
        .set("spark.cassandra.connection.host", "127.0.0.1"))  # Cassandra node for the connector package

# getOrCreate() returns the already running SparkContext when there is one, so
# re-running this cell does not fail with "Cannot run multiple SparkContexts at once".
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [4]:
# StreamingContext is the entry point of Spark Streaming. It produces DStreams
# (Discretized Streams): continuous sequences of RDDs of the same type, one RDD per
# micro-batch of `batchDuration` seconds. A DStream is either created from a live
# source (TCP socket, Kafka, Flume, ...) or derived from another DStream through
# transformations such as map and window.
ssc = StreamingContext(sc, batchDuration=batchDuration)

# Directory where the DStream operations are checkpointed periodically; needed for
# fault tolerance and by stateful transformations such as updateStateByKey.
ssc.checkpoint("checkpoint")

# Minimum duration (seconds) for which each DStream of this context keeps the
# RDDs it generated.
ssc.remember(1)

In [5]:
# Receiver-based Kafka stream: ZooKeeper at 127.0.0.1:2181, consumer group
# "spark-streaming-consumer", topic "volatility" mapped to 1 consumer partition/thread.
# Each element is a (key, message) pair; the message is used as x[1] downstream.
kvs = KafkaUtils.createStream(
    ssc,
    zkQuorum="127.0.0.1:2181",
    groupId="spark-streaming-consumer",
    topics={'volatility': 1},
)

In [6]:
def readInput(line):
    """Parse a whitespace-separated line of numbers into one record per RLS model.

    Parameters
    ----------
    line : str
        Numbers separated by whitespace. Presumably laid out as
        ``[i, y, x_1, ..., x_n]`` like the samples updateFunction consumes
        (TODO confirm against the producer).

    Returns
    -------
    list of tuple
        ``[('mod1', ('mod1', arr)), ('mod2', ('mod2', arr))]``: one
        (key, value) pair per model (forgetting factors mu1 and mu2), each
        holding its own float ndarray so the two models never share a buffer.
    """
    values = [float(token) for token in line.split()]
    # A separate array per model: the state updates must not alias each other's input.
    return [(key, (key, np.array(values))) for key in ('mod1', 'mod2')]

In [7]:
def updateFunction(new_values, state):
    """Recursive Least Squares (RLS) state update for ``DStream.updateStateByKey``.

    Parameters
    ----------
    new_values : list of tuple
        Values received for this key in the current batch. Each one is
        ``(key, yx)`` where ``yx`` is a numeric array ``[i, y, x_1, ..., x_n]``:
        ``i`` is ignored, ``y`` is the target and ``x`` the regressors.
        ALL values are processed, in order (not only the first one).
    state : tuple
        Previous state of the key. On the first call it is the 9-tuple
        ``(key, beta, V, mu, sse, N, prediction, sseNaivePrec, sseNaiveMean)``
        passed through ``initialRDD``; afterwards it is what this function
        returned.

    Returns
    -------
    tuple
        ``state`` itself when ``new_values`` is empty. Otherwise
        ``(key, beta, V, mu, MSE_RLS, N, prediction, sseNaivePrec,
        sseNaiveMean, NMSE_prec, NMSE_mean, sse)`` where ``beta`` is an
        ``(n, 1)`` column, ``prediction`` is the RLS fit of the last sample,
        ``NMSE_prec = MSE_RLS / MSE_NaivePrec`` (naive forecast: last
        regressor), ``NMSE_mean = MSE_RLS / MSE_NaiveMean`` (naive forecast:
        mean of the regressors after the first) and ``sse`` is the RLS sum of
        squared errors. Indices 0-10 keep their previous meaning. A naive SSE
        of 0 yields inf/nan (numpy warning) in the corresponding ratio.
    """
    if not new_values:
        return state

    key = new_values[0][0]
    beta = state[1]
    V = state[2]
    mu = state[3]
    # A returned state stores the running MSE at index 4, so the SSE is carried
    # separately at index 11; the 9-tuple initial state keeps the SSE at index 4.
    sse = state[11] if len(state) > 11 else state[4]
    N = state[5]               # number of samples processed so far
    proxyPrediction = state[6]
    sseNaivePrec = state[7]
    sseNaiveMean = state[8]

    for _, yx in new_values:
        y = yx[1]
        x = np.asarray(yx[2:], dtype=float).reshape(1, -1)   # row vector (1, n)
        # Column vector (n, 1); reshape returns a new view instead of mutating the stored state.
        beta = np.asarray(beta, dtype=float).reshape(x.shape[1], 1)

        err = y - x.dot(beta)                                 # a-priori error, shape (1, 1)
        sse = sse + pow(err, 2.0)
        # Covariance update with forgetting factor mu (dot = matrix product, .T = transpose)
        V = 1.0 / mu * (V - V.dot(x.T).dot(x).dot(V) / (1.0 + x.dot(V).dot(x.T).item()))
        gamma = V.dot(x.T)                                    # gain vector
        beta = beta + gamma * err
        proxyPrediction = x.dot(beta)

        # Naive benchmarks: the last regressor, and the mean of all regressors but the first.
        sseNaivePrec = sseNaivePrec + pow(y - x[0, -1], 2.0)
        sseNaiveMean = sseNaiveMean + pow(y - np.mean(x[0, 1:]), 2.0)
        N = N + 1

    count = float(N)
    MSE_RLS = sse / count
    MSE_NaivePrec = sseNaivePrec / count
    MSE_NaiveMean = sseNaiveMean / count

    return (key, beta, V, mu, MSE_RLS, N, proxyPrediction, sseNaivePrec, sseNaiveMean,
            MSE_RLS / MSE_NaivePrec, MSE_RLS / MSE_NaiveMean, sse)

In [8]:
import re, ast

# Initial RLS state of both models: zero coefficients and covariance V_(0) = a * I.
beta1 = np.zeros(n)  ## Initial parameter vector
beta2 = np.zeros(n)
V1 = np.diag(np.zeros(n) + aVariance1)  ## Initial covariance matrix
V2 = np.diag(np.zeros(n) + aVariance2)

# Parse each Kafka message (x[1] of the (key, message) pair) as a Python literal,
# presumably a list such as [i, y, x_1, ..., x_n] (TODO confirm producer format).
# ast.literal_eval only evaluates literals, so a malformed message cannot run code.
data = kvs.map(lambda x: np.array(ast.literal_eval(x[1])))

# Fan every sample out to both models; each record gets its own float copy.
data = data.flatMap(lambda x: [('mod1', ('mod1', 1.0 * np.array(x))),
                               ('mod2', ('mod2', 1.0 * np.array(x)))])

# Initial state per key, the 9-tuple read by updateFunction:
# (key, beta, V, mu, sse, N, prediction, sseNaivePrec, sseNaiveMean)
initialStateRDD = sc.parallelize([(u'mod1', ('mod1', beta1, V1, mu1, 0, 0, 0, 0, 0)),
                                  (u'mod2', ('mod2', beta2, V2, mu2, 0, 0, 0, 0, 0))])

# One RLS state per key, updated by updateFunction at every batch
# (stateful: relies on the checkpoint directory set on ssc).
data2 = data.updateStateByKey(updateFunction, initialRDD=initialStateRDD)


def formatState(kv):
    """Return a printable summary of one (key, state) pair produced by updateFunction:
    key, coefficients, last prediction, MSE_RLS, NMSE_prec and NMSE_mean."""
    state = kv[1]
    return ["Key: ", state[0],
            "Coefficients: ", np.array2string((state[1].T)[0]),
            "Prediction: ", np.array2string(state[6][0]),
            "MSE_RLS: ", np.array2string(state[4][0]),
            "NMSE_prec: ", np.array2string(state[9][0]),
            "NMSE_mean: ", np.array2string(state[10][0])]


# A key that has received no sample yet still holds the 9-element initial state
# (N == 0, prediction 0, no NMSE fields), which formatState cannot index. Printing
# only updated states keeps the first, possibly empty, batches from raising.
data2.filter(lambda kv: kv[1][5] > 0).map(formatState).pprint()

In [None]:
# Start receiving from Kafka and run the RLS forecasting on every batch.
# No awaitTermination() is called here, so the notebook stays usable and the
# next cell can stop the stream.
ssc.start()

In [None]:
# Stop the streaming computation while keeping the SparkContext `sc` alive
# (stopSparkContext=False). stopGraceFully=True waits until the data already
# received has been processed. A stopped StreamingContext cannot be restarted:
# to run again, re-create `ssc` and re-run the cells that build the stream.
ssc.stop(stopSparkContext=False,stopGraceFully=True)