# Consumer notebook

You can run this notebook after running the producer  

In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StringType, StructType, IntegerType, FloatType
import pandas as pd
import csv
import pandas as pd
import numpy as np
from datetime import datetime
from pyspark.streaming import StreamingContext
import time
import re, ast
import os
import glob
import traceback


We used all the available workers in our machines to distribute jobs

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 '+\
        '--conf spark.driver.memory=2g  pyspark-shell '
    

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("StreamingAnalytics") \
    .getOrCreate()

sc = spark.sparkContext

This function creates a connection to a network socket to the producer. We may change the batch interval. The function returns the Spark context, Spark streaming context, and DStream object

In [3]:

def getDStream(spark, batch_interval=5):

    # Get Spark context
    sc = spark.sparkContext
    sc.setLogLevel("ERROR")

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    # Create a DStream that represents streaming data from a network socket
    # See https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
    dstream = ssc.socketTextStream("localhost", 9999)
    
    return [sc,ssc,dstream]

##  Task 1

Read the data line by line implement a persistence model to predict the next count from the previous count. Here no need for a stateful case because the process does not imply the existence of a state. So we increment the value of t_gap by 1, indicating the next time gap. Return the same count values for all 18 sensors, as we assume that the count remains constant from one time gap to the next.

In [4]:
# Get the DStream object containing the streaming data sent by the producer notebook
[sc, ssc, dstream] = getDStream(spark=spark, batch_interval=2)
parsed_lines = dstream.map(lambda line: line.split(","))
modif_lines = parsed_lines. map (lambda line: [int(line[0]) + 1]+line[1:])
modif_lines.pprint()


In [5]:
ssc.start()

-------------------------------------------
Time: 2023-05-15 18:25:04
-------------------------------------------

-------------------------------------------
Time: 2023-05-15 18:25:06
-------------------------------------------

-------------------------------------------
Time: 2023-05-15 18:25:08
-------------------------------------------
[2, '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
[3, '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
[4, '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
[5, '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
[6, '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
[7, '0

-------------------------------------------
Time: 2023-05-15 18:25:32
-------------------------------------------



In [6]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

-------------------------------------------
Time: 2023-05-15 18:25:34
-------------------------------------------



## Task 2

In task 2, we employ the "stateful" method, we take into consideration previous count values along with the current t_gap. The number of previous count values we consider is determined by the 'State' variable, and its quantity depends on the value of V specified in the code.



In [7]:
def updateFunction(new_values, state): 
    ## Update the sequential estimate of sample mean of x[0] 
    
    L=len(new_values) ## size of the buffer

    if (L>0):
        #Loop over the rows of current batch

        previous_rows = state[0]
        V = state[1]

        for l in range(L):
          #prec_values will contain the previous values needed for the weighted average
          prec_values=[]
          # Current row
          value=new_values[l] 
          
          #updating the state for the next loop iteration
          new_previous_rows=previous_rows[1:]+(np.array(value[1:]),)
          
          #predicting the next value 
          t_gap=value[0]+1
          for i in range(len(previous_rows)):
              prec_values.append(previous_rows[i])
          prec_values.append(np.array(value[1:]))
          #Calculate the average of these previous count values.
          predicted_count=np.sum(prec_values, axis=0)/V
          
          print('V= ',len(previous_rows)+1)
          prediction="Time gap : "+ str(t_gap) + "  "
          for i in range(len(predicted_count)):
#     Set the value of the count at the next time gap to be equal to the calculated average        
              prediction += f"| Sensor {i+1} : {str(predicted_count[i])} "

          print(prediction)
          
        
        return (new_previous_rows, V)
    else:
        return state

In [8]:
# number of previous count values we consider is determined by the 'State' variable
V=4
state1=()
for i in range(V-1):
    state1+=(np.array([0]*18),)
state = (state1, V)
print(state)
# Transform the state into an RDD
initialStateRDD = sc.parallelize([('state', state)])

((array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), 4)


In [9]:
[sc,ssc,dstream]=getDStream(spark=spark,batch_interval=2)
dataS = dstream.map(lambda x: np.array(ast.literal_eval(x)))
dataS=dataS.flatMap(lambda x: [('state',x)])
# conclude by applying the `updateFunction` on the Spark Streaming Dstream by means of the `updateStateByKey` operation.
updatedS=dataS.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
updatedS.pprint()   


In [10]:
ssc.start()

-------------------------------------------
Time: 2023-05-15 18:25:44
-------------------------------------------
('state', ((array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), 4))

-------------------------------------------
Time: 2023-05-15 18:25:46
-------------------------------------------
('state', ((array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), 4))

-------------------------------------------
Time: 2023-05-15 18:25:48
-------------------------------------------
('state', ((array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), 4))

-------------------------------------

In [11]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

-------------------------------------------
Time: 2023-05-15 18:26:12
-------------------------------------------
('state', ((array([1., 0., 0., 0., 0., 0., 1., 0., 1., 0., 0., 0., 1., 0., 0., 0., 0.,
       2.]), array([0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 1., 0., 0.,
       1.]), array([0., 0., 0., 0., 0., 0., 1., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
       0.])), 4))



## Task 3 and 4

This task will predict the next values with embedding orders of 1,2,3,4  using single step of the Recursive Least Squares (with forgetting factor $\nu$).

\begin{equation*}
\begin{cases}
V_{(t)}&=\frac{1}{\nu} \left(V_{(t-1)}
-\frac{V_{(t-1)} x^T_{t} x_{t} V_{(t-1)}}{1+ x_{t} V_{(t-1)} x^T_{t}} \right)\\[3pt]
\alpha_{(t)}&= V_{(t)} x^T_{t} \\[3pt]
e&= y_{t}- x_{t} \hat{\beta}_{(t-1)}  \\[3pt]
\hat{\beta}_{(t)}&=\hat{\beta}_{(t-1)}+ \alpha_{(t)} e \\[3pt]
\end{cases}
\end{equation*}

where $V$ is the covariance matrix and $\beta$ is the set of parameters of the linear model.




In [12]:
# defining the RLS function
def RLSstep(y, x, n, beta, V, nu):
    try :
        x.shape = (1,n)
        x1 = np.append(1,x)
        x1.shape = (1, n+1)
        
        V = (1/nu) *  (V - V.dot(x1.T).dot(x1).dot(V) / (1 + float(x1.dot(V).dot(x1.T))))
        alpha = V.dot(x1.T)
        yhat = x1.dot(beta)
        err = y - yhat  
        beta = beta + alpha * err  
        return (beta, V, err, yhat)
    except Exception as e:
        print("error in rls with values : ", y, x, n, beta, V, nu)
        print(f"""
            Shapes : \n
            y : {y.shape},\n
            x : {x.shape},\n
            beta : {beta.shape}
        """)
        print(f"Error: {e}\nTraceback: {traceback.format_exc()}")
        

### Stateful processing : initial state
We use a stateful processing to update the state. 

A state is create for each sensor and embedding order in the list. If one wishes to build a model for only one embedding order for each sensor, it is possible my making a list of only one element.

The following code is scalable in terms of different embedding orders, number of sensors, and number of time-related features.
- For task 3, the only features are the recent counts
- For task 4, we also add time-related features

Therefore, if one wishes to **not** consider time-related features, it is possible to do it by setting `temporal_features` to zero.


In [13]:
# Number of temporal features => set to zero if Task 3 !!!!
temporal_features = 0 # Task 3 
# temporal_features = 2 # Task 4 (day of the week, time of the day)

s = 18
embedding_orders = [1, 2, 3, 4]

# Initializing the states
states = []
# Fixed initial values
E = np.zeros((1,1)) 
mse=np.zeros((1,1))
N=0
for i in range(s) :
    for embed in embedding_orders :
        key = (f"rls_device{i+1}_embed{embed}")

        # number of features
        n = embed + temporal_features
        beta1 = np.zeros(n+1) 
        beta1.shape = (n+1,1)

        v0 = 10 ## initialization covariance
        V1 = np.diag(np.zeros(n+1)+v0) ## initial covariance matrix 

        nu1 = 1.0 # forgetting factor

        D = np.zeros((1, embed))

        values = (beta1, V1, nu1, mse, N, D, E, embed)

        state = (key, values)
        states.append(state)


statesRDD = sc.parallelize(states)
statesRDD.count() # 18x4 = 72

72

In [14]:
def get_day_of_the_week(tgap) :
    # 0 = monday
    # 3 = thursday
    # 6 = sunday
    tgaps_per_week = 96*7
    # Computes the time gap since the beginning of the week
    tgap_mod = tgap % tgaps_per_week

    # Add the original day of the week (6th December 2018 is thursday), acts like an offset
    original_day = 3
    # Get the provisional day of the week, between 1 and 7 but doesn't contain the offset
    day = tgap_mod//96
    # Add the offset
    total = original_day + day 
    # Compute modulo 7 to have the exact day of the week
    final = total%7

    return final 

def get_hour(tgap) :
    tgap_in_day = tgap%96
    # 4 time gaps make 1 hour
    return tgap_in_day//4


In [15]:
def updateFunction(new_values, state): 
    L = len(new_values)
    if L > 0:
        try:
            beta, V, nu, sse, N, DD, E, embed = state
            # print("curr state: ", beta, V, nu, sse, N, DD, E, embed)
            # print("new values: ", new_values[0], "len :", len(new_values[0]))
            
            # Iterate over each record in the incoming batch
            for i in range(len(new_values[0])):
                # Unpack the record
                record = new_values[0][i] # record = (tgap, count)
                y = record[1] 
                timegap = record[0]

                # Extract time-related features
                day = get_day_of_the_week(timegap)
                hour = get_hour(timegap)
                
                x_counts = DD[0,:embed].tolist()
                x_temporal = [day, hour]

                # counts and temporal features
                if temporal_features != 0 :                 # Task 4
                    n = embed + temporal_features           
                    x = np.array(x_counts + x_temporal)     

                else :                                      # Task 3
                    n = embed                               
                    x = np.array(x_counts)                  
                

                # print(f"Current record : {record[0]} / current batch : {x} and measure {y}")
                # Compute RLS state update
                RL = RLSstep(y, x, n, beta, V, nu)
                
                # Update the state values using results from RLSstemp

                beta = RL[0]
                beta.shape = (n+1, 1)
                V = RL[1]
                err=RL[2]
                E = np.append(E, RL[2])
                
                # Update sum of squares - for MSE computation
                sse = sse+pow(err,2.0)

                sse= sse/(N+1.0)
                N += 1

                # Replace history by new one (removing the oldest value)
                DD = np.insert(DD[0,:-1], 0, y).reshape((1, embed))
                

            return (beta, V, nu, sse, N, DD, E, embed)

        except Exception as e:
            print("Erreur dans update with values : ", beta, V, nu, sse, N, DD, E, embed)
            print(f"Error: {e}\nTraceback: {traceback.format_exc()}")

    else:
        return state

In [16]:
[sc,ssc,dstream]=getDStream(spark=spark,batch_interval=10)

dataS = dstream.map(lambda x: np.array(ast.literal_eval(x)))
# dataS.pprint()
# Only keep one column for now
# dataS = dataS.map(lambda x : x[:,:2])
dataS = dataS.filter(lambda x: len(x) > 0)

dataS = dataS.flatMap(lambda x: [(f'rls_device{i}_embed{j}',
                                  np.hstack((x[:, 0].reshape(x.shape[0], 1),
                                             x[:, i].reshape(x.shape[0], 1))))
                                 for i in range(1, x.shape[1])
                                 for j in embedding_orders
                                 ])

updatedS=dataS.updateStateByKey(updateFunction,initialRDD=statesRDD)
# outState = updatedS.pprint()

# printing out updated values of the state
# test = updatedS.pprint()
output = updatedS.map(lambda x : f"""
    Key : {str(x[0])}>
    Embedding order : {str(x[1][7])}
    Iteration : {str(x[1][4])}
    MSE : {str(x[1][3][0][0])}
""").pprint()
output = updatedS.map(lambda x : f"""
    Key : {str(x[0])}
    Embedding order : {str(x[1][7])}
    Iteration : {str(x[1][4])}
    MSE : {str(x[1][3][0][0])}
""").saveAsTextFiles("outputs/part")

In [17]:
ssc.start()

-------------------------------------------
Time: 2023-05-15 18:27:00
-------------------------------------------

    Key : rls_device2_embed2>
    Embedding order : 2
    Iteration : 96
    MSE : 0.0


    Key : rls_device3_embed3>
    Embedding order : 3
    Iteration : 96
    MSE : 0.0


    Key : rls_device4_embed2>
    Embedding order : 2
    Iteration : 96
    MSE : 0.0


    Key : rls_device10_embed1>
    Embedding order : 1
    Iteration : 96
    MSE : 0.0


    Key : rls_device12_embed1>
    Embedding order : 1
    Iteration : 96
    MSE : 0.0


    Key : rls_device14_embed2>
    Embedding order : 2
    Iteration : 96
    MSE : 0.0


    Key : rls_device14_embed3>
    Embedding order : 3
    Iteration : 96
    MSE : 0.0


    Key : rls_device18_embed3>
    Embedding order : 3
    Iteration : 96
    MSE : 0.0


    Key : rls_device2_embed3>
    Embedding order : 3
    Iteration : 96
    MSE : 0.0


    Key : rls_device4_embed3>
    Embedding order : 3
    Iteration : 96
    MS

-------------------------------------------
Time: 2023-05-15 18:28:20
-------------------------------------------

    Key : rls_device2_embed2>
    Embedding order : 2
    Iteration : 864
    MSE : 0.0


    Key : rls_device3_embed3>
    Embedding order : 3
    Iteration : 864
    MSE : 0.0


    Key : rls_device4_embed2>
    Embedding order : 2
    Iteration : 864
    MSE : 0.002306573379905587


    Key : rls_device10_embed1>
    Embedding order : 1
    Iteration : 864
    MSE : 0.0


    Key : rls_device12_embed1>
    Embedding order : 1
    Iteration : 864
    MSE : 0.0


    Key : rls_device14_embed2>
    Embedding order : 2
    Iteration : 864
    MSE : 0.0


    Key : rls_device14_embed3>
    Embedding order : 3
    Iteration : 864
    MSE : 0.0


    Key : rls_device18_embed3>
    Embedding order : 3
    Iteration : 864
    MSE : 0.004267452257428015


    Key : rls_device2_embed3>
    Embedding order : 3
    Iteration : 864
    MSE : 0.0


    Key : rls_device4_embed3>
    Em

In [18]:
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Going from task 3 to task 4
- Set `temporal_features = 2` in the code block that initializes the states
## Going from task 4 to task 3
- Set `temporal_features = 0` in the code block that initializes the states