<a href="https://colab.research.google.com/github/shr8769/ML-HACKATHON-TEAM15/blob/main/mlhackathonteam05.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# === CELL 1: Imports & constants ===
import re, time, random, pickle
from collections import defaultdict, Counter
from typing import List, Dict
from functools import lru_cache

ALPH = [chr(i) for i in range(ord('a'), ord('z')+1)]
PAD = "^"


In [2]:
# === CELL 2: Load + Clean + Bucket Corpus ===
from google.colab import files
print("Upload corpus.txt (train) and test.txt (eval).")
_ = files.upload()

ALPH_RE = re.compile(r'[^a-zA-Z]+')

def clean_word(raw: str) -> str:
    w = ALPH_RE.sub('', raw).lower()
    return w if len(w) >= 3 else ''

def load_corpus(path: str) -> List[str]:
    out=[]
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            w = clean_word(line)
            if w: out.append(w)  # keep duplicates for natural frequency
    if not out: raise ValueError(f"Empty after cleaning: {path}")
    return out

def bucket_words(words: List[str]) -> Dict[str, List[str]]:
    B={'S':[], 'M':[], 'L':[]}
    for w in words:
        n=len(w)
        if 3<=n<=5: B['S'].append(w)
        elif 6<=n<=8: B['M'].append(w)
        elif n>=9:   B['L'].append(w)
    return B

train_words = load_corpus('corpus.txt')
test_words  = load_corpus('test.txt')
buckets = bucket_words(train_words)

print("Train size:", len(train_words))
print("Buckets:", {k:len(v) for k,v in buckets.items()})
print("Test size:", len(test_words))


Upload corpus.txt (train) and test.txt (eval).


Saving corpus.txt to corpus.txt
Saving test.txt to test.txt
Train size: 49870
Buckets: {'S': 3897, 'M': 15235, 'L': 30738}
Test size: 1998


In [3]:
# === CELL 3: Candidate index ===
class CorpusIndex:
    def __init__(self, words: List[str]):
        self.by_len = defaultdict(list)
        for w in words:
            self.by_len[len(w)].append(w)

    def filter(self, mask: str, wrong: set, right: set):
        pool = self.by_len[len(mask)]
        wrong = set(wrong)
        fixed = [(i,c) for i,c in enumerate(mask) if c!='_']
        out=[]
        for w in pool:
            if wrong and any(ch in wrong for ch in w):
                continue
            ok=True
            for i,c in fixed:
                if w[i]!=c:
                    ok=False; break
            if ok: out.append(w)
        return out

idx_train = CorpusIndex(train_words)
idx_eval  = CorpusIndex(test_words)


In [4]:
# === CELL 4: NGramHMM (trigram) ===
def len_bucket(n:int)->str:
    return "S" if 3<=n<=5 else ("M" if 6<=n<=8 else "L")

class NGramHMM:
    def __init__(self, n=3, k=0.5, lam=0.6, cand_cap=1500, work_cap=120_000):
        self.n=n; self.k=k; self.lam=lam
        self.cand_cap=cand_cap; self.work_cap=work_cap
        self.ctx_counts={"S":defaultdict(Counter),"M":defaultdict(Counter),"L":defaultdict(Counter)}
        self.global_prior={"S":{a:1/26 for a in ALPH},
                           "M":{a:1/26 for a in ALPH},
                           "L":{a:1/26 for a in ALPH}}

    def fit(self, words_by_bucket: Dict[str, List[str]]):
        n=self.n
        for bk, ws in words_by_bucket.items():
            cc=self.ctx_counts[bk]; gl=Counter()
            for w in ws:
                gl.update(w)
                s = PAD*(n-1)+w
                for i in range(n-1, len(s)):
                    ctx = s[i-n+1:i]
                    nxt = s[i]
                    if nxt.isalpha():
                        cc[ctx][nxt]+=1
            tot=sum(gl.values()) or 1
            self.global_prior[bk] = {a: gl.get(a,0)/tot for a in ALPH}

    @lru_cache(maxsize=200_000)
    def _ctx_probs(self, bk:str, ctx:str):
        counts = self.ctx_counts[bk].get(ctx, Counter())
        tot = sum(counts.values()) + self.k*26
        inv = 1.0/tot
        return tuple((counts.get(a,0)+self.k)*inv for a in ALPH)

    def _pcand(self, candidates: List[str], mask: str):
        agg=Counter(); support=set()
        for w in candidates:
            for i,ch in enumerate(w):
                if mask[i]=='_':
                    agg[ch]+=1
                    support.add(ch)
        Z = sum(agg.values()) or 1
        return {a: agg.get(a,0)/Z for a in ALPH}, support

    def _phmm(self, candidates: List[str], mask: str, bk: str):
        n=self.n
        blanks=[i for i,c in enumerate(mask) if c=='_']
        if not blanks:
            return {a:1/26 for a in ALPH}
        work = len(candidates)*len(blanks)
        if work > self.work_cap:
            cap=min(self.cand_cap, len(candidates))
            step=max(1, len(candidates)//cap)
            candidates = candidates[::step][:cap]
        ctx_count=Counter()
        pad = PAD*(n-1)
        for w in candidates:
            s = pad + w
            for i in blanks:
                j = i+(n-1)
                ctx = s[j-n+1:j]
                ctx_count[ctx]+=1
        if not ctx_count:
            return {a:1/26 for a in ALPH}
        agg=[0.0]*26; total=0
        for ctx,cnt in ctx_count.items():
            probs = self._ctx_probs(bk, ctx)
            for k in range(26):
                agg[k]+=cnt*probs[k]
            total+=cnt
        inv=1.0/(total or 1.0)
        return {ALPH[i]: agg[i]*inv for i in range(26)}

    def blended_letter_probs(self, candidates: List[str], mask: str, lam=None):
        if not candidates:
            u=1.0/26
            return {a:u for a in ALPH}
        bk = len_bucket(len(mask))
        lam = self.lam if lam is None else lam
        pc, support = self._pcand(candidates, mask)
        ph = self._phmm(candidates, mask, bk)
        prior = self.global_prior[bk]

        alpha, beta, gamma = lam, 0.25, max(0.0, 1.0-lam-0.25)
        out={}
        for a in ALPH:
            if a not in support:
                out[a]=0.0
            else:
                out[a] = alpha*pc.get(a,0.0) + beta*ph.get(a,0.0) + gamma*prior.get(a,0.0)
        Z=sum(out.values()) or 1.0
        for a in ALPH:
            out[a]/=Z
        return out


In [5]:
# === CELL 5: Hangman Environment ===
class HangmanEnv:
    def __init__(self, word: str, lives: int = 6):
        self.word = word
        self.start_l = lives
        self.reset(word)

    def reset(self, word=None):
        if word is not None:
            self.word = word
        self.lives = self.start_l
        self.mask = ['_']*len(self.word)
        self.guessed = set()
        return ''.join(self.mask)

    def step(self, letter: str):
        info={}
        if letter in self.guessed:
            info['repeat']=True
            return ''.join(self.mask), 0, False, info
        self.guessed.add(letter)
        hits=0
        for i,ch in enumerate(self.word):
            if ch==letter:
                self.mask[i]=ch
                hits+=1
        if hits>0:
            done = '_' not in self.mask
            reward = 2*hits + (10 if done else 0)
            return ''.join(self.mask), reward, done, info
        else:
            self.lives -= 1
            done = self.lives==0
            reward = -1 + (-10 if done else 0)
            return ''.join(self.mask), reward, done, info


In [6]:
# === CELL 6: Original greedy HMM policy ===
def greedy_play(word, idx, hmm, lives=6):
    env = HangmanEnv(word, lives)
    wrong=set(); right=set()

    while True:
        mask = ''.join(env.mask)
        candidates = idx.filter(mask, wrong, right)
        probs = hmm.blended_letter_probs(candidates, mask)

        letter = max((a for a in ALPH if a not in env.guessed),
                     key=lambda a: probs.get(a,0.0))

        _, _, done, _ = env.step(letter)

        if letter in word:
            right.add(letter)
        else:
            wrong.add(letter)

        if done:
            win = '_' not in env.mask
            wrong_guesses = sum(1 for g in env.guessed if g not in set(word))
            correct_guesses = len(right)
            return win, wrong_guesses, correct_guesses


In [7]:
# === CELL 7: Train HMM ===
hmm = NGramHMM(n=3, k=0.5, lam=0.6)
hmm.fit(buckets)

print("✅ HMM trained.")


✅ HMM trained.


In [8]:
# === CELL 8: Smoke test ===
random.seed(7)
sample = [random.choice(test_words) for _ in range(5)]
t0=time.time()
for w in sample:
    win, wrong, correct = greedy_play(w, idx_eval, hmm)
    print(f"{w} -> {'WIN' if win else 'LOSE'} | wrong={wrong} correct={correct}")
print("Time:", time.time()-t0, "s")


locomobility -> WIN | wrong=1 correct=8
unshapeliness -> WIN | wrong=0 correct=9
presurvey -> WIN | wrong=1 correct=7
viperina -> WIN | wrong=0 correct=7
chaetotactic -> WIN | wrong=1 correct=7
Time: 0.021682024002075195 s


In [9]:
# === CELL 9: Eval with accuracy ===
def eval_greedy(words, episodes=200):
    wins=0
    total_wrong=0
    total_correct=0
    t0=time.time()
    for _ in range(episodes):
        w = random.choice(words)
        win, wrong, correct = greedy_play(w, idx_eval, hmm)
        wins += int(win)
        total_wrong += wrong
        total_correct += correct
    dt=time.time()-t0
    total_guesses = total_correct + total_wrong
    accuracy = (total_correct / total_guesses) if total_guesses>0 else 0.0
    result = {
        "episodes": episodes,
        "success_rate": wins/episodes,
        "accuracy": accuracy,
        "total_correct": total_correct,
        "total_wrong": total_wrong,
        "final_score": (wins/episodes)*2000 - total_wrong*5,
        "time_sec": dt
    }
    return result

res200 = eval_greedy(test_words, episodes=200)
res200


{'episodes': 200,
 'success_rate': 1.0,
 'accuracy': 0.9060481503229595,
 'total_correct': 1543,
 'total_wrong': 160,
 'final_score': 1200.0,
 'time_sec': 0.5900607109069824}

In [10]:
# === CELL 10: 2000-game evaluation ===
res2000 = eval_greedy(test_words, episodes=2000)
print(res2000)
print(f"Accuracy %: {res2000['accuracy']*100:.2f}%")
print(f"Success Rate %: {res2000['success_rate']*100:.2f}%")


{'episodes': 2000, 'success_rate': 1.0, 'accuracy': 0.9043576347660888, 'total_correct': 15233, 'total_wrong': 1611, 'final_score': -6055.0, 'time_sec': 6.932491064071655}
Accuracy %: 90.44%
Success Rate %: 100.00%


In [11]:
# === CELL 11: Save HMM + index for RL ===
rl_assets = {
    "hmm": hmm,
    "idx_eval": idx_eval,
    "alphabet": ALPH
}

with open("rl_assets.pkl", "wb") as f:
    pickle.dump(rl_assets, f)

print("✅ Saved rl_assets.pkl (HMM + index + alphabet)")


✅ Saved rl_assets.pkl (HMM + index + alphabet)


In [26]:
# === CELL 13 (FINAL): RL ENV WITH CONTEST-ALIGNED REWARD ===

class RLHangmanEnv:
    def __init__(self, word, idx, hmm, lives=6):
        self.idx = idx
        self.hmm = hmm
        self.base = HangmanEnv(word, lives)
        self.word = word
        self.wrong = 0
        self.repeats = 0

    def _candidate_stats(self, mask):
        cands = self.idx.filter(mask,
                                {g for g in self.base.guessed if g not in set(self.word)},
                                {g for g in self.base.guessed if g in set(self.word)})
        pc, _ = self.hmm._pcand(cands, mask)
        prior = self.hmm.global_prior[len_bucket(len(mask))]
        pc_vec    = np.array([pc[a]    for a in ALPH], dtype=np.float32)
        prior_vec = np.array([prior[a] for a in ALPH], dtype=np.float32)
        return cands, pc_vec, prior_vec

    def state(self):
        mask_str = ''.join(self.base.mask)
        cands, pc_vec, prior_vec = self._candidate_stats(mask_str)
        hmm_probs = self.hmm.blended_letter_probs(cands, mask_str)
        hmm_vec = np.array([hmm_probs[a] for a in ALPH], dtype=np.float32)

        x = np.concatenate([
            encode_mask_fixed(mask_str),
            encode_guessed(self.base.guessed),
            np.array([self.base.lives], np.float32) / 6.0,
            hmm_vec,
            pc_vec,
            prior_vec,
            bucket_onehot(len(mask_str))
        ], axis=0)
        return x

    def valid_actions(self):
        return [i for i, a in enumerate(ALPH) if a not in self.base.guessed]

    def step(self, action_idx: int):
        a = ALPH[action_idx]

        # ---- REPEAT HANDLING ----
        if a in self.base.guessed:
            self.repeats += 1
            return self.state(), -2.0, False, {}   # repeat = -2 penalty

        prev_mask = ''.join(self.base.mask)
        _, _, done, _ = self.base.step(a)
        new_mask = ''.join(self.base.mask)

        k = sum(1 for i in range(len(new_mask))
                  if new_mask[i] != prev_mask[i] and new_mask[i] != '_')

        # ---- REWARD LOGIC (aligned with contest scoring) ----
        if a in self.word:
            r = 2.5 * k          # reward per correct reveal
        else:
            r = -8.0             # strong penalty for wrong guess
            self.wrong += 1

        r -= 0.25                # per-step penalty → shorter games rewarded

        if done:
            if "_" not in new_mask:     # WIN
                r += 15.0
                if self.wrong == 0:
                    r += 10.0          # perfect game bonus
            else:                       # LOSE
                r -= 10.0

        return self.state(), r, done, {}


In [27]:
# === CELL 14: DQN Model + Replay Buffer + Masked Action Selection ===
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class QNet(nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 512), nn.ReLU(),
            nn.Linear(512, 256), nn.ReLU(),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, out_dim)
        )
    def forward(self, x): return self.net(x)

class Replay:
    def __init__(self, cap=80000):
        self.buf = deque(maxlen=cap)
    def push(self, *trans): self.buf.append(trans)
    def sample(self, batch): return random.sample(self.buf, batch)
    def __len__(self): return len(self.buf)

def masked_action(q_values, valid_actions):
    q = q_values.clone()
    mask = torch.full_like(q, -1e9)
    mask[valid_actions] = 0
    return (q + mask).argmax().item()

def select_action(qnet, state, valid_actions, eps):
    if random.random() < eps:
        return random.choice(valid_actions)
    with torch.no_grad():
        q = qnet(torch.from_numpy(state).unsqueeze(0).to(device))
        return masked_action(q.squeeze(0), valid_actions)


In [33]:
# === CELL 15 (FINAL): CURRICULUM TRAINING S → M → L ===
import random
from tqdm import trange

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build dummy env to compute state dimension
_tmp = RLHangmanEnv(random.choice(test_words), idx_eval, hmm)
state_dim = _tmp.state().shape[0]
del _tmp

action_dim = 26
qnet = QNet(state_dim, action_dim).to(device)
tgt  = QNet(state_dim, action_dim).to(device)
tgt.load_state_dict(qnet.state_dict())

opt = optim.Adam(qnet.parameters(), lr=1e-3)
replay = Replay(cap=120000)
gamma = 0.99
batch = 256
sync_rate = 700

# ---------------- STAGE TRAINER ----------------
def train_stage(words, episodes, eps_start, eps_end, stage_name):
    print(f"\n=== TRAINING STAGE: {stage_name} ({episodes} episodes) ===")
    wrong_sum = 0; wins = 0; step = 0; losses = []

    for ep in trange(episodes):
        eps = max(eps_end, eps_start - (eps_start - eps_end) * ep / episodes)
        word = random.choice(words)
        env = RLHangmanEnv(word, idx_eval, hmm)
        s = env.state()
        done = False

        while not done:
            valid = env.valid_actions()
            a = select_action(qnet, s, valid, eps)
            s2, r, done, _ = env.step(a)
            replay.push(s, a, r, s2, done, valid)
            s = s2

            if len(replay) >= batch:
                # train DQN
                batch_data = replay.sample(batch)
                ss, aa, rr, ss2, dd, _valid = zip(*batch_data)
                ss  = torch.tensor(np.stack(ss), dtype=torch.float32).to(device)
                aa  = torch.tensor(aa, dtype=torch.int64).unsqueeze(1).to(device)
                rr  = torch.tensor(rr, dtype=torch.float32).unsqueeze(1).to(device)
                ss2 = torch.tensor(np.stack(ss2), dtype=torch.float32).to(device)
                dd  = torch.tensor(dd, dtype=torch.float32).unsqueeze(1).to(device)

                q = qnet(ss).gather(1, aa)

                with torch.no_grad():
                    q2 = tgt(ss2).max(dim=1, keepdim=True)[0]
                    y = rr + gamma * (1 - dd) * q2

                loss = nn.SmoothL1Loss()(q, y)
                opt.zero_grad(); loss.backward(); opt.step()
                losses.append(loss.item())

                step += 1
                if step % sync_rate == 0:
                    tgt.load_state_dict(qnet.state_dict())

        wrong_sum += env.wrong
        if "_" not in ''.join(env.base.mask):
            wins += 1

    print(f"Stage: {stage_name} | Win rate={wins/episodes:.3f} | Avg wrong={wrong_sum/episodes:.3f}")
    return losses

# ---------------- CURRICULUM RUN ----------------

loss_hist = []
loss_hist += train_stage(buckets["S"], 5000, eps_start=0.5, eps_end=0.15, stage_name="SHORT (3-5)")
loss_hist += train_stage(buckets["M"], 6000, eps_start=0.3, eps_end=0.1, stage_name="MEDIUM (6-8)")
loss_hist += train_stage(buckets["L"], 7000, eps_start=0.2, eps_end=0.05, stage_name="LONG (9+)")

# ---------------- SAVE FINAL MODEL ----------------
import pickle
with open("rl_curriculum_final.pkl", "wb") as f:
    pickle.dump({
        "qnet_state": qnet.state_dict(),
        "state_dim": state_dim,
        "action_dim": action_dim
    }, f)

print("\n✅ Saved final agent: rl_curriculum_final.pkl")



=== TRAINING STAGE: SHORT (3-5) (5000 episodes) ===


100%|██████████| 5000/5000 [03:17<00:00, 25.35it/s]


Stage: SHORT (3-5) | Win rate=0.026 | Avg wrong=5.950

=== TRAINING STAGE: MEDIUM (6-8) (6000 episodes) ===


100%|██████████| 6000/6000 [05:24<00:00, 18.49it/s]


Stage: MEDIUM (6-8) | Win rate=0.017 | Avg wrong=5.970

=== TRAINING STAGE: LONG (9+) (7000 episodes) ===


100%|██████████| 7000/7000 [08:57<00:00, 13.03it/s]

Stage: LONG (9+) | Win rate=0.032 | Avg wrong=5.947

✅ Saved final agent: rl_curriculum_final.pkl





In [35]:
# === CELL: Evaluate RL on /content/test.txt (one pass over file) ===
import os, pickle, torch, numpy as np

TEST_PATH = "/content/test.txt"

# 1) Load & clean
ALPH_RE = re.compile(r'[^a-zA-Z]+')

def clean_word_eval(raw: str) -> str:
    w = ALPH_RE.sub('', raw).lower()
    return w if len(w) >= 3 else ''

if not os.path.exists(TEST_PATH):
    raise FileNotFoundError(f"Couldn't find {TEST_PATH}. Upload it or fix the path.")

test_words_custom = []
with open(TEST_PATH, "r", encoding="utf-8", errors="ignore") as f:
    for line in f:
        w = clean_word_eval(line)
        if w:
            test_words_custom.append(w)

if not test_words_custom:
    raise ValueError("test.txt cleaned to empty. Check file contents.")

# 2) Build a fresh index for this test set
idx_eval_custom = CorpusIndex(test_words_custom)

# 3) Load the final RL model if not already in memory
try:
    qnet_final
except NameError:
    # fallback: load the model you saved as rl_agent.pkl (overwritten to be your final)
    with open("rl_curriculum_final.pkl", "rb") as f:
        data = pickle.load(f)
    state_dim = data["state_dim"]; action_dim = data["action_dim"]
    qnet_final = QNet(state_dim, action_dim).to(device)
    qnet_final.load_state_dict(data["qnet_state"])
    qnet_final.eval()

# 4) Evaluate exactly once per word (contest scoring)
def eval_on_list(words):
    wins = 0
    wrong_total = 0
    repeats_total = 0

    for w in words:
        env = RLHangmanEnv(w, idx_eval_custom, hmm)  # same HMM oracle, new index
        s = env.state()
        done = False
        # play until done (cap for safety)
        for _ in range(60):
            valid = env.valid_actions()
            # masked greedy action from the trained net
            with torch.no_grad():
                qvals = qnet_final(torch.tensor(s, dtype=torch.float32, device=device).unsqueeze(0))[0].cpu().numpy()
            # choose best among valid
            a = valid[int(np.argmax(qvals[valid]))]
            s, _, done, _ = env.step(a)
            if done:
                break

        wrong_total += env.wrong
        repeats_total += env.repeats  # should be 0 because we mask, but we count it anyway
        if "_" not in ''.join(env.base.mask):
            wins += 1

    games = len(words)
    sr = wins / games
    score = (sr * 2000) - (wrong_total * 5) - (repeats_total * 2)
    return {
        "games": games,
        "success_rate": sr,
        "total_wrong": wrong_total,
        "total_repeats": repeats_total,
        "final_score": score
    }

results_custom = eval_on_list(test_words_custom)
print("Test file:", TEST_PATH)
print(results_custom)


Test file: /content/test.txt
{'games': 1998, 'success_rate': 0.8273273273273273, 'total_wrong': 7915, 'total_repeats': 0, 'final_score': -37920.34534534535}
