In [1]:
import warnings
import polars as pl
import numpy as np
import concurrent.futures
from tqdm.auto import tqdm

warnings.filterwarnings("ignore")

In [2]:
# Load revision 02 of the traffic density dataset.
# The file carries a ".zstd" extension but is read with the Parquet reader.
raw_df = pl.read_parquet("datasets/01_tr_density/ist_traffic_density_rev02.zstd")

In [3]:
# Find the calendar date with the highest number of missing NUMBER_OF_VEHICLES
# values, counted across all GEOHASHes.
# Rows are grouped by the date part of DATE_TIME, nulls are counted per group,
# and an ascending sort followed by tail(1) keeps only the date with the largest count.
(
    raw_df
    .groupby(pl.col('DATE_TIME').dt.date())
    .agg(pl.col('NUMBER_OF_VEHICLES').null_count())
    .sort('NUMBER_OF_VEHICLES')
    .tail(1)
)

DATE_TIME,NUMBER_OF_VEHICLES
date,u32
2022-06-07,43512


In [4]:
# Inspect the column names of the raw frame.
raw_df.columns

['DATE_TIME',
 'LATITUDE',
 'LONGITUDE',
 'GEOHASH',
 'MINIMUM_SPEED',
 'MAXIMUM_SPEED',
 'AVERAGE_SPEED',
 'NUMBER_OF_VEHICLES']

In [5]:
def impute_data(col: str, gh: str, df: pl.DataFrame) -> pl.DataFrame:
    """
    Impute missing values of one column for a single GEOHASH using linear
    interpolation along an hour-of-day / day-of-week ordering.

    Args:
        col (str): Name of the column to impute.
        gh (str): GEOHASH value used to filter the DataFrame.
        df (pl.DataFrame): Input DataFrame containing the time series data.
            It must contain the columns 'DATE_TIME', 'GEOHASH' and `col`.

    Returns:
        pl.DataFrame: Eagerly collected frame with the columns 'DATE_TIME',
        'GEOHASH' and f"{col}_filled", sorted by 'DATE_TIME'. The query is
        built lazily but `.collect()` is called before returning, so the
        caller receives a DataFrame, not a LazyFrame.

    Notes:
        - The 'DATE_TIME' column should be of datetime type.
        - Only the rows whose 'GEOHASH' equals `gh` are kept.
        - HOUR and DAYOFWEEK features are extracted from 'DATE_TIME'.
        - Rows are sorted by HOUR, then DAYOFWEEK, then DATE_TIME, so that rows
          sharing the same hour and weekday are adjacent and in chronological
          order when the interpolation runs.
        - `col` is linearly interpolated in that order and stored as
          f"{col}_filled".
        - The intermediate columns 'HOUR' and 'DAYOFWEEK' and the original
          column `col` are dropped from the output.
        - NOTE(review): interpolation only fills gaps between known values;
          nulls at the very start or end of the sorted sequence presumably
          remain null. Confirm against the polars version in use.
    """
    lazy_df = (
        df.lazy()
        .select(pl.col(['DATE_TIME', 'GEOHASH', col]))
        .filter(pl.col("GEOHASH") == gh)
        .with_columns(pl.col('DATE_TIME').dt.hour().alias("HOUR"),
                      pl.col('DATE_TIME').dt.weekday().alias("DAYOFWEEK"))
        # DATE_TIME is the tie-breaker: without it the relative order of rows
        # with equal (HOUR, DAYOFWEEK) is not guaranteed by the sort, which
        # would make the interpolated values depend on an arbitrary row order.
        .sort(['HOUR', 'DAYOFWEEK', 'DATE_TIME'])
        .with_columns(pl.col(col).interpolate().alias(f"{col}_filled"))
        # Restore chronological order for the caller.
        .sort('DATE_TIME')
        .drop(['HOUR', 'DAYOFWEEK', col])
    )

    return lazy_df.collect()

In [6]:
def impute_all(gh, df):
    """
    Impute missing values for all measurement columns of a single GEOHASH.

    Args:
        gh (str): GEOHASH value to filter the DataFrame.
        df (pl.DataFrame): Input DataFrame containing the time series data.

    Returns:
        pl.DataFrame: One frame holding 'DATE_TIME', 'GEOHASH' and a
        "<column>_filled" column for each of 'MINIMUM_SPEED', 'MAXIMUM_SPEED',
        'AVERAGE_SPEED' and 'NUMBER_OF_VEHICLES'.

    Notes:
        - The input DataFrame `df` is expected to have columns 'DATE_TIME' and
          'GEOHASH' in addition to the four measurement columns.
        - The rows for `gh` are filtered out of `df` once, and that subset is
          handed to every per-column task, so the full frame is scanned a
          single time per GEOHASH instead of once per column.
        - The per-column imputation (`impute_data`) runs in a
          concurrent.futures.ThreadPoolExecutor.
        - The per-column results are combined with `pl.concat(..., how='align')`,
          which aligns them on their shared columns.

    Example:
        >>> imputed_df = impute_all('your_geohash', your_dataframe)
    """
    cols = ['MINIMUM_SPEED', 'MAXIMUM_SPEED', 'AVERAGE_SPEED', 'NUMBER_OF_VEHICLES']

    # Loop-invariant work: select this GEOHASH's rows once. impute_data applies
    # the same filter again, which is then a cheap pass over the small subset.
    gh_df = df.filter(pl.col('GEOHASH') == gh)

    # executor.map yields results in the order of `cols`; wrapping it in list()
    # inside the `with` block waits for every task and re-raises any worker error.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(
            executor.map(lambda col: impute_data(col=col, gh=gh, df=gh_df), cols)
        )

    return pl.concat(results, how='align')

In [7]:
# impute_all(gh='sxk3xq', df=raw_df)

In [8]:
# Collect the distinct GEOHASH values as a NumPy array; the imputation below
# iterates over this array and processes each GEOHASH independently.
ghs = np.array(raw_df['GEOHASH'].unique())
ghs

array(['sxk61n', 'sxk3k3', 'sxk9rb', ..., 'sxk1v1', 'sxk96g', 'sxk6mw'],
      dtype='<U6')

In [9]:
%%time
# Perform imputation for each GEOHASH in the DataFrame and concatenate the results.
# impute_all is called once per GEOHASH and receives the full raw_df each time;
# tqdm reports progress over the GEOHASH array. The recorded run of this cell
# took about 42 minutes of wall time.
df = pl.concat((impute_all(gh=gh, df=raw_df) for gh in tqdm(ghs)))

  0%|          | 0/1813 [00:00<?, ?it/s]

Wall time: 42min 2s


# Final Join

In [10]:
# Drop the four original measurement columns (which contain the missing rows)
# from raw_df, then inner-join the remaining columns with the imputed frame `df`
# on the (GEOHASH, DATE_TIME) key so each row picks up its "<column>_filled" values.
final_df = (
    raw_df.drop(['MINIMUM_SPEED', 'MAXIMUM_SPEED', 'AVERAGE_SPEED', 'NUMBER_OF_VEHICLES'])
    .join(df, on=['GEOHASH', 'DATE_TIME'], how='inner')
)

In [11]:
# Restore the original column names: every column ending in "_filled" gets that
# marker removed, all other column names are kept unchanged.
final_df.columns = [
    name.replace("_filled", "") if name.endswith("_filled") else name
    for name in final_df.columns
]

In [12]:
# Preview the first rows of the joined, imputed frame.
final_df.head()

DATE_TIME,LATITUDE,LONGITUDE,GEOHASH,MINIMUM_SPEED,MAXIMUM_SPEED,AVERAGE_SPEED,NUMBER_OF_VEHICLES
datetime[ns],f32,f32,str,u8,u8,u8,u16
2020-01-01 00:00:00,41.168518,28.526001,"""sxk61n""",94,65,84,9
2020-01-01 01:00:00,41.168518,28.526001,"""sxk61n""",108,63,79,10
2020-01-01 02:00:00,41.168518,28.526001,"""sxk61n""",96,79,90,5
2020-01-01 03:00:00,41.168518,28.526001,"""sxk61n""",96,66,81,4
2020-01-01 04:00:00,41.168518,28.526001,"""sxk61n""",85,78,83,2


In [13]:
# Count nulls per column to confirm that no missing values remain after imputation.
final_df.null_count()

DATE_TIME,LATITUDE,LONGITUDE,GEOHASH,MINIMUM_SPEED,MAXIMUM_SPEED,AVERAGE_SPEED,NUMBER_OF_VEHICLES
u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0


In [14]:
# Persist the imputed dataset as revision 03: Parquet format with zstd
# compression, written to a path that uses a ".zstd" file extension.
final_df.write_parquet("datasets/01_tr_density/ist_traffic_density_rev03.zstd", compression='zstd')