In [1]:
import time
import re, ast
import numpy as np
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'



In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1") # "local[2]" to run locally with 2 cores
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [3]:
ssc = StreamingContext(sc, 10) 
# 0.1 = batchDuration
# Main entry point for Spark Streaming functionality. A StreamingContext represents the
# connection to a Spark cluster, and can be used to create DStream various input sources.
# A Discretized Stream (DStream), the basic abstraction in Spark Streaming, 
# is a continuous sequence of RDDs (of the same type) representing a continuous stream of data
# DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, etc.) using a 
# StreamingContext or it can be generated by transforming existing DStreams using operations such as map and window.
# https://blog.jetoile.fr/2014/05/rdd-quest-ce-que-cest.html
ssc.checkpoint("checkpoint")
# Sets the context to periodically checkpoint the DStream operations for master fault-tolerance.
# Parameters:	directory – HDFS-compatible directory where the checkpoint data will be reliably stored
ssc.remember(1)
# Set each DStreams in this context to remember RDDs it generated in the last given duration. 

In [4]:
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test2': 1})
# createStream(ssc, zkQuorum, groupId, topics)
# Parameters:	
#				ssc – StreamingContext object
#				zkQuorum – Zookeeper quorum (hostname:port,hostname:port,..).
#				groupId – The group id for this consumer.
#				topics – Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
# Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. 

In [5]:
def readInput(line):
    # Convert the input string into a pair of numbers
    print('now')
    vec= [float(x) for x in line.split()] 


    # The pair represents the inputs for two model (forgetting factor 1 and 0.99)
    return [('mod1',('mod1',np.array(vec))),('mod2',('mod2',np.array(vec)))]

In [6]:
def updateFunction(new_values, state): 
    ## RLS update function, formulas slide 51
    ## new_values = (key, [i, y, proxies])
    ## state = what is given by sc.parallelize in next cell on the first call
    ##       = what this function returned on the other calls
    if (len(new_values)>0 ):
        
        key=new_values[0][0]
        yx=new_values[0][1]
        i=yx[0]
        y=yx[1]
        x=yx[2:]
        n=len(x)
        
        beta=state[1]
        beta.shape=(n,1) # Transorms beta in a matrix of n rows and 1 column
        V=state[2]
        mu=state[3]
        sse=state[4]  ## sum of squared errors
        N=state[5]    ## number of treated samples
        x.shape=(1,n)
        err=y-x.dot(beta)        
        sse=sse+pow(err,2.0)
        V=1.0/mu*(V-V.dot(x.T).dot(x).dot(V)/(1.0+float(x.dot(V).dot(x.T)))) # dot = matrix multiplication
                                                                             # .T = Transpose
        gamma=V.dot(x.T)
        beta=beta+gamma*err
        proxyPrediction=x.dot(beta)
        
        errNaivePrec=y-x[0][-1]
        errNaiveMean=y-np.mean(x)
        mseMoyNumerator=state[6]
        mseMoyNumerator=mseMoyNumerator+(y-np.mean(x))
        
        if (key=='mod1'):
            return (key,beta,V,mu,sse/(N+1.0),N+1,(sse/(N+1.0))/(mseMoyNumerator/(N+1.0)),i, proxyPrediction, errNaivePrec, errNaiveMean)  ## update formula mod1
        else:
            return (key,beta,V,mu,sse/(N+1.0),N+1,(sse/(N+1.0))/(mseMoyNumerator/(N+1.0)),i, proxyPrediction, errNaivePrec, errNaiveMean)  ## update formula mod2
        
    else:
        return state

In [7]:
import re, ast
n=2 # number of features
beta1=np.zeros(n)  ## initial parameter vector slide 49
beta2=np.zeros(n)
V1=np.diag(np.zeros(n)+10) ## initial covariance matrix slide 49
V2=np.diag(np.zeros(n)+1)
mu1=1.0 # forgetting factor slide 50
mu2=0.99
data = kvs.map(lambda x: np.array(ast.literal_eval(x[1])))
# map: Returns a new DStream by applying a function to each element of DStream.
# literal_eval: This can be used for safely evaluating strings containing Python values (strings, numbers, tuples, lists, dicts, booleans, and None)
# from untrusted sources without the need to parse the values oneself.
data=data.flatMap(lambda x: [('mod1',('mod1',1.0*np.array(x))),
                            ('mod2',('mod2',1.0*np.array(x)))])
data.pprint() # prints received array
# Return a new DStream by applying a function to all elements of this DStream, and then flattening the results
#initialStateRDD = sc.parallelize([('k',([1,2,3]))])
initialStateRDD = sc.parallelize([(u'mod1', ('mod1',beta1,V1,mu1,0,0,0,0,0,0,0)),
                                  (u'mod2', ('mod2',beta2,V2,mu2,0,0,0,0,0,0,0))])
# The elements of the collection [] are copied to form a distributed dataset that can be operated on in parallel.
# ('mod1',beta1,V1,mu1,0,0,0)) == what updateFunction returns
data2=data.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
# Return a new "state" DStream where the state for each key is updated by applying the given function on the previous
# state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
#data.pprint()
data2.map(lambda x: [x[1][i] for i in [0,1,4,6,8,9,10]]).pprint() 
# prints key, beta, MSE_RLS, NMSE, proxyPrediction, errNaivePrec, errNaiveMean

In [8]:
ssc.start()


-------------------------------------------
Time: 2017-12-08 19:22:00
-------------------------------------------

-------------------------------------------
Time: 2017-12-08 19:22:00
-------------------------------------------
['mod2', array([ 0.,  0.]), 0, 0, 0, 0, 0]
['mod1', array([ 0.,  0.]), 0, 0, 0, 0, 0]

-------------------------------------------
Time: 2017-12-08 19:22:10
-------------------------------------------
('mod1', ('mod1', array([ 1.        , -0.77094046,  0.02368754,  0.34072766])))
('mod2', ('mod2', array([ 1.        , -0.77094046,  0.02368754,  0.34072766])))
('mod1', ('mod1', array([ 2.        , -0.11617189, -0.14629314,  0.5132684 ])))
('mod2', ('mod2', array([ 2.        , -0.11617189, -0.14629314,  0.5132684 ])))
('mod1', ('mod1', array([ 3.        ,  1.65946254, -0.48928035,  0.1562094 ])))
('mod2', ('mod2', array([ 3.        ,  1.65946254, -0.48928035,  0.1562094 ])))
('mod1', ('mod1', array([ 4.        , -0.3222144 ,  0.90729529,  0.7306827 ])))
('mod2', (

-------------------------------------------
Time: 2017-12-08 19:23:00
-------------------------------------------
['mod2', array([[-0.03599725],
       [ 0.1403977 ]]), array([[ 0.12717148]]), array([[-0.27469402]]), array([[-0.11973561]]), 0.38251882000000004, 0.11579502500000005]
['mod1', array([[-0.04243389],
       [ 0.15463469]]), array([[ 0.16967995]]), array([[ 1.33482568]]), array([[-0.13068061]]), 0.38251882000000004, 0.11579502500000005]

-------------------------------------------
Time: 2017-12-08 19:23:10
-------------------------------------------
('mod1', ('mod1', array([ 56.        ,  -1.20757979,   1.41293325,  -0.56652481])))
('mod2', ('mod2', array([ 56.        ,  -1.20757979,   1.41293325,  -0.56652481])))
('mod1', ('mod1', array([ 57.        ,  -0.2596798 ,   0.99227225,  -0.84519616])))
('mod2', ('mod2', array([ 57.        ,  -0.2596798 ,   0.99227225,  -0.84519616])))
('mod1', ('mod1', array([ 58.        ,   0.38912414,   1.43358774,  -0.96170958])))
('mod2', ('mo

-------------------------------------------
Time: 2017-12-08 19:24:00
-------------------------------------------
['mod2', array([[-0.43279779],
       [ 0.00533517]]), array([[ 0.10189335]]), array([[ 0.56877031]]), array([[ 0.73667721]]), 1.5968459223, 1.7038434472999999]
['mod1', array([[-0.46169122],
       [ 0.01335917]]), array([[ 0.11417168]]), array([[ 0.6463551]]), array([[ 0.77430437]]), 1.5968459223, 1.7038434472999999]

-------------------------------------------
Time: 2017-12-08 19:24:10
-------------------------------------------
('mod1', ('mod1', array([ 115.        ,   -0.41498075,    0.45919923,    0.54180242])))
('mod2', ('mod2', array([ 115.        ,   -0.41498075,    0.45919923,    0.54180242])))
('mod1', ('mod1', array([ 116.        ,    1.12905491,   -1.31459507,   -1.78389381])))
('mod2', ('mod2', array([ 116.        ,    1.12905491,   -1.31459507,   -1.78389381])))
('mod1', ('mod1', array([  1.17000000e+02,   8.08323628e-01,   1.50764196e+00,
         7.11715686

-------------------------------------------
Time: 2017-12-08 19:25:00
-------------------------------------------
['mod2', array([[-0.39947268],
       [ 0.07563797]]), array([[ 0.02994503]]), array([[ 1.7094744]]), array([[-0.15150634]]), 0.48147315509999999, 0.32553808055000005]
['mod1', array([[-0.42151183],
       [ 0.08309347]]), array([[ 0.03063375]]), array([[ 1.71531141]]), array([[-0.15959213]]), 0.48147315509999999, 0.32553808055000005]

-------------------------------------------
Time: 2017-12-08 19:25:10
-------------------------------------------
('mod1', ('mod1', array([  1.75000000e+02,  -1.32806260e-01,   7.20667141e-01,
        -8.48987193e-01])))
('mod2', ('mod2', array([  1.75000000e+02,  -1.32806260e-01,   7.20667141e-01,
        -8.48987193e-01])))
('mod1', ('mod1', array([ 176.        ,    0.89995901,    0.56642592,   -1.41264782])))
('mod2', ('mod2', array([ 176.        ,    0.89995901,    0.56642592,   -1.41264782])))
('mod1', ('mod1', array([ 177.        ,   -0

-------------------------------------------
Time: 2017-12-08 19:26:40
-------------------------------------------
['mod2', array([[-0.3895624],
       [ 0.0627045]]), array([[ 0.03809163]]), array([[ 3.36230431]]), array([[-0.91648382]]), -0.33725801, -1.9949094550000002]
['mod1', array([[-0.40349157],
       [ 0.06381049]]), array([[ 0.03322998]]), array([[ 2.37416241]]), array([[-0.94795005]]), -0.33725801, -1.9949094550000002]

-------------------------------------------
Time: 2017-12-08 19:26:50
-------------------------------------------

-------------------------------------------
Time: 2017-12-08 19:26:50
-------------------------------------------
['mod2', array([[-0.3895624],
       [ 0.0627045]]), array([[ 0.03809163]]), array([[ 3.36230431]]), array([[-0.91648382]]), -0.33725801, -1.9949094550000002]
['mod1', array([[-0.40349157],
       [ 0.06381049]]), array([[ 0.03322998]]), array([[ 2.37416241]]), array([[-0.94795005]]), -0.33725801, -1.9949094550000002]

---------------

In [None]:
ssc.stop(stopSparkContext=False,stopGraceFully=True)

In [None]:
a=[1,2,3,4]


In [None]:
[a[i] for i in [1,0,2,3]]
