In [5]:
import sys, subprocess, json, os
# Install the notebook's third-party dependencies into the interpreter that is
# running this kernel (sys.executable), quietly (-q).
# NOTE(review): versions are not pinned, so a fresh run may pull different releases.
subprocess.check_call(
    [
        sys.executable, "-m", "pip", "install", "-q",
        "dagster", "pandas", "scikit-learn",
    ]
)

import numpy as np, pandas as pd
from pathlib import Path
from dagster import (
    asset, AssetCheckResult, asset_check, Definitions, materialize, Output,
    DailyPartitionsDefinition, IOManager, io_manager
)
from sklearn.linear_model import LinearRegression

# Directory where the IO manager stores materialized assets. Defaults to the
# Colab-style path used originally; set DAGSTORE_DIR to relocate it when
# /content is not writable on the current machine.
BASE = Path(os.environ.get("DAGSTORE_DIR") or "/content/dagstore")
BASE.mkdir(parents=True, exist_ok=True)

# First day of the daily partition range, also the default run date.
START = "2025-08-01"

In [6]:
class CSVIOManager(IOManager):
    """File-backed IO manager: DataFrames are stored as CSV, anything else as JSON.

    Files are written directly under ``base`` and named after the asset key
    path joined with underscores, e.g. ``raw_sales.csv``.

    NOTE(review): the file name does not include the partition key, so every
    partition of a partitioned asset is written to, and read from, the same file.
    """

    def __init__(self, base: Path):
        """Remember the directory under which asset files are stored."""
        self.base = base

    def _path(self, key, ext):
        """Return ``base/<key path joined by '_'>.<ext>`` for the given asset key."""
        return self.base / f"{'_'.join(key.path)}.{ext}"

    def handle_output(self, context, obj):
        """Persist an asset value and log where it was written.

        DataFrames are written as CSV without the index; every other value is
        serialized with ``json.dumps(indent=2)``.
        """
        is_frame = isinstance(obj, pd.DataFrame)
        p = self._path(context.asset_key, "csv" if is_frame else "json")
        # Make the manager usable with a base directory that was not created up front.
        p.parent.mkdir(parents=True, exist_ok=True)
        if is_frame:
            obj.to_csv(p, index=False)
        else:
            p.write_text(json.dumps(obj, indent=2))
        context.log.info(f"Saved {context.asset_key} -> {p}")

    def load_input(self, context):
        """Load the upstream asset's stored value.

        The CSV file is preferred and returned as a DataFrame. If no CSV exists
        but a JSON file does (the upstream asset produced a non-DataFrame
        value), the decoded JSON is returned instead, mirroring ``handle_output``.
        When neither file exists, ``pd.read_csv`` raises ``FileNotFoundError``.
        """
        k = context.upstream_output.asset_key
        csv_path = self._path(k, "csv")
        json_path = self._path(k, "json")
        if not csv_path.exists() and json_path.exists():
            value = json.loads(json_path.read_text())
            context.log.info(f"Loaded {k} <- {json_path}")
            return value
        df = pd.read_csv(csv_path)
        context.log.info(f"Loaded {k} <- {csv_path} ({len(df)} rows)")
        return df

# Resource factory: hands Dagster a CSVIOManager rooted at BASE.
# The init-context argument is accepted but not used.
@io_manager
def csv_io_manager(_):
    manager = CSVIOManager(BASE)
    return manager

# Daily partition scheme whose first partition is START; applied to raw_sales.
daily = DailyPartitionsDefinition(start_date=START)

In [7]:
@asset(partitions_def=daily, description="Synthetic raw sales with noise & occasional nulls.")
def raw_sales(context) -> Output[pd.DataFrame]:
    """Generate 200 synthetic sales rows for the partition being materialized.

    ``sales`` is a linear function of ``units`` and ``promo`` plus Gaussian
    noise. After ``sales`` is computed, a few ``units`` values are blanked out
    (NaN) to simulate missing data.

    NOTE(review): the generator is seeded with the constant 42, so the numeric
    columns are identical for every partition; only ``date`` differs.
    """
    n_rows = 200
    partition_day = context.partition_key
    generator = np.random.default_rng(42)

    # Draw order matters for reproducibility: units, promo, noise, then null positions.
    units = generator.normal(100, 20, n_rows)
    promo_flag = generator.integers(0, 2, n_rows)
    jitter = generator.normal(0, 10, n_rows)
    sales = 2.5 * units + 30 * promo_flag + jitter + 50

    # Knock out n/50 of the unit values (at least one).
    n_missing = max(1, n_rows // 50)
    missing_idx = generator.choice(n_rows, size=n_missing, replace=False)
    units[missing_idx] = np.nan

    frame = pd.DataFrame(
        {"date": partition_day, "units": units, "promo": promo_flag, "sales": sales}
    )
    summary = {
        "rows": n_rows,
        "null_units": int(frame["units"].isna().sum()),
        "head": frame.head().to_markdown(),
    }
    return Output(frame, metadata=summary)

@asset(description="Clean nulls, clip outliers for robust downstream modeling.")
def clean_sales(context, raw_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    """Drop rows with missing ``units`` and clip ``units`` to its 1st-99th percentile range.

    The percentiles are computed on the rows that remain after the drop. The
    output metadata records the row count and the min/max of the clipped column.
    """
    cleaned = raw_sales.dropna(subset=["units"]).copy()

    lower, upper = cleaned["units"].quantile([0.01, 0.99]).tolist()
    cleaned["units"] = cleaned["units"].clip(lower, upper)

    summary = {
        "rows": len(cleaned),
        "units_min": float(cleaned["units"].min()),
        "units_max": float(cleaned["units"].max()),
    }
    return Output(cleaned, metadata=summary)

@asset(description="Feature engineering: interactions & standardized columns.")
def features(context, clean_sales: pd.DataFrame) -> Output[pd.DataFrame]:
    """Add a squared term, a units-by-promo interaction, and z-scored copies of each.

    For every column in (``units``, ``units_sq``, ``units_promo``) a ``z_<col>``
    column is added using the population standard deviation (``ddof=0``); a
    zero standard deviation is replaced by 1.0 to avoid dividing by zero.
    """
    enriched = clean_sales.assign(
        units_sq=lambda d: d["units"] ** 2,
        units_promo=lambda d: d["units"] * d["promo"],
    )

    for col in ("units", "units_sq", "units_promo"):
        column = enriched[col]
        center = column.mean()
        spread = column.std(ddof=0)
        if not spread:
            spread = 1.0
        enriched[f"z_{col}"] = (column - center) / spread

    summary = {"rows": len(enriched), "cols": list(enriched.columns)}
    return Output(enriched, metadata=summary)

In [8]:
@asset_check(asset=clean_sales, description="No nulls; promo in {0,1}; units numeric and finite.")
def clean_sales_quality(clean_sales: pd.DataFrame) -> AssetCheckResult:
    """Validate the cleaned sales frame.

    The check passes when all of the following hold:
      * no cell in the frame is null,
      * every ``promo`` value is 0 or 1,
      * every ``units`` value is numeric and finite.

    The previous ``units`` test compared the column against its own min and
    max, which is true by construction and could never fail. The clip bounds
    used upstream are not available here, so finiteness is verified instead.
    """
    nulls = int(clean_sales.isna().sum().sum())
    promo_ok = bool(set(clean_sales["promo"].unique()).issubset({0, 1}))

    # Non-numeric entries are coerced to NaN so they fail the check instead of raising.
    units = pd.to_numeric(clean_sales["units"], errors="coerce")
    units_ok = bool(np.isfinite(units).all())

    passed = bool((nulls == 0) and promo_ok and units_ok)
    return AssetCheckResult(
        passed=passed,
        metadata={"nulls": nulls, "promo_ok": promo_ok, "units_ok": units_ok},
    )

@asset(description="Train a tiny linear regressor; emit R^2 and coefficients.")
def tiny_model_metrics(context, features: pd.DataFrame) -> dict:
    """Fit ordinary least squares of ``sales`` on four predictors and report the fit.

    Returns a dict with ``r2_train`` (R^2 on the same rows used for fitting)
    followed by one entry per predictor holding its fitted coefficient.
    """
    predictors = ["z_units", "z_units_sq", "z_units_promo", "promo"]
    design = features[predictors].values
    target = features["sales"].values

    regressor = LinearRegression().fit(design, target)

    metrics = {"r2_train": float(regressor.score(design, target))}
    for name, weight in zip(predictors, regressor.coef_):
        metrics[name] = float(weight)
    return metrics

In [9]:
# Code-location definitions: the four assets, the quality check on clean_sales,
# and the CSV/JSON IO manager bound to the default "io_manager" key.
# The check is registered through `asset_checks`, the parameter Definitions
# provides for asset checks, rather than being mixed into `assets`.
defs = Definitions(
    assets=[raw_sales, clean_sales, features, tiny_model_metrics],
    asset_checks=[clean_sales_quality],
    resources={"io_manager": csv_io_manager},
)

if __name__ == "__main__":
    # Partition to run: RUN_DATE from the environment, falling back to START
    # when the variable is unset or empty.
    run_day = os.environ.get("RUN_DATE") or START
    print("Materializing everything for:", run_day)

    # Materialize all assets plus the quality check in-process for that partition.
    targets = [raw_sales, clean_sales, features, tiny_model_metrics, clean_sales_quality]
    result = materialize(
        targets,
        partition_key=run_day,
        resources={"io_manager": csv_io_manager},
    )
    print("Run success:", result.success)

    # Report the size of each expected output file; missing files are skipped silently.
    expected_files = ("raw_sales.csv", "clean_sales.csv", "features.csv", "tiny_model_metrics.json")
    for artifact_name in expected_files:
        artifact = BASE / artifact_name
        if not artifact.exists():
            continue
        print(artifact_name, "->", artifact.stat().st_size, "bytes")
        if artifact_name.endswith(".json"):
            print("Metrics:", json.loads(artifact.read_text()))

2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - ENGINE_EVENT - Executing steps in process (pid: 479)
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - LOGS_CAPTURED - Started capturing logs in process (pid: 479).
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5

Materializing everything for: 2025-08-01


2025-08-16 04:03:00 +0000 - dagster - INFO - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - clean_sales - Loaded AssetKey(['raw_sales']) <- /content/dagstore/raw_sales.csv (200 rows)
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - clean_sales - LOADED_INPUT - Loaded input "raw_sales" using input manager "io_manager", from output "result" of step "raw_sales"
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - clean_sales - STEP_INPUT - Got input "raw_sales" of type "DataFrame". (Type check passed).
2025-08-16 04:03:00 +0000 - dagster - DEBUG - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - 479 - clean_sales - STEP_OUTPUT - Yielded output "result" of type "DataFrame". (Type check passed).
2025-08-16 04:03:00 +0000 - dagster - INFO - __ephemeral_asset_job__ - 52884413-5e11-4434-8a98-3e44dc808c28 - clean_sales - Saved Asse

Run success: True
raw_sales.csv -> 9894 bytes
clean_sales.csv -> 9731 bytes
features.csv -> 27106 bytes
tiny_model_metrics.json -> 174 bytes
Metrics: {'r2_train': 0.9547421316644358, 'z_units': 32.931720119978166, 'z_units_sq': 11.998171264446981, 'z_units_promo': -6.324476924103006, 'promo': 44.90030628506354}
