<a href="https://colab.research.google.com/github/Sakinat-Folorunso/CMP_805_Advanced_Programming_Languages/blob/main/notebooks/CMP805_Week10_PH_Python_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CMP805: Week 10 Practical (Python, Colab)
**Topic:** Memory management: tracing GC vs. reference counting; intro to ownership/borrowing
**Course:** Advanced Programming Languages (M.Sc.), OOU, CMP805

**Instructor:** **DR SAKINAT FOLORUNSO – ASSOCIATE PROFESSOR OF AI SYSTEMS AND FAIR DATA**  
**Department:** **COMPUTER SCIENCES, OLABISI ONABANJO UNIVERSITY, AGO-IWOYE, OGUN STATE, NIGERIA**

> This PH follows your outline: implement a **mark-and-sweep collector** over a toy heap, contrast with **reference counting** (cycle leak), and practice a tiny **borrow checker** for ownership rules.

### Learning goals (≈60 minutes)
- Build a toy **heap** (objects with references) and implement **mark-and-sweep GC**.
- Contrast with **reference counting** and observe why cycles leak without tracing.
- Explore a tiny **ownership/borrowing** checker (Rust-inspired) for a simple command language.
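
The second goal can be previewed on CPython itself, which pairs reference counting with a separate cycle detector. The sketch below is an illustration only and is not part of the lab's toy heap: `Node` and `cycle_demo` are hypothetical names, and the result assumes CPython (other interpreters may manage memory differently).

```python
import gc
import weakref

class Node:
    """Hypothetical helper: an object that can point at one other Node."""
    def __init__(self) -> None:
        self.other = None

def cycle_demo() -> tuple[bool, bool]:
    """Return (alive after del, alive after gc.collect()) for a two-node cycle."""
    was_enabled = gc.isenabled()
    gc.disable()                      # leave only reference counting active
    try:
        a, b = Node(), Node()
        a.other, b.other = b, a       # build the cycle a <-> b
        probe = weakref.ref(a)        # observe a without keeping it alive
        del a, b                      # drop the only external references
        alive_after_del = probe() is not None
        gc.collect()                  # run the cycle detector explicitly
        alive_after_gc = probe() is not None
        return alive_after_del, alive_after_gc
    finally:
        if was_enabled:
            gc.enable()               # restore the caller's GC setting

print(cycle_demo())  # on CPython: (True, False)
```

With the cycle detector switched off, the two nodes keep each other's count above zero after `del`, so reference counting alone never frees them; the explicit `gc.collect()` call is what reclaims the pair.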

In [None]:
# 🧑‍🎓 Student info
STUDENT_NAME = "Type your full name here"
STUDENT_ID   = "Matric/ID here"
print("Student:", STUDENT_NAME, "| ID:", STUDENT_ID)

In [None]:
# ✅ Environment check (Python 3.10+)
import sys
major, minor = sys.version_info[:2]
assert (major, minor) >= (3, 10), f"Need Python 3.10+, found {major}.{minor}"
print(f"Python {major}.{minor} OK; ready for GC/borrowing lab.")

In [None]:
# =====================================
# Part 1: Toy heap & Mark–Sweep GC
# =====================================
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

ObjId = int

@dataclass
class Obj:
    fields: List[Optional[ObjId]]  # outgoing references

class Heap:
    def __init__(self):
        self.store: Dict[ObjId, Obj] = {}     # id -> Obj
        self.next_id: ObjId = 1               # fresh id counter

    def alloc(self, nfields: int) -> ObjId:
        """Allocate an object with nfields initialized to None."""
        oid = self.next_id; self.next_id += 1
        self.store[oid] = Obj([None for _ in range(nfields)])
        return oid

    def set_field(self, oid: ObjId, i: int, tgt: Optional[ObjId]) -> None:
        self.store[oid].fields[i] = tgt

    def get_field(self, oid: ObjId, i: int) -> Optional[ObjId]:
        return self.store[oid].fields[i]

    # ---- Tracing GC (mark-and-sweep) ----
    def gc_mark(self, roots: Dict[str, Optional[ObjId]]) -> Set[ObjId]:
        """Return the set of reachable objects by DFS from root set."""
        marked: Set[ObjId] = set()
        stack: List[ObjId] = [r for r in roots.values() if r is not None]
        while stack:
            oid = stack.pop()
            if oid in marked or oid not in self.store:  # skip duplicates or already freed
                continue
            marked.add(oid)
            for ref in self.store[oid].fields:
                if ref is not None:
                    stack.append(ref)
        return marked

    def gc_sweep(self, marked: Set[ObjId]) -> int:
        """Free any object not in 'marked'. Return number of reclaimed objects."""
        to_delete = [oid for oid in self.store if oid not in marked]
        for oid in to_delete:
            del self.store[oid]
        return len(to_delete)

    def gc_collect(self, roots: Dict[str, Optional[ObjId]]) -> int:
        """Stop-the-world collection: mark from roots, then sweep unmarked."""
        marked = self.gc_mark(roots)
        reclaimed = self.gc_sweep(marked)
        return reclaimed

    def count(self) -> int:
        return len(self.store)

# --- Demo: reachability and cycles ---
H = Heap()
roots: Dict[str, Optional[ObjId]] = {}

# Build a small reachable chain a -> b -> c
a = H.alloc(1); b = H.alloc(1); c = H.alloc(0)
H.set_field(a, 0, b); H.set_field(b, 0, c)
roots["a"] = a

# Build an unreachable island (cycle) x <-> y, not in roots
x = H.alloc(1); y = H.alloc(1)
H.set_field(x, 0, y); H.set_field(y, 0, x)

before = H.count()
reclaimed = H.gc_collect(roots)
after = H.count()

print(f"ok  - GC collected {reclaimed} unreachable objs; heap: {before} -> {after}")
assert x not in H.store and y not in H.store, "cycle should be collected since unreachable"

In [None]:
# =====================================
# Part 2: Reference counting & cycle leak
# =====================================
from collections import defaultdict

def compute_refcounts(H: Heap, roots: Dict[str, Optional[ObjId]]):
    """Compute snapshot reference counts from roots + fields (for illustration)."""
    rc = defaultdict(int)
    # roots hold references
    for r in roots.values():
        if r is not None:
            rc[r] += 1
    # object fields hold references
    for oid, obj in H.store.items():
        for ref in obj.fields:
            if ref is not None:
                rc[ref] += 1
    return rc

# Build a new heap for RC demo
H2 = Heap(); roots2 = {}

# Make a cycle unreachable by roots
u = H2.alloc(1); v = H2.alloc(1)
H2.set_field(u, 0, v); H2.set_field(v, 0, u)

rc = compute_refcounts(H2, roots2)
print("RC counts in cycle (no roots):", {k: rc[k] for k in [u, v]})
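# A pure reference counter would never reclaim u/v: each still has count 1 from
# the other, even though no root reaches them. Tracing GC reclaims both:
assert rc[u] == 1 and rc[v] == 1
collected = H2.gc_collect(roots2)
print("ok  - Mark–sweep can reclaim cycles; reclaimed =", collected)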
assert collected == 2
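
The snapshot counts above are computed after the fact, whereas a reference-counting runtime updates counts on every pointer write and frees an object the moment its count reaches zero. The following self-contained sketch shows that eager behaviour and where it gets stuck. `RCHeap`, `incref`, and `decref` are hypothetical names introduced for illustration, not part of the lab's `Heap`, and root references are modelled by explicit `incref`/`decref` calls.

```python
from typing import Dict, List, Optional

class RCHeap:
    """Hypothetical toy heap with eager reference counting and no tracing."""
    def __init__(self) -> None:
        self.fields: Dict[int, List[Optional[int]]] = {}  # id -> outgoing refs
        self.rc: Dict[int, int] = {}                      # id -> reference count
        self.next_id = 1

    def alloc(self, nfields: int) -> int:
        oid = self.next_id
        self.next_id += 1
        self.fields[oid] = [None] * nfields
        self.rc[oid] = 0
        return oid

    def incref(self, oid: Optional[int]) -> None:
        if oid is not None:
            self.rc[oid] += 1

    def decref(self, oid: Optional[int]) -> None:
        if oid is None:
            return
        self.rc[oid] -= 1
        if self.rc[oid] == 0:
            # Free the object, then release everything it pointed to.
            children = self.fields.pop(oid)
            del self.rc[oid]
            for child in children:
                self.decref(child)

    def set_field(self, oid: int, i: int, tgt: Optional[int]) -> None:
        old = self.fields[oid][i]
        self.incref(tgt)            # increment first so re-assigning the same target is safe
        self.fields[oid][i] = tgt
        self.decref(old)

h = RCHeap()

# Acyclic chain a -> b: releasing the root frees both objects.
a = h.alloc(1); b = h.alloc(0)
h.incref(a)                         # a root takes a reference to a
h.set_field(a, 0, b)
h.decref(a)                         # root released
chain_left = len(h.fields)

# Cycle u <-> v: releasing the root leaves both counts at 1.
u = h.alloc(1); v = h.alloc(1)
h.incref(u)                         # a root takes a reference to u
h.set_field(u, 0, v); h.set_field(v, 0, u)
h.decref(u)                         # root released
cycle_left = len(h.fields)

print("objects left after chain:", chain_left)   # 0
print("objects left after cycle:", cycle_left)   # 2
```

The chain is reclaimed by cascading decrements, while the cycle stays allocated because neither count can reach zero without something outside the cycle breaking a link.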

In [None]:
# =====================================
# Part 3: Tiny ownership/borrowing checker (Rust-inspired)
# =====================================
from dataclasses import dataclass

# Command language (seq of ops)
#   NEW x                 -- x becomes an owning variable (resource id fresh)
#   MOVE x y              -- move ownership from x to y (x becomes moved)
#   BORROW_IMM x y        -- y becomes an immutable borrow of x
#   BORROW_MUT x y        -- y becomes a mutable borrow of x (requires no other borrows)
#   READ x                -- reading via owner or any borrow is OK
#   WRITE x               -- write requires: (owner x with no active borrows) OR (x is a mutable borrow)
#   END y                 -- end a borrow held in y
#   DROP x                -- drop owner x (requires no active borrows) or end a borrow y

@dataclass
class OwnerState:
    alive: bool = True
    imm_borrows: int = 0
    mut_borrow: bool = False

class BorrowError(Exception): pass

class BorrowChecker:
    def __init__(self):
        self.owner_of: dict[str, str] = {}      # owner var -> resource id (fresh per NEW)
        self.state: dict[str, OwnerState] = {}  # resource id -> state
        self.ref_kind: dict[str, str] = {}      # ref var -> 'imm' | 'mut'
        self.ref_target: dict[str, str] = {}    # ref var -> owner var

    def _ensure_owner_alive(self, x: str):
        # Moved and dropped owners are removed from owner_of, so they fail here too.
        if x not in self.owner_of: raise BorrowError(f"{x} is not a live owner (undeclared, moved, or dropped)")
        rid = self.owner_of[x]
        st = self.state[rid]
        if not st.alive: raise BorrowError(f"{x} was moved/dropped")

    def NEW(self, x: str):
        if x in self.owner_of or x in self.ref_kind:
            raise BorrowError(f"{x} already in use")
        # Use a fresh resource id so that re-declaring a moved-from name
        # cannot alias the state of the resource it used to own.
        rid = f"r{len(self.state)}"
        self.owner_of[x] = rid
        self.state[rid] = OwnerState()

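    def MOVE(self, x: str, y: str):
        self._ensure_owner_alive(x)
        # Reject a destination that is already an owner or a borrow (this also
        # covers MOVE x x, which would otherwise lose the resource).
        if y in self.owner_of or y in self.ref_kind:
            raise BorrowError(f"{y} already in use")
        rid = self.owner_of[x]; st = self.state[rid]
        if st.imm_borrows or st.mut_borrow:
            raise BorrowError("cannot move: outstanding borrows")
        # y becomes the new owner of the same resource; the resource stays alive.
        self.owner_of[y] = rid
        # x is now "moved": it has no owner_of entry, so any later use is rejected.
        del self.owner_of[x]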

    def BORROW_IMM(self, x: str, y: str):
        self._ensure_owner_alive(x)
        if y in self.ref_kind or y in self.owner_of: raise BorrowError(f"{y} already used")
        rid = self.owner_of[x]; st = self.state[rid]
        if st.mut_borrow: raise BorrowError("cannot imm-borrow: mutable borrow active")
        st.imm_borrows += 1
        self.ref_kind[y] = "imm"; self.ref_target[y] = x

    def BORROW_MUT(self, x: str, y: str):
        self._ensure_owner_alive(x)
        if y in self.ref_kind or y in self.owner_of: raise BorrowError(f"{y} already used")
        rid = self.owner_of[x]; st = self.state[rid]
        if st.mut_borrow or st.imm_borrows: raise BorrowError("cannot mut-borrow: other borrows active")
        st.mut_borrow = True
        self.ref_kind[y] = "mut"; self.ref_target[y] = x

    def READ(self, x: str):
        if x in self.owner_of:
            self._ensure_owner_alive(x); return
        if x in self.ref_kind:
            return  # reading via any borrow ok
        raise BorrowError(f"read of unknown {x}")

    def WRITE(self, x: str):
        if x in self.owner_of:
            self._ensure_owner_alive(x)
            rid = self.owner_of[x]; st = self.state[rid]
            if st.imm_borrows or st.mut_borrow:
                raise BorrowError("owner write forbidden while borrowed")
            return
        if x in self.ref_kind and self.ref_kind[x] == "mut":
            return
        raise BorrowError("write requires owner (unborrowed) or mutable borrow")

    def END(self, y: str):
        if y not in self.ref_kind: raise BorrowError(f"{y} is not a borrow")
        kind = self.ref_kind[y]; x = self.ref_target[y]
        rid = self.owner_of[x]; st = self.state[rid]
        if kind == "imm": st.imm_borrows -= 1
        else: st.mut_borrow = False
        del self.ref_kind[y]; del self.ref_target[y]

    def DROP(self, x: str):
        if x in self.owner_of:
            rid = self.owner_of[x]; st = self.state[rid]
            if st.imm_borrows or st.mut_borrow:
                raise BorrowError("cannot drop: outstanding borrows")
            st.alive = False
            del self.owner_of[x]  # resource dropped
            return
        if x in self.ref_kind:
            self.END(x); return
        raise BorrowError(f"{x} not found")

def run_prog(cmds: list[tuple]) -> str:
    bc = BorrowChecker()
    try:
        for cmd in cmds:
            op = cmd[0]
            args = cmd[1:]
            getattr(bc, op)(*args)
        return "ok"
    except BorrowError as e:
        return f"error: {e}"

# --- Self-checks ---
ok1 = run_prog([("NEW","x"), ("BORROW_IMM","x","y"), ("READ","y"), ("END","y"), ("WRITE","x"), ("DROP","x")])
print("ok  - borrow imm then write:", ok1)
assert ok1 == "ok"

err1 = run_prog([("NEW","x"), ("BORROW_IMM","x","y"), ("WRITE","x")])
print("ok  - owner write forbidden while imm-borrowed:", err1)
assert err1.startswith("error")

ok2 = run_prog([("NEW","x"), ("BORROW_MUT","x","y"), ("WRITE","y"), ("END","y"), ("DROP","x")])
print("ok  - mut borrow allows write via y:", ok2); assert ok2=="ok"

err2 = run_prog([("NEW","x"), ("BORROW_MUT","x","y"), ("BORROW_IMM","x","z")])
print("ok  - cannot imm-borrow during mut:", err2); assert err2.startswith("error")

err3 = run_prog([("NEW","x"), ("MOVE","x","z"), ("READ","x")])
print("ok  - use after move detected:", err3); assert err3.startswith("error")

### 🧪 Your Turn (10–15 minutes)
1) Extend the heap with an **allocation limit** and trigger collections when crossing a threshold. Measure *reclaimed* objects.  
2) Add a **generational hint**: count object ages and sweep only the nursery unless a full GC is requested.  
3) Extend the borrow checker with **block-scoped lifetimes**: introduce `PUSH`/`POP` to end all borrows at block end.
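
For task 1, one possible starting point is sketched below. It is a minimal, self-contained variant that does not extend the lab's `Heap`, and the names `ThresholdHeap`, `threshold`, and `reclaimed_log` are hypothetical. It assumes the simplest policy: run a collection before any allocation that finds the heap at or above a fixed object count. Growing the threshold or timing collections is left open.

```python
from typing import Dict, List, Optional, Set

class ThresholdHeap:
    """Hypothetical toy heap that collects when it reaches a size threshold."""
    def __init__(self, roots: Dict[str, Optional[int]], threshold: int = 4) -> None:
        self.store: Dict[int, List[Optional[int]]] = {}   # id -> outgoing refs
        self.roots = roots                 # shared root table, updated by the caller
        self.threshold = threshold         # assumed policy: collect at this many objects
        self.next_id = 1
        self.reclaimed_log: List[int] = []  # objects reclaimed by each collection

    def collect(self) -> int:
        """Mark from roots with an explicit stack, then sweep unmarked objects."""
        marked: Set[int] = set()
        stack = [r for r in self.roots.values() if r is not None]
        while stack:
            oid = stack.pop()
            if oid in marked or oid not in self.store:
                continue
            marked.add(oid)
            stack.extend(ref for ref in self.store[oid] if ref is not None)
        dead = [oid for oid in self.store if oid not in marked]
        for oid in dead:
            del self.store[oid]
        return len(dead)

    def alloc(self, nfields: int) -> int:
        if len(self.store) >= self.threshold:
            self.reclaimed_log.append(self.collect())
        oid = self.next_id
        self.next_id += 1
        self.store[oid] = [None] * nfields
        return oid

roots: Dict[str, Optional[int]] = {}
h = ThresholdHeap(roots, threshold=3)
keep = h.alloc(0)
roots["keep"] = keep            # root the object before allocating again
for _ in range(5):
    h.alloc(0)                  # unrooted garbage

print("reclaimed per collection:", h.reclaimed_log)   # [2, 2]
print("live objects:", sorted(h.store))               # [1, 6]
```

One design consequence to watch for: an object that has been allocated but not yet stored in a root or a field is unreachable, so a collection triggered by the next `alloc` will sweep it. Rooting each object before allocating again avoids this in the sketch.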

### ✍️ Reflection (2–3 sentences)
- Why can **reference counting** leak on cycles, and how does **mark-and-sweep** avoid this?  
- In the borrow checker, why do we forbid an **owner write** while any borrow is active?
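
Python offers a small runtime analogue for the second question. A live dict iterator behaves a little like an outstanding borrow, and on CPython resizing the dict through its owner while the iterator is in use is rejected. This is an analogy only: Python detects the conflict dynamically, whereas Rust rejects it at compile time. The name `write_while_iterating` is hypothetical.

```python
def write_while_iterating() -> str:
    """Try to grow a dict while an iterator over it is still active."""
    table = {"a": 1, "b": 2}
    try:
        for key in table:                       # the iterator acts like an active borrow
            table[key + "_copy"] = table[key]   # owner-side write that resizes the dict
    except RuntimeError as exc:
        return f"error: {exc}"
    return "ok"

print(write_while_iterating())
```

The iterator's view of the table is only valid while the table's layout stays fixed, which is the same reason the checker refuses an owner write while any borrow could still observe the old state.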

In [None]:
# Save small submission bundle
import json, time
stamp = time.strftime("%Y-%m-%d %H:%M:%S")
submission = {
  "student_name": STUDENT_NAME,
  "student_id": STUDENT_ID,
  "timestamp": stamp,
  "checks": ["gc-collects-cycles", "rc-leaks-cycles", "borrow-tests"],
  "reflection": "(fill in here)"
}
with open("week10_submission.json", "w") as f:
  json.dump(submission, f, indent=2)
print("Saved week10_submission.json; upload with your notebook.")