# UTS STKI — A11.2023.15189
*Soal 02–05*

Notebook ini menggabungkan seluruh implementasi untuk tugas UTS STKI: Preprocessing, Boolean IR, VSM, Search, dan Evaluasi.

In [None]:
import os
PROJECT_ROOT = os.getcwd()
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
DATA_PROCESSED_DIR = os.path.join(PROJECT_ROOT, "data_processed")

print("Notebook working directory:", PROJECT_ROOT)
print("Expected data dir:", DATA_DIR)
print("Expected data_processed dir:", DATA_PROCESSED_DIR)

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(DATA_PROCESSED_DIR, exist_ok=True)

try:
    import Sastrawi
except Exception:
    print("\nPeringatan: paket Sastrawi tidak terpasang di lingkungan ini.")
    print("Install dengan: pip install Sastrawi")


Notebook working directory: c:\Users\ASUS\OneDrive\Documents\KULIAH RENDY\SEMESTER 5\SISTEM TEMU KEMBALI INFORMASI\stki-uts-a11.2023.15189-aditya rendy setyawan\notebooks
Expected data dir: c:\Users\ASUS\OneDrive\Documents\KULIAH RENDY\SEMESTER 5\SISTEM TEMU KEMBALI INFORMASI\stki-uts-a11.2023.15189-aditya rendy setyawan\notebooks\data
Expected data_processed dir: c:\Users\ASUS\OneDrive\Documents\KULIAH RENDY\SEMESTER 5\SISTEM TEMU KEMBALI INFORMASI\stki-uts-a11.2023.15189-aditya rendy setyawan\notebooks\data_processed


## Soal 02 — Preprocessing
Langkah-langkah: tokenisasi, normalisasi, hapus stopwords, stemming (Sastrawi), dan menyimpan hasil ke `data_processed/`.

In [None]:
import re, glob
from collections import Counter

try:
    from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
    from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory
except Exception as e:
    raise ImportError("Sastrawi required. Install with: pip install Sastrawi") from e

PROJECT_ROOT = os.getcwd()
DATA_DIR = os.path.join(PROJECT_ROOT, "data")
DATA_PROCESSED_DIR = os.path.join(PROJECT_ROOT, "data_processed")
os.makedirs(DATA_PROCESSED_DIR, exist_ok=True)

def load_stopwords(path):
    if not os.path.exists(path):
        print(f"stopwords.txt not found at {path}. Using default small list.")
        return set(["dan","di","ke","yang","untuk","dari","pada","ini","itu","dengan"])
    with open(path, "r", encoding="utf-8") as f:
        return set(line.strip().lower() for line in f if line.strip())

stopwords_path = os.path.join(DATA_DIR, "stopwords.txt")
stop_words = load_stopwords(stopwords_path)

factory = StemmerFactory()
stemmer = factory.create_stemmer()

def clean_text(text):
    text = text.lower()
    text = re.sub(r'[^a-z\s]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def tokenize(text):
    tokens = text.split()
    tokens = [t for t in tokens if len(t) > 2]
    return tokens

def remove_stop(tokens, stop_words):
    return [t for t in tokens if t not in stop_words]

def stem_tokens(tokens):
    return [stemmer.stem(t) for t in tokens]

txt_files = sorted(glob.glob(os.path.join(DATA_DIR, "*.txt")))
print("Found raw txt files:", txt_files)
processed = {}
for path in txt_files:
    name = os.path.basename(path)
    if name.lower() == "stopwords.txt":
        continue
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()
    cleaned = clean_text(raw)
    toks = tokenize(cleaned)
    toks = remove_stop(toks, stop_words)
    toks = stem_tokens(toks)
    processed[name] = toks
    out_name = f"CLEAN_{name}"
    out_path = os.path.join(DATA_PROCESSED_DIR, out_name)
    with open(out_path, "w", encoding="utf-8") as outf:
        outf.write(" ".join(toks))
    print(f"Saved {out_name} ({len(toks)} tokens) to data_processed/")

from collections import Counter
for name, toks in processed.items():
    print("\nDocument:", name)
    for term, cnt in Counter(toks).most_common(10):
        print(f"  {term:<15} {cnt}")


stopwords.txt not found at c:\Users\ASUS\OneDrive\Documents\KULIAH RENDY\SEMESTER 5\SISTEM TEMU KEMBALI INFORMASI\stki-uts-a11.2023.15189-aditya rendy setyawan\notebooks\data\stopwords.txt. Using default small list.
Found raw txt files: []


## Soal 03 — Boolean Retrieval
Membangun inverted index, incidence matrix, operasi AND/OR/NOT, dan evaluasi precision/recall.

In [None]:
import os, glob
import numpy as np
from collections import defaultdict

DATA_PROCESSED_DIR = os.path.join(os.getcwd(), "../data_processed")

def load_processed(directory):
    files = sorted([f for f in os.listdir(directory) if f.startswith("CLEAN_") and f.endswith(".txt")])
    docs = {}
    id_map = {}
    for i, fn in enumerate(files):
        with open(os.path.join(directory, fn), "r", encoding="utf-8") as f:
            toks = f.read().split()
        doc_id = f"D{i+1}"
        docs[doc_id] = toks
        id_map[doc_id] = fn.replace("CLEAN_", "").replace(".txt","")
    vocab = sorted({t for toks in docs.values() for t in toks})
    return docs, id_map, vocab

docs, id_map, vocab = load_processed(DATA_PROCESSED_DIR)
print("Documents loaded:", list(id_map.values()))
print("Vocabulary size:", len(vocab))

def build_inverted(docs):
    inv = defaultdict(list)
    for doc_id, toks in docs.items():
        for term in set(toks):
            inv[term].append(doc_id)
    for term in inv:
        inv[term].sort()
    return dict(inv)

def build_incidence(docs, vocab):
    doc_ids = sorted(docs.keys())
    mat = np.zeros((len(vocab), len(doc_ids)), dtype=int)
    term_to_idx = {t:i for i,t in enumerate(vocab)}
    doc_to_idx = {d:i for i,d in enumerate(doc_ids)}
    for d, toks in docs.items():
        for t in set(toks):
            mat[term_to_idx[t], doc_to_idx[d]] = 1
    return mat, doc_ids

inverted_index = build_inverted(docs)
incidence_matrix, doc_ids = build_incidence(docs, vocab)

print("\nSample inverted index entries:")
for k in list(inverted_index.keys())[:10]:
    print(k, "->", inverted_index[k])

def boolean_retrieve(query, inverted_index, all_doc_ids):
    q = query.lower().split()
    if not q:
        return []
    if len(q) == 1:
        return inverted_index.get(q[0], [])
    if len(q) == 2 and q[0].upper()=="NOT":
        return sorted(set(all_doc_ids) - set(inverted_index.get(q[1], [])))
    if len(q) == 3:
        a, op, b = q[0], q[1].upper(), q[2]
        pa = set(inverted_index.get(a, []))
        pb = set(inverted_index.get(b, []))
        if op=="AND":
            return sorted(pa & pb)
        if op=="OR":
            return sorted(pa | pb)
    return []

all_docs = sorted(docs.keys())
queries = [("informasi AND proyek", ['D5']), ("kriptografi OR dekripsi", ['D4']), ("NOT proyek", [d for d in all_docs if d!='D5'])]
from collections import Counter

for q, gold in queries:
    res = boolean_retrieve(q, inverted_index, all_docs)
    tp = len(set(res)&set(gold))
    precision = tp / (len(res) or 1)
    recall = tp / (len(gold) or 1)
    print(f"\nQuery: {q}")
    print(" Retrieved:", res)
    print(f" Precision: {precision:.4f}, Recall: {recall:.4f}")


Documents loaded: ['RPS Kriptografi', 'RPS Manajemen Proyek Teknologi Infromasi', 'RPS Sistem Informasi', 'RPS Sistem Temu Kembali Informasi', 'RPS Sistem Terdistribusi', 'stopwords']
Vocabulary size: 1250

Sample inverted index entries:
sesuai -> ['D1', 'D2', 'D3', 'D4', 'D5']
tunjang -> ['D1']
manfaat -> ['D1', 'D2', 'D5']
mutu -> ['D1', 'D2', 'D3']
dekripsi -> ['D1']
deskripsi -> ['D1', 'D2', 'D3', 'D4', 'D5']
dosen -> ['D1', 'D2', 'D3', 'D4', 'D5']
komunikasi -> ['D1', 'D2', 'D3', 'D5']
substitusi -> ['D1']
sejarah -> ['D1', 'D5']

Query: informasi AND proyek
 Retrieved: ['D2', 'D3']
 Precision: 0.0000, Recall: 0.0000

Query: kriptografi OR dekripsi
 Retrieved: ['D1']
 Precision: 0.0000, Recall: 0.0000

Query: NOT proyek
 Retrieved: ['D1', 'D4', 'D6']
 Precision: 1.0000, Recall: 0.6000


## Soal 04 — Vector Space Model (TF-IDF) & Cosine Similarity
Hitung TF, DF, IDF, buat matriks TF-IDF dan ranking menggunakan cosine similarity.

In [None]:
import os
import numpy as np
from collections import Counter
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity

DATA_PROCESSED_DIR = os.path.join(os.getcwd(), "../data_processed")

def load_processed_for_vsm(directory):
    files = sorted([f for f in os.listdir(directory) if f.startswith("CLEAN_") and f.endswith(".txt")])
    docs = {}
    id_map = {}
    raw_snip = {}
    for i, fn in enumerate(files):
        with open(os.path.join(directory, fn), "r", encoding="utf-8") as f:
            content = f.read()
        toks = content.split()
        doc_id = f"D{i+1}"
        docs[doc_id] = toks
        id_map[doc_id] = fn.replace("CLEAN_","").replace(".txt","")
        raw_snip[doc_id] = content[:120].replace("\n"," ") + "..."
    vocab = sorted({t for toks in docs.values() for t in toks})
    return docs, id_map, vocab, raw_snip

docs, id_map, vocab, raw_snip = load_processed_for_vsm(DATA_PROCESSED_DIR)
print("Docs:", list(id_map.values()))
print("Vocab size:", len(vocab))

def calculate_tf_idf(docs, vocab):
    N = len(docs)
    doc_ids = sorted(docs.keys())
    term_to_idx = {t:i for i,t in enumerate(vocab)}
    data, rows, cols = [], [], []
    df = Counter()
    for j, d in enumerate(doc_ids):
        tc = Counter(docs[d])
        for term in set(docs[d]):
            df[term] += 1
        for term, cnt in tc.items():
            if term in term_to_idx:
                tf = 1 + np.log10(cnt) if cnt>0 else 0
                rows.append(term_to_idx[term])
                cols.append(j)
                data.append(tf)
    tf_matrix = csr_matrix((data, (rows, cols)), shape=(len(vocab), len(doc_ids)))
    idf = np.zeros(len(vocab))
    for term, idx in term_to_idx.items():
        idf[idx] = np.log10(N / (df[term] or 1))
    tfidf = tf_matrix.multiply(idf[:, np.newaxis])
    return tfidf, idf, term_to_idx, doc_ids

tfidf_matrix, idf_vector, term_to_idx, doc_ids = calculate_tf_idf(docs, vocab)
print("TF-IDF shape:", tfidf_matrix.shape)

def query_to_tfidf(query, term_to_idx, idf_vector):
    q_toks = query.lower().split()
    qc = Counter(q_toks)
    qvec = np.zeros(len(term_to_idx))
    for t, cnt in qc.items():
        if t in term_to_idx:
            qvec[term_to_idx[t]] = (1 + np.log10(cnt)) * idf_vector[term_to_idx[t]]
    from scipy.sparse import csr_matrix
    return csr_matrix(qvec).transpose()

def rank_documents(query_vec, tfidf_mat, doc_ids):
    sims = cosine_similarity(query_vec.transpose(), tfidf_mat.transpose())[0]
    ranking = sorted(zip(doc_ids, sims), key=lambda x: x[1], reverse=True)
    return ranking

q = "kriptografi"
qvec = query_to_tfidf(q, term_to_idx, idf_vector)
rank = rank_documents(qvec, tfidf_matrix, doc_ids)
print("\nTop results for query:", q)
for r in rank[:5]:
    print(r)


Docs: ['RPS Kriptografi', 'RPS Manajemen Proyek Teknologi Infromasi', 'RPS Sistem Informasi', 'RPS Sistem Temu Kembali Informasi', 'RPS Sistem Terdistribusi', 'stopwords']
Vocab size: 1250
TF-IDF shape: (1250, 6)

Top results for query: kriptografi
('D1', np.float64(0.16997105846329377))
('D2', np.float64(0.0))
('D3', np.float64(0.0))
('D4', np.float64(0.0))
('D5', np.float64(0.0))


## Soal 05 — Evaluasi Sistem IR
Precision, Recall, F1, nDCG. Fungsi evaluasi dapat dipanggil dengan hasil ranking dari VSM atau Boolean.

In [None]:
import numpy as np

def precision(retrieved, relevant):
    if not retrieved:
        return 0.0
    return len(set(retrieved) & set(relevant)) / len(retrieved)

def recall(retrieved, relevant):
    if not relevant:
        return 0.0
    return len(set(retrieved) & set(relevant)) / len(relevant)

def f1(P, R):
    return 0.0 if (P+R)==0 else 2*P*R/(P+R)

def dcg_at_k(scores, k):
    scores = np.asarray(scores)[:k]
    if scores.size==0:
        return 0.0
    return np.sum(scores / np.log2(np.arange(2, scores.size+2)))

def ndcg_at_k(rel_scores, ideal_scores, k):
    dcg = dcg_at_k(rel_scores, k)
    idcg = dcg_at_k(sorted(ideal_scores, reverse=True), k)
    return 0.0 if idcg==0 else dcg/idcg

def evaluate(retrieved, gold, k=5):
    P = precision(retrieved[:k], gold)
    R = recall(retrieved[:k], gold)
    F1 = f1(P, R)
    rel = [1 if d in gold else 0 for d in retrieved[:k]]
    ideal = [1]*min(k, len(gold)) + [0]*(max(0, k-len(gold)))
    nDCG = ndcg_at_k(rel, ideal, k)
    print(f"Precision@{k}: {P:.4f}, Recall@{k}: {R:.4f}, F1: {F1:.4f}, nDCG@{k}: {nDCG:.4f}")
    return P, R, F1, nDCG

try:
    top_docs = [d for d,s in rank[:5]]
    gold = ['D4']
    evaluate(top_docs, gold, k=5)
except Exception as e:
    print("No ranking found in session. Run VSM cell first.")


Precision@5: 0.2000, Recall@5: 1.0000, F1: 0.3333, nDCG@5: 0.4307


### Selesai