# Data Cleaning

In [None]:
# Notebook configuration. The three paths are plain strings here and are
# interpreted relative to the current working directory.
DATA_PATH = "../output/raw-csv"  # directory holding the raw input files
OUTPUT_PATH = "../output/csv"  # directory receiving one cleaned CSV per station
OUTPUT_STAT_PATH = "../output/csv-stat"  # directory receiving the statistics CSV

# Inclusive (low, high) acceptance range for each measurement column.
# A row whose value falls outside its column's range is discarded as an
# outlier. NOTE(review): units are not stated anywhere in this notebook --
# confirm them against the data source before changing a range.
OUTLIER_THRESHOLDS = {
    "pH": (0, 14),
    "EC": (0, 2000),
    "Temp": (0, 50),
    "DO": (0, 20),
}

In [None]:

from pathlib import Path
from pandas import DataFrame, read_csv
from logging import basicConfig, INFO
from dataclasses import dataclass
from IPython.display import display

basicConfig(level=INFO)

from mp import mp_print, mp_exec

In [None]:
# Turn the configured path strings into absolute ``Path`` objects. The names
# are rebound in place, so later cells always see ``Path`` values.

# Input directory: only resolved, never created here.
DATA_PATH = Path(DATA_PATH).resolve()

# Output directories: resolved, then created (with any missing parents).
# ``exist_ok=True`` makes re-running this cell harmless.
OUTPUT_PATH = Path(OUTPUT_PATH).resolve()
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)

OUTPUT_STAT_PATH = Path(OUTPUT_STAT_PATH).resolve()
OUTPUT_STAT_PATH.mkdir(parents=True, exist_ok=True)

In [None]:
def iter_files():
    """Yield every regular file located directly inside ``DATA_PATH``.

    Directories are skipped and are not descended into. Entries are produced
    in whatever order ``Path.iterdir`` returns them.

    Yields:
        Path: one path per regular file in ``DATA_PATH``.
    """
    # NOTE(review): no extension filter is applied, so any non-CSV file in
    # the directory is yielded as well -- confirm the directory only ever
    # contains station CSV files.
    for entry in DATA_PATH.iterdir():
        if entry.is_file():
            yield entry

In [None]:
@dataclass
class Stat:
    """Row-count bookkeeping and IQR bounds for one cleaned station file.

    Every field except ``name`` defaults to zero, so an instance can be
    created from the station name alone and filled in afterwards. Field order
    is unchanged, so fully positional construction keeps working.

    NOTE: this dataclass does not use slots, so assigning to a misspelled
    attribute name (for example via ``setattr``) silently creates a new
    attribute instead of failing. Writers must use the exact field names
    declared below.
    """

    # Station identifier.
    name: str

    # Row counts. Expected invariant: total == missing + outliers + valid.
    total: int = 0  # rows read from the raw file
    missing: int = 0  # rows dropped because a value was missing
    outliers: int = 0  # threshold_outliers + iqr_outliers
    valid: int = 0  # rows remaining after cleaning
    threshold_outliers: int = 0  # rows dropped by the fixed threshold ranges
    iqr_outliers: int = 0  # rows dropped by the IQR-based filter

    # Per-measurement IQR statistics: ``*_iqr`` is the interquartile range,
    # ``*_lb`` the lower bound and ``*_up`` the upper bound of the IQR filter.
    ph_iqr: float = 0.0
    ph_lb: float = 0.0
    ph_up: float = 0.0
    ec_iqr: float = 0.0
    ec_lb: float = 0.0
    ec_up: float = 0.0
    temp_iqr: float = 0.0
    temp_lb: float = 0.0
    temp_up: float = 0.0
    do_iqr: float = 0.0
    do_lb: float = 0.0
    do_up: float = 0.0

def task(file: Path) -> "tuple[str, DataFrame, Stat]":
    """Clean one station's raw CSV and collect statistics about the cleaning.

    The cleaning steps are, in order:

    1. keep only the ``Datetime``, ``pH``, ``EC``, ``Temp`` and ``DO`` columns;
    2. drop every row that has a missing value in any of those columns;
    3. drop every row outside the inclusive ranges in ``OUTLIER_THRESHOLDS``;
    4. drop every row outside ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` for any
       measurement column, with the quartiles computed on the rows that
       survived step 3.

    Args:
        file: path of the raw CSV; the station name is the part of the file
            name before the first dot.

    Returns:
        ``(station, cleaned_frame, stat)``.

    Raises:
        AssertionError: if the row counts do not add up to the raw row count.
    """
    value_columns = ["pH", "EC", "Temp", "DO"]

    station = file.name.split(".")[0]
    mp_print(f"Processing : {station}")

    raw = read_csv(file)
    total = len(raw)

    # Use the non-inplace ``dropna`` so that no in-place mutation happens on a
    # column selection of ``raw`` (which can trigger pandas' chained-assignment
    # warning).
    df = raw[["Datetime", *value_columns]].dropna()
    missing = total - len(df)

    # Fixed, physically motivated ranges first.
    rows_before_thresholds = len(df)
    for column, (low, high) in OUTLIER_THRESHOLDS.items():
        df = df[(df[column] >= low) & (df[column] <= high)]
    threshold_outliers = rows_before_thresholds - len(df)

    # IQR bounds are computed once, on the threshold-filtered rows, and are
    # not recomputed while filtering column by column below.
    q1 = df[value_columns].quantile(0.25)
    q3 = df[value_columns].quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    rows_before_iqr = len(df)
    for column in value_columns:
        df = df[
            (df[column] >= lower_bound[column])
            & (df[column] <= upper_bound[column])
        ]
    iqr_outliers = rows_before_iqr - len(df)

    # The upper-bound suffix must be ``_up`` to match the ``Stat`` field names.
    # Passing the values as keyword arguments makes any mismatch with the
    # dataclass fields fail loudly instead of silently creating a new
    # attribute and leaving the real field at 0.0.
    bounds = {}
    for column in value_columns:
        prefix = column.lower()
        bounds[f"{prefix}_iqr"] = float(iqr[column])
        bounds[f"{prefix}_lb"] = float(lower_bound[column])
        bounds[f"{prefix}_up"] = float(upper_bound[column])

    outliers = threshold_outliers + iqr_outliers
    valid = len(df)

    # Internal consistency check: every raw row is accounted for exactly once.
    assert total == missing + outliers + valid

    stat = Stat(
        name=station,
        total=total,
        missing=missing,
        outliers=outliers,
        valid=valid,
        threshold_outliers=threshold_outliers,
        iqr_outliers=iqr_outliers,
        **bounds,
    )

    return (station, df, stat)

In [None]:
# Run ``task`` over every input file through the project's ``mp_exec`` helper
# and index the results by station name. Each value is the pair
# (cleaned frame, statistics), which is what the annotation now states.
# NOTE(review): ``unorder=True`` presumably lets results arrive in any order;
# nothing below depends on the order.
data: dict[str, tuple[DataFrame, Stat]] = {
    station: (frame, stat)
    for station, frame, stat in mp_exec(task, iter_files(), unorder=True)
}

stats: list[Stat] = []

# Report each station, keep its statistics, and write its cleaned CSV.
for station, (df, stat) in data.items():
    print(f"Station : {station}, data : {df.shape}")
    stats.append(stat)
    df.to_csv(OUTPUT_PATH / f"{station}.csv", index=False)

In [None]:
# Build one row per station from the collected statistics, indexed by station
# name and ordered from the most to the fewest valid rows.
stats_df = (
    DataFrame([vars(entry) for entry in stats])
    .set_index("name")
    .sort_values("valid", ascending=False)
)

# Persist the full table (index included) before showing the summary.
stats_df.to_csv(OUTPUT_STAT_PATH / "stat.csv", index=True)

# Row-count overview only; the IQR columns are shown separately.
display(
    stats_df[
        [
            "total",
            "missing",
            "outliers",
            "threshold_outliers",
            "iqr_outliers",
            "valid",
        ]
    ]
)

In [None]:
# Show the IQR statistics rounded to three decimals. Columns are grouped by
# statistic (all ``*_iqr`` first, then ``*_lb``, then ``*_up``), and within
# each group ordered ph, ec, temp, do.
display(
    stats_df[
        [
            f"{parameter}_{suffix}"
            for suffix in ["iqr", "lb", "up"]
            for parameter in ["ph", "ec", "temp", "do"]
        ]
    ].round(3)
)