In [3]:
!pip install numpy
import polars as pl
import numpy as np
import random
from datetime import datetime, timedelta

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

  pid, fd = os.forkpty()


Collecting numpy
  Downloading numpy-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Downloading numpy-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.0/16.0 MB[0m [31m743.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-2.1.3


# Descripcion

Los datos son notificaciones de dispositivos GPS en Mexico. En promedio generan notificaciones automatizadas cada 5 minutos si el carro esta encendido, y 30 si esta apagado.  

Cada notificacion esta acompannada de un evento de lo que esta ocurriendo, y trae la latitud y longitud.  

El objetico es predecir si un vehiculo esta siendo robado de acuerdo a sus notificaciones, por lo que el primer paso seria limpiar datos y hacer ingenieria de variables.

Trata de hacerlo **lazy** si puedes.

In [7]:
def generate_dummy_data(num_cars, start_time, end_time, working_hours_interval, non_working_hours_interval):
    data = []

    # Define the latitude and longitude ranges for Mexico
    min_latitude, max_latitude = 14.5388, 32.7186
    min_longitude, max_longitude = -118.4662, -86.7104

    for car_id in range(num_cars):
        current_time = start_time

        # Generate random initial latitude and longitude for each car
        latitude = random.uniform(min_latitude, max_latitude)
        longitude = random.uniform(min_longitude, max_longitude)

        while current_time < end_time:
            if current_time.weekday() < 5 and 9 <= current_time.hour < 17:
                # Working hours (Monday to Friday, 9 AM to 5 PM)
                interval = working_hours_interval
            else:
                # Non-working hours
                interval = non_working_hours_interval

            # Generate notification with 99% probability
            if random.random() < 0.99:
                notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                data.append((f"car_{car_id}", current_time.isoformat(), latitude, longitude, notification))

            # Generate additional notifications between intervals
            while True:
                additional_interval = random.expovariate(1 / (interval / 2))
                additional_time = current_time + timedelta(minutes=additional_interval)
                if additional_time >= current_time + timedelta(minutes=interval):
                    break
                notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                data.append((f"car_{car_id}", additional_time.isoformat(), latitude, longitude, notification))

            # Update latitude and longitude for car movement
            latitude += random.uniform(-0.01, 0.01)
            longitude += random.uniform(-0.01, 0.01)

            # Check if the car is among the 1% that can have 100 notifications within 5 minutes
            if random.random() < 0.01:
                burst_start_time = current_time + timedelta(minutes=random.uniform(0, interval))
                burst_end_time = burst_start_time + timedelta(minutes=5)
                while current_time < burst_end_time:
                    notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                    data.append((f"car_{car_id}", current_time.isoformat(), latitude, longitude, notification))
                    current_time += timedelta(seconds=random.uniform(1, 10))

            current_time += timedelta(minutes=interval)

    # Create a Polars DataFrame from the generated data
    df = pl.DataFrame(
        {
            "car_id": [record[0] for record in data],
            "timestamp": [record[1] for record in data],
            "latitude": [record[2] for record in data],
            "longitude": [record[3] for record in data],
            "notification": [record[4] for record in data],
        }
    )

    return df.lazy()

In [8]:
num_cars = 1000
start_time = datetime(2023, 1, 1, 0, 0, 0)  # Start of the week
end_time = start_time + timedelta(weeks=1)  # End of the week
working_hours_interval = 5  # Interval of 5 minutes during working hours
non_working_hours_interval = 30  # Interval of 30 minutes during non-working hours

# Generate the dummy data
data = generate_dummy_data(num_cars, start_time, end_time, working_hours_interval, non_working_hours_interval)

# Print the first few rows of the generated data
print(data.head())

naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

SLICE[offset: 0, len: 5]
  DF ["car_id", "timestamp", "latitude", "longitude"]; PROJECT */5 COLUMNS; SELECTION: None


## Limpieza de datos

### Timestamp

Convierte el `timestamp` que actualmente es string a formato de tiempo en polars

In [None]:
def generate_dummy_data(num_cars, start_time, end_time, working_hours_interval, non_working_hours_interval):
    data = []

    # Definir los rangos de latitud y longitud
    min_latitude, max_latitude = 14.5388, 32.7186
    min_longitude, max_longitude = -118.4662, -86.7104

    for car_id in range(num_cars):
        current_time = start_time

        # lat y long random para cada auto
        latitude = random.uniform(min_latitude, max_latitude)
        longitude = random.uniform(min_longitude, max_longitude)

        while current_time < end_time:
            if current_time.weekday() < 5 and 9 <= current_time.hour < 17:
                # horas de trabajo
                interval = working_hours_interval
            else:
                # horas no laborables
                interval = non_working_hours_interval

            # generar notificacion con un 99 por ciento de prob
            if random.random() < 0.99:
                notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                data.append((f"car_{car_id}", current_time.isoformat(), latitude, longitude, notification))

            # generar notificaciones adicionales entre intervalos
            while True:
                additional_interval = random.expovariate(1 / (interval / 2))
                additional_time = current_time + timedelta(minutes=additional_interval)
                if additional_time >= current_time + timedelta(minutes=interval):
                    break
                notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                data.append((f"car_{car_id}", additional_time.isoformat(), latitude, longitude, notification))

            # actualizar lat y long con el movimiento del carro
            latitude += random.uniform(-0.01, 0.01)
            longitude += random.uniform(-0.01, 0.01)

            # ver si el auto en el 1 por ciento tiene muchas notificaciones en menos de 5 minutos
            if random.random() < 0.01:
                burst_start_time = current_time + timedelta(minutes=random.uniform(0, interval))
                burst_end_time = burst_start_time + timedelta(minutes=5)
                while current_time < burst_end_time:
                    notification = random.choice(["low_fuel", "tire_pressure", "engine_check", None])
                    data.append((f"car_{car_id}", current_time.isoformat(), latitude, longitude, notification))
                    current_time += timedelta(seconds=random.uniform(1, 10))

            current_time += timedelta(minutes=interval)

    # Crear un dataframe de polars
    df = pl.DataFrame(
        {
            "car_id": [record[0] for record in data],
            "timestamp": [record[1] for record in data],
            "latitude": [record[2] for record in data],
            "longitude": [record[3] for record in data],
            "notification": [record[4] for record in data],
        }
    )

    return df.lazy()

# Generar datos dummy
start_time = datetime(2024, 11, 27, 0, 0, 0)
end_time = datetime(2024, 11, 28, 0, 0, 0)

df_lazy = generate_dummy_data(
    num_cars=10,
    start_time=start_time,
    end_time=end_time,
    working_hours_interval=5,
    non_working_hours_interval=30
)

# Convertir el timestamp a formato Datetime con soporte para microsegundos
df_lazy = df_lazy.with_columns(
    pl.col("timestamp").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S.%f", strict=False).alias("timestamp")
)

# Collect para materializar y ver los resultados
df = df_lazy.collect()
print(df.head())


shape: (5, 5)
┌────────┬────────────────────────────┬───────────┬────────────┬───────────────┐
│ car_id ┆ timestamp                  ┆ latitude  ┆ longitude  ┆ notification  │
│ ---    ┆ ---                        ┆ ---       ┆ ---        ┆ ---           │
│ str    ┆ datetime[μs]               ┆ f64       ┆ f64        ┆ str           │
╞════════╪════════════════════════════╪═══════════╪════════════╪═══════════════╡
│ car_0  ┆ null                       ┆ 18.219615 ┆ -93.297978 ┆ tire_pressure │
│ car_0  ┆ 2024-11-27 00:04:15.000834 ┆ 18.219615 ┆ -93.297978 ┆ engine_check  │
│ car_0  ┆ 2024-11-27 00:24:12.000774 ┆ 18.219615 ┆ -93.297978 ┆ low_fuel      │
│ car_0  ┆ 2024-11-27 00:15:21.000015 ┆ 18.219615 ┆ -93.297978 ┆ tire_pressure │
│ car_0  ┆ 2024-11-27 00:27:42.000888 ┆ 18.219615 ┆ -93.297978 ┆ low_fuel      │
└────────┴────────────────────────────┴───────────┴────────────┴───────────────┘


  pl.col("timestamp").str.strptime(pl.Datetime, format="%Y-%m-%dT%H:%M:%S.%f", strict=False).alias("timestamp")


### Ingenieria de variables

Dado que va a entrar a un modelo de machine learning es necesario que todas las variables sean numericas, y esten en formato tidy. Cada observacion en una fila, y cada variable en una columna. Por lo tanto se decidio crear estadisticos y agregar los datos a intervalos uniformes de `x` minutos.  

Por ejemplo, colapsar toda la informacion que ocurrion en el intervalo, como el numero de notificaciones en esos 5 minutos, el promedio entre notificaciones, y el tipo de notificaciones.

Existen varias formas de hacer esto, puedes hacerlo con `group_by` primero para crear las nuevas variables, o `group_by` (`rolling`, `dynamic`) usando operaciones sobre listas. Utiliza claude o chat_gpt

1. Crea una nueva variable que compute la diferencia de tiempo entre notificaciones del mismo vehiculo. Piensa como lo vas a hacer. Llama a esta variable `notification_time`
   


In [15]:
# Calcular la diferencia de tiempo entre notificaciones del mismo vehículo
df_lazy = df_lazy.with_columns(
    pl.col("timestamp")
    .diff()  # Calcula la diferencia de tiempo
    .cast(pl.Int64)  # Convierte la duración a enteros (segundos)
    .over("car_id")  # Asegura que las diferencias se calculen por grupo de vehículos
    .alias("notification_time")  # Nombramos la nueva columna
)

# Collect para materializar el resultado
df = df_lazy.collect()
print(df.head())




shape: (5, 6)
┌────────┬────────────────────────────┬───────────┬────────────┬───────────────┬───────────────────┐
│ car_id ┆ timestamp                  ┆ latitude  ┆ longitude  ┆ notification  ┆ notification_time │
│ ---    ┆ ---                        ┆ ---       ┆ ---        ┆ ---           ┆ ---               │
│ str    ┆ datetime[μs]               ┆ f64       ┆ f64        ┆ str           ┆ i64               │
╞════════╪════════════════════════════╪═══════════╪════════════╪═══════════════╪═══════════════════╡
│ car_0  ┆ null                       ┆ 18.219615 ┆ -93.297978 ┆ tire_pressure ┆ null              │
│ car_0  ┆ 2024-11-27 00:04:15.000834 ┆ 18.219615 ┆ -93.297978 ┆ engine_check  ┆ null              │
│ car_0  ┆ 2024-11-27 00:24:12.000774 ┆ 18.219615 ┆ -93.297978 ┆ low_fuel      ┆ 1196999940        │
│ car_0  ┆ 2024-11-27 00:15:21.000015 ┆ 18.219615 ┆ -93.297978 ┆ tire_pressure ┆ -531000759        │
│ car_0  ┆ 2024-11-27 00:27:42.000888 ┆ 18.219615 ┆ -93.297978 ┆ low_fuel    

2. Crea una nueva variable que compute la distancia que viajo el vehiculo desde la ultima notificacion. Llamala `distance`

In [17]:
!pip install geopy

In addition, using fork() with Python in general is a recipe for mysterious
deadlocks and crashes.

The most likely reason you are seeing this error is because you are using the
multiprocessing module on Linux, which uses fork() by default. This will be
fixed in Python 3.14. Until then, you want to use the "spawn" context instead.

See https://docs.pola.rs/user-guide/misc/multiprocessing/ for details.

  pid, fd = os.forkpty()


Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl.metadata (6.8 kB)
Collecting geographiclib<3,>=1.52 (from geopy)
  Downloading geographiclib-2.0-py3-none-any.whl.metadata (1.4 kB)
Downloading geopy-2.4.1-py3-none-any.whl (125 kB)
Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-2.0 geopy-2.4.1


In [20]:
from geopy.distance import geodesic

# Crear una función para calcular la distancia geodésica
def calculate_distance(prev_lat, prev_lon, lat, lon):
    if prev_lat is None or prev_lon is None:
        return None  # No hay distancia para la primera fila
    return geodesic((prev_lat, prev_lon), (lat, lon)).meters

# Calcular la distancia en el LazyFrame
df_lazy = df_lazy.with_columns([
    pl.col("latitude").shift(1).over("car_id").alias("prev_latitude"),
    pl.col("longitude").shift(1).over("car_id").alias("prev_longitude"),
]).with_columns(
    pl.struct(["prev_latitude", "prev_longitude", "latitude", "longitude"])
    .map_elements(lambda row: calculate_distance(row["prev_latitude"], row["prev_longitude"], row["latitude"], row["longitude"]))
    .alias("distance")
)

# Collect para materializar el resultado
df = df_lazy.collect()
print(df.head())



  df = df_lazy.collect()


shape: (5, 9)
┌────────┬────────────┬───────────┬────────────┬───┬────────────┬───────────┬───────────┬──────────┐
│ car_id ┆ timestamp  ┆ latitude  ┆ longitude  ┆ … ┆ notificati ┆ prev_lati ┆ prev_long ┆ distance │
│ ---    ┆ ---        ┆ ---       ┆ ---        ┆   ┆ on_time    ┆ tude      ┆ itude     ┆ ---      │
│ str    ┆ datetime[μ ┆ f64       ┆ f64        ┆   ┆ ---        ┆ ---       ┆ ---       ┆ f64      │
│        ┆ s]         ┆           ┆            ┆   ┆ i64        ┆ f64       ┆ f64       ┆          │
╞════════╪════════════╪═══════════╪════════════╪═══╪════════════╪═══════════╪═══════════╪══════════╡
│ car_0  ┆ null       ┆ 18.219615 ┆ -93.297978 ┆ … ┆ null       ┆ null      ┆ null      ┆ null     │
│ car_0  ┆ 2024-11-27 ┆ 18.219615 ┆ -93.297978 ┆ … ┆ null       ┆ 18.219615 ┆ -93.29797 ┆ 0.0      │
│        ┆ 00:04:15.0 ┆           ┆            ┆   ┆            ┆           ┆ 8         ┆          │
│        ┆ 00834      ┆           ┆            ┆   ┆            ┆           ┆

3. Crea intervalos de `x` minutos por carro. Como el numero de notificaciones en esos intervalos no es uniforme tienes que buscar funciones de polars especificas, pero ademas tienen que ser por vehiculo, pues tienen que ser del mismo. Revisa las funciones de `group_by` `dynamic` y `rolling`.
   1. Computa la media, mediana, varianza, max y min de `notification_time` los intervalos de `x` minutos
   2. Computa la media, mediana, varianza, max y min de `distance`


In [34]:
# Validar esquema inicial
print("Esquema inicial:")
print(df_lazy.schema)

# Inspeccionar valores en columnas problemáticas
try:
    print("Datos de 'notification_time':")
    print(df_lazy.select("notification_time").collect())
except Exception as e:
    print(f"Error en 'notification_time': {e}")

try:
    print("Datos de 'distance':")
    print(df_lazy.select("distance").collect())
except Exception as e:
    print(f"Error en 'distance': {e}")

try:
    print("Datos de 'timestamp':")
    print(df_lazy.select("timestamp").collect())
except Exception as e:
    print(f"Error en 'timestamp': {e}")



Esquema inicial:
Schema({'car_id': String, 'timestamp': Datetime(time_unit='us', time_zone=None), 'latitude': Float64, 'longitude': Float64, 'notification': String, 'notification_time': Int64, 'prev_latitude': Float64, 'prev_longitude': Float64, 'distance': Unknown})
Datos de 'notification_time':
shape: (10_823, 1)
┌───────────────────┐
│ notification_time │
│ ---               │
│ i64               │
╞═══════════════════╡
│ null              │
│ null              │
│ 1196999940        │
│ -531000759        │
│ 741000873         │
│ …                 │
│ 96000220          │
│ 835999950         │
│ -869999754        │
│ 997999415         │
│ -1019999453       │
└───────────────────┘
Datos de 'distance':


  print(df_lazy.schema)
  print(df_lazy.select("distance").collect())


shape: (10_823, 1)
┌──────────┐
│ distance │
│ ---      │
│ f64      │
╞══════════╡
│ null     │
│ 0.0      │
│ 0.0      │
│ 0.0      │
│ 0.0      │
│ …        │
│ 0.0      │
│ 0.0      │
│ 0.0      │
│ 0.0      │
│ 0.0      │
└──────────┘
Datos de 'timestamp':
shape: (10_823, 1)
┌────────────────────────────┐
│ timestamp                  │
│ ---                        │
│ datetime[μs]               │
╞════════════════════════════╡
│ null                       │
│ 2024-11-27 00:04:15.000834 │
│ 2024-11-27 00:24:12.000774 │
│ 2024-11-27 00:15:21.000015 │
│ 2024-11-27 00:27:42.000888 │
│ …                          │
│ 2024-11-27 23:34:24.000408 │
│ 2024-11-27 23:48:20.000358 │
│ 2024-11-27 23:33:50.000604 │
│ 2024-11-27 23:50:28.000019 │
│ 2024-11-27 23:33:28.000566 │
└────────────────────────────┘


In [35]:
# Limpiar y convertir columnas problemáticas
df_lazy_cleaned = (
    df_lazy
    .with_columns([
        pl.col("car_id"),  # Mantener como está
        pl.col("timestamp").cast(pl.Datetime),  # Convertir a Datetime
        pl.col("notification_time")
        .fill_null(0)  # Rellenar nulos con 0
        .cast(pl.Float64),  # Convertir a Float64
        pl.col("distance")
        .fill_null(0)  # Rellenar nulos con 0
        .cast(pl.Float64),  # Convertir a Float64
    ])
)

# Verificar esquema y valores después de la limpieza
print("Esquema después de la limpieza:")
print(df_lazy_cleaned.schema)

try:
    # Intentar recolectar datos
    cleaned_data = df_lazy_cleaned.collect()
    print("Datos después de la limpieza:")
    print(cleaned_data.head())
except Exception as e:
    print(f"Error después de la limpieza: {e}")



Esquema después de la limpieza:
Schema({'car_id': String, 'timestamp': Datetime(time_unit='us', time_zone=None), 'latitude': Float64, 'longitude': Float64, 'notification': String, 'notification_time': Float64, 'prev_latitude': Float64, 'prev_longitude': Float64, 'distance': Float64})


  print(df_lazy_cleaned.schema)
  cleaned_data = df_lazy_cleaned.collect()
thread '<unnamed>' panicked at crates/polars-core/src/series/mod.rs:1011:9:
implementation error, cannot get ref Float64 from Int32


PanicException: implementation error, cannot get ref Float64 from Int32