# Language Classifier

Train a TF-IDF + Multinomial Naive Bayes classifier to detect the programming language of a code snippet. Training data is scraped from Rosetta Code (Wikipedia/HuggingFace) with heuristic pre-filtering for high-confidence languages.

## Contents
1. Heuristic Language Detection & Wikipedia Scraper
2. Rosetta Code Dataset Preparation (HuggingFace)
3. Data Loading & Label Distribution
4. TF-IDF Vectorization & MultinomialNB Training
5. Save Model
6. Process Test Set Language

In [1]:
import requests
import re
import os
import random
import time

# --- CONFIGURATION ---
ROSETTA_API = "https://rosettacode.org/mw/api.php"
OUTPUT_FILE = "code_classifier.train"
MODEL_FILE = "code_classifier.bin"

# Mapping User's requested labels -> Rosetta Code Categories
# Note: For C/C++, we prioritize C++ as requested, but you could merge C if needed.
LANG_MAP = {
    'python': 'Python',
    'php': 'PHP',
    'java': 'Java',
    'javascript': 'JavaScript',
    'go': 'Go',
    'csharp': 'C_sharp',
    'cpp': 'C++' # User requested C/C++, effectively C++ for Rosetta
}

# --- 1. HEURISTICS (The "Sniper" Layer) ---
# These are used in the final classifier, but we define them here to keep logic in one place.
def predict_heuristic(code_snippet):
    rules = [
        ('php', r'(<\?php|namespace [\w\\]+;|function \w+\s*\(.*\)\s*:\s*\w+)'),
        ('java', r'(package [\w\.]+;|import java\.|public class \w+)'),
        ('go', r'(package main|fmt\.Print|func \w+\()'),
        ('csharp', r'(using System;|namespace [\w\.]+.*\{|public class \w+\s*:\s*\w+)'),
        ('python', r'(def \w+\(.*\):|if __name__ == "__main__":|import \w+ as \w+)'),
        ('cpp', r'(#include <\w+>|int main\(.*\) \{|std::\w+)'),
        ('javascript', r'(const \w+ =|let \w+ =|console\.log\(|function \w+\(.*\)\s*\{)'),
    ]

    for lang, regex in rules:
        if re.search(regex, code_snippet, re.MULTILINE):
            return lang
    return None

# --- 2. SCRAPING & BALANCING ---
def get_category_members(category):
    """Fetches all task pages for a given language category."""
    pages = []
    params = {
        "action": "query",
        "list": "categorymembers",
        "cmtitle": f"Category:{category}",
        "cmlimit": "500",
        "format": "json"
    }

    while True:
        response = requests.get(ROSETTA_API, params=params).json()
        if 'query' in response:
            pages.extend(response['query']['categorymembers'])

        if 'continue' in response:
            params['cmcontinue'] = response['continue']['cmcontinue']
        else:
            break

    return pages

def get_page_content(page_id):
    """Fetches raw Wikitext content of a page."""
    params = {
        "action": "query",
        "prop": "revisions",
        "rvprop": "content",
        "pageids": page_id,
        "format": "json"
    }
    try:
        data = requests.get(ROSETTA_API, params=params).json()
        page = data['query']['pages'][str(page_id)]
        return page['revisions'][0]['*']
    except Exception:
        return ""

def extract_code_block(wikitext, lang_name):
    """
    Rosetta Code usually wraps code in <lang implementation_name>...</lang>.
    We try to find the block specific to the requested language.
    """
    # Regex to find <lang Python> code </lang>
    # Case insensitive, handles attributes like <lang Python version="3">
    pattern = r'<lang\s+' + re.escape(lang_name) + r'[^>]*>(.*?)</lang>'
    matches = re.findall(pattern, wikitext, re.DOTALL | re.IGNORECASE)
    return matches

def prepare_dataset():
    print("--- Phase 1: Scouting Data Sizes ---")
    tasks_per_lang = {}

    # 1. Get all task lists first to determine the minimum count
    for label, category in LANG_MAP.items():
        print(f"Fetching task list for {label} ({category})...")
        tasks = get_category_members(category)
        tasks_per_lang[label] = tasks

    # 2. Determine the bottleneck (Min tasks)
    counts = {k: len(v) for k, v in tasks_per_lang.items()}
    min_count = min(counts.values())
    print(f"\nTask Counts: {counts}")
    print(f"Constraint: Limiting all languages to {min_count} tasks to prevent bias.\n")

    # 3. Download and Process
    print("--- Phase 2: Downloading & Formatting ---")
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:

        for label, tasks in tasks_per_lang.items():
            # Random shuffle to get a representative sample of the subset
            selected_tasks = random.sample(tasks, min_count)
            print(f"Processing {label}: downloading {min_count} snippets...")

            success_count = 0
            for task in selected_tasks:
                content = get_page_content(task['pageid'])
                # Extract the code block
                snippets = extract_code_block(content, LANG_MAP[label])

                for snippet in snippets:
                    # CLEANING:
                    # 1. Remove newlines to make it one line for FastText
                    # 2. Keep punctuation! (Vital for code)
                    clean_code = snippet.replace('\n', ' ').replace('\r', '')

                    # Write to file in FastText format: __label__python print("hello")
                    f_out.write(f"__label__{label} {clean_code}\n")
                    success_count += 1

            print(f"  -> Extracted {success_count} valid code blocks for {label}")




In [3]:
import os
import random
import re
# You might need: pip install datasets pandas
from datasets import load_dataset
import pandas as pd

OUTPUT_FILE = "/data/language_classifier/code_classifier.train"

# Target Languages
TARGETS = {
    'python': ['Python'],
    'php': ['PHP'],
    'java': ['Java'],
    'javascript': ['JavaScript'],
    'go': ['Go'],
    'csharp': ['C#', 'C_sharp', 'C sharp'], # Handle variations
    'cpp': ['C++', 'Cpp']
}

def prepare_dataset():
    print("--- Phase 1: Loading Dataset from Hugging Face ---")
    # Using 'christopher/rosetta-code' which is a clean parquet dump
    try:
        ds = load_dataset("christopher/rosetta-code", split="train")
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return

    print(f"Total raw rows: {len(ds)}")

    # Convert to Pandas for easier filtering
    df = ds.to_pandas()

    # 1. Normalize & Filter Languages
    # Create a reverse map: "C sharp" -> "csharp"
    name_map = {}
    for standard_name, aliases in TARGETS.items():
        for alias in aliases:
            name_map[alias.lower()] = standard_name

    # Filter function
    def normalize_lang(row_lang):
        if not isinstance(row_lang, str): return None
        return name_map.get(row_lang.strip().lower(), None)

    df['norm_lang'] = df['language_name'].apply(normalize_lang)
    df_clean = df.dropna(subset=['norm_lang'])

    # 2. Balancing (The min(tasks) logic)
    counts = df_clean['norm_lang'].value_counts()
    print("\n--- Raw Counts per Language ---")
    print(counts)

    if counts.empty:
        print("CRITICAL: No matching languages found. Check dataset labels.")
        return

    min_count = counts.min()
    print(f"\nConstraint: Limiting all languages to {min_count} samples to prevent bias.")

    final_data = []

    for lang in TARGETS.keys():
        # Get all rows for this language
        lang_rows = df_clean[df_clean['norm_lang'] == lang]

        # Sample min_count
        if len(lang_rows) >= min_count:
            sampled = lang_rows.sample(n=min_count, random_state=42)

            for _, row in sampled.iterrows():
                code = row['code']
                # FastText Formatting: One line per doc
                clean_code = code.replace('\n', ' ').replace('\r', '')
                if len(clean_code) > 10: # Skip tiny snippets
                    final_data.append(f"__label__{lang} {clean_code}")

    # 3. Save to File
    print(f"--- Phase 2: Saving {len(final_data)} samples to {OUTPUT_FILE} ---")
    random.shuffle(final_data) # Shuffle lines

    output_dir = os.path.dirname(OUTPUT_FILE)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)

    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        f.write('\n'.join(final_data))

if __name__ == "__main__":
    if not os.path.exists(OUTPUT_FILE):
        prepare_dataset()
    else:
        print(f"{OUTPUT_FILE} already exists. Delete it to re-process.")

--- Phase 1: Loading Dataset from Hugging Face ---
Total raw rows: 79013

--- Raw Counts per Language ---
norm_lang
go            1172
python        1139
java          1004
cpp            964
csharp         839
javascript     702
php            440
Name: count, dtype: int64

Constraint: Limiting all languages to 440 samples to prevent bias.
--- Phase 2: Saving 3073 samples to /data/language_classifier/code_classifier.train ---


In [None]:
import pandas as pd

TRAIN_FILE = "/data/language_classifier/code_classifier.train"

rows = []

with open(TRAIN_FILE, "r", encoding="utf-8", errors="ignore") as f:
    for line in f:
        line = line.strip()
        if not line:
            continue

        # Split only on the FIRST space
        label, code = line.split(" ", 1)

        # Remove __label__ prefix
        label = label.replace("__label__", "")

        rows.append({
            "label": label,
            "code": code
        })

df = pd.DataFrame(rows)

df.head()   # preview first rows

         label                                               code
0          php  <?php   if (!$in = fopen('input.txt', 'r')) { ...
1         java  // Translation from https://en.wikipedia.org/w...
2          cpp  #include <boost/asio/ip/address.hpp> #include ...
3           go  package main   import (     "fmt"     "log"   ...
4           go  package main   import (     "fmt"     "log" ) ...
..         ...                                                ...
95        java  import java.math.BigInteger; import java.secur...
96  javascript           console.log(['apple', 'orange'].length);
97      python                              open(filename).read()
98      python  class BalancedTernary:     # Represented as a ...
99         php  class Fifo {   private $data = array();   publ...

[100 rows x 2 columns]
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3073 entries, 0 to 3072
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   l

In [5]:
X_train = df["code"]
y_train = df["label"]


In [6]:
print(y_train.value_counts())


label
go            440
csharp        440
java          439
javascript    439
python        439
cpp           438
php           438
Name: count, dtype: int64


In [8]:
from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer(
    analyzer="char",
    ngram_range=(3, 8),
    min_df=3,
    max_df=0.95,
    sublinear_tf=True
)


In [9]:
X_train_tfidf = vectorizer.fit_transform(X_train)


In [10]:
from sklearn.naive_bayes import MultinomialNB

clf = MultinomialNB(alpha=0.1)
clf.fit(X_train_tfidf, y_train)


0,1,2
,alpha,0.1
,force_alpha,True
,fit_prior,True
,class_prior,


In [11]:
from pathlib import Path
import joblib
import pickle
import sys

models_dir = Path("models")
models_dir.mkdir(parents=True, exist_ok=True)

joblib_path = models_dir / "language_classifier.joblib"
pkl_path = models_dir / "language_classifier.pkl"

try:
    joblib.dump(clf, joblib_path)
    print(f"Saved clf to {joblib_path}")
except Exception as e:
    try:
        with open(pkl_path, "wb") as f:
            pickle.dump(clf, f)
        print(f"joblib failed ({e}); pickled clf to {pkl_path}")
    except Exception as e2:
        print(f"Failed to save clf: {e2}", file=sys.stderr)

Saved clf to models\language_classifier.joblib


## Process test set language

In [None]:
def classify_code_tfidf(code):
    processed = code.replace('\n', ' ').replace('\r', '')
    X = vectorizer.transform([processed])
    pred = clf.predict(X)[0].strip().lower()
    # print(pred)
    to_display = {
        "go": "go",
        "java": "java",
        "cpp": "cpp",
        "javascript": "javascript",
        "python": "python",
        "csharp": "c_sharp",
        "php": "php",
    }

    return to_display.get(pred, pred)

In [None]:
import os
import time
import pandas as pd

df_test_new = pd.read_parquet("data/semeval/processed/test_new.parquet")

BATCH = 5000
OUT_DIR = "data/semeval/processed/lang_batches"
os.makedirs(OUT_DIR, exist_ok=True)

total = len(df_test_new)
start = time.time()

for i in range(0, total, BATCH):
    j = min(i + BATCH, total)

    chunk = df_test_new.iloc[i:j].copy()
    chunk["language"] = chunk["code"].apply(classify_code_tfidf)

    out_file = os.path.join(OUT_DIR, f"batch_{i:07d}_{j-1:07d}.parquet")
    chunk.to_parquet(out_file, index=False)

    done = j
    pct = 100.0 * done / total
    elapsed = time.time() - start
    rate = done / elapsed if elapsed > 0 else 0.0
    eta = (total - done) / rate if rate > 0 else float("inf")

    print(f"[{done}/{total} | {pct:.2f}%] wrote {out_file} | {rate:.1f} rows/s | ETA {eta/60:.1f} min")

print("Done.")


In [None]:
import glob
import pandas as pd

files = sorted(glob.glob("/data/semeval/processed/lang_batches/*.parquet"))
df_test_lang = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)


In [None]:
df_test_lang.to_parquet("data/semeval/processed/df_test_new_with_lang.parquet", index = False)