In [44]:
import os
import requests
import zipfile
import tempfile
import string
from typing import Optional
import unicodedata

from torch import Tensor, TensorType
import torch

DATASET_URL = "https://download.pytorch.org/tutorial/data.zip"
VOCAB = string.ascii_letters + " .,;'"
DATASET_DIR = "dump"

def download_and_unzip(url, path):
    # Create a temporary directory
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Define the path for the temporary file
        temp_file_path = os.path.join(tmp_dir, 'tempfile.zip')

        # Download the file from the url
        response = requests.get(url)
        if response.status_code == 200:
            with open(temp_file_path, 'wb') as f:
                f.write(response.content)

            # Unzip the file into the specified path
            with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
                zip_ref.extractall(path)
        else:
            raise Exception(f"Error downloading the file: HTTP {response.status_code}")

def unicode_to_ascii(s: str) -> str:
    """Convert a Unicode string to plain ASCII. 
    Characters in Unicode format are normalized to NFD (Normalization Form Decomposed)
    and non-spacing marks ('Mn') are removed.

    Args:
    s (str): The Unicode string to convert.

    Returns:
    str: The ASCII representation of the string.
    """
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )


def read_and_strip_lines(file_path: str) -> tuple[str, list[str]]:
    # Extract the file name from the file path
    file_name = os.path.basename(file_path).split('.')[0]

    # Read and strip lines from the file
    with open(file_path, 'r') as file:
        lines = [unicode_to_ascii(line.strip()) for line in file]

    return file_name, lines


def letter_to_tensor(char: str) -> Tensor:
    tensor = torch.zeros(1, len(VOCAB), dtype=torch.int)
    index = VOCAB.find(char)
    tensor[0][index] = 1
    return tensor
    

def line_to_tensor(line: str) -> Tensor:
    tensor = torch.zeros(len(line), 1, len(VOCAB), dtype=torch.int)
    for i, char in enumerate(line):
        tensor[i][0][VOCAB.find(char)] = 1
    return tensor


In [45]:
from dataclasses import dataclass
from functools import cached_property
import glob
import string
from torch import Tensor
from torch.utils.data import Dataset, DataLoader


def get_names_by_language(dataset_url: str, dataset_dir: str):
    names_by_lang: dict[str, list[str]] = {}
    download_and_unzip(dataset_url, dataset_dir)
    for file_path in glob.glob(os.path.join(dataset_dir, 'data', 'names/', '*.txt')):
        lang, lines = read_and_strip_lines(file_path=file_path)
        names_by_lang[lang] = lines
        
    return names_by_lang

def split_train_valid(names: list[str], train_ratio: float) -> tuple[list[str], list[str]]:
    train_size = int(len(names) * train_ratio)
    train_names = names[:train_size]
    valid_names = names[train_size:]
    return train_names, valid_names


def prepare_dataset():
    train_names: list[NameLanguagePair] = []
    valid_names: list[NameLanguagePair] = [] 
    names_by_lang = get_names_by_language(DATASET_URL, DATASET_DIR)
    languages = list(names_by_lang.keys())
    for lang in languages:
        names = names_by_lang[lang]
        tn, vn = split_train_valid(names, 0.8)
        train_names.extend([NameLanguagePair(lang, name) for name in tn])
        valid_names.extend([NameLanguagePair(lang, name) for name in vn])
    
    return NameDataset(train_names, languages), NameDataset(valid_names, languages)

@dataclass
class NameLanguagePair:
    language: str
    name: str
    
class NameDataset(Dataset):    
    def __init__(self, names: list[NameLanguagePair], languages: list[str]):
        self.names = names
        self.languages = languages
                
    def language_tensor(self, lang: str) -> Tensor:
        lang_index = self.languages.index(lang)
        return torch.tensor([lang_index], dtype=torch.long)
    
    @cached_property
    def vocab_size(self):
        return len(VOCAB)  
                      
    def __len__(self):
        return len(self.names)
    
    def __getitem__(self, index) -> tuple[Tensor, Tensor]:
        name = self.names[index]
        name_tensor = line_to_tensor(name.name)
        lang_tensor = self.language_tensor(name.language)
        return name_tensor, lang_tensor
        

""" def collate_fn(batch: list[tuple[Tensor, Tensor]]) -> tuple[Tensor, Tensor]:
    name_batches, lang_batches = zip(*batch)
    max_length = max(name.size(0) for name in name_batches)
    padded_data = torch.stack([torch.cat([t, torch.zeros(max_length - t.size(0), t.size(1))]) for t in name_batches])
    return padded_data, torch.stack(lang_batches)
 """    
 

train_dataset, valid_dataset = prepare_dataset()
train_dl = DataLoader(train_dataset, batch_size=1, shuffle=True)
valid_dl = DataLoader(valid_dataset, batch_size=1, shuffle=True)
num_languages = len(train_dataset.languages)

In [47]:
from torch import nn

class RNN(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size) # input to hidden
        self.i2o = nn.Linear(input_size + hidden_size, output_size) # input to output
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, input: Tensor, hidden: Tensor) -> tuple[Tensor, Tensor]:
        combined = torch.cat((input, hidden), 1)
        hidden = self.i2h(combined)
        output = self.i2o(combined)
        output = self.softmax(output)
        return output, hidden
    
    def init_hidden(self) -> Tensor:
        return torch.zeros(1, self.hidden_size)

In [65]:
from torch.nn import functional as F
from torch import optim
LEARNING_RATE = 0.005
loss_func = nn.NLLLoss()  
def get_optim(params: list[nn.Parameter], lr: float, momentum: float = 0) -> optim.Optimizer:
    return optim.SGD(params, lr=lr, momentum=momentum)    


def get_model(input_size: int, hidden_size: int, output_size: int, lr: float, momentum: float = 0) -> tuple[RNN, optim.Optimizer]:
    rnn = RNN(input_size, hidden_size, output_size)
    optimizer = get_optim(list(rnn.parameters()), lr, momentum)
    return rnn, optimizer

def evaluate(model: RNN, name_tensor: Tensor) -> Tensor:
    hidden = model.init_hidden()
    output: Tensor = torch.zeros(1, num_languages)
    for i in range(name_tensor.size()[0]):
        output, hidden = model(name_tensor[i], hidden)      
    return output


def get_language_from_tensor(pred: Tensor, languages: list[str]) -> str:    
    return languages[int(pred.argmax().item())]

def get_name_from_tensor(name_tensor: Tensor) -> str:
    name: str = ""
    for i in range(name_tensor.size()[0]):
        name += VOCAB[int(name_tensor[i][0].argmax().item())]
        
    return name

model, opt = get_model(len(VOCAB), 128, num_languages, LEARNING_RATE)

In [66]:
def test_sample(model: RNN, dl: DataLoader):
    name_tensor_bs, lang_tensor_bs = next(iter(dl))
    with torch.no_grad():
        name_tensor = name_tensor_bs.squeeze(0)
        lang_tensor = lang_tensor_bs.squeeze(0)
        pred = evaluate(model, name_tensor)
        pred_language = get_language_from_tensor(pred, train_dataset.languages)
        act_language = get_language_from_tensor(lang_tensor, train_dataset.languages)
        name = get_name_from_tensor(name_tensor)    
        print(pred, lang_tensor)        
        loss = loss_func(pred, lang_tensor)            
        print(f"Name: {name}, Predicted: {pred_language}, Actual: {act_language}")    
        
test_sample(model, train_dl)     

tensor([[-2.9211, -2.9955, -2.8954, -2.8568, -2.9145, -2.9100, -2.8715, -2.8324,
         -2.9952, -2.8704, -2.9077, -2.8819, -2.8451, -2.8330, -2.8668, -2.8446,
         -2.7936, -3.0229]]) tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
       dtype=torch.int32)


RuntimeError: 0D or 1D target tensor expected, multi-target not supported

In [55]:


def train_loop(model: RNN, optimizer: optim.Optimizer, train_dl: DataLoader) -> float:
    model.train()
    train_loss: float = 0
    for i, (name_tensor_bs, lang_tensor_bs) in enumerate(train_dl):
        name_tensor = name_tensor_bs.squeeze(0)
        lang_tensor = lang_tensor_bs.squeeze(0)
        optimizer.zero_grad()
        pred = evaluate(model, name_tensor)
        loss = loss_func(pred, lang_tensor)
        loss.backward()
        optimizer.step()
        train_loss = loss.item()
    return train_loss

def validate(model: RNN, dataloader: DataLoader):
    model.eval()
    with torch.no_grad():
        total_loss: float = 0
        for name_tensor_bs, lang_tensor_bs in dataloader:
            name_tensor = name_tensor_bs.squeeze(0)
            lang_tensor = lang_tensor_bs.squeeze(0)            
            pred = evaluate(model, name_tensor)
            loss = loss_func(pred, lang_tensor)
            total_loss += loss.item()
            
    return total_loss / len(dataloader)


    
        
def train(model: RNN, optimizer: optim.Optimizer, train_dl: DataLoader,  
          valid_dl: DataLoader, epochs: int):
        
    for epoch in range(epochs):
        train_loss = train_loop(model, optimizer, train_dl)
        validate_loss = validate(model, valid_dl)
        print(f"Epoch: {epoch}, Train Loss: {train_loss}, validate Loss: {validate_loss}")          
        name_tensor_bs, lang_tensor_bs = next(iter(valid_dl))
        with torch.no_grad():
            name_tensor = name_tensor_bs.squeeze(0)
            lang_tensor = lang_tensor_bs.squeeze(0)
            pred = evaluate(model, name_tensor)
            pred_language = get_language_from_tensor(pred, train_dataset.languages)
            act_language = get_language_from_tensor(lang_tensor, train_dataset.languages)
            name = get_name_from_tensor(name_tensor)                
            print(f"Name: {name}, Predicted: {pred_language}, Actual: {act_language}")                    
    
train(model, opt, train_dl, valid_dl, 1)

RuntimeError: 0D or 1D target tensor expected, multi-target not supported