In [1]:
import pandas as pd
import polars as pl

from pandarallel import pandarallel
from datetime import datetime
from tqdm import tqdm
import dateutil.parser

# Number of worker processes pandarallel is started with.
PANDARALLEL_WORKERS = 64
pandarallel.initialize(nb_workers=PANDARALLEL_WORKERS)

INFO: Pandarallel will run on 64 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [2]:
# Expression that parses the string `timestamp` column into a datetime; built once
# and reused for every derived calendar feature below.
parsed_timestamp = pl.col('timestamp').str.to_datetime()

dt_transforms = [
    parsed_timestamp,
    # Year is stored as an offset from 2000 so it can be cast to an unsigned 8-bit integer.
    (parsed_timestamp.dt.year() - 2000).cast(pl.UInt8).alias('year'),
    parsed_timestamp.dt.month().cast(pl.UInt8).alias('month'),
    parsed_timestamp.dt.day().cast(pl.UInt8).alias('day'),
    parsed_timestamp.dt.hour().cast(pl.UInt8).alias('hour'),
]

# Cast the sensor columns to compact 16-bit integer dtypes.
data_transforms = [
    # anglez -> signed 16-bit integer
    pl.col('anglez').cast(pl.Int16),
    # enmo is multiplied by 1000 first, then cast to unsigned 16-bit integer
    pl.col('enmo').mul(1000).cast(pl.UInt16),
]

# Sensor series are scanned lazily; train and test get the same datetime and dtype transforms.
train_series = (
    pl.scan_parquet('../data/train_series.parquet')
    .with_columns([*dt_transforms, *data_transforms])
)

test_series = (
    pl.scan_parquet('../data/test_series.parquet')
    .with_columns([*dt_transforms, *data_transforms])
)

# Events are read eagerly; only the datetime transforms apply, and every row that
# contains a null is dropped.
train_events = (
    pl.read_csv('../data/train_events.csv')
    .with_columns(dt_transforms)
    .drop_nulls()
)

# Remove nights whose number of 'onset' events differs from their number of 'wakeup'
# events. Rows containing nulls were already dropped when train_events was loaded.

# True for a (series_id, night) group when its onset and wakeup counts match.
is_balanced_night = (
    (pl.col('event') == 'onset').sum() == (pl.col('event') == 'wakeup').sum()
)

# One row per unbalanced (series_id, night), kept for inspection.
mismatches = (
    train_events
    .group_by(['series_id', 'night'])
    .agg(is_balanced_night.alias('balanced'))
    .filter(~pl.col('balanced'))
    .sort(by=['series_id', 'night'])
)

# A single window filter replaces one full-frame filter per mismatched night and
# keeps the original row order.
train_events = train_events.filter(is_balanced_night.over(['series_id', 'night']))

# Series ids that still have events, as a plain list in first-appearance order
series_ids = (
    train_events
    .get_column('series_id')
    .unique(maintain_order=True)
    .to_list()
)

# Restrict train_series to those series ids
keep_series = pl.col('series_id').is_in(series_ids)
train_series = train_series.filter(keep_series)

In [None]:
# Plot enmo and anglez over time for one series, with its sleep events overlaid.
import plotly.express as px

# Series to visualise; every filter and the output file name below derive from it,
# so switching series only requires changing this one line.
series_id = series_ids[0]

# NOTE: this materialises the whole lazy training frame in memory.
train_series_df = train_series.collect()

fig = px.line(
    train_series_df.filter(pl.col('series_id') == series_id),
    x='timestamp',
    y=['enmo', 'anglez']
)

# Events of the selected series, converted to pandas for row iteration
haha = train_events.filter(pl.col('series_id') == series_id).to_pandas()

# Show onset and wakeup events as vertical dashed lines on the figure;
# any other event value is skipped.
event_colors = {'onset': 'green', 'wakeup': 'red'}
for event, timestamp in zip(haha['event'], haha['timestamp']):
    color = event_colors.get(event)
    if color is not None:
        fig.add_vline(x=timestamp, line_width=1, line_dash="dash", line_color=color)

fig.show()

# Save figure as html, named after the plotted series
fig.write_html(f"{series_id}.html")

In [None]:
# train_events is a polars DataFrame (it is created with pl.read_csv), so the
# pandas-only `dropna(..., inplace=True)` and column item-assignment are not available.
# Polars equivalent: drop events without a timestamp, then make `step` a 64-bit integer.
train_events = train_events.drop_nulls(subset=["timestamp"]).with_columns(
    pl.col("step").cast(pl.Int64)
)

In [None]:
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

def parse_timestamps(batch):
    """Parse each timestamp in ``batch`` with ``dateutil.parser.isoparse``.

    Returns a new list holding the parsed values in the same order as ``batch``.
    """
    parsed = []
    for raw in batch:
        parsed.append(dateutil.parser.isoparse(raw))
    return parsed

def chunks(lst, n):
    """Yield consecutive slices of ``lst`` with ``n`` items each; the last one may be shorter."""
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

# Parse the raw timestamp strings in parallel worker processes, preserving their order.
N_WORKERS = 64

# `train_series` is a lazy polars frame (not subscriptable by column name) whose
# `timestamp` column has already been converted to datetimes, so the raw strings are
# re-read from the parquet file, restricted to the retained series ids.
timestamps = (
    pl.scan_parquet('../data/train_series.parquet')
    .filter(pl.col('series_id').is_in(series_ids))
    .select('timestamp')
    .collect()
    .get_column('timestamp')
    .to_list()
)

# Aim for N_WORKERS batches per worker; clamp to 1 because a chunk size of 0
# makes range() raise for small inputs.
chunk_size = max(1, len(timestamps) // (N_WORKERS * N_WORKERS))
batches = list(chunks(timestamps, chunk_size))

parsed_timestamps = []

with ProcessPoolExecutor(max_workers=N_WORKERS) as executor:
    # Submit every batch up front; the list keeps the futures in submission order.
    futures = [executor.submit(parse_timestamps, batch) for batch in batches]

    # Consume the futures in submission order so the results line up with `timestamps`.
    for future in tqdm(futures, total=len(futures)):
        parsed_timestamps.extend(future.result())

In [None]:
# train_series is a polars LazyFrame, which cannot be indexed by column name;
# select the column and collect only a short preview of it.
train_series.select("timestamp").head().collect()

In [None]:
# Rows per series id, most frequent first. train_series is a polars LazyFrame, so the
# pandas-style attribute access is replaced by an expression-level value_counts, whose
# struct result is unnested into separate columns before collecting.
(
    train_series
    .select(pl.col("series_id").value_counts(sort=True))
    .unnest("series_id")
    .collect()
)

In [None]:
# Display the training events frame as the cell output.
train_events