<a href="https://colab.research.google.com/github/sophiahchiang/slant-rhyme-nn/blob/main/CompLing_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Packages
!pip install pronouncing
!pip install eng-to-ipa
import pronouncing as pro
import eng_to_ipa as ipa
import numpy as np
import re
import math
import string
import random
import torch
import torch.nn as nn
import io
import pandas as pd
from google.colab import files




In [None]:
# Small test corpus (Sean)
corpus = '''
         IN A SHORT speech on November 19th Narendra Modi, India’s prime minister, made a humiliating U-turn. Barely a year after rushing a trio of laws reforming agriculture through parliament, he announced their repeal. The shame was not only to have handed victory to the horde of tractor-mounted yokels doggedly protesting at the gates of India’s capital since last November. It was to have bungled the issue from the start. Indian farming does indeed desperately need reform. Yet Mr Modi made no effort to build consensus for his three new laws last year, instead ramming them through parliament without debate. When north Indian farmers, many of whom happen to be Sikh, protested, he doubled their fury by tagging them thugs and traitors. The most powerful Indian leader in a generation then did nothing for months, as if the stand-off were someone else’s problem. That is, not until elections in a couple of important farm states drew uncomfortably near, whereupon Mr Modi crumpled completely. In another democracy a leader who flouted parliament, broke trust with an influential religious minority and insisted on and then scrapped controversial reforms would pay a heavy political price. But although the farm-bill fiasco is only the latest link in a long and heavy chain of embarrassments under Mr Modi, the white-bearded prime minister remains largely unscathed. Admirers ascribe his staying power to personal charisma. They say he projects the strength and dignity Indian voters crave in their own lives. Detractors point instead to the deep pockets, ruthlessness and military discipline of his Bharatiya Janata Party (BJP), quietly buttressed by a web of allied Hindu-nationalist organisations and noisily amplified by relentless propaganda. All this surely counts, yet would not suffice without another secret weapon: the opposition. Throughout Mr Modi’s term and a half in power, the BJP’s opponents have remained divided, weak and largely ineffectual. 
This does not mean they have given the prime minister a free ride. Such misguided policies as “demonetisation”, the withdrawal from circulation in 2016 of high-denomination notes in a delusional bid to chase out “black money”, or the BJP’s gleeful stoking of Islamophobia in a country with 200m Muslims, or Mr Modi’s erratic handling of covid-19, including fierce lockdowns that wrecked small businesses and stranded millions of migrant workers, followed by complacent laxity as India’s second wave became a murderous tsunami—all this has made it easy for opposition politicians to fire up disgruntled constituents. But despite landing the odd blow against Mr Modi, and beating the BJP in the occasional state election, they have so far failed to shift India’s broader narrative. In opposition in the years before 2014, the BJP had by contrast skilfully and relentlessly undermined the sitting government, plucking at every speck of possible evidence to build a damning—and in retrospect largely unfair—picture of weakness and venality.
         '''
# Normalise the raw text: lower-case it and trim the surrounding whitespace.
corpus = corpus.lower().strip()
# Strip punctuation. `string.punctuation` only lists ASCII characters, so it
# leaves the typographic quotes, apostrophes and dashes used in this text
# (’ “ ” —) attached to their words. Removing "anything that is neither a word
# character nor whitespace" (plus the underscore, which \w would otherwise
# keep) covers the ASCII set and the Unicode marks alike.
corpus = re.sub(r'[^\w\s]|_', '', corpus)
# Gets only unique words from corpus (we don't have to do this but it reduces computation later)
corpus = set(corpus.split())

In [None]:
# Find rhymes for every word in corpus by building rhyme dictionary
def build_rhyme_dictionary(corpus):
    '''
    Maps each corpus word to the rhymes of that word that also occur in the corpus.

    Words without any in-corpus rhyme get no entry at all.

    Input (set): Word corpus

    Output (dict): word -> set of rhyming corpus words
    '''
    rhyme_d = {}
    for entry in corpus:
        # Keep only the suggested rhymes that are themselves corpus words
        in_corpus = {candidate for candidate in pro.rhymes(entry) if candidate in corpus}
        if in_corpus:
            rhyme_d[entry] = in_corpus
    return rhyme_d

In [None]:
# Create list of input tuples from rhyme dictionary
# Each pair is kept in one orientation only: once ('to', 'drew') has been
# emitted, the mirrored ('drew', 'to') is skipped.
def get_rhyme_pairs(rhyme_d):
    '''
    Creates a list of rhyme pairs from rhyme dictionary

    A pair (k, word) is emitted for every word listed under key k, unless the
    mirrored pair (word, k) was already emitted earlier.

    Input (dict): rhyme dictionary (word -> iterable of rhyming words)

    Output (list): list of rhyme pairs (tuples), in dictionary iteration order
    '''
    rhyme_pairs = []
    # Mirrors the contents of rhyme_pairs so the "already emitted?" check is a
    # constant-time set lookup instead of a scan over the growing list.
    emitted = set()
    for k, v in rhyme_d.items():
        for word in v:
            if (word, k) in emitted:
                continue
            pair = (k, word)
            rhyme_pairs.append(pair)
            emitted.add(pair)
    return rhyme_pairs

In [None]:
def get_non_rhyme_pairs(rhyme_pairs, corpus):
    '''
    Generates a list of non-rhyming word pairs from corpus with pseudo-randomness

    Two different entries of `corpus` are drawn at random, the same randomly
    chosen position (0 or 1) is read from both, and the resulting two words are
    kept if the first is not listed among the rhymes of the second. Sampling
    continues until as many distinct pairs as `rhyme_pairs` have been collected.

    Inputs:
      rhyme_pairs (list): list of rhyming words (tuples); only its length is used
      corpus (list): indexable sequence whose entries hold a word at positions
        0 and 1 (presumably the rhyme pairs themselves -- confirm with caller)

    Output:
      non_rhyme_pairs (list): list of non-rhyming words (tuples)

    Raises:
      ValueError: if pairs are requested but `corpus` has fewer than two
        entries, since two distinct entries could never be drawn.

    Note: the loop only ends once enough distinct pairs were found, so a corpus
    that cannot supply that many non-rhyming combinations will not terminate.
    '''
    wanted = len(rhyme_pairs)
    if wanted == 0:
        return []
    if len(corpus) < 2:
        # Previously a one-entry corpus spun forever (the two indices were
        # always equal); fail loudly instead.
        raise ValueError("corpus must contain at least two entries to build non-rhyming pairs")

    last_index = len(corpus) - 1
    non_rhyme_pairs = set()
    while len(non_rhyme_pairs) < wanted:
        random_index_1 = random.randint(0, last_index)
        random_index_2 = random.randint(0, last_index)
        if random_index_1 == random_index_2:
            continue
        # Read the same position from both entries
        random_bi = random.randint(0, 1)
        potential_word_1 = corpus[random_index_1][random_bi]
        potential_word_2 = corpus[random_index_2][random_bi]
        # make sure potential words don't rhyme
        if potential_word_1 in pro.rhymes(potential_word_2):
            continue
        non_rhyme_pairs.add((potential_word_1, potential_word_2))
    return list(non_rhyme_pairs)

In [None]:
# New Function That Uses a Hard-Coded List of IPA Characters and converts
# the word pairs into their numerical representation

# Defining the IPA Chars List
# Every phone is a single character, so the inventory is written as three
# runs of symbols and exploded into a list; a phone's list index is what the
# encoders below look up.
ipa_alphabet = list(
    'iɪyeəʊuoɛæaɑɔ'
    'pbmfvθðstzrdn'
    'ʃʒlxkŋcgqwjhʤʧ'
)
num_chars = len(ipa_alphabet)

'''
A Brief Note on How We Are Doing Numerical Representations:
The above alphabet list orders the various phones by their closeness in sound.
When we convert the phonetic representation of words into a numerical one, we are
simply finding the index (we will call 'idx') of that phone in the list, and then
adding one and dividing by the total number of phones:

    Numerical Representation of Phone = (idx+1)/length(ipa_alphabet)

'''

def convert_to_numerical(word_pairs):
    '''
    Turns every word of every pair into a list of numbers, one per phone.

    Each word is converted to its phonetic spelling, the characters
    'ˈ', 'ˌ' and '*' are dropped, and each remaining character is replaced
    by (index in 'ipa_alphabet' + 1) / num_chars. A progress counter is
    printed every 100 pairs.

    Input (list): list of word pairs (english representation)

    Output (list): list of word pairs (numerical representation)
    '''
    # One translation table removes all three unwanted characters in a single pass
    drop_marks = str.maketrans('', '', 'ˈˌ*')

    def to_numbers(phones):
        return [(ipa_alphabet.index(phone) + 1) / num_chars for phone in phones]

    numerical_pairs = []
    for position, pair in enumerate(word_pairs):
        if position % 100 == 0:
            print(position)

        # Both words are converted to phones before either one is encoded
        phones_a = ipa.convert(pair[0]).translate(drop_marks)
        phones_b = ipa.convert(pair[1]).translate(drop_marks)

        numerical_pairs.append((to_numbers(phones_a), to_numbers(phones_b)))

    return numerical_pairs

In [None]:
# TESTING NEW FUNCTION

def convert_to_tensor(word_pairs):
    '''
    Encodes word pairs as rows of a 2D array with num_chars*2 columns.

    Each word is converted to its phonetic spelling, stripped of the
    characters 'ˈ', 'ˌ' and '*', and cut down to its last 6 characters.
    For a word of n remaining phones, the column of the phone at position p
    (1-based) is set to p / n, so the final phone gets 1.0 and a phone that
    occurs twice keeps the value of its later position. The first num_chars
    columns describe the first word, the remaining ones the second word.

    Pairs whose phonetic conversion raises are reported with a printed
    message and left out of the result. A progress counter is printed every
    100 pairs.

    Input (list): list of word pairs (english representation)

    Output (np.ndarray): one row per successfully converted pair
    '''
    max_phones = 6
    drop_marks = str.maketrans('', '', 'ˈˌ*')

    def encode(phones):
        # Position-weighted vector over the phone inventory
        weights = np.zeros(num_chars)
        count = len(phones)
        for position, phone in enumerate(phones, start=1):
            weights[ipa_alphabet.index(phone)] = position / count
        return weights

    rows = np.zeros((len(word_pairs), num_chars * 2))
    null_indices = []  # rows to drop afterwards because conversion failed

    for index, pair in enumerate(word_pairs):
        if index % 100 == 0:
            print(index)

        # Convert words to phonetic representations
        try:
            word1 = ipa.convert(pair[0])
            word2 = ipa.convert(pair[1])
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still
            # interrupt a long conversion run.
            print("NaN Line Created")
            null_indices.append(index)
            continue

        # Only the last 6 phones of each word are encoded
        word1 = word1.translate(drop_marks)[-max_phones:]
        word2 = word2.translate(drop_marks)[-max_phones:]

        rows[index, :num_chars] = encode(word1)
        rows[index, num_chars:] = encode(word2)

    # Remove the placeholder rows of the pairs that failed to convert
    return np.delete(rows, null_indices, axis=0)

In [None]:
# Final step in shaping the input:
# We want the input to be of the form of a 2D array, in which each row is a
# 12 unit long numerical array, and the number of rows is then the number of possible inputs
# we have to train and test the model (aka the number of word rhyme pairs).
# Let's have the array be 12 units in length (meaning we will have 12 input nodes)
# The first six indices are for the first word, the second six are for the second word.
# If the word is six phones long, then it fits perfectly.
# If the word is >6 phones long, then we will truncate it -- cut off the beginning of the word.
# If the word is <6 phones long, we will pad the beginning with zeros.

def transform_to_input(num_pairs, phones_per_word=6):
    '''
    Packs numerical word pairs into a fixed-width 2D input array.

    Every word is fitted to exactly `phones_per_word` values: longer words
    keep only their last `phones_per_word` values, shorter words are padded
    with zeros at the front. A row is the fitted first word followed by the
    fitted second word.

    Inputs:
      num_pairs (list): pairs of numerical words (sequences of numbers)
      phones_per_word (int): values kept per word; the default of 6 gives
        the 12-column layout

    Output (np.ndarray): array of shape (len(num_pairs), 2 * phones_per_word)

    Raises:
      ValueError: if phones_per_word is smaller than 1
    '''
    if phones_per_word < 1:
        raise ValueError("phones_per_word must be at least 1")

    def fit(word):
        # Keep the tail of the word and right-align it in a zero-filled slot
        tail = np.asarray(word[-phones_per_word:], dtype=float)
        slot = np.zeros(phones_per_word)
        slot[phones_per_word - len(tail):] = tail
        return slot

    rows = np.zeros((len(num_pairs), 2 * phones_per_word))
    for index, pair in enumerate(num_pairs):
        rows[index, :phones_per_word] = fit(pair[0])
        rows[index, phones_per_word:] = fit(pair[1])

    return rows


In [None]:
# Ask for files through google.colab's upload helper; the result is indexed by
# file name further down, where the two CSV files of word pairs are read from it.
uploaded = files.upload()

In [None]:
# Load the rhyming and non-rhyming pairs, encode them with convert_to_tensor,
# and assemble a shuffled input collection (all_word_pairs) together with the
# matching 1/0 labels (rhyme_binary).

# Alternative (disabled): derive the pairs from the small in-notebook corpus
# Build the rhyme dictionary
#rhyme_d = build_rhyme_dictionary(corpus)

# Organize that all into rhyming pairs and non-rhyming pairs
#rhyme_pairs = get_rhyme_pairs(rhyme_d)
#non_rhyme_pairs = get_non_rhyme_pairs(rhyme_pairs)

# First, use the CSV files to get the rhyming pairs and non-rhyming pairs
# (read from the uploaded files by name)
rhyme_df = pd.read_csv(io.BytesIO(uploaded['rhyme_pairs_strict.csv']))
non_rhyme_df = pd.read_csv(io.BytesIO(uploaded['non_rhyme_pairs_strict.csv']))

# Turn each DataFrame row into one record, then into a plain list of records
rhyme_rec = rhyme_df.to_records(index=False)
non_rhyme_rec = non_rhyme_df.to_records(index=False)
rhyme_pairs = list(rhyme_rec)
non_rhyme_pairs = list(non_rhyme_rec)

# Limiting the Size (disabled)
# size = 10000
# rhyme_pairs_numerical = convert_to_tensor(rhyme_pairs[0:size])
# non_rhyme_pairs_numerical = convert_to_tensor(non_rhyme_pairs[0:size])

# Entire Datasets
# convert_to_tensor drops pairs it cannot convert, so the results may have
# fewer rows than the pair lists
rhyme_pairs_numerical = convert_to_tensor(rhyme_pairs)
non_rhyme_pairs_numerical = convert_to_tensor(non_rhyme_pairs)


# Earlier encoding (disabled): per-phone numerical representation
#rhyme_pairs_numerical = convert_to_numerical(rhyme_pairs[0:1000])
#non_rhyme_pairs_numerical = convert_to_numerical(non_rhyme_pairs[0:1000])

# Now, I want to create a combined list, and another list that tracks
# whether the pair at each index is a rhyme or non_rhyme: this will be
# a binary list with 1 for rhyme pair and 0 for non_rhyme pair.
# The label counts come from the encoded arrays, so they stay aligned with
# the rows that survived conversion.
all_word_pairs = np.concatenate((rhyme_pairs_numerical, non_rhyme_pairs_numerical))
rhyme_binary = ([1] * len(rhyme_pairs_numerical)) + ([0] * len(non_rhyme_pairs_numerical))

# Now, I want to zip() the lists and then shuffle() and then unzip()
# so every row keeps its own label; after unzipping both names hold tuples
temp = list(zip(all_word_pairs, rhyme_binary))
random.shuffle(temp)
all_word_pairs, rhyme_binary = zip(*temp)


In [None]:
# Transform the binary list into desired 2-node output format

# One-hot encode every label: a rhyme (1) becomes (1, 0), anything else (0, 1)
b = [(flag, 0) if flag == 1.0 else (0, 1) for flag in rhyme_binary]
rhyme_binary_tup = list(b)

In [None]:
# Inputs: stack the shuffled per-pair rows into one array
# (earlier variant, disabled: build the 12-column layout instead)
#X = transform_to_input(all_word_pairs)
X = np.array(all_word_pairs)

# True Y values: the two-column labels built from rhyme_binary
# (earlier variant, disabled: a single 0/1 column)
#Y = np.expand_dims(np.array(rhyme_binary), axis=1)
Y = np.array(rhyme_binary_tup)

print("Input and Output Tensor Shapes")
print("Input: ", X.shape)
print("Output: ", Y.shape)

# Make X,Y tensors
X = torch.tensor(X)
#X = torch.nn.functional.normalize(X, p=2.0, dim=1, eps=1e-12, out=None)
# Labels are cast to float
Y = torch.tensor(Y).float()
#Y = torch.tensor(Y)

# Print one sample for a visual sanity check; 40 is the size of ipa_alphabet,
# so the two slices are the first-word and second-word halves of the row
print("\nINSPECTION")
idxx = 2
print(X[idxx][0:40])
print(X[idxx][40:])
print(Y[idxx])

In [None]:
# Wrap the tensors in a dataset, split it 80/20 at random and build the loaders
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, TensorDataset

dataset = TensorDataset(X, Y)

# Determining train_test split: 80% rounded down, 20% rounded up
total_pairs = X.shape[0]
train_div = math.floor(total_pairs * .8)
test_div = math.ceil(total_pairs * .2)
split_sizes = [train_div, test_div]

train_set, val_set = torch.utils.data.random_split(dataset, split_sizes)
# Training runs in mini-batches of 100; the held-out part is served as one batch
trainloader = DataLoader(train_set, batch_size=100, shuffle=True)
testloader = DataLoader(val_set, batch_size=test_div, shuffle=True)

In [None]:
# Create Neural Network Using Torch Sequential
# One hidden layer; trained below with mean-squared error against the
# two-column labels using plain SGD.

input_dim = 80 # 80 input nodes: 40 phones (size of ipa_alphabet) for each of the two words
hidden_dim = 120 # 120 hidden layer nodes
output_dim = 2 # 2 output nodes

# Define Model Quickly
# Linear -> Sigmoid -> Linear -> Softmax over the two output nodes
model = nn.Sequential(
    nn.Linear(input_dim, hidden_dim),
    nn.Sigmoid(),
    nn.Linear(hidden_dim, output_dim),
    nn.Softmax(dim=1)
)

# Define the loss
#criterion = nn.NLLLoss()
criterion = nn.MSELoss()

# Optimizers require the parameters to optimize and a learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=0.003)

# Train for a fixed number of passes over the training loader and print the
# mean batch loss after each pass
epochs = 1000
for e in range(epochs):
  running_loss = 0
  for inputs, labels in trainloader:
    # Training pass
    # Clear gradients left over from the previous batch
    optimizer.zero_grad()

    # The inputs are cast to float before the forward pass
    output = model(inputs.float())
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    running_loss += loss.item()

  print(f"Training loss: {running_loss/len(trainloader)}")

In [None]:
# Test
# Evaluation needs no gradients, so autograd is switched off: no graph is
# built for the (single, large) test batch.
with torch.no_grad():
    for inputs, labels in testloader:
        output = model(inputs.float())
        # Threshold the two outputs at 0.5 so they can be compared with the labels
        output = (output > 0.5).float()
        # A correct prediction matches the label in both columns, a wrong one
        # in neither, so the number of matching entries is halved.
        correct = (output == labels).sum() / 2
        print("Accuracy = ", correct / inputs.shape[0])

In [None]:
# Function that takes in pairs of words and outputs a 3D array of shape
# (pairs, 6, num_chars*2): one row per phone position, holding the one-hot
# encoding of that phone for the first word followed by the second word.
def convert_to_tensor2(word_pairs):
    '''
    One-hot encodes word pairs phone by phone.

    Each word is converted to its phonetic spelling, stripped of the
    characters 'ˈ', 'ˌ' and '*', and cut down to its last 6 characters.
    Phone number t of a word sets a single 1 in row t: in the first
    num_chars columns for the first word, in the remaining columns for the
    second word. Words shorter than 6 phones leave their trailing rows zero.

    Pairs whose phonetic conversion raises are reported with a printed
    message and left out of the result. A progress counter is printed every
    100 pairs.

    Input (list): list of word pairs (english representation)

    Output (np.ndarray): array of shape (converted pairs, 6, num_chars*2)
    '''
    seq_len = 6
    drop_marks = str.maketrans('', '', 'ˈˌ*')

    def one_hot_steps(phones):
        # Built as (position, phone) so it drops straight into the
        # (6, num_chars*2) slot of the output; the previous (phone, position)
        # layout did not fit that slot and made the assignment fail.
        steps = np.zeros((seq_len, num_chars))
        for step, phone in enumerate(phones):
            steps[step, ipa_alphabet.index(phone)] = 1
        return steps

    encoded = np.zeros((len(word_pairs), seq_len, num_chars * 2))
    null_indices = []  # entries to drop afterwards because conversion failed

    for index, pair in enumerate(word_pairs):
        if index % 100 == 0:
            print(index)

        # Convert words to phonetic representations
        try:
            word1 = ipa.convert(pair[0])
            word2 = ipa.convert(pair[1])
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still
            # interrupt a long conversion run.
            print("IPA Conversion Failed, Removing Word Pair...")
            null_indices.append(index)
            continue

        # Only the last 6 phones of each word are encoded
        word1 = word1.translate(drop_marks)[-seq_len:]
        word2 = word2.translate(drop_marks)[-seq_len:]

        encoded[index, :, :num_chars] = one_hot_steps(word1)
        encoded[index, :, num_chars:] = one_hot_steps(word2)

    # Remove the placeholder entries of the pairs that failed to convert
    return np.delete(encoded, null_indices, axis=0)










In [None]:
#Create Recurrent Neural Network

class Model(nn.Module):
    '''
    Recurrent network: an nn.RNN (batch-first) followed by a linear layer
    that is applied to the RNN output of every time step.
    '''

    def __init__(self, input_dim, output_dim, hidden_dim, n_layers):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.rnn = nn.RNN(input_dim, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        '''
        Runs a batch through the RNN, starting from an all-zero hidden state.

        Input: x with the batch as its first dimension
        Output: (out, hidden) -- `out` has one row per (sample, time step)
        with output_dim columns; `hidden` is the final hidden state of the RNN
        '''
        batch_size = x.size(0)
        hidden = self.init_hidden(batch_size)
        out, hidden = self.rnn(x, hidden)
        # Flatten (batch, step) into one axis so the linear layer sees 2D input
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        '''
        Returns an all-zero hidden state of shape
        (n_layers, batch_size, hidden_dim).
        '''
        # Allocate next to the model's own weights so the state follows the
        # module when it is moved to another device or cast to another dtype
        # (a plain torch.zeros would always be float32 on the CPU).
        return self.fc.weight.new_zeros(self.n_layers, batch_size, self.hidden_dim)

#Define parameters and model

# Shape of one RNN input batch: (batch, seq, encoding_size)
batch = 100
seq = 6
encoding_size = 80  # values per time step: 40 phones for each of the two words

# The RNN reads one time step at a time, so its input width has to equal the
# per-step encoding size (the former value of 12 belonged to the flat
# 12-column input layout and does not fit 80-wide steps).
input_dim = encoding_size
hidden_dim = 100 # 100 hidden layer nodes
output_dim = 2
n_layers = 1
n_epochs = 100

rnn_model = Model(input_dim, output_dim, hidden_dim, n_layers)

#Define Loss and Adam optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn_model.parameters(), lr=0.004)

