In [None]:
# Mount Google Drive at /content/drive (google.colab is only importable inside Colab).
# NOTE(review): the recorded output of this cell is a MessageError, and the
# cells below read their CSVs through relative paths rather than from
# /content/drive — confirm whether this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
import pandas as pd
import numpy as np

# Topic table: one row per LDA topic with a `topic` name and a `keywords`
# column holding a stringified Python list of words.
lda_topics_path = 'lda_topics.csv'
# The file was saved together with its DataFrame index; reading that first
# column back as the index avoids carrying a meaningless "Unnamed: 0" column.
lda_topics = pd.read_csv(lda_topics_path, index_col=0)

lda_topics.head()

Unnamed: 0,topic,keywords
0,desire,"['im', 'na', 'go', 'u', 'dont']"
1,cultural,"['article', 'page', 'nn', 'n', 'wikipedia']"
2,subject,"['im', 'sorry', 'ok', 'good', 'day']"
3,favoritism,"['im', 'get', 'day', 'got', 'going']"
4,stereotype,"['muslim', 'islamic', 'islam', 'religion', 'te..."


In [None]:
import numpy as np
from collections import Counter

class DecisionTree:
    """Classification tree grown greedily on the weighted Gini impurity.

    The fitted tree is stored in ``self.tree``. An internal node is a dict with
    the keys ``"feature"``, ``"threshold"``, ``"left"`` and ``"right"``; rows
    whose value in column ``feature`` is ``<= threshold`` go left. A leaf is the
    bare class label.

    Args:
        max_depth: Maximum number of splits on any root-to-leaf path.
    """

    def __init__(self, max_depth=5):
        self.max_depth = max_depth
        self.tree = None

    def fit(self, X, y):
        """Grow the tree on the training data.

        Args:
            X: 2-D array-like of shape (n_samples, n_features).
            y: 1-D array-like of class labels, one per row of ``X``.

        Raises:
            ValueError: If ``X`` is not 2-D, the lengths differ, or there are
                no samples.
        """
        # Boolean-mask indexing below needs real arrays; lists and DataFrames
        # are accepted by converting once up front.
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be 2-D with shape (n_samples, n_features)")
        if len(X) != len(y):
            raise ValueError("X and y must contain the same number of samples")
        if len(y) == 0:
            raise ValueError("cannot fit a DecisionTree on an empty dataset")
        self.tree = self._build_tree(X, y, depth=0)

    @staticmethod
    def _majority_label(y):
        """Return the most frequent label in ``y`` (first seen wins ties)."""
        return Counter(y).most_common(1)[0][0]

    def _build_tree(self, X, y, depth):
        """Recursively build the subtree for the samples ``(X, y)``."""
        # Stop when the depth budget is used up or the node is already pure.
        if depth >= self.max_depth or len(set(y)) == 1:
            return self._majority_label(y)

        feature, threshold = self._best_split(X, y)
        if feature is None:
            # No threshold separates the samples into two non-empty groups.
            return self._majority_label(y)

        left_idx = X[:, feature] <= threshold
        right_idx = ~left_idx

        return {
            "feature": feature,
            "threshold": threshold,
            "left": self._build_tree(X[left_idx], y[left_idx], depth + 1),
            "right": self._build_tree(X[right_idx], y[right_idx], depth + 1),
        }

    def _best_split(self, X, y):
        """Return the ``(feature, threshold)`` pair with the lowest weighted Gini.

        Every distinct value of every column is tried as a threshold. Returns
        ``(None, None)`` when no candidate yields two non-empty children.
        """
        best_feature, best_threshold = None, None
        best_gini = float("inf")
        # The label set is the same for every candidate at this node, so it is
        # built once here instead of twice per threshold.
        classes = set(y)

        for feature in range(X.shape[1]):
            column = X[:, feature]
            for threshold in np.unique(column):
                gini = self._gini_index(column, y, threshold, classes)
                # Strict "<" keeps the first candidate found among ties.
                if gini < best_gini:
                    best_gini = gini
                    best_feature, best_threshold = feature, threshold

        return best_feature, best_threshold

    def _gini_index(self, feature_column, y, threshold, classes=None):
        """Weighted Gini impurity of splitting on ``feature_column <= threshold``.

        Args:
            feature_column: 1-D array with one feature value per sample.
            y: Labels aligned with ``feature_column``.
            threshold: Split value; samples ``<= threshold`` form the left child.
            classes: Optional precomputed ``set(y)``.

        Returns:
            The size-weighted mean of the children's Gini impurities, or
            ``inf`` when either child would be empty.
        """
        left_mask = feature_column <= threshold
        y_left, y_right = y[left_mask], y[~left_mask]

        if len(y_left) == 0 or len(y_right) == 0:
            return float("inf")

        if classes is None:
            classes = set(y)

        left_gini = 1 - sum((np.sum(y_left == c) / len(y_left)) ** 2 for c in classes)
        right_gini = 1 - sum((np.sum(y_right == c) / len(y_right)) ** 2 for c in classes)

        return (len(y_left) * left_gini + len(y_right) * right_gini) / len(y)

    def predict(self, X):
        """Predict a label for every row of ``X``.

        Raises:
            RuntimeError: If the tree has not been fitted yet.
        """
        if self.tree is None:
            raise RuntimeError("DecisionTree has not been fitted yet; call fit() first")
        return np.array([self._predict_row(row, self.tree) for row in np.asarray(X)])

    def _predict_row(self, row, tree):
        """Walk ``tree`` from the given node down to a leaf for one sample."""
        node = tree
        while isinstance(node, dict):
            if row[node["feature"]] <= node["threshold"]:
                node = node["left"]
            else:
                node = node["right"]
        return node

class RandomForest:
    """Bagged ensemble of ``DecisionTree`` models with per-tree feature subsets.

    Each tree is trained on a bootstrap sample of the rows and on a random
    subset of the columns that is fixed for that tree.

    Args:
        n_estimators: Number of trees to train.
        max_depth: Depth limit passed to every tree.
        max_features: ``"sqrt"`` uses ``int(sqrt(n_features))`` columns per
            tree; any other value uses all columns.
        random_state: Optional seed. When ``None`` the global ``np.random``
            state is used; otherwise a private ``RandomState`` is seeded with
            it so repeated fits give the same forest.

    Attributes:
        trees: List of ``(tree, feature_indices)`` pairs filled by ``fit``.
        classes_: Sorted unique training labels; defines the column order of
            ``predict_proba``.
    """

    def __init__(self, n_estimators=10, max_depth=5, max_features="sqrt", random_state=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state
        self.trees = []
        self.classes_ = None

    def fit(self, X, y):
        """Train ``n_estimators`` trees on bootstrap samples of ``(X, y)``.

        Raises:
            ValueError: If ``X`` is not 2-D, is empty, or its length differs
                from ``y``.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError("X must be 2-D with shape (n_samples, n_features)")
        if len(X) == 0 or len(X) != len(y):
            raise ValueError("X and y must be non-empty and of equal length")

        # Remember the full label set so predict_proba always has one column
        # per training class, whatever the trees happen to predict later.
        self.classes_ = np.unique(y)
        self.trees = []
        n_samples, n_features = X.shape
        n_sub_features = int(np.sqrt(n_features)) if self.max_features == "sqrt" else n_features

        # np.random (module) and RandomState expose the same `choice` API.
        rng = np.random if self.random_state is None else np.random.RandomState(self.random_state)

        for _ in range(self.n_estimators):
            # Bootstrap rows, then pick this tree's columns.
            indices = rng.choice(n_samples, n_samples, replace=True)
            feature_indices = rng.choice(n_features, n_sub_features, replace=False)

            # Select rows and columns in one step so the full-width bootstrap
            # copy of X is never materialised.
            X_sample_sub = X[np.ix_(indices, feature_indices)]

            tree = DecisionTree(max_depth=self.max_depth)
            tree.fit(X_sample_sub, y[indices])

            # The column subset is needed again at prediction time.
            self.trees.append((tree, feature_indices))

    def _tree_predictions(self, X):
        """Return an array of shape (n_trees, n_samples) with every tree's vote."""
        if not self.trees:
            raise RuntimeError("RandomForest has not been fitted yet; call fit() first")
        X = np.asarray(X)
        return np.array([
            tree.predict(X[:, feature_indices])
            for tree, feature_indices in self.trees
        ])

    def predict(self, X):
        """Predict by majority vote; ties go to the label voted by the earliest tree."""
        tree_predictions = self._tree_predictions(X)
        if tree_predictions.shape[1] == 0:
            # apply_along_axis rejects zero-length iteration dimensions.
            return np.empty(0, dtype=tree_predictions.dtype)
        return np.apply_along_axis(
            lambda votes: Counter(votes).most_common(1)[0][0],
            axis=0,
            arr=tree_predictions,
        )

    def predict_proba(self, X):
        """Return the fraction of trees voting for each class.

        Returns:
            Array of shape (n_samples, n_classes) whose columns follow
            ``self.classes_`` (sorted training labels), so for labels {0, 1}
            column 1 is the share of trees voting 1.
        """
        tree_predictions = self._tree_predictions(X)
        n_trees, n_samples = tree_predictions.shape

        classes = getattr(self, "classes_", None)
        if classes is None:
            # Trees were attached without fit(); fall back to observed labels.
            classes = np.unique(tree_predictions)

        probabilities = np.zeros((n_samples, len(classes)))
        for column, label in enumerate(classes):
            probabilities[:, column] = np.sum(tree_predictions == label, axis=0) / n_trees

        return probabilities


In [None]:
# Re-import libraries and reload datasets
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score

# Load the TF-IDF dataset for combining features
print("Loading datasets...")
tfidf_bias_train_custom_path = 'tfidf_bias_train_custom.csv'
preprocessed_bias_train_path = 'preprocessed_bias_train.csv'

# Row cap applied to BOTH files so they stay the same length.
# Set to None to read the full files.
N_ROWS = 5000
tfidf_bias_train_custom = pd.read_csv(tfidf_bias_train_custom_path, nrows=N_ROWS)
preprocessed_bias_train = pd.read_csv(preprocessed_bias_train_path, nrows=N_ROWS)

# Features from one file are later stacked column-wise with features and
# labels from the other, pairing rows by position, so a length mismatch must
# be caught here rather than surfacing as a shape error further down.
if len(tfidf_bias_train_custom) != len(preprocessed_bias_train):
    raise ValueError(
        f"Row count mismatch: {len(tfidf_bias_train_custom)} TF-IDF rows vs "
        f"{len(preprocessed_bias_train)} preprocessed rows"
    )

labels = preprocessed_bias_train['label'].astype(np.int8).values  # Optimize label type
print("Datasets loaded successfully!")

# Redefine TF-IDF extraction to handle Python dictionary strings
print("Extracting TF-IDF features...")
def extract_tfidf_features_safe(tfidf_data):
    """Parse the stringified dicts in ``tfidf_data['tfidf_features']``.

    Args:
        tfidf_data: Mapping/DataFrame whose ``'tfidf_features'`` entry iterates
            over strings, each the repr of a ``{feature: weight}`` dict.

    Returns:
        A list with exactly one dict per input row, in input order. Rows that
        cannot be parsed (or do not evaluate to a dict) are reported and
        replaced by an empty dict, so row ``i`` of the result always
        corresponds to row ``i`` of the input and stays aligned with labels
        and other per-row features.
    """
    tfidf_features_list = []
    for item in tfidf_data['tfidf_features']:
        try:
            # SECURITY NOTE: eval executes arbitrary code, so "safe" here only
            # means "does not abort on bad rows". Use this on trusted files
            # only; consider ast.literal_eval if every row is a plain literal.
            feature_dict = eval(item)
            if not isinstance(feature_dict, dict):
                raise TypeError(f"expected a dict, got {type(feature_dict).__name__}")
        except Exception as e:
            print(f"Error parsing TF-IDF feature: {item}, Error: {e}")
            # Keep a placeholder instead of dropping the row; dropping would
            # shift every later row relative to the labels.
            feature_dict = {}
        tfidf_features_list.append(feature_dict)
    return tfidf_features_list

# Build a TF-IDF feature matrix
def build_feature_matrix(feature_dicts):
    """Pivot a list of ``{feature: weight}`` dicts into a dense matrix.

    Args:
        feature_dicts: Sequence of dicts, one per row.

    Returns:
        ``(matrix, feature_list)`` where ``feature_list`` is the sorted union
        of all keys and ``matrix`` is a float array of shape
        ``(len(feature_dicts), len(feature_list))``; features missing from a
        row are 0.0.
    """
    feature_set = set()
    for feature_dict in feature_dicts:
        feature_set.update(feature_dict.keys())
    feature_list = sorted(feature_set)

    # Only the entries actually present in each dict are written, instead of
    # probing every row for every vocabulary entry and building a
    # list-of-lists first.
    column_of = {feature: column for column, feature in enumerate(feature_list)}
    matrix = np.zeros((len(feature_dicts), len(feature_list)), dtype=float)
    for row, feature_dict in enumerate(feature_dicts):
        for feature, weight in feature_dict.items():
            matrix[row, column_of[feature]] = weight
    return matrix, feature_list

# Parse the per-row TF-IDF dict strings, then pivot them into a matrix plus
# the sorted list of feature names.
# Naming: `tfidf_features_list` (plural "features") is the list of row dicts,
# while `tfidf_feature_list` (singular) is the column vocabulary that the
# inference cell passes to detect_bias.
tfidf_features_list = extract_tfidf_features_safe(tfidf_bias_train_custom)
tfidf_matrix, tfidf_feature_list = build_feature_matrix(tfidf_features_list)
print("TF-IDF features extracted successfully!")

# Function to extract LDA features
print("Extracting LDA features...")
def extract_lda_features_safe(lda_topics, texts):
    """Count, per text and topic, how many topic keywords occur in the text.

    Args:
        lda_topics: DataFrame-like with a ``'keywords'`` column of stringified
            Python lists of words, one entry per topic.
        texts: Sized iterable of documents. Non-string entries (e.g. NaN) keep
            an all-zero row.

    Returns:
        Float array of shape ``(len(texts), n_topics)``; entry ``[j, i]`` is
        the number of keywords of topic ``i`` that appear among the
        whitespace-separated tokens of text ``j``.
    """
    # Parse each topic's keyword list once rather than inside the text loop.
    # SECURITY NOTE: eval executes arbitrary code; only use on trusted files.
    topic_keywords = [eval(raw) for raw in lda_topics['keywords']]

    lda_topics_matrix = np.zeros((len(texts), len(topic_keywords)))
    for j, text in enumerate(texts):
        if not isinstance(text, str):  # Check if text is valid
            continue
        # Tokenise once per text; set membership replaces re-splitting the
        # text for every keyword of every topic.
        tokens = set(text.split())
        for i, keywords in enumerate(topic_keywords):
            lda_topics_matrix[j, i] = sum(1 for word in keywords if word in tokens)
    return lda_topics_matrix

lda_topics_matrix = extract_lda_features_safe(lda_topics, preprocessed_bias_train['cleaned_text'])
print("LDA features extracted successfully!")

# Combine LDA and TF-IDF Features (TF-IDF columns first, topic counts last)
print("Combining features...")
combined_features = np.hstack([tfidf_matrix, lda_topics_matrix])
print(f"Combined feature matrix shape: {combined_features.shape}")

# Train-test split
print("Splitting data into training and testing sets...")
X_train, X_test, y_train, y_test = train_test_split(combined_features, labels, test_size=0.2, random_state=42)
print(f"Training set shape: {X_train.shape}, Testing set shape: {X_test.shape}")

# Train a Random Forest model with combined features.
# For a scikit-learn baseline, swap in:
#   RandomForestClassifier(n_estimators=200, max_depth=15, random_state=42)
print("Training Random Forest from scratch...")
# The from-scratch forest draws its bootstrap rows and feature subsets from
# NumPy's global RNG; seed it so the metrics below are reproducible.
np.random.seed(42)
rf_model = RandomForest(n_estimators=10, max_depth=5)
rf_model.fit(X_train, y_train)

# Evaluate the model on the test set
print("Evaluating the model on the test set...")
y_pred = rf_model.predict(X_test)
# Column 1 is the share of trees voting for label 1.
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]

# Metrics
print("Calculating performance metrics...")
classification_metrics = classification_report(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred_proba)

print("Model evaluation completed!")
print("Classification Report:\n", classification_metrics)
print(f"ROC-AUC Score: {roc_auc:.2f}")


Loading datasets...
Datasets loaded successfully!
Extracting TF-IDF features...
TF-IDF features extracted successfully!
Extracting LDA features...
LDA features extracted successfully!
Combining features...
Combined feature matrix shape: (5000, 19658)
Splitting data into training and testing sets...
Training set shape: (4000, 19658), Testing set shape: (1000, 19658)
Training Random Forest from scratch...
Evaluating the model on the test set...
Calculating performance metrics...
Model evaluation completed!
Classification Report:
               precision    recall  f1-score   support

           0       0.82      0.08      0.14       532
           1       0.48      0.98      0.65       468

    accuracy                           0.50      1000
   macro avg       0.65      0.53      0.39      1000
weighted avg       0.66      0.50      0.38      1000

ROC-AUC Score: 0.62


In [None]:
import numpy as np

# Function to preprocess input text
def preprocess_text(text):
    """Replace every run of non-word characters with one space, then lowercase.

    Leading/trailing separators become a single space and are not stripped.
    """
    import re

    non_word_run = re.compile(r'\W+')
    spaced = non_word_run.sub(' ', text)
    return spaced.lower()

# Function to extract LDA features for a single text
def extract_lda_features_for_text(lda_topics, text):
    """Build the topic-keyword feature row for one document.

    For each topic this counts how many of the topic's keywords occur among
    the whitespace-separated tokens of ``text``. A keyword counts once however
    often it is repeated, which is the same definition the training-time
    extractor (``extract_lda_features_safe``) uses; counting token occurrences
    instead would feed the model values it was never trained on.

    Args:
        lda_topics: DataFrame-like with a ``'keywords'`` column of stringified
            Python lists of words, one entry per topic.
        text: The (already preprocessed) document.

    Returns:
        Array of shape ``(1, n_topics)``.
    """
    tokens = set(text.split())
    lda_features = []
    for topic_keywords in lda_topics['keywords']:
        # SECURITY NOTE: eval executes arbitrary code; only use on trusted files.
        keywords = eval(topic_keywords)  # Convert stringified list back to list
        lda_features.append(sum(1 for word in keywords if word in tokens))
    return np.array(lda_features).reshape(1, -1)  # Reshape for model input

# Function to extract TF-IDF features for a single text
def extract_tfidf_features_for_text(tfidf_feature_list, text):
    """Vectorise one document over the fixed vocabulary ``tfidf_feature_list``.

    Returns a dense array of shape ``(1, len(tfidf_feature_list))``.

    NOTE(review): this uses ``CountVectorizer``, so the values are raw term
    counts. The training matrix is built from the values stored in the
    ``'tfidf_features'`` column, which presumably are TF-IDF weights — confirm
    that the two are on a comparable scale.
    """
    from sklearn.feature_extraction.text import CountVectorizer

    # A fixed vocabulary means no fitting is needed before transform().
    term_counts = CountVectorizer(vocabulary=tfidf_feature_list).transform([text])
    return term_counts.toarray()

# Function to detect bias in a paragraph
def detect_bias(text, lda_topics, tfidf_feature_list, model):
    """Classify one paragraph as biased or not.

    Args:
        text: Raw input paragraph.
        lda_topics: Topic table handed to ``extract_lda_features_for_text``.
        tfidf_feature_list: Vocabulary handed to
            ``extract_tfidf_features_for_text``.
        model: Object exposing ``predict`` and ``predict_proba``.

    Returns:
        Dict with ``"Prediction"`` (``"Biased"`` when the model predicts 1,
        otherwise ``"Non-Biased"``) and ``"Confidence"``. The confidence is
        column 1 of ``predict_proba`` rounded to 4 places, i.e. the score for
        class 1 whichever label was predicted.
    """
    cleaned = preprocess_text(text)

    # Feature layout: TF-IDF columns first, topic columns last.
    topic_part = extract_lda_features_for_text(lda_topics, cleaned)
    tfidf_part = extract_tfidf_features_for_text(tfidf_feature_list, cleaned)
    features = np.hstack([tfidf_part, topic_part])

    label = model.predict(features)[0]
    class_one_score = model.predict_proba(features)[0, 1]

    verdict = "Biased" if label == 1 else "Non-Biased"
    return {"Prediction": verdict, "Confidence": round(class_one_score, 4)}

# Example Usage
# Relies on `lda_topics`, `tfidf_feature_list` and `rf_model` having been
# created by the earlier cells in this kernel session.
text = "All politicians are corrupt and work only for their own benefit."
result = detect_bias(text, lda_topics, tfidf_feature_list, rf_model)
# "Confidence" is the model's score for class 1 (see detect_bias).
print(f"Prediction: {result['Prediction']}")
print(f"Confidence: {result['Confidence']}")


Prediction: Biased
Confidence: 0.9
