### Author: Rafael de Oliveira Magalhães

# PEMSd3 Dataset - Data Cleaning

# Download Data

All data can be downloaded on the website https://pems.dot.ca.gov/.

First, it is necessary to create an account to access the data.

After logging in, go to 'Data Clearinghouse'. Then access:

- 'Station 5-Minute' -> 'District 3'. On this page, you will find data captured by sensors from 2001 to the present moment. Each day captured by the sensors is recorded in a single .txt file. To simplify the download, it is recommended to use an extension to download multiple files automatically.
- 'Station Metadata' -> 'District 3'. To download metadata files for the sensors, which are used to generate a map of monitored roadways.

Additionally, a list with a subset of the sensors is in the PEMSd3.csv file

# Imports

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

import tensorflow as tf

import keras
from keras.callbacks import ReduceLROnPlateau
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, SeparableConv2D
from keras.regularizers import l2
from keras.optimizers import SGD, RMSprop
from keras.utils import to_categorical
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import TensorBoard
import tensorflow.keras.backend as K
from keras.metrics import Metric
from keras.utils import plot_model
from keras.layers import Add, Concatenate, Input, GlobalAveragePooling2D, Layer
from keras import models, initializers
from keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error

from spektral.datasets import TUDataset
from spektral.layers import GCNConv, GlobalSumPool, ChebConv
from spektral.data import SingleLoader, BatchLoader
from spektral.data import Graph
from spektral.data import Dataset

import statsmodels.api as sm
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox
from scipy.stats import f_oneway, f, kstest, norm, ks_2samp, kendalltau
from scipy.interpolate import interp2d, RegularGridInterpolator, RectBivariateSpline, griddata

# Helper libraries

from bokeh.io import show
from bokeh.plotting import gmap
from bokeh.models import GMapOptions
from bokeh.io import output_notebook
from bokeh.models import ColumnDataSource, HoverTool
import csv
import pandas as pd
import geopandas as gpd
import osmnx as ox
import networkx as nx
import folium
import pyproj
import math as m
import numpy as np
import random
import scipy as sp
import datetime as dt
import re
import time
import gmaps as gm
from shapely.geometry import Point, LineString
from shapely import wkt
from numba import jit, cuda
from sodapy import Socrata
from datetime import datetime
import matplotlib.pyplot as plt
import warnings as w

# Auxiliary Functions

In [None]:
def generate_dict(array: list) -> dict:
    """
        Generate a dict from a array
    """
    dictionary = {value: index for index, value in enumerate(array)}
    return dictionary

In [None]:
def dict_sort(dictt):
    return dict(sorted(dictt.items(), key=lambda item: item[1]))

In [None]:
def binary_search(element, array: list):
    lo = 0
    hi = len(array) - 1
    while lo <= hi:
        mid = lo + (hi - lo)//2
        temp = array[mid]
        if element > temp:
            lo = mid + 1
        elif element < temp:
            hi = mid - 1
        else:
            return mid
    return None

In [None]:
def create_list_datetime(initial_date: dt.datetime, length: int) -> list:
    """
        Create a list of datetime object that represents a time series

        Parameters:
        - initial_date: Initial date of the list
        - length: Expected length of the generated list
    """
    date_start = initial_date
    array_dates = []
    i = 0
    while i < length:
        array_dates.append(date_start)
        time_change = dt.timedelta(minutes=5)
        date_start += time_change
        i += 1
    return array_dates

In [None]:
def list_datetime_timedelta(initial_date: dt.datetime, final_date: dt.datetime, timedelta: int) -> list:
    """
        Create a list of datetime objects by increasing time by timedelta
    """
    date_start = initial_date
    array_dates = []
    while date_start <= final_date:
        array_dates.append(date_start)
        time_change = dt.timedelta(minutes=timedelta)
        date_start += time_change
    return array_dates

In [None]:
def normalize_matrix(matrix: np.array) -> np.array:
    normalized_arr = (matrix - np.min(matrix)) / (np.max(matrix) - np.min(matrix))
    return normalized_arr

# Data Cleaning

In [None]:
def clean_txt(archive: str, directory_initial: str, directory_destiny: str, sensors, specific_sensor=None) -> None:
    """
        Cleans texts files by deleting unnecessary columns
    """
    new_name = directory_destiny + "/" + archive[:-3] + "csv"
    og_name = directory_initial + "/" + archive
    with open(new_name, 'w+') as new_arc:
        with open(og_name, "r") as arc:
            first_line = 'Timestamp,Station,Flow,Speed\n'
            new_arc.write(first_line)
            for line in arc:
                content = line.split(",")
                try:
                    sensor = int(content[1])
                except:
                    continue
                if specific_sensor is not None:
                    if sensor == specific_sensor:
                        new_line = content[0] + "," + content[1] + "," + content[9] + "," + content[11] + "\n"
                        new_arc.write(new_line)
                else:
                    if sensor in sensors:
                        new_line = content[0] + "," + content[1] + "," + content[9] + "," + content[11] + "\n"
                        new_arc.write(new_line)

In [None]:
def through_directory(directory_initial: str, directory_csv_files: str, sensors, specific_sensor=None) -> None:
    """
        Cleans text files from a directory and creates csv files
    """
    cont = 0
    for name_archive in os.listdir(directory_initial):
        print(cont)
        if name_archive.endswith('.txt'):
            clean_txt(name_archive, directory_initial, directory_csv_files, sensors, specific_sensor)
            cont += 1

In [None]:
dict_sensors = generate_dict_sensors("Adjacency_list.csv")
list_sensors = set(dict_sensors.keys())
directory_initial = "" # fill out
directory_csv_files = "" # fill out
through_directory(directory_initial, directory_csv_files, list_sensors)

# Data Processing

## Methods - Select Sensors

In [None]:
def generate_dict_sensors(archive: str) -> dict:
    """
      Read a csv file that contains the sensors and create a sensors dictionary
    """
    dataset = pd.read_csv(archive).dropna()
    unique_stations = pd.unique(dataset['from'])
    return generate_dict(unique_stations)

In [None]:
def exclude_sensors(dict_sensors: dict, exclude_lines: list) -> dict:
    """
        Exclude invalid sensors from the sensors dictionary
    """
    new_sensors = []
    sensors = list(dict_sensors.keys())
    for i in range(len(sensors)):
        if i not in exclude_lines:
            new_sensors.append(sensors[i])
    return generate_dict(new_sensors)

## Methods - Generate Temporal Series

In [None]:
def insert_flow_matrix(dict_dates: dict, dict_sensors: dict, directory: str) -> np.array:
  """
      Insert flow data in temporal series
  """
  matrix = np.full((len(dict_sensors), len(dict_dates)), np.nan)
  for name_archive in os.listdir(directory):
    if name_archive.endswith('.csv'):
        print(name_archive)
        archive = directory + "/" + name_archive
        dataset = pd.read_csv(archive)
        for i in range(len(dataset)):
          line = dataset.iloc[i]
          time = line['Timestamp']
          sensor = line['Station']
          try:
            ii = dict_sensors[sensor]
          except:
            continue
          format = "%m/%d/%Y %H:%M:%S"
          date = dt.datetime.strptime(time, format)
          j = dict_dates[date]
          matrix[ii][j] = line['Flow']
  return matrix

In [None]:
def insert_flow_list(dict_dates: dict, directory: str) -> np.array:
  """
      Insert flow data in temporal series
  """
  matrix = np.full((len(dict_dates)), np.nan)
  for name_archive in os.listdir(directory):
    if name_archive.endswith('.csv'):
        print(name_archive)
        archive = directory + "/" + name_archive
        dataset = pd.read_csv(archive)
        for i in range(len(dataset)):
          line = dataset.iloc[i]
          time = line['Timestamp']
          format = "%m/%d/%Y %H:%M:%S"
          date = dt.datetime.strptime(time, format)
          j = dict_dates[date]
          matrix[j] = line['Flow']
  return matrix

In [None]:
directory_csv_files = "" # fill out
initial_date = dt.datetime(2023, 1, 1, 0, 0, 0)
final_date = dt.datetime(2023, 8, 31, 23, 55, 0)
list_datetime = list_datetime_timedelta(initial_date, final_date, 5)
dict_dates = generate_dict(list_datetime)
matrix_flow = insert_flow_matrix(dict_dates, dict_sensors, directory_csv_files)

In [None]:
np.save("matrix_pemsd3.npy", matrix_flow) 

### Methods - Interpolation

In [None]:
def avaliate_nan_values(matrix: np.array) -> tuple:
    """
        Evaluate the amount of NaN values, of non NaN values, the percentage of NaN values and the list of NaN lines
    """
    lin, col = matrix.shape
    nan_lines = pd.DataFrame(columns=['NaN Values', 'Percentage'])
    count_nan = 0
    total = lin * col
    for i in range(lin):
        line = matrix[i]
        known_indexes = np.arange(len(line))[~np.isnan(line)]
        if len(known_indexes) == 0:
            nan_lines.loc[i] = [m.inf, m.inf]
            total -= col
            continue
        unknown_indexes = np.arange(len(line))[np.isnan(line)]
        count_nan += len(unknown_indexes)
        nan_lines.loc[i] = [len(unknown_indexes), len(unknown_indexes)/col * 100]
    return (count_nan, total, count_nan/total * 100, nan_lines)

In [None]:
def avaliate_nan_values_list(matrix: np.array) -> tuple:
    """
        Evaluate the amount of NaN values, of non NaN values, the percentage of NaN values and the list of NaN lines
    """
    length = len(matrix)
    nan_lines = pd.DataFrame(columns=['NaN Values', 'Percentage'])
    known_indexes = np.arange(length)[~np.isnan(matrix)]
    if len(known_indexes) == 0:
        nan_lines.loc[0] = [m.inf, m.inf]
    else:
        unknown_indexes = np.arange(length)[np.isnan(matrix)]
        nan_lines.loc[0] = [len(unknown_indexes), len(unknown_indexes)/length * 100]
    return nan_lines

In [None]:
def interpolate_list(matrix: np.array) -> tuple:
    """
        Interpolate a numpy array to fill the NaN values and exclude the NaN lines
    """
    length = len(matrix)
    known_indexes = np.arange(length)[~np.isnan(matrix)] 
    # Find the index of null values (NaN)
    unknown_indexes = np.arange(length)[np.isnan(matrix)]
    # Use the interp function to calculate estimated values for NaN
    estimated_values = np.interp(unknown_indexes, known_indexes, matrix[~np.isnan(matrix)])
    # Replace NaN values with estimated values
    matrix[unknown_indexes] = estimated_values
    return matrix

In [None]:
def interpolate_matrix(matrix: np.array) -> tuple:
    """
        Interpolate a numpy array to fill the NaN values and exclude the NaN lines
    """
    lin, col = matrix.shape
    exclude_lines = []
    for i in range(lin):
        line = matrix[i]
        known_indexes = np.arange(len(line))[~np.isnan(line)]
        if len(known_indexes) == 0:
            exclude_lines.append(i)
            continue
        # Find the index of null values (NaN)
        unknown_indexes = np.arange(len(line))[np.isnan(line)]
        if len(unknown_indexes) == 0:
            continue
        # Use the interp function to calculate estimated values for NaN
        estimated_values = np.interp(unknown_indexes, known_indexes, line[~np.isnan(line)])
        # Replace NaN values with estimated values
        line[unknown_indexes] = estimated_values
        matrix[i] = line
    return (matrix, exclude_lines)

### Methods - Transition Matrix

In [None]:
def generate_transition_matrix(dict_sensors,exclude_lines):
  matrix = np.zeros((len(dict_sensors),len(dict_sensors)))
  graph_csv = pd.read_csv("Adjacency_list.csv")
  graph_csv = graph_csv.dropna()
  for i in range(len(graph_csv)):
    line = graph_csv.iloc[i]
    #if line['from'] in exclude_lines or line['to'] in exclude_lines:
        #continue
    try:
        ii = dict_sensors[int(line['from'])]
        j = dict_sensors[int(line['to'])]
    except:
        continue
    matrix[ii][j] = line['distance']
  return matrix

In [None]:
def max_avg(dict_sensors: dict, directory_initial: str) -> tuple:
  list_max = np.zeros(len(dict_sensors))
  list_count = np.zeros(len(dict_sensors))

  for i in range(len(list_max)):
    list_max[i] = -m.inf

  for name_archive in os.listdir(directory_initial):
    if name_archive.endswith('.csv'):
      print(name_archive)
      archive = directory_initial + "/" + name_archive
      dataset = pd.read_csv(archive)
      dataset = dataset.dropna(subset=['Speed'])
      for i in range(len(dataset)):
        line = dataset.iloc[i]
        sensor = line['Station']
        speed = line['Speed']
        try:
            index = dict_sensors[sensor]
        except:
            continue
        list_count[index] += 1
        if speed > list_max[index]:
          list_max[index] = speed
  return (list_max, list_count)

In [None]:
def avg_speed(dict_sensors: dict, list_count: list, directory_initial: str) -> np.array:
  """
      Generate a numpy array of average speed for each node
  """
  list_avg = np.zeros(len(dict_sensors), dtype=float)
  for name_archive in os.listdir(directory_initial):
    if name_archive.endswith('.csv'):
      print(name_archive)
      archive = directory_initial + "/" + name_archive
      dataset = pd.read_csv(archive)
      dataset = dataset.dropna(subset=['Speed'])
      for i in range(len(dataset)):
        line = dataset.iloc[i]
        sensor = line['Station']
        speed = line['Speed']
        try:
            index = dict_sensors[sensor]
        except:
            continue
        list_avg[index] += speed / list_count[index]
  return list_avg

In [None]:
def interpolate_values(listt: np.array, value) -> np.array:
    """
      Replace values equals 'value' by the mean of the other values
    """
    sum_values = 0.0
    count = 0
    list_index = []
    for i in range(len(listt)):
        elem = listt[i]
        if elem != value:
            sum_values += elem
            count += 1
        else:
            list_index.append(i)
    for elem in list_index:
        listt[elem] = sum_values/count
    return listt

In [None]:
def transition_matrix_definitive(matrix_og: np.array, exclude_lines: list) -> np.array:
    """
        Exclude sensors (lines and columns) from the transition/adjacency matrix
    """
    lin, col = matrix_og.shape
    for i in range(lin):
        for j in range(len(exclude_lines)):
            jj = exclude_lines[j]
            elem = matrix_og[i][jj]
            if elem != 0.0:
                for k in range(col):
                    val = matrix_og[jj][k]
                    if val != 0.0 and matrix_og[i][k] == 0.0:
                        matrix_og[i][k] = val
    matrix_og = np.delete(matrix_og, exclude_lines, axis=0)
    matrix_og = np.delete(matrix_og, exclude_lines, axis=1)
    return matrix_og

In [None]:
def definitive_transition_matrix(matrix: np.array, list_max: np.array, list_avg: np.array) -> np.array:
    """
        Fill the transition matrix

        Args:
        - matrix: The transition matrix
        - list_max: The list of max speed for each node
        - list_avg: The list of average speed for each node
    """
    lin, col = matrix.shape
    print(lin, col)
    print(len(list_max), len(list_avg))
    for i in range(lin):
        count = 0
        for j in range(col):
            if matrix[i][j] != 0.0 and i != j:
                count += 1

        if count == 0:
            matrix[i][i] = 1
            print(i)
            continue

        matrix[i][i] = (list_max[i] - list_avg[i])/list_avg[i]

        for j in range(col):
            if matrix[i][j] != 0.0 and i != j:
                matrix[i][j] = (1 - matrix[i][i])/count
    return matrix

### Generate Transition Matrix

**Load Transition Matrix**

In [None]:
# Sensors without data
exclude_lines = [4, 78, 85, 198, 260, 316, 330, 331, 332, 333, 334, 338, 339]

In [None]:
dict_sensors = generate_dict_sensors("Adjacency_list.csv")

In [None]:
transition_matrix = generate_transition_matrix(dict_sensors, [])

In [None]:
transition_matrix = transition_matrix_definitive(transition_matrix, exclude_lines)

**Load Speed Data**

In [None]:
dataf = pd.read_csv('max_speed.csv')

In [None]:
df_avg = pd.read_csv('avg_speed.csv')
list_avg = df_avg['Avg Speed'].values

In [None]:
list_max = dataf['Max Speed'].values
list_count = dataf['Count'].values

In [None]:
list_max = interpolate_values(list_max, -m.inf)
list_avg = interpolate_values(list_avg, 0)

**Definitive Transition Matrix**

In [None]:
matrix = definitive_transition_matrix(transition_matrix, list_max, list_avg)

In [None]:
matrix_sparse = sp.sparse.csr_matrix(matrix)

# Methods - Temporal Sparsity

In [None]:
def remove_data(matrix: np.array, interval: int) -> np.array:
    """
        Removes columns from a temporal series
    """
    lin, col = matrix.shape
    list_index = np.zeros(col) == 1
    i = 0
    while i < col:
        list_index[i] = True
        i += interval
    return matrix[:,list_index]

# Methods - Remove Data for Interpolation

In [None]:
def create_nan_columns(matrix: np.array, interval: int) -> np.array:
    """
        Creates NaN columns into a temporal series
    """
    lin, col = matrix.shape
    nan_column = np.full((lin), np.nan)
    i = 0
    while i < col:
        if (i % (interval + 1) != 0):
            matrix[:, i] = nan_column
        i += 1
    return matrix

In [None]:
def remove_random_data(matrix: np.array, probability: float, length) -> np.array:
    """
        Replaces values to NaN
    """
    lin, col = matrix.shape
    for i in range(lin):
        for j in range(length):
            prob = random.random()
            if prob < probability:
                matrix[i][j] = np.nan
    return matrix

# Methods - Sensor Sparsity

In [None]:
def random_index(length: int, probability: float) -> np.array:
    """
        Creates a boolean array of sensors that will be excluded
    """
    array = np.ones((length), dtype=int)
    for i in range(length):
        prob = random.random()
        if prob < probability:
            array[i] = 0
    return array

In [None]:
def remove_sensors(matrix: np.array, list_index: np.array) -> np.array:
    """
        Remove sensors from a adjacency matrix
    """
    lin, col = matrix.shape

    for i in range(len(list_index)):
        if list_index[i] == 0:
            adj_i = []
            for j in range(col):
                if matrix[i][j] != 0.0:
                    adj_i.append(j)

            for j in range(lin):
                if matrix[j][i] != 0.0:
                    for k in adj_i:
                        matrix[j][k] = 1

    boolean_array = list_index == 1
    matrix = matrix[:,boolean_array]
    return matrix[boolean_array,:]

In [None]:
def update_data_matrix(matrix: np.array, list_index: np.array) -> np.array:
    """
        Remove sensors from the temporal series
    """
    list_index2 = []
    for i in range(len(list_index)):
        if list_index[i] == 0:
            list_index2.append(i)
    return np.delete(matrix, list_index2, axis=0)

In [None]:
def remove_sensors_list(listt: np.array, list_index: np.array) -> np.array:
    """
        Remove sensors from a list
    """
    boolean_array = list_index == 1
    return listt[boolean_array]