# Homework — Stage 15: Orchestration & System Design

## 1) Project Task Decomposition

In [4]:
from pathlib import Path
import pandas as pd

# resolve repo root whether you run from notebooks/ or root
ROOT = Path.cwd()
if not (ROOT / "src").exists():  # likely running from notebooks/
    ROOT = ROOT.parent

tasks = pd.DataFrame({
    "task":       ["ingest", "clean", "train_or_score", "report"],
    "inputs":     [["data/raw.ext"],
                   ["data/raw.ext"],
                   ["data/clean.json", "model/model.pkl"],
                   ["data/clean.json", "model/model.pkl"]],
    "outputs":    [["data/raw.json"],
                   ["data/clean.json"],
                   ["model/model.pkl", "data/scores.json"],
                   ["reports/report.txt"]],
    "idempotent": [True, True, True, True],
    "owner":      ["data_eng", "data_eng", "ml_eng", "analyst"],
})
tasks

Unnamed: 0,task,inputs,outputs,idempotent,owner
0,ingest,[data/raw.ext],[data/raw.json],True,data_eng
1,clean,[data/raw.ext],[data/clean.json],True,data_eng
2,train_or_score,"[data/clean.json, model/model.pkl]","[model/model.pkl, data/scores.json]",True,ml_eng
3,report,"[data/clean.json, model/model.pkl]",[reports/report.txt],True,analyst


## 2) Dependencies (DAG)

In [5]:
# edges: task -> needs these to finish first
dag = {
    "ingest": [],
    "clean": ["ingest"],
    "train_or_score": ["clean"],
    "report": ["train_or_score"],
}
dag

{'ingest': [],
 'clean': ['ingest'],
 'train_or_score': ['clean'],
 'report': ['train_or_score']}

## 3) Logging & Checkpoints Plan

In [6]:
import pandas as pd

logging_plan = pd.DataFrame({
    "task": ["ingest","clean","train_or_score","report"],
    "log_messages": [
        ["start/end","rows","source_uri","artifact_path"],
        ["start/end","rows_in/out","null_pct","artifact_path"],
        ["start/end","params","metric/score","artifact_path"],
        ["start/end","rows_in","summary_path","artifact_path"],
    ],
    "checkpoint_artifact": [
        "data/raw.json",
        "data/clean.json",
        "data/scores.json",
        "reports/report.txt",
    ],
    "retention": ["7d","14d","30d","90d"]
})
logging_plan

Unnamed: 0,task,log_messages,checkpoint_artifact,retention
0,ingest,"[start/end, rows, source_uri, artifact_path]",data/raw.json,7d
1,clean,"[start/end, rows_in/out, null_pct, artifact_path]",data/clean.json,14d
2,train_or_score,"[start/end, params, metric/score, artifact_path]",data/scores.json,30d
3,report,"[start/end, rows_in, summary_path, artifact_path]",reports/report.txt,90d


## 4) Right-Sizing Automation

Automate now: ingest, clean, train_or_score — deterministic, idempotent steps that can run daily via a simple scheduler (cron/GitHub Actions).
Keep manual (for now): report — reviewed weekly so we can add commentary/QA before sharing.
Why: upstream steps have clear I/O contracts and low review needs; the report is stakeholder-facing and benefits from a human check. We’ll automate it later once feedback stabilizes.
Logging/alerts: task-level logs to file + console; on error, notify Slack/email; artifacts saved as checkpoints under data/ / model/ / reports/.