In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = (SparkSession
         .builder
         .appName("PythonTest")
         .getOrCreate())

In [None]:
from pyspark.sql.types import *

schema = StructType(
    [
        StructField("Time", DayTimeIntervalType(), False),
        StructField("Driver", StringType(), False),
        StructField("DriverNumber", StringType(), False),
        StructField("LapTime", LongType(), False),
        StructField("LapNumber", IntegerType() ,False),
        StructField("Stint", IntegerType() ,False),
        StructField("PitOutTime", DayTimeIntervalType() ,False),
        StructField("PitinTime", DayTimeIntervalType() ,False),
        StructField("Sector1Time", DayTimeIntervalType() ,False),
        StructField("Sector2Time", DayTimeIntervalType() ,False),
        StructField("Sector3Time", DayTimeIntervalType() ,False),
        StructField("Sector1SessionTime", DayTimeIntervalType() ,False),
        StructField("Sector2SessionTime", DayTimeIntervalType() ,False),
        StructField("Sector3SessionTime", DayTimeIntervalType() ,False),
        StructField("SpeedI1", DoubleType(), True),
        StructField("SpeedI2", DoubleType(), True),
        StructField("SpeedIFL", DoubleType(), True),
        StructField("SpeedST", DoubleType(), True),
        StructField("IsPersonalBest", BooleanType(), False),
        StructField("Compound", StringType(), False),
        StructField("TyreLife", IntegerType(), False),
        StructField("FreshTyre", BooleanType(), False),
        StructField("Team", StringType(), False),
        StructField("LapStartTime", DayTimeIntervalType() ,False),
        StructField("LapStartDate", DayTimeIntervalType() ,False),
        StructField("TrackStatus", StringType(), False),
        StructField("Position", IntegerType(), False),
        StructField("Deleted", BooleanType(), True),
        StructField("DeletedReason", StringType(), True),
        StructField("FastF1Generated", BooleanType(), True),
        StructField("IsAccurate", BooleanType(), True),
    ]
)

In [56]:
import pyspark


f1_file_path = "/../../../data/2023_Bahrain Grand Prix_race_data.parquet"

# f1_df = (spark.read.format("parquet")
#          .option("header", "true")
#          .option("inferSchema", "true")
#          .load(f1_file_path)
#          )

spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

df = spark.read.option("spark.sql.parquet.int64AsTimestamp", "true") \
                .schema(schema=schema) \
               .parquet(f1_file_path)
               
df.show()

+--------------------+------+------------+------------+---------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+-------+--------+-------+--------------+--------+--------+---------+---------------+--------------------+--------------------+-----------+--------+-------+-------------+---------------+----------+
|                Time|Driver|DriverNumber|     LapTime|LapNumber|Stint|          PitOutTime|           PitinTime|         Sector1Time|         Sector2Time|         Sector3Time|  Sector1SessionTime|  Sector2SessionTime|  Sector3SessionTime|SpeedI1|SpeedI2|SpeedIFL|SpeedST|IsPersonalBest|Compound|TyreLife|FreshTyre|           Team|        LapStartTime|        LapStartDate|TrackStatus|Position|Deleted|DeletedReason|FastF1Generated|IsAccurate|
+--------------------+------+------------+------------+---------+-----+--------------------+--------------------

In [58]:
import pyspark.sql.functions as F

df = df.withColumn(
    "LapTime", col("LapTime") / lit(1000000000)
)

In [59]:
#from pyspark.pandas import to_timedelta
from pathlib import Path

def transform():
    
    save_path : Path = Path(save_path)
    fuel_start : int = 100 
    
    def get_data():
        if df:
            return df
        spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

        df = spark.read.option("spark.sql.parquet.int96AsTimestamp", "true") \
                    .parquet(f1_file_path)
                    
        return df
    
    race_data = get_data()
    
    if 'Time' in race_data.columns:
        df = df.withColumn('Time', df.Time / 10e9).head(10)
        
    # lap_length, track_length = TRACK_INFO[gp_name]
    main_cols = ['Driver', 'Team', 'Compound', 'TyreLife', 'LapTime', 'SpeedI1', 'SpeedI2', 'SpeedFL', 'LapNumber', 'DriverNumber']
    
    race_lap_with_fuel = calculateFCL()
        

In [None]:
# if 'Time' in df.columns:
#     df = df.withColumn('Time', df.Time / 10e9)
#     df = df.withColumn('LapTime', df.LapTime / 10e8)
        

In [60]:
df.select('LapTime', 'LapNumber').show()

+-------+---------+
|LapTime|LapNumber|
+-------+---------+
| 99.019|        1|
| 97.974|        2|
| 98.006|        3|
| 97.976|        4|
| 98.035|        5|
| 97.986|        6|
| 98.021|        7|
| 98.154|        8|
| 98.278|        9|
| 98.369|       10|
| 98.483|       11|
| 98.591|       12|
| 98.482|       13|
|101.295|       14|
|118.378|       15|
| 97.801|       16|
| 97.648|       17|
| 97.614|       18|
| 97.712|       19|
| 97.788|       20|
+-------+---------+
only showing top 20 rows



In [61]:
lap_length = 5.412
track_length = 308.238
fuel_start = 100

df1 = df

In [62]:
df1.select('LapNumber').show()

+---------+
|LapNumber|
+---------+
|        1|
|        2|
|        3|
|        4|
|        5|
|        6|
|        7|
|        8|
|        9|
|       10|
|       11|
|       12|
|       13|
|       14|
|       15|
|       16|
|       17|
|       18|
|       19|
|       20|
+---------+
only showing top 20 rows



In [63]:
#import pandas as pd
#import numpy as np
    
lap_length = 5.412
track_length = 308.238
fuel_start = 100
# isnt race length better than track lenght? feels confusing
def _calculate_needed_fuel(lap_length, track_length, fuel_start=100, fuel_end=1):
    print(lap_length)
    
    laps_per_track = int(track_length / lap_length)
    fuel_per_lap = (fuel_start - fuel_end) / laps_per_track
    return fuel_per_lap

def _calculate_start_fuel(lap_number, fuel_per_lap, fuel_start=100):
    if lap_number == 1:
        return fuel_start
    else:
        return max(0, fuel_start - (lap_number - 1) * fuel_per_lap)

def _calculate_avg_fuel(lap_number, fuel_per_lap, fuel_start=100):

    if lap_number == 1:
        return fuel_start
    else:
        current_fuel = _calculate_start_fuel(lap_number, fuel_per_lap, fuel_start)
        prev_fuel = _calculate_start_fuel(lap_number - 1, fuel_per_lap, fuel_start)
        return (current_fuel + prev_fuel) / 2

fuel_per_lap = _calculate_needed_fuel(lap_length=lap_length, track_length=track_length, fuel_start=fuel_start)
        

df2 = df1.withColumn(
    "StartFuel",
    when(col("LapNumber") == 1, lit(fuel_start))
    .otherwise(fuel_start - (col("LapNumber") - 1) * fuel_per_lap)
)

df2 = df2.withColumn(
    "StartFuel",
    when(col("StartFuel") < 0, lit(0))
    .otherwise(col("StartFuel"))
)

df2





5.412


DataFrame[Time: interval day to second, Driver: string, DriverNumber: string, LapTime: double, LapNumber: int, Stint: int, PitOutTime: interval day to second, PitinTime: interval day to second, Sector1Time: interval day to second, Sector2Time: interval day to second, Sector3Time: interval day to second, Sector1SessionTime: interval day to second, Sector2SessionTime: interval day to second, Sector3SessionTime: interval day to second, SpeedI1: double, SpeedI2: double, SpeedIFL: double, SpeedST: double, IsPersonalBest: boolean, Compound: string, TyreLife: int, FreshTyre: boolean, Team: string, LapStartTime: interval day to second, LapStartDate: interval day to second, TrackStatus: string, Position: int, Deleted: boolean, DeletedReason: string, FastF1Generated: boolean, IsAccurate: boolean, StartFuel: double]

In [64]:
df2.select("StartFuel").show()

+-----------------+
|        StartFuel|
+-----------------+
|            100.0|
|98.23214285714286|
|96.46428571428571|
|94.69642857142857|
|92.92857142857143|
|91.16071428571429|
|89.39285714285714|
|           87.625|
|85.85714285714286|
|84.08928571428572|
|82.32142857142857|
|80.55357142857143|
|78.78571428571428|
|77.01785714285714|
|            75.25|
|73.48214285714286|
|71.71428571428572|
|69.94642857142857|
|68.17857142857143|
|66.41071428571428|
+-----------------+
only showing top 20 rows



In [65]:
fuel_penalty=0.03

df2 = df2.withColumn(
    "avg",
    when(col("LapNumber") == 1, lit(fuel_start))
    .when(col("LapNumber") == 2, col("StartFuel") + lit(fuel_start))
    .otherwise(col("StartFuel") + lit(fuel_per_lap))
)

df2 = df2.withColumn("FCL",
               col("LapTime") - (fuel_penalty * col('avg')))


df2 = df2.drop('avg')


In [66]:
df2.select('LapTime').show()

+-------+
|LapTime|
+-------+
| 99.019|
| 97.974|
| 98.006|
| 97.976|
| 98.035|
| 97.986|
| 98.021|
| 98.154|
| 98.278|
| 98.369|
| 98.483|
| 98.591|
| 98.482|
|101.295|
|118.378|
| 97.801|
| 97.648|
| 97.614|
| 97.712|
| 97.788|
+-------+
only showing top 20 rows



In [67]:
df2.select('FCL').show()

+------------------+
|               FCL|
+------------------+
|            96.019|
| 92.02703571428572|
| 95.05903571428571|
| 95.08207142857142|
| 95.19410714285713|
| 95.19814285714286|
| 95.28617857142858|
| 95.47221428571429|
| 95.64925000000001|
| 95.79328571428572|
| 95.96032142857143|
| 96.12135714285714|
| 96.06539285714285|
| 98.93142857142857|
|116.06746428571428|
| 95.54350000000001|
| 95.44353571428572|
| 95.46257142857144|
| 95.61360714285715|
| 95.74264285714285|
+------------------+
only showing top 20 rows



In [68]:
df.columns

['Time',
 'Driver',
 'DriverNumber',
 'LapTime',
 'LapNumber',
 'Stint',
 'PitOutTime',
 'PitinTime',
 'Sector1Time',
 'Sector2Time',
 'Sector3Time',
 'Sector1SessionTime',
 'Sector2SessionTime',
 'Sector3SessionTime',
 'SpeedI1',
 'SpeedI2',
 'SpeedIFL',
 'SpeedST',
 'IsPersonalBest',
 'Compound',
 'TyreLife',
 'FreshTyre',
 'Team',
 'LapStartTime',
 'LapStartDate',
 'TrackStatus',
 'Position',
 'Deleted',
 'DeletedReason',
 'FastF1Generated',
 'IsAccurate']

In [69]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = (SparkSession
         .builder
         .appName("PythonTest")
         .getOrCreate())

In [91]:
from pyspark.sql.types import *

schema = StructType(
    [
        StructField("Date", TimestampType() ,False),
        StructField("SesssionTime", DayTimeIntervalType(), False),
        StructField("DriverAhead", StringType(), True),
        StructField("DistanceToDriverAhead", DoubleType(), True),
        StructField("Time", DayTimeIntervalType(), False),
        StructField("RPM", DoubleType(), False),
        StructField("Speed", DoubleType(), False),
        StructField("nGear", LongType(), False),
        StructField("Throttle", DoubleType(), False),
        StructField("Brake", BooleanType(), False),
        StructField("DRS", LongType(), False),
        StructField("Source", StringType(), True),
        StructField("Distance", DoubleType(), False),
        StructField("RelativeDistance", DoubleType(), False),
        StructField("Status", StringType(), False),
        StructField("X", DoubleType(), False),
        StructField("Y", DoubleType(), False),
        StructField("Z", DoubleType(), False),
        StructField("DriverNumber", StringType(), False),
        StructField("LapNumber", DoubleType(), False),
    ]
)

In [92]:
import os

os.listdir("../../../data")

['2023_Bahrain Grand Prix_quali_telemetry.parquet',
 '2023_Bahrain Grand Prix_race_data.parquet',
 '2023_Bahrain Grand Prix_race_telemetry.parquet',
 '2023_Bahrain Grand Prix_quali_data.parquet']

In [117]:
f1_file_path = "../../../data/2023_Bahrain Grand Prix_race_telemetry.parquet"

#spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "false")

df = (spark
        .read
        .schema(schema)
        #.option("spark.sql.parquet.int96AsTimestamp", "true") 
        .option("header", "true")
        .parquet(f1_file_path))

In [118]:
df.show(10, truncate=False)

+----------------+------------+-----------+---------------------+-----------------------------------+-----------------+------------------+-----+--------+-----+---+-------------+----------------------+---------------------+-------+-------------------+------------------+-------------------+------------+---------+------+------+
|Date            |SesssionTime|DriverAhead|DistanceToDriverAhead|Time                               |RPM              |Speed             |nGear|Throttle|Brake|DRS|Source       |Distance              |RelativeDistance     |Status |X                  |Y                 |Z                  |DriverNumber|LapNumber|LonAcc|LatAcc|
+----------------+------------+-----------+---------------------+-----------------------------------+-----------------+------------------+-----+--------+-----+---+-------------+----------------------+---------------------+-------+-------------------+------------------+-------------------+------------+---------+------+------+
|1678028618501000|n

In [95]:
df.select("Time").where(col("Time").isNotNull()).show()

+--------------------+
|                Time|
+--------------------+
|INTERVAL '0 00:00...|
|INTERVAL '0 00:01...|
|INTERVAL '0 00:03...|
|INTERVAL '0 00:05...|
|INTERVAL '0 00:07...|
|INTERVAL '0 00:09...|
|INTERVAL '0 00:09...|
|INTERVAL '0 00:12...|
|INTERVAL '0 00:15...|
|INTERVAL '0 00:18...|
|INTERVAL '0 00:19...|
|INTERVAL '0 00:22...|
|INTERVAL '0 00:22...|
|INTERVAL '0 00:25...|
|INTERVAL '0 00:26...|
|INTERVAL '0 00:30...|
|INTERVAL '0 00:32...|
|INTERVAL '0 00:33...|
|INTERVAL '0 00:36...|
|INTERVAL '0 00:39...|
+--------------------+
only showing top 20 rows



In [96]:
df.dtypes

[('Date', 'timestamp'),
 ('SesssionTime', 'interval day to second'),
 ('DriverAhead', 'string'),
 ('DistanceToDriverAhead', 'double'),
 ('Time', 'interval day to second'),
 ('RPM', 'double'),
 ('Speed', 'double'),
 ('nGear', 'bigint'),
 ('Throttle', 'double'),
 ('Brake', 'boolean'),
 ('DRS', 'bigint'),
 ('Source', 'string'),
 ('Distance', 'double'),
 ('RelativeDistance', 'double'),
 ('Status', 'string'),
 ('X', 'double'),
 ('Y', 'double'),
 ('Z', 'double'),
 ('DriverNumber', 'string'),
 ('LapNumber', 'double')]

In [97]:
df.columns

['Date',
 'SesssionTime',
 'DriverAhead',
 'DistanceToDriverAhead',
 'Time',
 'RPM',
 'Speed',
 'nGear',
 'Throttle',
 'Brake',
 'DRS',
 'Source',
 'Distance',
 'RelativeDistance',
 'Status',
 'X',
 'Y',
 'Z',
 'DriverNumber',
 'LapNumber']

In [98]:
df.select('DRS').show()

+---+
|DRS|
+---+
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
|  1|
+---+
only showing top 20 rows



In [99]:
from pyspark.sql.functions import col, when, lit

df.withColumn("DRS", 
                when(col("DRS").isin(10, 12, 14), 1)
                .otherwise(0)
            ).select("DRS").distinct().show()

+---+
|DRS|
+---+
|  1|
|  0|
+---+



In [100]:
import numpy as np
import math

class AccelerationComputations:
    
    def __init__(self):
        pass
    
    # credits to https://gist.github.com/TracingInsights/4d3bdeb135a01d7b11e35e5f83f60d6a
    #
    # Define a set of helper functions to perform the computations
    #
    # the t

    @classmethod
    def _smooth_derivative(cls, t_in, v_in, method = "centered"):
        #
        # Function to compute a smooth estimation of a derivative.
        # [REF: http://holoborodko.com/pavel/numerical-methods/numerical-derivative/smooth-low-noise-differentiators/]
        #

        # Configuration
        #
        # Derivative method: two options: 'smooth' or 'centered'. Smooth is more conservative
        # but helps to supress the very noisy signals. 'centered' is more agressive but more noisy

        t = t_in.copy()
        v = v_in.copy()
        epsilon = 1e-9
        
        # (0) Prepare inputs
        # (0.1) Time needs to be transformed to seconds
        if t.dtype == 'timedelta64[us]':
            t = t_in.apply(lambda x: x.total_seconds()).to_numpy()
 
        t = np.array(t)
        v = np.array(v)

        # (0.1) Assert they have the same size
        assert t.size == v.size

        # (0.2) Initialize output
        dvdt = np.zeros(t.size)

        # (1) Manually compute points out of the stencil

        # (1.1) First point
        dvdt[0] = (v[1] - v[0]) / (t[1] - t[0] + epsilon)

        # (1.2) Second point
        dvdt[1] = (v[2] - v[0]) / (t[2] - t[0] + epsilon)

        # (1.3) Third point
        dvdt[2] = (v[3] - v[1]) / (t[3] - t[1] + epsilon)

        # (1.4) Last points
        n = t.size
        dvdt[n - 1] = (v[n - 1] - v[n - 2]) / (t[n - 1] - t[n - 2] + epsilon)
        dvdt[n - 2] = (v[n - 1] - v[n - 3]) / (t[n - 1] - t[n - 3] + epsilon)
        dvdt[n - 3] = (v[n - 2] - v[n - 4]) / (t[n - 2] - t[n - 4] + epsilon)

        # (2) Compute the rest of the points
        if method == "smooth":
            c = [5.0 / 32.0, 4.0 / 32.0, 1.0 / 32.0]
            for i in range(3, t.size - 3):
                for j in range(1, 4):
                    if (t[i + j] - t[i - j]) == 0:
                        dvdt[i] += 0
                    else:
                        dvdt[i] += (
                            2 * j * c[j - 1] * (v[i + j] - v[i - j]) / (t[i + j] - t[i - j])
                        )
                        
        elif method == "centered":
            for i in range(1, t.size - 1):
                for j in range(1, 4):
                    delta_t = t[i + 1] - t[i - 1]
                    if abs(delta_t) > epsilon:
                        dvdt[i] = (v[i + 1] - v[i - 1]) / delta_t
                    else: # Handle case where t[i+1] == t[i-1]
                        # Option 1: Use forward/backward difference
                        delta_t_fwd = t[i+1] - t[i]
                        delta_t_bwd = t[i] - t[i-1]
                        if abs(delta_t_fwd) > epsilon:
                            dvdt[i] = (v[i+1] - v[i]) / delta_t_fwd
                        elif abs(delta_t_bwd) > epsilon:
                            dvdt[i] = (v[i] - v[i-1]) / delta_t_bwd
                        else:
                            dvdt[i] = 0 # Or NaN if points are truly identical in time
        return dvdt

    def _transform_to_pipi(self, input_angle):
        """
        Transforms an angle in radians to the range [-pi, pi].

        Args:
            input_angle: Angle in radians.

        Returns:
            Tuple: (output_angle, revolutions)
                output_angle: Angle wrapped to [-pi, pi].
                revolutions: Number of full revolutions difference.
        """
        pi = math.pi
        two_pi = 2 * pi

        # Simple modulo arithmetic approach
        output_angle = (input_angle + pi) % two_pi - pi

        # Ensure the result is exactly within [-pi, pi] due to potential floating point issues near pi
        if np.isclose(output_angle, pi):
            output_angle = -pi
        elif output_angle < -pi: # Should not happen with modulo, but as safeguard
            output_angle += two_pi
        elif output_angle > pi: # Should not happen with modulo
            output_angle -= two_pi


        # Calculate revolutions based on the wrapped angle
        # Use np.round for robustness against floating point inaccuracies
        revolutions = np.round((input_angle - output_angle) / two_pi)

        return output_angle, int(revolutions)

    def _remove_acceleration_outliers(self, acc_in):
        """
        Removes outliers from an acceleration array by replacing them.
        Creates a copy of the input array to avoid modifying the original.

        Args:
            acc_in: NumPy array of acceleration values.

        Returns:
            NumPy array with outliers handled.
        """
        acc = acc_in.copy() # Create a copy to avoid modifying the original array
        acc_threshold_g = 7.5 # Threshold in g's

        n = acc.size
        if n == 0:
            return acc # Return empty array if input is empty

        # Handle first point
        if abs(acc[0]) > acc_threshold_g:
            # Consider clipping instead of setting to 0:
            # acc[0] = np.sign(acc[0]) * acc_threshold_g
            acc[0] = 0.0 # Original logic

        # Handle middle points
        for i in range(1, n - 1):
            if abs(acc[i]) > acc_threshold_g:
                # Consider clipping: acc[i] = np.sign(acc[i]) * acc_threshold_g
                # Consider averaging neighbors: acc[i] = (acc[i-1] + acc[i+1]) / 2 (if acc[i+1] is not outlier)
                acc[i] = acc[i - 1] # Original logic: replace with previous value

        # Handle last point
        if n > 1: # Need at least two points to access acc[-2]
            if abs(acc[-1]) > acc_threshold_g:
                # Consider clipping: acc[-1] = np.sign(acc[-1]) * acc_threshold_g
                acc[-1] = acc[-2] # Original logic: replace with second-to-last value
        elif n == 1 and abs(acc[0]) > acc_threshold_g: # If only one point and it's an outlier
            acc[0] = 0.0 # Re-apply first point logic if needed (already done above)


        return acc


    def compute_accelerations(self, telemetry):
        # --- Input Preparation ---
        
        time_data = telemetry['Time']
        speed_kmh = np.array(telemetry['Speed'])
        distance = np.array(telemetry['Distance'])
        x_coords = np.array(telemetry['X'])
        y_coords = np.array(telemetry['Y'])

        # Convert speed to m/s
        v_mps = speed_kmh / 3.6
        g = 9.81 # Acceleration due to gravity

        # --- Longitudinal Acceleration ---
        # Calculate dv/dt (acceleration in m/s^2)
        lon_acc_mps2 = self._smooth_derivative(time_data, v_mps)
        # Convert to g's
        lon_acc_g = lon_acc_mps2 / g

        # --- Lateral Acceleration ---
        # Calculate path tangent components dx/ds and dy/ds (unitless)
        # Using distance 's' as the independent variable
        dx_ds = self._smooth_derivative(distance, x_coords)
        dy_ds = self._smooth_derivative(distance, y_coords)

        n_points = dx_ds.size
        if n_points == 0:
             return np.array([]), np.array([]) # Handle empty input

        # Calculate path angle theta (radians) iteratively
        theta = np.zeros(n_points)
        if n_points > 0:
             # Initial angle
             theta[0] = math.atan2(dy_ds[0], dx_ds[0])
             # Integrate angle changes, wrapping correctly
             for i in range(1, n_points): # *** FIXED LOOP START ***
                 # Calculate the angle change from the previous point
                 current_segment_angle = math.atan2(dy_ds[i], dx_ds[i])
                 delta_theta_raw = current_segment_angle - theta[i - 1]
                 # Wrap the change to [-pi, pi] to avoid large jumps
                 delta_theta_wrapped, _ = self._transform_to_pipi(delta_theta_raw) # :c
                 # Add the wrapped change to the previous angle
                 theta[i] = theta[i - 1] + delta_theta_wrapped

        # Calculate curvature kappa = d(theta)/ds (rad/meter)
        kappa = self._smooth_derivative(distance, theta)

        # Calculate lateral acceleration: a_lat = v^2 * kappa (m/s^2)
        lat_acc_mps2 = v_mps * v_mps * kappa
        # Convert to g's
        lat_acc_g = lat_acc_mps2 / g

        # --- Remove Outliers ---
        # Note: remove_acceleration_outliers now returns a copy
        lon_acc_g_clean = self._remove_acceleration_outliers(lon_acc_g)
        lat_acc_g_clean = self._remove_acceleration_outliers(lat_acc_g)

        # --- Return rounded results ---
        import pandas as pd
        
        telemetry["LonAcc"] = np.round(lon_acc_g_clean, 5)
        telemetry["LatAcc"] = np.round(lat_acc_g_clean, 5)
        
        return telemetry

In [119]:
df_1 = (
    df.select("Speed","DriverNumber", "LapNumber")
    .groupBy("DriverNumber", "LapNumber")
    .agg({"Speed" : "mean"})
    .withColumnRenamed("avg(Speed)", "MeanLapSpeed")
)

df = df.join(
    df_1,
    on=["DriverNumber", "LapNumber"],
    how="left"
)


In [116]:
df.show()

+------------+---------+--------------------+------------+-----------+---------------------+--------------------+-----------------+------------------+-----+------------------+-----+---+-------------+--------------------+--------------------+-------+-------------------+------------------+-------------------+------------------+------------------+------------------+
|DriverNumber|LapNumber|                Date|SesssionTime|DriverAhead|DistanceToDriverAhead|                Time|              RPM|             Speed|nGear|          Throttle|Brake|DRS|       Source|            Distance|    RelativeDistance| Status|                  X|                 Y|                  Z|      MeanLapSpeed|      MeanLapSpeed|      MeanLapSpeed|
+------------+---------+--------------------+------------+-----------+---------------------+--------------------+-----------------+------------------+-----+------------------+-----+---+-------------+--------------------+--------------------+-------+-------------------

In [14]:
df.dtypes

[('DriverNumber', 'string'),
 ('LapNumber', 'double'),
 ('Date', 'timestamp'),
 ('SesssionTime', 'interval day to second'),
 ('DriverAhead', 'string'),
 ('DistanceToDriverAhead', 'double'),
 ('Time', 'interval day to second'),
 ('RPM', 'double'),
 ('Speed', 'double'),
 ('nGear', 'bigint'),
 ('Throttle', 'double'),
 ('Brake', 'boolean'),
 ('DRS', 'bigint'),
 ('Source', 'string'),
 ('Distance', 'double'),
 ('RelativeDistance', 'double'),
 ('Status', 'string'),
 ('X', 'double'),
 ('Y', 'double'),
 ('Z', 'double'),
 ('MeanLapSpeed', 'double')]

In [120]:
schema = StructType([
    StructField("Date", LongType() ,False),
    StructField("SesssionTime", DayTimeIntervalType(), False),
    StructField("DriverAhead", StringType(), True),
    StructField("DistanceToDriverAhead", DoubleType(), True),
    StructField("Time", DayTimeIntervalType(), False),
    StructField("RPM", DoubleType(), False),
    StructField("Speed", DoubleType(), False),
    StructField("nGear", LongType(), False),
    StructField("Throttle", DoubleType(), False),
    StructField("Brake", BooleanType(), False),
    StructField("DRS", LongType(), False),
    StructField("Source", StringType(), True),
    StructField("Distance", DoubleType(), False),
    StructField("RelativeDistance", DoubleType(), False),
    StructField("Status", StringType(), False),
    StructField("X", DoubleType(), False),
    StructField("Y", DoubleType(), False),
    StructField("Z", DoubleType(), False),
    StructField("DriverNumber", StringType(), False),
    StructField("LapNumber", DoubleType(), False),
    StructField("MeanLapSpeed", DoubleType(), False),
    StructField("LonAcc", DoubleType(), True),
    StructField("LatAcc", DoubleType(), True)
])

In [16]:
df_1.dtypes

[('DriverNumber', 'string'),
 ('LapNumber', 'double'),
 ('MeanLapSpeed', 'double')]

In [113]:
df_1.show()

+------------+---------+------------------+
|DriverNumber|LapNumber|      MeanLapSpeed|
+------------+---------+------------------+
|          11|     15.0|194.35612153116574|
|          11|     47.0| 200.6963424215652|
|          20|     17.0|197.42696503408993|
|          20|     53.0|197.01068671232807|
|           4|      6.0|189.57265023776773|
|          63|     13.0|184.17259347880514|
|          14|     41.0|152.18477726347604|
|          16|     32.0| 195.9106679280459|
|          18|     14.0|193.58761614653844|
|          20|     25.0|194.84962536586335|
|          22|     50.0| 198.1708943530118|
|          24|      7.0|191.55871851830202|
|          31|     19.0|194.81823266670742|
|          77|     38.0|198.05058459827723|
|          77|     53.0|197.81893622612213|
|           1|      6.0|197.69601552877904|
|          10|     46.0|202.05557293686562|
|          23|     36.0|195.75353913971037|
|          23|     51.0|197.63542171028314|
|           4|     27.0|190.7481

In [121]:
computations = AccelerationComputations()

df = (
    df.groupby('DriverNumber', 'LapNumber')
    .applyInPandas(
        lambda pdf: computations.compute_accelerations(pdf), 
        schema=schema
        )
)

df.show()
# all_lon_series = [pd.Series(arr) for arr in all_lon]
# all_lat_series = [pd.Series(arr) for arr in all_lat]

# self.data['LonAcc'] = pd.concat(all_lon_series, ignore_index=True)
# self.data['LatAcc'] = pd.concat(all_lat_series, ignore_index=True)

# self.data['AbsLatAcc'] = self.data['LatAcc'].abs()
# self.data['AbsLonAcc'] = self.data['LonAcc'].abs()

# self.data['SumLatAcc'] = self.data.groupby(['DriverNumber', 'LapNumber'])['AbsLatAcc'].transform('sum')
# self.data['SumLonAcc'] = self.data.groupby(['DriverNumber', 'LapNumber'])['AbsLonAcc'].transform('sum')

# logging.debug("df after calculate_accelerations %s", self.data.head().to_string(max_cols=None))



+----------------+--------------------+-----------+---------------------+--------------------+------------------+------------------+-----+--------+-----+---+-------------+-------------------+--------------------+-------+-------------------+------------------+-------------------+------------+---------+------------------+------+--------+
|            Date|        SesssionTime|DriverAhead|DistanceToDriverAhead|                Time|               RPM|             Speed|nGear|Throttle|Brake|DRS|       Source|           Distance|    RelativeDistance| Status|                  X|                 Y|                  Z|DriverNumber|LapNumber|      MeanLapSpeed|LonAcc|  LatAcc|
+----------------+--------------------+-----------+---------------------+--------------------+------------------+------------------+-----+--------+-----+---+-------------+-------------------+--------------------+-------+-------------------+------------------+-------------------+------------+---------+------------------+---

                                                                                

In [18]:
from pyspark.sql.functions import col, when, lit, abs



df_3 = df_2.withColumn(
    'AbsLatAcc',
    abs('LatAcc')
)

df_3 = df_3.withColumn(
    'AbsLonAcc',
    abs('LonAcc')
)



In [19]:
df_3.show()

+------------+---------+--------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+
|DriverNumber|LapNumber|                Time|             Speed|           Distance|                  X|                 Y|LonAcc|  LatAcc|AbsLatAcc|AbsLonAcc|
+------------+---------+--------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+
|           1|        6|INTERVAL '0 00:00...|281.23928777142856|0.16063882115776318|  -383.132866323613| 1225.316997585461|3.0E-4|-0.77817|  0.77817|   3.0E-4|
|           1|        6|INTERVAL '0 00:00...| 281.2821421714286|0.47447099937246007|             -383.0|            1228.0|3.0E-4| -0.0787|   0.0787|   3.0E-4|
|           1|        6|INTERVAL '0 00:01...|             282.0|  5.735833333333332|-380.67605169384024|1274.5369401852865|2.1E-4| 0.00568|  0.00568|   2.1E-4|
|           1|        6|INTERVAL '0 00:0

In [20]:
import pyspark.sql.functions as F

df_4 = df_3.groupBy('DriverNumber', 'LapNumber').agg(F.sum("AbsLatAcc"))

In [21]:
df_3 = df_3.join(df_4, on=['DriverNumber', 'LapNumber'])

In [22]:
from pyspark.sql.window import Window
# Define window partitioned by DriverNumber and LapNumber
w = Window.partitionBy('DriverNumber', 'LapNumber')

# Add a new column with sum of AbsLatAcc per group
df_3 = df_3.withColumn('SumAbsLatAcc', F.sum('AbsLatAcc').over(w))

In [23]:
df_3.show()

                                                                                

+------------+---------+--------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+-----------------+-----------------+
|DriverNumber|LapNumber|                Time|             Speed|           Distance|                  X|                 Y|LonAcc|  LatAcc|AbsLatAcc|AbsLonAcc|   sum(AbsLatAcc)|     SumAbsLatAcc|
+------------+---------+--------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+-----------------+-----------------+
|           1|        4|INTERVAL '0 00:00...|       279.9928576|0.15449784914279757|-383.22860952411617| 1216.934212094218|2.0E-4| 5.47028|  5.47028|   2.0E-4|853.4900499999997|853.4900499999997|
|           1|        4|INTERVAL '0 00:00...|             280.0|0.23277777777778041|-383.18941313906845| 1217.710973505326|3.5E-4| 0.07369|  0.07369|   3.5E-4|853.4900499999997|853.4900499999997|
|           1|      

In [None]:
df_5 = df.dropDuplicates(['DriverNumber', 'LapNumber'])

In [45]:
w = Window.partitionBy("DriverNumber", "LapNumber")

df_6 = df_3.withColumn("TimeNumberLapTime", F.row_number().over(Window.partitionBy("DriverNumber", "LapNumber").orderBy("Time")))

df_6 = df_6.withColumn("TimeNumberLapCounts", F.max("TimeNumberLapTime").over(w))

df_6 = df_6.withColumn("LapProgress", col("TimeNumberLapTime") / col("TimeNumberLapCounts"))

In [46]:
df_6.show(truncate=False)



+------------+---------+-----------------------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+-----------------+-----------------+-----------------+-------------------+---------------------+
|DriverNumber|LapNumber|Time                               |Speed             |Distance           |X                  |Y                 |LonAcc|LatAcc  |AbsLatAcc|AbsLonAcc|sum(AbsLatAcc)   |SumAbsLatAcc     |TimeNumberLapTime|TimeNumberLapCounts|LapProgress          |
+------------+---------+-----------------------------------+------------------+-------------------+-------------------+------------------+------+--------+---------+---------+-----------------+-----------------+-----------------+-------------------+---------------------+
|1           |4        |INTERVAL '0 00:00:00' DAY TO SECOND|279.9928576       |0.15449784914279757|-383.22860952411617|1216.934212094218 |2.0E-4|5.47028 |5.47028  |2.0E-4   |853.490049999

                                                                                