## Recursive Least Square model with Kafka and Spark streaming 

This notebook provides a Recursive Least Square model on streaming data coming from a Kafka producer. 

This notebook uses 
* the [Python client for the Apache Kafka distributed stream processing system](http://kafka-python.readthedocs.io/en/master/index.html) to receive messages from a Kafka cluster. 
* [Spark streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html) for processing the streaming data


### General import

In [1]:
import time
import re, ast
import numpy as np
import os
import pandas as pd
import math

### Start Spark session


In [2]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 '+\
                                '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 '+\
                                '--conf spark.driver.memory=2g  pyspark-shell'

spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("KafkaReceive") \
    .getOrCreate()

### Connect to Kafka server on topic persistence

In [3]:
#This function creates a connection to a Kafka stream
#You may change the topic, or batch interval
#The function returns the Spark context, Spark streaming context, and DStream object
def getKafkaDStream(spark,topic,batch_interval):

    #Get Spark context
    sc=spark.sparkContext

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    #Create a DStream that represents streaming data from Kafka, for the required topic 
    dstream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {topic: 1})
    
    return [sc,ssc,dstream]


In [4]:
def getSensorsLoc(locations_file):
    """
    returns an array where each element is [sensor, x_coord, y_coord] based on a location file
    """
    sensors_loc = []
    with open(locations_file, "r") as f:
        lines = f.readlines()
        for line in lines:
            info = line.split(" ")
            sensor = int(info[0])
            x = float(info[1])
            y = float(info[2])
            sensors_loc.append([sensor, x, y])
    sensors_loc = np.array(sensors_loc)
    return sensors_loc

def getNClosestNeighbors(sensorId, sensors_loc, n):
    """
    returns a list of n closest neighbors ordered from closest to furthest to the given sensorId
    """

    index_sensor_id = np.where(sensors_loc[:,0] == sensorId)[0][0]
    x_sensor = sensors_loc[index_sensor_id, 1]
    y_sensor = sensors_loc[index_sensor_id, 2]

    neighbors = []
    distances = []
    for i in range(len(sensors_loc)):
        if i!= index_sensor_id:
            id_neighbor = sensors_loc[i,0]
            x_neighbor = sensors_loc[i,1]
            y_neighbor = sensors_loc[i,2]
            x = x_sensor - x_neighbor
            y = y_sensor - y_neighbor
            distance = math.sqrt(math.pow(x,2) + math.pow(y,2))
            neighbors.append(id_neighbor)
            distances.append(distance)
    ar_neighbors = np.array(neighbors)
    ar_distances = np.array(distances)
    inds = ar_distances.argsort()
    sorted_neighbors = ar_neighbors[inds]
    sorted_distances = ar_distances[inds]

    return sorted_neighbors[:n]

### Generate lists of 5 closest neighbors of sensor 1 and of sensor 24

In [5]:
DATA_LOCATION = "../data"
LOC = "mote_locs.txt"
data_loc = "{}/{}".format(DATA_LOCATION, LOC)
sensors_loc = getSensorsLoc(data_loc)
n = 5   #Max number of neighbors considered for all models (to synchronize with sender)
closest_neighbors_sensor_1 = getNClosestNeighbors(1, sensors_loc,n)
closest_neighbors_sensor_24 = getNClosestNeighbors(24, sensors_loc,n)

In [6]:
def convertTimeToSlots(dataframe):
    """
    Add a column 'slot' to the dataframe and divides the relative time in slots
    Also replace the 'seconds' value by the amount of seconds at the center of the slot
    :param dataframe: dataframe on which the operation are performed
    :return: modified dataframe
    """
    interval_slot = 30
    #divide data in slots of 30sec, add each slot value to each entry
    dataframe["slot"] = (dataframe["seconds"]//interval_slot).astype(int)
    #transform the seconds so that for each slot, its corresponding 'seconds' value is at the center of this slot (usefull for plots)
    dataframe["seconds"] = interval_slot*(dataframe["slot"] + dataframe["slot"]+1) / 2
    
    #Take care of the potential multiple value appearing within the same slot -> average them
    dataframe = dataframe.groupby(["slot", "SensorId"]).agg("mean")
    dataframe.reset_index(inplace=True)
    return dataframe

def fillMissingRows(dataframe, day):
    """
    Fill missing rows of the dataframe to ensure that there is a value at each time step (slot)
    so a prediction and a correction can be performed
    :param dataframe: dataframe on which the operation are performed
    :param day: relative day corresponding to the data (0 to 7)
    :return: completed dataframe
    """
    interval_slot = 30
    slots_per_day = 2880
    start_slot = day * slots_per_day
    end_slot = (day+1) * slots_per_day

    sensor_id = dataframe["SensorId"].values[0]
    missing_values = {"slot": [], "seconds": [], "SensorId": []}
    for i in range (start_slot.astype(int), end_slot.astype(int)): #nb of slots for current day
        if i not in dataframe["slot"].values:
            seconds = interval_slot*(i + i+1) / 2
            missing_values["slot"].append(i)
            missing_values["seconds"].append(seconds)
            missing_values["SensorId"].append(sensor_id)
    #Build DataFrame with missing values
    temp_missing = pd.DataFrame(missing_values)
    #Merge the two Dataframe and sort them by values of the 'slot' column
    #At this point, the temperature values are still missing -> NaN
    complete_temp = dataframe.append(temp_missing).sort_values('slot')
    #Replace NaN by values extracted from a linear method based on the neighbors
    complete_temp["Value"] = complete_temp["Value"].interpolate(limit_direction="both")
    return complete_temp


def preprocessDataFrame(dataframe, day):
    """
    Preprocess the different dataframes to add their time slots and their missing values
    :param dataframe: dataframe to be preprocessed 
    :param day: relative day corresponding to the data (0 to 7)
    :return: complete output_df and merged list of complete neighbors df
    """
    slots_per_day = 2880
    dataframe = fillMissingRows(convertTimeToSlots(dataframe), day)
    dataframe.reset_index(level=0, inplace=True)
    dataframe["slot"] = dataframe["slot"]%slots_per_day
    return dataframe



In [7]:
def extractInputsOutputs(temp_df, neighbors_df, day):
    """
    Preprocess the dataframe containing the information of the desired sensor and the dataframe of its neighbors
    :param temp_df: dataframe of desired sensor to be preprocessed 
    :param neighbors_df: dataframe of neighbors to be preprocessed 
    :param day: relative day corresponding to the data (0 to 7)
    """
    output = preprocessDataFrame(temp_df, day)
    sensors_ids = neighbors_df["SensorId"].unique()
    preprocessed_neighbors = []
    for sensor in sensors_ids:
        sub_df = neighbors_df[neighbors_df["SensorId"] == sensor]
        preprocessed_neighbors.append(preprocessDataFrame(sub_df, day-1))
    inputs = pd.concat(preprocessed_neighbors)
    return output, inputs

In [8]:
def applyOneDayRLS(output_df, input_dfs, forgetting_factor, betas, covar_matrix):
    """
    Apply the Recursive Least Square algorithm to predict the temperature of the desired sensor
    :param output_df: dataframe containing information of the desired sensor -> what we want to predict
    :param input_dfs: dataframe containing information of the neighbors -> features to learn
    :param forgetting_factor: forgetting factor
    :param betas: weights
    :param covar_matrix: covariance matrix
    """
    
    slots_per_day = 2880
    predictions = []
    for slot in range (slots_per_day):
        # truth value of current slot - next day
        output = output_df[output_df.slot == slot].Value
        # features for current slot (neighbors values) - current day
        inputs = np.asarray(input_dfs[input_dfs.slot == slot].Value)
        # compute signal
            
        signal = np.dot(inputs.T, betas)
        predictions.append(signal)
            
        # compute error and update weight
        error = output - signal
        delta_weight = covar_matrix.dot(inputs.T) * error.values[0]
        betas = betas + delta_weight
            
        # update covariance matrix
        numerator = np.dot(np.dot(np.dot(covar_matrix, inputs), inputs), covar_matrix)
        denominator = forgetting_factor + np.dot(np.dot(inputs.T, covar_matrix), inputs)
        covar_matrix = (1/forgetting_factor) * (covar_matrix - (numerator/denominator))
    return predictions, betas, covar_matrix

In [9]:
#Update function: new_values are the set of values received during the batch interval, state is the current state
#The function estimates the prediction error on the set of new values, and update the state for the persistence model
def updateFunction(new_values, state): 
    seconds_per_day = 86400
    slots_per_day = 2880

    sensorToPredict = state[0]
    betas = state[1]
    closest_neighbors = state[2]
    forgetting_factor = state[3]
    covar_matrix = state[4]
    input_prev_day = state[5]
    output_current_day = state[6]

    if len(new_values)>0 :

        #Transforms list of entries into an array
        array_values=np.array(new_values)
        day = np.floor(array_values[0][1] / 86400)
        dataframe = pd.DataFrame({"Value":array_values[:,0], "seconds":array_values[:,1],"SensorId":array_values[:,2]})

        if day < 1: #batch is the first day -> bootstrap system
            input_prev_day = dataframe[dataframe["SensorId"].isin(closest_neighbors)]
            
        else:  #Day from 2 to 8 -> make a prediction and adapt algo     
            output_day = dataframe[dataframe["SensorId"] == sensorToPredict]

            output, inputs = extractInputsOutputs(output_day, input_prev_day, day)
            predictions, betas, covar_matrix = applyOneDayRLS(output, inputs, forgetting_factor, betas, covar_matrix)

            truth = output["Value"].values.tolist()
            seconds = output["seconds"].values.tolist()

            output_current_day= [predictions,truth,seconds]
            input_prev_day = dataframe[dataframe["SensorId"].isin(closest_neighbors)]


    state=[sensorToPredict, betas, closest_neighbors, forgetting_factor, covar_matrix, input_prev_day, output_current_day]

    #state is now the last received measurement
    return (state)

In [10]:
#Helper functions

#Print number of partitions and number of records for an RDD
def printInfoRDD(rdd):
    if rdd is not None:
        print("The RDD has "+str(rdd.getNumPartitions())+" partitions")
        print("The RDD has "+str(rdd.count())+" elements")
    else:
        print("No info to provide")
        
#Save state in global Python variable
def saveState(rdd):
    global state_global
    if rdd is not None:
        data=rdd.collect()
        state_global.append(data)


In [11]:
#Initial state
n_features=5 # number of features/neighbors

forgetting_factor = 1.0
input_prev_day = None
output_current_day = None

closest_neighbors_1 = closest_neighbors_sensor_1[:n_features]
closest_neighbors_24 = closest_neighbors_sensor_24[:n_features]
covar_matrix = np.diag(np.zeros(n_features)+1)
betas = np.zeros(n_features)

state_1 = [1, betas, closest_neighbors_1, forgetting_factor, covar_matrix, input_prev_day, output_current_day]

state_24 = [24, betas, closest_neighbors_24, forgetting_factor, covar_matrix, input_prev_day, output_current_day]


#Batch interval (to be synchronized with KafkaSend)
interval=60

#This variable is used to retrieve state data (through saveState function)
state_global=[]

#Create dtsream
[sc,ssc,dstream]=getKafkaDStream(spark=spark,topic='RLS',batch_interval=interval)

#Evaluate string content (a list) and cast as float value

dstream = dstream.map(lambda x: np.array(ast.literal_eval(x[1])))

#Group by sensor id. x[3] is here the sensorToPredict (for example '1'), and x are the sensor measurement, seconds and sensorId, sensorToPredict)
dstream=dstream.flatMap(lambda x: [(x[3],x)])
dstream.foreachRDD(printInfoRDD)

initialStateRDD = sc.parallelize([(1, state_1),(24,state_24)])

print("Number of partitions for StateRDD: "+str(initialStateRDD.getNumPartitions()))

state_stream=dstream.updateStateByKey(updateFunction,initialRDD=initialStateRDD)
      
state_stream.foreachRDD(printInfoRDD)
state_stream.foreachRDD(saveState)


Number of partitions for StateRDD: 3


### Start streaming application

In [12]:
#For synchronization with receiver (for the sake of the simulation), starts at a number of seconds multiple of interval
current_time=time.time()
time_to_wait=interval-current_time%interval
time.sleep(time_to_wait)

ssc.start()

The RDD has 0 partitions
The RDD has 0 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 93 partitions
The RDD has 42773 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 134 partitions
The RDD has 23166 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 150 partitions
The RDD has 21619 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 182 partitions
The RDD has 20677 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 170 partitions
The RDD has 21750 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 166 partitions
The RDD has 21137 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 167 partitions
The RDD has 22543 elements
The RDD has 3 partitions
The RDD has 2 elements
The RDD has 180 partitions
The RDD has 23562 elements
The RDD has 3 partitions
The RDD has 2 elements


### Stop streaming

In [13]:
#Wait to receive all data up to day 8 before stopping
ssc.stop(stopSparkContext=False,stopGraceFully=False)

## Collect results, plot predictions, compute MSE

In [14]:
#For plots
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode()


In [15]:
def cleanStateGlobal(state_global_sensor):
    """
    get rid of the potential first empty batch information that were stored
    """
    i = 0
    #While the first state is not initialised yet (inputs are == None)
    while (state_global_sensor[i][5] is None): #Empty batch -> receiver started before sender to make sure to have first batch completely
        state_global_sensor = state_global_sensor[i+1:]
    return state_global_sensor

state_global_1 = []
state_global_24 = []
for i in range(len(state_global)):
    for j in range(len(state_global[i])):
        if state_global[i][j][0] == 1:
            state_global_1.append(state_global[i][j][1])
        else:
            state_global_24.append(state_global[i][j][1])
            
state_global_1 = cleanStateGlobal(state_global_1)
state_global_24 = cleanStateGlobal(state_global_24)

In [16]:
#Retrieve day 1 to day 8 data from the states
predictions_1, truth_1, seconds_1 = [], [], []
predictions_24, truth_24, seconds_24 = [], [], []

for i in range (1,8):
    output_1 = state_global_1[i][6]
    predictions_1.extend(output_1[0])
    truth_1.extend(output_1[1])
    seconds_1.extend(output_1[2])
    
    output_24 = state_global_24[i][6]
    predictions_24.extend(output_24[0])
    truth_24.extend(output_24[1])
    seconds_24.extend(output_24[2])


In [17]:
def plotRLSModel(sensor_id, truth, predictions, seconds, day):
    if day == "all":
        MSE = np.mean((np.array(truth)-np.array(predictions))**2)
        seconds_plot = seconds
        truth_plot = truth
        predictions_plot = predictions
        days_string = "on the 8 days"
    else:
        sec_per_day = 86400
        begin_day_sec = day*sec_per_day
        end_day_sec = (day+1) * sec_per_day
        sec_array = np.array(seconds)
        index_start = np.where(sec_array >= begin_day_sec)[0][0]
        index_end = np.where(sec_array <= end_day_sec)[0][-1] + 1
        
        MSE = np.mean((np.array(truth[index_start:index_end])-np.array(predictions[index_start:index_end]))**2)
        seconds_plot = seconds[index_start:index_end]
        truth_plot = truth[index_start:index_end]
        predictions_plot = predictions[index_start:index_end]
        days_string = "on day " + str(day+1)
        
    trace_truth = go.Scatter(
        y = truth_plot[10:],
        x = seconds_plot,
        name="Truth"
    )

    trace_predictions = go.Scatter(
        y = predictions_plot[10:],
        x = seconds_plot,
        name="Predictions"
    )

    layout= go.Layout(
        title= 'Truth and predictions for sensor ' + str(sensor_id) + ', '+days_string+'<br>RLS model <br>'+\
                'Mean square error: '+str(MSE),
        xaxis= dict(
            title= 'Time (seconds)',
        ),
        yaxis=dict(
            title= 'Temperature',
        ),
        showlegend= True
    )

    fig= go.Figure(data=[trace_truth,trace_predictions], layout=layout)
    iplot(fig)


In [19]:
plotRLSModel(1,truth_1, predictions_1, seconds_1,7)
plotRLSModel(24, truth_24, predictions_24, seconds_24,7)


In [20]:
SLOTS_PER_DAY = 2880
def computeMSERange(truth, predictions):
    index_start = 0
    all_MSE = []
    for index_end in range(index_start+1, len(truth)):
        # only consider 24h at the time
        if index_end >= SLOTS_PER_DAY - index_start:
            index_start += 1
        intermediate_MSE = np.mean((np.array(truth[index_start:index_end])-np.array(predictions[index_start:index_end]))**2)
        all_MSE.append(intermediate_MSE)
    
    return all_MSE

In [24]:
def plotMSE(sensor_id, seconds, MSE):
        
    trace_error = go.Scatter(
        y = MSE[SLOTS_PER_DAY:],
        x = seconds[SLOTS_PER_DAY:],
        name="Mean Square Error"
    )
        
    layout= go.Layout(
        title= 'Evolution of MSE for sensor ' + str(sensor_id) + ',<br>RLS',
        xaxis= dict(
            title= 'Time (seconds)',
        ),
        yaxis=dict(
            title= 'MSE',
        ),
        showlegend= True
    )
        
    fig= go.Figure(data=[trace_error], layout=layout)
    iplot(fig)

In [25]:
MSE_1 = computeMSERange(truth_1, predictions_1)
MSE_24 = computeMSERange(truth_24, predictions_24)

In [26]:
plotMSE(1, seconds_1, MSE_1)
plotMSE(24, seconds_24, MSE_24)