# SEO Content Quality & Duplicate Detection

ML pipeline for analyzing content quality and finding duplicates.

## What this does:

1. Parse HTML and extract clean text
2. Calculate readability metrics and keywords
3. Find duplicate content using TF-IDF similarity
4. Train a quality classifier (Low/Medium/High)
5. Provide an `analyze_url()` function for testing new URLs

## Outputs:

- `data/extracted_content.csv` - parsed content
- `data/features.csv` - full feature set
- `data/duplicates.csv` - duplicate pairs (similarity ≥ 0.80)
- `models/quality_model.pkl` - trained classifier
- `models/tfidf_vectorizer.pkl` - vectorizer
- `models/tfidf_matrix.pkl` - document vectors

In [8]:
# imports
import warnings
warnings.filterwarnings('ignore')

import os
import re
import json
from pathlib import Path
from typing import Tuple, List, Dict, Any

import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import requests

import nltk
import textstat
from nltk.tokenize import sent_tokenize

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

import joblib

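# download NLTK sentence tokenizer data if needed
# (newer NLTK releases look for 'punkt_tab' instead of 'punkt')
for resource in ('punkt', 'punkt_tab'):
    try:
        nltk.data.find(f'tokenizers/{resource}')
    except LookupError:
        nltk.download(resource, quiet=True)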

# setup paths
CURRENT_DIR = Path.cwd()
if CURRENT_DIR.name == 'notebooks':
    ROOT = CURRENT_DIR.parent
else:
    ROOT = CURRENT_DIR

DATA_DIR = ROOT / 'data'
MODELS_DIR = ROOT / 'models'

DATA_DIR.mkdir(parents=True, exist_ok=True)
MODELS_DIR.mkdir(parents=True, exist_ok=True)

print(f"Working directory: {ROOT}")
print(f"Data directory: {DATA_DIR}")
print(f"Models directory: {MODELS_DIR}")

Working directory: c:\Users\sonam\OneDrive\Desktop\leadwalnut
Data directory: c:\Users\sonam\OneDrive\Desktop\leadwalnut\data
Models directory: c:\Users\sonam\OneDrive\Desktop\leadwalnut\models


In [9]:
# Load dataset
print("\n" + "="*80)
print("STEP 1: LOADING DATA")
print("="*80)

dataset_path = DATA_DIR / 'data.csv'

if not dataset_path.exists():
    raise FileNotFoundError(f"Dataset not found at {dataset_path}")

df_raw = pd.read_csv(dataset_path)

print(f"Loaded {len(df_raw)} rows")
print(f"Columns: {list(df_raw.columns)}")
print(f"\nPreview:")
df_raw.head(2)


STEP 1: LOADING DATA
Loaded 81 rows
Columns: ['url', 'html_content']

Preview:


|  | url | html_content |
|---|---|---|
| 0 | https://www.cm-alliance.com/cybersecurity-blog | `<!doctype html><!--[if lt IE 7]> <html class="...` |
| 1 | https://www.varonis.com/blog/cybersecurity-tips | `<!doctype html><html lang="en"><head>\n <me...` |


In [10]:
# Parse HTML and extract text
print("\n" + "="*80)
print("STEP 2: HTML PARSING")
print("="*80)

def extract_title_and_body(html: str) -> Tuple[str, str]:
    """Extract title and body from HTML"""
    if not html or pd.isna(html):
        return '', ''
    
    try:
        soup = BeautifulSoup(html, 'html.parser')
        
        # get title
        title = ''
        if soup.title and soup.title.string:
            title = soup.title.string.strip()
        
        # try to get body from article/main tags first
        body_text = ''
        for tag_name in ['article', 'main']:
            element = soup.find(tag_name)
            if element:
                paragraphs = element.find_all('p')
                if paragraphs:
                    body_text = ' '.join(p.get_text(separator=' ', strip=True) for p in paragraphs)
                    break
        
        # fallback to all p tags
        if not body_text:
            paragraphs = soup.find_all('p')
            body_text = ' '.join(p.get_text(separator=' ', strip=True) for p in paragraphs)
        
        # clean whitespace
        body_text = re.sub(r'\s+', ' ', body_text).strip()
        
        return title, body_text
        
    except Exception as e:
        print(f"Warning: parse failed - {str(e)[:100]}")
        return '', ''


def clean_text(text: str) -> str:
    """Clean text"""
    if not text or pd.isna(text):
        return ''
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


# process all rows
print("Processing HTML content...")
parsed_data = []
failed_count = 0

for idx, row in df_raw.iterrows():
    url = row.get('url', '')
    html_content = row.get('html_content', '')
    
    title, body = extract_title_and_body(html_content)
    body = clean_text(body)
    word_count = len(body.split()) if body else 0
    
    if not body:
        failed_count += 1
    
    parsed_data.append({
        'url': url,
        'title': title,
        'body_text': body,
        'word_count': word_count
    })

df_extracted = pd.DataFrame(parsed_data)

output_path = DATA_DIR / 'extracted_content.csv'
df_extracted.to_csv(output_path, index=False)

print(f"Extracted content from {len(df_extracted)} pages")
print(f"Failed: {failed_count} pages")
print(f"Saved to: {output_path}")
print(f"\nPreview:")
df_extracted.head()


STEP 2: HTML PARSING
Processing HTML content...
Extracted content from 81 pages
Failed: 16 pages
Saved to: c:\Users\sonam\OneDrive\Desktop\leadwalnut\data\extracted_content.csv

Preview:


|  | url | title | body_text | word_count |
|---|---|---|---|---|
| 0 | https://www.cm-alliance.com/cybersecurity-blog | Cyber Security Blog | Cyber Crisis Tabletop Exercise Cyber Security ... | 326 |
| 1 | https://www.varonis.com/blog/cybersecurity-tips | Top 10 Cybersecurity Awareness Tips: How to St... | Cybersecurity is gaining more importance globa... | 1578 |
| 2 | https://www.cisecurity.org/insights/blog/11-cy... | 11 Cyber Defense Tips to Stay Secure at Work a... | Cybersecurity is inextricably tied to the tech... | 946 |
| 3 | https://www.cisa.gov/topics/cybersecurity-best... | Cybersecurity Best Practices \| Cybersecurity a... | Cyberspace is particularly difficult to secure... | 489 |
| 4 | https://www.qnbtrust.bank/Resources/Learning-C... |  |  | 0 |


In [11]:
# Feature engineering
print("\n" + "="*80)
print("STEP 3: FEATURE ENGINEERING")
print("="*80)

df_features = df_extracted.copy()

# Basic text metrics
print("\nComputing text metrics...")

def safe_sentence_count(text: str) -> int:
    if not text or pd.isna(text):
        return 0
    try:
        return len(sent_tokenize(text))
    except Exception:
        return 0

def safe_readability_score(text: str) -> float:
    if not text or pd.isna(text) or len(text.split()) == 0:
        return 0.0
    try:
        return textstat.flesch_reading_ease(text)
    except Exception:
        return 0.0

df_features['sentence_count'] = df_features['body_text'].apply(safe_sentence_count)
df_features['flesch_reading_ease'] = df_features['body_text'].apply(safe_readability_score)
df_features['is_thin'] = df_features['word_count'] < 500

print(f"Done computing sentence counts and readability")

# TF-IDF vectorization
print("\nBuilding TF-IDF vectors...")

corpus = df_features['body_text'].fillna('').tolist()

tfidf_vectorizer = TfidfVectorizer(
    max_features=5000,
    stop_words='english',
    max_df=0.95,
    min_df=2
)

tfidf_matrix = tfidf_vectorizer.fit_transform(corpus)

print(f"TF-IDF matrix shape: {tfidf_matrix.shape}")
print(f"Vocabulary size: {len(tfidf_vectorizer.vocabulary_)}")

# Extract top keywords
print("\nExtracting keywords...")

feature_names = np.array(tfidf_vectorizer.get_feature_names_out())

def extract_top_keywords(doc_vector, n: int = 5) -> str:
    if doc_vector.nnz == 0:
        return ''
    
    values = doc_vector.toarray().ravel()
    top_indices = values.argsort()[::-1][:n]
    keywords = feature_names[top_indices]
    
    return '|'.join(keywords)

top_keywords_list = []
for i in range(tfidf_matrix.shape[0]):
    keywords = extract_top_keywords(tfidf_matrix[i], n=5)
    top_keywords_list.append(keywords)

df_features['top_keywords'] = top_keywords_list

print(f"Extracted top 5 keywords for {len(df_features)} documents")

# Save everything
print("\nSaving features...")

df_features_output = df_features[[
    'url', 'title', 'body_text', 'word_count', 
    'sentence_count', 'flesch_reading_ease', 'is_thin', 'top_keywords'
]].copy()

features_path = DATA_DIR / 'features.csv'
df_features_output.to_csv(features_path, index=False)

joblib.dump(tfidf_vectorizer, MODELS_DIR / 'tfidf_vectorizer.pkl')
joblib.dump(tfidf_matrix, MODELS_DIR / 'tfidf_matrix.pkl')

print(f"Saved features to: {features_path}")
print(f"Saved TF-IDF vectorizer and matrix")

# Summary stats
print("\n" + "-"*80)
print("FEATURE SUMMARY")
print("-"*80)
print(df_features[['word_count', 'sentence_count', 'flesch_reading_ease']].describe())
print(f"\nThin content (<500 words): {df_features['is_thin'].sum()} pages ({df_features['is_thin'].mean()*100:.1f}%)")

df_features_output.head()


STEP 3: FEATURE ENGINEERING

Computing text metrics...
Done computing sentence counts and readability

Building TF-IDF vectors...
TF-IDF matrix shape: (81, 4617)
Vocabulary size: 4617

Extracting keywords...
Extracted top 5 keywords for 81 documents

Saving features...
Saved features to: c:\Users\sonam\OneDrive\Desktop\leadwalnut\data\features.csv
Saved TF-IDF vectorizer and matrix

--------------------------------------------------------------------------------
FEATURE SUMMARY
--------------------------------------------------------------------------------
         word_count  sentence_count  flesch_reading_ease
count     81.000000       81.000000            81.000000
mean    1781.271605       88.827160            33.559259
std     3894.245190      203.139717            30.792453
min        0.000000        0.000000           -75.370000
25%       23.000000        1.000000             0.000000
50%      449.000000       22.000000            37.710000
75%     1578.000000       88.000000 

|  | url | title | body_text | word_count | sentence_count | flesch_reading_ease | is_thin | top_keywords |
|---|---|---|---|---|---|---|---|---|
| 0 | https://www.cm-alliance.com/cybersecurity-blog | Cyber Security Blog | Cyber Crisis Tabletop Exercise Cyber Security ... | 326 | 6 | -18.67 | True | cyber\|cybersecurity\|training\|events\|clients |
| 1 | https://www.varonis.com/blog/cybersecurity-tips | Top 10 Cybersecurity Awareness Tips: How to St... | Cybersecurity is gaining more importance globa... | 1578 | 78 | 41.5 | False | access\|data\|security\|app\|sensitive |
| 2 | https://www.cisecurity.org/insights/blog/11-cy... | 11 Cyber Defense Tips to Stay Secure at Work a... | Cybersecurity is inextricably tied to the tech... | 946 | 61 | 55.44 | False | password\|protect\|authentication\|device\|use |
| 3 | https://www.cisa.gov/topics/cybersecurity-best... | Cybersecurity Best Practices \| Cybersecurity a... | Cyberspace is particularly difficult to secure... | 489 | 22 | 15.1 | True | cybersecurity\|cyber\|nation\|offers\|resilience |
| 4 | https://www.qnbtrust.bank/Resources/Learning-C... |  |  | 0 | 0 | 0.0 | True |  |


In [12]:
# Duplicate detection
print("\n" + "="*80)
print("STEP 4: FINDING DUPLICATES")
print("="*80)

SIMILARITY_THRESHOLD = 0.80

print(f"\nComputing similarities (threshold: {SIMILARITY_THRESHOLD})...")

similarity_matrix = cosine_similarity(tfidf_matrix)

# find duplicate pairs
duplicate_pairs = []
n_docs = similarity_matrix.shape[0]

for i in range(n_docs):
    for j in range(i + 1, n_docs):
        sim_score = similarity_matrix[i, j]
        
        if sim_score >= SIMILARITY_THRESHOLD:
            duplicate_pairs.append({
                'url1': df_features.loc[i, 'url'],
                'url2': df_features.loc[j, 'url'],
                'similarity': round(float(sim_score), 4)
            })

df_duplicates = pd.DataFrame(duplicate_pairs)

duplicates_path = DATA_DIR / 'duplicates.csv'
df_duplicates.to_csv(duplicates_path, index=False)

print(f"\nAnalyzed {n_docs} pages")
print(f"Found {len(df_duplicates)} duplicate pairs (>={SIMILARITY_THRESHOLD})")
print(f"Saved to: {duplicates_path}")

if len(df_duplicates) > 0:
    print(f"\nTop 5 most similar pairs:")
    print(df_duplicates.nlargest(5, 'similarity'))
else:
    print("\nNo duplicates found at this threshold")

df_duplicates.head()


STEP 4: FINDING DUPLICATES

Computing similarities (threshold: 0.8)...

Analyzed 81 pages
Found 0 duplicate pairs (>=0.8)
Saved to: c:\Users\sonam\OneDrive\Desktop\leadwalnut\data\duplicates.csv

No duplicates found at this threshold


In [13]:
# Train quality classifier
print("\n" + "="*80)
print("STEP 5: QUALITY CLASSIFICATION")
print("="*80)

# Generate labels
print("\nGenerating quality labels...")

def assign_quality_label(row: pd.Series) -> str:
    """
    Label quality based on word count and readability
    High: >1500 words AND readability 50-70
    Low: <500 words OR readability <30
    Medium: everything else
    """
    word_count = row['word_count']
    readability = row['flesch_reading_ease']
    
    if word_count > 1500 and 50 <= readability <= 70:
        return 'High'
    elif word_count < 500 or readability < 30:
        return 'Low'
    else:
        return 'Medium'

df_model = df_features.copy()
df_model['quality_label'] = df_model.apply(assign_quality_label, axis=1)

label_counts = df_model['quality_label'].value_counts()
print(f"\nLabel distribution:")
for label, count in label_counts.items():
    print(f"  {label}: {count} ({count/len(df_model)*100:.1f}%)")

# Prepare features
print("\nPreparing feature matrix...")

N_TFIDF_FEATURES = 50
n_tfidf = min(N_TFIDF_FEATURES, tfidf_matrix.shape[1])

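# NOTE: this keeps the first n_tfidf columns in vocabulary (alphabetical) order,
# not the n_tfidf most informative terms
X_tfidf = tfidf_matrix[:, :n_tfidf].toarray()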
X_basic = df_model[['word_count', 'sentence_count', 'flesch_reading_ease']].fillna(0).values

X = np.hstack([X_basic, X_tfidf])
y = df_model['quality_label'].values

print(f"Feature matrix: {X.shape}")
print(f"Using 3 basic features + {n_tfidf} TF-IDF features = {X.shape[1]} total")

# Split data
print("\nSplitting data...")

X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=0.3, 
    random_state=42, 
    stratify=y
)

print(f"Train: {len(X_train)} samples")
print(f"Test: {len(X_test)} samples")

# Train model
print("\nTraining RandomForest...")

model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42,
    n_jobs=-1
)

model.fit(X_train, y_train)
print(f"Model trained")

# Evaluate
print("\nEvaluating...")

y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred, average='weighted')

print(f"\n{'='*80}")
print("PERFORMANCE")
print(f"{'='*80}")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"\n{classification_report(y_test, y_pred)}")

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))

# Baseline comparison
print("\n" + "-"*80)
print("BASELINE (word count only)")
print("-"*80)

def baseline_predict(X_features: np.ndarray) -> np.ndarray:
    predictions = []
    for word_count in X_features[:, 0]:
        if word_count > 1500:
            predictions.append('High')
        elif word_count < 500:
            predictions.append('Low')
        else:
            predictions.append('Medium')
    return np.array(predictions)

y_baseline = baseline_predict(X_test)
baseline_accuracy = accuracy_score(y_test, y_baseline)

print(f"Baseline Accuracy: {baseline_accuracy:.4f}")
print(f"Improvement: {(accuracy - baseline_accuracy)*100:+.2f} percentage points")

# Feature importance
print("\n" + "-"*80)
print("TOP 10 FEATURES")
print("-"*80)

feature_names_list = ['word_count', 'sentence_count', 'flesch_reading_ease'] + \
                     [f'tfidf_{i}' for i in range(n_tfidf)]

feature_importance = pd.DataFrame({
    'feature': feature_names_list,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False).head(10)

print(feature_importance.to_string(index=False))

# Save model
print("\nSaving model...")

model_path = MODELS_DIR / 'quality_model.pkl'
joblib.dump(model, model_path)

print(f"Saved to: {model_path}")

feature_importance


STEP 5: QUALITY CLASSIFICATION

Generating quality labels...

Label distribution:
  Low: 50 (61.7%)
  Medium: 24 (29.6%)
  High: 7 (8.6%)

Preparing feature matrix...
Feature matrix: (81, 53)
Using 3 basic features + 50 TF-IDF features = 53 total

Splitting data...
Train: 56 samples
Test: 25 samples

Training RandomForest...
Model trained

Evaluating...

PERFORMANCE
Accuracy: 0.9200
F1-Score: 0.9177

              precision    recall  f1-score   support

        High       1.00      0.50      0.67         2
         Low       1.00      0.94      0.97        16
      Medium       0.78      1.00      0.88         7

    accuracy                           0.92        25
   macro avg       0.93      0.81      0.84        25
weighted avg       0.94      0.92      0.92        25

Confusion Matrix:
[[ 1  0  1]
 [ 0 15  1]
 [ 0  0  7]]

--------------------------------------------------------------------------------
BASELINE (word count only)
--------------------------------------------------

|  | feature | importance |
|---|---|---|
| 0 | word_count | 0.296808 |
| 1 | sentence_count | 0.287323 |
| 2 | flesch_reading_ease | 0.153482 |
| 16 | tfidf_13 | 0.041952 |
| 3 | tfidf_0 | 0.041276 |
| 5 | tfidf_2 | 0.038521 |
| 43 | tfidf_40 | 0.028503 |
| 24 | tfidf_21 | 0.018673 |
| 32 | tfidf_29 | 0.018665 |
| 20 | tfidf_17 | 0.009358 |


In [14]:
# Real-time analysis function
print("\n" + "="*80)
print("STEP 6: ANALYSIS FUNCTION")
print("="*80)

def scrape_url(url: str, timeout: int = 10) -> str:
    """Scrape HTML from URL"""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.text
    except requests.RequestException as e:
        print(f"Error: {str(e)[:100]}")
        return ''


def analyze_url(url: str) -> Dict[str, Any]:
    """
    Analyze a URL for quality and duplicates
    
    Steps:
    1. Scrape the URL
    2. Extract text
    3. Compute features
    4. Predict quality
    5. Find similar pages
    """
    # load models
    vectorizer = joblib.load(MODELS_DIR / 'tfidf_vectorizer.pkl')
    doc_matrix = joblib.load(MODELS_DIR / 'tfidf_matrix.pkl')
    classifier = joblib.load(MODELS_DIR / 'quality_model.pkl')
    
    # scrape
    html = scrape_url(url)
    if not html:
        return {
            'url': url,
            'error': 'Failed to scrape URL',
            'quality_label': None
        }
    
    # extract
    title, body = extract_title_and_body(html)
    body = clean_text(body)
    
    # features
    word_count = len(body.split()) if body else 0
    sentence_count = safe_sentence_count(body)
    readability = safe_readability_score(body)
    is_thin = word_count < 500
    
    # vectorize
    tfidf_vector = vectorizer.transform([body])
    
    # find similar
    similarities = cosine_similarity(tfidf_vector, doc_matrix).ravel()
    top_indices = similarities.argsort()[::-1][:5]
    
    similar_pages = []
    for idx in top_indices:
        sim_score = similarities[idx]
        if sim_score > 0.4:
            similar_pages.append({
                'url': df_extracted.loc[idx, 'url'],
                'title': df_extracted.loc[idx, 'title'],
                'similarity': round(float(sim_score), 4)
            })
    
    # predict
    # must match the column slice used when the classifier was trained
    n_tfidf = min(N_TFIDF_FEATURES, tfidf_vector.shape[1])
    X_tfidf_features = tfidf_vector[:, :n_tfidf].toarray()
    X_basic_features = np.array([[word_count, sentence_count, readability]])
    X_combined = np.hstack([X_basic_features, X_tfidf_features])
    
    quality_label = classifier.predict(X_combined)[0]
    
    return {
        'url': url,
        'title': title,
        'word_count': int(word_count),
        'sentence_count': int(sentence_count),
        'flesch_reading_ease': round(float(readability), 2),
        'is_thin_content': bool(is_thin),
        'quality_label': quality_label,
        'similar_pages': similar_pages[:3]
    }


print("\nFunction ready: analyze_url(url)")
print("\nExample:")
print("  result = analyze_url('https://example.com/article')")
print("  print(json.dumps(result, indent=2))")

# Uncomment to test:
# test_url = df_extracted.loc[0, 'url']
# result = analyze_url(test_url)
# print(json.dumps(result, indent=2))


STEP 6: ANALYSIS FUNCTION

Function ready: analyze_url(url)

Example:
  result = analyze_url('https://example.com/article')
  print(json.dumps(result, indent=2))


## Done!

All steps completed successfully.

### Generated files:

**Data:**
- `data/extracted_content.csv` - parsed HTML content
- `data/features.csv` - all features with keywords
- `data/duplicates.csv` - duplicate pairs (similarity ≥ 0.80)

**Models:**
- `models/quality_model.pkl` - trained RandomForest
- `models/tfidf_vectorizer.pkl` - TF-IDF vectorizer
- `models/tfidf_matrix.pkl` - document vectors

### Usage:

```python
# Analyze new URL
result = analyze_url('https://example.com/article')
print(json.dumps(result, indent=2))

# Load model for batch processing
model = joblib.load('models/quality_model.pkl')
vectorizer = joblib.load('models/tfidf_vectorizer.pkl')
```
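
For batch runs, one option is a small wrapper that calls the analysis function once per URL and pauses between requests. The sketch below is not part of the notebook: `analyze_many` and its `delay_seconds` parameter are hypothetical names, and the analysis function is passed in as an argument so the helper can be exercised without network access.

```python
import time
from typing import Any, Callable, Dict, Iterable, List


def analyze_many(
    urls: Iterable[str],
    analyze_fn: Callable[[str], Dict[str, Any]],
    delay_seconds: float = 1.0,
) -> List[Dict[str, Any]]:
    """Run analyze_fn on each URL, pausing between calls (hypothetical helper)."""
    results: List[Dict[str, Any]] = []
    for i, url in enumerate(urls):
        if i > 0:
            # crude politeness delay between consecutive requests
            time.sleep(delay_seconds)
        try:
            results.append(analyze_fn(url))
        except Exception as exc:
            # keep going; record the failure in the same shape analyze_url uses
            results.append({'url': url, 'error': str(exc), 'quality_label': None})
    return results


# In the notebook this could be called as:
# results = analyze_many(df_extracted['url'].head(3), analyze_url)
```

A fixed sleep is the simplest possible throttle; a production scraper would likely also want per-domain limits and retries.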

### Notes:

- HTML parsing prioritizes `<article>` and `<main>` tags
- Classifier input is the first 50 TF-IDF columns (in alphabetical vocabulary order, not the 50 most informative terms) plus 3 basic features
- Similarity threshold at 0.80 to avoid false positives
- RandomForest chosen for mixed feature types
- Labels are synthetic (based on word count + readability)
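
To make the 0.80 threshold concrete, here is a minimal standard-library sketch of pairwise cosine similarity. It is a simplification, not the notebook's method: it uses raw term counts instead of TF-IDF weights, and `term_counts`, `cosine`, and `near_duplicates` are hypothetical helper names.

```python
import math
import re
from collections import Counter
from itertools import combinations
from typing import Dict, List, Tuple


def term_counts(text: str) -> Counter:
    """Lowercase the text and count alphabetic tokens."""
    return Counter(re.findall(r'[a-z]+', text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse term-count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # an empty document is similar to nothing
    return dot / (norm_a * norm_b)


def near_duplicates(
    docs: Dict[str, str], threshold: float = 0.80
) -> List[Tuple[str, str, float]]:
    """Return every pair of documents whose similarity meets the threshold."""
    vectors = {name: term_counts(text) for name, text in docs.items()}
    pairs = []
    for x, y in combinations(vectors, 2):
        score = cosine(vectors[x], vectors[y])
        if score >= threshold:
            pairs.append((x, y, round(score, 4)))
    return pairs


docs = {
    'a': 'use strong passwords and enable two factor authentication',
    'b': 'use strong passwords and enable multi factor authentication',
    'c': 'our quarterly events calendar for clients',
}
pairs = near_duplicates(docs)
print(pairs)  # 'a' and 'b' share 7 of 8 terms, so their similarity is 0.875
```

Only the near-identical pair clears the threshold; the unrelated document shares no terms with the others and scores 0.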

### Limitations:

- TF-IDF won't catch paraphrases (would need embeddings)
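- Flesch Reading Ease is unreliable on short or poorly punctuated text (scores in this dataset go as low as -75.37)
- Synthetic labels may not match real quality, and because they are derived from word count and readability (which are also model inputs), the reported accuracy mostly measures how well the model recovers the labeling rule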
- Need rate limiting for production scraping
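
The readability caveat is easier to see from the formula itself. The sketch below applies the standard Flesch Reading Ease formula to raw counts; the counts are made-up illustrative numbers, and `textstat` estimates syllables with its own method, so its scores on real text will differ from a hand calculation like this.

```python
def flesch_reading_ease(words: int, sentences: int, syllables: int) -> float:
    """Standard Flesch Reading Ease formula computed from raw counts."""
    if words == 0 or sentences == 0:
        return 0.0  # same fallback the notebook uses for empty text
    return 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)


# Same assumed vocabulary (1.5 syllables per word), different sentence splitting.
well_punctuated = flesch_reading_ease(words=300, sentences=20, syllables=450)
one_long_run = flesch_reading_ease(words=300, sentences=1, syllables=450)

print(round(well_punctuated, 2))  # 64.71
print(round(one_long_run, 2))     # -224.56 (approximately; depends on float rounding)
```

Text scraped from navigation menus or link lists often has few sentence boundaries, so the words-per-sentence term dominates and pushes the score far below zero even when the vocabulary is simple.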

### Next steps:

- Try sentence-transformers for better similarity
- Add visualizations (word clouds, heatmaps)
- Build Streamlit UI for easier testing
- Add more NLP features (sentiment, entities, topics)