In [None]:
# Mount Google Drive at /content/gdrive so the project files are reachable.
# This cell only works inside a Google Colab runtime (it imports google.colab).
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

In [None]:
ls

In [None]:
cd gdrive/MyDrive/projects-bias-bot/src

In [None]:
import numpy as np
import pandas as pd
import pickle
import re
import math
import random
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pack_padded_sequence

from tqdm import tqdm
import gensim.downloader as api
from sklearn.metrics import classification_report

from Article import Article

In [None]:
class LSTM(nn.Module):
	"""Bidirectional, multi-layer LSTM classifier over sequences of token indices.

	Pipeline: embedding lookup -> stacked bidirectional LSTM -> linear layer applied
	to the concatenated final hidden states of the top layer's two directions.
	"""
	def __init__(self, input_size, emb_dim, output_size, num_layers, embeds=None):
		"""Build the embedding, LSTM and output layers.

		Args:
			input_size: number of rows in the embedding table (largest token index + 1).
			emb_dim: embedding width; also used as the LSTM hidden size.
			output_size: number of output scores per sequence.
			num_layers: number of stacked LSTM layers (any value >= 1).
			embeds: optional pre-trained embedding matrix used to initialise the
				embedding weights; when None the embedding is randomly initialised.
		"""
		super().__init__()
		self.emb = nn.Embedding(input_size, emb_dim)
		if embeds is not None:
			self.emb.weight = nn.Parameter(torch.Tensor(embeds))

		self.lstm = nn.LSTM(emb_dim, emb_dim, num_layers=num_layers, bidirectional=True, batch_first=True)
		# Two directions are concatenated, hence emb_dim*2 input features.
		self.linear = nn.Linear(emb_dim*2, output_size)

	def forward(self, input_seq):
		"""Return one row of `output_size` scores per input sequence.

		Args:
			input_seq: integer tensor of token indices, batch first (the LSTM is
				built with batch_first=True).
		"""
		embeds = self.emb(input_seq)

		_, (h_last, _) = self.lstm(embeds)

		# h_last stacks the final hidden state of every (layer, direction) pair along
		# dim 0, ordered layer by layer; the last two entries therefore belong to the
		# top layer. Negative indices keep this correct for any num_layers, whereas
		# fixed indices 4 and 5 were only right for num_layers == 3.
		h_forward = h_last[-2]
		h_backward = h_last[-1]
		h_both = torch.cat((h_forward, h_backward), dim=1)

		return self.linear(h_both)

In [None]:
def load_vocab(data, include_text=True):
	"""Count how often each word occurs in the corpus.

	Args:
		data: iterable of articles exposing `.headline` and `.text`
			(presumably token lists -- they are concatenated and iterated word by word).
		include_text: when False, only headlines are counted.

	Returns:
		dict mapping word -> occurrence count, ordered from most to least frequent
		(ties keep first-seen order).
	"""
	vocab = dict()
	for item in data:
		# Concatenate into a NEW sequence. The previous `text = item.headline` followed
		# by `text += item.text` extended a list headline in place, so every article's
		# headline silently absorbed its body text for all later processing.
		text = item.headline + item.text if include_text else item.headline
		for word in text:
			vocab[word] = vocab.get(word, 0) + 1
	return dict(sorted(vocab.items(), key=lambda entry: -entry[1]))

def make_vocab_dict(vocab):
	"""Build the word -> token-index mapping.

	Indices 0-7 are reserved for "UNK", the news sources and the party names.
	Corpus words are numbered in iteration order of `vocab`, starting at 9
	(index 8 is left unassigned); words that collide with a reserved token keep
	the reserved index.
	"""
	reserved = ("UNK", "FOX", "CNN", "BBC", "Liberal", "Conservative", "Independent", "Other")
	word_to_index = {token: slot for slot, token in enumerate(reserved)}
	next_index = len(reserved) + 1
	for word in vocab:
		if word in word_to_index:
			continue
		word_to_index[word] = next_index
		next_index += 1
	return word_to_index

def load_bigrams(data):
	"""Assign a distinct integer index to every adjacent word pair in the corpus.

	Each article's headline and text are concatenated, and every consecutive
	(previous, current) pair receives the next free index (starting at 0) the
	first time it is seen. Returns the bigram -> index dictionary.
	"""
	bigram_to_index = {}
	for article in data:
		tokens = article.headline + article.text
		for pair in zip(tokens, tokens[1:]):
			if pair not in bigram_to_index:
				# Indices are handed out consecutively, so the next one equals the current size.
				bigram_to_index[pair] = len(bigram_to_index)
	return bigram_to_index

def make_unigrams(data, word_to_index, party=None, include_text=True):
	"""Return tokenized (features, label) pairs using unigram indices.

	Args:
		data: iterable of articles exposing `.party`, `.source`, `.headline`,
			`.text` and `.label`.
		word_to_index: word -> index mapping; words missing from it are encoded
			as word_to_index["UNK"].
		party: which articles to keep:
			- None: articles whose party is "Liberal" or "Conservative";
			- "Combined": every article, with the article's party token prepended;
			- any other value: only articles whose party equals it.
		include_text: when False, only the headline is encoded.

	Returns:
		list of (token_index_list, label_index) tuples. Each token list starts
		with the optional party token, then the source token, then the words.

	Labels are looked up in the module-level `label_to_index` mapping.
	"""
	def encode(words):
		return [word_to_index[word] if word in word_to_index else word_to_index["UNK"] for word in words]

	processed_data = []
	for article in data:
		if party is None:
			# Previously these articles passed the first check but were then dropped by
			# the `article.party != party` branch, so party=None always returned [].
			if article.party not in ("Liberal", "Conservative"):
				continue
			datapoint = []
		elif party == "Combined":
			datapoint = [word_to_index[article.party]]
		elif article.party == party:
			datapoint = []
		else:
			continue

		datapoint.append(word_to_index[article.source])
		datapoint.extend(encode(article.headline))
		if include_text:
			datapoint.extend(encode(article.text))
		label = label_to_index[article.label]

		processed_data.append( (datapoint, label) )
	return processed_data

def make_bigrams(data, bigram_to_index, party):
	"""Return tokenized (features, label) pairs using bigram indices.

	Only articles whose `.party` equals `party` are kept, so a value such as
	"Combined" selects nothing unless an article carries that exact party.
	Each kept article's headline and text are concatenated and every adjacent
	word pair is looked up in `bigram_to_index` (a missing pair raises KeyError).
	Labels are looked up in the module-level `label_to_index` mapping.
	"""
	processed_data = []
	for article in data:
		if article.party == party:
			tokens = article.headline + article.text
			datapoint = [bigram_to_index[pair] for pair in zip(tokens, tokens[1:])]
			processed_data.append((datapoint, label_to_index[article.label]))
	return processed_data
	
def split_data(processed_data):
	"""Split into (first 90%, remaining items) by position, without shuffling."""
	cut = math.floor(0.9 * len(processed_data))
	leading_part = processed_data[:cut]
	trailing_part = processed_data[cut:]
	return leading_part, trailing_part


def process_batch(batch):
	"""Convert a list of (token_indices, label) pairs into padded tensors.

	Reads the module-level `max_len` (padded width) and `device`.

	Returns:
		(x, y) moved to `device`: x is a long tensor of shape (len(batch), max_len),
		right-padded with 0; y is a long tensor holding the labels.
	"""
	x = torch.zeros((len(batch), max_len), dtype=torch.long)
	y = torch.zeros((len(batch)), dtype=torch.long)
	for idx, (text, label) in enumerate(batch):
		# Build an int64 tensor directly: torch.Tensor(text) goes through float32,
		# which cannot represent every integer above 2**24 and would silently
		# change large token indices.
		x[idx,:len(text)] = torch.tensor(text, dtype=torch.long)
		y[idx] = label
	return x.to(device), y.to(device)

def get_error(scores, labels):
	"""Return the misclassification rate of a batch as a 0-dim float tensor.

	The predicted class of each row of `scores` is its argmax along dim 1; the
	result is 1 minus the fraction of rows whose prediction equals `labels`.
	"""
	n_rows = scores.size(0)
	hits = (scores.argmax(dim=1) == labels).sum()
	accuracy = hits.float() / n_rows
	return 1 - accuracy

def evaluate(model, test_data):
	"""Evaluate the model on test data and print a report.

	Switches the model to eval mode (it is NOT switched back; callers that keep
	training must call model.train() again), runs the whole of `test_data` as a
	single batch without gradients, then prints the classification report and
	the error rate.

	Args:
		model: module mapping a batch of token indices to per-class scores.
		test_data: list of (token_indices, label) pairs accepted by process_batch.
	"""
	model.eval()
	with torch.no_grad():
		x_test, y_test = process_batch(test_data)
		pred_y_test = model(x_test)

	labels = y_test.tolist()
	predictions = pred_y_test.argmax(dim=1).tolist()

	# Pin labels=[0, 1] so the two target names always line up with the classes;
	# without it the report raises ValueError when only one class occurs in the
	# labels and predictions.
	print(classification_report(labels, predictions, labels=[0, 1], target_names=["Is Biased","Is Not Biased"], zero_division=0))
	print("Error:",get_error(pred_y_test, y_test).item())



In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on a runtime without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Run only for a single political group
# party = "Liberal"
# party = "Conservative"
party = "Combined"

mode = "unigram"

# Load data
print("Loading data...")
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open("../data/processed_articles.p", "rb") as f:
	data = pickle.load(f)
	# NOTE(review): unseeded shuffle, so the train/test split differs on every run.
	random.shuffle(data)

vocab = load_vocab(data)
print(vocab)

# Keep the most frequent words: ranks 0..vocab_cutoff inclusive (vocab is already
# sorted by descending frequency), i.e. vocab_cutoff + 1 words.
vocab_cutoff=9000
trimmed_vocab=dict()
for i, (word, count) in enumerate(vocab.items()):
	if i > vocab_cutoff:
		break
	trimmed_vocab[word] = count

word_to_index = make_vocab_dict(trimmed_vocab)
index_to_word = {v: k for k, v in word_to_index.items()}

print(word_to_index)
print(index_to_word)

bigram_to_index = load_bigrams(data)
index_to_bigram = {v: k for k, v in bigram_to_index.items()}


label_to_index = {"is-biased":0, "is-not-biased":1}
# Padded sequence width: longest article plus headroom for the prepended
# party/source tokens.
max_len = max([len(article.headline + article.text) for article in data]) + 3

print("Creating train data set...")

if mode=="unigram":
	unigrams = make_unigrams(data, word_to_index, party)
	train_data, test_data = split_data(unigrams)
	input_size = len(word_to_index)+1
elif mode=="bigram":
	bigrams = make_bigrams(data, bigram_to_index, party)
	train_data, test_data = split_data(bigrams)
	input_size = len(bigram_to_index)+1
else:
	# Fail here rather than with a NameError on train_data further down.
	raise ValueError(f"unknown mode: {mode!r} (expected 'unigram' or 'bigram')")

# Hyper parameters
output_size = 2 
num_layers = 3
batch_size = 16
learning_rate = 0.001
epochs = 10

#Load pre-trained word embeddings, if using them.
# embeds = api.load('glove-twitter-25').vectors
# emb_dim = embeds.shape[1]

embeds = None
emb_dim = 50

train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=False, collate_fn=process_batch)

num_biased = len([item for item in train_data if item[1]==0])
num_unbiased = len([item for item in train_data if item[1]==1])

print("num biased: ", num_biased)
print("num unbiased: ", num_unbiased)

if num_biased == 0 or num_unbiased == 0:
	# The class weights below divide by these counts.
	raise ValueError("training data must contain both biased and unbiased examples")

# Class weights relative to the biased class: biased -> 1, unbiased -> num_biased/num_unbiased.
weights = torch.tensor([1.0,num_biased/num_unbiased]).to(device)
criterion = nn.CrossEntropyLoss(weight=weights)

# Build model
model = LSTM(input_size, emb_dim, output_size, num_layers, embeds).to(device)
# criterion = nn.CrossEntropyLoss()

In [None]:
# Number of articles loaded from the pickle file.
print(len(data))

In [None]:
# Number of entries in the word -> index mapping (reserved tokens plus the trimmed vocabulary).
print(len(word_to_index))

In [None]:
# Number of distinct words counted in the corpus, before trimming to the cutoff.
print(len(vocab))

In [None]:
# Sanity check: decode the first training examples back into words.
# Iterating over a slice (rather than indexing range(10)) also works when there
# are fewer than 10 examples. index_to_word is the unigram mapping, so this
# preview is only meaningful when mode == "unigram".
for token_ids, _label in train_data[:10]:
  print([index_to_word[index] for index in token_ids])

In [None]:
# Train loop
# The optimizer is built once so Adam's running moment estimates carry over
# between epochs; re-creating it every epoch discarded them. The decayed rate
# lives in a local variable so re-running this cell starts again from
# `learning_rate` instead of an already-halved value.
current_lr = learning_rate
optimizer = optim.Adam(model.parameters(), lr=current_lr)

for epoch in range(epochs):

	print(f"\n\nEpoch {epoch}")

	# From epoch 5 onward, halve the learning rate at the start of every epoch.
	if epoch >= 5:
		current_lr = current_lr/2
		for param_group in optimizer.param_groups:
			param_group["lr"] = current_lr

	# evaluate() leaves the model in eval mode, so switch back every epoch.
	model.train()

	running_error = 0
	count = 0

	for x,y in tqdm(train_dataloader):

		# Skip the final partial batch.
		if x.size()[0] != batch_size:
			continue

		optimizer.zero_grad()

		scores = model(x)
		scores = scores.view(-1,output_size)

		loss = criterion(scores, y)
		loss.backward()
		optimizer.step()

		error = get_error(scores, y)
		running_error += error.item()
		count += 1


	print("\nEvaluate on test:")
	evaluate(model, test_data)
	# count stays 0 when the training set holds no full batch; report nan instead of crashing.
	print("\nRunning Error:", running_error/count if count else float("nan"))

# Save the trained weights
torch.save(model.state_dict(),f"model_{party}")




In [None]:
# Final report for the trained model on the held-out split (the last 10% of the data).
evaluate(model, test_data)