In [1]:
import sys
import json
import pyspark
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import from_json, col, input_file_name
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


In [2]:
def unmount_from_adl():
  dbutils.fs.unmount("/mnt/root")
def mount_to_adl():
  configs = {"dfs.adls.oauth2.access.token.provider.type": "ClientCredential",
             "dfs.adls.oauth2.client.id": "1001f267-f8ee-400d-a1cb-ab70c0d97884",
             "dfs.adls.oauth2.credential": "7hfZu9Ol/mjwshNcxIrAtrBshSMSWTP6ArJnUchRccw=",
             "dfs.adls.oauth2.refresh.url": "https://login.microsoftonline.com/6ee535f2-3064-4ac9-81d8-4ceb2ff790c6/oauth2/token"}
  dbutils.fs.mount(source = "adl://maintenance40.azuredatalakestore.net/",  mount_point = "/mnt/root",  extra_configs = configs)

In [3]:

def create_paths_rawdata(year,month,day,stations = []):
  path_to_daily_data = "{}/{}/{}".format(year,month,day)
  paths =[]
  for station in stations: 
    path = "mnt/root/rawdata/trackcircuits/{}/bn-maintenance40-eh/trackcircuit{}/*/{}/*/*/*".format(station, station, path_to_daily_data)
    paths.append(path)
  return paths

def read_rawdata(year, month, day, stations = []):
  # Creating paths to read
  paths = create_paths_rawdata(year = year, month = month, day = day, stations = stations)
  # Reading data from datalake
  return spark.read.format("com.databricks.spark.avro").load(paths)
    
def read_unpacked_rawdata(year, month, day, stations = []):
  
  df = read_rawdata(year = year, month = month, day = day, stations = stations)
 
  json_schema = StructType([
    StructField("currentDirectionKind", StringType(), True),
    StructField("eventAt", DoubleType(), True),
    StructField("measurement", LongType(), True),
    StructField("tcid", StringType(), True)])
    
    #Exctracting body from binary
  df = df.withColumn('body', df['body'].cast('string'))
  body = df.select("body")
  data = body.withColumn('body', from_json(col('body'), json_schema)).select('body.*')
    
  # Adding station column (this is done by choosing the fifth element in the filepath) and this approach works for all TC not in "SmallContributors"-paths
  data = data.withColumn("filename", input_file_name())    
  split_col = pyspark.sql.functions.split(data['filename'], '/')
  data = data.withColumn('Station', split_col.getItem(5))
  
  #Fixing the column names (adding columns with the same name as the old data)
  data = data.withColumn("orgTimestamp", data.eventAt)
  data = data.withColumn("Measurement", data.measurement)
  data = data.withColumn("syncTimestamp", (F.round(data.eventAt/250)*250).cast("double"))
  data = data.withColumn("TrackCircuitId", data.tcid)
  data = data.withColumn("Current", data.currentDirectionKind)
  data = data.withColumn("Date", F.lit("{}{}{}".format(year,month,day)))
  
  data = data.select("Date","Station","TrackCircuitId","Measurement","orgTimestamp", "syncTimestamp","Current")
  
  bad_data = data.filter("measurement > 32000") # A value of 32764 implies that the sensor sends out some error code and we are therefore filter these. We will have to look into it at a later period
  good_data = data.filter("measurement < 32000") # A value of 32764 implies that the sensor sends out some error code and we are therefore filter these. We will have to look into it at a later period

  return good_data, bad_data

def read_preprocessed_data(folder, year, month, day, file, stations = []):
  # Reading any of the daily preprocessed files 
  approved_files = ["preprocessed","plateau","passage"]
  if file.lower() in approved_files:  
    path = "/mnt/root/ml/trackcircuits/data/daily/{}/{}/{}/{}/{}/".format(year, month, day, folder, file)
  df = spark.read.format("com.databricks.spark.avro").option("basePath", path).load("{}".format(path))
  if stations:
    df = df.filter(df.Station.isin(stations))
  return df

def read_features_data(folder, year, month, day, stations = []):
  # Reading all daily features files
  for feature in ["feature_measurement", "feature_plateau", "feature_passage"]:
    path = "/mnt/root/ml/trackcircuits/data/daily/{}/{}/{}/{}/{}/".format(year,month,day,folder,feature)
    temp_df = spark.read.format("com.databricks.spark.avro").option("basePath", path).load(path)
    temp_df_pd = temp_df.filter(temp_df.Station.isin(stations)).toPandas()
    
    if feature == 'feature_measurement':
      df = temp_df_pd
    else:
      df = pd.merge(df, temp_df_pd, on =["Station","TrackCircuitId"], how = "left")
  
  return df

def read_threshold_fc_data(folder, filter = []):
  # Reading thresholddata for specific folder, can be used to only read specified trackcircuits or all track circuits if filter is not specified
  path = '/mnt/root/ml/trackcircuits/data/threshold_fc/{}/'.format(folder)
  if not filter: 
    df = spark.read.format("com.databricks.spark.avro").option("basePath", path).load(path)
  else:
    df = spark.read.format("com.databricks.spark.avro").option("basePath", path).load(path)
    df = df.filter(df.TrackCircuitId.isin(filter))
  
  return df

def read_normalization_data(folder):
  path = "/mnt/root/ml/trackcircuits/data/norm_matrix/{}/".format(folder)
  df = spark.read.format("com.databricks.spark.avro").option("basePath", path).load(path)
  
  return df

In [4]:
def perform_deadband(df, key, orderby, value_col):
  windowSpec = \
    Window \
      .partitionBy(key) \
      .orderBy(orderby)
  df = df.withColumn("prevval", F.lag(df[value_col]).over(windowSpec))
  df = df.filter("{} != prevval or prevval is null".format(value_col))
  df = df.drop("prevval")
  return df


def pivot_data(df, group_key, pivot_key, pivot_values, pivot_new_columns, value_col):
  df_pivot = df.groupBy(group_key).pivot(pivot_key, pivot_values).sum(value_col)
  i = 0
  for value in pivot_values:
    df_pivot = df_pivot.withColumnRenamed(value, pivot_new_columns[i])
    i += 1
  return df_pivot

def forward_fill(df, key, orderby, columns_to_ffill):
  # define the window
  windowSpec = \
    Window \
      .partitionBy(key) \
      .orderBy(orderby) \
      .rowsBetween(-sys.maxsize, 0) # sys.maxsize is utilize to start from beginning from each partition

  for column in columns_to_ffill: 
    # define the forward-filled column
    col_ffill = F.last(df[column], ignorenulls=True).over(windowSpec)
    # do the fill 
    df = df.withColumn(column,  col_ffill)
  return df

def get_info_from_next_row(df, key, orderby, value_col, new_column_name):
  windowSpec = \
    Window \
      .partitionBy(key) \
      .orderBy(orderby) \

  new_column = F.lead(df[value_col], 1).over(windowSpec)
  df = df.withColumn(new_column_name, new_column)
  return df

def get_info_from_previous_row(df, key, orderby, value_col, new_column_name):
  windowSpec = \
    Window \
      .partitionBy(key) \
      .orderBy(orderby) \

  new_column = F.lag(df[value_col], 1).over(windowSpec)
  df = df.withColumn(new_column_name, new_column)
  return df

def get_changed_values_only(df, column_one, column_two, new_column_name, generate_sk = 0):
  # This function checks if column_one and column_two have different values and return the value of column_two when that is true, else it returns None
  if generate_sk == 0:
    df = df.withColumn(new_column_name,
                       F.when(F.isnull(df[column_two]), df[column_one])
                       .when(df[column_two]==df[column_one], None)
                       .otherwise(df[column_two]))
  else: 
    df = df.withColumn(new_column_name,
                       F.when(F.isnan(df[column_two]) ,F.monotonically_increasing_id()) # First row gets first ID
                               .when(df[column_two]==df[column_one], None) # No id generated for rows following start of plateau
                               .otherwise(F.monotonically_increasing_id())) # ID generated for first measurement of new plateau
  return df

def normalize_data(df, folder):
  norm_matrix = read_normalization_data(folder)
  df = df.join(norm_matrix, on =["Station","TrackCircuitId","State"], how = "left")
  
  # Calculate normalized RC and FC values as (value - mean) / stddev
  df = df.withColumn("Measurement_FC_norm", (df["Measurement_FC"] - df["wAvgFC"]) / df["wStdFC"])
  df = df.withColumn("Measurement_RC_norm", (df["Measurement_RC"] - df["wAvgRC"]) / df["wStdRC"])

  return df

In [5]:
class TrackCircuitState:
  # Specifying states
  TC_OCCUPIED_STATE = "Occupied"
  TC_FREE_STATE = "Free"
  TC_UNKNOWN_STATE = "Unknown"
  TC_ARRIVING_STATE = 'Arriving'
  TC_DEPARTING_STATE = 'Departing'
  TC_UNCERTAIN_STATE = 'Uncertain'
  # Setting borders for RC
  BORDER_HIGH_RC = 140
  BORDER_LOW_RC = 120
  
  def set_state_rc(self, df):
    df = df.withColumn("State_RC", 
                       F.when(df["Measurement_RC"]< self.BORDER_LOW_RC, self.TC_OCCUPIED_STATE)
                       .when(df["Measurement_RC"] > self.BORDER_HIGH_RC, self.TC_FREE_STATE)
                       .otherwise(self.TC_UNCERTAIN_STATE)
                      )
    return df
    
  def set_state_fc(self, df, folder):
    threshold_fc = read_threshold_fc_data(folder)
    df = df.join(threshold_fc.select("Station","TrackCircuitId", "border_low_FC", "border_high_FC"), ["Station","TrackCircuitId"])
    df = df.withColumn("State_FC",
                       F.when(df["Measurement_FC"] < df["border_low_FC"], self.TC_FREE_STATE)
                       .when(df["Measurement_FC"] > df["border_high_FC"], self.TC_OCCUPIED_STATE)
                       .otherwise(self.TC_UNCERTAIN_STATE))
    return df
  
  def set_state_first(self, df, folder):
    
    df = self.set_state_rc(df)
    df = self.set_state_fc(df, folder)
    
    df = df.withColumn("State1",
                       F.when(df["State_RC"] == df["State_FC"], df["State_RC"])
                       .otherwise(self.TC_UNKNOWN_STATE)
                      )
    return df
  
  def set_state_final(self, df, folder, calculate_first_state = 1):
    
    if calculate_first_state == 1:
      #setting the first state
      df = self.set_state_first(df, folder)
      
    # Checking the line of combination of states to set states Arriving and Departing
    # In order to do that we need to attach information from last state and next state for each unknown state (and an unknown state can span for a (always unknown) number of points)
    # previous rows
    df = get_info_from_previous_row(df, key = ["Station","TrackCircuitId"], orderby = "Timestamp", value_col = "State1", new_column_name = "previousState")
    df = get_changed_values_only(df, column_one = "State1" , column_two = "previousState", new_column_name = 'previousPlateauState')
    df = forward_fill(df, key = ["Station","TrackCircuitId"], orderby = "Timestamp", columns_to_ffill = ["previousPlateauState"])
    # next rows
    df = get_info_from_next_row(df, key = ["Station","TrackCircuitId"], orderby = "Timestamp", value_col = "State1", new_column_name = "nextState")
    df = get_changed_values_only(df, column_one = "State1" , column_two = "nextState", new_column_name = 'nextPlateauState')
    df = forward_fill(df, key = ["Station","TrackCircuitId"], orderby = F.col("Timestamp").desc(), columns_to_ffill = ["nextPlateauState"])
    
    # Setting final state based on the sequence of states. 
    # Final states is one of the following Free, Occupied, Unknown, Arriving, Departing
    df = df.withColumn("State", 
                       # df["State"] kan only be Free, Occupied or Unknown
                       F.when((df["State1"]== self.TC_FREE_STATE) | (df["State1"] == self.TC_OCCUPIED_STATE), df["State1"]) 
                       # When an unknown is preceeded by an Free and succeded by an Occupied it is an arriving state
                       .when((df["previousPlateauState"] == self.TC_FREE_STATE) & (df["nextPlateauState"]== self.TC_OCCUPIED_STATE), self.TC_ARRIVING_STATE) 
                       # When an unknown is preceeded by an Occupied and succeded by an Free it is an Departing state
                       .when((df["previousPlateauState"] == self.TC_OCCUPIED_STATE) & (df["nextPlateauState"]== self.TC_FREE_STATE), self.TC_DEPARTING_STATE)
                       .otherwise(self.TC_UNKNOWN_STATE) 
                              )
    
    # Triming the dataset, removing the first and the last line of each track circuit to avoid null-values due to lead/lag-functions
    df = df.filter("nextState is not null and previousState is not null")

    df = df.select("Date","Station","TrackCircuitId","Timestamp","Measurement_FC","Measurement_RC","End","Deltatime","DeltatimeSeconds","DeltaWeights","State_RC","State_FC","State")
    
    return df
  
  #To be written
  # calculate_fc_thresholds(self, df, folder)
  # calculate_fc_threshold should save the thresholds
  

In [6]:
def wavg (values, weights):
  wavg = F.sum(values*weights)/F.sum(weights)
  return wavg

def wstd (values, wavg, weights): # Here we need to pass wavg as well since it dont work with a call towards function wavg
  wv = F.sum(((values-wavg)**2)*weights) / F.sum(weights)
  wstd_biased = F.sqrt(wv)  
  wstd = F.when(F.sum(weights) == 1, wstd_biased).otherwise(wstd_biased * F.sum(weights) / (F.sum(weights) - 1))
  return wstd

In [7]:
# This snippet is on function that returns a dataframe that describes plateaus. 
# As of now the following data will exist on plateau
# 1. Start time, End time, Lenght and number of observations
# 2. Minium, Maximum and weighted average of RC and FC measurement
# 3. Weighted standard deviation of the RC and FC measuremnet

def calculate_plateau_info(df):
  key = ["Station","TrackCircuitId","State","plateauId"]
  # 1.
  # Creating group and calculating basic information
  tmpGrp = df.groupby(key)
  tmpBasic = tmpGrp.agg(F.min("Timestamp").alias("Start"), # Min timestamp is start of plateau
                        F.max("End").alias("End"), # Max end (timestamp) is end of plateau
                        (F.max("End") - F.min("Timestamp")).alias("Length"), # Difference between start and stop i length of plateau. protip: This should equal sum(Deltatime) F.sum("Deltatime")
                        F.count(F.lit(1)).alias("Count") # Number of observations in the plateau
                       ) 
  # 2.
  # Calculation of min, max and weighted average
  tmpRCFC = tmpGrp.agg(F.min("Measurement_RC_norm").alias("Min_RC"), # Minimum RC value
                       F.max("Measurement_RC_norm").alias("Max_RC"), # Maximum RC value
                       wavg(df_pivot["Measurement_RC_norm"], df_pivot["DeltaWeights"]).alias("Avg_RC"),
                       F.min("Measurement_FC_norm").alias("Min_FC"), # Minimum FC value
                       F.max("Measurement_FC_norm").alias("Max_FC"), # Maximum FC value
                       wavg(df_pivot["Measurement_FC_norm"], df_pivot["DeltaWeights"]).alias("Avg_FC")
                      )

  # 3.
  # Joining average calculation on df in order to use it in calculation for weighted standard deviation
  df = df.join(tmpRCFC.select("Station","TrackCircuitId","State","plateauId","Avg_RC", "Avg_FC"),
               on = (key),
               how = "left")
  
  # Redoing the basis for norm_matrix to include weighted average 
  tmpGrp = df.groupBy(key)

  # Calculation of weighted standard deviation
  tmpStd = tmpGrp.agg(wstd(df["Measurement_RC_norm"], df["Avg_RC"], df["DeltaWeights"]).alias("Std_RC"),
                      wstd(df["Measurement_FC_norm"], df["Avg_FC"], df["DeltaWeights"]).alias("Std_FC")
                     )

  ########################
  # Joining the datasets to form the df_plateau to return
  df_plateau = tmpBasic.join(tmpRCFC, on = (key), how = "left") \
                       .join(tmpStd, on = (key), how = "left") 
                      
  return df_plateau



In [8]:
# Creating some features on daily level based on measurements
# E.g. min, max, std, and so on.

# Creation of daily dataset (one row per TC and day that exist in the data)

# 1. Unknown Ratio (both count and length)
# 2. Free average measurements RC/FC
# 3. Occupied average measurements RC/FC
# 4. Free standard deviation RC/FC
# 5. Occupied standart deviation RC/FC

def calculate_day_meas(df):
  
  TC_OCCUPIED_STATE = TrackCircuitState().TC_OCCUPIED_STATE
  TC_FREE_STATE = TrackCircuitState().TC_FREE_STATE
  TC_UNKNOWN_STATE = TrackCircuitState().TC_UNKNOWN_STATE
  TC_ARRIVING_STATE = TrackCircuitState().TC_ARRIVING_STATE
  TC_DEPARTING_STATE = TrackCircuitState().TC_DEPARTING_STATE
  TC_UNCERTAIN_STATE = TrackCircuitState().TC_UNCERTAIN_STATE

  # Defining a key for use in multiple places in this snippet
  key = ["Date","Station", "TrackCircuitId"]

  # 1.

  df_day_meas = df.groupBy(key).pivot("State",[TC_FREE_STATE, TC_OCCUPIED_STATE,TC_UNKNOWN_STATE,TC_ARRIVING_STATE,TC_DEPARTING_STATE])\
                               .agg(F.count(F.lit(1)).alias("count"),
                                    F.sum("Deltatime").alias("length")
                                   )

  df_day_meas = df_day_meas.withColumn("FREE_COUNT", df_day_meas["Free_count"])
  df_day_meas = df_day_meas.withColumn("OCCUPIED_COUNT", df_day_meas["Occupied_count"])
  df_day_meas = df_day_meas.withColumn("UNKNOWN_COUNT", df_day_meas["Unknown_count"])
  df_day_meas = df_day_meas.withColumn("ARRIVING_COUNT", df_day_meas["Arriving_count"])
  df_day_meas = df_day_meas.withColumn("DEPARTING_COUNT", df_day_meas["Departing_count"])
  df_day_meas = df_day_meas.withColumn("UNKNOWN_LENGTH", df_day_meas["Unknown_length"])
  df_day_meas = df_day_meas.withColumn("ARRIVING_LENGTH", df_day_meas["Arriving_length"])
  df_day_meas = df_day_meas.withColumn("DEPARTING_LENGTH", df_day_meas["Departing_length"])
  df_day_meas = df_day_meas.withColumn("TOTAL_COUNT", (F.coalesce(df_day_meas["FREE_COUNT"],F.lit(0)) +
                                                       F.coalesce(df_day_meas["OCCUPIED_COUNT"],F.lit(0)) + 
                                                       F.coalesce(df_day_meas["UNKNOWN_COUNT"],F.lit(0)) + 
                                                       F.coalesce(df_day_meas["ARRIVING_COUNT"],F.lit(0)) +
                                                       F.coalesce(df_day_meas["DEPARTING_COUNT"],F.lit(0))))

  df_day_meas = df_day_meas.withColumn("UNKNOWN_RATIO", df_day_meas["UNKNOWN_COUNT"]/df_day_meas["TOTAL_COUNT"])
  df_day_meas = df_day_meas.withColumn("ARRIVING_RATIO", (df_day_meas["ARRIVING_COUNT"] / df_day_meas["TOTAL_COUNT"]))
  df_day_meas = df_day_meas.withColumn("DEPARTING_RATIO", (df_day_meas["DEPARTING_COUNT"] / df_day_meas["TOTAL_COUNT"]))

  columns_included = ["Date","Station","TrackCircuitId","FREE_COUNT","OCCUPIED_COUNT","UNKNOWN_COUNT","ARRIVING_COUNT", "DEPARTING_COUNT", "TOTAL_COUNT","UNKNOWN_RATIO","ARRIVING_RATIO","DEPARTING_RATIO","UNKNOWN_LENGTH","ARRIVING_LENGTH","DEPARTING_LENGTH"]

  df_day_meas = df_day_meas.select(columns_included).fillna(0, subset = ["FREE_COUNT", "OCCUPIED_COUNT", "UNKNOWN_COUNT", "ARRIVING_COUNT", 
                                                                         "DEPARTING_COUNT", "TOTAL_COUNT", "UNKNOWN_RATIO","ARRIVING_RATIO","DEPARTING_RATIO",
                                                                         "UNKNOWN_LENGTH","ARRIVING_LENGTH","DEPARTING_LENGTH"])


  # 2.
  tmpGrp = df.filter("state = '{}'".format(TC_FREE_STATE)).groupby(key)
  tmpFree = tmpGrp.agg(wavg(df["Measurement_RC_norm"], df["Deltaweights"]).alias("RC_F_AVG_MEASUREMENT"),
                       wavg(df["Measurement_FC_norm"], df["Deltaweights"]).alias("FC_F_AVG_MEASUREMENT")
                      )

  # 3.
  tmpGrp = df.filter("state = '{}'".format(TC_OCCUPIED_STATE)).groupby(key)
  tmpOccupied = tmpGrp.agg(wavg(df["Measurement_RC_norm"], df["Deltaweights"]).alias("RC_O_AVG_MEASUREMENT"),
                           wavg(df["Measurement_FC_norm"], df["Deltaweights"]).alias("FC_O_AVG_MEASUREMENT")
                          )

  df_day_meas = df_day_meas.join(tmpFree, on = key, how = "left")
  df_day_meas = df_day_meas.join(tmpOccupied, on = key, how = "left")


  # Joining average calculation on df in order to use it in calculation for weighted standard deviation
  df = df.join(tmpFree.select("Date","Station","TrackCircuitId","RC_F_AVG_MEASUREMENT", "FC_F_AVG_MEASUREMENT"),
               on = key,
               how = "left")
  df = df.join(tmpOccupied.select("Date","Station","TrackCircuitId","RC_O_AVG_MEASUREMENT", "FC_O_AVG_MEASUREMENT"),
               on = key,
               how = "left")


  # Calculation of weighted standard deviation
  # 4. 
  tmpGrp = df.filter("state = '{}'".format(TC_FREE_STATE)).groupby(key)
  tmpFree = tmpGrp.agg(wstd(df["Measurement_RC_norm"], df["RC_F_AVG_MEASUREMENT"], df["DeltaWeights"]).alias("RC_F_STD_MEASUREMENT"),
                       wstd(df["Measurement_FC_norm"], df["FC_F_AVG_MEASUREMENT"], df["DeltaWeights"]).alias("FC_F_STD_MEASUREMENT")
                      )
  # 5.
  tmpGrp = df.filter("state = '{}'".format(TC_OCCUPIED_STATE)).groupby(key)
  tmpOccupied = tmpGrp.agg(wstd(df["Measurement_RC_norm"], df["RC_O_AVG_MEASUREMENT"], df["DeltaWeights"]).alias("RC_O_STD_MEASUREMENT"),
                           wstd(df["Measurement_FC_norm"], df["FC_O_AVG_MEASUREMENT"], df["DeltaWeights"]).alias("FC_O_STD_MEASUREMENT")
                          )

  df_day_meas = df_day_meas.join(tmpFree, on = key, how = "left")
  df_day_meas = df_day_meas.join(tmpOccupied, on = key, how = "left")
  
  return df_day_meas

