<a href="https://colab.research.google.com/github/tonyjosephsebastians/AI-Design-patterns/blob/main/GROUP_3_%E2%80%94_State_Breaks_When_Services_Restart.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

GROUP 3 - State Breaks When Services Restart

Your system must survive restarts, never lose truth, and support replay / reprocessing.

This failure mode is 🔥 very common in AI pipelines, job systems, RAG, agent workflows, and payments.

The Core Failure (Understand This First)


❌ What goes wrong
- App restarts (deploy, crash, scale-down)
- In-memory state is wiped
- Jobs disappear
- Partial work is lost
- System no longer knows "what really happened"


Classic bad example
# BAD: in-memory job store
jobs = {}

def submit_job(job_id, payload):
    jobs[job_id] = "RUNNING"


💥 Restart happens → jobs = {} → truth is gone

Stateless Service Pattern (FOUNDATION)

In [None]:
🧠 Idea

Services should not own state.
They compute, not remember.

✅ Rule

No business truth in memory

All state lives outside (DB, cache, log)

Before vs After

Before: Service = logic + state

After: Service = logic only
State = DB / Log / Queue

Stateless services allow horizontal scaling and safe restarts because all system truth is externalized.
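A minimal sketch of the "after" shape, assuming a hypothetical `jobs` table in a SQLite file. The names `submit_job`, `get_status`, and `DB_FILE` are illustrative and not part of the project code. Each call opens the store, reads or writes, and keeps no job truth in process memory.

```python
import os
import sqlite3
import tempfile

# Hypothetical store location for this sketch; any external store would do.
DB_FILE = os.path.join(tempfile.mkdtemp(), "sketch_jobs.db")

def _connect() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_FILE)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs(job_id TEXT PRIMARY KEY, status TEXT NOT NULL)"
    )
    return conn

def submit_job(job_id: str) -> None:
    # Write the truth to the store; the function itself remembers nothing.
    conn = _connect()
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO jobs(job_id, status) VALUES (?, ?)", (job_id, "RUNNING")
        )
    conn.close()

def get_status(job_id: str):
    # Every read goes back to the store, so a restarted process sees the same answer.
    conn = _connect()
    row = conn.execute(
        "SELECT status FROM jobs WHERE job_id=?", (job_id,)
    ).fetchone()
    conn.close()
    return row[0] if row else None

submit_job("job-1")
status_from_store = get_status("job-1")
```

Because nothing is cached in a module-level dictionary, the same lookup would give the same result from a freshly started process pointed at the same file.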

In [None]:
Externalized State Pattern
🧠 Idea

Persist every meaningful state transition.

What counts as state?

Job status

Agent progress

RAG ingestion status

User workflow step

Example: Job table
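As a rough illustration of such a job table (a sketch under assumptions, not the project's schema): every status transition is written to the table together with a timestamp. The column set and the helper `set_status` are hypothetical, and the in-memory database is used only to keep the example self-contained.

```python
import sqlite3
from datetime import datetime, timezone

# In-memory only for the sketch; a real service would use a file or server DB.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs(
        job_id     TEXT PRIMARY KEY,
        status     TEXT NOT NULL,
        updated_at TEXT NOT NULL
    )
""")

def set_status(job_id: str, status: str) -> None:
    # Persist the transition: update the row if it exists, otherwise create it.
    now = datetime.now(timezone.utc).isoformat()
    with conn:
        cur = conn.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE job_id=?",
            (status, now, job_id),
        )
        if cur.rowcount == 0:
            conn.execute("INSERT INTO jobs VALUES (?, ?, ?)", (job_id, status, now))

for s in ("CREATED", "RUNNING", "DONE"):
    set_status("job-1", s)

final_status = conn.execute(
    "SELECT status FROM jobs WHERE job_id='job-1'"
).fetchone()[0]
row_count = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
```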

Stateless Service Pattern → No job truth stored in memory; all functions read/write DB.

Externalized State Pattern → SQLite tables store job + step state.

Entity Decomposition Pattern → jobs, job_steps, job_events, job_snapshots.

Canonical Data Model Pattern → Canonical job/step state transitions via events.

Versioned Data Pattern → schema_version stored in jobs + events.

Append-Only / Audit Log Pattern → job_events is INSERT-only.

Replay / Reprocessing Pattern → /jobs/{id}/replay rebuilds from events (+ snapshot).

Memento Pattern → /snapshot stores reconstructed state checkpoint.

Soft Delete Pattern → deleted_at instead of hard delete.

updated_at → job and steps update their timestamps whenever state changes (important in interviews).

# Group 3 - State survives restarts (FastAPI mini-project)

## Run
pip install -r requirements.txt
uvicorn app.main:app --reload

## Try
1) Create job
POST http://127.0.0.1:8000/jobs

2) Start job
POST /jobs/{job_id}/start

3) Complete steps
POST /jobs/{job_id}/steps/{step_name}/complete

4) Snapshot
POST /jobs/{job_id}/snapshot

5) Replay to rebuild state
POST /jobs/{job_id}/replay

6) Soft delete
DELETE /jobs/{job_id}

## What this shows
- Stateless Service: no in-memory truth
- Externalized State: SQLite persistence
- Entity Decomposition: jobs, steps, events, snapshots
- Canonical + Versioned model: schema_version stored on jobs and events
- Append-only audit log: events table only INSERT
- Replay: rebuild job state from events
- Memento: snapshots for faster restore
- Soft delete: deleted_at instead of hard delete
- updated_at: always updated when job/step changes


In [2]:
!pip install fastapi uvicorn pydantic nest-asyncio




Cell 2 - Imports & DB Setup (Externalized State)

In [3]:
import sqlite3
import json
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional


In [4]:
DB_PATH = "group3_state.db"

def utcnow():
    return datetime.now(timezone.utc).isoformat()


In [5]:
def init_db():
    conn = sqlite3.connect(DB_PATH)
    conn.execute("PRAGMA journal_mode=WAL;")

    conn.execute("""
    CREATE TABLE IF NOT EXISTS jobs(
        job_id TEXT PRIMARY KEY,
        status TEXT,
        deleted_at TEXT,
        created_at TEXT,
        updated_at TEXT
    )
    """)

    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_steps(
        step_id TEXT PRIMARY KEY,
        job_id TEXT,
        step_name TEXT,
        status TEXT
    )
    """)

    # 🔥 Append-only audit log
    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_events(
        event_id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_id TEXT,
        event_type TEXT,
        payload TEXT,
        created_at TEXT
    )
    """)

    # 🔥 Memento snapshot table
    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_snapshots(
        snapshot_id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_id TEXT,
        snapshot TEXT,
        created_at TEXT
    )
    """)

    conn.commit()
    conn.close()

init_db()
print("DB initialized")


DB initialized


In [6]:
def append_event(job_id: str, event_type: str, payload: Dict[str, Any]):
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        """
        INSERT INTO job_events(job_id, event_type, payload, created_at)
        VALUES (?, ?, ?, ?)
        """,
        (job_id, event_type, json.dumps(payload), utcnow())
    )
    conn.commit()
    conn.close()


In [7]:
DEFAULT_STEPS = ["INGEST", "EXTRACT", "EMBED", "INDEX"]

def create_job():
    job_id = str(uuid.uuid4())
    now = utcnow()

    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "INSERT INTO jobs VALUES (?, ?, NULL, ?, ?)",
        (job_id, "CREATED", now, now)
    )

    for step in DEFAULT_STEPS:
        conn.execute(
            "INSERT INTO job_steps VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), job_id, step, "PENDING")
        )

    conn.commit()
    conn.close()

    append_event(job_id, "JOB_CREATED", {})
    return job_id


In [8]:
job_id = create_job()
job_id


'1075a6ae-6d05-49d0-9207-fbb02559748c'

Cell 5 - Update Job & Steps (Externalized State)

In [9]:
def start_job(job_id: str):
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "UPDATE jobs SET status=?, updated_at=? WHERE job_id=?",
        ("RUNNING", utcnow(), job_id)
    )
    conn.commit()
    conn.close()

    append_event(job_id, "JOB_STARTED", {})


In [11]:
def complete_step(job_id: str, step_name: str):
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        """
        UPDATE job_steps
        SET status=?
        WHERE job_id=? AND step_name=?
        """,
        ("DONE", job_id, step_name)
    )
    conn.commit()
    conn.close()

    append_event(job_id, "STEP_COMPLETED", {"step": step_name})
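The function above commits the step update first and then appends the event through a second connection. If the process dies between the two commits, the table and the log disagree. One possible way to close that gap, sketched here under assumptions (the name `complete_step_atomic` and the trimmed-down tables are hypothetical), is to write both inside a single transaction.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory only to keep the sketch self-contained
conn.execute("CREATE TABLE job_steps(job_id TEXT, step_name TEXT, status TEXT)")
conn.execute(
    "CREATE TABLE job_events("
    "event_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "job_id TEXT, event_type TEXT, payload TEXT)"
)
conn.execute("INSERT INTO job_steps VALUES ('job-1', 'INGEST', 'PENDING')")
conn.commit()

def complete_step_atomic(job_id: str, step_name: str) -> None:
    # `with conn` wraps both statements in one transaction:
    # either the step update and its event are both committed, or neither is.
    with conn:
        conn.execute(
            "UPDATE job_steps SET status=? WHERE job_id=? AND step_name=?",
            ("DONE", job_id, step_name),
        )
        conn.execute(
            "INSERT INTO job_events(job_id, event_type, payload) VALUES (?, ?, ?)",
            (job_id, "STEP_COMPLETED", json.dumps({"step": step_name})),
        )

complete_step_atomic("job-1", "INGEST")
step_status = conn.execute(
    "SELECT status FROM job_steps WHERE job_id='job-1'"
).fetchone()[0]
event_count = conn.execute("SELECT COUNT(*) FROM job_events").fetchone()[0]
```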



In [16]:
start_job(job_id)
complete_step(job_id, "INGEST")
complete_step(job_id, "EXTRACT")

🧩 Cell 6 - 🔥 Replay Engine (Reprocessing Pattern)

In [17]:
def replay_job(job_id: str):
    conn = sqlite3.connect(DB_PATH)
    rows = conn.execute(
        "SELECT event_type, payload FROM job_events WHERE job_id=? ORDER BY event_id",
        (job_id,)
    ).fetchall()
    conn.close()

    state = {"status": None, "steps": {}}

    for event_type, payload in rows:
        payload = json.loads(payload)

        if event_type == "JOB_CREATED":
            state["status"] = "CREATED"
        elif event_type == "JOB_STARTED":
            state["status"] = "RUNNING"
        elif event_type == "STEP_COMPLETED":
            state["steps"][payload["step"]] = "DONE"

    return state


In [18]:
replay_job(job_id)


{'status': 'RUNNING', 'steps': {'INGEST': 'DONE', 'EXTRACT': 'DONE'}}

Cell 7 - 🔥 Snapshot (Memento Pattern)

In [19]:
def create_snapshot(job_id: str):
    state = replay_job(job_id)
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        """
        INSERT INTO job_snapshots(job_id, snapshot, created_at)
        VALUES (?, ?, ?)
        """,
        (job_id, json.dumps(state), utcnow())
    )
    conn.commit()
    conn.close()


Cell 8 - Restart Simulation

In [20]:
# After restart, DB still exists
replay_job(job_id)


{'status': 'RUNNING', 'steps': {'INGEST': 'DONE', 'EXTRACT': 'DONE'}}

No in-memory variables

State rebuilt from DB

This is exactly what interviewers want

In [21]:
def soft_delete(job_id: str):
    # Use one timestamp so deleted_at and updated_at match exactly.
    now = utcnow()
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "UPDATE jobs SET deleted_at=?, updated_at=? WHERE job_id=?",
        (now, now, job_id)
    )
    conn.commit()
    conn.close()

    append_event(job_id, "JOB_SOFT_DELETED", {})


In [22]:
soft_delete(job_id)
replay_job(job_id)


{'status': 'RUNNING', 'steps': {'INGEST': 'DONE', 'EXTRACT': 'DONE'}}
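The output is unchanged after the soft delete because `replay_job` has no branch for the `JOB_SOFT_DELETED` event. A minimal sketch of a replay that also tracks deletion, assuming a hypothetical `deleted` flag in the rebuilt state and events passed in as plain `(event_type, payload)` tuples:

```python
from typing import Any, Dict, Iterable, Tuple

def replay(events: Iterable[Tuple[str, Dict[str, Any]]]) -> Dict[str, Any]:
    # Start from an empty state and apply events in order.
    state: Dict[str, Any] = {"status": None, "steps": {}, "deleted": False}
    for event_type, payload in events:
        if event_type == "JOB_CREATED":
            state["status"] = "CREATED"
        elif event_type == "JOB_STARTED":
            state["status"] = "RUNNING"
        elif event_type == "STEP_COMPLETED":
            state["steps"][payload["step"]] = "DONE"
        elif event_type == "JOB_SOFT_DELETED":
            # Deletion is just another fact in the log; the history stays intact.
            state["deleted"] = True
    return state

events = [
    ("JOB_CREATED", {}),
    ("JOB_STARTED", {}),
    ("STEP_COMPLETED", {"step": "INGEST"}),
    ("JOB_SOFT_DELETED", {}),
]
rebuilt = replay(events)
```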

The Big Idea (Group 3) in One Sentence

Your app must never store “truth” only in memory, because memory disappears on restart.

So we design the system so that:

memory can be empty any time,

and the system can still rebuild the truth from storage.

A Simple Story (Best way to remember)

Imagine you are running a restaurant kitchen.

❌ Bad system (in-memory truth)

You write all orders on a whiteboard.

Customer order comes → you write on whiteboard

Kitchen is cooking → status is on whiteboard

Then… power goes off (restart)

💥 Whiteboard gets erased → you don’t know:

which orders existed

what was already cooked

what is still pending
That’s exactly what happens when state lives in RAM.

✅ Good system (external truth)

Instead, you write orders in a notebook (database).

Power goes off? No problem.
You open the notebook and continue.

That’s Externalized State + Stateless Service.

Now let’s connect this to your FastAPI/Colab project
1) Stateless Service Pattern
Meaning (easy)

The service should behave like:

“I can be killed and restarted at any time and I still work.”

How?

No jobs = {} global dictionary holding truth

When request comes, read data from DB

Update DB

Return response

✅ In your notebook:
We never stored job truth in a Python global variable.

2) Externalized State Pattern
Meaning

Store the truth somewhere outside the app process:

DB (SQLite/Postgres)

Redis (for fast state)

Queue/log (Kafka/SQS)

✅ In your notebook:

Tables: jobs, job_steps, job_events, job_snapshots

Even if runtime restarts, SQLite file still exists

3) Entity Decomposition Pattern
Meaning (easy)

Don’t put everything into one big blob.

Instead split into smaller pieces (entities), like:

Job = the overall work

Steps = the parts of work

Events = the history of changes

Snapshots = a saved checkpoint

✅ Why it matters?
If the “extract step” fails, you can re-run only extract.
If everything is one big JSON, it becomes messy.

✅ In project:

jobs table → job info

job_steps table → step-level tracking

job_events table → history

job_snapshots table → checkpoint

4) Append-Only / Audit Log Pattern (Most important!)
Meaning (easy)

Instead of changing the past, you only add new facts.

Like a bank statement:

You don’t erase transactions

You only add new entries

✅ Example
Instead of updating status = RUNNING → DONE and losing history,
you add events:

JOB_CREATED

JOB_STARTED

STEP_COMPLETED (INGEST)

STEP_COMPLETED (EXTRACT)

✅ In project:
append_event() only does INSERT into job_events.

This gives you:

full history

audit

debugging

replay ability
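In the project, the append-only rule is a convention: nothing stops other code from running UPDATE or DELETE against `job_events`. One possible way to have the database enforce it, sketched here with SQLite triggers (the trigger names and the helper `try_sql` are hypothetical, and the table is trimmed down for the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory only to keep the sketch self-contained
conn.executescript("""
CREATE TABLE job_events(
    event_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id     TEXT NOT NULL,
    event_type TEXT NOT NULL
);
-- Reject any attempt to rewrite or erase history.
CREATE TRIGGER job_events_no_update BEFORE UPDATE ON job_events
BEGIN
    SELECT RAISE(ABORT, 'job_events is append-only');
END;
CREATE TRIGGER job_events_no_delete BEFORE DELETE ON job_events
BEGIN
    SELECT RAISE(ABORT, 'job_events is append-only');
END;
""")
conn.execute(
    "INSERT INTO job_events(job_id, event_type) VALUES ('job-1', 'JOB_CREATED')"
)

def try_sql(sql: str) -> str:
    # Run a statement and report whether the database accepted it.
    try:
        conn.execute(sql)
        return "allowed"
    except sqlite3.DatabaseError as exc:
        return f"rejected: {exc}"

update_result = try_sql("UPDATE job_events SET event_type='X' WHERE event_id=1")
delete_result = try_sql("DELETE FROM job_events WHERE event_id=1")
remaining = conn.execute("SELECT event_type FROM job_events").fetchall()
```

INSERT stays allowed, so `append_event()`-style code keeps working while edits to past events fail loudly.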

5) Replay / Reprocessing Pattern
Meaning (easy)

If you have the full event history, you can rebuild the current state anytime.

Like: “If I re-read my bank statement from the top, I can compute my balance.”

✅ In project:
replay_job(job_id) does:

read all events for that job

start with empty state

apply events one by one

rebuild status + steps

So after restart, system can “remember” by replaying.

6) Memento Pattern (Snapshots)
Meaning (easy)

Replay can be slow if you have 1 million events.

So sometimes you save a checkpoint:

“At event #5000, state looked like this.”

Then to rebuild:

load snapshot

apply only events after snapshot

✅ In project:
create_snapshot(job_id) saves the replay result into job_snapshots

This is like a video game save point 🎮
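The "load snapshot, apply only events after snapshot" idea can be sketched in plain Python. This is a minimal illustration under assumptions: events are `(event_id, event_type, payload)` tuples, and the snapshot is a hypothetical dict holding the state plus the `last_event_id` it covers.

```python
import copy

def apply_event(state, event_type, payload):
    if event_type == "JOB_CREATED":
        state["status"] = "CREATED"
    elif event_type == "JOB_STARTED":
        state["status"] = "RUNNING"
    elif event_type == "STEP_COMPLETED":
        state["steps"][payload["step"]] = "DONE"
    return state

def rebuild(events, snapshot=None):
    """Return (state, number_of_events_applied)."""
    if snapshot is None:
        state, last_id = {"status": None, "steps": {}}, 0
    else:
        # Copy so that replaying never mutates the stored checkpoint.
        state = copy.deepcopy(snapshot["state"])
        last_id = snapshot["last_event_id"]
    applied = 0
    for event_id, event_type, payload in events:
        if event_id <= last_id:
            continue  # already reflected in the snapshot
        state = apply_event(state, event_type, payload)
        applied += 1
    return state, applied

events = [
    (1, "JOB_CREATED", {}),
    (2, "JOB_STARTED", {}),
    (3, "STEP_COMPLETED", {"step": "INGEST"}),
    (4, "STEP_COMPLETED", {"step": "EXTRACT"}),
]

state_at_2, _ = rebuild(events[:2])
snapshot = {"state": state_at_2, "last_event_id": 2}

full_state, full_applied = rebuild(events)            # replays all 4 events
fast_state, fast_applied = rebuild(events, snapshot)  # replays only events 3 and 4
```

Both paths end in the same state; the snapshot only reduces how many events have to be applied.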

7) Versioned Data Pattern
Meaning (easy)

Your system changes over time:

new fields

new step names

new payload structure

Old jobs shouldn’t break.

So we store schema_version.

✅ In the earlier FastAPI repo version, we included schema_version.
In the Colab mini version, we kept it simple (but we can add it easily).
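One common use of a stored `schema_version` is to "upcast" old events to the newest shape before replaying them. The sketch below assumes a purely illustrative change between versions (the payload key `step` in v1 renamed to `step_name` in v2); the function `upcast` and the version numbers are hypothetical.

```python
from typing import Any, Dict

LATEST_VERSION = 2  # hypothetical version numbering for this sketch

def upcast(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the event in the newest payload shape."""
    version = event.get("schema_version", 1)
    payload = dict(event["payload"])  # copy so the stored event is not mutated
    if version < 2 and "step" in payload:
        # Assumed change between v1 and v2: `step` was renamed to `step_name`.
        payload["step_name"] = payload.pop("step")
    return {**event, "schema_version": LATEST_VERSION, "payload": payload}

old_event = {
    "event_type": "STEP_COMPLETED",
    "schema_version": 1,
    "payload": {"step": "INGEST"},
}
new_event = {
    "event_type": "STEP_COMPLETED",
    "schema_version": 2,
    "payload": {"step_name": "EMBED"},
}
upgraded = [upcast(e) for e in (old_event, new_event)]
```

With this in place the replay code only has to understand the latest shape, while old events in the log stay untouched.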

8) Canonical Data Model Pattern
Meaning (easy)

If multiple services read/write job info, they must agree on the same structure.

Otherwise one service says:

status="DONE"

another service uses:

status="COMPLETED"

💥 breaks replay & integration.

Canonical model = one shared truth structure.
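A minimal sketch of what a shared status vocabulary could look like, assuming a hypothetical `JobStatus` enum and an alias table for values that other services might send. Neither is part of the project code.

```python
from enum import Enum

class JobStatus(str, Enum):
    CREATED = "CREATED"
    RUNNING = "RUNNING"
    DONE = "DONE"
    FAILED = "FAILED"

# Hypothetical aliases seen from other services, mapped to the canonical value.
_ALIASES = {
    "COMPLETED": JobStatus.DONE,
    "FINISHED": JobStatus.DONE,
}

def to_canonical(raw: str) -> JobStatus:
    key = raw.strip().upper()
    if key in _ALIASES:
        return _ALIASES[key]
    # Raises ValueError for anything outside the agreed vocabulary.
    return JobStatus(key)

from_service_a = to_canonical("DONE")
from_service_b = to_canonical("completed")
```

Normalizing at the boundary means the event log and the replay logic only ever see one spelling per status.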

9) Soft Delete Pattern
Meaning (easy)

Never permanently delete truth.

Instead:

mark as deleted

keep the history/events

✅ Why?

audit required (banks, enterprise)

debugging

replay still possible

✅ In project:
soft_delete(job_id) updates deleted_at instead of deleting rows.
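Soft delete has a second half that is easy to forget: normal reads have to filter on `deleted_at IS NULL`. A small sketch under assumptions (a trimmed-down `jobs` table and a hypothetical `list_active` helper):

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")  # in-memory only to keep the sketch self-contained
conn.execute("CREATE TABLE jobs(job_id TEXT PRIMARY KEY, status TEXT, deleted_at TEXT)")
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?, NULL)",
    [("job-1", "RUNNING"), ("job-2", "DONE")],
)
conn.commit()

def soft_delete(job_id: str) -> bool:
    """Mark the job as deleted; return True only if this call changed the row."""
    now = datetime.now(timezone.utc).isoformat()
    with conn:
        cur = conn.execute(
            "UPDATE jobs SET deleted_at=? WHERE job_id=? AND deleted_at IS NULL",
            (now, job_id),
        )
    return cur.rowcount == 1

def list_active():
    # Everyday queries hide soft-deleted rows; audit queries can still see them.
    rows = conn.execute(
        "SELECT job_id FROM jobs WHERE deleted_at IS NULL ORDER BY job_id"
    )
    return [r[0] for r in rows]

first_call = soft_delete("job-1")
second_call = soft_delete("job-1")  # already deleted, so nothing changes
active = list_active()
total_rows = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
```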

Why interviewers LOVE Group 3

Because this is what breaks production systems:

✅ “We had jobs running… then deploy happened… and everything vanished.”

So when you answer:

“We use externalized state + append-only events + replay + snapshots”

They immediately think:
✅ This person builds real systems.

A super simple “restart test” you can remember
Step A: create job + complete steps

Events:

JOB_CREATED

JOB_STARTED

STEP_COMPLETED INGEST

STEP_COMPLETED EXTRACT

Step B: restart app / runtime

Memory wiped

Step C: replay reads events and rebuilds state

Result:

status = RUNNING

steps = {INGEST:DONE, EXTRACT:DONE}

✅ truth survived restart.
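Steps A to C can be tried in a few lines. This is a sketch under assumptions: a temporary SQLite file stands in for the external store, the event table is trimmed down, and the "restart" is simulated by keeping nothing between the write phase and the replay phase except the file path.

```python
import json
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "restart_demo.db")

def append_event(event_type: str, payload: dict) -> None:
    conn = sqlite3.connect(db_path)
    with conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS job_events("
            "event_id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "event_type TEXT, payload TEXT)"
        )
        conn.execute(
            "INSERT INTO job_events(event_type, payload) VALUES (?, ?)",
            (event_type, json.dumps(payload)),
        )
    conn.close()

def replay() -> dict:
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        "SELECT event_type, payload FROM job_events ORDER BY event_id"
    ).fetchall()
    conn.close()
    state = {"status": None, "steps": {}}
    for event_type, payload in rows:
        data = json.loads(payload)
        if event_type == "JOB_CREATED":
            state["status"] = "CREATED"
        elif event_type == "JOB_STARTED":
            state["status"] = "RUNNING"
        elif event_type == "STEP_COMPLETED":
            state["steps"][data["step"]] = "DONE"
    return state

# Step A: create job + complete steps (only the log is written).
append_event("JOB_CREATED", {})
append_event("JOB_STARTED", {})
append_event("STEP_COMPLETED", {"step": "INGEST"})
append_event("STEP_COMPLETED", {"step": "EXTRACT"})

# Step B: "restart". No Python variable from Step A holds any job state.

# Step C: rebuild purely from what is on disk.
rebuilt = replay()
```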

🧠 Pattern Mapping (You SHOULD say this in interviews)

| Pattern | Where shown |
| --- | --- |
| Stateless Service | No global variables |
| Externalized State | SQLite |
| Append-Only Log | job_events |
| Replay | replay_job() |
| Memento | job_snapshots |
| Soft Delete | deleted_at |
| Survive Restart | Runtime restart demo |

🚀 Next (Strongly Recommended)

I can next:
1️⃣ Convert this into Colab → FastAPI server running inside notebook
2️⃣ Add RAG / Agent pipeline example using same patterns
3️⃣ Add diagram + interview Q&A sheet
4️⃣ Merge Group 2 + Group 3 into one project

Just tell me the number 👍


Want me to upgrade your Colab into a “real FastAPI server inside Colab”?

Right now your Colab is “service functions only”.
Next step is to run FastAPI endpoints inside Colab (with uvicorn + ngrok) so it feels like a real API.

If you say “yes make it real FastAPI in Colab”, I’ll give you the notebook cells for that too.

yes

Perfect ✅ Bobikutty, here’s the “real FastAPI server inside Google Colab” version of your Group-3 project.

This notebook will:

start a real FastAPI API

expose it with a public URL using ngrok

persist state in SQLite file

let you restart the runtime and still replay state (proof that memory isn’t required)

Copy these cells in order into Colab.

✅ Colab Notebook: Group 3 FastAPI Server + SQLite + Replay + Snapshot + Soft Delete
🧩 Cell 1 - Install dependencies
!pip -q install fastapi uvicorn nest-asyncio pyngrok pydantic

🧩 Cell 2 - Create the full FastAPI app file (app.py)
%%writefile app.py
import sqlite3
import json
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Tuple

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

DB_PATH = "group3_state.db"
DEFAULT_STEPS = ["INGEST", "EXTRACT", "EMBED", "INDEX"]

# ---------------------------
# Utilities
# ---------------------------
def utcnow() -> str:
    return datetime.now(timezone.utc).isoformat()

def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA foreign_keys=ON;")
    conn.execute("PRAGMA journal_mode=WAL;")
    return conn

def init_db():
    conn = get_conn()

    conn.execute("""
    CREATE TABLE IF NOT EXISTS jobs(
        job_id TEXT PRIMARY KEY,
        job_type TEXT NOT NULL,
        status TEXT NOT NULL,
        schema_version INTEGER NOT NULL,
        deleted_at TEXT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
    );
    """)

    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_steps(
        step_id TEXT PRIMARY KEY,
        job_id TEXT NOT NULL,
        step_name TEXT NOT NULL,
        status TEXT NOT NULL,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        UNIQUE(job_id, step_name),
        FOREIGN KEY(job_id) REFERENCES jobs(job_id)
    );
    """)

    # Append-only audit log (INSERT only)
    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_events(
        event_id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        schema_version INTEGER NOT NULL,
        payload_json TEXT NOT NULL,
        created_at TEXT NOT NULL,
        FOREIGN KEY(job_id) REFERENCES jobs(job_id)
    );
    """)

    # Memento snapshots
    conn.execute("""
    CREATE TABLE IF NOT EXISTS job_snapshots(
        snapshot_id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_id TEXT NOT NULL,
        snapshot_json TEXT NOT NULL,
        created_at TEXT NOT NULL,
        FOREIGN KEY(job_id) REFERENCES jobs(job_id)
    );
    """)

    conn.commit()
    conn.close()

# ---------------------------
# Canonical statuses
# ---------------------------
JOB_CREATED = "CREATED"
JOB_RUNNING = "RUNNING"
JOB_DONE = "DONE"
JOB_FAILED = "FAILED"

STEP_PENDING = "PENDING"
STEP_DONE = "DONE"

# ---------------------------
# Schemas
# ---------------------------
class CreateJobRequest(BaseModel):
    job_type: str = Field(default="PDF_EXTRACT")
    schema_version: int = Field(default=1, ge=1)

class JobView(BaseModel):
    job_id: str
    job_type: str
    status: str
    schema_version: int
    deleted_at: Optional[str] = None
    created_at: str
    updated_at: str

class StepView(BaseModel):
    step_id: str
    job_id: str
    step_name: str
    status: str
    created_at: str
    updated_at: str

class JobWithSteps(BaseModel):
    job: JobView
    steps: list[StepView]

class ReplayResult(BaseModel):
    job_id: str
    rebuilt_status: Optional[str]
    steps: Dict[str, str]
    applied_events: int
    from_snapshot: bool

class SnapshotResult(BaseModel):
    job_id: str
    snapshot_created_at: str

# ---------------------------
# Event Store (Append-only)
# ---------------------------
def append_event(job_id: str, event_type: str, schema_version: int, payload: Dict[str, Any]) -> int:
    conn = get_conn()
    payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
    cur = conn.execute(
        """
        INSERT INTO job_events(job_id, event_type, schema_version, payload_json, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        (job_id, event_type, schema_version, payload_json, utcnow())
    )
    conn.commit()
    conn.close()
    return int(cur.lastrowid)

def list_events(job_id: str):
    conn = get_conn()
    rows = conn.execute(
        "SELECT * FROM job_events WHERE job_id=? ORDER BY event_id ASC",
        (job_id,)
    ).fetchall()
    conn.close()
    out = []
    for r in rows:
        out.append({
            "event_id": r["event_id"],
            "event_type": r["event_type"],
            "schema_version": r["schema_version"],
            "payload": json.loads(r["payload_json"]),
            "created_at": r["created_at"]
        })
    return out

def save_snapshot(job_id: str, snapshot: Dict[str, Any]) -> str:
    conn = get_conn()
    created_at = utcnow()
    conn.execute(
        "INSERT INTO job_snapshots(job_id, snapshot_json, created_at) VALUES (?, ?, ?)",
        (job_id, json.dumps(snapshot, separators=(",", ":"), ensure_ascii=False), created_at)
    )
    conn.commit()
    conn.close()
    return created_at

def get_latest_snapshot(job_id: str) -> Optional[Dict[str, Any]]:
    conn = get_conn()
    row = conn.execute(
        """
        SELECT snapshot_json, created_at
        FROM job_snapshots
        WHERE job_id=?
        ORDER BY snapshot_id DESC
        LIMIT 1
        """,
        (job_id,)
    ).fetchone()
    conn.close()
    if not row:
        return None
    snap = json.loads(row["snapshot_json"])
    snap["_snapshot_created_at"] = row["created_at"]
    return snap

# ---------------------------
# Replay Engine
# ---------------------------
def apply_event(state: Dict[str, Any], event_type: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    if event_type == "JOB_CREATED":
        state["status"] = JOB_CREATED
        state["steps"] = {}
        state["deleted"] = False
    elif event_type == "JOB_STARTED":
        state["status"] = JOB_RUNNING
    elif event_type == "STEP_CREATED":
        step = payload["step_name"]
        state["steps"][step] = STEP_PENDING
    elif event_type == "STEP_COMPLETED":
        step = payload["step_name"]
        state["steps"][step] = STEP_DONE
    elif event_type == "JOB_COMPLETED":
        state["status"] = JOB_DONE
    elif event_type == "JOB_FAILED":
        state["status"] = JOB_FAILED
    elif event_type == "JOB_SOFT_DELETED":
        state["deleted"] = True
    return state

def replay_job(job_id: str) -> Tuple[Dict[str, Any], int, bool]:
    snapshot = get_latest_snapshot(job_id)
    from_snapshot = snapshot is not None

    if snapshot:
        state = {
            "status": snapshot.get("status"),
            "steps": snapshot.get("steps", {}),
            "deleted": snapshot.get("deleted", False),
        }
        last_event_id = int(snapshot.get("last_event_id", 0))
    else:
        state = {"status": None, "steps": {}, "deleted": False}
        last_event_id = 0

    events = list_events(job_id)
    applied = 0
    for e in events:
        if e["event_id"] <= last_event_id:
            continue
        state = apply_event(state, e["event_type"], e["payload"])
        applied += 1

    state["last_event_id"] = events[-1]["event_id"] if events else last_event_id
    return state, applied, from_snapshot

# ---------------------------
# Services (Stateless)
# ---------------------------
def create_job(job_type: str, schema_version: int) -> str:
    job_id = str(uuid.uuid4())
    now = utcnow()

    conn = get_conn()
    conn.execute(
        """
        INSERT INTO jobs(job_id, job_type, status, schema_version, deleted_at, created_at, updated_at)
        VALUES (?, ?, ?, ?, NULL, ?, ?)
        """,
        (job_id, job_type, JOB_CREATED, schema_version, now, now)
    )

    # Steps are separate entity (Entity Decomposition)
    for step in DEFAULT_STEPS:
        step_id = str(uuid.uuid4())
        conn.execute(
            """
            INSERT INTO job_steps(step_id, job_id, step_name, status, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (step_id, job_id, step, STEP_PENDING, now, now)
        )

    conn.commit()
    conn.close()

    append_event(job_id, "JOB_CREATED", schema_version, {"job_type": job_type})
    for step in DEFAULT_STEPS:
        append_event(job_id, "STEP_CREATED", schema_version, {"step_name": step})

    return job_id

def get_job_with_steps(job_id: str) -> Optional[Dict[str, Any]]:
    conn = get_conn()
    job = conn.execute("SELECT * FROM jobs WHERE job_id=?", (job_id,)).fetchone()
    if not job:
        conn.close()
        return None
    steps = conn.execute(
        "SELECT * FROM job_steps WHERE job_id=? ORDER BY step_name ASC",
        (job_id,)
    ).fetchall()
    conn.close()
    return {"job": dict(job), "steps": [dict(s) for s in steps]}

def start_job(job_id: str) -> None:
    conn = get_conn()
    job = conn.execute("SELECT schema_version, deleted_at FROM jobs WHERE job_id=?", (job_id,)).fetchone()
    if not job:
        conn.close()
        raise HTTPException(status_code=404, detail="Job not found")
    if job["deleted_at"] is not None:
        conn.close()
        raise HTTPException(status_code=410, detail="Job is soft-deleted")

    now = utcnow()
    conn.execute("UPDATE jobs SET status=?, updated_at=? WHERE job_id=?", (JOB_RUNNING, now, job_id))
    conn.commit()
    conn.close()

    append_event(job_id, "JOB_STARTED", int(job["schema_version"]), {})

def complete_step(job_id: str, step_name: str) -> None:
    step_name = step_name.upper()

    conn = get_conn()
    job = conn.execute("SELECT schema_version, deleted_at FROM jobs WHERE job_id=?", (job_id,)).fetchone()
    if not job:
        conn.close()
        raise HTTPException(status_code=404, detail="Job not found")
    if job["deleted_at"] is not None:
        conn.close()
        raise HTTPException(status_code=410, detail="Job is soft-deleted")

    step = conn.execute(
        "SELECT * FROM job_steps WHERE job_id=? AND step_name=?",
        (job_id, step_name)
    ).fetchone()
    if not step:
        conn.close()
        raise HTTPException(status_code=404, detail=f"Step not found: {step_name}")

    now = utcnow()
    conn.execute(
        "UPDATE job_steps SET status=?, updated_at=? WHERE job_id=? AND step_name=?",
        (STEP_DONE, now, job_id, step_name)
    )
    conn.execute("UPDATE jobs SET updated_at=? WHERE job_id=?", (now, job_id))

    remaining = conn.execute(
        "SELECT COUNT(*) AS c FROM job_steps WHERE job_id=? AND status!=?",
        (job_id, STEP_DONE)
    ).fetchone()["c"]

    if remaining == 0:
        conn.execute("UPDATE jobs SET status=?, updated_at=? WHERE job_id=?", (JOB_DONE, now, job_id))

    conn.commit()
    conn.close()

    sv = int(job["schema_version"])
    append_event(job_id, "STEP_COMPLETED", sv, {"step_name": step_name})
    if remaining == 0:
        append_event(job_id, "JOB_COMPLETED", sv, {})

def soft_delete_job(job_id: str) -> None:
    conn = get_conn()
    job = conn.execute("SELECT schema_version FROM jobs WHERE job_id=?", (job_id,)).fetchone()
    if not job:
        conn.close()
        raise HTTPException(status_code=404, detail="Job not found")

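    now = utcnow()
    cur = conn.execute(
        "UPDATE jobs SET deleted_at=?, updated_at=? WHERE job_id=? AND deleted_at IS NULL",
        (now, now, job_id)
    )
    # Read rowcount before closing: it is 1 only if this call marked the job deleted.
    changed = cur.rowcount == 1
    conn.commit()
    conn.close()

    # Keep the delete idempotent: do not log a second event for an already-deleted job.
    if changed:
        append_event(job_id, "JOB_SOFT_DELETED", int(job["schema_version"]), {"deleted_at": now})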

def create_snapshot(job_id: str) -> str:
    state, _, _ = replay_job(job_id)
    snapshot = {
        "status": state.get("status"),
        "steps": state.get("steps", {}),
        "deleted": state.get("deleted", False),
        "last_event_id": state.get("last_event_id", 0),
    }
    return save_snapshot(job_id, snapshot)

# ---------------------------
# FastAPI app
# ---------------------------
app = FastAPI(title="Group 3 - State Survives Restarts (Colab)")

@app.on_event("startup")
def on_startup():
    init_db()

@app.post("/jobs", response_model=JobView)
def api_create_job(req: CreateJobRequest):
    job_id = create_job(req.job_type, req.schema_version)
    data = get_job_with_steps(job_id)
    return data["job"]

@app.get("/jobs/{job_id}", response_model=JobWithSteps)
def api_get_job(job_id: str):
    data = get_job_with_steps(job_id)
    if not data:
        raise HTTPException(status_code=404, detail="Job not found")
    return {"job": data["job"], "steps": data["steps"]}

@app.post("/jobs/{job_id}/start")
def api_start(job_id: str):
    start_job(job_id)
    return {"ok": True}

@app.post("/jobs/{job_id}/steps/{step_name}/complete")
def api_complete_step(job_id: str, step_name: str):
    complete_step(job_id, step_name)
    return {"ok": True}

@app.post("/jobs/{job_id}/snapshot", response_model=SnapshotResult)
def api_snapshot(job_id: str):
    # validate exists
    if not get_job_with_steps(job_id):
        raise HTTPException(status_code=404, detail="Job not found")
    created_at = create_snapshot(job_id)
    return {"job_id": job_id, "snapshot_created_at": created_at}

@app.post("/jobs/{job_id}/replay", response_model=ReplayResult)
def api_replay(job_id: str):
    if not get_job_with_steps(job_id):
        raise HTTPException(status_code=404, detail="Job not found")
    state, applied, from_snapshot = replay_job(job_id)
    return {
        "job_id": job_id,
        "rebuilt_status": state.get("status"),
        "steps": state.get("steps", {}),
        "applied_events": applied,
        "from_snapshot": from_snapshot
    }

@app.delete("/jobs/{job_id}")
def api_delete(job_id: str):
    soft_delete_job(job_id)
    return {"ok": True}

🧩 Cell 3: Start FastAPI server in Colab + expose with ngrok
import nest_asyncio
nest_asyncio.apply()

from pyngrok import ngrok
import uvicorn
import threading

# Start ngrok tunnel
public_url = ngrok.connect(8000).public_url  # keep the URL string, not the NgrokTunnel object
print("✅ Public API URL:", public_url)

# Run uvicorn in a background thread (still within this notebook runtime)
def run():
    uvicorn.run("app:app", host="0.0.0.0", port=8000, log_level="info")

thread = threading.Thread(target=run, daemon=True)
thread.start()


✅ After this cell, your API is live at the printed ngrok URL.
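
The server thread needs a moment to bind its port, so a request sent immediately after Cell 3 can fail. Below is a minimal sketch of a readiness check, assuming the server listens locally on port 8000 as in the cell above. `wait_for_port` is a hypothetical helper, not part of the notebook.

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout_s: float = 10.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # A successful connect means the server is accepting connections
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)  # not listening yet; retry shortly
    return False
```

For example, calling `wait_for_port("127.0.0.1", 8000)` right after Cell 3 blocks until the local server accepts connections or the timeout passes.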

🧩 Cell 4: Helper for easy requests from inside Colab
import requests

BASE = str(public_url)

def post(path, json_body=None):
    r = requests.post(BASE + path, json=json_body)
    return r.status_code, r.json()

def get(path):
    r = requests.get(BASE + path)
    return r.status_code, r.json()

def delete(path):
    r = requests.delete(BASE + path)
    return r.status_code, r.json()

🧩 Cell 5: Demo flow (Create → Start → Complete steps → Snapshot → Replay)
# 1) Create job
code, job = post("/jobs", {"job_type": "PDF_EXTRACT", "schema_version": 1})
code, job

job_id = job["job_id"]
job_id

# 2) Start job
post(f"/jobs/{job_id}/start")

# 3) Complete some steps
post(f"/jobs/{job_id}/steps/INGEST/complete")
post(f"/jobs/{job_id}/steps/EXTRACT/complete")

# 4) Snapshot (Memento)
post(f"/jobs/{job_id}/snapshot")

# 5) Replay (Reprocessing)
post(f"/jobs/{job_id}/replay")

# 6) Check current DB state
get(f"/jobs/{job_id}")

🧩 Cell 6: Soft delete demo
delete(f"/jobs/{job_id}")

# Replay still works because events are kept (truth not destroyed)
post(f"/jobs/{job_id}/replay")

✅ The "Restart Proof" (most important)
Do this now:

Runtime → Restart runtime (Colab menu)

Then run ONLY these cells again:

Cell 3 (start server + ngrok)

Cell 4 (helper requests)

Then run this cell:

🧩 Cell 7: After restart, replay still works
# A runtime restart clears Python variables, so job_id no longer exists.
# Paste the job_id string you copied before the restart.
existing_job_id = "PASTE_JOB_ID_HERE"

post(f"/jobs/{existing_job_id}/replay")


✅ If you see the same status/steps, you've proven:

stateless service

externalized state

append-only audit log

replay recovery

snapshot support

restart safe

Making the restart test easier (one click)

Save job_id to a file before restarting and reload it afterward, so you don't lose it when Colab clears its variables.
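
A minimal sketch of that save-and-reload step, assuming the runtime's disk is kept across "Restart runtime" (it is not kept once the runtime is deleted). The file name and both helper functions are hypothetical.

```python
import json
from pathlib import Path
from typing import Optional

# Hypothetical location on the Colab disk
JOB_ID_FILE = Path("last_job_id.json")

def save_job_id(job_id: str, path: Path = JOB_ID_FILE) -> None:
    """Write the job ID to disk so it outlives the Python process."""
    path.write_text(json.dumps({"job_id": job_id}))

def load_job_id(path: Path = JOB_ID_FILE) -> Optional[str]:
    """Read the saved job ID back, or return None if nothing was saved."""
    if not path.exists():
        return None
    return json.loads(path.read_text())["job_id"]
```

Call `save_job_id(job_id)` at the end of Cell 5, then use `existing_job_id = load_job_id()` in Cell 7 after the restart.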

When are snapshots saved?

Great question 👍 This is exactly where many people get confused, so let's slow down and make it clear.

Short answer first

Snapshots are saved at controlled, meaningful checkpoints, NOT on every change.

They are saved intentionally, when you decide:

"This state is important"

"Replay from the beginning would be expensive"

"I may need fast recovery from here"

Think of snapshots like a video-game save point 🎮

You don't save every second.
You save:

after finishing a level

before a boss fight

at a safe checkpoint

In YOUR FastAPI / Colab project
📌 When are snapshots saved?
✅ ONLY when this endpoint is called
POST /jobs/{job_id}/snapshot


Internally, this calls:

create_snapshot(job_id)


Which does:

Replay the event log (starting from the latest snapshot, if one exists) to rebuild the current state

Store that rebuilt state in job_snapshots

So snapshots are manual / policy-driven, not automatic.
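
To make the "store the rebuilt state" step concrete, here is a minimal sketch of writing and reading a snapshot row in SQLite. The table layout and the function names `init_snapshot_table`, `save_snapshot_sketch`, and `load_latest_snapshot` are assumptions for illustration. The notebook's own `save_snapshot` and `job_snapshots` schema may differ.

```python
import json
import sqlite3
from datetime import datetime, timezone
from typing import Optional

def init_snapshot_table(conn: sqlite3.Connection) -> None:
    # Assumed schema for illustration only
    conn.execute(
        """CREATE TABLE IF NOT EXISTS job_snapshots (
               snapshot_id   INTEGER PRIMARY KEY AUTOINCREMENT,
               job_id        TEXT NOT NULL,
               last_event_id INTEGER NOT NULL,
               state_json    TEXT NOT NULL,
               created_at    TEXT NOT NULL
           )"""
    )

def save_snapshot_sketch(conn: sqlite3.Connection, job_id: str, snapshot: dict) -> str:
    """Insert one snapshot row and return its creation timestamp."""
    created_at = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "INSERT INTO job_snapshots (job_id, last_event_id, state_json, created_at) "
        "VALUES (?, ?, ?, ?)",
        (job_id, snapshot["last_event_id"], json.dumps(snapshot), created_at),
    )
    conn.commit()
    return created_at

def load_latest_snapshot(conn: sqlite3.Connection, job_id: str) -> Optional[dict]:
    """Return the snapshot covering the most events, or None if there is none."""
    row = conn.execute(
        "SELECT state_json FROM job_snapshots WHERE job_id=? "
        "ORDER BY last_event_id DESC, snapshot_id DESC LIMIT 1",
        (job_id,),
    ).fetchone()
    return json.loads(row[0]) if row else None
```

Snapshots are inserted as new rows rather than updated in place, so older checkpoints stay available if a newer one turns out to be bad.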

Why NOT save snapshots all the time?
❌ Bad idea

Snapshot after every event

Huge storage usage

Almost same as duplicating state tables

No real benefit

✅ Good idea

Append-only events for truth

Snapshots for speed

Common REAL-WORLD snapshot strategies (IMPORTANT)
1️⃣ Time-based snapshots

"Every 10 minutes"

Used when:

jobs run very long

continuous data streams

Example:

Snapshot every 10 minutes OR every 1,000 events

2️⃣ Milestone-based snapshots (most common)

"When something meaningful finishes"

Examples:

After job starts

After each major pipeline phase

After job completes

In your case:

INGEST done → snapshot
EXTRACT done → snapshot
JOB DONE → snapshot

3️⃣ Size-based snapshots

"When replay cost becomes high"

Example:

If events > 5,000 → snapshot

4️⃣ Failure-based snapshots

"Before a risky operation"

Example:

Before calling an expensive LLM

Before external API calls

Before distributed fan-out
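
The four strategies above can be combined into one decision function. This is a sketch under stated assumptions: `SnapshotPolicy` and its parameters are hypothetical names, and the default thresholds simply reuse the "10 minutes" and "1,000 events" figures from the time-based example.

```python
from dataclasses import dataclass

@dataclass
class SnapshotPolicy:
    max_events: int = 1000           # size-based threshold
    max_age_seconds: float = 600.0   # time-based threshold (10 minutes)

    def should_snapshot(
        self,
        events_since_snapshot: int,
        seconds_since_snapshot: float,
        milestone_reached: bool = False,
        before_risky_op: bool = False,
    ) -> bool:
        """Decide whether a new snapshot is worth taking right now."""
        if events_since_snapshot == 0:
            return False  # nothing new to summarize
        return (
            milestone_reached
            or before_risky_op
            or events_since_snapshot >= self.max_events
            or seconds_since_snapshot >= self.max_age_seconds
        )
```

A caller such as `complete_step` could ask the policy after each event and call `create_snapshot(job_id)` only when it returns True, which keeps snapshotting policy-driven instead of per-event.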

What exactly is inside a snapshot?

In your project, snapshot stores:

{
  "status": "RUNNING",
  "steps": {
    "INGEST": "DONE",
    "EXTRACT": "DONE",
    "EMBED": "PENDING"
  },
  "deleted": false,
  "last_event_id": 12
}


📌 Key thing:

last_event_id tells replay where to continue from

Everything before that is already "summarized"
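
As an illustration of how last_event_id acts as a boundary, the query below fetches only the events newer than the snapshot. The column names (`event_id`, `event_type`) and the helper `events_after` are assumptions for this sketch, since the notebook's actual `job_events` schema is defined elsewhere.

```python
import sqlite3
from typing import List, Tuple

# Assumed minimal schema, for illustration only
ASSUMED_SCHEMA = """CREATE TABLE IF NOT EXISTS job_events (
    event_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id     TEXT NOT NULL,
    event_type TEXT NOT NULL
)"""

def events_after(conn: sqlite3.Connection, job_id: str, last_event_id: int) -> List[Tuple[int, str]]:
    """Return (event_id, event_type) rows newer than the snapshot boundary, oldest first."""
    return conn.execute(
        "SELECT event_id, event_type FROM job_events "
        "WHERE job_id=? AND event_id>? ORDER BY event_id",
        (job_id, last_event_id),
    ).fetchall()
```

Passing `last_event_id=0` returns the full history, so the same query serves both the snapshot and the no-snapshot case.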

How replay uses snapshots (step-by-step)
Without snapshot
Replay events:
1 → 2 → 3 → 4 → ... → 10,000

With snapshot at event 8,000
Load snapshot (state at event 8,000)
Replay events:
8,001 → 8,002 → ...


🚀 Much faster.
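
A small in-memory sketch can show why both paths end in the same state. It is not the notebook's `replay_job`. The event name `JOB_STARTED`, the initial status `PENDING`, and the functions `apply_event` and `replay` are assumptions for illustration.

```python
import copy
from typing import Any, Dict, List, Optional

Event = Dict[str, Any]

def apply_event(state: dict, event: Event) -> dict:
    """Fold one event into the state, skipping events the snapshot already covers."""
    if event["event_id"] <= state["last_event_id"]:
        return state  # already summarized; do not double-apply
    etype = event["event_type"]
    if etype == "JOB_STARTED":  # assumed event name
        state["status"] = "RUNNING"
    elif etype == "STEP_COMPLETED":
        state["steps"][event["payload"]["step_name"]] = "DONE"
    elif etype == "JOB_COMPLETED":
        state["status"] = "DONE"
    elif etype == "JOB_SOFT_DELETED":
        state["deleted"] = True
    state["last_event_id"] = event["event_id"]
    return state

def replay(events: List[Event], snapshot: Optional[dict] = None) -> dict:
    """Rebuild state from events, optionally starting from a snapshot."""
    if snapshot is not None:
        state = copy.deepcopy(snapshot)  # never mutate the stored snapshot
    else:
        state = {"status": "PENDING", "steps": {}, "deleted": False, "last_event_id": 0}
    for ev in sorted(events, key=lambda e: e["event_id"]):
        state = apply_event(state, ev)
    return state
```

Because `apply_event` ignores anything at or below last_event_id, handing the full event list to `replay` together with a snapshot is still safe. The snapshot only changes how much work is done, not the result.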

When snapshots are NOT needed

Don't use snapshots if:

Jobs are short

Few events (< 50)

Replay is cheap

Example:

Simple CRUD apps

Short synchronous requests

Interview-ready answer (memorize this)

"Snapshots are saved at meaningful checkpoints, such as phase completion, high event volume, or before risky operations, to reduce replay cost. The system still treats the append-only event log as the source of truth, and snapshots are only an optimization."

That sentence alone = 💯 in interviews.

TL;DR (one screen summary)
Question	Answer
When are snapshots saved?	Only at chosen checkpoints
Are they automatic?	❌ No
Why save them?	Faster recovery
Source of truth?	Append-only events
Can the system work without snapshots?	✅ Yes
Do snapshots replace events?	❌ Never

Topic	Bad Snapshot	Good Snapshot
Truth source	Snapshot overwrites truth	Events are truth
Writes	snapshot every event	snapshot by policy
Replay	impossible	always possible
Audit	none	full history
Recovery	fragile	reliable
Storage	huge blobs	minimal checkpoints
Safety	can double-apply work	safe replay boundary (last_event_id)