# Create input data for GraphWaveNet



In [1]:
import datetime
import yaml
import numpy as np
import pandas as pd
import os
import pickle
import json

# We want to import stuff from parent directory
import sys
import os
sys.path.append(os.path.join(sys.path[0], '..'))

from src import db_requests
from src import data_preperation



## Traffic speed input data creation

In [2]:
##### CONFIGURATIONS #####

with open("../db.yaml", 'r') as dbfi:
    db_credentials = yaml.safe_load(dbfi)

## DATA ##
source_city = "wolfsburg"
target_city = "braunschweig"

# Note: we get all data and split later
train_date_begin = "2018-12-01"
train_date_end = "2019-01-31"
test_date_begin = "2018-12-01"
test_date_end = "2019-01-31"
hour_from = "7"
hour_to = "21"

min_measurements = 3000

mapping_save_path = f'../data/mapping_{source_city}_{target_city}.json'

output_dir = f"data/zero_st2/{source_city}_{target_city}"
output_adj_filename = f"data/sensor_graph/adj_mx_{target_city}.pkl"

In [3]:
#### Load target_source_matching ####
# Open Mapping
with open(mapping_save_path) as f:
    id_mapping = json.load(f)
# keys are imported as str. Convert to int
id_mapping = dict([int(k), v] for k, v in id_mapping.items())

FileNotFoundError: [Errno 2] No such file or directory: '../data/Ablation/mapping_notopology_wolfsburg_braunschweig.json'

In [None]:
###### FETCH DATA #####

# load traffic speed data
source_data = db_requests.getTrafficDataMinMeasurements(source_city, min_measurements, train_date_begin, train_date_end, hour_from, hour_to, db_credentials=db_credentials)
target_data = db_requests.getTrafficDataMinMeasurements(target_city, min_measurements, train_date_begin, train_date_end, hour_from, hour_to, db_credentials=db_credentials)

# Get Street Features: id, length, speed_limit as max_speed, type, source, target
streetFeats = db_requests.getStreetGraph(db_credentials)

# Merge with traffic speed data
#source_data = source_data.merge(streetFeats, left_on="id", right_on="id")
#test_data = test_data.merge(streetFeats, left_on="id", right_on="id", how="left")

In [None]:
#a = train_data.values
#np.savez_compressed("data/mobility_data", a = a)

In [None]:
# DCRNN Function needs a format like this:
#     id1  id2  id3
# t1
# t2
# t3
source_mx = pd.pivot_table(source_data, values='speed', index='time', columns=['id'], aggfunc=np.mean)
target_mx = pd.pivot_table(target_data, values='speed', index='time', columns=['id'], aggfunc=np.mean)

In [None]:
source_mx.head()

In [None]:
target_mx.head()

### Mapping 

In [None]:
# iterate through columns
# find matching street in source
# fill column with source street values
# NEW: Aggregate different streets then fill
target_source_mx = pd.DataFrame(index=target_mx.index, columns=target_mx.columns)
for road_id in target_mx.columns:
    matched_source_id = id_mapping[road_id]
    target_source_mx[road_id] = source_mx[matched_source_id].mean(axis=1)

In [None]:
target_source_mx

In [None]:
# Get an Error Value
T = target_mx.values
T_new = target_source_mx.values
# Fill NANs
T[np.isnan(T)] = 0
T_new[np.isnan(T_new)] = 0
diff_mx = target_mx.values - target_source_mx.values
np.set_printoptions(precision=3)
MAE_cols = np.mean(np.abs(diff_mx), axis = 0)
MAE = np.mean(MAE_cols)
MAE

Now this function creates: <br>
X: (N, timesteps, nodes, (speed, timeofday))<br>
Y: (N, timesteps, nodes, (speed, timeofday))
<br>
We want:<br>
X: (N, timesteps, nodes, (daily periodic, weekly periodic))<br>
Y: (N, timesteps=1, nodes, speed)
<br>
So we need to get, instead of the last 12 timesteps: <br>
4 timesteps from the last 3 days at the same time + <br>
4 timesteps from the last 3 weeks at the same time

In [None]:
# Function taken from DCRNN
def generate_train_val_test_periodic(df, test_share = 0.2, train_share = 0.7):
    if df.isnull().values.any():
        df = df.fillna(0)
    print('Warning: NaN Values in Data found. Filled them with 0!')

    # 0 is the latest observed sample.
    x_offsets = np.sort(np.arange(-2, 2, 1))
    # Predict the next one hour
    y_offsets = np.sort(np.arange(0, 1, 1)) # = [0] -> we dont use offsets for y just predict t
    
    # x: (num_samples, input_length, num_nodes, input_dim)
    # y: (num_samples, output_length, num_nodes, output_dim)
    x, y = generate_graph_seq2seq_io_data_periodic(
        df,
        x_offsets=x_offsets,
        y_offsets=y_offsets,
        add_time_in_day=False,
        add_day_in_week=False,
    )

    # Write the data into npz file.
    # num_test = 6831, using the last 6831 examples as testing.
    # for the rest: 7/8 is used for training, and 1/8 is used for validation.
    num_samples = x.shape[0]
    # Schestakov: swaped round() to int() as it was giving an error in line 86 for not being int
    num_test = int(num_samples * test_share)
    num_train = int(num_samples * train_share)
    num_val = num_samples - num_test - num_train

    # train
    x_train, y_train = x[:num_train], y[:num_train]
    # val
    x_val, y_val = (
        x[num_train: num_train + num_val],
        y[num_train: num_train + num_val],
    )
    # test
    x_test, y_test = x[-num_test:], y[-num_test:]
    
    print("x_train shape: ", x_train.shape, ", y_train shape: ", y_train.shape)
    print("x_val shape: ", x_val.shape, ", y_val shape: ", y_val.shape)
    print("x_test shape: ", x_test.shape, ", y_test shape: ", y_test.shape)
    
    return x_train, y_train,  x_val, y_val, x_test, y_test, x_offsets, y_offsets


def generate_graph_seq2seq_io_data_periodic(
        df, x_offsets, y_offsets, add_time_in_day=True, add_day_in_week=False, scaler=None):
    """
    Generate samples from
    :param df:
    :param x_offsets:
    :param y_offsets:
    :param add_time_in_day:
    :param add_day_in_week:
    :param scaler:
    :return:
    # x: (epoch_size, input_length, num_nodes, input_dim)
    # y: (epoch_size, output_length, num_nodes, output_dim)
    """

    num_samples, num_nodes = df.shape
    data = np.expand_dims(df.values, axis=-1)
    data_list = [data]
    df.index = pd.to_datetime(df.index)
    if add_time_in_day:
        time_ind = (df.index.values - df.index.values.astype("datetime64[D]")) / np.timedelta64(1, "D")
        time_in_day = np.tile(time_ind, [1, num_nodes, 1]).transpose((2, 1, 0))
        data_list.append(time_in_day)
    if add_day_in_week:
        day_in_week = np.zeros(shape=(num_samples, num_nodes, 7))
        day_in_week[np.arange(num_samples), :, df.index.dayofweek] = 1
        data_list.append(day_in_week)

    data = np.concatenate(data_list, axis=-1)


    min_t = df.index.get_loc(df.index[0] + datetime.timedelta(days=21, minutes=30))
    max_t = df.shape[0] - 3
    x_weeks, x_days, y = [], [], []
    for t in range(min_t, max_t):
        time = df.index[t] 
        # Previous 3 weeks 
        w1 = df.index.get_loc(time - datetime.timedelta(days=21))
        w2 = df.index.get_loc(time - datetime.timedelta(days=14))
        w3 = df.index.get_loc(time - datetime.timedelta(days=7))
        x_w1 = data[w1+x_offsets]
        x_w2 = data[w2+x_offsets]
        x_w3 = data[w3+x_offsets]

        x_w = np.vstack((x_w1,x_w2,x_w3))

        # Previous 3 days
        d1 = df.index.get_loc(time - datetime.timedelta(days=3))
        d2 = df.index.get_loc(time - datetime.timedelta(days=2))
        d3 = df.index.get_loc(time - datetime.timedelta(days=1))

        x_d1 = data[d1+x_offsets]
        x_d2 = data[d2+x_offsets]
        x_d3 = data[d3+x_offsets]

        x_d = np.vstack((x_d1,x_d2,x_d3))

        # Labels
        y_t = data[t + y_offsets]



        x_weeks.append(x_w)
        x_days.append(x_d)
        y.append(y_t)


    x_weeks = np.stack(x_weeks, axis=0)
    x_days = np.stack(x_days, axis=0)
    x = np.concatenate((x_weeks,x_days),axis=3)
    y = np.stack(y, axis=0)   
    return x,y

def store_data(output_dir, x_train, y_train,  x_val, y_val, x_test, y_test, x_offsets, y_offsets):
    for cat in ["train", "val", "test"]:
        _x, _y = locals()["x_" + cat], locals()["y_" + cat]
        print(cat, "x: ", _x.shape, "y:", _y.shape)
        np.savez_compressed(
            os.path.join(output_dir, "%s.npz" % cat),
            x=_x,
            y=_y,
            x_offsets=x_offsets.reshape(list(x_offsets.shape) + [1]),
            y_offsets=y_offsets.reshape(list(y_offsets.shape) + [1]),
        )

In [None]:
# There are missing timesteps in the data which made the dataprocessing not working.
# We aggregate the missing timesteps.
def aggregate_missing_timesteps(df):
    while(check_if_missing(df)):
        df = aggregate_missing_timesteps_helper(df)
    
    return df
    
    
def aggregate_missing_timesteps_helper(df):
    #### The dataframe has not all timesteps
    #### Add mmissing timesteps and average the missing values
    df2 = df.copy()
    last_timestep = df.index[0] - datetime.timedelta(hours=10, minutes=15)
    for index, row in df.iterrows(): 
        timestep_minus_15 = index - datetime.timedelta(minutes=15)
        # Check first if its outside the range 7:00 - 21:00
        if not datetime.datetime.time(timestep_minus_15) < datetime.time(hour=7):

            # Now check if it is the same as last timestep
            if not timestep_minus_15 == last_timestep:
                # change index of row
                missing_row = row
                missing_row.name = timestep_minus_15
                df2 = df2.append(missing_row)
        else:
            # If its earlier than 7, check if last timestamp of 21:00 is available
            timestep_last_day = index - datetime.timedelta(hours=10, minutes=15)
            if not last_timestep == timestep_last_day:
                missing_row = row
                missing_row.name = timestep_last_day
                df2 = df2.append(missing_row)
                

        last_timestep = index
    df2 = df2.sort_index()
    return df2

def check_if_missing(df):
    missing = False
    last_timestep = df.index[0] - datetime.timedelta(hours=10, minutes=15)
    for index, row in df.iterrows(): 
        timestep_minus_15 = index - datetime.timedelta(minutes=15)

        # Check first if its outside the range 7:00 - 21:00
        if not datetime.datetime.time(timestep_minus_15) < datetime.time(hour=7):

            # Now check if it is the same as last timestep
            if not timestep_minus_15 == last_timestep:
                missing = True
        else:
            # If its earlier than 7, check if last timestamp of 21:00 is available
            timestep_last_day = index - datetime.timedelta(hours=10, minutes=15)
            if not last_timestep == timestep_last_day:
                missing = True
        last_timestep = index
    return missing

In [None]:
target_source_mx = aggregate_missing_timesteps(target_source_mx)
target_mx = aggregate_missing_timesteps(target_mx)

In [None]:
x_train, y_train,  x_val, y_val, x_test, y_test, x_offsets, y_offsets = generate_train_val_test_periodic(target_source_mx,  test_share= 0.243, train_share=0.7)

In [None]:
x_train_rl, y_train_rl,  x_val_rl, y_val_rl, x_test_rl, y_test_rl, x_offsets2, y_offsets2 = generate_train_val_test_periodic(target_mx,  test_share= 0.243, train_share=0.7)

In [None]:
######## Safe all ##########

# For training we use x and y from target_source_mx
# For testing we use x from target_source_mx, but y from target_mx to have the real numbers

store_data(output_dir, x_train, y_train,  x_val, y_val, x_test, y_test_rl, x_offsets, y_offsets)

## Adjacency Matrix - Data creation 

In [None]:
# Now we need to get a Table with Columns: ID | ID | distance
# This is used then to calculate the adj_mx

# First get all used IDs 
id_list = list(target_mx.columns)



In [None]:
##################### DB Functions ########################
import sqlalchemy as sa

def createEngine(db_credentials):
    db_url = "postgresql://"+db_credentials["user"]+":"+db_credentials["pw"]+"@"\
             +db_credentials["host"]+"/mobility-share"

    engine = sa.create_engine(db_url)
    return engine

def mobility_share_query(db_credentials, query):
    db_url = "postgresql://" + db_credentials["user"] + ":" + db_credentials["pw"] + "@" \
                 + db_credentials["host"] + "/mobility-share"
    engine = sa.create_engine(db_url)
    #query = "select id, length, speed_limit as max_speed, type, source, target from streetgraph"
    ret = pd.read_sql(query, engine)
    return ret

def id_list_to_string(id_list):
    ret = "("
    for _id in id_list:
        ret = ret + "\'" + str(_id) + "\', "
    ret = ret[:-2] + ")"
    return ret

In [None]:
# Build the query which returns the Distance Table with selected IDs
query = "SELECT sg1.id, sg2.id, ST_Distance_Sphere(st_line_interpolate_point(sg1.geometry,0.5), st_line_interpolate_point(sg2.geometry,0.5))"\
        " FROM public.streetgraph_hannover sg1, public.streetgraph_hannover sg2"\
        " WHERE sg1.id in " + id_list_to_string(id_list) +\
        " AND sg2.id in " + id_list_to_string(id_list)

In [None]:
distance_df = mobility_share_query(db_credentials, query)

In [None]:
################ FROM DCRNN gen_adj_mx.py ###################

def get_adjacency_matrix(distance_df, sensor_ids, normalized_k=0.1):
    """

    :param distance_df: data frame with three columns: [from, to, distance].
    :param sensor_ids: list of sensor ids.
    :param normalized_k: entries that become lower than normalized_k after normalization are set to zero for sparsity.
    :return:
    """
    num_sensors = len(sensor_ids)
    dist_mx = np.zeros((num_sensors, num_sensors), dtype=np.float32)
    dist_mx[:] = np.inf
    # Builds sensor id to index map.
    #sensor_id_to_ind = {}
    #for i, sensor_id in enumerate(sensor_ids):
    #    sensor_id_to_ind[sensor_id] = i

    sensor_id_to_ind = dict(zip(sensor_ids.iloc[:,0], sensor_ids.index))

    # Fills cells in the matrix with distances.
    for row in distance_df.values:
        if row[0] not in sensor_id_to_ind or row[1] not in sensor_id_to_ind:
            continue
        dist_mx[sensor_id_to_ind[row[0]], sensor_id_to_ind[row[1]]] = row[2]

    # Calculates the standard deviation as theta.
    distances = dist_mx[~np.isinf(dist_mx)].flatten()
    std = distances.std()
    adj_mx = np.exp(-np.square(dist_mx / std))
    # Make the adjacent matrix symmetric by taking the max.
    # adj_mx = np.maximum.reduce([adj_mx, adj_mx.T])

    # Sets entries that lower than a threshold, i.e., k, to zero for sparsity.
    adj_mx[adj_mx < normalized_k] = 0
    return sensor_ids, sensor_id_to_ind, adj_mx


In [None]:
id_df =  pd.DataFrame({'ID':id_list})
sensor_ids, sensor_id_to_ind, adj_mx = get_adjacency_matrix(distance_df,id_df)

In [None]:
adj_mx

In [None]:
# Save to pickle file.
with open(output_adj_filename, 'wb') as f:
    pickle.dump([sensor_ids, sensor_id_to_ind, adj_mx], f, protocol=2)
print("Saved as " + output_adj_filename)

In [None]:
len(id_list)