<a href="https://colab.research.google.com/github/viky-01/plague-research/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from datasets import load_dataset
import pandas as pd

# --- Fetch the CoDET-M4 dataset from the Hugging Face Hub ---
print("🔄 Downloading dataset from Hugging Face...")
dataset = load_dataset("DaniilOr/CoDET-M4")

# --- Materialise the 'train' split as a pandas DataFrame ---
print("📦 Converting to pandas DataFrame...")
df = dataset["train"].to_pandas()

# --- Quick sanity report: row count and column names ---
print("\n✅ Dataset Loaded Successfully!")
print(f"Total rows: {len(df)}")
print("Available columns:", df.columns.tolist())

# --- Split by the 'target' label into human-written and machine-written rows ---
df_human = df.loc[df["target"] == "human"]
df_ai = df.loc[df["target"] == "machine"]

# --- Write the full frame and both subsets to CSV (no index column) ---
# Insertion order of this mapping fixes both the write order and the report order.
csv_exports = {
    "codet_m4_full.csv": df,
    "codet_m4_human.csv": df_human,
    "codet_m4_ai.csv": df_ai,
}

print("💾 Saving files...")
for csv_name, frame in csv_exports.items():
    frame.to_csv(csv_name, index=False)

print("\n🎉 Done! Files saved:")
for csv_name in csv_exports:
    print(f" - {csv_name}")


🔄 Downloading dataset from Hugging Face...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/24.0 [00:00<?, ?B/s]

dataset_without_comments.parquet:   0%|          | 0.00/458M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/500552 [00:00<?, ? examples/s]

📦 Converting to pandas DataFrame...

✅ Dataset Loaded Successfully!
Total rows: 500552
Available columns: ['code', 'language', 'model', 'split', 'target', 'source', 'features', 'cleaned_code', '__index_level_0__']
💾 Saving files...

🎉 Done! Files saved:
 - codet_m4_full.csv
 - codet_m4_human.csv
 - codet_m4_ai.csv


In [2]:
import pandas as pd
import os

# Step 1: Read the CSV
df = pd.read_csv("codet_m4_full.csv")

# Step 2: Keep only rows whose code is an actual string.
# Missing values are read back as float NaN, so the isinstance check alone
# removes them; a separate notnull() pass is not needed.
df = df[df["code"].apply(lambda x: isinstance(x, str))]

# Step 3: Define root output folder
root_dir = "code_snippets"
os.makedirs(root_dir, exist_ok=True)

# Lower-cased language name -> file extension; anything else falls back to "txt".
extension_by_language = {"python": "py", "java": "java", "cpp": "cpp", "c++": "cpp"}

# Language folders already created, so os.makedirs runs once per language
# instead of once per row.
created_dirs = set()

# Step 4: Loop and save code per language.
# Iterating the needed columns directly avoids building a Series per row
# (as iterrows() does). idx is still the DataFrame index label, so the
# generated file names are unchanged.
for idx, code, language, target in zip(df.index, df["code"], df["language"], df["target"]):
    lang = str(language).lower()

    # Choose file extension
    ext = extension_by_language.get(lang, "txt")

    # Subfolder by language
    lang_dir = os.path.join(root_dir, lang)
    if lang_dir not in created_dirs:
        os.makedirs(lang_dir, exist_ok=True)
        created_dirs.add(lang_dir)

    # Create file
    filename = f"{target}_{idx}.{ext}"
    filepath = os.path.join(lang_dir, filename)

    try:
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(code)  # already guaranteed to be a str by Step 2
    except Exception as e:
        # Best effort: report the failure and continue with the next snippet.
        print(f"❌ Error writing {filename}: {e}")

print(f"✅ Done! Files stored in: {root_dir}/language/")


✅ Done! Files stored in: code_snippets/language/


In [None]:
# Mount the user's Google Drive at /content/drive (Google Colab only; the
# google.colab package is not available in a regular Jupyter kernel).
# NOTE(review): the feature-extraction cell walks "/content" recursively, so
# once Drive is mounted any .cpp files under /content/drive would presumably
# be picked up as well — confirm that is intended.
from google.colab import drive
drive.mount('/content/drive')

In [5]:
import os
import re
import math
import csv
import multiprocessing as mp
from collections import Counter
from functools import partial

import pandas as pd  # kept for parity, but writing is done via csv for memory efficiency

# ============== Precompiled Regex & Globals ==============

# Tokenizer: order matters (multi-char operators first)
# Alternatives are tried left to right at every position:
#   1. multi-character operators (==, !=, <=, >=, &&, ||, <<, >>, ::, ->, ++, --, op=)
#   2. hexadecimal integer literals (0x...)
#   3. decimal integer literals
#   4. double-quoted string literals (backslash escapes allowed)
#   5. single-quoted literals (at least one character between the quotes)
#   6. identifiers / keywords
#   7. any other single non-whitespace character
# Every group is non-capturing, so findall() yields the matched text itself.
# There is no floating-point alternative: "3.14" comes out as "3", ".", "14".
# Comments are not removed before matching, so comment text is tokenized too.
TOKEN_PATTERN = re.compile(
    r'(?:==|!=|<=|>=|&&|\|\||<<|>>|::|->|\+\+|--|\+=|-=|\*=|/=|%=|&=|\|=|\^=)'
    r'|(?:\b0x[0-9A-Fa-f]+\b)'
    r'|(?:\b\d+\b)'
    r'|(?:\"(?:\\.|[^\"\\])*\")'       # double-quoted strings (naive but fine)
    r'|(?:\'(?:\\.|[^\'\\])+\')'       # char literals (naive)
    r'|(?:[A-Za-z_]\w*)'
    r'|(?:[^\s])'
)

# Captures the header name inside `#include <...>` or `#include "..."`.
INCLUDE_PATTERN = re.compile(r'#include\s*[<"]([^>"]+)[>"]')
# Matches a line whose first non-blank characters are `//`, `/*` or `*`.
# NOTE(review): a code line starting with `*` (e.g. `*ptr = 0;`) matches too.
COMMENT_LINE_PATTERN = re.compile(r'^\s*(//|/\*|\*)')
# Captures the run of leading spaces/tabs of a line (at least one required).
LEADING_WS_PATTERN = re.compile(r'^([ \t]+)')
# Matches a string that is entirely one identifier-shaped word.
IDENTIFIER_PATTERN = re.compile(r'^[A-Za-z_]\w*$')
# Heuristic for a function definition: `<word> <word>(<anything but ')'>) {`.
# No capturing group, so findall() returns the whole matched text.
# NOTE(review): control-flow text such as `else if (x) {` also fits this shape.
FUNC_DEF_PATTERN = re.compile(r'\b[A-Za-z_]\w*\s+[A-Za-z_]\w*\s*\([^)]*\)\s*{')
# Captures any identifier that is followed (optionally after whitespace) by `(`.
# Keywords such as `if`/`while` match too; callers filter them out.
FUNC_CALL_PATTERN = re.compile(r'\b([A-Za-z_]\w*)\s*\(')
# Matches every identifier-shaped word, keywords included.
ALL_IDS_PATTERN = re.compile(r'\b[A-Za-z_]\w*\b')
# Captures the name that directly follows one of the listed built-in type words.
# Only the first name after the type word is captured (`int a, b;` yields `a`).
VAR_DECL_PATTERN = re.compile(
    r'\b(?:int|float|double|char|string|bool|long|short|unsigned|signed)\s+([A-Za-z_]\w*)'
)
# Matches the whole words `try`, `catch` and `throw`.
ERROR_TOKENS_PATTERN = re.compile(r'\b(try|catch|throw)\b')

# C++ keywords (not exhaustive, but solid coverage)
# Used to tell keywords apart from identifiers and to drop keyword "calls"
# such as `if (` / `while (` when counting distinct called names.
CPP_KEYWORDS = {
    "alignas","alignof","and","and_eq","asm","atomic_cancel","atomic_commit","atomic_noexcept",
    "auto","bitand","bitor","bool","break","case","catch","char","char8_t","char16_t","char32_t",
    "class","compl","concept","const","consteval","constexpr","constinit","const_cast","continue",
    "co_await","co_return","co_yield","decltype","default","delete","do","double","dynamic_cast",
    "else","enum","explicit","export","extern","false","float","for","friend","goto","if","inline",
    "int","long","mutable","namespace","new","noexcept","not","not_eq","nullptr","operator","or",
    "or_eq","private","protected","public","register","reinterpret_cast","requires","return","short",
    "signed","sizeof","static","static_assert","static_cast","struct","switch","template","this",
    "thread_local","throw","true","try","typedef","typeid","typename","union","unsigned","using",
    "virtual","void","volatile","wchar_t","while","xor","xor_eq"
}

# Operators we track (tokenizer already splits multi-char ones as single tokens)
# Tokens in this set count as operators for operator density and the Halstead
# metrics. `?`, `:`, `[` and `]` are not in the set.
OPERATOR_TOKENS = {
    "+","-","*","/","%","=","==","!=","<",">","<=",">=","&&","||","!","&","|","^","~","<<",">>",
    "::","->","++","--","+=","-=","*=","/=","%=","&=","|=","^="
}

# Characters whose raw occurrences in the source text are summed to obtain the
# punctuation count (occurrences inside strings and comments are included).
PUNCTUATION_CHARS = "{};,.()"

# ============== Utilities ==============

def shannon_entropy(seq):
    """Return the Shannon entropy (in bits) of the items of ``seq``.

    Args:
        seq: A sized, iterable collection of hashable items (e.g. a list of
            tokens or a string of characters).

    Returns:
        ``0.0`` for an empty ``seq``; otherwise ``-sum(p * log2(p))`` over the
        relative frequency ``p`` of every distinct item.
    """
    if not seq:
        return 0.0
    total = len(seq)
    frequencies = Counter(seq).values()
    return -sum(
        (occurrences / total) * math.log2(occurrences / total)
        for occurrences in frequencies
    )

def tokenize_code(code):
    """Split ``code`` into a list of token strings using ``TOKEN_PATTERN``.

    Whitespace is skipped; every other character ends up in some token.
    """
    # TOKEN_PATTERN has no capturing groups, so group(0) is exactly what
    # findall() would return for each match.
    return [match.group(0) for match in TOKEN_PATTERN.finditer(code)]

def is_camel_case(name):
    """Return True if ``name`` looks like lowerCamelCase.

    The name must start with one or more lowercase letters, followed by an
    uppercase letter, followed by any mix of letters and digits.
    """
    return re.match(r'^[a-z]+[A-Z][a-zA-Z0-9]*$', name) is not None

def is_snake_case(name):
    """Return True if ``name`` looks like lower snake_case.

    The name must start with lowercase letters and contain at least one
    ``_``-separated segment of lowercase letters or digits.
    """
    return re.match(r'^[a-z]+(_[a-z0-9]+)+$', name) is not None

# ============== Feature Functions ==============

def cyclomatic_complexity(code):
    """Estimate cyclomatic complexity as 1 + the number of decision points.

    Decision points are whole-word occurrences of ``if``, ``for``, ``while``,
    ``case`` and ``catch`` plus every occurrence of ``?``, ``&&`` and ``||``
    in the raw text (strings and comments are not excluded).
    """
    keyword_hits = sum(
        len(re.findall(rf"\b{kw}\b", code))
        for kw in ("if", "for", "while", "case", "catch")
    )
    # Plain substring counting: non-overlapping, left to right.
    symbol_hits = sum(code.count(sym) for sym in ("?", "&&", "||"))
    return 1 + keyword_hits + symbol_hits

def max_nesting_depth(code):
    """Return the deepest ``{`` nesting level reached while scanning ``code``.

    Every ``{`` raises the running level and every ``}`` lowers it; the level
    is not clamped, so unmatched closing braces can push it below zero.
    """
    current = 0
    deepest = 0
    for symbol in code:
        if symbol == "{":
            current += 1
            deepest = max(deepest, current)
        elif symbol == "}":
            current -= 1
    return deepest

def api_call_variety(code):
    """Count the distinct non-keyword names that appear directly before ``(``."""
    distinct_callees = {
        name
        for name in FUNC_CALL_PATTERN.findall(code)
        if name not in CPP_KEYWORDS
    }
    return len(distinct_callees)

def include_diversity(code):
    """Count the distinct header names referenced by ``#include`` directives."""
    distinct_headers = {match.group(1) for match in INCLUDE_PATTERN.finditer(code)}
    return len(distinct_headers)

def unused_var_func_ratio(code):
    """Return (declared names seen only once) / (function-definition matches).

    A declared name counts as unused when it occurs exactly once among all
    identifier-shaped words in ``code``, i.e. only at its declaration. The
    divisor is the number of ``FUNC_DEF_PATTERN`` matches, or 1 when there
    are none.
    """
    occurrences = Counter(ALL_IDS_PATTERN.findall(code))
    single_use = sum(
        1 for declared in VAR_DECL_PATTERN.findall(code) if occurrences[declared] == 1
    )
    function_total = len(FUNC_DEF_PATTERN.findall(code)) or 1
    return single_use / function_total

def error_handling_ratio(code, total_lines):
    """Return the number of ``try``/``catch``/``throw`` words per line.

    The divisor is ``total_lines + 1`` so that an empty file does not divide
    by zero.
    """
    keyword_hits = sum(1 for _ in ERROR_TOKENS_PATTERN.finditer(code))
    return keyword_hits / (total_lines + 1)

def naming_meaningfulness_score(identifiers):
    """Return the mean identifier length, used as a rough proxy for naming quality.

    Args:
        identifiers: Sized collection of identifier strings (duplicates count).

    Returns:
        ``0.0`` when ``identifiers`` is empty, otherwise the average length.
    """
    if not identifiers:
        return 0.0
    # Longer names are taken to be more descriptive.
    combined_length = sum(map(len, identifiers))
    return combined_length / len(identifiers)

# ---- Halstead metrics (bonus) ----
def halstead_metrics(tokens):
    """Compute Halstead-style size metrics from a token list.

    A token is an operator when it is in ``OPERATOR_TOKENS``; every other
    non-whitespace token is counted as an operand. That includes punctuation
    and keywords, not only identifiers and literals.

    Args:
        tokens: List of token strings.

    Returns:
        Dict with the keys ``halstead_n1`` / ``halstead_n2`` (distinct
        operators / operands), ``halstead_N1`` / ``halstead_N2`` (total
        operators / operands), ``halstead_vocabulary``, ``halstead_length``,
        ``halstead_volume``, ``halstead_difficulty`` and ``halstead_effort``.
        Volume and difficulty fall back to ``0.0`` when their divisor or
        logarithm argument would be zero.
    """
    operator_tokens = []
    operand_tokens = []
    for tok in tokens:
        if tok in OPERATOR_TOKENS:
            operator_tokens.append(tok)
        elif not tok.isspace():
            operand_tokens.append(tok)

    n1, n2 = len(set(operator_tokens)), len(set(operand_tokens))
    N1, N2 = len(operator_tokens), len(operand_tokens)

    vocabulary = n1 + n2
    length = N1 + N2

    if vocabulary > 0:
        volume = length * math.log2(vocabulary)
    else:
        volume = 0.0

    if n2 > 0:
        difficulty = (n1/2) * (N2/n2)
    else:
        difficulty = 0.0

    return {
        "halstead_n1": n1,
        "halstead_n2": n2,
        "halstead_N1": N1,
        "halstead_N2": N2,
        "halstead_vocabulary": vocabulary,
        "halstead_length": length,
        "halstead_volume": volume,
        "halstead_difficulty": difficulty,
        "halstead_effort": difficulty * volume,
    }

# ============== Core extractor ==============

def extract_features(code):
    """Compute a flat dictionary of stylometric features for one source text.

    The features cover layout (lines, indentation, blank lines), lexical
    statistics (tokens, identifiers, keywords, literals, operators), entropy,
    simple complexity heuristics and the Halstead metrics.

    Several ratios add 1 to their denominator so that empty input does not
    divide by zero; ratios over ``total_lines`` are guarded explicitly.

    Args:
        code: Source text as a single string. It is analysed as-is: comments
            and string contents are not stripped before tokenizing/counting.

    Returns:
        Dict mapping feature name to a numeric value, including every key
        returned by ``halstead_metrics``.
    """
    lines = code.splitlines()
    total_lines = len(lines)
    total_chars_incl = len(code)
    # Only space, tab and "\n" are removed here; "\r" and other whitespace stay.
    total_chars_excl = len(code.replace(" ", "").replace("\t", "").replace("\n", ""))
    words = code.split()
    total_words = len(words)
    longest_word_len = max((len(w) for w in words), default=0)
    # Numerator is the full text length, so line terminators are included.
    avg_line_length = (total_chars_incl / total_lines) if total_lines else 0.0

    # Lines whose first character is a space vs. a tab.
    space_count = sum(1 for line in lines if line.startswith(" "))
    tab_count = sum(1 for line in lines if line.startswith("\t"))
    indentation_style_ratio = space_count / (tab_count + 1)

    # Indentation width per line: each leading space counts 1, each tab counts 4.
    depths = []
    for line in lines:
        m = LEADING_WS_PATTERN.match(line)
        if m:
            depth = m.group(1).count(" ") + 4 * m.group(1).count("\t")
        else:
            depth = 0
        depths.append(depth)
    avg_indentation_depth = (sum(depths) / total_lines) if total_lines else 0.0

    blank_lines = sum(1 for line in lines if not line.strip())
    blank_line_ratio = (blank_lines / total_lines) if total_lines else 0.0

    tokens = tokenize_code(code)
    token_counts = Counter(tokens)
    avg_token_length = (sum(len(t) for t in tokens) / len(tokens)) if tokens else 0.0
    longest_token_length = max((len(t) for t in tokens), default=0)

    # Identifier-shaped tokens are split into keywords and ordinary identifiers.
    identifiers = [t for t in tokens if IDENTIFIER_PATTERN.match(t) and t not in CPP_KEYWORDS]
    keywords = [t for t in tokens if t in CPP_KEYWORDS]
    keyword_to_identifier_ratio = len(keywords) / (len(identifiers) + 1)
    unique_identifier_count = len(set(identifiers))
    # Naming-style counts are per occurrence, not per distinct identifier.
    camel_case_count = sum(1 for i in identifiers if is_camel_case(i))
    snake_case_count = sum(1 for i in identifiers if is_snake_case(i))
    camel_to_snake_ratio = camel_case_count / (snake_case_count + 1)

    comment_lines = sum(1 for line in lines if COMMENT_LINE_PATTERN.match(line))
    comment_density = (comment_lines / total_lines) if total_lines else 0.0

    # Despite the name, only space characters are counted (no tabs/newlines).
    whitespace_ratio = code.count(" ") / (total_chars_incl + 1)
    # Raw character counts over the whole text, scaled to "per 100 tokens".
    punctuation_count = sum(code.count(ch) for ch in PUNCTUATION_CHARS)
    punctuation_freq_per_100_tokens = punctuation_count / (len(tokens) + 1) * 100

    operator_count = sum(1 for t in tokens if t in OPERATOR_TOKENS)
    operator_density = operator_count / (total_lines + 1)

    # literals: integers, hex ints, strings, char literals (already tokenized as single tokens)
    # NOTE(review): a one-character token that is just a quote mark satisfies
    # both startswith and endswith, so a stray quote is counted as a literal.
    literal_count = sum(1 for t in tokens if (
        t.startswith('"') and t.endswith('"')) or
        (t.startswith("'") and t.endswith("'")) or
        re.fullmatch(r'\b0x[0-9A-Fa-f]+\b', t) or
        re.fullmatch(r'\b\d+\b', t)
    )
    literal_density = literal_count / (len(tokens) + 1)

    # Share of distinct tokens that occur more than once.
    repetition_score = sum(v > 1 for v in token_counts.values()) / (len(token_counts) + 1)

    token_entropy = shannon_entropy(tokens)
    char_entropy = shannon_entropy(list(code))

    cyclo = cyclomatic_complexity(code)
    max_depth = max_nesting_depth(code)
    api_variety = api_call_variety(code)
    includes = include_diversity(code)
    unused_ratio = unused_var_func_ratio(code)
    error_ratio = error_handling_ratio(code, total_lines)
    naming_score = naming_meaningfulness_score(identifiers)
    halstead = halstead_metrics(tokens)

    features = {
        "total_words": total_words,
        "total_chars_including_ws": total_chars_incl,
        "total_chars_excluding_ws": total_chars_excl,
        "longest_word_length": longest_word_len,
        "avg_line_length": avg_line_length,
        "total_lines": total_lines,
        "indentation_spaces_vs_tabs_ratio": indentation_style_ratio,
        "avg_indentation_depth": avg_indentation_depth,
        "blank_line_ratio": blank_line_ratio,

        "avg_token_length": avg_token_length,
        "longest_token_length": longest_token_length,
        "keyword_to_identifier_ratio": keyword_to_identifier_ratio,
        "unique_identifier_count": unique_identifier_count,
        "camel_to_snake_ratio": camel_to_snake_ratio,

        "comment_lines": comment_lines,
        "comment_density": comment_density,

        "whitespace_ratio": whitespace_ratio,
        "punctuation_freq_per_100_tokens": punctuation_freq_per_100_tokens,
        "operator_density": operator_density,
        "literal_density": literal_density,
        "repetition_score": repetition_score,
        "token_entropy": token_entropy,
        "char_entropy": char_entropy,

        "cyclomatic_complexity": cyclo,
        "max_nesting_depth": max_depth,
        "api_call_variety": api_variety,
        "include_diversity": includes,
        "unused_var_func_ratio": unused_ratio,
        "error_handling_ratio": error_ratio,
        "naming_meaningfulness_score": naming_score,
    }
    # Merge the halstead_* keys into the same flat dictionary.
    features.update(halstead)
    return features

# ============== Parallel file processing ==============

def process_file(filepath):
    """Read one source file and return its feature row.

    Args:
        filepath: Path of the file to analyse. It is stored unchanged under
            the ``"relpath"`` key.

    Returns:
        On success, the ``extract_features`` dict extended with ``"filename"``
        (base name) and ``"relpath"``. On any exception, a three-key dict
        with ``"filename"``, ``"relpath"`` and ``"error"`` (the exception
        text), so that failed files still produce a row.
    """
    try:
        # Undecodable bytes are dropped rather than raising.
        with open(filepath, "r", encoding="utf-8", errors="ignore") as handle:
            source_text = handle.read()
        row = extract_features(source_text)
        row.update(filename=os.path.basename(filepath), relpath=filepath)
        return row
    except Exception as exc:
        # Report the failure as a row instead of silently dropping the file.
        return {
            "filename": os.path.basename(filepath),
            "relpath": filepath,
            "error": str(exc),
        }

# ============== Main ==============

if __name__ == "__main__":
    # Driver: find every .cpp file under DATASET_DIR, extract features in a
    # worker pool, and stream one CSV row per file to OUTPUT_CSV.

    # Change this to the root folder of your dataset
    DATASET_DIR = "/content"  # e.g., your base path
    OUTPUT_CSV = "cpp_code_features.csv"

    all_cpp_files = []
    for root, _, files in os.walk(DATASET_DIR):
        for f in files:
            if f.endswith(".cpp"):
                all_cpp_files.append(os.path.join(root, f))

    print(f"Found {len(all_cpp_files)} C++ files.")

    # Leave one core free for the parent process, which does the CSV writing.
    cpu_cnt = max(1, (os.cpu_count() or 2) - 1)

    # Fix the CSV schema up front instead of inferring it from the first row
    # that arrives. Results come back in arbitrary order, so the first row
    # could be an error row with only three keys; every later full row would
    # then make DictWriter raise ValueError for unknown fields.
    # extract_features("") returns the complete feature key set, so the
    # resulting column order matches what a full first row used to produce.
    fieldnames = ["filename", "relpath", "error"] + sorted(extract_features(""))
    processed = 0

    with mp.get_context("fork" if hasattr(os, "fork") else "spawn").Pool(processes=cpu_cnt) as pool, \
         open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as out_f:

        # restval="" fills the columns a row lacks: "error" on successful rows,
        # every feature column on error rows.
        writer = csv.DictWriter(out_f, fieldnames=fieldnames, restval="")
        # Written unconditionally, so an empty dataset still yields a valid
        # CSV with the full header.
        writer.writeheader()

        # imap_unordered hands rows back as workers finish them, so results
        # are written as they arrive without any manual batching.
        for row in pool.imap_unordered(process_file, all_cpp_files, chunksize=32):
            writer.writerow(row)
            processed += 1

            if processed % 5000 == 0:
                print(f"Processed {processed}/{len(all_cpp_files)} files...")

    print(f"✅ Features saved to {OUTPUT_CSV}")


Found 141195 C++ files.
Processed 5000/141195 files...
Processed 10000/141195 files...
Processed 15000/141195 files...
Processed 20000/141195 files...
Processed 25000/141195 files...
Processed 30000/141195 files...
Processed 35000/141195 files...
Processed 40000/141195 files...
Processed 45000/141195 files...
Processed 50000/141195 files...
Processed 55000/141195 files...
Processed 60000/141195 files...
Processed 65000/141195 files...
Processed 70000/141195 files...
Processed 75000/141195 files...
Processed 80000/141195 files...
Processed 85000/141195 files...
Processed 90000/141195 files...
Processed 95000/141195 files...
Processed 100000/141195 files...
Processed 105000/141195 files...
Processed 110000/141195 files...
Processed 115000/141195 files...
Processed 120000/141195 files...
Processed 125000/141195 files...
Processed 130000/141195 files...
Processed 135000/141195 files...
Processed 140000/141195 files...
✅ Features saved to cpp_code_features.csv
