# Import Libraries

In [1]:
from colors import ColorsCorpusReader
from nltk.translate.bleu_score import corpus_bleu
from collections import Counter
import re
import torch.nn as nn
import numpy as np
import os
from sklearn.model_selection import train_test_split
from scipy.fft import fft
import colorsys
from itertools import product
from torch_color_describer import (ContextualColorDescriber, create_example_dataset)
import utils
from utils import START_SYMBOL, END_SYMBOL, UNK_SYMBOL
%load_ext autoreload

# Tokenizer & Color Representation from HW 4 - Is my FFT right?

In [2]:
def tokenize_example(s,counts={}):

    # Lower Case the String (Monroe et al. 2017 paper)
    s = s.lower()
    # split endings -er, -ish, -est
    trimmed = []
    for word in s.split():
        if word.endswith("er"):
            trimmed.extend([word[:-2],"er"])
        elif word.endswith("ish"):
            trimmed.extend([word[:-3],"ish"])
        elif word.endswith("est"):
            trimmed.extend([word[:-3],"est"])
        else:
            trimmed.append(word)
    s = " ".join(trimmed)

    # split punctuation (Monroe et al. 2017 paper)
    s = re.findall(r"[\w']+|[.,!?;]", s)

    ## If the word only appears once in text corpus, set to UNK_SYMBOL (Monroe et al. 2017 paper)
    if len(counts)!=0:
        for index in range(0,len(s)):
            if counts[s[index]]==1:
                s[index]=UNK_SYMBOL

    return [START_SYMBOL] + s + [END_SYMBOL]
def represent_color_context(colors):

# Improve me!

    return [represent_color(color) for color in colors]

def represent_color(color):

    rgb = colorsys.hls_to_rgb(color[0],color[1],color[2])
    hsv = colorsys.rgb_to_hsv(rgb[0],rgb[1],rgb[2])
    color = []
    for j, k, l in product((0, 1, 2), repeat=3): 
        f_ijk = fft([j*hsv[0]+k*hsv[1]+l*hsv[2]])
        color.extend([f_ijk.real[0],f_ijk.imag[0]])
    color = np.array(color)
    return color

# Load Data, Tokenize Sequences, Represent Colors

In [3]:
utils.fix_random_seeds()
COLORS_SRC_FILENAME = os.path.join(
    "data", "colors", "filteredCorpus.csv")

dev_corpus = ColorsCorpusReader(
    COLORS_SRC_FILENAME,
    word_count=2,
    normalize_colors=True)

dev_examples = list(dev_corpus.read())
dev_rawcols, dev_texts = zip(*[[ex.colors, ex.contents] for ex in dev_examples])
dev_rawcols_train, dev_rawcols_test, dev_texts_train, dev_texts_test = \
    train_test_split(dev_rawcols, dev_texts)
counts = Counter()
# Get counts in train vocaab
for sentence in dev_texts_train:
    counts.update(word.strip('.,?!"\'').lower() for word in sentence.split())
    

dev_seqs_train = [tokenize_example(s,counts) for s in dev_texts_train]
dev_seqs_test = [tokenize_example(s,counts) for s in dev_texts_test]
dev_cols_train = [represent_color_context(colors) for colors in dev_rawcols_train]
dev_cols_test = [represent_color_context(colors) for colors in dev_rawcols_test]
dev_vocab = sorted({w for toks in dev_seqs_train for w in toks})
dev_vocab += [UNK_SYMBOL]

#  LSTM Encoder

In [4]:
%autoreload 2
class OriginalEncoder(nn.Module):
    def __init__(self, color_dim, hidden_dim,embedding_proj):
        """
        Simple Encoder model based on a GRU cell.

        Parameters
        ----------
        color_dim : int

        hidden_dim : int

        """
        super().__init__()
        self.color_dim = color_dim
        self.hidden_dim = hidden_dim
        self.embedding_proj = embedding_proj

#         self.linear = nn.Linear(self.color_dim,self.embedding_proj)
        self.rnn = nn.LSTM(
            input_size=self.color_dim,
            hidden_size=self.hidden_dim,
            batch_first=True)

    def forward(self, color_seqs):
        """
        Parameters
        ----------
        color_seqs : torch.FloatTensor
            The shape is `(m, n, p)` where `m` is the batch_size,
             `n` is the number of colors in each context, and `p` is
             the color dimensionality.

        Returns
        -------
        hidden : torch.FloatTensor
            These are the final hidden state of the RNN for this batch,
            shape `(m, p) where `m` is the batch_size and `p` is
             the color dimensionality.

        """
        output, hidden = self.rnn(color_seqs)

        return output,hidden

# Encoder-Decoder Training Scheme

In [25]:
%autoreload 2

from torch_color_describer import EncoderDecoder

class ColorizedEncoderDecoder(EncoderDecoder):

    def forward(self,
            color_seqs,
            word_seqs,
            seq_lengths=None,
            hidden=None,
            output=None,
            targets=None):
#         if hidden is None and output is None:
        output,hidden = self.encoder(color_seqs)
            
        # Extract the target colors from `color_seqs` and
        # feed them to the decoder, which already has a
        # `target_colors` keyword.
        target_colors = color_seqs[:,2,:]
        output, hidden = self.decoder.forward(word_seqs,
                                              seq_lengths=seq_lengths, 
                                              hidden=hidden,
                                              enc_outputs=output,
                                              target_colors = target_colors)
        
        # Your decoder will return `output, hidden` pairs; the
        # following will handle the two return situations that
        # the code needs to consider -- training and prediction.
        if self.training:
            return output
        else:
            return output, hidden

# Build Graph 

In [26]:
%autoreload 2
class ColorizedInputDescriberOriginal(ContextualColorDescriber):
        def __init__(self, *args, num_layers=2, **kwargs):
            self.num_layers = num_layers
            super().__init__(*args, **kwargs)

        def build_graph(self):

            # We didn't modify the encoder, so this is
            # just copied over from the original:
            # hidden dim of 100 from paper
            encoder = OriginalEncoder(
                color_dim=self.color_dim,
                hidden_dim=100,
                embedding_proj=100)

            # Use your `ColorContextDecoder`, making sure
            # to pass in all the keyword arguments coming
            # from `ColorizedInputDescriber`:


            decoder = ColorContextDecoder(
                vocab_size=self.vocab_size,
                color_dim = self.color_dim, 
                embed_dim=self.embed_dim,
                embedding=self.embedding,
                hidden_dim=100,
                freeze_embedding=False)

            self.embed_dim = decoder.embed_dim

            return ColorizedEncoderDecoder(encoder, decoder)

# Decoder with Bahdanau Attention
## A lot of resources, this seemed to me most valuable https://blog.floydhub.com/attention-mechanism/

In [313]:
%load_ext autoreload
%autoreload 2
from torch_color_describer import Decoder
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_color_describer import (ContextualColorDescriber, create_example_dataset)

class ColorContextDecoder(Decoder):
    def __init__(self, color_dim, *args, **kwargs):
        self.color_dim = color_dim
        super().__init__(*args, **kwargs)

        # Implementing Bahdanau attention - using single linear layer
        # instead of one hidden layer for encoder outputs and 1 linear layer for decoder
        self.fc_hidden = nn.Linear(self.hidden_dim, self.hidden_dim, bias=False)
        self.fc_encoder = nn.Linear(self.hidden_dim, self.hidden_dim, bias=False)
        self.V = nn.Parameter(torch.rand(self.hidden_dim))

        self.input_size = self.embed_dim+self.color_dim+self.hidden_dim
        self.rnn = nn.LSTM(
            input_size=self.input_size,
            hidden_size=self.hidden_dim,
            batch_first=True)
    
    def get_embeddings(self, word_seqs, target_colors=None):
        """
        You can assume that `target_colors` is a tensor of shape
        (m, n), where m is the length of the batch (same as
        `word_seqs.shape[0]`) and n is the dimensionality of the
        color representations the model is using. The goal is
        to attached each color vector i to each of the tokens in
        the ith sequence of (the embedded version of) `word_seqs`.

        """
        word_embeddings = self.embedding(word_seqs)
        colors_repeated = torch.repeat_interleave(target_colors.unsqueeze(1),word_embeddings.shape[1],1)
        combined = torch.cat((word_embeddings,colors_repeated),dim=2)
        return combined
    

    def forward(self, word_seqs=None, seq_lengths=None, hidden=None, target_colors=None,enc_outputs=None):
        embs = self.get_embeddings(word_seqs, target_colors=target_colors)
        targ_seq_len = word_seqs.shape[1]
        src_len = enc_outputs.shape[1]
        batch_size = enc_outputs.shape[0]
        output_tensor = torch.empty(batch_size,targ_seq_len,self.hidden_dim)

        # Permute enc_outputs 
        enc_outputs = enc_outputs.permute(1,0,2)
        if self.training:


                # Target Embedding
            for targ in range(0,targ_seq_len):
                target = embs[:,targ,:]
#                 print(target.shape)
#                 # Decoder Hidden State - for first pass, decoder hidden state is last encoder hidden state
                dec_hidden = hidden[0]

#                 # Repeat Decoder Hidden State so we can 
                dec_hidden_rep = dec_hidden.repeat(src_len, 1, 1)
                
#                 # Concat decoder hidden state with encoder outputs
#                 # Maybe instead of concat we need two sepearte hidden layers?
                combined_hidden = torch.cat((dec_hidden_rep, enc_outputs), 2)
                
#                 # Calculating Alignment Scores
                
                attn_weights = torch.tanh(self.fc_hidden(dec_hidden_rep)+self.fc_encoder(enc_outputs))
#                 attn_weights = torch.tanh(self.attn(combined_hidden))

                attn_weights = attn_weights.permute(1, 2, 0)
    
#                 #learnable vector to get alignment scores - repeat for size of batch
                V = self.V.repeat(batch_size, 1).unsqueeze(1)
                e = torch.bmm(V, attn_weights).squeeze(1)
                
#                 # Softmaxing alignment scores to get Attention weights
                attn_applied = F.softmax(e, dim=1)

#                 # Multiplying the Attention weights with encoder outputs to get the context vector

                context_vector = torch.bmm(attn_applied.unsqueeze(1), enc_outputs.permute(1,0,2))
                
#                 # Concat context vector with input embeddings
                rnn_input =  torch.cat((context_vector, target.unsqueeze(1)), 2) 
#                 ### Input to rnn
                output, hidden = self.rnn(rnn_input, hidden)

#                 # assign output to output_tensor
                output_tensor[:,targ,:] = output.squeeze(1)
            # Linear Layer to score over words
            # Drop the final element:
            output_tensor = self.output_layer(output_tensor)
            output_tensor=output_tensor[:, : -1, :]
            
            
            # Reshape for the sake of the loss function:
            output_tensor = output_tensor.transpose(1, 2)
            
#             print(output.shape)
#             print("*"*50)
            return output_tensor, hidden
        else:
            embs = self.get_embeddings(word_seqs[:,0].unsqueeze(1), target_colors=target_colors)
            for targ in range(0,word_seqs.shape[1]):
                
                # Decoder Hidden State - for first pass, decoder hidden state is last encoder hidden state
                dec_hidden = hidden[0]

                # Repeat Decoder Hidden State so we can 
                dec_hidden_rep = dec_hidden.repeat(src_len, 1, 1)
                
                # Concat decoder hidden state with encoder outputs
                # Maybe instead of concat we need two sepearte hidden layers?
                combined_hidden = torch.cat((dec_hidden_rep, enc_outputs), 2)
                
                # Calculating Alignment Scores

                attn_weights = torch.tanh(self.fc_hidden(dec_hidden)+self.fc_encoder(enc_outputs))
                attn_weights = attn_weights.permute(1, 2, 0)
                
                # Repeat learnable vector V batch times
                V = self.V.repeat(batch_size, 1).unsqueeze(1)
                e = torch.bmm(V, attn_weights).squeeze(1)

                # Softmaxing alignment scores to get Attention weights
                attn_applied = F.softmax(e, dim=1)

                # Multiplying the Attention weights with encoder outputs to get the context vector
                context_vector = torch.bmm(attn_applied.unsqueeze(1), enc_outputs.permute(1,0,2))

                # RNN Input 
                rnn_input =  torch.cat((context_vector, embs), 2)
                
                # Input to RNN
                output, hidden = self.rnn(rnn_input, hidden)
                
                #filling output tensors with output predictions
#                 print(output.shape)
#                 print(output_tensor.shape)

                output_tensor[:,targ,:] = output.squeeze(1)

                # We take the argmax across the last dimension to get the embedding
                # we feed into the next iteration
                output = self.output_layer(output)

                predictions = output.argmax(2)
                # Get embedding from prediction
                embs = self.get_embeddings(predictions, target_colors=target_colors)
#                 print(embs)
            # Output linear layer to prohect output tensor onto vocabular
            output_tensor = self.output_layer(output_tensor)
#             print(output_tensor.shape)
            return output_tensor, hidden




The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [314]:
##### %load_ext autoreload
%autoreload 2
def test_full_system(describer_class):
    toy_color_seqs, toy_word_seqs, toy_vocab = create_example_dataset(
        group_size=50, vec_dim=2)

    toy_color_seqs_train, toy_color_seqs_test, toy_word_seqs_train, toy_word_seqs_test = \
        train_test_split(toy_color_seqs, toy_word_seqs)
    toy_mod = describer_class(toy_vocab,max_iter=500)

    _ = toy_mod.fit(toy_color_seqs_train, toy_word_seqs_train)

    acc = toy_mod.listener_accuracy(toy_color_seqs_test, toy_word_seqs_test)

    return acc

from torch_color_describer import (ContextualColorDescriber, create_example_dataset)

test_full_system(ColorizedInputDescriberOriginal)

Stopping after epoch 322. Training loss did not improve more than tol=1e-05. Final error is 0.0015495867701247334.

1.0

# Test System - Expect Perfect Score (1.0)

In [251]:
def my_original_system(trained_model, color_seqs_test, texts_test):
    """
    Feel free to modify this code to accommodate the needs of
    your system. Just keep in mind that it will get raw corpus
    examples as inputs for the bake-off.

    """
    # `word_seqs_test` is a list of strings, so tokenize each of
    # its elements:
    tok_seqs = [tokenize_example(s) for s in texts_test]

    col_seqs = [represent_color_context(colors)
                for colors in color_seqs_test]


    # Optionally include other preprocessing steps here. Note:
    # DO NOT RETRAIN YOUR MODEL! It's a tempting step, but it's
    # a mistake and will get you disqualified!


    # The following core score calculations are required:
    return trained_model.evaluate(col_seqs, tok_seqs)

# Run over entire dataset

In [252]:
%autoreload 2

from torch_color_describer import (ContextualColorDescriber, create_example_dataset)

dev_mod = ColorizedInputDescriberOriginal(vocab = dev_vocab,   max_iter=500)
dev_mod.fit(dev_cols_train, dev_seqs_train)
my_original_system(dev_mod, dev_rawcols_test, dev_texts_test)

  perp = [np.prod(s)**(-1/len(s)) for s in scores]


{'listener_accuracy': 0.6147422977253095, 'corpus_bleu': 0.09471638353008925}

In [113]:
my_original_system(dev_mod, dev_rawcols_test[0:50], dev_texts_test[0:50])

{'listener_accuracy': 0.5, 'corpus_bleu': 0.09600000000000002}

In [None]:
print( dev_texts_test[0:10])

In [283]:
COLORS_BAKEOFF_SRC_FILENAME = os.path.join(
    "data", "colors", "cs224u-colors-bakeoff-data.csv")

bakeoff_corpus = ColorsCorpusReader(COLORS_BAKEOFF_SRC_FILENAME)

# This code just extracts the colors and texts from the new corpus:
bakeoff_rawcols, bakeoff_texts = zip(*[
    [ex.colors, ex.contents] for ex in bakeoff_corpus.read()])

# Original system function call; `my_mod` is your trained model:
print(my_original_system(dev_mod, bakeoff_rawcols, bakeoff_texts))

  for t, w in zip(pred, seq)])


{'listener_accuracy': 0.6794682422451994, 'corpus_bleu': 0.07959133431806992}
