In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import os

# Tell the Hadoop tooling where the local Windows install lives.
# NOTE(review): "hadoop.home.dir" is normally a JVM system property - confirm that
# exporting it as an environment variable has the intended effect.
HADOOP_DIR = "C:\\hadoop"
for hadoop_env_key in ("HADOOP_HOME", "hadoop.home.dir"):
    os.environ[hadoop_env_key] = HADOOP_DIR

# Session-level Spark settings, applied one by one to the builder below.
# NOTE(review): in local mode the executor memory setting may have no effect
# (driver memory is usually what matters) - confirm.
SPARK_SETTINGS = {
    "spark.sql.adaptive.enabled": "false",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.local.dir": "C:/temp/spark-temp",
    "spark.executor.memory": "12g",
}

session_builder = SparkSession.builder.appName("WriteTest")
for setting_name, setting_value in SPARK_SETTINGS.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

# Loading the data:

In [4]:
# Vehicle reference data: the first CSV row supplies the column names and Spark
# infers the column types from the data.
# The frame is cached because later cells join against it repeatedly.
VEHICLE_CSV_PATH = "C:/Users/tjsde/OneDrive/Documents/Git/tech-test-data/supporting-data/vehicle.csv"

dfVehicle = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(VEHICLE_CSV_PATH)
    .cache()
)
dfVehicle.show()

+---------------+----+----------+-----------+----------+----------+--------------+------------------+--------------+-------------------+---------+------------------+-----------------+--------------------+---------+----------------+-----------------+
|vehicle_spec_id|year|      make|      Model|drivetrain|max_torque|max_horsepower|max_horsepower_rpm|max_torque_rpm|engine_displacement|fuel_type|fuel_tank_capacity|fuel_economy_city|fuel_economy_highway|cylinders|forced_induction|device_generation|
+---------------+----+----------+-----------+----------+----------+--------------+------------------+--------------+-------------------+---------+------------------+-----------------+--------------------+---------+----------------+-----------------+
|        1000500|2016|     Honda|      Civic|         2|       174|           140|              6500|          1500|              1.799|     1059|                47|             16.5|                20.0|        4|            1054|                5|


In [5]:
# Explicit schema for the drive parquet files, so every file is read with the
# same column names and types regardless of what each file declares.
drive_schema = StructType([
    StructField("trip_id", StringType(), True),               # Primary key
    StructField("datetime", TimestampType(), True),           # Primary key
    StructField("vehicle_spec_id", LongType(), True),
    StructField("engine_coolant_temp", DoubleType(), True),
    StructField("eng_load", DoubleType(), True),
    StructField("fuel_level", DoubleType(), True),
    StructField("iat", DoubleType(), True),
    StructField("rpm", DoubleType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("velocity", DoubleType(), True)
])

# Loading all the parquet files with a * wildcard was failing, so each file is
# read on its own and the results are unioned into one DataFrame.
folder = "C:/Users/tjsde/OneDrive/Documents/Git/tech-test-data/supporting-data/drive"

# Sorted so the union order does not depend on the OS directory listing order.
files = sorted(os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".parquet"))
if not files:
    # Fail with a clear message instead of an IndexError on dfs[0] below.
    raise FileNotFoundError(f"No .parquet files found in {folder}")

dfs = [spark.read.schema(drive_schema).parquet(f) for f in files]
dfDrivingRaw = dfs[0]
for d in dfs[1:]:
    dfDrivingRaw = dfDrivingRaw.unionByName(d)

# Surrogate key: replace the string trip_id with a long (as required by the final table).
# The assumption behind the long type is that it is cheaper to store and process than a string.
#
# A window without partitionBy makes Spark move every row it ranks into a single
# partition. Ranking the full drive dataset that way is the expensive part, so the
# rank is computed over the DISTINCT trip ids only (one row per trip) and the
# resulting mapping is joined back. dense_rank over distinct values yields exactly
# the same key per trip_id as dense_rank over all rows.
# NOTE(review): the key depends on the set of trip ids present in this run, so it is
# not stable if files are added or removed - confirm that is acceptable.
dfTripKeyMap = (
    dfDrivingRaw
    .select(col("trip_id").alias("trip_id_raw"))
    .distinct()
    .withColumn("trip_key", dense_rank().over(Window.orderBy("trip_id_raw")).cast("long"))
)

# eqNullSafe keeps rows whose trip_id is NULL (they were ranked too in the original logic).
# withColumn overwrites trip_id in place, so the column order is unchanged.
# We repartition and cache this table to prevent out of memory exceptions.
dfDriving = (
    dfDrivingRaw
    .join(broadcast(dfTripKeyMap), col("trip_id").eqNullSafe(col("trip_id_raw")), "inner")
    .withColumn("trip_id", col("trip_key"))
    .drop("trip_id_raw", "trip_key")
    .repartition(200, "trip_id")
    .cache()
)

dfDriving.show(truncate=False)

+-------+-------------------+---------------+-------------------+--------+----------+------+-------+-------+------------------+--------+
|trip_id|datetime           |vehicle_spec_id|engine_coolant_temp|eng_load|fuel_level|iat   |rpm    |lat    |long              |velocity|
+-------+-------------------+---------------+-------------------+--------+----------+------+-------+-------+------------------+--------+
|26     |2017-02-27 23:27:20|1000512        |90.0               |193.25  |117.0     |102.94|1913.08|30.9375|-87.10694444444444|52.28   |
|26     |2017-02-27 22:55:07|1000512        |92.0               |200.33  |113.0     |109.58|1906.01|30.9375|-87.6438888888889 |52.2    |
|26     |2017-02-27 23:23:48|1000512        |92.0               |197.69  |106.0     |96.66 |1915.43|30.9375|-87.16583333333334|64.09   |
|26     |2017-02-27 23:17:53|1000512        |88.0               |203.73  |105.0     |103.58|1907.7 |30.9375|-87.26444444444445|28.66   |
|26     |2017-02-27 23:09:58|1000512     

# Daily trip job:

In [6]:
date = "2017/01/22" # Input date, interpreted in Pacific time; the yyyy/MM/dd format is assumed to always be followed

# Building blocks for the daily slice:
#  - the input date parsed into a DATE value
#  - the reading timestamp shifted from UTC to America/Los_Angeles
#  - the vehicle lookup reduced to id, make and (lower-cased) model
input_date = to_date(lit(date), "yyyy/MM/dd")
reading_time_pst = from_utc_timestamp("datetime", "America/Los_Angeles")
vehicle_names = dfVehicle.select("vehicle_spec_id", "make", col("Model").alias("model"))

# Keep only the readings that fall on the input date (local time), then attach make/model.
# The left join keeps readings for vehicles missing from the lookup; their make/model are NULL.
dfDailyTrip = (
    dfDriving
    .withColumn("datetime_pst", reading_time_pst)
    .filter(to_date(col("datetime_pst")) == input_date)
    .join(vehicle_names, on="vehicle_spec_id", how="left")
)

dfDailyTrip.show(truncate=False)

+---------------+-------+-------------------+-------------------+--------+----------+------+-------+-------+------------------+--------+-------------------+----+-------+
|vehicle_spec_id|trip_id|datetime           |engine_coolant_temp|eng_load|fuel_level|iat   |rpm    |lat    |long              |velocity|datetime_pst       |make|model  |
+---------------+-------+-------------------+-------------------+--------+----------+------+-------+-------+------------------+--------+-------------------+----+-------+
|1000501        |270    |2017-01-22 17:35:25|94.0               |211.57  |100.0     |102.42|2122.19|31.4375|-81.59722222222221|72.87   |2017-01-22 09:35:25|Jeep|Compass|
|1000501        |270    |2017-01-22 17:38:52|96.0               |201.9   |119.0     |106.77|2125.4 |31.4375|-81.53972222222222|74.49   |2017-01-22 09:38:52|Jeep|Compass|
|1000501        |270    |2017-01-22 18:09:40|99.0               |208.91  |112.0     |107.31|2121.67|31.4375|-81.02638888888889|67.89   |2017-01-22 10:

In [7]:
# Time gap, in HOURS, between each reading and the previous reading of the same trip.
# lag() over a window partitioned by trip_id and ordered by datetime_pst fetches the
# previous timestamp; the first reading of a trip has no predecessor, so its
# prev_time and time_diff_hours are NULL.
trip_timeline = Window.partitionBy("trip_id").orderBy("datetime_pst")
dfWithTimeDiff = (
    dfDailyTrip
    .withColumn("prev_time", lag("datetime_pst").over(trip_timeline))
    .withColumn(
        "time_diff_hours",
        (unix_timestamp("datetime_pst") - unix_timestamp("prev_time")) / 3600,  # seconds -> hours
    )
)

# Segment distance = velocity x elapsed hours since the previous reading.
# Assumes velocity is expressed in km/h, so the product is in km.
dfWithDistance = dfWithTimeDiff.withColumn(
    "segment_distance_km",
    col("velocity") * col("time_diff_hours"),
)

# Distance of each trip = sum of its segment distances (NULL segments are ignored by sum).
# `sum` here is pyspark.sql.functions.sum (star import), not the Python builtin.
dfTripDistance = dfWithDistance.groupBy("trip_id", "make", "model", "vehicle_spec_id").agg(
    sum("segment_distance_km").alias("trip_distance_km")
)

# Distance per vehicle = sum of the distances of all its trips on the input date.
dfVehicleDistance = dfTripDistance.groupBy("make", "model", "vehicle_spec_id").agg(
    sum("trip_distance_km").alias("distance_travelled")
)

# Let failures surface: catching and printing the exception here would allow the
# following cells to run against a result that could not be computed.
dfVehicleDistance.show()

+----------+-----------+---------------+------------------+
|      make|      model|vehicle_spec_id|distance_travelled|
+----------+-----------+---------------+------------------+
|      NULL|       NULL|        1000505| 836.2803777777787|
|      NULL|       NULL|        1000508|470.54720277777835|
|      NULL|       NULL|        1000516|232.36941666666593|
|      Jeep|    Compass|        1000501| 675.5876249999998|
|      NULL|       NULL|        1000512|28.409774999999982|
|      NULL|       NULL|        1000513|236.59707222222238|
|      NULL|       NULL|        1000517|245.92137499999984|
|      NULL|       NULL|        1000509|223.45613611111085|
|      Audi|         Q3|        1000507|196.74214166666633|
|      NULL|       NULL|        1000510|243.43013333333352|
|      NULL|       NULL|        1000518|160.91343888888886|
|Volkswagen|     Passat|        1000504|189.36821944444415|
|     Honda|      Civic|        1000500|296.58623611111034|
|       BMW|3 Series GT|        1000506|

In [8]:
# Trip duration = time between the first and last datetime_pst of each trip, in minutes.
# This assumes a trip never involves more than one vehicle;
# that could be verified by checking whether any trip_id appears more than once in the result.
# `max` / `min` are the pyspark.sql.functions aggregates (star import), not the builtins.
duration_group_cols = ["trip_id", "make", "model", "vehicle_spec_id"]
elapsed_seconds = unix_timestamp(max("datetime_pst")) - unix_timestamp(min("datetime_pst"))

dfDailyTripDuration = dfDailyTrip.groupBy(*duration_group_cols).agg(
    (elapsed_seconds / 60).alias("trip_duration_minutes")  # seconds -> minutes
)

dfDailyTripDuration.show()

+-------+----------+-----------+---------------+---------------------+
|trip_id|      make|      model|vehicle_spec_id|trip_duration_minutes|
+-------+----------+-----------+---------------+---------------------+
|    270|      Jeep|    Compass|        1000501|   123.66666666666667|
|    415|      Audi|         Q3|        1000507|   109.18333333333334|
|   1694|      NULL|       NULL|        1000512|   15.933333333333334|
|    845|      NULL|       NULL|        1000505|                 78.5|
|   1504|       BMW|3 Series GT|        1000506|   133.23333333333332|
|   1048|      NULL|       NULL|        1000505|                123.8|
|    586|      NULL|       NULL|        1000505|                 83.3|
|    589|      NULL|       NULL|        1000517|   122.76666666666667|
|    413|      NULL|       NULL|        1000505|   132.51666666666668|
|    161|      NULL|       NULL|        1000505|   24.983333333333334|
|    712|      NULL|       NULL|        1000510|    78.98333333333333|
|    2

In [9]:
# Combine per-trip durations with per-vehicle distances, matching on vehicle_spec_id.
# vehicle_spec_id is the most precise key: the same make & model can exist under several
# vehicle_spec_id values (a different model year, for example).
# It also lets us report distances for vehicles whose make and model are unknown.
# make/model are removed from the duration side first, so only the copies coming from
# dfVehicleDistance remain after the join.
# NOTE(review): distance_travelled is a per-vehicle total, so every trip of the same
# vehicle shows the same value - confirm this is the intended grain.
trip_durations = dfDailyTripDuration.drop("make", "model")
dfDailyTripFinal = trip_durations.join(dfVehicleDistance, on="vehicle_spec_id", how="left")

# Stamp each row with the input date the data was processed for, then
# project the columns in the required order and with the required types.
dfDailyTripFinal = dfDailyTripFinal.withColumn("date_pst", to_date(lit(date), "yyyy/MM/dd"))
dfDailyTripFinal = dfDailyTripFinal.select(
    "date_pst",
    "trip_id",
    "make",
    "model",
    col("trip_duration_minutes").cast(DecimalType(10, 4)),
    col("distance_travelled").cast(DecimalType(10, 4)),
)

dfDailyTripFinal.show(truncate=False)

+----------+-------+-------+-----+---------------------+------------------+
|date_pst  |trip_id|make   |model|trip_duration_minutes|distance_travelled|
+----------+-------+-------+-----+---------------------+------------------+
|2017-01-22|339    |Hyundai|Creta|75.0667              |237.6225          |
|2017-01-22|1350   |Hyundai|Creta|71.0500              |237.6225          |
|2017-01-22|216    |Hyundai|Creta|89.9167              |237.6225          |
|2017-01-22|845    |NULL   |NULL |78.5000              |836.2804          |
|2017-01-22|1048   |NULL   |NULL |123.8000             |836.2804          |
|2017-01-22|586    |NULL   |NULL |83.3000              |836.2804          |
|2017-01-22|413    |NULL   |NULL |132.5167             |836.2804          |
|2017-01-22|161    |NULL   |NULL |24.9833              |836.2804          |
|2017-01-22|1412   |NULL   |NULL |139.5167             |836.2804          |
|2017-01-22|1188   |NULL   |NULL |136.4167             |836.2804          |
|2017-01-22|

In [10]:
# The input date ("yyyy/MM/dd") is embedded in the output path, which produces a
# year/month/day folder structure for the written parquet.
# An alternative is partitionBy() on date_pst when saving, which keeps the layout correct
# even with late-arriving data (the path-based approach here would not).
# Overwrite mode means re-running for the same input date replaces what was written before.
daily_trip_output_path = f"C:/Users/tjsde/OneDrive/Documents/Git/tech-test-data/written-data/{date}/"

try:
    dfDailyTripFinal.write.mode("overwrite").parquet(daily_trip_output_path)
except Exception as e:
    # Log for readability in the notebook, then re-raise: a failed write must not
    # look like a successful run to whoever (or whatever) executes this job.
    print("Write failed:", e)
    raise

# Trip SQL

In [11]:
# Register the vehicle and drive DataFrames as temp views ("Vehicle", "Drive") so the
# following cells can query them with Spark SQL.
dfVehicle.createOrReplaceTempView("Vehicle")
dfDriving.createOrReplaceTempView("Drive")

# Exploratory step: earliest and latest reading timestamp for every trip_id.
# Uses the raw `datetime` column of Drive (no timezone shift is applied here).
result = spark.sql(""" 
    -- This returns the start and end times for each trip_id; 
    -- This can be used to determine the start and end fuel levels and assums that a trip doesn't include refueling stops
    SELECT
        trip_id,
        MIN(datetime) AS start_time,
        MAX(datetime) AS end_time
    FROM Drive
    GROUP BY trip_id
""")
result.show()

+-------+-------------------+-------------------+
|trip_id|         start_time|           end_time|
+-------+-------------------+-------------------+
|     26|2017-02-27 22:00:00|2017-02-27 23:50:01|
|     29|2017-02-01 16:00:00|2017-02-01 17:04:38|
|    474|2017-01-07 12:00:00|2017-01-07 12:31:38|
|    964|2017-01-28 11:00:00|2017-01-28 11:45:33|
|   1677|2017-01-01 00:00:00|2017-01-01 02:34:48|
|   1697|2017-01-07 02:00:00|2017-01-07 04:30:21|
|     65|2017-01-08 09:00:00|2017-01-08 10:15:07|
|    191|2017-01-06 20:00:00|2017-01-06 21:56:59|
|    418|2017-02-14 06:00:00|2017-02-14 06:05:15|
|    541|2017-01-21 23:00:00|2017-01-22 00:41:55|
|    558|2017-01-06 20:00:00|2017-01-06 21:02:40|
|   1010|2017-01-21 18:00:00|2017-01-21 20:04:08|
|   1224|2017-02-14 07:00:00|2017-02-14 09:05:34|
|   1258|2017-01-06 20:00:00|2017-01-06 21:42:24|
|   1277|2017-01-27 07:00:00|2017-01-27 08:30:32|
|   1360|2017-02-22 23:00:00|2017-02-22 23:12:02|
|    222|2017-01-06 23:00:00|2017-01-07 01:49:41|


In [12]:
# Exploratory step: fuel level of the reading taken at each trip's earliest timestamp.
# The per-trip MIN/MAX timestamps are recomputed in a CTE and joined back to Drive.
# NOTE(review): if several readings of a trip share the earliest datetime, this returns
# one row per such reading - confirm (trip_id, datetime) is unique.
result = spark.sql(""" 
    -- Using the previous query again we can do a join to get a table of start_fuel levels for each trip_id
    WITH trip_times AS (
    SELECT
        trip_id,
        MIN(datetime) AS start_time,
        MAX(datetime) AS end_time
    FROM Drive
    GROUP BY trip_id
    )
                   
    SELECT
        d.trip_id,
        d.fuel_level AS start_fuel
    FROM Drive d
    JOIN trip_times t 
        ON d.trip_id = t.trip_id AND d.datetime = t.start_time
""")
result.show()

+-------+----------+
|trip_id|start_fuel|
+-------+----------+
|    505|      91.0|
|    620|     105.0|
|    708|    184.41|
|    781|     111.0|
|   1103|      85.0|
|   1423|    148.11|
|   1537|      96.0|
|     36|    193.32|
|     95|      81.0|
|    213|    134.35|
|    323|    179.31|
|    574|     100.0|
|    614|      99.0|
|    961|    144.54|
|   1318|      85.0|
|   1512|     112.0|
|     85|    133.92|
|    125|      65.0|
|    178|    154.98|
|    557|     101.0|
+-------+----------+
only showing top 20 rows


In [13]:
# Exploratory step: fuel level of the reading taken at each trip's latest timestamp.
# Same shape as the start-fuel query, but matching on end_time instead of start_time.
# NOTE(review): if several readings of a trip share the latest datetime, this returns
# one row per such reading - confirm (trip_id, datetime) is unique.
result = spark.sql(""" 
    -- We can do the same as above but for end_fuel
    WITH trip_times AS (
    SELECT
        trip_id,
        MIN(datetime) AS start_time,
        MAX(datetime) AS end_time
    FROM Drive
    GROUP BY trip_id
    )
                   
    SELECT
        d.trip_id,
        d.fuel_level AS end_fuel
    FROM Drive d
    JOIN trip_times t 
        ON d.trip_id = t.trip_id AND d.datetime = t.end_time
""")
result.show()

+-------+--------+
|trip_id|end_fuel|
+-------+--------+
|    113|  169.62|
|    116|    96.0|
|    415|    55.0|
|    560|    90.0|
|   1321|  141.75|
|   1608|  146.12|
|   1645|   105.0|
|    528|  204.27|
|    968|    83.0|
|   1579|  116.19|
|   1633|  174.52|
|    235|  180.36|
|    237|  194.89|
|   1055|    67.0|
|   1303|    72.0|
|   1347|    82.0|
|   1449|  187.29|
|   1609|  136.36|
|    357|  154.63|
|    441|  178.92|
+-------+--------+
only showing top 20 rows


In [14]:
# Exploratory step: per-trip averages of engine load (scaled by 100/255) and velocity,
# together with the vehicle's make, model and fuel_tank_capacity.
# The inner join plus the IS NOT NULL filter drop trips whose vehicle is missing from
# Vehicle or has no make/model.
result = spark.sql(""" 
    -- This is the aggregation needs to provide average_eng_load_perc & average_velocity
    -- We can use the formula in the instructions along with the AVG() method to return average_eng_load_perc
    -- We can simply use AVG() to return the average_velocity
    -- This also assumes that a trip only uses one make/model of vehicle and doesn't change
    SELECT
        d.trip_id,
        v.make,
        v.model,
        v.fuel_tank_capacity,
        AVG(100.0 * (d.eng_load / 255)) AS average_eng_load_perc,
        AVG(d.velocity) AS average_velocity
    FROM Drive d
    JOIN Vehicle v 
        ON d.vehicle_spec_id = v.vehicle_spec_id
    WHERE v.make IS NOT NULL AND v.model IS NOT NULL
    GROUP BY d.trip_id, v.make, v.model, v.fuel_tank_capacity
""")
result.show()

+-------+----------+-----------+------------------+---------------------+------------------+
|trip_id|      make|      model|fuel_tank_capacity|average_eng_load_perc|  average_velocity|
+-------+----------+-----------+------------------+---------------------+------------------+
|     29|Volkswagen|     Passat|                66|    82.77660302584538| 58.03923433874713|
|    474|Volkswagen|     Passat|                66|    76.44867164348615| 67.47900473933647|
|   1697|      Audi|         Q3|                64|    77.64465598254368| 64.98969518953648|
|    541|   Hyundai|      Creta|                55|     77.2560471408968| 53.84619359058186|
|   1010|       BMW|3 Series GT|                57|    85.49964701144201| 72.07063901194786|
|   1277|   Hyundai|      Creta|                55|    81.96017366637443| 65.74867660592673|
|   1360|      Jeep|    Compass|                60|    74.94978981910882| 69.54193637621024|
|    270|      Jeep|    Compass|                60|     82.37563036533

In [15]:
# Final trip table, first version: combines the three earlier queries as CTEs
# (trip_times -> start_fuel / end_fuel, plus aggregated_metrics) and joins them on trip_id.
# fuel_used = (start_fuel - end_fuel) / 255 * fuel_tank_capacity, rounded to 2 decimals.
# All joins are inner joins, so only trips present in every CTE appear in the output.
# NOTE(review): the output below contains negative fuel_used values; the later cells
# investigate this.
result = spark.sql(""" 
    -- We can use everything so far to return the final table
    WITH trip_times AS (
    SELECT
        trip_id,
        MIN(datetime) AS start_time,
        MAX(datetime) AS end_time
    FROM Drive
    GROUP BY trip_id
    ),

    start_fuel AS (
    SELECT
        d.trip_id,
        d.fuel_level AS start_fuel
    FROM Drive d
    JOIN trip_times t 
        ON d.trip_id = t.trip_id AND d.datetime = t.start_time
    ),

    end_fuel AS (
    SELECT
        d.trip_id,
        d.fuel_level AS end_fuel
    FROM Drive d
    JOIN trip_times t 
        ON d.trip_id = t.trip_id AND d.datetime = t.end_time
    ),

    aggregated_metrics AS (
    SELECT
        d.trip_id,
        v.make,
        v.model,
        v.fuel_tank_capacity,
        AVG(100.0 * (d.eng_load / 255)) AS average_eng_load_perc,
        AVG(d.velocity) AS average_velocity
    FROM Drive d
    JOIN Vehicle v 
        ON d.vehicle_spec_id = v.vehicle_spec_id
    WHERE v.make IS NOT NULL AND v.model IS NOT NULL
    GROUP BY d.trip_id, v.make, v.model, v.fuel_tank_capacity
    )

                   
    -- For the fuel_used we get the fuel level converted to litres at the start and end of the trip and compare the difference;
    -- This would assume the tank isn't refilled mid journey and a "trip" is defined as a continous journey without any stoppages
    -- We also ROUND to 2 decimal places and CAST them to a DECIMAL type
    SELECT
        a.trip_id,
        CAST(ROUND(a.average_eng_load_perc, 2) AS DECIMAL(10, 2)) AS average_eng_load_perc,
        CAST(ROUND(a.average_velocity, 2) AS DECIMAL(10, 2)) AS average_velocity,
        CAST(ROUND(((sf.start_fuel - ef.end_fuel) / 255.0) * a.fuel_tank_capacity, 2) AS DECIMAL(10, 2)) AS fuel_used
    FROM aggregated_metrics a
    JOIN start_fuel sf 
        ON a.trip_id = sf.trip_id
    JOIN end_fuel ef 
        ON a.trip_id = ef.trip_id
""")
result.show()

+-------+---------------------+----------------+---------+
|trip_id|average_eng_load_perc|average_velocity|fuel_used|
+-------+---------------------+----------------+---------+
|     29|                82.78|           58.04|     1.55|
|    474|                76.45|           67.48|    -0.89|
|   1697|                77.64|           64.99|    -0.50|
|    541|                77.26|           53.85|     0.43|
|   1010|                85.50|           72.07|     0.22|
|   1277|                81.96|           65.75|     0.65|
|   1360|                74.95|           69.54|    -2.82|
|    270|                82.38|           69.11|     1.65|
|    730|                82.23|           64.34|    -0.35|
|    938|                76.11|           77.50|     0.83|
|   1371|                75.69|           71.95|     1.02|
|   1551|                80.31|           51.03|     0.94|
|   1642|                75.30|           65.98|    -3.06|
|    243|                78.05|           70.89|     3.1

In [16]:
# Final trip table, second version: same output columns as the join-based query, but the
# start/end fuel levels come from FIRST_VALUE / LAST_VALUE window functions over each
# trip (ordered by datetime) instead of self-joins on the MIN/MAX timestamps.
# The window values are identical on every row of a trip, so MAX(...) in the outer query
# simply collapses them to one value per group.
# LAST_VALUE is given an explicit full-partition frame; FIRST_VALUE relies on the
# default frame.
result = spark.sql(""" 
    -- The above code is quite inefficent with the joins so we can use FIRST_VALUE & LAST_VALUE instead of joining multiple tables
    -- We make the same assumptions about the data as above:
    WITH fuel_extremes AS (
    SELECT
      d.trip_id,
      d.vehicle_spec_id,
      v.make,
      v.model,
      v.fuel_tank_capacity,
      d.velocity,
      100.0 * (d.eng_load / 255) AS eng_load_perc,

      -- Fuel level at start and end of trip
      -- We use the trip_id as the partition; so it gets the first value of trip_id based on datetime
      FIRST_VALUE(d.fuel_level) OVER (PARTITION BY d.trip_id ORDER BY d.datetime ASC) AS start_fuel_level,
      LAST_VALUE(d.fuel_level) OVER (
        PARTITION BY d.trip_id ORDER BY d.datetime ASC
        -- Apply the function over all rows in the partition, regardless of the current row
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
      ) AS end_fuel_level

    FROM Drive d
    JOIN Vehicle v 
        ON d.vehicle_spec_id = v.vehicle_spec_id
    WHERE v.make IS NOT NULL AND v.model IS NOT NULL
  )

  SELECT
    trip_id,
    CAST(ROUND(AVG(eng_load_perc), 2) AS DECIMAL(10, 2)) AS average_eng_load_perc,
    CAST(ROUND(AVG(velocity), 2) AS DECIMAL(10, 2)) AS average_velocity,

    -- Fuel used based on start and end fuel levels
    CAST(ROUND(
      ((MAX(start_fuel_level) - MAX(end_fuel_level)) / 255.0) * MAX(fuel_tank_capacity),
      2
    ) AS DECIMAL(10, 2)) AS fuel_used

  FROM fuel_extremes
  GROUP BY trip_id, make, model
""")
result.show()

+-------+---------------------+----------------+---------+
|trip_id|average_eng_load_perc|average_velocity|fuel_used|
+-------+---------------------+----------------+---------+
|     29|                82.78|           58.04|     1.55|
|    474|                76.45|           67.48|    -0.89|
|   1697|                77.64|           64.99|    -0.50|
|    541|                77.26|           53.85|     0.43|
|   1010|                85.50|           72.07|     0.22|
|   1277|                81.96|           65.75|     0.65|
|   1360|                74.95|           69.54|    -2.82|
|    270|                82.38|           69.11|     1.65|
|    730|                82.23|           64.34|    -0.35|
|    938|                76.11|           77.50|     0.83|
|   1371|                75.69|           71.95|     1.02|
|   1551|                80.31|           51.03|     0.94|
|   1642|                75.30|           65.98|    -3.06|
|    243|                78.05|           70.89|     3.1

In [17]:
# Diagnostic query: list the fuel_level readings of a single trip in time order, to see
# why fuel_used looked wrong in the previous results.
# Only Drive columns are selected and filtered, so no join to Vehicle is needed.
# trip_id is the long surrogate key, so it is compared with a numeric literal rather
# than a string.
result = spark.sql("""
    -- The output doesn't look correct for fuel_used in the last table
    -- We can see here that the fuel_level doesn't go down as time moves forward; it moves inconsistently up and down every second which isn't how an actual journey would affect a tank of fuel
    SELECT
        d.datetime,
        d.fuel_level
    FROM Drive d
    WHERE d.trip_id = 1
    ORDER BY d.datetime ASC
""")
result.show(truncate=False)

+-------------------+----------+
|datetime           |fuel_level|
+-------------------+----------+
|2017-01-06 23:00:00|197.03    |
|2017-01-06 23:00:01|190.67    |
|2017-01-06 23:00:02|193.94    |
|2017-01-06 23:00:03|187.4     |
|2017-01-06 23:00:04|189.89    |
|2017-01-06 23:00:05|194.05    |
|2017-01-06 23:00:06|188.78    |
|2017-01-06 23:00:07|196.58    |
|2017-01-06 23:00:08|193.97    |
|2017-01-06 23:00:09|188.22    |
|2017-01-06 23:00:10|192.73    |
|2017-01-06 23:00:11|200.3     |
|2017-01-06 23:00:12|197.34    |
|2017-01-06 23:00:13|194.52    |
|2017-01-06 23:00:14|201.92    |
|2017-01-06 23:00:15|207.06    |
|2017-01-06 23:00:16|193.81    |
|2017-01-06 23:00:17|190.04    |
|2017-01-06 23:00:18|196.45    |
|2017-01-06 23:00:19|189.48    |
+-------------------+----------+
only showing top 20 rows


In [18]:
# Final trip table, third version: fuel_used is derived from the spread between the
# highest and lowest fuel_level seen during the trip (MAX - MIN), scaled by 1/255 and the
# tank capacity, instead of from the first and last readings.
# NOTE(review): MAX - MIN is always >= 0 and measures the range of the readings, which
# equals fuel consumed only if the level decreases monotonically - confirm this
# approximation is acceptable for noisy sensor data.
result = spark.sql(""" 
    -- Instead we could calculate the value based on the MIN & MAX fuel_level assuming this data simply has the fuel_levels in the wrong order
    -- This would once again assume the fuel is not refilled mid-trip
    -- The result seems more realistic assuming you use 5-10 litres every 100 KM
    SELECT
        d.trip_id,

        -- Average engine load as percentage
        CAST(ROUND(AVG(100.0 * (d.eng_load / 255)), 2) AS DECIMAL(10, 2)) AS average_eng_load_perc,

        -- Average velocity
        CAST(ROUND(AVG(d.velocity), 2) AS DECIMAL(10, 2)) AS average_velocity,

        -- Fuel used based on max and min fuel level
        CAST(ROUND(
            (((MAX(d.fuel_level) - MIN(d.fuel_level)) / 255.0) * v.fuel_tank_capacity),
            2
        ) AS DECIMAL(10, 2)) AS fuel_used

    FROM Drive d
    JOIN Vehicle v ON d.vehicle_spec_id = v.vehicle_spec_id
    WHERE v.make IS NOT NULL AND v.model IS NOT NULL
    GROUP BY d.trip_id, v.make, v.model, v.fuel_tank_capacity
""")
result.show()

+-------+---------------------+----------------+---------+
|trip_id|average_eng_load_perc|average_velocity|fuel_used|
+-------+---------------------+----------------+---------+
|     29|                82.78|           58.04|    12.94|
|    474|                76.45|           67.48|     8.69|
|   1697|                77.64|           64.99|    10.79|
|    541|                77.26|           53.85|     8.63|
|   1010|                85.50|           72.07|     9.61|
|   1277|                81.96|           65.75|     9.92|
|   1360|                74.95|           69.54|     6.82|
|    270|                82.38|           69.11|    11.53|
|    730|                82.23|           64.34|     7.89|
|    938|                76.11|           77.50|     8.83|
|   1371|                75.69|           71.95|     6.45|
|   1551|                80.31|           51.03|     9.41|
|   1642|                75.30|           65.98|    12.47|
|    243|                78.05|           70.89|    10.8