In [29]:
######### Fake News Detection with Spark, Mlib, NLP, CNN, BERT ##################
# Install necessary libraries
!pip install pyspark findspark tensorflow

import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("FakeNewsDetection") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("Spark Session created successfully!")


Spark Session created successfully!


In [30]:
from google.colab import drive
drive.mount('/content/drive')

# File paths (adjust accordingly)
fake_path = "/content/drive/MyDrive/AIMLBigData/Fake.csv"
true_path = "/content/drive/MyDrive/AIMLBigData/True.csv"

#fake_path = "Fake.csv"
#true_path = "True.csv"

# Load CSVs
df_fake = spark.read.csv(fake_path, header=True, inferSchema=True)
df_true = spark.read.csv(true_path, header=True, inferSchema=True)

print("Fake schema:")
df_fake.printSchema()

print("True schema:")
df_true.printSchema()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Fake schema:
root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)

True schema:
root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)



In [31]:
from pyspark.sql.functions import lit

df_fake = df_fake.withColumn("label", lit(1))
df_true = df_true.withColumn("label", lit(0))

df_all = df_fake.unionByName(df_true)
print("Total records:", df_all.count())
df_all.show(5)



Total records: 44906
+--------------------+--------------------+-------+-----------------+-----+
|               title|                text|subject|             date|label|
+--------------------+--------------------+-------+-----------------+-----+
| Donald Trump Sen...|Donald Trump just...|   News|December 31, 2017|    1|
| Drunk Bragging T...|House Intelligenc...|   News|December 31, 2017|    1|
| Sheriff David Cl...|On Friday, it was...|   News|December 30, 2017|    1|
| Trump Is So Obse...|On Christmas day,...|   News|December 29, 2017|    1|
| Pope Francis Jus...|Pope Francis used...|   News|December 25, 2017|    1|
+--------------------+--------------------+-------+-----------------+-----+
only showing top 5 rows



In [32]:
### Preprocess the new contents ###
from pyspark.sql.functions import concat_ws, lower, regexp_replace, col

# Combine title and text into a single 'content' column
df_all = df_all.withColumn("full_text", concat_ws(" ", col("title"), col("text")))

# Define a common label indexer
label_indexer = StringIndexer(inputCol="label", outputCol="numeric_label")
# Apply the label indexer to the DataFrame
df_all = label_indexer.fit(df_all).transform(df_all)


# Basic text cleaning: lowercase, remove punctuation, digits, extra spaces
df_all = df_all.withColumn("full_text", lower(col("full_text")))
df_all = df_all.withColumn("full_text", regexp_replace(col("full_text"), "[^a-zA-Z\\s]", ""))
df_all = df_all.withColumn("full_text", regexp_replace(col("full_text"), "\\s+", " "))

df_all.select("full_text", "numeric_label").show(5, truncate=100)

+----------------------------------------------------------------------------------------------------+-------------+
|                                                                                           full_text|numeric_label|
+----------------------------------------------------------------------------------------------------+-------------+
| donald trump sends out embarrassing new years eve message this is disturbing donald trump just c...|          0.0|
| drunk bragging trump staffer started russian collusion investigation house intelligence committe...|          0.0|
| sheriff david clarke becomes an internet joke for threatening to poke people in the eye on frida...|          0.0|
| trump is so obsessed he even has obamas name coded into his website images on christmas day dona...|          0.0|
| pope francis just called out donald trump during his christmas speech pope francis used his annu...|          0.0|
+---------------------------------------------------------------

In [33]:
# group by numeric_label
df_all.groupBy("numeric_label").count().show()

+-------------+-----+
|numeric_label|count|
+-------------+-----+
|          0.0|23489|
|          1.0|21417|
+-------------+-----+



In [34]:
# Train test split
train_df, test_df = df_all.randomSplit([0.8, 0.2], seed=42)

print("Train set count:", train_df.count())
print("Test set count:", test_df.count())

Train set count: 36086
Test set count: 8820


In [35]:
# Define the ML Pipeline stages for Logistic Regression
tokenizer = Tokenizer(inputCol="full_text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=20000)
idf = IDF(inputCol="raw_features", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="numeric_label")

# Assemble and train the pipeline
pipeline_lr = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf, lr])
print("Training MLlib Logistic Regression model...")
lr_model = pipeline_lr.fit(train_df)

# Make predictions on the test set
# We will use these predictions in the final ensemble step
lr_predictions = lr_model.transform(test_df)

# Evaluate its standalone performance
evaluator = BinaryClassificationEvaluator(labelCol="numeric_label")
auc = evaluator.evaluate(lr_predictions)
print(f"✅ Baseline Model (Logistic Regression) AUC: {auc:.4f}")


Training MLlib Logistic Regression model...
✅ Baseline Model (Logistic Regression) AUC: 0.9982


In [36]:
# --- Part 1: Prepare Data for BERT ---
# Convert the Spark test and train dataframes to Pandas
train_pdf = train_df.select("full_text", "numeric_label").toPandas()
test_pdf = test_df.select("full_text", "numeric_label").toPandas()

# Map labels to integers
# Correct the label mapping to handle numeric labels from Spark
label_map = {0.0: 0, 1.0: 1}
train_pdf['label_id'] = train_pdf['numeric_label'].map(label_map)
test_pdf['label_id'] = test_pdf['numeric_label'].map(label_map)


train_texts = train_pdf['full_text'].tolist()
train_labels = train_pdf['label_id'].tolist()
test_texts = test_pdf['full_text'].tolist()
test_labels = test_pdf['label_id'].tolist()


# --- Part 2: BERT Tokenization ---
import torch
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments, IntervalStrategy # Changed to DistilBert

#MODEL_NAME = 'bert-base-uncased'
MODEL_NAME = 'distilbert-base-uncased'

tokenizer = DistilBertTokenizer.from_pretrained(MODEL_NAME) # Changed to DistilBertTokenizer

# Tokenize the datasets
train_encodings = tokenizer(train_texts, truncation=True, padding=True, max_length=128)
test_encodings = tokenizer(test_texts, truncation=True, padding=True, max_length=128)

# Create a PyTorch dataset class
class NewsDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # Ensure labels are long tensors
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = NewsDataset(train_encodings, train_labels)
test_dataset = NewsDataset(test_encodings, test_labels)

# --- Part 3: Fine-Tune the Model ---
model = DistilBertForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2) # Changed to DistilBert

training_args = TrainingArguments(
    output_dir='./results',          # Directory to save the model

    # --- Performance Optimizations ---
    fp16=True,                       # ✅ Use mixed-precision for a 30-50% speedup on GPU
    per_device_train_batch_size=32,  # ✅ Double the batch size (possible due to fp16)

    # --- Training Strategy ---
    num_train_epochs=1,              # 1 epoch is often sufficient for fine-tuning
    learning_rate=2e-5,              # A standard learning rate for fine-tuning BERT
    warmup_steps=500,                # Standard practice
    weight_decay=0.01,               # Standard practice for regularization

    # --- Monitoring and Saving ---
    eval_strategy=IntervalStrategy.STEPS,     # 💡 Evaluate performance during training
    eval_steps=200,                  # 💡 Evaluate every 200 steps
    save_strategy=IntervalStrategy.STEPS,           # 💡 Match the save strategy to evaluation
    save_steps=200,                  # 💡 Save a checkpoint every 200 steps
    # load_best_model_at_end=True,     # 💡 Automatically load the best model at the end (unsupported in this version)

    # --- Logging ---
    logging_dir='./logs',
    logging_steps=50,                # Log more frequently to see progress
    report_to="none",                # Keep this to disable external logging like wandb
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

print("Fine-tuning BERT model...")
trainer.train()

# --- Part 4: Get Predictions from BERT ---
print("Getting predictions from BERT model...")
bert_raw_predictions, _, _ = trainer.predict(test_dataset)

# Convert logits to probabilities using softmax
from scipy.special import softmax
bert_probabilities = softmax(bert_raw_predictions, axis=1)

# We are interested in the probability of the 'FAKE' class (which we mapped to 1)
bert_probs_fake = bert_probabilities[:, 1]
print("✅ BERT model trained and predictions are ready.")

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Fine-tuning BERT model...


Step,Training Loss,Validation Loss
200,0.0632,0.02799
400,0.0105,0.014849
600,0.0059,0.01046
800,0.001,0.009573
1000,0.0107,0.007328


Getting predictions from BERT model...


✅ BERT model trained and predictions are ready.


In [38]:
# --- Save the trained models ---

# Save the Spark MLlib Logistic Regression model
lr_model_path = "/content/drive/MyDrive/AIMLBigData/lr_model"
# Use write().overwrite().save() to overwrite the existing model
lr_model.write().overwrite().save(lr_model_path)
print(f"Logistic Regression model saved to: {lr_model_path}")

# Save the Hugging Face Transformers BERT model
bert_model_path = "/content/drive/MyDrive/AIMLBigData/bert_model"
# You might need to manually remove the directory if it exists before saving
# or implement a check and remove logic if overwriting is strictly needed
# For this example, we assume overwrite() for Spark is sufficient if BERT save works.
# If BERT save fails due to existing files, you might need os.makedirs(bert_model_path, exist_ok=True)
# or a more robust overwrite mechanism depending on the exact error.
model.save_pretrained(bert_model_path)
tokenizer.save_pretrained(bert_model_path) # Save the tokenizer along with the model
print(f"BERT model saved to: {bert_model_path}")

Logistic Regression model saved to: /content/drive/MyDrive/AIMLBigData/lr_model
BERT model saved to: /content/drive/MyDrive/AIMLBigData/bert_model


In [39]:
# --- Part 1: Extract Probabilities from Logistic Regression ---
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# The 'probability' column is a Vector. We need to extract the probability of the positive class (index 1).
# Note: Ensure your label_indexer mapped 'FAKE' to 1.0. If not, adjust the index.
extract_prob_udf = udf(lambda v: float(v[1]), FloatType())
lr_predictions_with_prob = lr_predictions.withColumn("lr_prob_fake", extract_prob_udf("probability"))

# Collect the LR probabilities into a pandas Series to easily combine them
lr_probs_fake_pdf = lr_predictions_with_prob.select("lr_prob_fake").toPandas()


# --- Part 2: Combine and Evaluate the Ensemble ---
import pandas as pd
from sklearn.metrics import accuracy_score, roc_auc_score

# Create a final DataFrame for evaluation
# This assumes the order of test_data was preserved, which it should be in this workflow.
ensemble_df = pd.DataFrame({
    'true_label': test_labels,
    'lr_prob': lr_probs_fake_pdf['lr_prob_fake'],
    'bert_prob': bert_probs_fake
})

# Define weights
bert_weight = 0.80
lr_weight = 0.20

# Calculate the weighted average probability
ensemble_df['ensemble_prob'] = (bert_weight * ensemble_df['bert_prob']) + \
                               (lr_weight * ensemble_df['lr_prob'])

# Determine final prediction based on a 0.5 threshold
ensemble_df['ensemble_prediction'] = (ensemble_df['ensemble_prob'] > 0.5).astype(int)


# --- Part 3: Final Evaluation ---
ensemble_accuracy = accuracy_score(ensemble_df['true_label'], ensemble_df['ensemble_prediction'])
ensemble_auc = roc_auc_score(ensemble_df['true_label'], ensemble_df['ensemble_prob'])

print("\n--- Final Ensemble Results ---")
print(f"Ensemble Accuracy: {ensemble_accuracy:.4f}")
print(f"Ensemble AUC: {ensemble_auc:.4f}")


--- Final Ensemble Results ---
Ensemble Accuracy: 0.9989
Ensemble AUC: 1.0000


In [40]:
import torch
from scipy.special import softmax
import numpy as np
# No need to import StringIndexerModel here as we use the fitted object directly

# Ensure the BERT model is in evaluation mode
# In the Trainer workflow, this is handled automatically during predict,
# but for manual inference, it's good practice.
model.eval()

# Determine the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device) # Move the model to the determined device
print(f"Using device: {device}")


# Let's define the weights again for clarity
bert_weight = 0.80
lr_weight = 0.20

# The numeric label 1.0 corresponds to the 'FAKE' class based on how the data was labeled.
# Both the Logistic Regression probability vector and the BERT output logits
# are ordered according to the numeric labels (0.0 then 1.0).
# So, the probability of FAKE is at index 1.
FAKE_PROB_INDEX = 1
print(f"Index for 'FAKE' probability is: {FAKE_PROB_INDEX}")


def predict_news(news_text: str):
    """
    Takes a news article string and classifies it as 'Fake News' or 'True News'
    using the trained LR + BERT ensemble model.
    """
    print("-----------------------------------------")
    print(f"Analyzing news: '{news_text[:100]}...'")

    # === 1. Get Logistic Regression Prediction ===
    # Create a Spark DataFrame with the new text
    sample_df = spark.createDataFrame([(news_text,)], ["full_text"])

    # Get the prediction from the saved Spark ML Pipeline
    lr_pred = lr_model.transform(sample_df)

    # Extract the probability of the "FAKE" class
    # Use the FAKE_PROB_INDEX (which is 1, corresponding to numeric_label 1.0)
    prob_lr = lr_pred.select("probability").first().probability[FAKE_PROB_INDEX]
    print(f"🔎 Logistic Regression confidence (Fake): {prob_lr:.4f}")

    # === 2. Get BERT Prediction ===
    # Tokenize the text for BERT
    inputs = tokenizer(news_text, return_tensors="pt", truncation=True, padding=True, max_length=128)

    # Move input tensors to the same device as the model
    inputs = {key: value.to(device) for key, value in inputs.items()}


    # Make prediction (no gradients needed for inference)
    with torch.no_grad():
        logits = model(**inputs).logits

    # Move logits back to CPU for softmax and numpy conversion
    logits = logits.cpu()


    # Convert logits to probabilities
    probabilities = softmax(logits.numpy(), axis=1)[0]
    # The BERT model was trained with numeric_label 0 as True and 1 as Fake.
    # So, the probability of FAKE is at index 1.
    prob_bert = probabilities[1] # Index 1 for FAKE probability
    print(f"🤖 BERT confidence (Fake): {prob_bert:.4f}")

    # === 3. Calculate Ensemble Result ===
    final_prob = (bert_weight * prob_bert) + (lr_weight * prob_lr)
    print(f"⚖️ Final Weighted Ensemble confidence (Fake): {final_prob:.4f}")

    # === 4. Final Verdict ===
    if final_prob > 0.5:
        verdict = "Fake News"
    else:
        verdict = "True News"

    confidence = final_prob if verdict == "Fake News" else 1 - final_prob

    print(f"\n✅ FINAL VERDICT: ** {verdict} ** (Confidence: {confidence:.2%})")
    print("-----------------------------------------")
    return verdict, confidence


# --- Example Usage ---

# Example 1: A potentially fake news headline
fake_news_sample = "BREAKING: Scientists Discover Unicorns Living in a Hidden Valley in the Andes, Government Trying to Cover it Up."
predict_news(fake_news_sample)

# Example 2: A more plausible, true-sounding news headline
true_news_sample = "The Federal Reserve announced today it would hold interest rates steady, citing moderate economic growth and stable inflation figures."
predict_news(true_news_sample)

# Example 3: A tricky, politically charged example
tricky_news_sample = "Sources inside the White House claim a new secret tax on gasoline is being planned, set to be announced next month without public debate."
predict_news(tricky_news_sample)

Using device: cuda
Index for 'FAKE' probability is: 1
-----------------------------------------
Analyzing news: 'BREAKING: Scientists Discover Unicorns Living in a Hidden Valley in the Andes, Government Trying to ...'
🔎 Logistic Regression confidence (Fake): 0.0000
🤖 BERT confidence (Fake): 0.0339
⚖️ Final Weighted Ensemble confidence (Fake): 0.0272

✅ FINAL VERDICT: ** True News ** (Confidence: 97.28%)
-----------------------------------------
-----------------------------------------
Analyzing news: 'The Federal Reserve announced today it would hold interest rates steady, citing moderate economic gr...'
🔎 Logistic Regression confidence (Fake): 0.0002
🤖 BERT confidence (Fake): 0.8391
⚖️ Final Weighted Ensemble confidence (Fake): 0.6713

✅ FINAL VERDICT: ** Fake News ** (Confidence: 67.13%)
-----------------------------------------
-----------------------------------------
Analyzing news: 'Sources inside the White House claim a new secret tax on gasoline is being planned, set to be ann

('Fake News', np.float64(0.7787147184762111))