In [1]:
import datetime
import os
from typing import Callable

import pandas as pd

In [2]:
def read_xlsx(path: str, partitions: dict[str, str]) -> pd.DataFrame | None:
    if path.lower().endswith(".xlsx"):
        if f"device{partitions["Device"]}" not in path:
            raise Exception(f"File {path} in wrong Device directory")
        return pd.read_excel(path)
    

def read_csv(path: str, _: dict[str, str]) -> pd.DataFrame | None:
    if path.lower().endswith(".csv"):
        return pd.read_csv(path)


def read_all_partitions(path: str, read_file: Callable[[str, dict[str, str]], pd.DataFrame]) -> pd.DataFrame:
    def _read_partition(path: str, partition_cols: list[list[str]]) -> pd.DataFrame:
        if os.path.isdir(path):
            return [
                df
                for subpath in os.listdir(path)
                for df in _read_partition(f"{path}/{subpath}", partition_cols + ([subpath.split("=")] if "=" in subpath else []))
            ]
        else:
            partitions: dict[str, str] = dict(partition_cols)
            df = read_file(path, partitions)
            return [] if df is None else [ df.assign(**partitions) ]
    
    dfs = _read_partition(path, [])
    return pd.concat(dfs) if dfs else pd.DataFrame()


def parse_temtop_datetime(input: str) -> datetime.datetime:
    """Parse a timestamp string after normalising it to ISO 8601 form.

    Every space becomes ``T`` and every parenthesis is removed before the text
    is handed to ``datetime.fromisoformat``. Presumably this targets exports
    that look like ``2024-08-18 17:00:00(+08:00)`` — confirm against the raw
    files. The result is timezone-aware only when the text carries an offset.
    """
    substitutions = ((" ", "T"), ("(", ""), (")", ""))
    iso_text = input
    for old, new in substitutions:
        iso_text = iso_text.replace(old, new)
    return datetime.datetime.fromisoformat(iso_text)


def pm25_aqi(pm25: float) -> float:
    """Convert a PM2.5 concentration into an AQI value.

    The concentration is truncated to one decimal place, matched to the
    breakpoint band that contains it, and linearly interpolated between that
    band's AQI bounds. The result is not rounded.

    Args:
        pm25: PM2.5 concentration (the caller passes the ``PM2.5(ug/m³)``
            column).

    Returns:
        The interpolated AQI, or NaN when ``pm25`` is missing (NaN/None), so a
        single gap in the data does not abort a whole ``Series.apply``.

    Raises:
        ValueError: If the truncated concentration falls outside every band
            (negative, or above the highest breakpoint).
    """
    # Rows are (conc_low, conc_high, aqi_low, aqi_high).
    # NOTE(review): the top band ends at 1000; confirm this upper breakpoint
    # against the AQI standard this table is meant to follow.
    table: list[tuple[float, float, float, float]] = [
        (0, 9.0, 0, 50),
        (9.1, 35.4, 51, 100),
        (35.5, 55.4, 101, 150),
        (55.5, 125.4, 151, 200),
        (125.5, 225.4, 201, 300),
        (225.5, 1000, 301, 500),
    ]

    if pd.isna(pm25):
        return float("nan")

    # Truncate (not round) to 0.1 so the value lands on the breakpoint grid.
    c_p: float = int(pm25 * 10) / 10

    for bp_lo, bp_hi, i_lo, i_hi in table:
        if bp_lo <= c_p <= bp_hi:
            return (i_hi - i_lo) / (bp_hi - bp_lo) * (c_p - bp_lo) + i_lo

    # An explicit error instead of the StopIteration a bare next() would leak.
    raise ValueError(
        f"PM2.5 concentration {pm25!r} is outside the supported range "
        f"[{table[0][0]}, {table[-1][1]}]"
    )


def read_m10plus_data(path: str) -> pd.DataFrame:
    """Load all ``.xlsx`` files under ``path`` into one frame with parsed times.

    The ``Time`` column is converted with ``parse_temtop_datetime`` and the
    ``NO.`` column is dropped.
    """
    combined = read_all_partitions(path, read_xlsx)
    parsed = combined.assign(
        Time=lambda frame: frame["Time"].apply(parse_temtop_datetime)
    )
    # NOTE(review): reset_index() is called without drop=True, so the
    # pre-reset index is retained as a column — confirm that is intended.
    return parsed.reset_index().drop(columns=["NO."])


def read_m2000c_data(path: str) -> pd.DataFrame:
    """Load all ``.csv`` files under ``path`` into one frame with renamed columns.

    The CSV headers are renamed to the unit-annotated names below, ``Time`` is
    parsed with ``parse_temtop_datetime``, an ``AQI`` column is computed from
    the PM2.5 column with ``pm25_aqi``, and ``TEMPUNIT`` is dropped.
    """
    column_aliases = {
        "DATE": "Time",
        "PM2.5(ug/m3)": "PM2.5(ug/m³)",
        "PM10(ug/m3)": "PM10(ug/m³)",
        "CO2(ppm)": "CO₂(ppm)",
        # NOTE(review): labelled ℉ regardless of the TEMPUNIT value, which is
        # dropped below — confirm the logs are always in Fahrenheit.
        "TEMPERATURE": "Temperature(℉)",
        "HUMIDITY": "Humidity(%RH)",
    }
    renamed = read_all_partitions(path, read_csv).rename(columns=column_aliases)
    enriched = renamed.assign(
        Time=lambda frame: frame["Time"].apply(parse_temtop_datetime),
        AQI=lambda frame: frame["PM2.5(ug/m³)"].apply(pm25_aqi),
    )
    # NOTE(review): reset_index() is called without drop=True, so the
    # pre-reset index is retained as a column — confirm that is intended.
    return enriched.reset_index().drop(columns=["TEMPUNIT"])
    

In [3]:
# Load both data sets and stack them into one frame.
m10plus_df = read_m10plus_data("data/Format=XLSX")
m2000c_df = read_m2000c_data("data/Format=CSV")
df = pd.concat([m10plus_df, m2000c_df])

# Coverage per device: time span, row count, and distinct floors/locations.
# NOTE(review): in the rendered output the M10+ times carry a +08:00 offset
# while the M2000C II times have none — confirm before comparing across models.
coverage_spec = {
    "Time": ["min", "max", "count"],
    "Floor": ["nunique"],
    "Location": ["nunique"],
}
display(df.groupby(["Model", "Device"]).agg(coverage_spec))

Unnamed: 0_level_0,Unnamed: 1_level_0,Time,Time,Time,Floor,Location
Unnamed: 0_level_1,Unnamed: 1_level_1,min,max,count,nunique,nunique
Model,Device,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
M10+,1,2024-08-18 17:00:00+08:00,2024-09-21 14:00:00+08:00,471,1,2
M10+,2,2024-08-18 17:00:00+08:00,2024-08-24 14:00:00+08:00,136,1,3
M10+,3,2024-08-17 20:00:00+08:00,2024-09-21 14:00:00+08:00,825,1,3
M2000C II,0,2024-09-21 14:58:56,2024-09-28 15:19:13,2021,1,1
