# Partitioned sensors model
This notebook trains one multi-sensor model per traffic-network partition. It writes to HDFS the predictions and the corresponding real sensor values, the training and prediction times, and the per-epoch training and validation losses.

In [1]:
import time
import math

from pyspark.sql.types import *
from pyspark.sql import Row, DataFrame
from pyspark.sql.functions import *

import tensorflow as tf
import numpy as np

from hops import hdfs
from tempfile import TemporaryFile

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
629,application_1533548649762_0075,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# TensorFlow version this notebook was run with (tf.keras APIs below depend on it)
tf.__version__

'1.8.0'

## Define Parameters

In [3]:
# ---- Dataset parameters ----
year = 2016
month = 11
partition_min = 3        # appears in the partition file / export directory names
direction = "backward"

# HDFS locations: every input and output lives under one project directory
root_path = "hdfs:///Projects/traffic_reginbald/processed_traffic_data/"
period_prefix = root_path + str(year) + "-" + str(month)
partition_file_path = root_path + "partitions/" + direction + "_partitions-" + str(partition_min) + "min.csv"
sensor_file_path = period_prefix + "_all-sensors-timeseries-parquet/*.parquet"
folder_path = period_prefix + "_single-sensor-30-min-supervised-parquet/"
export_path = period_prefix + "_" + direction + "_partitions-" + str(partition_min) + "-min-output/"

batch_size = 100
num_epochs = 100
dataset_split = 0.70     # fraction of samples used for training; the remainder is the test set
max_density = 200        # upper bound used when scaling densities into [0, 1]

# ---- Network parameters ----
past_steps = 10          # length of the input window
future_steps = 30        # number of predicted steps
lstm_units = 200

# ---- Training parameters ----
learning_rate = 0.001
display_step = 200       # not referenced elsewhere in this notebook

## Import Data

In [4]:
# Sensor identifiers are the column names of the all-sensors time series.
# The first column is skipped (presumably the timestamp column — TODO confirm).
sensors = spark.read.parquet(sensor_file_path).columns[1:]

In [5]:
print("Number of sensors: " + str(len(sensors)))
print("First sensor: " + sensors[0])

Number of sensors: 1941
First sensor: E182N-0005-1

In [6]:
# Partition CSV layout: non-nullable node name and integer partition id
partition_schema = StructType([
    StructField('node', StringType(), False),
    StructField('partition', IntegerType(), False),
])

In [7]:
# Node -> partition assignment CSV: ';'-separated, first line is a header,
# surrounding whitespace in fields is trimmed.
partitions_raw_df = spark.read.csv(
    partition_file_path, 
    sep=';', 
    schema=partition_schema,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    header=True,
    timestampFormat='yyyy/MM/dd HH:mm:ss.SSS'  # no effect: partition_schema has no timestamp columns
)

In [8]:
@udf(StringType())
def shot_identifier(identifier):
    """Drop the last two characters of a sensor id, e.g. "E18W-37625-1" -> "E18W-37625"."""
    return identifier[:-2]

# One row per sensor, plus the shortened id used to match partition nodes
sensor_rows = [Row(identifier=name) for name in sensors]
sensors_df = (
    spark.createDataFrame(sc.parallelize(sensor_rows), ["identifier"])
    .withColumn("identifier_alt", shot_identifier("identifier"))
)
sensors_df.count()

1941

In [9]:
partitions_df = partitions_raw_df.alias("p").join(
    sensors_df.alias("s"),
    col("s.identifier_alt") == col("p.node"),
    "rightouter"
)
partitions_df.count()

1941

In [10]:
# These sensors are not connected to the rest of the graph and should be removed:
# they are the rows the outer join could not match to any partition node.
unmatched_sensors = partitions_df.filter(col('p.node').isNull())
unmatched_sensors.show()

+----+---------+------------+--------------+
|node|partition|  identifier|identifier_alt|
+----+---------+------------+--------------+
|null|     null|E18W-37625-1|    E18W-37625|
|null|     null|E18W-37625-2|    E18W-37625|
|null|     null|E4_A-31975-1|    E4_A-31975|
+----+---------+------------+--------------+

In [11]:
# Keep only sensors with a partition and flatten to (identifier, partition).
# NOTE: this overwrites partitions_df, so re-running the cell on its own fails
# (the 'p' alias no longer exists); re-run from the join cell instead.
partitions_df = (
    partitions_df
    .where(col('p.node').isNotNull())
    .select(
        col("s.identifier").alias("identifier"),
        col("p.partition").alias("partition"),
    )
)
partitions_df.count()

1938

In [12]:
# Highest partition id. `max` here is pyspark.sql.functions.max (the star import
# from pyspark.sql.functions shadows the Python builtin), i.e. an aggregate column.
max_partition_id = partitions_df.agg(max('partition')).collect()[0][0]

In [13]:
def load_data(sensors):
    """Load the supervised windows of several sensors into one 3-D array.

    For each sensor, reads the parquet data at ``folder_path + <sensor>``, orders the
    rows by ``Timestamp`` and keeps the 40 window columns ``t-9 .. t+30``.

    Args:
        sensors: non-empty sequence of sensor identifiers (sub-paths under folder_path).

    Returns:
        numpy array of shape ``(n_rows, 40, len(sensors))``; the last axis follows the
        order of ``sensors``.

    Raises:
        ValueError: if ``sensors`` is empty, or if the sensors do not all have the
            same number of rows.
    """
    columns = ["t-9", "t-8", "t-7", "t-6", "t-5", "t-4", "t-3", "t-2", "t-1", "t", 
               "t+1", "t+2", "t+3", "t+4", "t+5", "t+6", "t+7", "t+8", "t+9", "t+10", 
               "t+11", "t+12", "t+13", "t+14", "t+15", "t+16", "t+17", "t+18", "t+19", "t+20", 
               "t+21", "t+22", "t+23", "t+24", "t+25", "t+26", "t+27", "t+28", "t+29", "t+30"]

    if len(sensors) == 0:
        raise ValueError("load_data needs at least one sensor")

    def _load_sensor(sensor):
        # One (n_rows, 40) matrix per sensor, rows in timestamp order
        rows = spark.read.parquet(folder_path + sensor).orderBy('Timestamp').select(columns).collect()
        return np.array(rows)

    # Gather all per-sensor matrices and stack once along a new last axis;
    # appending inside the loop re-copied the growing array for every sensor.
    return np.stack([_load_sensor(sensor) for sensor in sensors], axis=2)

## Prepare Data For Supervised Learning

In [14]:
def normalize(data, scale_min=0, scale_max=1):
    """Min-max scale densities from ``[0, max_density]`` onto ``[scale_min, scale_max]``.

    Args:
        data: scalar or numpy array of densities.
        scale_min: lower bound of the target range (default 0).
        scale_max: upper bound of the target range (default 1).

    Returns:
        Scaled value(s), same shape as ``data``.
    """
    std = (data - 0) / (max_density - 0)
    return std * (scale_max - scale_min) + scale_min

In [15]:
def prepare_dataset(data, n_sensors):
    """Normalize windows and split them into inputs/targets and train/test sets.

    Args:
        data: array of shape ``(n_samples, past_steps + future_steps, n_sensors)``,
            as returned by ``load_data``.
        n_sensors: number of sensors (size of the last axis of ``data``).

    Returns:
        ``(x_train, y_train, x_test, y_test)``. x arrays have shape
        ``(n, past_steps, n_sensors)``; y arrays are flattened to
        ``(n, future_steps * n_sensors)``. The first ``dataset_split`` fraction of
        samples (no shuffling) forms the training set.

    Raises:
        ValueError: if the window width of ``data`` is not past_steps + future_steps.
    """
    if data.shape[1] != past_steps + future_steps:
        raise ValueError("expected windows of %d steps, got %d" % (past_steps + future_steps, data.shape[1]))

    data_normalized = normalize(data)
    # Split on past_steps (not a literal) so inputs/targets follow the configuration
    x_dataset = data_normalized[:, :past_steps, :]
    y_dataset = data_normalized[:, past_steps:, :]

    x_dataset = np.reshape(x_dataset, (-1, past_steps, n_sensors))
    # Flatten (future_steps, n_sensors) row-major: one target vector per sample
    y_dataset = np.reshape(y_dataset, (-1, future_steps * n_sensors))

    # Contiguous split; with rows in timestamp order this is a chronological split
    train_size = int(len(x_dataset) * dataset_split)

    x_train = x_dataset[:train_size]
    x_test = x_dataset[train_size:]
    y_train = y_dataset[:train_size]
    y_test = y_dataset[train_size:]

    return x_train, y_train, x_test, y_test

## Define Neural Network

In [16]:
def define_model(n_sensors):
    """Build and compile the multi-sensor LSTM forecaster for one partition.

    Architecture: input windows of shape ``(past_steps, n_sensors)`` -> two stacked
    LSTMs with ``lstm_units`` units -> 500-unit linear Dense layer with a
    non-negative kernel constraint -> Dense output layer with no activation and
    ``future_steps * n_sensors`` units (one value per future step and sensor).

    Args:
        n_sensors: number of sensors in the partition.

    Returns:
        A ``tf.keras.Model`` compiled with Adam (using the configured
        ``learning_rate``) and mean squared error loss.
    """
    inputs = tf.keras.Input(shape=(past_steps, n_sensors))

    # The first LSTM returns the whole sequence so the second LSTM can consume it
    lstm_1 = tf.keras.layers.LSTM(units=lstm_units, return_sequences=True)(inputs)
    lstm_2 = tf.keras.layers.LSTM(units=lstm_units)(lstm_1)

    dense = tf.keras.layers.Dense(
        units=500,
        activation='linear',
        kernel_constraint=tf.keras.constraints.NonNeg()
    )(lstm_2)

    outputs = tf.keras.layers.Dense(
        units=future_steps * n_sensors
    )(dense)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    model.compile(
        # Honour the configured learning rate ('adam' as a string ignored it).
        # Passed positionally: it is Adam's first argument both in TF 1.x (`lr`)
        # and in later releases (`learning_rate`).
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        loss='mean_squared_error'
    )
    return model

## Define Model Training

In [17]:
def train_model(model, x_train, y_train):
    """Fit ``model`` with early stopping on the validation loss.

    20% of the training data is held out by Keras (``validation_split=0.2``).
    Training stops once ``val_loss`` has not improved by at least 0.0001 for
    10 consecutive epochs, or after ``num_epochs`` epochs.

    Returns:
        ``(loss, val_loss, training_time)``: per-epoch training losses and
        validation losses as Python floats, and the wall-clock fit time in seconds.
    """
    stop_early = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        min_delta=0.0001,
        patience=10,
        verbose=0,
        mode='auto',
    )

    fit_started = time.time()
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        verbose=0,
        callbacks=[stop_early],
        validation_split=0.2,
    )
    fit_finished = time.time()

    train_losses = list(map(float, history.history['loss']))
    val_losses = list(map(float, history.history['val_loss']))
    return train_losses, val_losses, fit_finished - fit_started

## Define Evaluation of Model

In [18]:
def denormalize(data_scaled, scale_min=0, scale_max=1):
    """Invert ``normalize``: map values from ``[scale_min, scale_max]`` back to ``[0, max_density]``.

    Args:
        data_scaled: scalar or numpy array of scaled values.
        scale_min: lower bound of the scaled range (default 0).
        scale_max: upper bound of the scaled range (default 1).

    Returns:
        Densities, same shape as ``data_scaled``.
    """
    # Undo `std * (scale_max - scale_min) + scale_min`: subtract the offset first,
    # then divide by the range. Dividing by (range + scale_min) is only correct
    # when scale_min == 0.
    std = (data_scaled - scale_min) / (scale_max - scale_min)
    return (std * (max_density - 0)) + 0

In [19]:
def evaluate_model(model, x_test):
    """Run ``model.predict`` on ``x_test``.

    Returns:
        ``(predictions, prediction_time)`` where prediction_time is the
        wall-clock duration of the predict call in seconds.
    """
    started = time.time()
    predictions = model.predict(x_test)
    finished = time.time()
    return predictions, finished - started

## Export Data

In [20]:
def read_hdfs_file(file_path):
    """Load a NumPy array that was saved with ``np.save`` to an HDFS file.

    Args:
        file_path: HDFS path of the ``.npy`` file.

    Returns:
        The stored numpy array.
    """
    import io

    fs_handle = hdfs.get_fs()
    fd = fs_handle.open_file(file_path, mode='r')
    try:
        payload = fd.read()
    finally:
        # Always release the HDFS handle, even if the read fails
        fd.close()
    # Arrays are written with allow_pickle=False, so never unpickle on the way back in
    return np.load(io.BytesIO(payload), allow_pickle=False)

In [21]:
def write_hdfs_file(file_path, data):
    """Save ``data`` in NumPy ``.npy`` format to an HDFS file (overwriting it).

    Args:
        file_path: destination HDFS path.
        data: array-like or scalar; pickled objects are rejected (allow_pickle=False).
    """
    import io

    # Serialize in memory first, so a serialization error never leaves a
    # truncated file behind on HDFS.
    buffer = io.BytesIO()
    np.save(buffer, data, allow_pickle=False)

    fs_handle = hdfs.get_fs()
    fd = fs_handle.open_file(file_path, mode='w')
    try:
        fd.write(buffer.getvalue())
    finally:
        # Always release the HDFS handle, even if the write fails
        fd.close()

In [22]:
def export_data(partition_id, sensors, true_values, pred_values, loss, val_loss, training_time, prediction_time):
    """Write one partition's metrics and each sensor's true/predicted series to HDFS.

    Partition-level files go to ``<export_path>partition_<id>/``. Per-sensor files go
    to ``<export_path><sensor>/``, using slice ``[:, :, i]`` of ``true_values`` and
    ``pred_values`` for the i-th sensor in ``sensors``.
    """
    partition_dir = export_path + "partition_" + str(partition_id)
    partition_files = [
        ("/prediction_time.npy", prediction_time),
        ("/training_time.npy", training_time),
        ("/val_loss.npy", val_loss),
        ("/loss.npy", loss),
    ]
    for suffix, payload in partition_files:
        write_hdfs_file(partition_dir + suffix, payload)

    for sensor_idx, sensor in enumerate(sensors):
        sensor_dir = export_path + sensor
        write_hdfs_file(sensor_dir + "/true_values.npy", true_values[:, :, sensor_idx])
        write_hdfs_file(sensor_dir + "/pred_values.npy", pred_values[:, :, sensor_idx])

## Run Training and Evaluation on all partitions

In [23]:
for partition_id in range(max_partition_id + 1):
    partition_sensors = partitions_df.where(col("partition") == partition_id).rdd.map(lambda row: row["identifier"]).collect()
    if not partition_sensors:
        # Partition ids may have gaps; load_data cannot build a dataset from zero sensors
        print("Skipping partition " + str(partition_id) + ": no sensors assigned")
        continue
    n_sensors = len(partition_sensors)

    # In TF 1.x graph mode every define_model call adds new layers to the default
    # graph; clearing the Keras session keeps memory and build time from growing
    # with each partition.
    tf.keras.backend.clear_session()

    data = load_data(partition_sensors)
    x_train, y_train, x_test, y_test = prepare_dataset(data, n_sensors)
    model = define_model(n_sensors)
    loss, val_loss, training_time = train_model(model, x_train, y_train)
    predictions, prediction_time = evaluate_model(model, x_test)

    # denormalize is element-wise, so apply it to the whole
    # (samples, future_steps * n_sensors) arrays at once
    y_values = np.reshape(denormalize(y_test), (-1, future_steps, n_sensors))
    pred_values = np.reshape(denormalize(predictions), (-1, future_steps, n_sensors))
    export_data(partition_id, partition_sensors, y_values, pred_values, loss, val_loss, training_time, prediction_time)

## Verify Export

In [24]:
# Spot-check the exported prediction time (seconds) of the last partition
read_hdfs_file(export_path +"partition_"+ str(max_partition_id) + "/prediction_time.npy")

array(9.78538299)

In [25]:
# Spot-check the exported training time (seconds) of the last partition
read_hdfs_file(export_path +"partition_"+ str(max_partition_id) + "/training_time.npy")

array(429.19977212)

In [26]:
# Per-epoch validation loss of the last partition's model
read_hdfs_file(export_path +"partition_"+ str(max_partition_id) + "/val_loss.npy")

array([0.00071579, 0.00066156, 0.00070745, 0.00064122, 0.00063627,
       0.0006209 , 0.00064779, 0.00064366, 0.00065276, 0.00063779,
       0.00064346])

In [27]:
# Per-epoch training loss of the last partition's model
read_hdfs_file(export_path +"partition_"+ str(max_partition_id) + "/loss.npy")

array([0.00103688, 0.00070439, 0.00065093, 0.0006119 , 0.00058769,
       0.00056787, 0.00055059, 0.00053865, 0.00052679, 0.00051777,
       0.00050831])

In [28]:
# True values exported by the partitioned run for one sample sensor
partition_true = read_hdfs_file(export_path + "E4N-47465-1/true_values.npy")

In [29]:
# True values for the same sensor from the single-sensor run of the same year/month
# (built from the config instead of a hard-coded "2016-11")
single_true = read_hdfs_file(root_path + str(year) + "-" + str(month) + "_single-sensor-30-min-output/E4N-47465-1/true_values.npy")

In [30]:
# Align shapes for an element-wise comparison; np.reshape raises if the element counts differ
single_true = np.reshape(single_true, partition_true.shape)

In [31]:
# Both runs should export the same true values. Compare against the data itself
# rather than a hard-coded element count (352628 is not the array size), and
# treat NaNs in matching positions as equal, since NaN != NaN under ==.
np.allclose(single_true, partition_true, equal_nan=True)

False

In [32]:
# (test samples, future_steps) for this sensor
partition_true.shape

(12085, 30)