## Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertModel, BertTokenizer
from sklearn.model_selection import train_test_split
import pickle

## Models

In [None]:
class BERTOnly(nn.Module):
    """BERT encoder followed by a small two-layer classification head.

    The hidden state at sequence position 0 of ``bert-base-uncased`` goes
    through dropout, a 768->128 linear layer with ReLU, dropout again and a
    128->2 linear layer whose output is normalised with softmax.
    """

    def __init__(self):
        super().__init__()

        self.dropout = nn.Dropout(.2)
        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.dense_1 = nn.Linear(768, 128)
        self.dense_2 = nn.Linear(128, 2)

    def forward(self, tokens, mask):
        """Return two class probabilities per example.

        Args:
            tokens: token-id tensor handed to BERT as its first argument
                (presumably ``(batch, seq_len)`` -- confirm against caller).
            mask: tensor handed to BERT as ``attention_mask``.

        Returns:
            Tensor with 2 values per example that sum to 1 along dim 1.
        """
        # Keep only position 0 of the last hidden state (the [CLS] slot when
        # the tokenizer adds special tokens).
        x = self.bert(tokens, attention_mask=mask)['last_hidden_state'][:, 0]

        x = self.dropout(x)
        x = F.relu(self.dense_1(x))
        x = self.dropout(x)

        # dim is spelled out because calling softmax without it is deprecated
        # and emits a warning on every forward pass.
        # NOTE(review): the training cell uses CrossEntropyLoss, which applies
        # log-softmax itself; consider returning raw logits instead.
        return F.softmax(self.dense_2(x), dim=1)

In [2]:
class FakeBERT(nn.Module):
    """BERT encoder followed by a stack of 1-D convolutions and a dense head.

    The convolutions are declared with 100 input channels and are applied
    directly to BERT's ``last_hidden_state``, so dim 1 of that tensor (the
    sequence positions) must have size 100.  ``dense_1`` expects 384 flattened
    features, which is what the conv/pool stack produces for 768-wide hidden
    states (128 channels x 3 remaining positions).
    """

    def __init__(self):
        super().__init__()

        self.dropout = nn.Dropout(.2)
        self.bert = BertModel.from_pretrained("bert-base-uncased")

        # Three parallel branches with different kernel widths.
        self.conv1d_1 = nn.Conv1d(100, 128, kernel_size=3)
        self.conv1d_2 = nn.Conv1d(100, 128, kernel_size=4)
        self.conv1d_3 = nn.Conv1d(100, 128, kernel_size=5)

        self.maxpool_1 = nn.MaxPool1d(kernel_size=5)
        self.maxpool_2 = nn.MaxPool1d(kernel_size=5)
        self.maxpool_3 = nn.MaxPool1d(kernel_size=5)

        # Two further conv/pool stages applied to the concatenated branches.
        self.conv1d_4 = nn.Conv1d(128, 128, kernel_size=5)
        self.maxpool_4 = nn.MaxPool1d(kernel_size=5)

        self.conv1d_5 = nn.Conv1d(128, 128, kernel_size=5)
        self.maxpool_5 = nn.MaxPool1d(kernel_size=28)

        self.flatten = nn.Flatten()

        self.dense_1 = nn.Linear(384, 128)
        self.dense_2 = nn.Linear(128, 2)

    def forward(self, tokens, mask):
        """Return two class probabilities per example.

        Args:
            tokens: token-id tensor handed to BERT as its first argument; the
                convolutions require its sequence dimension to be 100.
            mask: tensor handed to BERT as ``attention_mask``.

        Returns:
            Tensor with 2 values per example that sum to 1 along dim 1.
        """
        hidden = self.bert(tokens, attention_mask=mask)['last_hidden_state']

        # Run the three branches and join them along the last dimension.
        branches = (
            self.maxpool_1(F.relu(self.conv1d_1(hidden))),
            self.maxpool_2(F.relu(self.conv1d_2(hidden))),
            self.maxpool_3(F.relu(self.conv1d_3(hidden))),
        )
        x = torch.cat(branches, dim=2)

        x = self.maxpool_4(F.relu(self.conv1d_4(x)))
        x = self.maxpool_5(F.relu(self.conv1d_5(x)))

        x = self.flatten(x)

        x = self.dropout(x)
        x = F.relu(self.dense_1(x))
        x = self.dropout(x)

        # dim is spelled out because calling softmax without it is deprecated
        # and emits a warning on every forward pass.
        # NOTE(review): the training cell uses CrossEntropyLoss, which applies
        # log-softmax itself; consider returning raw logits instead.
        return F.softmax(self.dense_2(x), dim=1)

In [3]:
class BERTwithLSTM(nn.Module):
    """BERT encoder whose position-0 hidden state feeds an LSTM and a dense head.

    Head: single-layer LSTM (768->256), linear 256->128 with ReLU, batch
    normalisation, dropout, linear 128->32 with ReLU, linear 32->2, softmax.
    """

    def __init__(self):
        super().__init__()

        self.dropout = nn.Dropout(.6)
        self.bert = BertModel.from_pretrained("bert-base-uncased")

        self.LSTM = nn.LSTM(768, 256, 1, batch_first=True)

        self.ff_1 = nn.Linear(256, 128)
        self.BatchNorm = nn.BatchNorm1d(128)
        self.ff_2 = nn.Linear(128, 32)
        self.ff_3 = nn.Linear(32, 2)

    def forward(self, tokens, mask):
        """Return two class probabilities per example.

        Args:
            tokens: token-id tensor handed to BERT as its first argument
                (presumably ``(batch, seq_len)`` -- confirm against caller).
            mask: tensor handed to BERT as ``attention_mask``.

        Returns:
            Tensor with 2 values per example that sum to 1 along dim 1.
        """
        # Keep only position 0 of the last hidden state: (batch, 768).
        cls_state = self.bert(tokens, attention_mask=mask)['last_hidden_state'][:, 0]

        # nn.LSTM interprets a 2-D input as ONE unbatched sequence, which
        # would turn the examples of a batch into consecutive time steps and
        # let earlier examples influence later ones.  Adding a length-1
        # sequence axis keeps every example independent.
        output, _ = self.LSTM(cls_state.unsqueeze(1))
        x = output[:, -1, :]

        x = F.relu(self.ff_1(x))
        x = self.BatchNorm(x)
        x = self.dropout(x)
        x = F.relu(self.ff_2(x))

        # dim is spelled out because calling softmax without it is deprecated
        # and emits a warning on every forward pass.
        # NOTE(review): the training cell uses CrossEntropyLoss, which applies
        # log-softmax itself; consider returning raw logits instead.
        return F.softmax(self.ff_3(x), dim=1)

In [4]:
class BERTOurModel(nn.Module):
    """BERT encoder whose full hidden-state sequence feeds a two-layer LSTM.

    Head: 2-layer LSTM (768->256) over all positions, the output at the last
    position, linear 256->128 with ReLU, batch normalisation, dropout,
    linear 128->32 with ReLU, linear 32->2, softmax.
    """

    def __init__(self):
        super().__init__()

        self.dropout = nn.Dropout(.6)
        self.bert = BertModel.from_pretrained("bert-base-uncased")

        self.LSTM = nn.LSTM(768, 256, 2, batch_first=True)

        self.ff_1 = nn.Linear(256, 128)
        self.BatchNorm = nn.BatchNorm1d(128)
        self.ff_2 = nn.Linear(128, 32)
        self.ff_3 = nn.Linear(32, 2)

    def forward(self, tokens, mask):
        """Return two class probabilities per example.

        Args:
            tokens: token-id tensor handed to BERT as its first argument
                (presumably ``(batch, seq_len)`` -- confirm against caller).
            mask: tensor handed to BERT as ``attention_mask``; it is not used
                by the LSTM.

        Returns:
            Tensor with 2 values per example that sum to 1 along dim 1.
        """
        hidden = self.bert(tokens, attention_mask=mask)['last_hidden_state']

        output, _ = self.LSTM(hidden)
        # NOTE(review): the data-loading cell pads every input to a fixed
        # length, so the last position is usually a padding token; confirm
        # that summarising the sequence by this step is intended.
        last_step = output[:, -1, :]

        x = F.relu(self.ff_1(last_step))
        x = self.BatchNorm(x)
        x = self.dropout(x)
        x = F.relu(self.ff_2(x))

        # dim is spelled out because calling softmax without it is deprecated
        # and emits a warning on every forward pass.
        # NOTE(review): the training cell uses CrossEntropyLoss, which applies
        # log-softmax itself; consider returning raw logits instead.
        return F.softmax(self.ff_3(x), dim=1)

## Data loading (O'Brien et al.)

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


def _load_examples(path):
    """Unpickle the collection stored at *path* and strip each entry.

    The file is opened in a ``with`` block so the handle is closed again.
    """
    # NOTE(review): unpickling can execute arbitrary code; only load files
    # that come from a trusted source.
    with open(path, "rb") as handle:
        return [s.strip() for s in pickle.load(handle)]


def _encode(texts):
    """Tokenize *texts*, padding/truncating every entry to 100 tokens."""
    return tokenizer.batch_encode_plus(
        texts,
        max_length=100,
        padding='max_length',
        truncation=True,
    )


# Load data: real examples are labelled 1, fake examples 0.
real_examples = _load_examples('./clean/real.pkl')
real_labels = [1] * len(real_examples)

fake_examples = _load_examples('./clean/fake.pkl')
fake_labels = [0] * len(fake_examples)

X = real_examples + fake_examples
y = real_labels + fake_labels

# Hold out 20% for testing, then 20% of the remainder for validation.
X_trainval, X_testb, y_trainval, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_trainb, X_valb, y_train, y_val = train_test_split(X_trainval, y_trainval, test_size=0.2, random_state=42)

X_train = _encode(X_trainb)
train_ids = torch.tensor(X_train['input_ids'])
train_mask = torch.tensor(X_train['attention_mask'])
train_label = torch.tensor(y_train)

X_val = _encode(X_valb)
val_ids = torch.tensor(X_val['input_ids'])
val_mask = torch.tensor(X_val['attention_mask'])
val_label = torch.tensor(y_val)

X_test = _encode(X_testb)
test_ids = torch.tensor(X_test['input_ids'])
test_mask = torch.tensor(X_test['attention_mask'])
test_label = torch.tensor(y_test)

train_dataset = TensorDataset(train_ids, train_mask, train_label)
val_dataset = TensorDataset(val_ids, val_mask, val_label)
test_dataset = TensorDataset(test_ids, test_mask, test_label)

# Only the training data is shuffled.
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8)
test_dataloader = DataLoader(test_dataset, batch_size=8)

## Set model to use and hyperparameters

In [None]:
# The device has to exist before the model can be moved onto it; without this
# line a top-to-bottom run fails with a NameError, because the only other
# assignment of `device` is in the training cell further down.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model under training, where its best checkpoint is written, and the
# optimisation hyperparameters.
model = BERTwithLSTM().to(device)
model_save_path = "bertlstm.pt"
optimizer = torch.optim.Adadelta(model.parameters(), lr=1e-2)
num_epochs = 10

## Train the model

In [None]:
# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the loss function (the optimizer is created in the hyperparameter cell)
# NOTE(review): CrossEntropyLoss applies log-softmax internally, while the
# models in this notebook already return softmax output; confirm that the
# resulting double normalisation is intended.
criterion = torch.nn.CrossEntropyLoss()


def _run_epoch(model, dataloader, criterion, device, optimizer=None):
    """Run one pass over *dataloader* and return ``(mean_loss, accuracy)``.

    With an optimizer the model is put in train mode and updated after every
    batch; without one it is put in eval mode and gradients are disabled.
    Both results are plain Python floats averaged over ``dataloader.dataset``.

    Raises:
        ValueError: if the dataloader's dataset is empty.
    """
    dataset_size = len(dataloader.dataset)
    if dataset_size == 0:
        raise ValueError("cannot run an epoch over an empty dataset")

    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0

    with torch.set_grad_enabled(training):
        for inputs, attention_mask, labels in dataloader:
            inputs = inputs.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs, attention_mask)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so that a smaller final batch does not skew the epoch average.
            loss_sum += loss.item() * inputs.size(0)
            # .item() keeps the counter a Python int instead of a device tensor.
            correct += (outputs.argmax(dim=1) == labels).sum().item()

    return loss_sum / dataset_size, correct / dataset_size


# Training loop
best_val_loss = float("inf")

for epoch in range(num_epochs):
    print(f"Beginning epoch {epoch+1}/{num_epochs}")

    train_loss, train_accuracy = _run_epoch(
        model, train_dataloader, criterion, device, optimizer
    )
    val_loss, val_accuracy = _run_epoch(model, val_dataloader, criterion, device)

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train Loss: {train_loss:.4f} | Train Accuracy: {train_accuracy:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Val Accuracy: {val_accuracy:.4f}")
    print("-----------------------------------------")

    # Keep only the checkpoint with the lowest validation loss so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), model_save_path)

## Get metrics on test data

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

def get_scores(model, test_dataloader):
    """Evaluate *model* on *test_dataloader* and return classification metrics.

    Args:
        model: module called as ``model(inputs, attention_mask)`` that returns
            one score per class for every example.
        test_dataloader: iterable yielding ``(inputs, attention_mask, labels)``
            tensor batches.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, all_labels,
        all_predictions)``; the last two are flat Python lists in iteration
        order.
    """
    # Use the device the model actually lives on rather than a notebook-level
    # global, so inputs always match the model's weights.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else torch.device("cpu")

    all_predictions = []
    all_labels = []

    # Metrics must be computed in eval mode (dropout off, batch-norm running
    # statistics); the previous mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, attention_mask, labels in test_dataloader:
                outputs = model(inputs.to(target), attention_mask.to(target))

                all_predictions.extend(outputs.argmax(dim=1).tolist())
                # Labels are only collected, so they never need to leave the CPU.
                all_labels.extend(labels.tolist())
    finally:
        model.train(was_training)

    acc = accuracy_score(all_labels, all_predictions)
    prec = precision_score(all_labels, all_predictions)
    rec = recall_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)

    return acc, prec, rec, f1, all_labels, all_predictions

In [None]:
# Resolve the device in this cell so evaluation also works on machines
# without CUDA and does not depend on the training cell having been run.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

current_model = BERTwithLSTM()
# map_location lets a checkpoint that was saved from a GPU run be loaded on
# whichever device is available now.
current_model.load_state_dict(torch.load('bertlstm.pt', map_location=device))
current_model.to(device)
current_model.eval()

acc, prec, rec, f1, l, pr = get_scores(current_model, test_dataloader)

print('Accuracy: ', acc)
print('Precision: ', prec)
print('Recall: ', rec)
print('F1 Score: ', f1)