# <center>Compute Telemetry Facts</center>

In this notebook we will see how to compute the trip facts from telemetry points stored in SESA's bucket for raw telemetry. We will load telemetry from January 2019 to September 2019.

First, let's load the needed libraries.

In [1]:
# Spark required imports
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import Window
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import when

import pandas as pd

# Access to IBM Cloud Object Storage
import ibmos2spark

# Python Utilities
from datetime import datetime

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20191126200302-0000
KERNEL_ID = 5a9ddd59-3021-41c9-8379-b8086764820d


## Service Credentials

In [2]:
# The code was removed by Watson Studio for sharing.

In [3]:
# The code was removed by Watson Studio for sharing.

## Load Auxiliary datasets

We will use the following data sets to determinate the day type and the segment road type.

In [4]:
# Segment types with speed limits
segment_type = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(local_cos.url('SegmentType.csv', local_bucket))
#segment_type.show(5)

feriados = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(local_cos.url('Feriados.csv', local_bucket))
feriados = feriados.withColumnRenamed("Año","Year")
feriados = feriados.withColumnRenamed("Fecha Real","Fecha_Feriado")
feriados = feriados.select('Year','Feriado','Fecha',F.unix_timestamp("Fecha_Feriado","dd/MM/yyyy").cast(TimestampType()).alias("Fecha_Feriado"))
feriados = feriados.withColumn("Fecha_Feriado", feriados.Fecha_Feriado.cast(DateType()))
#feriados.show(5)

## Load telemetry by month

In [4]:
period_filter = "2019-10"  # Extraer telemetría de octubre

### Telemetry Schema

In [6]:
telemetry_raw_schema = StructType([
    StructField("DeviceId", LongType(), False),
    StructField("Imei", StringType(), False),
    StructField("Latitude", DoubleType(), False),
    StructField("Longitude", DoubleType(), False),
    StructField("Altitude", DoubleType(), False),
    StructField("Heading", DoubleType(), False),
    StructField("QPoint", StringType(), False),
    StructField("Speed", DoubleType(), False),
    StructField("TripID", LongType(), False),
    StructField("MessageTime", TimestampType(), False),
    StructField("GeneratedMessageTime", TimestampType(), False),
    StructField("SegmentType", ShortType(), True),
    StructField("Address", StringType(), False),
    StructField("Mileage", DoubleType(), False),
    StructField("AlertId", ShortType(), False),
    StructField("AlertMessage", StringType(), False),
    StructField("GpsStatus", StringType(), False),
    StructField("CountNumber", StringType(), False),
    StructField("CrashCounter", StringType(), False)
])

In [None]:
cos_bucket_name = "bkt-prod-sesa-telemetry-raw-data-by-date"
cos_file_prefix = "in.lw.telemetry/dt="

rawdf = spark.read.json(cos.url(cos_file_prefix+period_filter+"*", cos_bucket_name), schema=telemetry_raw_schema)

#"{:,}".format(rawdf.count())|

In [10]:
#rawdf.count()

The following cell contains some hard filters that need to applied to the telemetry.

In [None]:
#Longitude and Latitude maximum and minimum for Ecuador
minLon=-92.0115860669999108
maxLon=-75.2272639579998952
minLat=-5.0113725789998824
maxLat=1.6643740910001550

# TripID == 0 means that the car is parked. For now, we just want the data when the car in in movement.
rawdf = rawdf.where(F.col("TripID")>0)

rawdf = rawdf.dropDuplicates(["DeviceId", "TripID","Latitude", "Longitude", "MessageTime"]) 

#Remove inconsistent lat,lon rows
rawdf = rawdf.filter( rawdf.Longitude.between(minLon, maxLon) & rawdf.Latitude.between(minLat, maxLat))

#remove recors with mileage == 0
rawdf = rawdf.filter(rawdf.Mileage>0)

#Remove data with low GPS signal and no AlertId
rawdf = rawdf.where((rawdf.GpsStatus=="A") | (rawdf.AlertId.isin(17,35,36,69,70,96,108,109,110,111)))

In [None]:
#Months Filter
startTime = datetime(2018,9,30,23,59,50)
endTime = datetime(2019,11,1,4,59,59)

## Trip Facts

In [None]:
devices_df = rawdf.filter(rawdf.GeneratedMessageTime.between(startTime, endTime))
devices_df = devices_df.select(['DeviceId', 
                                'TripID', 
                                'Latitude', 
                                'Longitude', 
                                'MessageTime', 
                                'GeneratedMessageTime', 
                                'Mileage', 
                                'AlertId', 
                                'GpsStatus',
                                'SegmentType', 
                                'CountNumber'
                               ])
devices_df.columns

In [None]:
#Trips.filter(F.col('TripID') == 15393996).show()

In [None]:
#Arreglo para hacer el cálculo del CalculatedMessageTime
telemetry_facts = devices_df.select(
    "DeviceId",
    "TripID",
    F.unix_timestamp(F.from_utc_timestamp("MessageTime", "UTC")).alias("MessageTime"),
    F.unix_timestamp(F.from_utc_timestamp("GeneratedMessageTime", "UTC")).alias("GeneratedMessageTime"),
    F.conv("CountNumber",16,10).alias("Sequence"),
    "Mileage",
    "SegmentType",
    "AlertId", 
    F.when(devices_df.AlertId==35, 1).otherwise(F.when(devices_df.AlertId==36, 3).otherwise(2)).alias("FrameOrder")
)

w1 = Window.partitionBy("TripID","DeviceId").orderBy("FrameOrder","Mileage","MessageTime").rowsBetween(0,1)
tf_2= telemetry_facts.select(
    "DeviceId",
    "TripID",
    "Mileage",
    "MessageTime",
    "GeneratedMessageTime",
    (when(F.col("AlertId") == 35,  F.greatest(F.col("GeneratedMessageTime"), F.col("MessageTime")))
    .when(F.col("AlertId") == 36,  F.least(F.col("GeneratedMessageTime"), F.col("MessageTime")))
    .otherwise(F.col("GeneratedMessageTime"))).alias("CalculatedMessageTime"),
    "SegmentType",
    "AlertId",
    "Sequence"
).orderBy("TripID","DeviceId","FrameOrder","Mileage","MessageTime")

### Trip Sum

In [None]:
#devices_df = devices_df.join(segment_type, devices_df.SegmentType == segment_type.segment_type)
dev_df = tf_2.join(segment_type, tf_2.SegmentType == segment_type.segment_type)
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"

#win = Window.partitionBy(['DeviceId','TripID']).orderBy('Sequence')
win = Window.partitionBy(['DeviceId','TripID']).orderBy(['CalculatedMessageTime', 'Sequence'])

df_1=dev_df.withColumn('prev_batch', F.lag('speed_limit').over(win)) \
    .withColumn('flag', F.when(F.col('speed_limit') == F.col('prev_batch'),0).otherwise(1)) \
    .withColumn('batch_order', F.sum('flag').over(win)) \
    .drop('prev_batch', 'flag') \
    .sort(['DeviceId','TripID','MessageTime','GeneratedMessageTime'])

df_1=df_1.groupBy(['DeviceId','TripID', "batch_order","speed_limit"]).agg(F.max("Mileage").alias('Max_Mil'), F.min("Mileage").alias("Min_Mil"), 
                                                                          F.max('CalculatedMessageTime').alias("Max_Time"), F.min('CalculatedMessageTime').alias("Min_Time"), F.count("*").alias("N_Tramas")).orderBy(['DeviceId','TripID', "batch_order"])


# BAtc his the flag that anounces change in the speed_limit 
win = Window.partitionBy(['DeviceId','TripID']).orderBy('batch_order')

df_2 = df_1.withColumn('maxMileage_prevST', F.lag('Max_Mil').over(win))\
        .withColumn('maxTime_prevST', F.lag('Max_Time').over(win))\
        .withColumn('speed_limitDistance', F.when(F.col('maxMileage_prevST').isNull(),F.abs(F.col('Max_Mil') - F.col('Min_Mil'))).otherwise(F.abs(F.col('Max_Mil') - F.col('maxMileage_prevST'))))\
        .withColumn('speed_limitDuration', F.when(F.col('maxTime_prevST').isNull(),F.abs(F.col('Max_Time') - F.col('Min_Time')) )\
                    .otherwise(F.abs(F.col('Max_Time') - F.col('maxTime_prevST'))))

trip_facts=df_2.groupBy(['DeviceId','TripID', "speed_limit"]).agg(F.sum("N_tramas").alias("N_tramas"), 
                                                                  F.sum("speed_limitDistance").alias("DistanciaRecorrida"),
                                                                  F.sum("speed_limitDuration").alias("DuracionRecorrida")).orderBy(['DeviceId','TripID'])


#Order by other MT, GM, or CODE

devices_df = devices_df.join(segment_type, devices_df.SegmentType == segment_type.segment_type)

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"

win = Window.partitionBy(['DeviceId','TripID']).orderBy('MessageTime','GeneratedMessageTime')

#batch_order column is to group CODE2 as per the ordered timestamp
df_1=devices_df.withColumn('prev_batch', F.lag('speed_limit').over(win)) \
    .withColumn('flag', F.when(F.col('speed_limit') == F.col('prev_batch'),0).otherwise(1)) \
    .withColumn('batch_order', F.sum('flag').over(win)) \
    .drop('prev_batch', 'flag') \
    .sort(['DeviceId','TripID','MessageTime','GeneratedMessageTime'])

df_1=df_1.groupBy(['DeviceId','TripID', "batch_order","speed_limit"]).agg(F.max("Mileage").alias('Max_Mil'), F.min("Mileage").alias("Min_Mil"), F.max('MessageTime').alias("Max_Time"), F.min('MessageTime').alias("Min_Time"), F.count("*").alias("N_Tramas")).orderBy(['DeviceId','TripID', "batch_order"])

win = Window.partitionBy(['DeviceId','TripID']).orderBy('batch_order')
df_2 = df_1.withColumn('maxMileage_prevST', F.lag('Max_Mil').over(win))\
        .withColumn('maxTime_prevST', F.lag('Max_Time').over(win))\
        .withColumn('speed_limitDistance', F.when(F.col('maxMileage_prevST').isNull(),F.col('Max_Mil') - F.col('Min_Mil')  ).otherwise(F.col('Max_Mil') - F.col('maxMileage_prevST')))\
        .withColumn('speed_limitDuration', F.when(F.col('maxTime_prevST').isNull(),(F.unix_timestamp(F.col('Max_Time'), format=timeFmt) - F.unix_timestamp(F.col('Min_Time'), format=timeFmt)) )\
                    .otherwise((F.unix_timestamp(F.col('Max_Time'), format=timeFmt) - F.unix_timestamp(F.col('maxTime_prevST'), format=timeFmt))))

trip_facts=df_2.groupBy(['DeviceId','TripID', "speed_limit"]).agg(F.sum("N_tramas").alias("N_tramas"), F.sum("speed_limitDistance").alias("DistanciaRecorrida"),F.sum("speed_limitDuration").alias("DuracionRecorrida")).orderBy(['DeviceId','TripID'])

In [None]:
trip_facts.printSchema()

In [None]:
trip_facts = trip_facts.withColumn('Distance_LV50', when(F.col("speed_limit")== 50, F.col("DistanciaRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Duracion_LV50', when(F.col("speed_limit")== 50, F.col("DuracionRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Distance_LV70', when(F.col("speed_limit")== 70, F.col("DistanciaRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Duracion_LV70', when(F.col("speed_limit")== 70, F.col("DuracionRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Distance_LV80', when(F.col("speed_limit")== 80, F.col("DistanciaRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Duracion_LV80', when(F.col("speed_limit")== 80, F.col("DuracionRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Distance_LV90', when(F.col("speed_limit")== 90, F.col("DistanciaRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Duracion_LV90', when(F.col("speed_limit")== 90, F.col("DuracionRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Distance_LV100', when(F.col("speed_limit")== 100, F.col("DistanciaRecorrida")).otherwise(0))
trip_facts = trip_facts.withColumn('Duracion_LV100', when(F.col("speed_limit")== 100, F.col("DuracionRecorrida")).otherwise(0))
trip_facts = trip_facts.groupby('DeviceId','TripID').agg(F.max("N_tramas").alias("N_tramas"), F.sum("DistanciaRecorrida").alias("DistanciaST"), F.sum("DuracionRecorrida").alias("DuracionST"), 
                                            F.sum("Distance_LV50").alias("Dist_50"), F.sum("Duracion_LV50").alias("Dur_50"), F.sum("Distance_LV70").alias("Dist_70"), 
                                            F.sum("Duracion_LV70").alias("Dur_70"), F.sum("Distance_LV80").alias("Dist_80"), F.sum("Duracion_LV80").alias("Dur_80"),
                                            F.sum("Distance_LV90").alias("Dist_90"), F.sum("Duracion_LV90").alias("Dur_90"), F.sum("Distance_LV100").alias("Dist_100"), F.sum("Duracion_LV100").alias("Dur_100")
                                           )

In [19]:
#trip_facts.write.parquet(local_cos.url("TF_V5_TripFacts"+period_filter , local_bucket), mode = 'overwrite')

### Start and End of Trips variables

The following cell are used to calculate the distance and duration of the trip.

In [None]:
# list of feriados
feriados=feriados.withColumn("Unix_Feriado", F.unix_timestamp("Fecha_Feriado")) 
listFeriado=feriados.toPandas()["Unix_Feriado"].values.tolist()

In [None]:
format = "yyyy-MM-dd HH:mm:ss"

Start_Trips = devices_df.filter(devices_df.AlertId==35).select("DeviceId", "TripID", F.greatest("GeneratedMessageTime", "MessageTime").alias("CalculatedMessageTime"), 'Mileage' ,'Latitude', "Longitude")\
    .selectExpr("DeviceId", "TripID", "CalculatedMessageTime as StartCalculatedMessageTime", 'Mileage as StartMileage', 'Latitude as Latitude_ON', "Longitude as Longitude_ON")

# Day time of the start of the trip
Start_Trips = Start_Trips.withColumn("EC_Time_ON", F.from_utc_timestamp(Start_Trips.StartCalculatedMessageTime, "Etc/GMT+5"))

# Holiday, work day, or weekend the start of the trip
Start_Trips = Start_Trips.withColumn("tiempo_1", F.unix_timestamp(Start_Trips.EC_Time_ON.cast(DateType())))
Start_Trips = Start_Trips.withColumn("Start_Holiday",F.when(Start_Trips.tiempo_1.isin(listFeriado),"Holiday").otherwise(F.when(F.date_format('EC_Time_ON', 'u') <= 5, "Weekday").otherwise("Weekend")))
#Start_Trips = Start_Trips.withColumn("Start_DayWeek", F.date_format('EC_Time_ON', 'u'))
Start_Trips = Start_Trips.drop("tiempo_1")

In [22]:
#Start_Trips.write.parquet(local_cos.url("TF_V5_StartTrips"+period_filter , local_bucket), mode = 'overwrite')

In [None]:
Finish_Trips = devices_df.filter(devices_df.AlertId==36).select("DeviceId", "TripID", F.least("GeneratedMessageTime", "MessageTime").alias("FinishCalculatedMessageTime"), 'Mileage','Latitude', "Longitude")\
    .selectExpr("DeviceId", "TripID", "FinishCalculatedMessageTime as FinishCalculatedMessageTime", 'Mileage as FinishMileage', 'Latitude as Latitude_OFF', "Longitude as Longitude_OFF")

# Day time of the end of the trip
Finish_Trips = Finish_Trips.withColumn("EC_Time_OFF", F.from_utc_timestamp(Finish_Trips.FinishCalculatedMessageTime, "Etc/GMT+5"))
Finish_Trips = Finish_Trips.withColumn("Time",F.split(F.split("EC_Time_OFF", " ")[1],":")[0])
Finish_Trips = Finish_Trips.withColumn("DayTime",F.when(F.col("Time").between(6,8),'Manana 6-9').otherwise(F.when(F.col("Time").between(9,11),'Manana 9-12').otherwise(F.when(F.col("Time").between(12,14),'Tarde 12-15').\
                                      otherwise(F.when(F.col("Time").between(15,17),'Tarde 15-18').otherwise(F.when(F.col("Time").between(18,20),'Noche 18-21').otherwise(F.when(F.col("Time").between(21,23),'Noche 21-24').\
                                      otherwise(F.when(F.col("Time").between (0,2),'Madrugada 0-3').otherwise('Madrugada 3-6'))))))))
#Finish_Trips = Finish_Trips.withColumn("FinishDaytime",F.when(F.split(F.split("EC_Time_OFF", " ")[1],":")[0].between(6, 18), "Diurno").otherwise("Nocturno"))

# Holiday, work day, or weekend the end of the trip
Finish_Trips = Finish_Trips.withColumn("tiempo_1", F.unix_timestamp(Finish_Trips.EC_Time_OFF.cast(DateType())))
Finish_Trips = Finish_Trips.withColumn("DayType",F.when(Finish_Trips.tiempo_1.isin(listFeriado),"Weekend").otherwise(F.when(F.date_format('EC_Time_OFF', 'u') <= 5, "Weekday").otherwise("Weekend")))
Finish_Trips = Finish_Trips.withColumn("DayWeek", F.date_format('EC_Time_OFF', 'u'))
Finish_Trips = Finish_Trips.withColumn("Month", F.date_format('EC_Time_OFF', 'M'))
Finish_Trips = Finish_Trips.withColumn("Date", F.date_format('EC_Time_OFF', 'dd-MM-yyyy'))
Finish_Trips = Finish_Trips.drop("tiempo_1", "Time")

In [None]:
Finish_Trips.printSchema()

In [25]:
#Finish_Trips.write.parquet(local_cos.url("TF_V5_FinishTrips"+period_filter , local_bucket), mode = 'overwrite')

In [None]:
# Join datasets 
StartEndTrips = Start_Trips.join(Finish_Trips, ["DeviceId", "TripID"], "inner")

### Distance

In [None]:
StartEndTrips = StartEndTrips.withColumn("Dist_km", StartEndTrips.FinishMileage - StartEndTrips.StartMileage)
StartEndTrips = StartEndTrips.filter((F.col("Dist_km")>0.3) & (F.col("Dist_km") < 1140))
StartEndTrips = StartEndTrips.drop("StartMileage","FinishMileage", 'Start_Holiday', 'Finish_Holiday')

### Duration

In [None]:
timeDiff = (F.unix_timestamp(StartEndTrips.FinishCalculatedMessageTime, format=timeFmt)
            - F.unix_timestamp(StartEndTrips.StartCalculatedMessageTime, format=timeFmt))

StartEndTrips = StartEndTrips.withColumn("Duration_s", timeDiff)

# time greater than 3 minutes and less than 12 hours
StartEndTrips = StartEndTrips.filter((F.col("Duration_s")> 180) & (F.col("Duration_s") < 43200))

In [None]:
Trips = trip_facts.join(StartEndTrips, ["DeviceId", "TripID"], "inner")

In [30]:
#Trips.orderBy("DeviceId", "TripID").show()

In [None]:
Trips.printSchema()

In [32]:
#Trips.withColumn('Resta', F.col("Dist_km") - F.col("DistanciaST")).orderBy('Resta', ascending = False).show()

In [None]:
Trips.write.parquet(local_cos.url("TripFacts_V5_"+period_filter , local_bucket), mode = 'overwrite')

In [8]:
Trips3 = spark.read.parquet(local_cos.url("TripFacts_V5_2019-09" , local_bucket))
Trips3.count()

624095

In [5]:
Trips2 = spark.read.parquet(local_cos.url("TripFacts_V5_"+period_filter , local_bucket))

# Mahalanobis
### Fuctions to remove outliers with Mahalanobis distance

In [6]:
Trips2.count()

647525

In [None]:
#convertir a pandas dataframe
datos_pd = Trips2.toPandas()
print('Done')
datos_pd.shape

In [None]:
import numpy as np
from scipy.stats import chi2

def maha_dist(df):
    mean = df.mean()
    S_1 = np.linalg.inv(df.cov())

    def fun(row):
        A = np.dot((row.T - mean), S_1)
        return np.dot(A, (row - mean))
    df["MAH_dist"] = df.apply(fun, axis=1)

    return df

def remove_outl(df, cols = ['Dist_km', 'Duration_h', 'DistanciaST', 'DuracionST_h']):
    try:
        if df.shape[0]>30:
            df = df.sort_values(by="TripID")
            MahalaDF_SESA_PD = df[cols]
            MahalaDF_SESA_PD = maha_dist(MahalaDF_SESA_PD[cols])
            MahalaDF_SESA_PD["TripID"]= df.TripID
            MahalaDF_SESA_PD['Prob_SESA'] = 1 - chi2.cdf(MahalaDF_SESA_PD['MAH_dist'], len(cols))
            MahalaDF_SESA_PD = MahalaDF_SESA_PD[MahalaDF_SESA_PD.Prob_SESA>0.001]
            df = df[df.TripID.isin(MahalaDF_SESA_PD.TripID)]
            return df
        else:
            print("Not enough trips to perform outlier detection")
    except:
        print("Something went wrong. Check there is a TripID identifier.")

In [None]:
#remove outliers
datos_clean = remove_outl(datos_pd, cols=['Dist_km', 'Duration_h', 'DistanciaST', 'DuracionST_h'])
datos_clean.shape

In [None]:
pd.set_option('float_format', '{:f}'.format)
datos_clean.loc[datos_clean.Duration_h<datos_clean.DuracionST_h,[ 'Duration_h', 'DistanciaST',
       'DuracionST_h', 'Dur_50_h', 'Dur_70_h', 'Dur_80_h', 'Dur_90_h',
       'Dur_100_h']].describe()

In [None]:
trips_clean_spark = sqlContext.createDataFrame(datos_clean)
#project.save_data("TF_clean_jun_jul_agos.csv", datos_clean.to_csv(index=False, header = True), overwrite=True)
#Save cleaned trip facts
name = "TripFacts_V5_clean"+period_filter
trips_clean_spark.write.parquet(path=local_cos.url(name, local_bucket), mode="overwrite", compression="none")
print("Done")