
# Part 1: Initial Setup

**Sequence of Events**  
1. Import all necessary libraries for Spark, data processing, feature engineering, modeling, and evaluation.  
2. Initialize a Spark session running locally on all cores, named “ImprovedClassification.”

## Explanation
- Imports cover Spark SQL and ML, UDFs, NumPy, scikit-learn’s dataset loader, and Pandas.
- The `SparkSession` builder starts Spark in local mode: `local[*]` runs one worker thread per logical CPU core on this machine, so no separate cluster is needed.

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, split, col, size
from pyspark.ml.feature import StopWordsRemover, Word2Vec
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
import numpy as np
from sklearn.datasets import fetch_20newsgroups
import pandas as pd

# Initialize Spark
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("ImprovedClassification") \
    .getOrCreate()

# Function Definitions for Preprocessing, Embedding, and Classification

This cell defines **all** of the core functions that power your end-to-end text-classification pipeline. By the end of this cell you will have:

1. **Data loading & filtering**  
2. **Text cleaning & tokenization**  
3. **Word2Vec embedding training**  
4. **Document vectorization (TF-weighted average of embeddings)**  
5. **Model training** (Logistic Regression & Random Forest)  
6. **Evaluation utilities**  
7. **Single-string prediction helper**  
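
As a rough illustration of step 2, the sketch below mirrors the cleaning and tokenization in plain Python with the `re` module instead of Spark. The four regular expressions are copied from `advanced_text_preprocessing`; the function name `clean_and_tokenize` and the tiny `STOP_WORDS` set are assumptions made for this example (Spark's `StopWordsRemover` uses a much longer default list).

```python
import re

# Illustrative stop-word subset (assumption); Spark's default list is much longer.
STOP_WORDS = {"the", "and", "a", "is", "at", "or", "of", "to"}

def clean_and_tokenize(text):
    """Apply the same four regex substitutions as the Spark version, then tokenize."""
    text = re.sub(r'http\S+|www\S+', '', text)   # remove URLs
    text = re.sub(r'\S+@\S+', '', text)          # remove email addresses
    text = re.sub(r'\d+', '', text)              # remove digits
    text = re.sub(r'[^\w\s]', ' ', text)         # replace punctuation with spaces
    tokens = text.lower().split()                # split on whitespace, dropping empty strings
    return [t for t in tokens if t not in STOP_WORDS]

sample = "Visit http://nasa.gov or mail bob@example.com: the Shuttle launched at 09:30!"
print(clean_and_tokenize(sample))
```

Note that `\S+@\S+` also swallows the colon attached to the email address, because `\S` matches any non-whitespace character.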

---


In [12]:
# ─────────────────────────────────────────────────────────────
# Cell 2: Function definitions for preprocessing, embedding, classification
# ─────────────────────────────────────────────────────────────

# Fetches five distinct 20 Newsgroups categories.
# Removes headers, footers, quotes for cleaner text.
# Filters out very short documents (< 10 words).
# Wraps the result in a Spark DataFrame.

def load_and_preprocess_data():
    """Load the 20newsgroups dataset, filter short docs, convert to Spark."""
    categories = [
        'alt.atheism','comp.graphics',
        'rec.motorcycles','sci.space','talk.politics.guns'
    ]
    newsgroups = fetch_20newsgroups(
        subset='all', categories=categories,
        remove=('headers','footers','quotes'),
        shuffle=True, random_state=42  # fixes the shuffle order of the returned documents, so every run loads them in the same order
    )
    pdf = pd.DataFrame({'text': newsgroups.data, 'label': newsgroups.target})
    pdf = pdf[pdf['text'].str.split().str.len() >= 10] # Filtering <10 words avoids near-empty docs that can skew your Word2Vec and model training.
    sdf = spark.createDataFrame(pdf)
    print(f"Loaded {sdf.count()} docs; classes = {newsgroups.target_names}")
    return sdf, newsgroups.target_names

# Regex clean: strip URLs, email addresses, numbers, punctuation.
# Lowercase & split: turn each cleaned string into a list of tokens.
# Stop-word removal: drop common filler words (“and”, “the”, etc.).
# Document pruning: remove docs with fewer than 5 tokens after cleaning.
# The nested regexp_replace calls form a single column expression, so Spark applies all four substitutions within one select.

def advanced_text_preprocessing(df):
    """Clean text (URLs, emails, numbers, punctuation), tokenize, remove stop words."""
    df_clean = df.select(
        'label',
        regexp_replace(
          regexp_replace(
            regexp_replace(
              regexp_replace('text', r'http\S+|www\S+', ''),  # remove URLs
            r'\S+@\S+', ''),                                  # remove emails
          r'\d+', ''),                                       # remove numbers
        r'[^\w\s]', ' ').alias('cleaned_text')               # remove punctuation
    )
    df_tokens = df_clean.select(
        'label',
        split(lower(col('cleaned_text')), r'\s+').alias('tokens')
    )
    remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_no_stop")
    df_no_stop = remover.transform(df_tokens)
    df_final = df_no_stop.filter(size(col('tokens_no_stop')) >= 5)
    print(f"After preprocessing: {df_final.count()} docs")
    return df_final.select(col('label'), col('tokens_no_stop').alias('tokens'))

# Trains a Word2Vec model on your cleaned tokens with tuned hyperparameters.
# A larger vectorSize captures richer semantic information but increases compute cost.
# minCount=3 helps omit rare misspellings or noise.
# numPartitions controls training parallelism; Spark's documentation recommends a small number for accuracy, so more partitions trade embedding quality for speed.

def train_improved_word2vec(df_processed):
    """Train Word2Vec on tokens with tuned hyperparameters."""
    w2v = Word2Vec(
        vectorSize=200, minCount=3,
        numPartitions=4, stepSize=0.05,
        maxIter=5, windowSize=7,
        inputCol="tokens", outputCol="word_vectors"
    )
    print("Training Word2Vec...")
    model = w2v.fit(df_processed)
    print(f"  Vocabulary size = {model.getVectors().count()}")
    return model

# Broadcasts the trained word embeddings to all executors, which avoids re-shipping the full embedding table with each task.
# Defines a UDF, tfidf_avg, that computes a term-frequency-weighted average of each document’s word vectors (despite the name, no IDF term is involved).
# Weights are normalized term frequencies (count/total_tokens, renormalized over in-vocabulary words) rather than raw counts, so longer docs don’t dominate purely by length.
# Empty documents, and documents with no in-vocabulary words, get a zero vector so the UDF never fails.
# Note: the isNotNull() filter drops only null features; zero vectors are kept and passed on to the classifier.
# The zero-vector length (200) is hard-coded and must match vectorSize in train_improved_word2vec.

def create_document_vectors_improved(df_processed, w2v_model):
    """TF-normalized average of each doc’s word vectors."""
    vocab = {r['word']: r['vector'] for r in w2v_model.getVectors().collect()}
    bc = spark.sparkContext.broadcast(vocab)
    def tfidf_avg(tokens):
        if not tokens:
            return Vectors.dense([0.0]*200)
        freq = {}
        for t in tokens:
            freq[t] = freq.get(t,0) + 1
        total = len(tokens)
        vecs, wts = [], []
        for w, cnt in freq.items():
            if w in bc.value:
                vecs.append(bc.value[w])
                wts.append(cnt/total)
        if not vecs:
            return Vectors.dense([0.0]*200)
        arr, wts = np.array(vecs), np.array(wts)
        wts = wts / wts.sum()
        avg = np.average(arr, axis=0, weights=wts)
        return Vectors.dense(avg)
    vectorize_udf = udf(tfidf_avg, VectorUDT())
    df_feats = df_processed.withColumn("features", vectorize_udf(col("tokens")))
    df_feats = df_feats.filter(col("features").isNotNull())
    print(f"Created vectors for {df_feats.count()} docs")
    return df_feats.select('label', 'features')

# Splits the data 80/20 into train and test sets using randomSplit([0.8, 0.2], seed=42).
# Trains two models:
#   - Logistic Regression with elastic-net regularization: regParam=0.01 sets the overall strength, and elasticNetParam=0.1 mixes mostly L2 with a small L1 share.
#   - Random Forest with 50 trees, max depth 10 (seeded for reproducibility).
# The fixed seed=42 makes the train/test split repeatable, provided the input DataFrame is computed the same way on each run.
# standardization=True (the default) has LR standardize the features before fitting, so the penalty treats all embedding dimensions on a comparable scale.
def train_improved_classifier(df_features):
    """Train Logistic Regression & Random Forest."""
    train_df, test_df = df_features.randomSplit([0.8,0.2], seed=42)
    print(f"Train/test sizes = {train_df.count()}/{test_df.count()}")

    lr = LogisticRegression(
        featuresCol='features', labelCol='label',
        maxIter=100, regParam=0.01,
        elasticNetParam=0.1, standardization=True
    )
    print("Fitting Logistic Regression...")
    lr_model = lr.fit(train_df)

    rf = RandomForestClassifier(
        featuresCol='features', labelCol='label',
        numTrees=50, maxDepth=10, seed=42
    )
    print("Fitting Random Forest...")
    rf_model = rf.fit(train_df)

    return lr_model, rf_model, train_df, test_df

# For each model, computes:
#   - Accuracy
#   - F1 score (support-weighted)
#   - Weighted precision
#   - Weighted recall
# Displays a few sample predictions for manual inspection.

# One evaluator class, MulticlassClassificationEvaluator, covers all four metrics; only metricName changes.
# Weighted metrics average the per-class scores weighted by support, so each class counts in proportion to its number of test documents.
def evaluate_models(models, test_df, target_names):
    """Print accuracy, F1, precision, recall, plus sample predictions."""
    for name, mdl in models.items():
        print(f"\n=== {name} ===")
        preds = mdl.transform(test_df)
        for m in ['accuracy','f1','weightedPrecision','weightedRecall']:
            ev = MulticlassClassificationEvaluator(
                labelCol='label', predictionCol='prediction', metricName=m
            )
            print(f"{m}: {ev.evaluate(preds):.4f}")
        preds.select('label','prediction').show(5)

#	Wraps a single raw string in a one-row DataFrame.
# Applies the same preprocessing → vectorization pipeline.
#	Runs your chosen classifier and returns the human-readable category name.
def predict_new_text(
    raw_text: str,
    w2v_model,
    classifier_model,
    preprocess_fn,
    vectorize_fn,
    target_names: list
) -> str:
    """Preprocess → vectorize → predict a single string."""
    from pyspark.sql import Row
    df = spark.createDataFrame([Row(text=raw_text, label=0)])
    toks = preprocess_fn(df)
    feats = vectorize_fn(toks, w2v_model)
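    row = classifier_model.transform(feats).select("prediction").first()
    if row is None:
        # preprocess_fn dropped the input (e.g. fewer than 5 tokens survived cleaning)
        return "unknown (too little text to classify)"
    return target_names[int(row[0])]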

# The dummy label=0 is never used; it is there only because preprocess_fn selects a 'label' column.
# By reusing the existing functions, this helper guarantees consistent text handling between training and inference.
# advanced_text_preprocessing drops inputs with fewer than 5 tokens after cleaning, and fully out-of-vocabulary inputs become zero vectors, so consider a fallback or probability-threshold check for such cases.
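
The probability-threshold check suggested in the last comment could look roughly like the sketch below. It works on a plain Python list of class probabilities; the function name `label_or_fallback`, the `0.5` threshold, and the `"uncertain"` label are all assumptions chosen for illustration, not part of the notebook.

```python
def label_or_fallback(probabilities, target_names, threshold=0.5, fallback="uncertain"):
    """Return the top class name, or `fallback` when the model is not confident enough."""
    if not probabilities:                      # e.g. the input was dropped by preprocessing
        return fallback
    # Index of the largest probability
    best = max(range(len(probabilities)), key=probabilities.__getitem__)
    if probabilities[best] < threshold:
        return fallback
    return target_names[best]

names = ['alt.atheism', 'comp.graphics', 'rec.motorcycles', 'sci.space', 'talk.politics.guns']
print(label_or_fallback([0.05, 0.05, 0.10, 0.75, 0.05], names))   # confident prediction
print(label_or_fallback([0.22, 0.20, 0.18, 0.21, 0.19], names))   # near-uniform, falls back
```

With the Spark models in this notebook, the probabilities would presumably come from the `probability` column that `classifier_model.transform(...)` adds, converted with `.toArray().tolist()`.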


**This cell bundles all of your data-prep, embedding, modeling, and inference steps into reusable functions. Structuring the code this way gives you:**

- Modularity: Easily tweak one component (e.g. try a different embedding or classifier) without rewriting the entire notebook.
- Reproducibility: The fixed seed (42) for the dataset shuffle, the train/test split, and the Random Forest, together with consistent cleaning and tokenization, helps make experiments repeatable.
- Scalability: Cleaning, embedding, and training all run on Spark DataFrames, so they can handle much larger corpora; the pandas-based loader and the `collect()` of the vocabulary to the driver are the parts that would need rework first.
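
To make the document-vectorization component concrete, here is a minimal plain-Python sketch of the same weighting that `tfidf_avg` applies, without Spark or NumPy. The function name `tf_weighted_average` and the two-dimensional toy embeddings are assumptions for illustration only.

```python
from collections import Counter

def tf_weighted_average(tokens, embeddings, dim):
    """Average the vectors of in-vocabulary tokens, weighted by term frequency."""
    counts = Counter(t for t in tokens if t in embeddings)
    total = sum(counts.values())
    if total == 0:                       # empty doc or no known words
        return [0.0] * dim
    doc_vec = [0.0] * dim
    for word, cnt in counts.items():
        weight = cnt / total             # weights sum to 1 over in-vocabulary words
        for i, x in enumerate(embeddings[word]):
            doc_vec[i] += weight * x
    return doc_vec

toy = {"rocket": [1.0, 0.0], "orbit": [0.0, 1.0]}   # hypothetical 2-d embeddings
print(tf_weighted_average(["rocket", "rocket", "orbit", "zzz"], toy, dim=2))
```

Here "rocket" gets weight 2/3 and "orbit" 1/3, while the unknown token "zzz" is ignored. Because the weights are renormalized over known words, the result equals a plain mean over every in-vocabulary token occurrence.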

# Cell 3: Main Pipeline Runner & Expose Models for Interactive Use

This cell ties together all of your previously defined functions into a single execution flow—and ensures the trained models and label list live in your notebook namespace for later use (e.g. interactive widgets).

---

## What we’re accomplishing

1. **Orchestrating the full pipeline**  
   - **Load & preprocess** the data  
   - **Train** the Word2Vec embeddings  
   - **Create** document feature vectors  
   - **Train** two classifiers (Logistic Regression & Random Forest)  
   - **Evaluate** their performance on the test set  

2. **Exposing key objects**  
   - Returns (`lr_model`, `rf_model`, `w2v_model`, `target_names`) so you can call `predict_new_text` or build interactive UIs without retraining.

---

## Extra Notes
- Reproducibility: Using seed=42 and the same preprocessing functions keeps runs comparable, though exact metrics can still shift slightly because Word2Vec is trained without an explicit seed.
- Performance considerations: Running main() retrains Word2Vec and both classifiers, so avoid re-running this cell unless you need to retrain.
- Namespace availability: By binding word2vec_model, lr_model, and target_names at the global level, any later cell (like your widget code) can use them without causing NameError.
- Separation of concerns: Keeping the orchestration in main() makes it easy to reuse individual components in isolation (e.g., for batch inference or hyperparameter tuning).
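
One way to act on the performance note is to guard the training call so that re-running the cell reuses the earlier result. The sketch below is only an illustration: `get_models` and `_cache` are hypothetical names, and `fake_main` stands in for the real `main()` so the example runs without Spark.

```python
_cache = {}

def get_models(train_fn):
    """Call train_fn once and reuse its result on later calls."""
    if "models" not in _cache:
        _cache["models"] = train_fn()
    return _cache["models"]

calls = []

def fake_main():
    # Stand-in for main(); records each call so retraining can be counted.
    calls.append(1)
    return ("lr_model", "rf_model", "w2v_model", ["class_a", "class_b"])

first = get_models(fake_main)
second = get_models(fake_main)
print(len(calls), first is second)
```

In the notebook this would be used as `lr_model, rf_model, word2vec_model, target_names = get_models(main)`; clearing `_cache` forces a retrain.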

In [13]:
# ─────────────────────────────────────────────────────────────
# Cell 3: Main pipeline runner & expose models for interactive use
# ─────────────────────────────────────────────────────────────

def main():
    print("=== IMPROVED TEXT CLASSIFICATION MODEL ===\n")
    sdf, target_names = load_and_preprocess_data()
    proc = advanced_text_preprocessing(sdf)
    w2v_model = train_improved_word2vec(proc)
    feats = create_document_vectors_improved(proc, w2v_model)
    lr_model, rf_model, _, test_df = train_improved_classifier(feats)
    evaluate_models({'LR': lr_model, 'RF': rf_model}, test_df, target_names)
    return lr_model, rf_model, w2v_model, target_names

# Run everything once to populate globals:
lr_model, rf_model, word2vec_model, target_names = main()

=== IMPROVED TEXT CLASSIFICATION MODEL ===

Loaded 4405 docs; classes = ['alt.atheism', 'comp.graphics', 'rec.motorcycles', 'sci.space', 'talk.politics.guns']
After preprocessing: 4401 docs
Training Word2Vec...
  Vocabulary size = 14601
Created vectors for 4401 docs
Train/test sizes = 3575/826
Fitting Logistic Regression...
Fitting Random Forest...

=== LR ===
accuracy: 0.8729
f1: 0.8724
weightedPrecision: 0.8725
weightedRecall: 0.8729
+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 5 rows


=== RF ===
accuracy: 0.8596
f1: 0.8589
weightedPrecision: 0.8600
weightedRecall: 0.8596
+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       4.0|
+-----+----------+
only showing top 5 rows
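
For readers who want to see what the weighted metrics above mean, the following plain-Python sketch computes support-weighted precision, recall, and F1 from two small label lists. It is meant to follow the usual definitions rather than reproduce Spark's implementation line for line; `weighted_metrics` and the toy data are assumptions for this example.

```python
from collections import Counter

def weighted_metrics(labels, preds):
    """Support-weighted precision, recall and F1, plus accuracy."""
    n = len(labels)
    support = Counter(labels)                       # true count per class
    predicted = Counter(preds)                      # predicted count per class
    correct = Counter(l for l, p in zip(labels, preds) if l == p)
    w_prec = w_rec = w_f1 = 0.0
    for cls, sup in support.items():
        tp = correct[cls]
        prec = tp / predicted[cls] if predicted[cls] else 0.0
        rec = tp / sup
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        w_prec += prec * sup / n                    # weight each class by its support
        w_rec += rec * sup / n
        w_f1 += f1 * sup / n
    return {"accuracy": sum(correct.values()) / n,
            "weightedPrecision": w_prec, "weightedRecall": w_rec, "f1": w_f1}

labels = [0, 0, 0, 1, 1, 2]
preds = [0, 0, 1, 1, 1, 2]
print(weighted_metrics(labels, preds))
```

Support-weighted recall always equals accuracy, which is why `accuracy` and `weightedRecall` show identical values for both models in the output above.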



# Cell 4: Interactive Tester (Paste or Upload .txt)

*This cell provides a simple in-notebook user interface so you can paste in any text, or upload a plain .txt file, and immediately see which category your model assigns.*

---

## What We’re Accomplishing
1. **UI Elements**
   - `Textarea`: a multi-line box for pasting or typing arbitrary text.
   - `Button`: labeled “Classify Text” to trigger the prediction.
   - `FileUpload`: allows selecting a single .txt file for bulk text input.
   - `Output`: an area to display the prediction result.
2. **Callbacks**
   - `on_btn`
     - Reads whatever is in the Textarea.
     - If empty, prompts the user to paste text first.
     - Otherwise, calls your `predict_new_text` helper and prints the predicted category.
   - `on_upload`
     - Listens for a file selection.
     - Reads the file’s contents and decodes them as UTF-8.
     - Runs the same `predict_new_text` helper and prints the result along with the filename.
3. **Display Logic**
   - All UI elements are stacked vertically in a `VBox`: the Textarea, the Button, the FileUpload (in its own `HBox` row), and finally the Output area.
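
The `on_upload` callback reads `uploader.value` as a dictionary keyed by filename, which matches the ipywidgets 7.x layout. As far as I know, ipywidgets 8.x instead exposes a tuple of dictionaries with `name` and `content` keys, where `content` is a `memoryview`. The helper below is a hedged sketch that accepts either shape; `iter_uploads` is a hypothetical name, and the sample values are hand-built stand-ins rather than real widget output.

```python
def iter_uploads(value):
    """Yield (filename, text) pairs from either FileUpload.value layout."""
    if isinstance(value, dict):
        # Assumed 7.x layout: {filename: {'content': bytes, ...}}
        items = [(name, info["content"]) for name, info in value.items()]
    else:
        # Assumed 8.x layout: ({'name': ..., 'content': memoryview, ...}, ...)
        items = [(info["name"], info["content"]) for info in value]
    for name, content in items:
        # bytes() accepts both bytes and memoryview; undecodable bytes are replaced, not raised
        yield name, bytes(content).decode("utf-8", errors="replace")

old_style = {"note.txt": {"content": b"orbital mechanics"}}
new_style = ({"name": "note.txt", "content": memoryview(b"orbital mechanics")},)
print(list(iter_uploads(old_style)))
print(list(iter_uploads(new_style)))
```

Using `errors="replace"` keeps a file with a few invalid bytes from aborting the callback, at the cost of inserting replacement characters into the text.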

---

## Key Details & Notes
- **Reusing the Pipeline:** This cell reuses your existing preprocessing, embedding, and classification functions, which keeps text handling consistent between training and inference.
- **No Retraining:** Neither callback retrains any models; they simply apply the already-trained `word2vec_model` and `lr_model` to new inputs.
- **User Feedback:**
  - If the Textarea is blank, the button callback reminds you to paste something before classification.
  - Predictions are displayed in the Output widget; the pipeline's own progress messages (such as “Created vectors for 1 docs”) still print to the cell log.
- **Flexibility:**
  - Paste a single sentence or a paragraph, or upload a UTF-8 plain-text file. Inputs that leave fewer than 5 tokens after cleaning are dropped by the preprocessing step and cannot be classified.
  - You can extend this by adding dropdowns to choose between Logistic Regression vs. Random Forest, or to adjust thresholds for custom fallback rules.
- **Avoiding NameErrors:** Because `word2vec_model`, `lr_model`, and `target_names` were bound as globals in Cell 3, this interactive cell finds them without errors.
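
The dropdown extension mentioned under Flexibility needs some way to switch the active classifier between callbacks. A minimal sketch of that state handling, kept free of widget code so it runs anywhere, is shown below; `make_classifier_picker` and the placeholder strings standing in for `lr_model` and `rf_model` are assumptions for illustration.

```python
def make_classifier_picker(models, default):
    """Return (choose, current): choose(name) switches the active model, current() returns it."""
    state = {"name": default}

    def choose(name):
        if name not in models:
            raise KeyError(f"unknown model: {name!r}")
        state["name"] = name

    def current():
        return models[state["name"]]

    return choose, current

# Placeholder strings stand in for the trained lr_model and rf_model objects.
models = {"Logistic Regression": "lr_model_placeholder", "Random Forest": "rf_model_placeholder"}
choose, current = make_classifier_picker(models, default="Logistic Regression")
choose("Random Forest")
print(current())
```

In the notebook, an ipywidgets `Dropdown(options=list(models))` could call `choose(change['new'])` from an `observe(..., names='value')` handler, and the callbacks would pass `classifier_model=current()` to `predict_new_text`.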


In [15]:
# ─────────────────────────────────────────────────────────────
# Cell 4: Interactive tester (paste or upload .txt)
# ─────────────────────────────────────────────────────────────

from ipywidgets import Textarea, FileUpload, Button, Output, VBox, HBox
from IPython.display import display

text_area = Textarea(
    placeholder='Paste any text…',
    layout={'width':'600px','height':'150px'}
)
btn = Button(description="Classify Text")
uploader = FileUpload(accept='.txt', multiple=False)
out = Output()

def on_btn(b):
    out.clear_output()
    txt = text_area.value.strip()
    if not txt:
        with out: print("Paste some text first!")
        return
    pred = predict_new_text(
        raw_text=txt,
        w2v_model=word2vec_model,
        classifier_model=lr_model,
        preprocess_fn=advanced_text_preprocessing,
        vectorize_fn=create_document_vectors_improved,
        target_names=target_names
    )
    with out: print(f"→ Predicted category: {pred}")

def on_upload(change):
    out.clear_output()
    for name, info in uploader.value.items():  # ipywidgets 7.x layout: {filename: {'content': bytes, ...}}
        txt = info['content'].decode('utf-8')
        pred = predict_new_text(
            raw_text=txt,
            w2v_model=word2vec_model,
            classifier_model=lr_model,
            preprocess_fn=advanced_text_preprocessing,
            vectorize_fn=create_document_vectors_improved,
            target_names=target_names
        )
        with out: print(f"File “{name}” → {pred}")

btn.on_click(on_btn)
uploader.observe(on_upload, names='value')

display(VBox([text_area, btn, HBox([uploader]), out]))

After preprocessing: 1 docs
Created vectors for 1 docs
