In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import re
from transformers import BertTokenizer

In [2]:
# Data preprocessing
# Seed NumPy's legacy global RNG so every later np.random.* call is repeatable.
np.random.seed(42)

In [3]:
# Current working directory of the kernel; the folder scan further down walks its sub-folders.
file_directory = os.getcwd()

In [4]:
def preprocess(text):
    """Normalise one raw statement/headline before vectorisation.

    Steps, in order: lowercase, delete every character that is neither a word
    character nor whitespace, delete digit runs, trim surrounding whitespace.
    Inner whitespace is left as-is, so removed tokens can leave double spaces.

    Args:
        text: The raw text. ``None`` and float NaN (how pandas represents a
            missing cell) are treated as empty; any other non-string value is
            converted with ``str()`` first.

    Returns:
        The cleaned string (possibly empty).
    """
    if not isinstance(text, str):
        # `text != text` is only true for NaN; np.float64 is a float subclass.
        if text is None or (isinstance(text, float) and text != text):
            return ''
        text = str(text)
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\d+', '', text)
    return text.strip()

In [5]:
# Walk each immediate sub-folder of `file_directory` and read every file whose
# name ends in "sv" (this matches both .csv and .tsv) as tab-separated text.
# NOTE(review): `data` is rebound on every match, so only the last file read
# survives the loop, and the following cell rebinds `data` to an empty
# DataFrame. This looks like leftover exploration - confirm before relying on it.
for f in os.listdir(file_directory):
    if os.path.isdir(os.path.join(file_directory, f)):
        folder = os.path.join(file_directory, f)
        for d in os.listdir(folder):
            # d[-2::] is the last two characters of the file name.
            if d[-2::] == 'sv':
                data = pd.read_csv(os.path.join(folder, d),sep="\t")

In [6]:
# Column layout of the header-less LIAR TSV files.
# NOTE: per the LIAR README, the five columns after 'party' hold the *speaker's*
# history of past ratings, not votes on the statement in this row. The
# statement's own rating is the 'truth-value' column, so that is what the
# label must be derived from.
column_names = ['id', 'truth-value', 'statement', 'topics', 'speaker', 'speaker occupation', 'state', 'party', 'barely-true', 'false', 'half-true', 'true', 'POF', 'context']
data = pd.DataFrame()
current_data = pd.read_csv("./Liar dataset/train.tsv", sep="\t", names=column_names)

# Six-way LIAR rating -> binary target. The cut-off (half-true and above count
# as true) is the one this notebook already used.
label_map = {
    'pants-fire': 0,
    'false': 0,
    'barely-true': 0,
    'half-true': 1,
    'mostly-true': 1,
    'true': 1
}
# Speaker-history count columns; kept for reference, no longer used for labelling.
label_columns = ['POF', 'false', 'barely-true', 'half-true', 'true']

# Rows without text or without a rating cannot be used.
current_data = current_data.dropna(subset=['statement', 'truth-value'])
# Label each row from its own rating instead of the arg-max of the speaker's history.
current_data['truth'] = current_data['truth-value'].str.strip().str.lower().map(label_map)
# Discard rows whose rating is not one of the six known values.
current_data = current_data.dropna(subset=['truth'])
# .copy() so the column assignments below act on an independent frame.
current_data = current_data[['statement', 'truth']].copy()
current_data['truth'] = current_data['truth'].astype(int)
current_data['statement'] = current_data['statement'].apply(preprocess)
data = pd.concat([data, current_data])

In [7]:
# FakeNewsNet (single-file variant): keep only the headline and its real/fake
# flag, rename them to the shared (statement, truth) schema, cast the flag to
# int and clean the headline text.
current_data = (
    pd.read_csv('./dataset 1/FakeNewsNet.csv')[['title', 'real']]
    .rename(columns={'title': 'statement', 'real': 'truth'})
    .assign(
        truth=lambda frame: frame['truth'].astype(int),
        statement=lambda frame: frame['statement'].apply(preprocess),
    )
)

In [8]:
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

In [9]:
# Balance the two classes of the current dataset with SMOTE in TF-IDF space.
X = current_data['statement']
y = current_data['truth']

vectorizer = TfidfVectorizer(max_features=5000)
X_vectorized = vectorizer.fit_transform(X)

smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_vectorized, y)

# inverse_transform returns, per row, an *array of vocabulary terms* with
# non-zero weight - not text. Join each array into a space-separated string so
# `statement` stays a plain str column (otherwise the later astype(str) yields
# the ndarray repr, e.g. "['foo' 'bar']", which is what the tokenizers then see).
# NOTE(review): word order and term weights are lost for every row, including
# the original ones, and oversampling happens before the train/test split, so
# synthetic rows can end up in the test set.
resampled_df = pd.DataFrame({'statement': [' '.join(terms) for terms in vectorizer.inverse_transform(X_resampled)],
                             'truth': y_resampled})
current_data = resampled_df

In [10]:
# Append the resampled FakeNewsNet rows to the running corpus.
# Not idempotent: re-running this cell appends the same rows again.
data = pd.concat([data, current_data])

In [11]:
# GossipCop fake headlines -> (statement, truth=0).
current_data_1 = pd.read_csv('./dataset 2/dataset/gossipcop_fake.csv')
# Drop rows without a headline: dropna(how='all') only removed fully-empty rows,
# and a NaN title breaks both preprocess() and TfidfVectorizer downstream.
current_data_1 = current_data_1.dropna(subset=['title'])
current_data_1 = current_data_1[['title']]
current_data_1 = current_data_1.rename(columns={'title':'statement'})
current_data_1['truth'] = 0
# Clean the text the same way the LIAR, FakeNewsNet and PolitiFact loaders do.
current_data_1['statement'] = current_data_1['statement'].apply(preprocess)

In [12]:
# GossipCop real headlines -> (statement, truth=1).
current_data_2 = pd.read_csv('./dataset 2/dataset/gossipcop_real.csv')
# Drop rows without a headline: dropna(how='all') only removed fully-empty rows,
# and a NaN title breaks both preprocess() and TfidfVectorizer downstream.
current_data_2 = current_data_2.dropna(subset=['title'])
current_data_2 = current_data_2[['title']]
current_data_2 = current_data_2.rename(columns={'title':'statement'})
current_data_2['truth'] = 1
# Clean the text the same way the LIAR, FakeNewsNet and PolitiFact loaders do.
current_data_2['statement'] = current_data_2['statement'].apply(preprocess)

In [13]:
# Combine the fake and real GossipCop rows, then balance the classes with
# SMOTE in TF-IDF space.
current_data = pd.concat([current_data_1, current_data_2])
X = current_data['statement']
y = current_data['truth']

vectorizer = TfidfVectorizer(max_features=5000)
X_vectorized = vectorizer.fit_transform(X)

smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_vectorized, y)

# inverse_transform returns, per row, an *array of vocabulary terms* with
# non-zero weight - not text. Join each array into a space-separated string so
# `statement` stays a plain str column (otherwise the later astype(str) yields
# the ndarray repr, e.g. "['foo' 'bar']", which is what the tokenizers then see).
# NOTE(review): word order and term weights are lost for every row, including
# the original ones, and oversampling happens before the train/test split, so
# synthetic rows can end up in the test set.
resampled_df = pd.DataFrame({'statement': [' '.join(terms) for terms in vectorizer.inverse_transform(X_resampled)],
                             'truth': y_resampled})
current_data = resampled_df

In [14]:
# Append the resampled GossipCop rows to the running corpus.
# Not idempotent: re-running this cell appends the same rows again.
data = pd.concat([data, current_data])

In [15]:
# PolitiFact fake headlines -> (statement, truth=0), cleaned with preprocess().
current_data_1 = pd.read_csv('./dataset 2/dataset/politifact_fake.csv')
# Drop on the column that is actually used: dropna(how='all') only removed
# fully-empty rows, so a row with a missing title still reached preprocess()
# and raised AttributeError on the float NaN.
current_data_1 = current_data_1.dropna(subset=['title'])
current_data_1 = current_data_1[['title']]
current_data_1 = current_data_1.rename(columns={'title':'statement'})
current_data_1['truth'] = 0
current_data_1['statement'] = current_data_1['statement'].apply(preprocess)

In [16]:
# PolitiFact real headlines -> (statement, truth=1), cleaned with preprocess().
current_data_2 = pd.read_csv('./dataset 2/dataset/politifact_real.csv')
# Drop on the column that is actually used: dropna(how='all') only removed
# fully-empty rows, so a row with a missing title still reached preprocess()
# and raised AttributeError on the float NaN.
current_data_2 = current_data_2.dropna(subset=['title'])
current_data_2 = current_data_2[['title']]
current_data_2 = current_data_2.rename(columns={'title':'statement'})
current_data_2['truth'] = 1
current_data_2['statement'] = current_data_2['statement'].apply(preprocess)

In [17]:
# Combine the fake and real PolitiFact rows, then balance the classes with
# SMOTE in TF-IDF space.
current_data = pd.concat([current_data_1, current_data_2])
X = current_data['statement']
y = current_data['truth']

vectorizer = TfidfVectorizer(max_features=5000)
X_vectorized = vectorizer.fit_transform(X)

smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X_vectorized, y)

# inverse_transform returns, per row, an *array of vocabulary terms* with
# non-zero weight - not text. Join each array into a space-separated string so
# `statement` stays a plain str column (otherwise the later astype(str) yields
# the ndarray repr, e.g. "['foo' 'bar']", which is what the tokenizers then see).
# NOTE(review): word order and term weights are lost for every row, including
# the original ones, and oversampling happens before the train/test split, so
# synthetic rows can end up in the test set.
resampled_df = pd.DataFrame({'statement': [' '.join(terms) for terms in vectorizer.inverse_transform(X_resampled)],
                             'truth': y_resampled})
current_data = resampled_df

In [18]:
# Append the resampled PolitiFact rows to the running corpus.
# Not idempotent: re-running this cell appends the same rows again.
data = pd.concat([data, current_data])

In [19]:
#logistic regression model from scratch

In [20]:
from sklearn.metrics import accuracy_score, classification_report

In [21]:
class LogisticRegression:
    """Binary logistic regression trained with mini-batch gradient descent.

    The object holds no learned state: ``train`` returns ``(weights, bias,
    losses)`` and the caller passes ``weights``/``bias`` back into ``predict``
    and ``classify``. ``X`` may be a NumPy array or a SciPy sparse matrix
    (only ``.dot``, ``.T``, ``.shape`` and row slicing are used).

    NOTE: a later cell imports ``sklearn.linear_model.LogisticRegression``,
    which rebinds this name in the notebook namespace.
    """

    def __init__(self):
        ...

    def sigmoid(self, n):
        """Return the logistic function 1 / (1 + e^-n), element-wise.

        The input is clipped to [-500, 500] first so ``np.exp`` cannot
        overflow float64; the result is unchanged to machine precision.
        """
        return 1 / (1 + np.exp(-np.clip(n, -500, 500)))

    def initialize_weights(self, n_features):
        """Return a zero weight vector of length ``n_features`` and a zero bias."""
        weights = np.zeros(n_features)
        bias = 0
        return weights, bias

    def predict(self, X, weights, bias):
        """Return P(y=1) for every row of ``X`` under the given parameters."""
        linear_model = X.dot(weights) + bias
        predictions = self.sigmoid(linear_model)
        return predictions

    def calculate_loss(self, y_true, y_pred):
        """Return the mean binary cross-entropy of ``y_pred`` against ``y_true``.

        Probabilities are clipped away from exactly 0 and 1 so that
        ``log(0)`` (and the resulting ``0 * -inf = nan``) cannot occur.
        """
        eps = 1e-15
        y_true = np.asarray(y_true)
        y_pred = np.clip(y_pred, eps, 1 - eps)
        n = len(y_true)
        loss = (-1/n) * np.sum(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
        return loss

    def gradient_descent(self, X, y, weights, bias, lr):
        """Perform one gradient step on the batch ``(X, y)``.

        Returns the updated ``(weights, bias)``. The ``weights`` array passed
        in is not modified; a new array is returned.
        """
        n = X.shape[0]

        y_pred = self.predict(X, weights, bias)

        # Gradient of the mean cross-entropy w.r.t. weights and bias.
        error = y_pred - y
        dw = X.T.dot(error) / n
        db = np.sum(error) / n

        # Out-of-place update so the caller's array is never mutated.
        weights = weights - lr * dw
        bias = bias - lr * db

        return weights, bias

    def train(self, X, y, lr=.1, epochs=1000, batch_size=500):
        """Fit the model and return ``(weights, bias, losses)``.

        Each epoch walks the rows of ``X`` in their given order in contiguous
        batches of ``batch_size`` (no shuffling), then records the loss on the
        full data set. The loss is printed every 100 epochs.

        Args:
            X: Feature matrix, one row per sample.
            y: 0/1 labels; any sequence, converted to a NumPy array.
            lr: Learning rate.
            epochs: Number of passes over the data.
            batch_size: Rows per gradient step.

        Returns:
            The learned weight vector, the bias, and the per-epoch loss list.
        """
        # Lists would break the slicing/arithmetic below; arrays pass through.
        y = np.asarray(y)
        n_features = X.shape[1]

        weights, bias = self.initialize_weights(n_features)

        losses = []

        for epoch in range(epochs):
            for i in range(0, X.shape[0], batch_size):
                X_batch = X[i:i + batch_size]
                y_batch = y[i:i + batch_size]

                weights, bias = self.gradient_descent(X_batch, y_batch, weights, bias, lr)

            y_pred = self.predict(X, weights, bias)
            loss = self.calculate_loss(y, y_pred)
            losses.append(loss)

            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {loss}")
        return weights, bias, losses

    def classify(self, X, weights, bias, threshold=.5):
        """Return a list of 0/1 labels: 1 where P(y=1) >= ``threshold``."""
        probabilities = self.predict(X, weights, bias)
        return [1 if p >= threshold else 0 for p in probabilities]

In [22]:
# USE FOR THE SKLEARN MODEL
# TF-IDF over the combined corpus: at most 5000 terms, English stop words removed.
vectorizer = TfidfVectorizer(max_features=5000, stop_words='english')

# Force every statement to str before vectorising.
X = data['statement'].astype(str)

In [23]:
# USE FOR THE SKLEARN MODEL
# Fit the vocabulary on the whole corpus and build the sparse feature matrix.
# NOTE(review): the vectorizer is fitted before the train/test split, so test
# rows contribute to the vocabulary and IDF weights.
X_tfidf = vectorizer.fit_transform(X)
# Labels as a plain NumPy array (positional; drops the pandas index).
y = np.array(data['truth'])

In [24]:
# 80/20 train/test split of the TF-IDF features with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X_tfidf, y, test_size=.2, random_state=42) #USE FOR THE SKLEARN MODEL

In [25]:
#DON'T USE FOR SKLEARN
#regressor = LogisticRegression()
#weights, bias, losses = regressor.train(X_train, y_train)
#y_pred = regressor.classify(X_test, weights, bias)
#print("Accuracy:", accuracy_score(y_test, y_pred))
#print(classification_report(y_test, y_pred))

In [26]:
#logistic regression using sklearn

In [27]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix

In [28]:
#model = LogisticRegression(max_iter=1000, solver='lbfgs')
#model.fit(X_train, y_train)

In [29]:
#y_pred = model.predict(X_test)
#y_prob = model.predict_proba(X_test)[:, 1]

In [30]:
#print("Accuracy:", accuracy_score(y_test, y_pred))
#print(classification_report(y_test, y_pred))

In [31]:
#random forest

In [32]:
from scipy.sparse import issparse, csr_matrix
from sklearn.preprocessing import MinMaxScaler

In [33]:
class DecisionTree:
    """Classification tree using Gini impurity on a random feature subset per split.

    Labels must be non-negative integers (leaves are chosen with
    ``np.bincount``). The fitted tree is stored in ``self.tree`` as nested
    dicts ``{'feature', 'threshold', 'left', 'right'}``; a leaf is the bare
    class label. Rows with ``value <= threshold`` go left, the rest go right.
    ``X`` may be a SciPy sparse matrix or a dense 2-D array.
    """

    def __init__(self, max_depth=5, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree = None

    @staticmethod
    def _column(X, feature):
        """Return column ``feature`` of ``X`` as a dense 1-D array (sparse or dense input)."""
        column = X[:, feature]
        if hasattr(column, "toarray"):
            column = column.toarray()
        return np.asarray(column).ravel()

    def fit(self, X, y):
        """Grow the tree on ``(X, y)`` and store it in ``self.tree``."""
        self.tree = self.grow_tree(X, np.asarray(y), 0)

    def grow_tree(self, X, y, depth):
        """Recursively build the subtree for the rows in ``(X, y)``.

        Returns a leaf (the majority class) when the depth limit is reached,
        the node has fewer than ``min_samples_split`` rows, the node is
        already pure, or no usable split exists; otherwise a split dict.
        """
        if depth >= self.max_depth or X.shape[0] < self.min_samples_split:
            return np.bincount(y).argmax()

        # A pure node cannot be improved; stop instead of splitting it further.
        if np.unique(y).size <= 1:
            return np.bincount(y).argmax()

        best_feature, best_threshold = self.find_best_split(X, y)
        if best_feature is None:
            return np.bincount(y).argmax()
        X_col = self._column(X, best_feature)
        left_indices = X_col <= best_threshold
        right_indices = X_col > best_threshold

        if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
            # Return majority class if split is invalid
            return np.bincount(y).argmax()
        left = self.grow_tree(X[left_indices], y[left_indices], depth + 1)
        right = self.grow_tree(X[right_indices], y[right_indices], depth + 1)
        return {'feature': best_feature, 'threshold': best_threshold, 'left': left, 'right': right}

    def find_best_split(self, X, y):
        """Return ``(feature, threshold)`` with the lowest weighted Gini, or ``(None, None)``.

        Only ``int(sqrt(n_features))`` randomly chosen features are examined
        (drawn from NumPy's global RNG).
        """
        n_features = X.shape[1]
        features = np.random.choice(n_features, int(np.sqrt(n_features)), replace=False)
        classes = np.unique(y)
        best_gini = 1.0
        best_feature, best_threshold = None, None
        for feature in features:
            X_col = self._column(X, feature)
            # Candidate thresholds are all distinct values except the largest:
            # `<= max` would send every row left. Zero is included, so a sparse
            # feature can be split on "absent vs. present".
            thresholds = np.unique(X_col)[:-1]
            for threshold in thresholds:
                left_indices = X_col <= threshold
                right_indices = X_col > threshold
                groups = [y[left_indices], y[right_indices]]

                gini = gini_impurity(groups, classes)

                if gini < best_gini:
                    best_gini = gini
                    best_threshold = threshold
                    best_feature = feature

        return best_feature, best_threshold

    def _predict_tree(self, X):
        """Return a list with the predicted class for every row of ``X``.

        Sparse input is densified first so each row can be indexed by feature.
        """
        if hasattr(X, "toarray"):
            X = X.toarray()
        predictions = []
        for row in X:
            node = self.tree
            # Descend until a leaf (a non-dict value) is reached.
            while isinstance(node, dict):
                if row[node['feature']] <= node['threshold']:
                    node = node['left']
                else:
                    node = node['right']
            predictions.append(node)
        return predictions

In [34]:
def gini_impurity(groups, classes):
    """Return the size-weighted Gini impurity of a candidate split.

    Args:
        groups: Iterable of label collections, one per side of the split.
        classes: The class labels to count in each group.

    Returns:
        Sum over non-empty groups of ``(1 - sum_c p_c^2) * |group| / total``,
        or ``0.0`` when every group is empty.
    """
    total = sum(len(members) for members in groups)
    impurity = 0.0
    for members in groups:
        count = len(members)
        if not count:
            # Empty side contributes nothing (and would divide by zero).
            continue
        as_list = list(members)
        purity = 0.0
        for label in classes:
            share = as_list.count(label) / count
            purity += share ** 2
        impurity += (1.0 - purity) * (count / total)
    return impurity

In [35]:
class RandomForest:
    """Bagged ensemble of ``DecisionTree`` classifiers with majority voting.

    Args:
        n_trees: Number of trees to grow.
        max_depth: Depth limit passed to every tree.
        min_samples_split: Minimum node size passed to every tree.
    """

    def __init__(self, n_trees=10, max_depth=5, min_samples_split=2):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.trees = []

    def fit(self, X, y):
        """Grow ``n_trees`` trees, each on its own bootstrap sample of ``(X, y)``.

        Any trees from a previous ``fit`` call are discarded first.
        """
        # Start from scratch so refitting does not accumulate stale trees.
        self.trees = []
        for _ in range(self.n_trees):
            X_sample, y_sample = random_sample(X, y)

            tree = DecisionTree(max_depth=self.max_depth, min_samples_split=self.min_samples_split)
            # Train on the bootstrap sample, not the full data: fitting every
            # tree on (X, y) removes the bagging and leaves only the random
            # feature subsets to differentiate the trees.
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def predict(self, X):
        """Return the majority-vote class for every row of ``X`` as a NumPy array."""
        X_dense = X.toarray() if hasattr(X, "toarray") else X  # Handle sparse matrices
        # Shape (n_trees, n_rows): one row of votes per tree.
        predictions = np.array([tree._predict_tree(X_dense) for tree in self.trees])
        # Column-wise vote count; ties resolve to the smallest label (argmax).
        return np.apply_along_axis(lambda x: np.bincount(x).argmax(), axis=0, arr=predictions)

In [36]:
def random_sample(X, y):
    """Draw a bootstrap sample of ``(X, y)``.

    Picks ``X.shape[0]`` row positions uniformly with replacement from NumPy's
    global RNG and returns the matching rows of ``X`` and entries of ``y``.
    """
    row_count = X.shape[0]
    picked = np.random.choice(row_count, size=row_count, replace=True)
    X_boot = X[picked]
    y_boot = y[picked]
    return X_boot, y_boot

In [37]:
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor

In [38]:
#model = RandomForestClassifier(n_estimators=100, random_state=42)
#model.fit(X_train, y_train)

In [39]:
#y_pred = model.predict(X_test)

In [40]:
#print("Accuracy:", accuracy_score(y_test, y_pred))
#print(classification_report(y_test, y_pred))

In [41]:
#assume X parameter will be sparse
#print(X_train[0].indices)
#print(X_train[0].data)

In [42]:
#rf = RandomForest(n_trees=100, max_depth=10000)
#rf.fit(X_train, y_train)

#y_pred = rf.predict(X_test)

#accuracy = np.sum(y_pred == y_test) / len(y_test)
#print(f'accuracy: {accuracy}')

In [43]:
import tensorflow as tf
from transformers import MobileBertTokenizer, TFAutoModelForSequenceClassification

In [44]:
# Raw text and labels for the transformer model.
X_bert = data['statement'].astype(str)
# Plain NumPy labels (positional, no pandas index) so they line up with the
# token tensors and compare cleanly against predicted class ids later.
y_bert = data['truth'].to_numpy()
# Split on y_bert, not on the `y` left over from the TF-IDF cells, so this cell
# does not depend on that earlier cell having been run. Same 80/20 split and
# seed as the TF-IDF split.
X_bert_train, X_bert_test, y_bert_train, y_bert_test = train_test_split(X_bert, y_bert, test_size=.2, random_state=42)

In [45]:
# Tokenizer for the same 'google/mobilebert-uncased' checkpoint the model is loaded from.
tokenizer = MobileBertTokenizer.from_pretrained('google/mobilebert-uncased')

In [46]:
# Convert the pandas Series to plain Python lists of str before tokenising.
# Not idempotent: a second run fails because lists have no .tolist().
X_bert_train = X_bert_train.tolist()
X_bert_test = X_bert_test.tolist()

In [57]:
# Tokenise both splits with identical settings: pad, truncate to at most 64
# tokens, and return TensorFlow tensors. The train split is processed first.
train_tokenized, test_tokenized = (
    tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=64,
        return_tensors='tf'
    )
    for texts in (X_bert_train, X_bert_test)
)

In [58]:
# Training pipeline. Order matters:
#   cache    -> before shuffle, so the cached elements are the raw examples;
#               caching *after* shuffle/batch replays the first epoch's order
#               and batch composition on every later epoch.
#   shuffle  -> full-size buffer, reshuffled each epoch.
#   batch    -> 16 examples per step.
#   prefetch -> last, so batch preparation overlaps with the training step.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((
        {'input_ids': train_tokenized['input_ids'], 'attention_mask': train_tokenized['attention_mask']},
        y_bert_train
    ))
    .cache()
    .shuffle(len(X_bert_train))
    .batch(16)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Evaluation pipeline: fixed order, same batch size, prefetch last.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((
        {'input_ids': test_tokenized['input_ids'], 'attention_mask': test_tokenized['attention_mask']},
        y_bert_test
    ))
    .batch(16)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [54]:
# Load MobileBERT with a 2-class sequence-classification head. The loader's
# output below reports the 'classifier' layer as newly initialised, i.e. it
# must be fine-tuned before its predictions mean anything.
model = TFAutoModelForSequenceClassification.from_pretrained('google/mobilebert-uncased', num_labels=2)

All model checkpoint layers were used when initializing TFMobileBertForSequenceClassification.

Some layers of TFMobileBertForSequenceClassification were not initialized from the model checkpoint at google/mobilebert-uncased and are newly initialized: ['classifier']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [55]:
from transformers import AdamWeightDecay

# Initialize the optimizer: transformers' AdamWeightDecay with learning rate 5e-5;
# all other settings are left at the library defaults.
optimizer = AdamWeightDecay(learning_rate=5e-5)

In [59]:
# Integer class labels + raw model outputs, hence the sparse categorical
# cross-entropy with from_logits=True (the inference cell reads `.logits`).
model.compile(optimizer=optimizer, loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])

In [64]:
from tensorflow.keras.mixed_precision import Policy, set_global_policy

# Set the global policy for mixed precision
# NOTE(review): in this notebook the policy is set after `model` was already
# created and compiled (see the execution counts). A Keras global policy
# presumably only affects layers constructed after it is set - confirm, and if
# so move this cell above the model-loading cell for it to take effect.
policy = Policy('mixed_float16')
set_global_policy(policy)

print("Mixed precision policy set:", policy)


Mixed precision policy set: <DTypePolicy "mixed_float16">


In [None]:
# Fine-tune for 3 epochs, evaluating on the held-out split after each epoch.
# NOTE(review): the test split doubles as validation data here, so there is no
# untouched set left for a final, unbiased evaluation.
history = model.fit(train_dataset, epochs=3, validation_data=test_dataset)

Epoch 1/3
  49/4001 [..............................] - ETA: 1:41:58 - loss: 0.6538 - accuracy: 0.6250

In [None]:
from tqdm import tqdm

# Run the test set through the model in about 32 chunks and collect the logits.
# max(1, ...) guards the case of fewer than 32 test rows, where the integer
# division gives 0 and range() raises "arg 3 must not be zero".
batch_size = max(1, len(test_tokenized['input_ids']) // 32)  # Adjust based on your hardware
outputs = []

for i in tqdm(range(0, len(test_tokenized['input_ids']), batch_size)):
    # Slice ids and mask with the same window so they stay aligned.
    batch_input_ids = test_tokenized['input_ids'][i:i+batch_size]
    batch_attention_mask = test_tokenized['attention_mask'][i:i+batch_size]
    batch_output = model(input_ids=batch_input_ids, attention_mask=batch_attention_mask)
    outputs.append(batch_output.logits)

In [None]:
# Stitch the per-chunk logits back together along the row axis.
all_logits = tf.concat(outputs, axis=0)

print(all_logits.shape)

In [None]:
# Predicted class = index of the largest logit in each row.
predicted_classes = tf.argmax(all_logits, axis=-1)
# Count element-wise matches against the held-out labels.
# NOTE(review): assumes y_bert_test is an integer array in the same row order
# as the tokenised test set - confirm dtype compatibility with tf.argmax's int64.
correct_predictions = tf.reduce_sum(tf.cast(predicted_classes == y_bert_test, tf.float32))
accuracy = correct_predictions / len(y_bert_test)

print(f"Model Accuracy: {accuracy.numpy():.2%}")

UsageError: Line magic function `%nvidia-smi` not found.
