# Phoneme to Grapheme Conversion with a Recurrent Generative Model 
This project will discuss...

In [263]:
import torch
import torch.nn as nn
import random
import torch.optim as optim
from torch.utils.data import Dataset
import pandas as pd

# find phoneme vocabulary
data = pd.read_csv("phonemes-words.csv")
phonemes_col = data["phonemes"]
phonemes = ['0', '1']

for word in phonemes_col:
    for phoneme in word:
        if phoneme not in phonemes:
            phonemes.append(phoneme)
print(phonemes)

['0', '1', 's', 'ə', 'b', 't', 'ō', 'ᵊ', 'l', 'ī', 'r', 'i', 'v', 'm', 'd', 'ä', 'h', 'y', 'ü', 'a', 'ē', 'p', 'n', 'e', 'k', 'j', 'ŋ', 'g', 'z', 'u', '̇', 'c', 'ȯ', 'w', 'f', 'ā', 'ḵ', '͟']


In [264]:


# known phonemes/graphemes
# phonemes = ['0', '1', 'ᵊ', 'ə̇', 'ȯ', 'ē', 'ī', 'ō', 'a', 'ä', 'ɑ', 'ɒ', 'æ', 'b', 'ḇ', 'β', 'c', 'č', 'ɔ', 'ɕ', 'ç', 'd', 'ḏ', 'ḍ', 'ð', 'e', 'ə', 'ɚ', 'ɛ', 'ɝ', 'f', 'g', 'ḡ', 'h', 'ʰ', 'ḥ', 'ḫ', 'ẖ', 'i', 'ɪ', 'ỉ', 'ɨ', 'j', 'ʲ', 'ǰ', 'k', 'ḳ', 'ḵ', 'l', 'ḷ', 'ɬ', 'ɫ', 'm', 'n', 'ŋ', 'ṇ', 'ɲ', 'ɴ', 'o', 'ŏ', 'ɸ', 'θ', 'p', 'p', '̅', 'þ', 'q', 'r', 'ɹ', 'ɾ', 'ʀ', 'ʁ', 'ṛ', 's', 'š', 'ś', 'ṣ', 'ʃ', 't', 'ṭ', 'ṯ', 'ʨ', 't', 'ʂ', 'u', 'ʊ', 'ŭ', 'ü', 'v', 'ʌ', 'ɣ', 'w', 'ʍ', 'x', 'χ', 'y', 'ʸ', 'ʎ', 'z', 'ẓ', 'ž', 'ʒ', 'ʔ', 'ʕ']

graphemes = ['0', '1', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']

# one hot encodes the word: returns an array of one hot encoded characters
def nemes_to_1_hot_seq(string, nemes="phonemes"):
    string = '0' + string + '1'
    l = phonemes if nemes == "phonemes" else graphemes
    seq = []
    for i in string:
        vec = [0] * len(l)
        vec[l.index(i)] = 1
        seq.append(vec)

    return torch.FloatTensor([seq])

def one_hot_to_nemes(arr, nemes="phonemes"):
    seq = []
    l = phonemes if nemes == "phonemes" else graphemes
    for hot in arr:
        x = torch.argmax(hot)
        seq.append(l[x])
    return seq

class P2GDataset(Dataset):
    def __init__(self, phoneme_file):
        self.data = pd.read_csv(phoneme_file, header=None)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        p, g = self.data.iloc[idx]
        return nemes_to_1_hot_seq(p, nemes = "phonemes"), nemes_to_1_hot_seq(g, nemes = "graphemes")


In [265]:
# define model architecture
class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.GRU(len(phonemes), 2048, 1, batch_first=True, bidirectional=False)
        
    def forward(self, x):
        # push vector through encoder
        out, h_n = self.encoder(x)

        # return context vector
        return h_n

class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.decoder = nn.GRU(len(graphemes), 2048, 1, batch_first=True, bidirectional=False)
        self.fc = nn.Sequential(
            nn.Linear(2048, 1024),
            nn.Linear(1024, len(graphemes))
        )
        

    def forward(self, input, hidden_layer):
        """
        Since this function gets called once at a time rather than taking in
        a sequence of vectors, we need to pass it the last output. This will be just
        a vector of numbers that can be converted to the embedding representing that last output
        """
        out, h_n = self.decoder(input, hidden_layer)
        # print("H")
        return self.fc(h_n), h_n

class seq2seq(nn.Module):
    def __init__(self, device):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.device = device
    
    def forward(self, in_seq, out_seq, tf_ratio=0.5):
        out_len = out_seq.shape[1]
        # storing the outputs of the sequence
        outputs = torch.zeros(out_len, 1, len(graphemes)).to(self.device)

        hidden = self.encoder(in_seq)

        out_seq = out_seq.squeeze(0)

        input = out_seq[0].unsqueeze(0).unsqueeze(0)
        
        for i in range(1, out_len):
            out, hidden = self.decoder(input, hidden)
            outputs[i] = out

            if random.random() > tf_ratio:
                # teacher forcing (make next input what the current output token should be)
                input = out_seq[i].unsqueeze(0).unsqueeze(0)
            else:
                x = input.argmax(1)[0]
                input = torch.zeros(1, 1, len(graphemes))
                input[0][0][x] = 1
                
        return outputs

    def pred_new(self, in_seq):
        hidden = self.encoder(in_seq)
        input = torch.zeros(1, 1, len(graphemes))
        outs = []
        while True:
            out, hidden = self.decoder(input, hidden)
            outs.append(out)
            x = input.argmax(1)[0]
            input = torch.zeros(1, 1, len(graphemes))
            input[0][0][x] = 1
            if one_hot_to_nemes(out) == ['1']:
                break
        return outs

In [288]:
"""training"""
from torch.utils.data import random_split
from torch.utils.data import DataLoader
device = "cuda" if torch.cuda.is_available() else "cpu"
EPOCHS = 100
model = seq2seq(device)
# what a beautiful architecture
print(model)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
loss_func = nn.CrossEntropyLoss()
dataset = P2GDataset("phonemes-words.csv")
train, test = random_split(dataset, [100, len(dataset)-100])
dataloader = DataLoader(dataset=train, batch_size=1)
print(len(train))


seq2seq(
  (encoder): Encoder(
    (encoder): GRU(38, 2048, batch_first=True)
  )
  (decoder): Decoder(
    (decoder): GRU(28, 2048, batch_first=True)
    (fc): Sequential(
      (0): Linear(in_features=2048, out_features=1024, bias=True)
      (1): Linear(in_features=1024, out_features=28, bias=True)
    )
  )
)
4


In [278]:
avg_losses = []
for epoch in range(75):
    tot_loss = 0
    for (in_seq, out_seq) in dataloader:
        in_seq = in_seq.squeeze(0)
        out_seq = out_seq.squeeze(0)
        model_output = model(in_seq, out_seq)
        model_output = model_output[1:]
        model_output = model_output.squeeze(1)
        out_seq = out_seq.squeeze(0)[1:]
        loss = loss_func(model_output, out_seq)
        tot_loss+=loss.detach().item()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    tot_loss/=len(dataset)
    print(tot_loss)
    avg_losses.append(tot_loss)



3.3221163153648376
3.2708892226219177
3.201585054397583
3.079297184944153
2.8465216159820557
2.7089841961860657
2.5153560042381287
2.462839186191559
2.4048062562942505
2.343404471874237
2.2618715167045593
2.157208651304245
2.0664122104644775
2.017159879207611
1.9561673402786255
1.9156756401062012
1.8750565648078918
1.8571637272834778
1.8040289878845215
1.7984676957130432
1.7435909807682037
1.8401980102062225
1.6572427153587341
1.6717987060546875
1.582443505525589
1.5349659025669098
1.506835013628006
1.4489192366600037
1.4294407367706299
1.315350592136383
1.2721288800239563
1.2115438431501389
1.1813770234584808
1.1066882610321045
1.0837684124708176
1.002344235777855
1.0130640268325806
0.9273332059383392
0.9591424614191055
0.8267364948987961
0.88326495885849
0.7914309576153755
0.8708502948284149
0.7478144615888596
0.650847390294075
0.6428801491856575
0.6119379103183746
0.554174043238163
0.4900366812944412
0.47804349660873413
0.4236934594810009
0.3837178274989128
0.3570425473153591
0.3130

In [279]:
dataset = P2GDataset("data.csv")
p, g = dataset[1]

print(one_hot_to_nemes(p[0], "phonemes"))
p.shape

['0', 'b', 'ä', 'r', 'b', 'ü', 'd', 'ō', '1']


torch.Size([1, 9, 38])

In [286]:
def get_0_1_accuracy(test_set, model):
    correct = 0
    print(len(test_set))
    dataloader = DataLoader(dataset=test_set, batch_size=1)
    for (in_seq, out_seq) in dataloader:
        prediction = model.pred_new(in_seq)
        print(prediction)
        print(out_seq)
        if prediction == out_seq:
            correct+= 1
    if correct == 0:
        return correct
    return correct/len(test_set)

def print_preds(path):
    global p
    print(one_hot_to_nemes(p[0], "phonemes"))
    s = model.pred_new(p)
    
    print(one_hot_to_nemes(s, "graphemes"))

# print_preds("data.csv")
# get_0_1_accuracy(test, model)
print(test[0])
# print(one_hot_to_graphemes(torch.FloatTensor([[3,2,1],[0,0,1],[0,0,1]])))

IndexError: list index out of range