# Setup

In [1]:
import random
import numpy as np
import pandas as pd

SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Classes

In [2]:
TARGET_TIRE_PRESSURE = 2.5
MINIMUM_TIRE_PRESSURE = 1.5
MINIMUM_ENGINE_RPM = 800

class Car:
    """The attributes are used as features of our dataset.
    They are updated in the `step()` method of the `Environment` class."""

    def __init__(self, model, make, year, number_seats, weight_in_kg, price_in_euro, fuel_level_in_percent):
        # Attributes that are constant for a given car
        self.model: str = model
        self.make: str = make
        self.year: int = year
        self.number_seats: int = number_seats
        self.weight_in_kg: int = weight_in_kg
        self.price_in_euro: int = price_in_euro

        # Attributes that are updated in the `step()` method of the `Environment` class
        self.fuel_level_in_percent: float | None = fuel_level_in_percent  # 0 - 1
        self.tire_pressure: float = TARGET_TIRE_PRESSURE
        self.acceleration: float = 0
        self.speed: int = 0
        self.engine_temperature: float = 20
        self.engine_rpm: int = MINIMUM_ENGINE_RPM
        self.engine_oil_pressure: float = 80
        self.cruise_control: bool = False
        self.cruise_control_speed: int | None = None

In [3]:
class Environment:
    """The `Environment` class is responsible for updating the state of the car."""

    def __init__(self, ambient_light_level_in_lux: int, chance_of_drive_end_arbitrary: float = 0.01):
        """Create a new environment."""
        self.ambient_light_level_in_lux: int = ambient_light_level_in_lux
        self.current_timestamp = 0
        self.ended = False
        self.chance_of_drive_end_arbitrary: float = chance_of_drive_end_arbitrary

    def step(self, car: Car, next_acceleration: float):
        """Update the state of the car.

        For every attribute of the car, we compute a change and apply it to the current value.
        How the attributes change over time is dependent on the attribute:
        - `ended`:
            - Probability of the drive ending is 1 if ambient light level is below 300
            - And `chance_of_drive_end_arbitrary` otherwise
        - `ambient_light_level_in_lux`:
            - Monotonically decreasing by a random value between 0 and 100 (uniform)
            - If the ambient light level is below 300, the drive ends
            - Lower bound is 0
        - `tire_pressure`:
            - Bound to [1.5, 2.5]
            - Probability of decreasing is 2%
            - Value of the change is between -0.1 and -0.2 (uniform)
            - If the tire pressure is below 1.5, it is set to 2.5 (i.e. the tires are inflated)
        - `cruise_control`:
            - Probability of activating is 1%
            - Probability of deactivating is 10%
            - If cruise control is active, the cars speed is unchanged and 
              the cruise control speed is set to the current speed
            - If cruise control is not active, the cars speed is changed based
              on the acceleration, and the cruise control speed is set to `None`
        - `speed`:
            - Bound to [0, 300]
            - Value of the change is the acceleration multiplied by the tire pressure
        - `engine_temperature`:
            - Bound to [85, 115]
            - Correlated with `acceleration`
        - `engine_rpm`:
            - Bound to [800, 7000]
            - Correlated with `acceleration`
        - `engine_oil_pressure`:
            - Bound to [0, 200]
            - Correlated with `engine_rpm`
        - `fuel_level_in_percent`:
            - Always decreases by 0.005 * (TARGET_TIRE_PRESSURE / car.tire_pressure)
            - If the car is accelerating, it decreases by an additional 0.01
            - If the fuel level is 0, it is set to 1 (i.e. the car is refueled)
            - If the fuel level is `None`, it is never refueled
            - The probability of refueling is calculated based on the current fuel level
              and calculated by -ln(fuel_level) / 20
        """

        self.current_timestamp += 1
        if self.ended:
            raise Exception("The drive has already ended")

        self.update_ambient_light()
        self.check_end_of_drive()
        self.update_tire_pressure(car=car)
        self.update_cruise_control(car=car, next_acceleration=next_acceleration)
        self.update_speed(car=car, next_acceleration=next_acceleration)
        self.update_engine_parameters(car=car)
        self.update_fuel_level(car=car)

    def update_ambient_light(self):
        delta: int = -int(random.uniform(0, 100))
        self.ambient_light_level_in_lux = max(0, self.ambient_light_level_in_lux + delta)

    def check_end_of_drive(self):
        if self.ambient_light_level_in_lux < 300 or random.random() < self.chance_of_drive_end_arbitrary:
            self.ended = True

    def update_tire_pressure(self, car):
        if random.random() < 0.02:
            delta: float = -random.uniform(0.1, 0.2)
            car.tire_pressure = max(MINIMUM_TIRE_PRESSURE, car.tire_pressure + delta)
        if car.tire_pressure <= MINIMUM_TIRE_PRESSURE:
            car.tire_pressure = TARGET_TIRE_PRESSURE

    def update_cruise_control(self, car, next_acceleration):
        if random.random() < 0.01:
            car.cruise_control = True
        elif random.random() < 0.1:
            car.cruise_control = False

        if car.cruise_control:
            car.cruise_control_speed = car.speed
        else:
            car.acceleration = next_acceleration
            car.cruise_control_speed = None

    def update_speed(self, car, next_acceleration):
        if not car.cruise_control:
            delta = next_acceleration * (car.tire_pressure / TARGET_TIRE_PRESSURE)
            car.speed = max(0, min(300, int(car.speed + delta)))

    def update_engine_parameters(self, car):
        # Engine Temperature
        acceleration_normalized = (car.acceleration + 10) / 20
        temp = (acceleration_normalized * 30) + 70 + random.normalvariate(mu=0, sigma=2.5)
        car.engine_temperature = max(85, min(115, temp))

        # Engine RPM
        rpm = int((acceleration_normalized * 6200) + 600 + random.normalvariate(mu=0, sigma=1000))
        car.engine_rpm = max(800, min(7000, rpm))

        # Engine Oil Pressure
        rpm_normalized: float = car.engine_rpm / 7000
        pressure: float = rpm_normalized * 180 + random.normalvariate(10, 20)
        car.engine_oil_pressure = max(0, min(200, pressure))

    def update_fuel_level(self, car):
        delta = -0.005 * (TARGET_TIRE_PRESSURE / car.tire_pressure)
        if car.acceleration > 0:
            delta -= 0.01

        if car.fuel_level_in_percent is not None:
            probability_of_refuel = 1 if car.fuel_level_in_percent == 0 else -np.log(max(car.fuel_level_in_percent, 0.0001)) / 20
            if random.random() < probability_of_refuel:
                delta += 1

            car.fuel_level_in_percent = max(0, min(1, car.fuel_level_in_percent + delta))

# Preconfigured cars

In [4]:
# Sample cars
audi_a7 = Car(model="A7", make="Audi", year=2020, number_seats=5, weight_in_kg=1645, price_in_euro=72400, fuel_level_in_percent=0.7)
bmw_3_series = Car(
    model="3 Series", make="BMW", year=2021, number_seats=5, weight_in_kg=1570, price_in_euro=50200, fuel_level_in_percent=0.8
)
mercedes_e_class = Car(
    model="E-Class", make="Mercedes-Benz", year=2019, number_seats=5, weight_in_kg=1780, price_in_euro=64500, fuel_level_in_percent=0.6
)
ford_mustang = Car(
    model="Mustang", make="Ford", year=2022, number_seats=4, weight_in_kg=1685, price_in_euro=56500, fuel_level_in_percent=0.7
)
toyota_camry = Car(
    model="Camry", make="Toyota", year=2020, number_seats=5, weight_in_kg=1525, price_in_euro=38900, fuel_level_in_percent=0.9
)
volkswagen_golf = Car(
    model="Golf", make="Volkswagen", year=2021, number_seats=5, weight_in_kg=1265, price_in_euro=27800, fuel_level_in_percent=0.8
)
tesla_model_s = Car(
    model="Model S", make="Tesla", year=2022, number_seats=5, weight_in_kg=2100, price_in_euro=79900, fuel_level_in_percent=0.85
)
honda_civic = Car(
    model="Civic", make="Honda", year=2019, number_seats=5, weight_in_kg=1295, price_in_euro=25500, fuel_level_in_percent=0.75
)
jaguar_f_type = Car(
    model="F-Type", make="Jaguar", year=2023, number_seats=2, weight_in_kg=1665, price_in_euro=81200, fuel_level_in_percent=0.7
)
# Append cars to a list
car_list = [
    audi_a7,
    bmw_3_series,
    mercedes_e_class,
    ford_mustang,
    toyota_camry,
    volkswagen_golf,
    tesla_model_s,
    honda_civic,
    jaguar_f_type,
]

# Generators

In [5]:
def generate_car() -> Car:
    car: Car = random.choice(car_list)
    car.year = random.randint(1990, 2024)

    # Depreciate the car's price each year since its manufacture
    car.price_in_euro = max(
        20_000,
        car.price_in_euro + sum(random.randint(-700, 100) for _ in range(2024 - car.year)),
    )

    car.fuel_level_in_percent = random.random()
    return car


def generate_env(chance_of_drive_end_arbitrary: float) -> Environment:
    return Environment(
        ambient_light_level_in_lux=random.randint(5000, 15000),
        chance_of_drive_end_arbitrary=chance_of_drive_end_arbitrary,
    )


def create_row(car: Car, env: Environment, ride_number: int) -> pd.DataFrame:
    return pd.DataFrame(
        [
            {
                "model": car.model,
                "make": car.make,
                "year": car.year,
                "number_seats": car.number_seats,
                "weight_in_kg": car.weight_in_kg,
                "price_in_euro": car.price_in_euro,
                "ride": ride_number,
                "current_timestamp": env.current_timestamp,
                "ambient_light_level_in_lux": env.ambient_light_level_in_lux,
                "fuel_level_in_percent": car.fuel_level_in_percent,
                "tire_pressure": car.tire_pressure,
                "acceleration": car.acceleration,
                "speed": car.speed,
                "engine_temperature": car.engine_temperature,
                "engine_rpm": car.engine_rpm,
                "engine_oil_pressure": car.engine_oil_pressure,
                "cruise_control": car.cruise_control,
                "cruise_control_speed": car.cruise_control_speed,
            }
        ]
    )

# Dataset generator

In [6]:
import random
import pandas as pd

def generate_dataset(
    number_of_rows: int,
    custom_outlier_percentage_engine_temperature: float,
    custom_outlier_percentage_engine_rpm: float,
    number_of_order_inversions: int,
    chance_of_drive_end_arbitrary: float,
) -> pd.DataFrame:
    """Generate a dataset simulating car rides.
    
    This function creates a DataFrame with data generated
    by simulating a car's environment and its interactions.

    A feature of the dataset is that it is time series data.
    I.e. the values of the rows are dependent on the values
    of the previous rows.

    Apart from simulating MCAR, MNAR values, outliers and time
    inaccuracy, the dataset also contains correlations between
    some features.
    A quirk to be discovered is that the ride always ends when
    the ambient light level is below 300 lux. The reason for that
    is that the car is not equipped with headlights.

    For information about the distributions and properties of the
    features please look at the docstring of the `step()` method
    in the `Environment` class.
    
    Args:
    - number_of_rows (int): The number of rows in the dataset
    - custom_outlier_percentage_engine_temperature (float between 0 and 1): Probability of an outlier in engine temperature
    - custom_outlier_percentage_engine_rpm (float between 0 and 1): Probability of an outlier in engine RPM
    - number_of_order_inversions (int larger than 0): Number of times rows in the dataset are swapped
    - chance_of_drive_end_arbitrary (float between 0 and 1): Probability of a simulated drive ending at each step

    Returns:
    - A pandas DataFrame representing the generated dataset
    """

    output_df = pd.DataFrame()
    car, env, ride_number = initialize_simulation(chance_of_drive_end_arbitrary=chance_of_drive_end_arbitrary)

    next_acceleration: float = random.uniform(0, 10)

    for _ in range(number_of_rows):
        if env.ended:
            car, env, ride_number = restart_ride(chance_of_drive_end_arbitrary=chance_of_drive_end_arbitrary, ride_number=ride_number)

        next_acceleration = calculate_next_acceleration(acceleration=next_acceleration)
        env.step(car=car, next_acceleration=next_acceleration)

        row = create_row(car=car, env=env, ride_number=ride_number)
        output_df = pd.concat([output_df, row], ignore_index=True)

    compute_and_print_correlations(output_df)
    manipulate_dataset(df=output_df, 
                       temp_outlier_percentage=custom_outlier_percentage_engine_temperature, 
                       rpm_outlier_percentage=custom_outlier_percentage_engine_rpm, 
                       order_inversions=number_of_order_inversions)

    return output_df

def initialize_simulation(chance_of_drive_end_arbitrary: float) -> tuple[Car, Environment, int]:
    """Initialize the simulation environment and car.

    Args:
    - chance_of_drive_end_arbitrary: Probability of a simulated drive ending at each step

    Returns:
    - Tuple of initialized Car, Environment, and ride number
    """
    car: Car = generate_car()
    env: Environment = generate_env(chance_of_drive_end_arbitrary=chance_of_drive_end_arbitrary)
    return car, env, 0

def restart_ride(chance_of_drive_end_arbitrary: float, ride_number: int) -> tuple[Car, Environment, int]:
    """Restart the ride by generating a new car and environment.

    Args:
    - chance_of_drive_end_arbitrary: Probability of a simulated drive ending at each step
    - ride_number: The current ride number

    Returns:
    - Tuple of new Car, Environment, and incremented ride number
    """
    car: Car = generate_car()
    env: Environment = generate_env(chance_of_drive_end_arbitrary=chance_of_drive_end_arbitrary)
    return car, env, ride_number+1

def calculate_next_acceleration(acceleration) -> float:
    """Calculate the next acceleration value based on a normal distribution.

    Returns:
    - A float representing the next acceleration value
    """
    diff = random.normalvariate(mu=0.3, sigma=5.5)
    return max(-10, min(10, acceleration + diff))


def compute_and_print_correlations(df: pd.DataFrame):
    """Compute and print correlations in the dataset.

    Args:
    - df: The dataset DataFrame
    """
    corr_temp_accel: float = df[df["acceleration"] != 0]["engine_temperature"].corr(df["acceleration"])
    corr_rpm_accel: float = df[df["acceleration"] != 0]["engine_rpm"].corr(df["acceleration"])
    corr_oil_pressure_rpm: float = df["engine_oil_pressure"].corr(df["engine_rpm"])

    print(f"Correlation: engine_temperature <-> acceleration: {corr_temp_accel}"
          f"\nCorrelation: engine_rpm <-> acceleration: {corr_rpm_accel}"
          f"\nCorrelation: engine_oil_pressure <-> engine_rpm: {corr_oil_pressure_rpm}")


def manipulate_dataset(df: pd.DataFrame, temp_outlier_percentage: float, rpm_outlier_percentage: float, order_inversions: int):
    """Manipulate the dataset by adding outliers, swapping rows, and simulating sensor errors.

    Args:
    - df: The dataset DataFrame
    - temp_outlier_percentage: Probability of an outlier in engine temperature
    - rpm_outlier_percentage: Probability of an outlier in engine RPM
    - order_inversions: Number of times rows in the dataset are swapped
    """

    simulate_broken_fuel_sensor(df=df)
    simulate_time_inaccuracy(df=df, order_inversions=order_inversions)
    introduce_outliers(df=df, temp_outlier_percentage=temp_outlier_percentage, rpm_outlier_percentage=rpm_outlier_percentage)
    simulate_missing_data(df=df)


def simulate_broken_fuel_sensor(df: pd.DataFrame):
    """Simulate data missing not at random (MNAR).

    Reason: Fuel sensor unfunctional for fuel level between 85% and 90%.

    Args:
    - df: The dataset DataFrame
    """
    df["fuel_level_in_percent"] = df["fuel_level_in_percent"].map(lambda x: None if 0.85 < x < 0.9 else x)


def simulate_time_inaccuracy(df: pd.DataFrame, order_inversions: int):
    """Simulate time inaccuracy by swapping rows.

    Args:
    - df: The dataset DataFrame
    - order_inversions: Number of times rows in the dataset are swapped
    """
    for _ in range(order_inversions):
        index = random.randint(0, len(df) - 2)
        df.iloc[index], df.iloc[index + 1] = df.iloc[index + 1].copy(), df.iloc[index].copy()


def introduce_outliers(df: pd.DataFrame, temp_outlier_percentage: float, rpm_outlier_percentage: float):
    """Introduce outliers into the dataset.

    Args:
    - df: The dataset DataFrame
    - temp_outlier_percentage: Probability of an outlier in engine temperature
    - rpm_outlier_percentage: Probability of an outlier in engine RPM
    """

    df["engine_temperature"] = df["engine_temperature"].map(lambda x: x**2 if random.random() < temp_outlier_percentage else x)
    df["engine_rpm"] = df["engine_rpm"].map(lambda x: int(random.normalvariate(mu=x, sigma=1000)) if random.random() < rpm_outlier_percentage else x)


def simulate_missing_data(df: pd.DataFrame):
    """Simulate data missing completely at random (MCAR).

    Args:
    - df: The dataset DataFrame
    """

    df["engine_temperature"] = df["engine_temperature"].map(lambda x: None if random.random() < 0.005 else x)
    df["engine_oil_pressure"] = df["engine_oil_pressure"].map(lambda x: None if random.random() < 0.01 else x)

In [7]:
df = generate_dataset(
    number_of_rows=20_000,
    custom_outlier_percentage_engine_temperature=0.005,
    custom_outlier_percentage_engine_rpm=0.005,
    chance_of_drive_end_arbitrary=0.01,
    number_of_order_inversions=20,
)

Correlation: engine_temperature <-> acceleration: 0.8818346186790944
Correlation: engine_rpm <-> acceleration: 0.9193901901477703
Correlation: engine_oil_pressure <-> engine_rpm: 0.9443685223102286


In [8]:
df

Unnamed: 0,model,make,year,number_seats,weight_in_kg,price_in_euro,ride,current_timestamp,ambient_light_level_in_lux,fuel_level_in_percent,tire_pressure,acceleration,speed,engine_temperature,engine_rpm,engine_oil_pressure,cruise_control,cruise_control_speed
0,3 Series,BMW,1991,5,1570,39986,0,1,7520,0.743807,2.50000,5.496593,5,95.551409,6253,196.911722,False,
1,3 Series,BMW,1991,5,1570,39986,0,2,7434,0.728807,2.50000,10.000000,15,99.018399,6290,150.832828,False,
2,3 Series,BMW,1991,5,1570,39986,0,3,7408,0.713807,2.50000,8.750467,23,99.300320,6954,200.000000,False,
3,3 Series,BMW,1991,5,1570,39986,0,4,7387,0.698807,2.50000,6.672518,29,89.770588,5672,136.032187,False,
4,3 Series,BMW,1991,5,1570,39986,0,5,7348,0.683807,2.50000,10.000000,39,102.628854,6657,200.000000,False,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19995,E-Class,Mercedes-Benz,2013,5,1780,20000,254,3,9579,0.345510,2.01391,5.776843,140,91.714345,4894,132.048692,False,
19996,E-Class,Mercedes-Benz,2013,5,1780,20000,254,4,9554,0.329303,2.01391,7.658607,146,95.174602,5834,145.880749,False,
19997,E-Class,Mercedes-Benz,2013,5,1780,20000,254,5,9516,0.313096,2.01391,10.000000,154,100.955816,7000,188.849634,False,
19998,E-Class,Mercedes-Benz,2013,5,1780,20000,254,6,9463,0.296889,2.01391,10.000000,162,97.368373,5192,189.716227,False,


In [None]:
df.to_csv("car_data.csv", index=False)

: 