## Handle multiple Parquet files

In [19]:
import polars as pl
import numpy as np
from datetime import datetime, timedelta
import os

# 1. Create a dataframe for 20 devices, 60,000 metrics, 20 timestamps each
def generate_data(n_devices: int = 20, n_metrics: int = 60000, n_timestamps: int = 20):
    """Build a synthetic long-format table of random metric readings.

    One row is produced for every (device, metric, timestamp) combination.
    Timestamps are one day apart, starting at 2023-10-01.

    Args:
        n_devices: Number of devices, named ``device_0``, ``device_1``, ...
        n_metrics: Number of metrics per device, named ``metric_0``, ...
        n_timestamps: Number of daily timestamps per (device, metric) pair.

    Returns:
        A Polars DataFrame with the columns ``device``, ``metric``,
        ``timestamp``, ``value`` and ``value2``. ``value`` and ``value2`` are
        uniform random numbers in [0, 1); ``value2`` is null on the last
        timestamp of every (device, metric) series.
    """
    start_date = datetime(2023, 10, 1)
    # The timestamps are the same for every series, so build them only once.
    timestamps = [start_date + timedelta(days=i) for i in range(n_timestamps)]
    last_index = n_timestamps - 1

    records = []
    for d in range(n_devices):
        device = f"device_{d}"
        for m in range(n_metrics):
            metric = f"metric_{m}"
            for i, timestamp in enumerate(timestamps):
                value = np.random.rand()
                # The final reading of each series is left empty.
                value2 = np.random.rand() if i < last_index else None
                records.append((device, metric, timestamp, value, value2))

    # Every record is one row; stating the orientation explicitly means
    # Polars does not have to infer it from the data.
    return pl.DataFrame(
        records,
        schema=["device", "metric", "timestamp", "value", "value2"],
        orient="row",
    )
# 2. Save to Parquet partitioned by device
def save_partitioned(df: pl.DataFrame, base_dir: str):
    """Write ``df`` as one Parquet file per distinct ``device`` value.

    Args:
        df: Frame containing a ``device`` column.
        base_dir: Output directory; created if it does not exist yet. Each
            device is written to ``<base_dir>/<device>.parquet``.
    """
    os.makedirs(base_dir, exist_ok=True)

    device_column = df.select("device").unique().to_series()
    for name in device_column.to_list():
        # Keep only this device's rows and write them to its own file.
        rows = df.filter(pl.col("device") == name)
        rows.write_parquet(os.path.join(base_dir, f"{name}.parquet"))

# Step 1: build the full synthetic dataset in memory
df = generate_data()

# Step 2: write one Parquet file per device into the "device_data" directory
save_partitioned(df, "device_data")

  return dispatch(args[0].__class__)(*args, **kw)


In [20]:

# 3. Load data for specific devices
def load_devices(devices, base_dir):
    """Read the Parquet files of the requested devices into one DataFrame.

    Args:
        devices: Iterable of device names; each one maps to the file
            ``<base_dir>/<device>.parquet``.
        base_dir: Directory that holds the per-device Parquet files.

    Returns:
        The concatenation of every file that exists, in the order the devices
        were given. Devices without a file are skipped.

    Raises:
        ValueError: If none of the requested devices has a file, since there
            is nothing to concatenate.
    """
    devices = list(devices)
    candidates = (os.path.join(base_dir, f"{device}.parquet") for device in devices)
    existing = [path for path in candidates if os.path.exists(path)]
    if not existing:
        # Fail with the directory and device names instead of the generic
        # error raised when concatenating an empty list.
        raise ValueError(
            f"no Parquet files found in {base_dir!r} for devices {devices!r}"
        )
    return pl.concat([pl.read_parquet(path) for path in existing])

# 4. Modify values
def modify_data(df: pl.DataFrame):
    """Return ``df`` with every entry of the ``value`` column doubled.

    All other columns are passed through unchanged.
    """
    doubled_value = pl.col("value") * 2
    return df.with_columns(doubled_value.alias("value"))

# 5. Save modified data back
def save_modified_devices(df: pl.DataFrame, base_dir: str):
    """Write the rows in ``df`` back as one Parquet file per device.

    Each device present in ``df`` is written to
    ``<base_dir>/<device>.parquet``, replacing any file already there.

    Args:
        df: Frame containing a ``device`` column.
        base_dir: Directory holding the per-device files.
    """
    # The on-disk layout is the same as for the initial export, so reuse that
    # writer instead of repeating its loop. It also creates base_dir when it
    # is missing, which this function previously did not do.
    save_partitioned(df, base_dir)

# === Run everything ===



# Step 3: read back only the files of three devices
selected_devices = ["device_1", "device_2", "device_3"]
subset = load_devices(selected_devices, "device_data")

# Step 4: double the "value" column of the loaded rows
modified = modify_data(subset)

# Step 5: write the modified rows back over the same per-device files
save_modified_devices(modified, "device_data")


## Using Polars Lazy API

In [21]:

# 3. Load data lazily for selected devices
def load_devices_lazy(devices, base_dir):
    """Build a LazyFrame over the Parquet files of the requested devices.

    No file contents are read here: the files are scanned when the returned
    frame (or a query built on it) is collected, so they must still be in
    place at that point.

    Args:
        devices: Iterable of device names; each one maps to the file
            ``<base_dir>/<device>.parquet``.
        base_dir: Directory that holds the per-device Parquet files.

    Returns:
        A ``pl.LazyFrame`` concatenating every file that exists, in the order
        the devices were given. Devices without a file are skipped.

    Raises:
        ValueError: If none of the requested devices has a file, since there
            is nothing to concatenate.
    """
    devices = list(devices)
    candidates = (os.path.join(base_dir, f"{device}.parquet") for device in devices)
    existing = [path for path in candidates if os.path.exists(path)]
    if not existing:
        # Fail with the directory and device names instead of the generic
        # error raised when concatenating an empty list.
        raise ValueError(
            f"no Parquet files found in {base_dir!r} for devices {devices!r}"
        )
    # scan_parquet defers the read; read_parquet(...).lazy() would load the
    # whole file eagerly and only wrap the result in a LazyFrame afterwards.
    return pl.concat([pl.scan_parquet(path) for path in existing])

# 4. Modify lazily (deferred execution)
def modify_data_lazy(lazy_df: pl.LazyFrame):
    """Add a "double the ``value`` column" step to the lazy query.

    Nothing is computed here; the returned LazyFrame carries the extra step
    until it is collected.
    """
    doubled_value = pl.col("value") * 2
    return lazy_df.with_columns(doubled_value.alias("value"))

# 5. Save each device's data (still need to collect before writing)
def save_modified_devices_lazy(lazy_df: pl.LazyFrame, base_dir: str):
    """Execute the lazy query and write its result as one file per device.

    Each device present in the result is written to
    ``<base_dir>/<device>.parquet``, replacing any file already there.

    Args:
        lazy_df: Query whose result contains a ``device`` column.
        base_dir: Directory holding the per-device files.
    """
    # Materialize exactly once, before anything is written, so the query has
    # finished with its inputs by the time files in base_dir are replaced.
    df = lazy_df.collect()
    # Same on-disk layout as the initial export: reuse that writer rather
    # than keeping another copy of its loop (it also creates base_dir).
    save_partitioned(df, base_dir)

# === Run the pipeline ===

# Step 3: set up the lazy load for the same three devices
# NOTE(review): if the eager pipeline above has already been run, these files
# already hold doubled values and this run doubles them a second time.
selected_devices = ["device_1", "device_2", "device_3"]
lazy_subset = load_devices_lazy(selected_devices, "device_data")

# Step 4: queue the doubling of "value" (nothing is executed yet)
lazy_modified = modify_data_lazy(lazy_subset)

# Step 5: execute the query and write the result back per device
save_modified_devices_lazy(lazy_modified, "device_data")


## Compare with Pandas

Current conclusion: the pandas version is not very different from the Polars one.

In [22]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os

# 1. Generate Data
def generate_data(n_devices: int = 20, n_metrics: int = 60000, n_timestamps: int = 20):
    """Build a synthetic long-format table of random metric readings.

    One row is produced for every (device, metric, timestamp) combination.
    Timestamps are one day apart, starting at 2023-10-01.

    Args:
        n_devices: Number of devices, named ``device_0``, ``device_1``, ...
        n_metrics: Number of metrics per device, named ``metric_0``, ...
        n_timestamps: Number of daily timestamps per (device, metric) pair.

    Returns:
        A pandas DataFrame with the columns ``device``, ``metric``,
        ``timestamp``, ``value`` and ``value2``. ``value`` and ``value2`` are
        uniform random numbers in [0, 1); ``value2`` is missing on the last
        timestamp of every (device, metric) series.
    """
    start_date = datetime(2023, 10, 1)
    # The timestamps are the same for every series, so build them only once.
    timestamps = [start_date + timedelta(days=i) for i in range(n_timestamps)]
    last_index = n_timestamps - 1

    records = []
    for d in range(n_devices):
        device = f"device_{d}"
        for m in range(n_metrics):
            metric = f"metric_{m}"
            for i, timestamp in enumerate(timestamps):
                value = np.random.rand()
                # The final reading of each series is left empty.
                value2 = np.random.rand() if i < last_index else None
                records.append((device, metric, timestamp, value, value2))

    return pd.DataFrame(
        records,
        columns=["device", "metric", "timestamp", "value", "value2"],
    )

# 2. Save per device
def save_partitioned_by_device(df, base_dir):
    """Write ``df`` as one Parquet file per distinct ``device`` value.

    Args:
        df: pandas DataFrame containing a ``device`` column.
        base_dir: Output directory; created if it does not exist yet. Each
            device is written to ``<base_dir>/<device>.parquet`` without the
            DataFrame index.
    """
    os.makedirs(base_dir, exist_ok=True)

    per_device = df.groupby("device")
    for name, rows in per_device:
        target = os.path.join(base_dir, f"{name}.parquet")
        rows.to_parquet(target, index=False)

# 3. Load selected devices
def load_devices(devices, base_dir):
    """Read the Parquet files of the requested devices into one DataFrame.

    Args:
        devices: Iterable of device names; each one maps to the file
            ``<base_dir>/<device>.parquet``.
        base_dir: Directory that holds the per-device Parquet files.

    Returns:
        The concatenation of every file that exists, in the order the devices
        were given, with a fresh 0..n-1 index. Devices without a file are
        skipped.

    Raises:
        ValueError: If none of the requested devices has a file, since there
            is nothing to concatenate.
    """
    devices = list(devices)
    candidates = (os.path.join(base_dir, f"{device}.parquet") for device in devices)
    existing = [path for path in candidates if os.path.exists(path)]
    if not existing:
        # Fail with the directory and device names instead of pandas'
        # generic "No objects to concatenate".
        raise ValueError(
            f"no Parquet files found in {base_dir!r} for devices {devices!r}"
        )
    return pd.concat([pd.read_parquet(path) for path in existing], ignore_index=True)

# 4. Modify
def modify(df):
    """Return a new DataFrame whose ``value`` column is doubled.

    The input frame is left untouched, so a caller that keeps a reference to
    it does not see its data change underneath it.

    Args:
        df: pandas DataFrame containing a numeric ``value`` column.

    Returns:
        A new DataFrame with the same columns, in the same order, where
        ``value`` holds twice the original numbers.
    """
    # assign() builds a new frame instead of writing into the caller's.
    return df.assign(value=df["value"] * 2)

# 5. Save back
def save_modified_devices(df, base_dir):
    """Write the rows in ``df`` back as one Parquet file per device.

    Each device present in ``df`` is written to
    ``<base_dir>/<device>.parquet`` (without the index), replacing any file
    already there.

    Args:
        df: pandas DataFrame containing a ``device`` column.
        base_dir: Directory holding the per-device files.
    """
    # The on-disk layout is the same as for the initial export, so reuse that
    # writer instead of repeating its loop. It also creates base_dir when it
    # is missing, which this function previously did not do.
    save_partitioned_by_device(df, base_dir)

# === Run ===

# Step 1: build the synthetic dataset as a pandas DataFrame
df = generate_data()

# Step 2: write one Parquet file per device into "pandas_device_data"
save_partitioned_by_device(df, "pandas_device_data")


In [23]:

# Step 3: read back only the files of three devices
selected = ["device_1", "device_2", "device_3"]
subset = load_devices(selected, "pandas_device_data")

# Step 4: double the "value" column of the loaded rows
modified = modify(subset)

# Step 5: write the modified rows back over the same per-device files
save_modified_devices(modified, "pandas_device_data")

In [2]:
import os
# Quick check: a space inside a path component is kept as-is by os.path.join.
os.path.join("./folder","my file.json")

'./folder/my file.json'