## Polars QC Tools

This notebook is a reference collection of functions and tips for cleaning datasets using Polars. It is not designed to be run on a specific dataset; it just provides snippets for common cleaning tasks.


### Set Up:

In [None]:
import polars as pl
import numpy as np
import re

In [None]:
# Display settings: print up to 20 rows and up to 100 columns per table
pl.Config.set_tbl_rows(20).set_tbl_cols(100)

In [None]:
# For small/medium data, can use eager mode:
df = pl.read_csv("my_dataset.csv")

# For large data, use lazy mode (nothing is read until the plan is collected):
lf = pl.scan_csv(
    "my_dataset.csv",
    # separator=",",
    # has_header=True,
    # infer_schema_length=1000,  # number of rows scanned to infer column types
    # ignore_errors=True,  # tolerate some bad rows
    # schema_overrides={"col_a": pl.Int64, "col_b": pl.Utf8},
)

# Quick peek: only the first rows are materialized, so this stays cheap even
# for large files (collecting the full LazyFrame would load everything).
lf.head().collect()

### Process large data sets (streaming)

# Polars encourages a single lazy plan (like a pipeline) rather than manual
# Python loops like the ones I used with pandas chunks. With .scan_csv + .collect(streaming=True), Polars
# streams from disk with pushdowns, similar to chunking but faster and simpler.

In [None]:
# Example cleaner function that returns a LazyFrame expression pipeline
def process_lazy(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Add a normalized text column, filter on it, and move it to the front.

    Args:
        lf: Lazy frame containing a ``raw_column`` column.

    Returns:
        A lazy plan that adds ``cleaned_column`` (``raw_column`` cast to
        string, trimmed and lower-cased), keeps only rows where it equals
        ``"filter_value"``, and places ``cleaned_column`` first followed by
        every other column.
    """
    cleaned = (
        pl.col("raw_column")
        .cast(pl.Utf8)
        # str.strip() was renamed; strip_chars() with no argument trims whitespace
        .str.strip_chars()
        .str.to_lowercase()
        .alias("cleaned_column")
    )
    return (
        lf.with_columns(cleaned)
        .filter(pl.col("cleaned_column") == pl.lit("filter_value"))
        # Reorder only: cleaned column first, all remaining columns after it
        .select(["cleaned_column", pl.all().exclude("cleaned_column")])
    )


# Build the pipeline lazily (no data is read until collect is called)
processed_lf = process_lazy(pl.scan_csv("my_large_dataset.csv"))


# Collect with streaming enabled (low memory)
processed_df = processed_lf.collect(streaming=True)


# Write the collected, in-memory DataFrame to disk as CSV.
# NOTE(review): write_csv runs on the already-materialized result, so the write
# itself is not streamed. LazyFrame.sink_csv looks like the way to write without
# collecting first; confirm against the installed Polars version.
processed_df.write_csv("cleaned_output.csv")

### Clean up columns

In [None]:
# Example column rename function:
def snake_case(name: str) -> str:
    """Convert a column label to ``snake_case``.

    Punctuation (anything that is neither a word character nor whitespace) is
    removed, surrounding whitespace is trimmed, each remaining run of
    whitespace becomes a single underscore, and the result is lower-cased.

    Args:
        name: Original column label.

    Returns:
        The normalized label, e.g. ``" First Name! "`` -> ``"first_name"``.
    """
    # Trim before inserting underscores; otherwise leading/trailing whitespace
    # (including whitespace exposed by removed punctuation) turns into "_".
    trimmed = re.sub(r"[^\w\s]", "", name).strip()
    return re.sub(r"\s+", "_", trimmed).lower()

# Large data (lazy): read the names from the plan's schema, then rename
lazy_renames = {old: snake_case(old) for old in lf.collect_schema().names()}
lf_clean_cols = lf.rename(lazy_renames)

# Small data (eager): the names are available directly on the DataFrame
eager_renames = {old: snake_case(old) for old in df.columns}
df = df.rename(eager_renames)

### Fix datatypes and categoricals

In [None]:
# Type fixes with coercion-like behavior: strict=False is passed so values that
# fail to convert are not meant to abort the whole query.
lf_types = lf_clean_cols.with_columns(
    # Numeric: choose an explicit dtype first, then optionally cast to a smaller one
    pl.col("num_feature").cast(pl.Float64, strict=False),
    # Strings (Utf8)
    pl.col("str_feature").cast(pl.Utf8, strict=False),
    # Dates (coerce invalid to null); format=None lets Polars infer the format
    pl.col("date_col").str.strptime(pl.Date, strict=False, format=None),
    # Clip values into [0, 100]; clip() replaces the removed clip_min/clip_max
    pl.col("col").clip(lower_bound=0, upper_bound=100),
)

In [None]:
# Categoricals
lf_cats = lf_types.with_columns(
    pl.col("cat_uncoded").cast(pl.Categorical).alias("cat_encoded_cat"),
    # Integer codes for categoricals: to_physical() is an Expr method,
    # not part of the .cat namespace
    pl.col("cat_uncoded").cast(pl.Categorical).to_physical().alias("cat_encoded"),
)

# One-hot/dummies: expands "col" into 0/1 columns. to_dummies is only available
# on an eager DataFrame (the output columns depend on the data), so collect
# first and go back to lazy afterwards.
lf_dummies = lf_cats.collect().to_dummies(columns=["col"]).lazy()

### Missing Data

In [None]:
# Missing data report:

def missing_report(lf: pl.LazyFrame) -> pl.DataFrame:
    """Summarize missing values per column.

    Runs one aggregation query (row count plus a null count for every column)
    and builds the report in Python.

    Args:
        lf: Lazy frame to inspect.

    Returns:
        A DataFrame with columns ``column``, ``total_missing``,
        ``percent_missing`` (rounded to 3 decimals) and ``missing_level``
        (``"high"`` above 15%, ``"medium"`` above 5%, otherwise ``"low"``),
        sorted by ``percent_missing`` descending. With zero rows every column
        is reported as 0 missing / 0.0%.
    """
    cols = lf.collect_schema().names()
    base = lf.select([
        pl.len().alias("n_rows"),
        *[pl.col(c).null_count().alias(f"{c}__nulls") for c in cols],
    ]).collect()

    n = int(base[0, "n_rows"]) if base.height else 0

    rows = []
    for c in cols:
        nulls = int(base[0, f"{c}__nulls"]) if n else 0
        pct = round((nulls / n * 100), 3) if n else 0.0
        level = "high" if pct > 15 else ("medium" if pct > 5 else "low")
        rows.append((c, nulls, pct, level))

    # orient="row" is required: without it Polars guesses the orientation from
    # the data shape, which goes wrong when the input has exactly as many
    # columns as the report has fields (4).
    return pl.DataFrame(
        rows,
        schema=["column", "total_missing", "percent_missing", "missing_level"],
        orient="row",
    ).sort("percent_missing", descending=True)


# Build the missing-data report for the one-hot encoded frame
miss_df = missing_report(lf_dummies)

In [None]:
# Or this method using unpivot (formerly melt) instead of python loops:

def missing_report_fast(lf: pl.LazyFrame) -> pl.DataFrame:
    """Summarize missing values per column using Polars expressions only.

    Args:
        lf: Lazy frame to inspect.

    Returns:
        A DataFrame with columns ``column``, ``total_missing``,
        ``percent_missing`` (rounded to 3 decimals) and ``missing_level``
        (``"high"`` above 15%, ``"medium"`` above 5%, otherwise ``"low"``),
        sorted by ``percent_missing`` descending. With zero rows the
        percentage is reported as 0.0 rather than NaN.
    """
    # One-row wide table: the row count plus one null count per column
    stats = lf.select(
        pl.len().alias("n_rows"),
        pl.all().null_count(),
    ).collect()

    percent_missing = (
        pl.when(pl.col("n_rows") > 0)
        .then(pl.col("total_missing") / pl.col("n_rows") * 100)
        .otherwise(pl.lit(0.0))
        .round(3)
    )
    # Wrap labels in pl.lit: bare strings in then()/otherwise() are read as
    # column names.
    missing_level = (
        pl.when(pl.col("percent_missing") > 15)
        .then(pl.lit("high"))
        .when(pl.col("percent_missing") > 5)
        .then(pl.lit("medium"))
        .otherwise(pl.lit("low"))
    )

    return (
        stats
        .unpivot(index="n_rows", variable_name="column", value_name="total_missing")
        .with_columns(percent_missing.alias("percent_missing"))
        # Separate step: expressions inside one with_columns cannot see columns
        # created by the same call.
        .with_columns(missing_level.alias("missing_level"))
        .select(["column", "total_missing", "percent_missing", "missing_level"])
        .sort("percent_missing", descending=True)
    )


In [None]:
# Get value_counts for columns with missing data, eager example
collected = lf_dummies.collect()
for c in collected.columns:
    column = collected[c]
    if column.null_count() > 0:
        print("\n", c)
        # sort=True orders by count (most frequent first); without it the
        # first 20 rows are in no particular order.
        print(column.value_counts(sort=True).head(20))

In [None]:
# Impute nulls in "col" with the column mean (swap in any other expression or
# literal to fill with a different value)
col_mean = pl.col("col").mean()
lf_imputed = lf_dummies.with_columns(pl.col("col").fill_null(col_mean))

In [None]:
# Drop columns with fewer than N non-nulls (here: at least 5 values)
min_non_null = 5

# count() counts non-null values; this small aggregate is collected eagerly
# because which columns to keep has to be known before building the plan.
non_null_counts = lf_imputed.select(pl.all().count()).collect()
keep_cols = [
    name
    for name in non_null_counts.columns
    if non_null_counts[name][0] >= min_non_null
]

# Sparse columns are removed from the frame instead of being kept as all-null
lf_threshold = lf_imputed.select(keep_cols)

### Duplicate checking

In [None]:
def duplicate_summary(lf: pl.LazyFrame, subset: list[str] | None = None) -> dict:
    """Count duplicated rows, overall and on a subset of columns.

    Collects the frame eagerly.

    Args:
        lf: Lazy frame to inspect.
        subset: Optional column names; rows sharing the same values in these
            columns are counted as partial duplicates.

    Returns:
        ``{"exact_duplicates": int, "partial_duplicates": int}``. Each count is
        the number of rows that have at least one identical row (every member
        of a duplicate group is counted). ``partial_duplicates`` is 0 when
        ``subset`` is empty or ``None``.
    """
    df = lf.collect()
    exact = df.is_duplicated().sum()
    # DataFrame.is_duplicated() takes no subset argument, so restrict the frame
    # to the key columns first.
    partial = df.select(subset).is_duplicated().sum() if subset else 0
    return {"exact_duplicates": int(exact), "partial_duplicates": int(partial)}


# Key columns used for the subset-based duplicate checks below
dedup_keys = ["col_a", "col_b"]

_ = duplicate_summary(lf_imputed, subset=dedup_keys)


# Remove rows that are identical across every column
lf_nodup = lf_imputed.unique(keep="first")


# Remove rows that repeat on the key columns only
lf_nodup_subset = lf_imputed.unique(subset=dedup_keys, keep="first")

### Outliers and distribution

In [None]:
# Outlier labeling with IQR (keeps rows, adds a boolean flag)

def with_iqr_flag(lf: pl.LazyFrame, col: str, flag_name: str | None = None) -> pl.LazyFrame:
    """Add a boolean column marking values outside the 1.5 * IQR fences.

    Args:
        lf: Lazy frame containing ``col``.
        col: Name of the column to test.
        flag_name: Name for the new flag column; falls back to
            ``f"{col}_is_outlier"`` when empty or ``None``.

    Returns:
        ``lf`` with the flag column added; no rows are removed.
    """
    out_name = flag_name if flag_name else f"{col}_is_outlier"
    values = pl.col(col)

    lower_quartile = values.quantile(0.25, interpolation="nearest")
    upper_quartile = values.quantile(0.75, interpolation="nearest")
    spread = upper_quartile - lower_quartile

    within_fences = values.is_between(
        lower_quartile - 1.5 * spread,
        upper_quartile + 1.5 * spread,
        closed="both",
    )
    return lf.with_columns(within_fences.not_().alias(out_name))


# Label outliers in "num_col" (adds the "num_col_is_outlier" flag column)
lf_flagged = with_iqr_flag(lf_nodup, "num_col")


# To actually remove the outliers, keep only rows whose flag is False:
lf_no_outliers = lf_flagged.filter(~pl.col("num_col_is_outlier"))

In [None]:
# Convert the columns of interest to pandas for visualization:
plot_cols = ["feature_a", "feature_b"]
df = lf_flagged.select(plot_cols).collect().to_pandas()

# See pandas_qc_tools.ipynb for plotting examples

In [None]:
# Log transform (create a new column, safe for negatives via clip):
# negatives are clipped to 0 first, so the result is log(1 + max(x, 0)).
lf_log = lf_flagged.with_columns(
    # clip(lower_bound=...) replaces the removed clip_min
    pl.col("num_col").clip(lower_bound=0).add(1.0).log().alias("num_col_log1p")
)

### Text cleaning

In [None]:
# Canonicalize strings: Unicode-normalize, trim, lower-case, spell out "&",
# then drop remaining punctuation
lf_text = lf_log.with_columns(
    pl.col("column_str")
    .cast(pl.Utf8, strict=False)
    .str.normalize("NFKC")
    # str.strip() was renamed; strip_chars() with no argument trims whitespace
    .str.strip_chars()
    .str.to_lowercase()
    .str.replace_all("&", "and")
    .str.replace_all(r"[^\w\s]", "")
    .alias("column_str")
)

In [None]:
# Categorize strings based on keyword. Example with colors

category_map = {
    "blue": ["azure", "cerulean", "sky blue"],
    "red": ["magenta", "dark red", "red orange"],
}


# Build an expression assigning the first matching label; else 'other'.
# Each iteration wraps the previous expression, so the label processed last
# ends up outermost and is checked first. Iterating in reverse therefore gives
# the first label in category_map the highest precedence.
color_text = pl.col("color").cast(pl.Utf8, strict=False).str.to_lowercase()
expr = pl.lit("other")
for label, words in reversed(list(category_map.items())):
    if not words:
        # An empty keyword list would produce an empty pattern that matches every row
        continue
    pat = "|".join(re.escape(w) for w in words)
    expr = pl.when(color_text.str.contains(pat)).then(pl.lit(label)).otherwise(expr)


lf_text_cats = lf_text.with_columns(expr.alias("color_category"))


# Collapse "Place" to a known keyword when one is contained in the value.
# "name_a" is checked before "name_b"; anything else just has "-" replaced by "_".
place_text = pl.col("Place").cast(pl.Utf8)
has_name_a = place_text.str.contains("name_a", literal=True)
has_name_b = place_text.str.contains("name_b", literal=True)

normalized_place = (
    pl.when(has_name_a)
    .then(pl.lit("name_a"))
    .when(has_name_b)
    .then(pl.lit("name_b"))
    .otherwise(place_text.str.replace_all("-", "_"))
)

lf_place_norm = lf_text_cats.with_columns(normalized_place.alias("Place"))

In [None]:
# Replace text value with contained keyword (pandas version: df is used through
# the pandas .str accessor and np.where here)
place = df['Place']
# na=False: without it missing values yield NaN in the mask, and np.where
# treats NaN as truthy, so missing places would be labelled 'name_a'.
name_a = place.str.contains('name_a', regex=False, na=False)
name_b = place.str.contains('name_b', regex=False, na=False)
df['Place'] = np.where(name_a, 'name_a',
                       np.where(name_b, 'name_b',
                                place.str.replace('-', '_', regex=False)))

### Chain cleaning functions

In [None]:
def clean_data_lazy(
    lf: pl.LazyFrame,
    *,
    dedup_subset: list[str] | None = None,
    category_map: dict[str, list[str]] | None = None,
    text_col: str | None = None,
    new_col_name: str = "category",
) -> tuple[pl.LazyFrame, pl.DataFrame]:
    """Chain the optional cleaning steps and report missing values.

    Args:
        lf: Lazy frame to clean.
        dedup_subset: If given, drop rows that repeat on these columns
            (keeping the first occurrence).
        category_map: Mapping of label -> keywords. Used together with
            ``text_col`` to add a keyword-based category column.
        text_col: Column searched (lower-cased) for the keywords.
        new_col_name: Name of the category column that is added.

    Returns:
        ``(cleaned_lazy_frame, missing_report_dataframe)``. The report is built
        on the cleaned frame and is returned whether or not the optional steps
        ran.
    """
    out = lf
    if dedup_subset:
        out = out.unique(subset=dedup_subset, keep="first")

    if category_map and text_col:
        text = pl.col(text_col).cast(pl.Utf8, strict=False).str.to_lowercase()
        expr = pl.lit("other")
        # Each iteration wraps the previous expression, so iterate in reverse
        # to give the first label in category_map the highest precedence.
        for label, words in reversed(list(category_map.items())):
            if not words:
                # An empty keyword list would produce a pattern matching every row
                continue
            pat = "|".join(re.escape(w) for w in words)
            expr = pl.when(text.str.contains(pat)).then(pl.lit(label)).otherwise(expr)
        out = out.with_columns(expr.alias(new_col_name))

    # Build the report outside the branch above so the function always returns
    # the documented tuple instead of None when no category map is supplied.
    mr = missing_report(out)
    return out, mr


# Run the chained cleaner: dedupe on the key columns and add a color category
cleaned_lf, missing_rep = clean_data_lazy(
    lf_place_norm,
    text_col="color",
    category_map=category_map,
    new_col_name="color_category",
    dedup_subset=["col_a", "col_b"],
)



# Materialize the final result (streaming recommended for large data) and save it
cleaned_df = cleaned_lf.collect(streaming=True)
cleaned_df.write_csv("final_cleaned.csv")