In [1]:
import os
import json
import re
from typing import List, Dict

import numpy as np
import pandas as pd
from lxml import etree

from sklearn.model_selection import (
    train_test_split,
    StratifiedKFold,
    cross_val_score,
    GridSearchCV,
)
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
)


try:
    import fasttext
    import fasttext.util
    HAS_FASTTEXT = True
except ImportError:
    HAS_FASTTEXT = False
    print("⚠ fasttext not installed. FastText baseline will be skipped.")


In [2]:
DATA_PATH = r"C:\Users\ahmed\Downloads\aspect_extraction_and_classification\final_dataset.jsonl"
RANDOM_STATE = 42
TEST_SIZE = 0.2
WINDOW_SIZE = 5
DROP_CONFLICT = True
N_FOLDS = 5

In [3]:
def load_jsonl_aspects(path: str) -> List[Dict]:
    """
    Each line in the JSONL file is a dict:
    {
      "id": int,
      "sentence": str,
      "aspect_terms": [
          {"term": "...", "polarity": "...", "from": int, "to": int}
      ]
    }

    Returns a list of dicts in the same structure as load_semeval_xml().
    """
    data = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            obj = json.loads(line)

            sentence = obj["sentence"]
            aspects = []
            for asp in obj.get("aspect_terms", []):
                try:
                    aspects.append({
                        "term": asp["term"],
                        "polarity": asp["polarity"],
                        "from": int(asp["from"]),
                        "to": int(asp["to"]),
                    })
                except (KeyError, ValueError, TypeError):
                    continue

            data.append({
                "id": obj.get("id"),
                "text": sentence,
                "aspects": aspects,
            })

    return data


In [4]:
def load_data(path: str) -> List[Dict]:
    """
    Wrapper that chooses loader based on file extension.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext in [".jsonl", ".json"]:
        print(f"Loading JSONL data from: {path}")
        return load_jsonl_aspects(path)
    else:
        raise ValueError(f"Unsupported file extension: {ext}")


In [5]:
def clean_text(text: str) -> str:
    """
    Simple text cleaning:
    - strip
    - lowercasing
    - remove HTML-like tags
    - normalize whitespace
    """
    text = text.strip()
    text = text.lower()
    text = re.sub(r"<[^>]+>", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text


In [None]:
def char_to_token_window(text: str, start_char: int, end_char: int, window_size: int = 5) -> str:
    """
    Convert char offsets into a token-level window of ±window_size words
    around the aspect, and insert <ASP> ... </ASP> markers.
I love <asp> battery </asp>
    Uses the ORIGINAL text (not cleaned) to align with char offsets.
    """
    tokens = []
    starts = []

    for m in re.finditer(r"\S+", text):
        tokens.append(m.group())
        starts.append(m.start())

    aspect_start_tok = None
    aspect_end_tok = None

    for i, (tok, s) in enumerate(zip(tokens, starts)):
        e = s + len(tok)
        # aspect start char falls into this token
        if s <= start_char < e and aspect_start_tok is None:
            aspect_start_tok = i
        # aspect end char falls into this (or previous) token
        if s < end_char <= e:
            aspect_end_tok = i
            break

    if aspect_start_tok is None:
        # fallback: full sentence if mapping fails
        return text

    if aspect_end_tok is None:
        aspect_end_tok = aspect_start_tok

    left = max(0, aspect_start_tok - window_size)
    right = min(len(tokens), aspect_end_tok + 1 + window_size)

    window_tokens = tokens[left:right]

    rel_start = aspect_start_tok - left
    rel_end = aspect_end_tok - left

    # Insert tags around aspect span
    window_tokens.insert(rel_start, "<ASP>")
    window_tokens.insert(rel_end + 2, "</ASP>")  # +2 for the inserted <ASP>

    return " ".join(window_tokens)


In [7]:
def build_apc_dataset_with_windows(parsed_data: List[Dict],
                                   window_size: int = 5) -> pd.DataFrame:
    """
    Build a DataFrame with columns:
      - sentence: cleaned full sentence
      - sentence_raw: original sentence
      - aspect: cleaned aspect term
      - polarity: label (string)
      - window: aspect-centered window with <ASP> tags (cleaned)
      - input_full: aspect + [SEP] + full sentence (optional)
    """
    rows = []

    for item in parsed_data:
        raw_text = item["text"]
        if not raw_text:
            continue

        for asp in item["aspects"]:
            term = asp["term"]
            pol = asp["polarity"]
            start = asp["from"]
            end = asp["to"]

            if term is None or pol is None:
                continue

            # Raw and cleaned forms
            text_clean = clean_text(raw_text)
            aspect_clean = clean_text(term)

            # Window using original text for char offsets
            window_raw = char_to_token_window(raw_text, start, end, window_size=window_size)
            window_clean = clean_text(window_raw)

            input_full = f"{aspect_clean} [SEP] {text_clean}"

            rows.append({
                "sentence": text_clean,
                "sentence_raw": raw_text,
                "aspect": aspect_clean,
                "polarity": pol,
                "window": window_clean,
                "input_full": input_full,
            })

    df = pd.DataFrame(rows)
    return df

In [8]:
def build_vectorizer() -> FeatureUnion:
    """
    Build a FeatureUnion of:
      - word-level TF-IDF (1–2 grams)
      - char-level TF-IDF (3–5 grams)

    This will be used inside sklearn Pipelines so that
    vectorization happens INSIDE cross-validation folds.
    """
    word_tfidf = TfidfVectorizer(
        ngram_range=(1, 2),
        max_features=10000,
        sublinear_tf=True,
        analyzer="word"
    )

    char_tfidf = TfidfVectorizer(
        ngram_range=(3, 5),
        max_features=20000,
        sublinear_tf=True,
        analyzer="char"
    )

    vectorizer = FeatureUnion([
        ("word", word_tfidf),
        ("char", char_tfidf),
    ])

    return vectorizer


In [9]:
def get_candidate_pipelines(random_state: int = 42) -> Dict[str, Pipeline]:
    """
    Return a dict of candidate pipelines (vectorizer + classifier)
    for model selection using cross-validation.
    """
    vectorizer = build_vectorizer()

    pipelines = {
        "logreg": Pipeline([
            ("vectorizer", vectorizer),
            ("clf", LogisticRegression(
                max_iter=1000,
                C=1.0,
                class_weight=None,
                random_state=random_state
            ))
        ]),
        "svm": Pipeline([
            ("vectorizer", build_vectorizer()),
            ("clf", LinearSVC(
                C=1.0,
                random_state=random_state
            ))
        ]),
        "knn": Pipeline([
            ("vectorizer", build_vectorizer()),
            ("clf", KNeighborsClassifier(
                n_neighbors=5
            ))
        ]),
        "dt": Pipeline([
            ("vectorizer", build_vectorizer()),
            ("clf", DecisionTreeClassifier(
                max_depth=15,
                random_state=random_state
            ))
        ]),
    }

    return pipelines

In [10]:
def get_param_grid_for_model(model_name: str) -> Dict:
    """
    Define hyperparameter grid for the chosen best model.
    Used in GridSearchCV.
    """
    if model_name == "logreg":
        return {
            "clf__C": [0.01, 0.1, 1.0, 10.0],
            "clf__penalty": ["l2"],
            "clf__solver": ["liblinear", "lbfgs"],
        }
    elif model_name == "svm":
        return {
            "clf__C": [0.01, 0.1, 1.0, 10.0],
        }
    elif model_name == "knn":
        return {
            "clf__n_neighbors": [3, 5, 7, 9],
            "clf__weights": ["uniform", "distance"],
        }
    elif model_name == "dt":
        return {
            "clf__max_depth": [5, 10, 15, 20, None],
            "clf__min_samples_split": [2, 5, 10],
        }
    else:
        raise ValueError(f"No param grid defined for model: {model_name}")


In [12]:
def evaluate_models_cv(X_train, y_train, pipelines: Dict[str, Pipeline],
                       n_folds: int = 5, random_state: int = 42) -> pd.DataFrame:
    """
    Compare candidate models using Stratified K-Fold cross-validation.
    Returns a DataFrame with mean accuracy and std for each model.
    """
    cv = StratifiedKFold(
        n_splits=n_folds,
        shuffle=True,
        random_state=random_state
    )

    results = []
    for name, pipe in pipelines.items():
        print(f"\n=== CV Evaluation: {name} ===")
        scores = cross_val_score(
            pipe,
            X_train,
            y_train,
            cv=cv,
            scoring="accuracy",
            n_jobs=-1
        )
        print(f"Fold accuracies: {scores}")
        print(f"Mean: {scores.mean():.4f} | Std: {scores.std():.4f}")

        results.append({
            "model": name,
            "mean_accuracy": scores.mean(),
            "std_accuracy": scores.std(),
        })

    df_results = pd.DataFrame(results).sort_values("mean_accuracy", ascending=False)
    print("\n=== MODEL SELECTION (CV Results) ===")
    print(df_results)
    return df_results


def run_grid_search_on_best(model_name: str,
                            base_pipeline: Pipeline,
                            X_train,
                            y_train,
                            n_folds: int = 5,
                            random_state: int = 42) -> GridSearchCV:
    """
    Run GridSearchCV on the chosen best model pipeline to fine-tune hyperparameters.
    """
    param_grid = get_param_grid_for_model(model_name)
    cv = StratifiedKFold(
        n_splits=n_folds,
        shuffle=True,
        random_state=random_state
    )

    print(f"\n=== GRID SEARCH on best model: {model_name} ===")
    grid = GridSearchCV(
        estimator=base_pipeline,
        param_grid=param_grid,
        scoring="accuracy",
        cv=cv,
        n_jobs=-1,
        verbose=2
    )
    grid.fit(X_train, y_train)

    print(f"\nBest CV accuracy: {grid.best_score_:.4f}")
    print(f"Best params: {grid.best_params_}")
    return grid



In [13]:
def evaluate_on_test(best_model,
                     X_test,
                     y_test):
    """
    Final evaluation on the held-out test set.
    """
    print("\n=== FINAL TEST EVALUATION ===")
    y_pred = best_model.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy: {acc:.4f}\n")

    print("Classification Report:")
    print(classification_report(y_test, y_pred, digits=4))

    print("Confusion Matrix:")
    print(confusion_matrix(y_test, y_pred))

    return acc


# ==========================
# OPTIONAL: FastText BASELINE
# ==========================

def load_fasttext_model(bin_path: str = "cc.en.300.bin"):
    if not os.path.exists(bin_path):
        print("Downloading FastText English model (cc.en.300.bin)...")
        fasttext.util.download_model("en", if_exists="ignore")  # creates cc.en.300.bin
    model = fasttext.load_model(bin_path)
    return model


def build_fasttext_matrix(texts, ft_model):
    vectors = []
    for t in texts:
        v = ft_model.get_sentence_vector(t)
        vectors.append(v)
    return np.vstack(vectors)


def fasttext_svm_baseline(X_train_texts, X_test_texts, y_train, y_test,
                          random_state: int = 42):
    """
    Simple baseline: FastText embeddings + LinearSVC.
    This is outside the main TF-IDF + grid search pipeline.
    """
    if not HAS_FASTTEXT:
        print("⚠ FastText not available. Skipping FastText baseline.")
        return None

    print("\n=== FastText + SVM Baseline ===")
    ft_model = load_fasttext_model()
    X_train_ft = build_fasttext_matrix(X_train_texts, ft_model)
    X_test_ft = build_fasttext_matrix(X_test_texts, ft_model)

    clf = LinearSVC(C=1.0, random_state=random_state)
    clf.fit(X_train_ft, y_train)

    y_pred = clf.predict(X_test_ft)
    acc = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy (FastText + SVM): {acc:.4f}")
    print("Classification report (FastText + SVM):")
    print(classification_report(y_test, y_pred, digits=4))
    return acc


In [16]:

# 1. Load raw data
parsed = load_data(DATA_PATH)

# 2. Build aspect-based dataset with windows
df = build_apc_dataset_with_windows(parsed, window_size=WINDOW_SIZE)

if DROP_CONFLICT:
    df = df[df["polarity"] != "conflict"].reset_index(drop=True)

print(f"Total aspect instances after filtering: {len(df)}")
print(df.head())

# 3. Prepare inputs and labels
X_texts = df["window"].values           # main input
X_raw_for_ft = df["sentence_raw"].values
y = df["polarity"].values

# 4. Train-test split (held-out test for final evaluation)
X_train, X_test, y_train, y_test = train_test_split(
    X_texts,
    y,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
    stratify=y
)

X_train_raw, X_test_raw, _, _ = train_test_split(
    X_raw_for_ft,
    y,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
    stratify=y
)

# 5. Model selection via Stratified K-Fold CV
candidate_pipelines = get_candidate_pipelines(random_state=RANDOM_STATE)
cv_results = evaluate_models_cv(
    X_train,
    y_train,
    candidate_pipelines,
    n_folds=N_FOLDS,
    random_state=RANDOM_STATE
)

best_model_name = cv_results.iloc[0]["model"]
print(f"\n>>> Selected best base model: {best_model_name}")

best_base_pipeline = candidate_pipelines[best_model_name]

# 6. Grid search on the selected best model
grid = run_grid_search_on_best(
    best_model_name,
    best_base_pipeline,
    X_train,
    y_train,
    n_folds=N_FOLDS,
    random_state=RANDOM_STATE
)

best_model = grid.best_estimator_

# 7. Final evaluation on held-out test set
test_acc = evaluate_on_test(
    best_model,
    X_test,
    y_test
)

print(f"\nFinal Test Accuracy of tuned {best_model_name}: {test_acc:.4f}")

# 8. (Optional) FastText + SVM baseline
fasttext_svm_baseline(
    X_train_raw,
    X_test_raw,
    y_train,
    y_test,
    random_state=RANDOM_STATE
)

Loading JSONL data from: C:\Users\ahmed\Downloads\aspect_extraction_and_classification\final_dataset.jsonl
Total aspect instances after filtering: 20678
                                            sentence  \
0  this sound track was beautiful! it paints the ...   
1  this sound track was beautiful! it paints the ...   
2  this sound track was beautiful! it paints the ...   
3  this sound track was beautiful! it paints the ...   
4  this sound track was beautiful! it paints the ...   

                                        sentence_raw       aspect  polarity  \
0  This sound track was beautiful! It paints the ...  sound track  positive   
1  This sound track was beautiful! It paints the ...        music  positive   
2  This sound track was beautiful! It paints the ...      guitars  positive   
3  This sound track was beautiful! It paints the ...   orchestras  positive   
4  This sound track was beautiful! It paints the ...  keyboarding  negative   

                                   




Best CV accuracy: 0.7000
Best params: {'clf__C': 1.0, 'clf__penalty': 'l2', 'clf__solver': 'liblinear'}

=== FINAL TEST EVALUATION ===
Test Accuracy: 0.7108

Classification Report:
              precision    recall  f1-score   support

    negative     0.6944    0.8034    0.7449      1841
     neutral     0.6027    0.0913    0.1586       482
    positive     0.7331    0.7816    0.7565      1813

    accuracy                         0.7108      4136
   macro avg     0.6767    0.5587    0.5533      4136
weighted avg     0.7006    0.7108    0.6817      4136

Confusion Matrix:
[[1479   16  346]
 [ 268   44  170]
 [ 383   13 1417]]

Final Test Accuracy of tuned logreg: 0.7108

=== FastText + SVM Baseline ===
Test Accuracy (FastText + SVM): 0.7065
Classification report (FastText + SVM):
              precision    recall  f1-score   support

    negative     0.6850    0.8267    0.7492      1841
     neutral     0.7500    0.0373    0.0711       482
    positive     0.7312    0.7623    0.7464 

0.7064796905222437