# FUEL

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, DoubleType, DateType,TimestampType

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark import SQLContext

import json 

from datetime import datetime, timedelta
import math
import os
from IPython.core.display import display, HTML
from datetime import datetime
import pyspark.conf as conf
conf.autoBroadcastJoinThreshold = -1 # this parameters avoids stopping the program when it takes too long
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
spark = SparkSession.builder.appName("TFG").getOrCreate()

# SCHEMAS

In [3]:
schema_vehicle = StructType([ \
    StructField("DEVICE_ID",StringType(),True), \
    StructField("LATITUDE",DoubleType(),True), \
    StructField("LONGITUDE",DoubleType(),True), \
    StructField("DATETIME",TimestampType(),True), \
    StructField("SPEED",DoubleType(),True), \
  ])
schema_fuel = StructType([ \
    StructField("DEVICE_ID",StringType(),True), \
    StructField("LITRES",DoubleType(),True), \
    StructField("DATETIME",TimestampType(),True)
  ])

# First look at the data

In [4]:
first_look_csv = 'data/fuel_original/fuel_20210201_20210201_0_anon.csv' 

first_look_csv = spark.read.csv(first_look_csv, header=True, inferSchema=True, sep='|', schema=schema_fuel)

first_look_csv.printSchema()
first_look_csv.show(3, truncate=False)

root
 |-- DEVICE_ID: string (nullable = true)
 |-- LITRES: double (nullable = true)
 |-- DATETIME: timestamp (nullable = true)

+---------+------------------+-----------------------+
|DEVICE_ID|LITRES            |DATETIME               |
+---------+------------------+-----------------------+
|4BF9LOXT |2413.4900000000002|2021-02-01 01:00:18.147|
|QI9IXIGJ |696.38            |2021-02-01 01:00:05.02 |
|QI9IXIGJ |696.38            |2021-02-01 01:00:06.023|
+---------+------------------+-----------------------+
only showing top 3 rows



# COMMON FUNCTIONS

In [5]:
def get_distance(lat1deg, lon1deg, lat2deg, lon2deg):
    '''
    return: distance in metres from 2 locations (lat, lon)
    '''
    if not all((lat1deg, lon1deg, lat2deg, lon2deg)):
        return 0.0
    
    #approximate radius of earth in m
    R = 6373000.0

    lat1 = math.radians(lat1deg)
    lon1 = math.radians(lon1deg)
    lat2 = math.radians(lat2deg)
    lon2 = math.radians(lon2deg)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c
    return distance

# register as a UDF 
get_distance_udf = F.udf(get_distance, DoubleType())

In [6]:
def get_day_month_year(filename):
    '''
    return: day, month and year of the filename
    '''
    dia = filename.split('_')[1][6:]
    mes = filename.split('_')[1][4:6]
    ano = filename.split('_')[1][:4]
    
    return dia, mes, ano

In [7]:
def get_delta_time(timestr): 
    '''
    return: deltatime as float in seconds from HH:MM:SS.mmm
    '''
    values = timestr.split(':')
    #Generate a timedelta
    delta = timedelta(hours=float(values[0]), minutes=float(values[1]), seconds=float(values[2]))
    #Represent in Seconds
    return delta.total_seconds()

# register as a UDF 
get_delta_time_udf = F.udf(get_delta_time, DoubleType())

In [8]:
def abs_time_delta(y,x): 
    '''
    return: duration in seconds
    '''
    if not all((x, y)):
        return 0.0
    delta = math.fabs((x-y).total_seconds())
    return delta

# register as a UDF 
abs_time_delta_udf = F.udf(abs_time_delta, DoubleType())

In [9]:
# return time - 31 seconds
mintimeudf = F.udf(lambda x: (x - timedelta(seconds=31)), TimestampType())
# return time + 31 seconds
maxtimeudf = F.udf(lambda x: (x + timedelta(seconds=31)), TimestampType())

# FUEL TREATMENT

In [10]:
def get_fuel(file, df_vehicle):
    df_fuel_initial = spark.read.csv(file, header=True,inferSchema=True, sep='|',schema=schema_fuel
                                    ).withColumnRenamed('DATETIME', 'DATETIME_F')
    df_fuel1 = df_fuel_initial.withColumn('prev_fuel', F.lag('LITRES').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                        .withColumn('diff_fuel', F.when(F.isnull(F.col('prev_fuel')),0).otherwise(F.col('LITRES')-F.col('prev_fuel'))) \
                        .withColumn('start_time', mintimeudf(F.col('DATETIME_F'))) \
                        .withColumn('end_time', maxtimeudf(F.col('DATETIME_F'))) \
                        .drop('prev_fuel')
    
    
    df_fuel2 = df_fuel1.join(df_vehicle, on=((df_vehicle.DEVICE_ID==df_fuel1.DEVICE_ID) &
                                         (df_vehicle.DATETIME_V.between(df_fuel1.start_time, df_fuel1.end_time))), how='left') \
                    .drop('start_time', 'end_time') \
                    .drop(df_vehicle.DEVICE_ID) \
                    .cache()
    #df_fuel_no_date = df_fuel2.where(F.col('DATETIME_V').isNull())
    df_fuel3 = df_fuel2.where(F.col('DATETIME_V').isNotNull())
    window_fuel = Window.partitionBy(["DEVICE_ID", "DATETIME_F"]).orderBy(F.col('diff_date').desc())
    df_fuel4 = df_fuel3.withColumn('diff_date', abs_time_delta_udf(F.col('DATETIME_F'), F.col('DATETIME_V'))) \
                    .withColumn('next_diff_date', F.lead('diff_date', default=1000).over(window_fuel)) \
                    .withColumn('result', F.when(F.col('diff_date') < F.col('next_diff_date'), 1).otherwise(0)) \
                    .filter(F.col('result') == 1) \
                    .drop('diff_date', 'next_diff_date', 'result')
    df_fuel5 = df_fuel4.withColumn('PREV_LATITUDE', F.lag('LATITUDE').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                    .withColumn('PREV_LONGITUDE', F.lag('LONGITUDE').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                    .withColumn('DISTANCE', get_distance_udf(F.col('LATITUDE'), F.col('LONGITUDE'), F.col('PREV_LATITUDE'), F.col('PREV_LONGITUDE'))) \
                    .select('DEVICE_ID','LITRES','DATETIME_F','diff_fuel','SPEED','DISTANCE') \
                    .cache()
    
    return df_fuel5

     

# SAVE FUEL

In [11]:
def save_fuel(df_fuel, output_file):
    data_fuel = df_fuel.rdd.toLocalIterator()
    primera_linea = True
    for row in data_fuel:
        if primera_linea:
            primera_linea = False
        else:
            output_file.write(', ')
        date = '{}/{}/{}'.format(row['DATETIME_F'].strftime("%Y"),row['DATETIME_F'].strftime("%m"),row['DATETIME_F'].strftime("%d"))
        output_file.write('{"device_id": "' + row['DEVICE_ID'] 
                          + '", "date": "' + date
                          + '", "diff_fuel": "' + str(row['diff_fuel'])
                          #+ '", "speed": "' + str(row['SPEED']) 
                          + '", "distance": "' + str(row['DISTANCE']) + '"}')
    
    spark.catalog.clearCache  

In [None]:
MAIN_START_TIME = datetime.now()
directory = "data/fleet_original/"
starting_time = datetime.now()
for filename in os.listdir(directory):
    # leemos csv
    if filename.endswith(".csv"):
        
        dia, mes, ano =  get_day_month_year(filename)
        
        #---------------------
        df_vehicles = spark.read.csv(directory+filename, header=True,inferSchema=True, sep='|',schema=schema_vehicle
                                    ).withColumnRenamed('DATETIME', 'DATETIME_V')
        print('{} read at {}'.format(filename, datetime.now()))

        #---------------------
        output_file = open('json_data/fuel/fuel_{}{}{}.json'.format(ano,mes,dia,ano,mes,dia), "wt")
        output_file.write('{"type": "FeatureCollection", "features": [')
        
        directory_fuel = "data/fuel_original/"
        fuel_filename = 'fuel_{}{}{}_{}{}{}_0_anon.csv'.format(ano,mes,dia,ano,mes,dia)
        df_fuel = get_fuel(directory_fuel + fuel_filename, df_vehicles)
        print('df_fuel loaded at {}'.format(datetime.now()))
        save_fuel(df_fuel, output_file)
        print('save fuel done at {}'.format(datetime.now()))
        output_file.write(']}')
        output_file.close()
        

logrecords_20210201_20210201_0_anon.csv read at 2022-02-16 15:05:41.464618
df_fuel loaded at 2022-02-16 15:05:43.399125


In [None]:
print('fuel_treatment process finished (duration = {} hours, {} minutes)'.format(
    ((datetime.now() - MAIN_START_TIME).seconds)//3600,
    (((datetime.now() - MAIN_START_TIME).seconds)//60)%60))

In [None]:
# pruebas memoria 

In [8]:
df_vehicle = spark.read.csv( "datos/logrecord_febrero/logrecords_20210201_20210201_0_anon.csv", header=True,inferSchema=True, sep='|',schema=schema_vehicle
                                    ).withColumnRenamed('DATETIME', 'DATETIME_V')
    
df_fuel_initial = spark.read.csv('datos/fuel/fuel_20210201_20210201_0_anon.csv', header=True,inferSchema=True, sep='|',schema=schema_fuel
                                ).withColumnRenamed('DATETIME', 'DATETIME_F')
df_fuel_initial.show(5)

df_fuel1 = df_fuel_initial.withColumn('prev_fuel', F.lag('LITRES').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                    .withColumn('diff_fuel', F.when(F.isnull(F.col('prev_fuel')),0).otherwise(F.col('LITRES')-F.col('prev_fuel'))) \
                    .withColumn('start_time', mintimeudf(F.col('DATETIME_F'))) \
                    .withColumn('end_time', maxtimeudf(F.col('DATETIME_F'))) \
                    .drop('prev_fuel')

df_fuel1.show(5)
df_fuel2 = df_fuel1.join(df_vehicle, on=((df_vehicle.DEVICE_ID==df_fuel1.DEVICE_ID) &
                                     (df_vehicle.DATETIME_V.between(df_fuel1.start_time, df_fuel1.end_time))), how='left') \
                .drop('start_time', 'end_time') \
                .drop(df_vehicle.DEVICE_ID) \
                .cache()

df_fuel2.show(5)
#df_fuel_no_date = df_fuel2.where(F.col('DATETIME_V').isNull())
df_fuel3 = df_fuel2.where(F.col('DATETIME_V').isNotNull())
window_fuel = Window.partitionBy(["DEVICE_ID", "DATETIME_F"]).orderBy(F.col('diff_date').desc())
df_fuel4 = df_fuel3.withColumn('diff_date', abs_time_delta_udf(F.col('DATETIME_F'), F.col('DATETIME_V'))) \
                .withColumn('next_diff_date', F.lead('diff_date', default=1000).over(window_fuel)) \
                .withColumn('result', F.when(F.col('diff_date') < F.col('next_diff_date'), 1).otherwise(0)) \
                .filter(F.col('result') == 1) \
                .drop('diff_date', 'next_diff_date', 'result')

df_fuel4.show(5)
df_fuel5 = df_fuel4.withColumn('PREV_LATITUDE', F.lag('LATITUDE').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                .withColumn('PREV_LONGITUDE', F.lag('LONGITUDE').over(Window.partitionBy("DEVICE_ID").orderBy('DATETIME_F'))) \
                .withColumn('DISTANCE', get_distance_udf(F.col('LATITUDE'), F.col('LONGITUDE'), F.col('PREV_LATITUDE'), F.col('PREV_LONGITUDE'))) \
                .select('DEVICE_ID','LITRES','DATETIME_F','diff_fuel','SPEED','DISTANCE') \
                .cache()
df_fuel5.show(5)

+---------+------------------+--------------------+
|DEVICE_ID|            LITRES|          DATETIME_F|
+---------+------------------+--------------------+
| 4BF9LOXT|2413.4900000000002|2021-02-01 01:00:...|
| QI9IXIGJ|            696.38|2021-02-01 01:00:...|
| QI9IXIGJ|            696.38|2021-02-01 01:00:...|
| QI9IXIGJ|            696.39|2021-02-01 01:00:...|
| QI9IXIGJ|            696.39|2021-02-01 01:00:...|
+---------+------------------+--------------------+
only showing top 5 rows

+---------+------+--------------------+---------+--------------------+--------------------+
|DEVICE_ID|LITRES|          DATETIME_F|diff_fuel|          start_time|            end_time|
+---------+------+--------------------+---------+--------------------+--------------------+
| 4XUR6E8S|852.54|2021-02-01 07:36:...|      0.0|2021-02-01 07:35:...|2021-02-01 07:36:...|
| 4XUR6E8S|852.54|2021-02-01 07:36:...|      0.0|2021-02-01 07:35:...|2021-02-01 07:36:...|
| 4XUR6E8S|852.54|2021-02-01 07:36:...|      0.