In [None]:
import pickle

import bcolz
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from google.colab import files


# Glove Embeddings

In [None]:
# Load the pretrained GloVe vectors from the on-disk bcolz array into memory.
vectors = bcolz.open('6B.100.dat')[:]

# NOTE(security): pickle.load can execute arbitrary code embedded in the file.
# Only load vocabulary pickles that you generated yourself or that come from a
# trusted source.
with open('6B.100_words.pkl', 'rb') as words_file:
    words = pickle.load(words_file)
with open('6B.100_idx.pkl', 'rb') as idx_file:
    word2idx = pickle.load(idx_file)

# Extra tokens that get randomly initialised vectors appended after the
# pretrained rows.
special_tokens = ['<UNK>', '<s>', '</s>', 'PAD']

# Row index of the first appended vector. Deriving it from the loaded table
# (instead of hard-coding 400000) keeps word2idx aligned with `vectors` even if
# a different vocabulary file is used; for a 400000-row table the resulting
# indices are 400000..400003, exactly as before.
n_pretrained = vectors.shape[0]

words += special_tokens
vocab_list_glove = set(words)

# One random vector per special token, matching the pretrained vector width.
new_vecs = np.random.normal(loc=0.0, scale=.6,
                            size=(len(special_tokens), vectors.shape[1]))
vectors = np.vstack((vectors, new_vecs))

for offset, token in enumerate(special_tokens):
    word2idx[token] = n_pretrained + offset

# Neural Net

### Retweet Network: Takes a tweet as input (optionally using pretrained embeddings), encodes it with an LSTM or GRU that can optionally be bidirectional, concatenates the encoding with a metadata vector, and uses a feedforward neural net with 1 hidden layer to perform a regression prediction on the retweet count.

In [None]:
class RetweetNet(nn.Module):
    """Recurrent regressor that predicts a retweet count from token ids plus metadata.

    Token ids are embedded and run through an LSTM or GRU (optionally stacked
    and/or bidirectional). The final hidden state of the last recurrent layer is
    projected by ``fc1``, concatenated with the per-sample metadata vector, and
    passed through one hidden layer (``fc2``) to a single regression value
    (``fc3``).

    Args:
        vocab_size: number of rows in the embedding table.
        hidden_state_sizes: two-element sequence. ``[0]`` is the width of the
            projection applied to the recurrent summary; ``[1]`` is the width
            of the single hidden layer.
        meta_data_len: number of metadata features per sample.
        output_size: stored on the instance but not used to size any layer;
            the network always emits one value per sample.
        embedding_dim: width of each embedding vector.
        hidden_dim: hidden size of the recurrent layer (per direction).
        n_layers: number of stacked recurrent layers.
        drop_prob: dropout applied between stacked recurrent layers.
        bidirectional: whether the recurrent layer is bidirectional.
        GRU: use ``nn.GRU`` when true, ``nn.LSTM`` otherwise.
    """

    def __init__(self, vocab_size, hidden_state_sizes, meta_data_len, output_size, embedding_dim, hidden_dim, 
                 n_layers, drop_prob=0.5, bidirectional = False, GRU = False):
        super().__init__()
        self.GRU_val = GRU
        self.bidirectional = bidirectional
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim

        num_directions = 2 if bidirectional else 1
        # PyTorch applies recurrent dropout only between stacked layers and
        # warns when a non-zero value is requested for a single layer.
        rnn_dropout = drop_prob if n_layers > 1 else 0.0
        rnn_cls = nn.GRU if GRU else nn.LSTM

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # The attribute is named ``GRU`` for both cell types; the name is kept
        # so code that accesses ``model.GRU`` keeps working.
        self.GRU = rnn_cls(embedding_dim, hidden_dim, n_layers,
                           dropout=rnn_dropout, batch_first=True,
                           bidirectional=bidirectional)
        self.dropout = nn.Dropout(0.2)

        # A bidirectional RNN yields one hidden vector per direction, so the
        # summary fed to fc1 is twice as wide in that case.
        self.fc1 = nn.Linear(hidden_dim * num_directions, hidden_state_sizes[0])
        self.relu = nn.ReLU()

        # hidden_state_sizes[0] is the size of the projected recurrent summary
        self.fc2 = nn.Linear(hidden_state_sizes[0] + meta_data_len, hidden_state_sizes[1])

        # hidden_state_sizes[1] is the size of the first and only hidden layer
        self.fc3 = nn.Linear(hidden_state_sizes[1], 1)

    def forward(self, x, meta_data, hidden):
        """Run one batch through the network.

        Args:
            x: token ids with the batch on the first axis (``batch_first=True``).
            meta_data: metadata tensor; reshaped to ``(batch, features)``.
            hidden: initial recurrent state, e.g. from ``init_hidden``.

        Returns:
            ``(out, hidden)`` where ``out`` has shape ``(batch, 1)`` and
            ``hidden`` is the recurrent state returned by the RNN.
        """
        batch_size = x.size(0)
        embeds = self.embedding(x.long())
        _, hidden = self.GRU(embeds, hidden)

        # nn.LSTM returns (h_n, c_n); nn.GRU returns h_n alone.
        h_n = hidden[0] if isinstance(hidden, tuple) else hidden
        # h_n is laid out as (n_layers * num_directions, batch, hidden_dim);
        # the last layer's forward/backward states are the final two entries.
        if self.bidirectional:
            summary = torch.cat((h_n[-2], h_n[-1]), dim=1)
        else:
            summary = h_n[-1]

        out = self.fc1(self.dropout(summary))

        # Explicit column count (rather than -1) so zero-width metadata can be
        # reshaped too; cast so the concatenation matches the layer dtype.
        meta_cols = meta_data.numel() // batch_size if batch_size else 0
        meta_data = meta_data.reshape(batch_size, meta_cols).to(out.dtype)

        # combine the recurrent summary and meta_data
        out = torch.cat((out, meta_data), dim=1)
        out = self.fc2(out)

        # applying dropout before relu since relu already sets some neurons to 0
        out = self.dropout(out)
        out = self.relu(out)
        out = self.fc3(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return an all-zero initial recurrent state for ``batch_size`` samples.

        The state is created with the dtype and device of the model's
        parameters, so it works on CPU as well as on GPU. A tuple
        ``(h_0, c_0)`` is returned for the LSTM and a single tensor for the GRU.
        """
        weight = next(self.parameters())
        num_directions = 2 if self.bidirectional else 1
        shape = (self.n_layers * num_directions, batch_size, self.hidden_dim)
        if self.GRU_val:
            return weight.new_zeros(shape)
        return (weight.new_zeros(shape), weight.new_zeros(shape))
    
    
def create_emb_layer(weights_matrix, non_trainable=False):
    """Build an ``nn.Embedding`` whose weights are copied from ``weights_matrix``.

    Args:
        weights_matrix: 2-D NumPy array; its shape supplies the number of
            embeddings and the embedding width.
        non_trainable: when true, the layer's weight is excluded from gradient
            updates.

    Returns:
        ``(layer, num_embeddings, embedding_dim)``.
    """
    row_count, vector_width = weights_matrix.shape
    layer = nn.Embedding(row_count, vector_width)

    # Copy the pretrained values into the freshly created layer.
    pretrained = torch.from_numpy(weights_matrix)
    layer.load_state_dict({'weight': pretrained})

    # A new embedding is trainable by default; only the flag flips it off.
    layer.weight.requires_grad = not non_trainable
    return layer, row_count, vector_width


class embedded_RetweetNet(RetweetNet):
    """``RetweetNet`` whose embedding layer is loaded from a pretrained matrix.

    The parent network is built first and its embedding layer is then replaced
    by a frozen (non-trainable) layer created from ``weights_matrix``.

    Args:
        vocab_size: forwarded to ``RetweetNet``; it sizes the placeholder
            embedding that is replaced by the pretrained one.
        weights_matrix: 2-D NumPy array of pretrained embedding vectors.
        output_size, embedding_dim, hidden_dim, n_layers, drop_prob,
        bidirectional, GRU: forwarded to ``RetweetNet`` unchanged.
        hidden_state_sizes: two-element sequence forwarded to ``RetweetNet``.
            Required; the parent cannot be built without it.
        meta_data_len: number of metadata features per sample, forwarded to
            ``RetweetNet``.

    Raises:
        ValueError: if ``hidden_state_sizes`` is missing, or if the width of
            ``weights_matrix`` differs from ``embedding_dim`` (the recurrent
            layer is sized from ``embedding_dim``).
    """

    def __init__(self, vocab_size, weights_matrix, output_size, embedding_dim, hidden_dim, 
                 n_layers, drop_prob=0.5, bidirectional = False, GRU = False,
                 hidden_state_sizes = None, meta_data_len = 0):
        if hidden_state_sizes is None:
            raise ValueError(
                "hidden_state_sizes is required: pass the two layer widths "
                "expected by RetweetNet")
        if weights_matrix.shape[1] != embedding_dim:
            raise ValueError(
                "weights_matrix has width {} but embedding_dim is {}".format(
                    weights_matrix.shape[1], embedding_dim))

        # Pass the parent's full positional signature and forward the caller's
        # drop_prob / bidirectional / GRU instead of fixed literals.
        super().__init__(vocab_size, hidden_state_sizes, meta_data_len, output_size,
                         embedding_dim, hidden_dim, n_layers,
                         drop_prob=drop_prob, bidirectional=bidirectional, GRU=GRU)

        # Swap in the pretrained, frozen embedding layer.
        self.embedding, num_embeddings, embedding_dim = create_emb_layer(weights_matrix, True)
        
def train_retweet_predictor(model, epochs = 2,print_every = 1000, clip = 5, valid_loss_min = np.inf, 
                   lr=0.005, batch_size = 400, device = 'cuda', GRU = False, weight_decay = 1e-5,
                   loader = None): 
    """Train ``model`` with Adam and an MSE loss.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and
            ``forward(inputs, meta_data, hidden) -> (output, hidden)``.
        epochs: number of full passes over the data.
        print_every: a progress line is printed every ``print_every`` steps.
        clip: maximum gradient norm used for gradient clipping.
        valid_loss_min: accepted for backward compatibility; not used.
        lr: Adam learning rate.
        batch_size: batch size used for the initial hidden state.
        device: device the model and every batch are moved to.
        GRU: accepted for backward compatibility; the hidden-state type
            (tuple for LSTM, tensor for GRU) is detected automatically.
        weight_decay: L2 regularisation penalty passed to Adam.
        loader: iterable of batches. Each batch is either
            ``(inputs, meta_data, labels)`` or ``(inputs, labels)``; in the
            latter case a zero-width metadata tensor is supplied to the model.
            Defaults to the notebook-level ``train_loader``.
    """
    batches = train_loader if loader is None else loader

    model.to(device)
    model.train()

    criterion = nn.MSELoss()

    # weight decay is the l2 regularization penalty
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    counter = 0
    for epoch in range(epochs):
        h = model.init_hidden(batch_size)

        # The batch loop is nested inside the epoch loop so that every epoch
        # performs a full pass over the data.
        for batch in batches:
            counter += 1
            if len(batch) == 3:
                inputs, meta_data, labels = batch
            else:
                inputs, labels = batch
                meta_data = torch.zeros(inputs.size(0), 0)
            inputs = inputs.to(device)
            meta_data = meta_data.to(device)
            labels = labels.to(device)

            # A short (e.g. final) batch needs a hidden state of matching size.
            state_batch = (h[0] if isinstance(h, tuple) else h).size(1)
            if state_batch != inputs.size(0):
                h = model.init_hidden(inputs.size(0))

            # Detach the carried state so gradients do not flow into earlier batches.
            if isinstance(h, tuple):
                h = tuple(state.detach().to(device) for state in h)
            else:
                h = h.detach().to(device)

            optimizer.zero_grad()
            output, h = model(inputs, meta_data, h)

            # Flatten both sides so (batch, 1) vs (batch,) can never broadcast
            # into a (batch, batch) loss.
            loss = criterion(output.view(-1), labels.float().view(-1))
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()

            if counter % print_every == 0:
                print("Epoch: {}/{}...".format(epoch + 1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.6f}...".format(loss.item()))
          
def error_retweet_predictor(model, batch_size = 359, device = 'cuda', GRU = False, loader = None): 
    """Evaluate ``model`` and report its mean squared error.

    Two numbers are computed per batch and averaged over all batches: the MSE
    of the raw predictions, and the MSE of the predictions rounded to the
    nearest integer.

    Args:
        model: network exposing ``init_hidden(batch_size)`` and
            ``forward(inputs, meta_data, hidden) -> (output, hidden)``.
        batch_size: batch size used for the initial hidden state.
        device: device the model and every batch are moved to.
        GRU: accepted for backward compatibility; the hidden-state type
            (tuple for LSTM, tensor for GRU) is detected automatically.
        loader: iterable of batches. Each batch is either
            ``(inputs, meta_data, labels)`` or ``(inputs, labels)``; in the
            latter case a zero-width metadata tensor is supplied to the model.
            Defaults to the notebook-level ``test_loader``.

    Returns:
        ``(mean_loss, mean_rounded_mse)`` as Python floats.
    """
    batches = test_loader if loader is None else loader

    test_losses = []
    rounded_errors = []

    model.to(device)
    model.eval()
    criterion = nn.MSELoss()

    h = model.init_hidden(batch_size)

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for batch in batches:
            if len(batch) == 3:
                inputs, meta_data, labels = batch
            else:
                inputs, labels = batch
                meta_data = torch.zeros(inputs.size(0), 0)
            inputs = inputs.to(device)
            meta_data = meta_data.to(device)
            labels = labels.to(device)

            # A short (e.g. final) batch needs a hidden state of matching size.
            state_batch = (h[0] if isinstance(h, tuple) else h).size(1)
            if state_batch != inputs.size(0):
                h = model.init_hidden(inputs.size(0))
            if isinstance(h, tuple):
                h = tuple(state.detach().to(device) for state in h)
            else:
                h = h.detach().to(device)

            output, h = model(inputs, meta_data, h)

            # Flatten both sides so the shapes always line up element-wise.
            preds = output.view(-1)
            targets = labels.float().view(-1)
            test_losses.append(criterion(preds, targets).item())

            # Error of the predictions after rounding to whole retweet counts.
            rounded = torch.round(preds)
            rounded_errors.append(torch.mean(torch.square(rounded - targets)).item())

    mean_loss = float(np.mean(test_losses))
    mean_rounded_mse = float(np.mean(rounded_errors))
    print("Test loss: {:.3f}".format(mean_loss))
    # This is an error measure, not an accuracy percentage, so it is labelled as such.
    print("Test MSE of rounded predictions: {:.3f}".format(mean_rounded_mse))
    return mean_loss, mean_rounded_mse