# Classify a text document as True/False based on whether it contains gene sequence or not.

## Import libraries

In [1]:
import os
from typing import Union, Tuple, Optional, Set

In [2]:
from pypdf import PdfReader
from docx import Document

In [3]:
import torch

In [4]:
import pandas as pd
import numpy as np

In [5]:
import string
import random

In [6]:
from sklearn.model_selection import train_test_split

In [7]:
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report


In [8]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [10]:
random.seed = 42

In [11]:
import pickle as pkl

## Predefined functions

In [20]:
def split_train_test(df):
    # Compute split index - train and test
    split_idx = int(len(df) * 0.85)  # first 85%
    
    # First 85% and remaining 15%
    train = df[:split_idx]
    test = df[split_idx:]
    return train,test

In [23]:
def shuffle_lists(list_of_lists):
    out = []
    for lst in list_of_lists:
        lst_copy = lst[:]            # avoid modifying original
        random.shuffle(lst_copy)     # shuffle in place
        out.append(lst_copy)
    return out

In [37]:
# generate random strings of text

CHARS = string.ascii_letters + string.digits + " .,!?$%&*#@/\n"

def random_garbage(n):
    return ''.join(random.choices(CHARS, k=n))

def generate_fake_content():
    prefix_len = random.randint(0, 5000)
    suffix_len = random.randint(0, 5000)

    prefix = random_garbage(prefix_len)
    suffix = random_garbage(suffix_len)

    return f"{prefix}{suffix}"

In [36]:
# A set of common bioinformatics/genomic file extensions.
# To identify files matching these extensions are classified as 'genomic data'.
GENOMIC_EXTENSION = [
    '.fastq', '.fq', '.fasta', '.fa', '.fna', '.gb', '.gff', '.gff3', '.gtf',
    '.sam', '.bam', '.cram', '.vcf', '.bcf', '.wig', '.bed', '.bigwig', '.tbi',
    '.tabix', '.h5', '.hdf5', # HDF5 often used for single-cell data (e.g., Anndata)
    ]

# A set of common document extensions for text extraction.
DOCUMENT_EXTENSIONS = ['.txt', '.pdf', '.doc', '.docx']

def analyze_file(file_path):
    """
    Analyzes a file path to determine if it is genomic data or a document,
    and returns the classification or the document content.

    Args:
        file_path: The full path to the file.

    Returns:
        A tuple (is_genomic_extension, file_extension, content_or_none_or_emptymessage).
    """
    if not os.path.exists(file_path):
        return (False, None, None)

    # Check for empty file before reading
    if os.path.getsize(file_path) == 0:
        return (False, None, "EMPTY FILE")

    # Get the file name and extension
    file_name = os.path.basename(file_path)
    _, ext = os.path.splitext(file_name)
    ext = ext.lower()

    # 1. Check for Genomic Data
    if ext in GENOMIC_EXTENSIONS:
        return (True, ext, None)

    # 2. Check for Document Data and Extract Content
    elif ext in DOCUMENT_EXTENSIONS:
        if ext == '.txt':
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                if not content.strip():
                    return (False, ext, "Empty or non-text content")
                return (False, ext, content.strip())
            except Exception as e:
                return (False, ext, f"Could not read text file: {e}")

        elif ext == '.pdf':
            reader = PdfReader(file_path)
            content = "".join([page.extract_text() for page in reader.pages])
            if not content.strip():
                    return (False, ext, "Empty or non-text content")
            return (False, ext, content)

        elif ext in ['.doc', '.docx']:
            document = Document(file_path)
            content = "\n".join([paragraph.text for paragraph in document.paragraphs])
            if not content.strip():
                    return (False, ext, "Empty or non-text content")
            return (False, ext, content)

    # 3. Handle Unknown/Other Files
    return (False, ext, None)


In [37]:
# is_genomic, file_type, content = analyze_file("xyz.txt")

## Load data, prepare train and test data

In [52]:
pos_train=[]
pos_test=[]
neg_train=[]
neg_test=[]

In [53]:
p1=[]
p2=[]
n1=[]

path = "dataset_dna/english_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['text_with_dna'].values[0])
    n1.append(df['text_without_dna'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
n1_train, n1_test = split_train_test(n1)
# add to global lists
pos_train = pos_train + p1_train + p2_train
pos_test = pos_test + p1_test + p2_test
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
pos_train, pos_test, neg_train, neg_test = shuffle_lists([pos_train, pos_test, neg_train, neg_test])


In [54]:
len(pos_train)+len(pos_test), len(neg_train)+len(neg_test)

(400, 200)

In [55]:
p1=[]
p2=[]

path = "dataset_dna/text_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['generated'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
# add to global lists
pos_train = pos_train + p1_train + p2_train
pos_test = pos_test + p1_test + p2_test
# shuffle global list after adding
pos_train, pos_test = shuffle_lists([pos_train, pos_test])


In [56]:
len(pos_train)+len(pos_test), len(neg_train)+len(neg_test)

(2400, 200)

In [57]:
p1=[]
p2=[]
n1=[]

path = "dataset_dna/text_and_seq_alternate_random_characters"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['text_with_dna'].values[0])
    n1.append(df['text_without_dna'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
n1_train, n1_test = split_train_test(n1)
# add to global lists
pos_train = pos_train + p1_train + p2_train
pos_test = pos_test + p1_test + p2_test
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
pos_train, pos_test, neg_train, neg_test = shuffle_lists([pos_train, pos_test, neg_train, neg_test])


In [58]:
p1=[]
p2=[]
p3=[]
n1=[]

path = "dataset_dna/text_and_seq_compressed"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['compressed'].values[0])
    p3.append(df['text_with_dna'].values[0])
    n1.append(df['text_without_dna'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
p3_train, p3_test = split_train_test(p3)
n1_train, n1_test = split_train_test(n1)
# add to global lists
pos_train = pos_train + p1_train + p2_train + p3_train
pos_test = pos_test + p1_test + p2_test + p3_test
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
pos_train, pos_test, neg_train, neg_test = shuffle_lists([pos_train, pos_test, neg_train, neg_test])


In [59]:
p1=[]
p2=[]
p3=[]
n1=[]

path = "dataset_dna/text_and_seq_compressed_and_replaced"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['compressed'].values[0])
    p3.append(df['text_with_dna'].values[0])
    n1.append(df['text_without_dna'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
p3_train, p3_test = split_train_test(p3)
n1_train, n1_test = split_train_test(n1)
# add to global lists
pos_train = pos_train + p1_train + p2_train + p3_train
pos_test = pos_test + p1_test + p2_test + p3_test
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
pos_train, pos_test, neg_train, neg_test = shuffle_lists([pos_train, pos_test, neg_train, neg_test])


In [60]:
p1=[]
p2=[]
n1=[]

path = "dataset_dna/text_and_seq_replaced_with_other_characters"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    if (not (file_name.endswith(".csv") | file_name.endswith(".txt"))):
        continue
    df = pd.read_csv(file_path)
    p1.append(df['sequence'].values[0])
    p2.append(df['text_with_dna'].values[0])
    n1.append(df['text_without_dna'].values[0])

p1_train, p1_test = split_train_test(p1)
p2_train, p2_test = split_train_test(p2)
n1_train, n1_test = split_train_test(n1)
# add to global lists
pos_train = pos_train + p1_train + p2_train
pos_test = pos_test + p1_test + p2_test
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
pos_train, pos_test, neg_train, neg_test = shuffle_lists([pos_train, pos_test, neg_train, neg_test])


In [61]:
n1 = []
for file in os.listdir('dataset_dna/single_dna'):
    try:
        with open('dataset_dna/single_dna/'+file, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
    except:
        continue
    n1.append(text)
n1 = [s for s in n1 if len(s) <= 1000]
n1 = random.sample(n1, 100)
n1_train, n1_test = split_train_test(n1)
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
neg_train, neg_test = shuffle_lists([neg_train, neg_test])

In [62]:
n1 = [] 

for i in range(3300):
    n1.append(generate_fake_content())

n1_train, n1_test = split_train_test(n1)
neg_train = neg_train + n1_train
neg_test = neg_test + n1_test
# shuffle global list after adding
neg_train, neg_test = shuffle_lists([neg_train, neg_test])

In [20]:
0.15*200 -> divide english dataset by 30(test) and 170(train)
0.15*1000 -> divide seq dataset by 150(test) and 850(train)

150.0

In [22]:
pos=[]
neg=[]

# first 100 in pos and neg dataset
path = "dataset_dna/english_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    df = pd.read_csv(file_path)
    pos.append(df['text_with_dna'].values[0])
    neg.append(df['text_without_dna'].values[0])

# Compute split index - train and test
split_idx = int(len(pos) * 0.85)  # first 85%

# First 85% and remaining 15%
pos_train = pos[:split_idx]
pos_test = pos[split_idx:]
neg_train = neg[:split_idx]
neg_test = neg[split_idx:]

In [29]:
positives_actual_dna=[] # 1000 pos
positives_dna_with_random_text=[] # 1000 pos
path = "dataset_dna/text_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    df = pd.read_csv(file_path)
    positives_actual_dna.append(df['sequence'].values[0])
    positives_dna_with_random_text.append(df['generated'].values[0])

# Compute split index - train and test
split_idx = int(len(positives_actual_dna) * 0.85)  # first 85%

# First 85% and remaining 15%
positives_actual_dna_train = positives_actual_dna[:split_idx]
positives_actual_dna_test = positives_actual_dna[split_idx:]
positives_dna_with_random_text_train = positives_dna_with_random_text[:split_idx]
positives_dna_with_random_text_test = positives_dna_with_random_text[split_idx:]

In [32]:
negatives_fake_dna=[] # 169 negative
path = "dataset_dna/single_dna"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
            char_count = len(text)
        if(char_count<=2000):
            negatives_fake_dna.append(text)
    except:
        continue

# Compute split index - train and test
split_idx = int(len(negatives_fake_dna) * 0.85)  # first 85%

# First 85% and remaining 15%
negatives_fake_dna_train = negatives_fake_dna[:split_idx]
negatives_fake_dna_test = negatives_fake_dna[split_idx:]

In [34]:
# train pos
print(len(pos_train) + len(positives_actual_dna_train) + len(positives_dna_with_random_text_train))
# test pos
print(len(pos_test) + len(positives_actual_dna_test) + len(positives_dna_with_random_text_test))
# train neg
print(len(neg_train) + len(negatives_fake_dna_train) + 1557)
# test neg
print(len(neg_test) + len(negatives_fake_dna_test) + 274)

1870
330
313
56


In [39]:
print(1870 - 313) # random text (neg) train
print(330 - 56) # random text (neg) test
print('total neg samples to be generated:',(1870 - 313)+(330 - 56))

1557
274
total neg samples to be generated: 1831


In [42]:
random_text_neg = [] 

for i in range(1831):
    random_text_neg.append(generate_fake_content())

In [46]:
# Compute split index - train and test
split_idx = int(len(random_text_neg) * 0.85)  # first 85%

# First 85% and remaining 15%
random_text_neg_train = random_text_neg[:split_idx]
random_text_neg_test = random_text_neg[split_idx:]

In [47]:
# train pos
train_pos = pos_train+positives_actual_dna_train+positives_dna_with_random_text_train
test_pos = pos_test+positives_actual_dna_test+positives_dna_with_random_text_test
train_neg = neg_train+negatives_fake_dna_train+random_text_neg_train 
test_neg=neg_test+negatives_fake_dna_test+random_text_neg_test 

In [48]:
len(train_pos), len(test_pos), len(train_neg), len(test_neg)

(1870, 330, 1869, 331)

### Build train and test set from lists

In [63]:
len(pos_train), len(pos_test), len(neg_train), len(neg_test)

(3740, 660, 3740, 660)

In [65]:
train_pos = pos_train
test_pos = pos_test
train_neg = neg_train
test_neg = neg_test

In [66]:
# TRAIN SET

random.shuffle(train_pos)
random.shuffle(train_neg)
# create pos dataframe
df_train_pos = pd.DataFrame(train_pos, columns=["data"])
df_train_pos["label"] = 1

# create neg dataframe
df_train_neg = pd.DataFrame(train_neg, columns=["data"])
df_train_neg["label"] = 0

# combine
df_combined = pd.concat([df_train_pos, df_train_neg], ignore_index=True)

#shuffle
df_shuffled = df_combined.sample(frac=1, random_state=42).reset_index(drop=True)
# df_shuffled.to_pickle('df_train.pkl')

X = df_shuffled["data"].values
y = df_shuffled["label"].values

X_train, X_val, y_train, y_val = train_test_split(X,y,
                     test_size=0.125,
                     random_state=42,
                     stratify=y)

In [71]:
# TEST SET

random.shuffle(test_pos)
random.shuffle(test_neg)
# create pos dataframe
df_test_pos = pd.DataFrame(test_pos, columns=["data"])
df_test_pos["label"] = 1

# create neg dataframe
df_test_neg = pd.DataFrame(test_neg, columns=["data"])
df_test_neg["label"] = 0

# combine
df_combined = pd.concat([df_test_pos, df_test_neg], ignore_index=True)

#shuffle
df_shuffled = df_combined.sample(frac=1, random_state=42).reset_index(drop=True)
# df_shuffled.to_pickle('df_test.pkl')

X_test = df_shuffled["data"].values
y_test = df_shuffled["label"].values

In [None]:
positive: 100(english), 500(actual dna), 600(text+actual dna)
negative: 100(english), 1000(text random), 100(fake dna sequences)

In [104]:
# all positive and negative samples

pos=[]
neg=[]

# first 100 in pos and neg dataset
path = "dataset_dna/english_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    df = pd.read_csv(file_path)
    pos.append(df['text_with_dna'].values[0])
    neg.append(df['text_without_dna'].values[0])

positives_actual_dna=[] # 1000 pos
positives_dna_with_random_text=[] # 1000 pos
path = "dataset_dna/text_and_seq"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    df = pd.read_csv(file_path)
    positives_actual_dna.append(df['sequence'].values[0])
    positives_dna_with_random_text.append(df['generated'].values[0])

negatives_fake_dna=[] # 169 negative
path = "dataset_dna/single_dna"
for file_name in os.listdir(path):
    file_path = path + "/" + file_name
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            text = f.read()
            char_count = len(text)
        if(char_count<=2000):
            negatives_fake_dna.append(text)
    except:
        continue

In [119]:
random_text_neg = [] #1000

for i in range(1000):
    random_text_neg.append(generate_fake_content())

In [131]:
p1 = random.sample(positives_actual_dna, 500) # 1000 pos, take 500 from here
p2 = random.sample(positives_dna_with_random_text, 600) # 1000 pos, take 600 from here

n1 = random.sample(random_text_neg, 1000) # take 1000 from here
n2 = random.sample(negatives_fake_dna, 100) # take 100 from here

pos = pos + p1 + p2
neg = neg + n1 + n2

In [133]:
random.shuffle(pos)
random.shuffle(neg)

In [142]:
# create pos dataframe
df_pos = pd.DataFrame(pos, columns=["data"])
df_pos["label"] = 1

# create neg dataframe
df_neg = pd.DataFrame(neg, columns=["data"])
df_neg["label"] = 0

# combine
df_combined = pd.concat([df_pos, df_neg], ignore_index=True)

#shuffle
df_shuffled = df_combined.sample(frac=1, random_state=42).reset_index(drop=True)

In [156]:
# Suppose df has columns: "data" and "label"
X = df_shuffled["data"].values
y = df_shuffled["label"].values

# First split: Train+Val vs Test (e.g., 20% test)
X_trainval, X_test, y_trainval, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y)

# Second split: Train vs Val (e.g., 10% of original dataset = 12.5% of remaining)
X_train, X_val, y_train, y_val = train_test_split(X_trainval, y_trainval,
                     test_size=0.125,
                     random_state=42,
                     stratify=y_trainval)

print(len(X_train), len(X_val), len(X_test))

1785 255 360


## Training and Predictions - 1. Preprocessing + ML (Logistic Regression)

In [72]:

###############################################
# Preprocessing
###############################################

def preprocess_text(text):
    """
    Very minimal preprocessing:
    - Lowercase
    - Remove whitespace newlines
    We DO NOT filter characters, because DNA may be obfuscated.
    """
    return text.lower().replace("\n", " ").replace("\t", " ")


###############################################
# K-mer Extraction
###############################################

def get_kmers(text, k=3):
    """Extract k-mers without filtering characters."""
    kmers = []
    for i in range(len(text) - k + 1):
        kmers.append(text[i:i+k])
    return kmers


###############################################
# Entropy Calculation
###############################################

def shannon_entropy(text):
    """
    Compute Shannon entropy over all characters.
    """
    if len(text) == 0:
        return 0.0

    from math import log2
    freq = {}
    for c in text:
        freq[c] = freq.get(c, 0) + 1

    entropy = 0
    for c in freq:
        p = freq[c] / len(text)
        entropy -= p * log2(p)

    return entropy


###############################################
# Markov Transition Features
###############################################

def markov_features(text, alphabet):
    """
    Build a Markov transition probability vector from:
    P(next_char | current_char)

    The output is flattened row-major into a single vector.
    """
    if len(text) < 2:
        return np.zeros(len(alphabet) * len(alphabet))

    index = {c: i for i, c in enumerate(alphabet)}

    # Transition counts
    trans = np.zeros((len(alphabet), len(alphabet)))

    for a, b in zip(text[:-1], text[1:]):
        if a in index and b in index:
            trans[index[a], index[b]] += 1

    row_sums = trans.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  
    trans = trans / row_sums

    return trans.flatten()


###############################################
# Single-document Feature Extraction
###############################################

def extract_features_single(
    text,
    k,
    tfidf_vectorizer,
    alphabet,
    fit_tfidf=False
):
    """
    Extract TF-IDF + entropy + Markov vector.
    All vectors are guaranteed fixed length if:
    - TF-IDF was fit on the full training corpus
    - alphabet is global
    """
    pre = preprocess_text(text)
    kmers = get_kmers(pre, k)

    entropy = shannon_entropy(pre)
    markov_vec = markov_features(pre, alphabet)

    kmers_str = " ".join(kmers)

    if fit_tfidf:
        tfidf_vec = tfidf_vectorizer.fit_transform([kmers_str]).toarray()[0]
    else:
        tfidf_vec = tfidf_vectorizer.transform([kmers_str]).toarray()[0]

    return np.concatenate([tfidf_vec, [entropy], markov_vec])


###############################################
# TRAINING PIPELINE
###############################################

def train_pipeline_whole_docs(texts, labels, k=3):
    """
    Train using full documents (no windowing).
    Ensures fixed feature vector sizes for all samples.
    """
    processed_docs = [preprocess_text(t) for t in texts]

    # Build global alphabet across all training docs
    global_alphabet = sorted(list(set("".join(processed_docs))))

    # Build kmers for TF-IDF fitting
    kmers_docs = [" ".join(get_kmers(doc, k)) for doc in processed_docs]

    # Fit a single TF-IDF over ALL training documents
    tfidf_vec = TfidfVectorizer(analyzer="word")
    tfidf_vec.fit(kmers_docs)

    # Extract features
    feature_list = []
    for doc in processed_docs:
        f = extract_features_single(
            doc,
            k,
            tfidf_vec,
            global_alphabet,
            fit_tfidf=False
        )
        feature_list.append(f)

    X = np.vstack(feature_list)
    y = np.array(labels)

    # Standardize
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Train classifier
    clf = LogisticRegression(
        penalty="l2",
        C=1.0,
        solver="liblinear",
        max_iter=1000
    )
    clf.fit(X_scaled, y)

    return clf, scaler, tfidf_vec, global_alphabet


###############################################
# TESTING – WITH WINDOWING IF INPUT TOO BIG
###############################################

def classify_document(
    text,
    clf,
    scaler,
    tfidf_vec,
    alphabet,
    k=3,
    max_window_chars=20000
):
    """
    If document is short -> classify whole.
    If too long -> split into windows.
    If ANY window predicts 1 -> output 1 (DNA found).
    """
    if len(text) <= max_window_chars:
        # No windowing
        feats = extract_features_single(
            text, k, tfidf_vec, alphabet, fit_tfidf=False
        )
        feats = scaler.transform([feats])
        pred = clf.predict(feats)[0]
        return pred

    # WINDOWING
    windows = [
        text[i:i + max_window_chars]
        for i in range(0, len(text), max_window_chars)
    ]

    for w in windows:
        feats = extract_features_single(
            w, k, tfidf_vec, alphabet, fit_tfidf=False
        )
        feats = scaler.transform([feats])
        pred = clf.predict(feats)[0]
        if pred == 1:
            return 1

    return 0  # none of the windows detected DNA




In [94]:
print(len(X_train), len(X_val), len(X_test))

print("Start training...")

clf, scaler, tfidf_vec, alphabet = train_pipeline_whole_docs(X_train, y_train, k=3)

print("Training done.")

preds=[]

for test_doc in list(X_test):
    pred = classify_document(
    test_doc,
    clf,
    scaler,
    tfidf_vec,
    alphabet,
    k=3)
    preds.append(pred)

preds = np.array(preds, dtype=np.int64)
print("Predictions stored.")


6545 935 1320
Start training...
Training done.
Predictions stored.


In [92]:
# ### Test with single string
# text = 	"""
# catggggagctgggtaccgaggtgacaagacgcagggccgagattggtctgccgcgcactagggtacccgactcctattgctaactactctaagtttaggacccacgggaaactccaccaaggtcgctacatcagaggctaccaattatagtctccgatcgtcgaattctcataactcgtcaaaaacagttagctacccaatcagtaccgctagcttctatacaagacacaaattcaagatctgtattcggacaggttccaagtacccttgaagtacggaatctctagcgaacccaatttgtacgccgccaacaagtacttggtgcacagtaaaggcgagcccaattccgctaggccgacaattggccccggcctcacggcgacgtacctctgggcaaatctcatggcttctcgagatgcgtcccgggaattaagtcggtttaattactgacactgctgcttaccataagtaaacactatcacttaaatgaacgttaacg
# """
# pred = classify_document(
#     text,
#     clf,
#     scaler,
#     tfidf_vec,
#     alphabet,
#     k=3)
# print("Prediction:",pred)

Prediction: 0


### Test performance metrics

In [96]:
# With updated dataset (more adversarial cases)

y_pred = preds
acc  = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred)
rec  = recall_score(y_test, y_pred)
f1   = f1_score(y_test, y_pred)

print("Accuracy :", acc)
print("Precision:", prec)
print("Recall   :", rec)
print("F1 Score :", f1)

Accuracy : 0.9651515151515152
Precision: 0.9904153354632588
Recall   : 0.9393939393939394
F1 Score : 0.9642301710730948


In [66]:
# With earlier dataset
y_pred = preds
acc  = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred)
rec  = recall_score(y_test, y_pred)
f1   = f1_score(y_test, y_pred)

print("Accuracy :", acc)
print("Precision:", prec)
print("Recall   :", rec)
print("F1 Score :", f1)

Accuracy : 0.9878971255673222
Precision: 0.9908536585365854
Recall   : 0.9848484848484849
F1 Score : 0.9878419452887538


## Training and Predictions - 2. 1-dimensional CNN

In [180]:

#######################################
# 1️⃣ Preprocessing
#######################################
def preprocess_text(text):
    """
    Keep all characters (since DNA may be obfuscated),
    only remove whitespace.
    """
    return text.lower().replace("\n", "").replace("\t", "").replace(" ", "")

#######################################
# 2️⃣ Shannon entropy
#######################################
def shannon_entropy(text):
    if len(text) == 0:
        return 0.0
    from math import log2
    freq = {}
    for c in text:
        freq[c] = freq.get(c, 0) + 1
    entropy = 0
    for c in freq:
        p = freq[c] / len(text)
        entropy -= p * log2(p)
    return entropy

#######################################
# 3️⃣ Character Encoder (no LabelEncoder)
#######################################
class CharEncoder:
    """
    Maps characters to integer IDs.
    0 is reserved for padding.
    Unknown characters map to 0 at test time.
    """
    def __init__(self):
        self.char2id = {}
        self.id2char = {}
        self.fitted = False
        self.vocab_size = 0

    def fit(self, texts):
        chars = sorted(list(set("".join(texts))))
        # index 0 = padding
        self.char2id = {c:i+1 for i,c in enumerate(chars)}
        self.id2char = {i+1:c for i,c in enumerate(chars)}
        self.vocab_size = len(self.char2id) + 1  # +1 for padding
        self.fitted = True

    def transform(self, text):
        if not self.fitted:
            raise ValueError("CharEncoder not fitted.")
        # unknown characters mapped to 0
        return np.array([self.char2id.get(c, 0) for c in text], dtype=np.int64)

    def fit_transform(self, texts):
        self.fit(texts)
        return [self.transform(t) for t in texts]

#######################################
# 4️⃣ CNN model
#######################################
class DNASequenceCNN(nn.Module):
    def __init__(self, vocab_size, embed_dim=32, num_classes=2,
                 kernel_sizes=[3,5,7], num_filters=64):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=embed_dim,
                      out_channels=num_filters,
                      kernel_size=k)
            for k in kernel_sizes
        ])

        self.fc_entropy = nn.Linear(1, 16)
        self.fc = nn.Linear(len(kernel_sizes)*num_filters + 16, num_classes)

    def forward(self, x, entropy):
        """
        x: [batch, seq_len] integer-encoded text
        entropy: [batch, 1] global numeric feature
        """
        emb = self.embedding(x)           # [B, L, E]
        emb = emb.transpose(1,2)          # [B, E, L]

        conv_outs = [F.relu(conv(emb)) for conv in self.convs]
        pooled = [F.adaptive_max_pool1d(c,1).squeeze(-1) for c in conv_outs]

        cnn_feat = torch.cat(pooled, dim=1)
        entropy_feat = F.relu(self.fc_entropy(entropy))
        feat = torch.cat([cnn_feat, entropy_feat], dim=1)

        return self.fc(feat)



In [None]:
# Documents (DNA-like + normal)
texts = X_train
labels = y_train

# Preprocess
texts_proc = [preprocess_text(t) for t in texts]

# Encode characters
char_encoder = CharEncoder()
sequences = char_encoder.fit_transform(texts_proc)

# Pad sequences manually
max_len = max(len(seq) for seq in sequences)
sequences_pad = [
    np.pad(seq, (0, max_len - len(seq)), constant_values=0)
    for seq in sequences
]

X = torch.tensor(sequences_pad, dtype=torch.long).to(torch.device("cpu"))
y = torch.tensor(labels, dtype=torch.long).to(torch.device("cpu"))

# Compute entropy feature
entropies = torch.tensor(
    [[shannon_entropy(t)] for t in texts_proc],
    dtype=torch.float
)

# Model
model = DNASequenceCNN(
    vocab_size=char_encoder.vocab_size,
    embed_dim=32,
    num_classes=2
)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(50):
    optimizer.zero_grad()
    logits = model(X, entropies)
    loss = criterion(logits, y)
    loss.backward()
    optimizer.step()

print("Training complete.")

#################################
# Prediction on new document
#################################

preds_cnn = []
probs_cnn = []
for test_doc in list(X_test):
    with torch.no_grad():
        test_doc = preprocess_text(
            "YYZZXWWXYZXYZ some normal text"
        )
    
        # transform + pad safely
        test_seq = char_encoder.transform(test_doc)
        test_seq = np.pad(
            test_seq,
            (0, max(0, max_len - len(test_seq))),
            constant_values=0
        )
        test_seq = torch.tensor([test_seq], dtype=torch.long)
    
        test_entropy = torch.tensor(
            [[shannon_entropy(test_doc)]],
            dtype=torch.float
        )

        pred = model(test_seq, test_entropy)
        probs = F.softmax(pred, dim=1)
    preds_cnn.append(torch.argmax(pred, dim=1).item())
    probs_cnn.append(probs)
