In [30]:
import time
import os
import nltk.tokenize as nt
import json
import torch
import copy
from torch import nn 
from torch.utils.data import Dataset, DataLoader

In [3]:
# Dataset folder names, one per author. Each is joined onto PATH_TO_PARSED_DATA
# to locate that author's files and is also used as the file-name prefix.
AUTHOR1 = "coutinho-dataset"
AUTHOR2 = "denser-dataset"
# Corpus root folders, given relative to the current working directory.
PATH_TO_RAW_DATA = "data/raw/"  # not referenced by the cells visible here
PATH_TO_PARSED_DATA = "data/parsed/"

In [4]:
# Folder that holds the first author's parsed files.
author1Path = os.path.join(PATH_TO_PARSED_DATA, AUTHOR1)
print(author1Path)

data/parsed/coutinho-dataset


In [5]:
# Resolve the folder to an absolute path and list its entries. os.listdir
# returns names in arbitrary order, so author1Files has no guaranteed ordering.
author1AbsPath = os.path.abspath(author1Path)
author1Files = os.listdir(author1AbsPath)
print(len(author1Files))

1614


In [6]:
def getParagraph(filename):
    """Return the entire text content of the file at ``filename``.

    The file is opened in a ``with`` block so the handle is released even if
    reading fails, and it is decoded explicitly as UTF-8 so accented characters
    do not depend on the platform's default encoding.

    Args:
        filename: Path of the text file to read.

    Returns:
        The file's contents as a single string.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(filename, "r", encoding="utf-8") as f:
        return f.read()

In [7]:
# Load every file of the first author's folder into memory. The path is built
# with os.path.join (instead of manual "/" concatenation) so it is assembled
# the same way as author1Path.
paragraphs = [
    getParagraph(os.path.join(author1AbsPath, filename))
    for filename in author1Files
]

# Show the first loaded paragraph as a quick sanity check.
print(paragraphs[0])

 O escritor (Jairo?) lembra que toda manha, por volta das   oito horas, um grupo de pessoas se reúne para rezar no Mirante, uma construção tipo quiosque, escondida entre as árvores de   uma elevação.


In [8]:
class Paragraph:
    """Holds two list attributes, ``original`` and ``tokenized``.

    Presumably these are the raw and tokenized forms of one paragraph; the
    class is not used by the cells visible here, so confirm before relying
    on that reading.
    """

    def __init__(self):
        # `self` was missing from the signature, which made Paragraph() raise
        # TypeError. Both attributes start as fresh, empty lists per instance.
        self.original = []
        self.tokenized = []

In [9]:
# Shared word -> integer id vocabulary; starts empty and is filled through
# mapToNumber when paragraphs are tokenized.
dictAuthors = {}

In [10]:
def mapToNumber(dict, word):
    """Look up the integer id of ``word``, registering it first if it is new.

    Args:
        dict: Mapping from word to integer id; updated in place when ``word``
            has no entry yet. (The parameter name shadows the ``dict`` builtin
            inside this function.)
        word: Key to look up.

    Returns:
        The id stored for ``word``. A new word is assigned the current number
        of entries plus one, so the first word added to an empty mapping
        gets id 1.
    """
    if word not in dict:
        # New ids are one past the current entry count.
        dict[word] = len(dict) + 1
    return dict[word]

In [11]:
def dictMapper(words, wordsmapped):
    """Make sure every item of ``words`` has an id in ``wordsmapped``.

    Args:
        words: Iterable of words to register.
        wordsmapped: Word-to-id mapping, updated in place via ``mapToNumber``.

    Returns:
        The same ``wordsmapped`` object that was passed in.
    """
    # mapToNumber is called only for its side effect on the mapping; the id it
    # returns is not needed here.
    for token in words:
        mapToNumber(wordsmapped, token)
    return wordsmapped

In [12]:
# Persist the word -> id vocabulary as indented JSON. ensure_ascii=False keeps
# accented characters as-is instead of writing \uXXXX escapes.
# NOTE(review): when the notebook is run top to bottom, nothing has been added
# to dictAuthors yet at this point, so this writes an empty object. Re-run it
# after tokenization if the populated vocabulary is what should be saved.
# No explicit close is needed: the with-block closes the file on exit.
with open('authorsDict.json', 'w', encoding='utf-8') as f:
    json.dump(dictAuthors, f, ensure_ascii=False, indent=4)

<function TextIOWrapper.close()>

In [13]:
def tokenizeWordsInParagraph(paragraph, dict):
    """Convert a paragraph into a list of float word ids.

    The text is split with NLTK's ``word_tokenize`` and each token is looked up
    through ``mapToNumber``, which adds unseen tokens to ``dict``.

    Args:
        paragraph: Text to tokenize.
        dict: Word-to-id mapping shared across calls; it may grow as a side
            effect of this call.

    Returns:
        A plain Python list (not a torch tensor) holding one float id per
        token, in token order.
    """
    return [float(mapToNumber(dict, piece)) for piece in nt.word_tokenize(paragraph)]

In [14]:
def normalizeData(data, rule, pad_value=0):
    """Return a new list holding ``data`` fitted to exactly ``rule`` items.

    Shorter sequences are right-padded with ``pad_value``; longer ones are cut
    down to their first ``rule`` items. The input is never modified, so padding
    no longer appends to the caller's list in place.

    Args:
        data: Sequence of values to fit.
        rule: Target length; must be non-negative.
        pad_value: Filler appended to short sequences. Defaults to 0.

    Returns:
        A new list of length ``rule``.

    Raises:
        ValueError: If ``rule`` is negative (a negative slice bound would
            otherwise silently drop items from the end).
    """
    if rule < 0:
        raise ValueError(f"rule must be non-negative, got {rule}")
    fitted = list(data[:rule])
    fitted.extend([pad_value] * (rule - len(fitted)))
    return fitted

In [15]:
def tokenizeParagraphSet(paragraphSet, length=500, vocabulary=None):
    """Tokenize every paragraph and fit each result to a fixed length.

    Args:
        paragraphSet: Iterable of paragraph strings.
        length: Number of ids kept per paragraph (padded or truncated by
            ``normalizeData``). The default of 500 equals the input width of
            the first ``nn.Linear`` layer in ``NeuralNetwork``.
        vocabulary: Word-to-id mapping to use and extend. When omitted, the
            module-level ``dictAuthors`` is used, as before.

    Returns:
        A list with one fixed-length list of ids per input paragraph, in
        input order.
    """
    if vocabulary is None:
        vocabulary = dictAuthors
    return [
        normalizeData(tokenizeWordsInParagraph(paragraph, vocabulary), length)
        for paragraph in paragraphSet
    ]

In [16]:
# Pick the compute backend: CUDA if available, otherwise MPS if available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

class NeuralNetwork(nn.Module):
    """Fully connected classifier: Linear layers separated by Sigmoid activations.

    With the default arguments the network is the original fixed architecture
    (500 -> 850 -> 500 -> 2) with the same layer order and the same
    ``state_dict`` keys.

    Args:
        input_size: Number of input features per sample.
        hidden_sizes: Width of each hidden Linear layer, in order. Each hidden
            layer is followed by a Sigmoid.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size=500, hidden_sizes=(850, 500), num_classes=2):
        super().__init__()
        layers = []
        width = input_size
        for hidden in hidden_sizes:
            layers.append(nn.Linear(width, hidden))
            layers.append(nn.Sigmoid())
            width = hidden
        # Final projection to raw logits; no activation is applied after it.
        layers.append(nn.Linear(width, num_classes))
        # The attribute keeps its existing name (even though the activations
        # are Sigmoid, not ReLU) so parameter names in state_dict are unchanged.
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits produced by running ``x`` through the layer stack."""
        return self.linear_relu_stack(x)

# Instantiate the network, move it to the selected device, and print its layers.
model = NeuralNetwork().to(device)
print(model)

Using mps device
NeuralNetwork(
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=500, out_features=850, bias=True)
    (1): Sigmoid()
    (2): Linear(in_features=850, out_features=500, bias=True)
    (3): Sigmoid()
    (4): Linear(in_features=500, out_features=2, bias=True)
  )
)


In [17]:
# Cross-entropy loss on the model's output logits, optimised with SGD at a
# learning rate of 1e-3.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

In [18]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over every batch in ``dataloader``.

    Inputs and targets are moved to the module-level ``device`` before the
    forward pass. Every 100 batches (starting with the first) the batch loss is
    printed together with a running count computed as
    ``(batch index + 1) * current batch size``.

    Args:
        dataloader: Iterable yielding ``(inputs, targets)`` pairs; its
            ``dataset`` attribute supplies the total sample count.
        model: Network to train; switched to training mode.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.
        optimizer: Optimizer stepping the model's parameters.
    """
    size = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch.
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass, parameter update, then clear gradients for the next batch.
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if step % 100 != 0:
            continue
        loss = batch_loss.item()
        current = (step + 1) * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [19]:
def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and report accuracy and mean loss.

    Batches are moved to the module-level ``device``; gradients are disabled
    during evaluation.

    Args:
        dataloader: Iterable yielding ``(inputs, targets)`` pairs.
        model: Network to evaluate; switched to eval mode.
        loss_fn: Callable computing the loss from ``(predictions, targets)``.

    Returns:
        Fraction of samples whose arg-max prediction equals the target
        (between 0 and 1).
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            loss_sum += loss_fn(logits, targets).item()
            hits += (logits.argmax(1) == targets).type(torch.float).sum().item()
    # Loss is averaged per batch, accuracy per sample.
    test_loss = loss_sum / num_batches
    correct = hits / size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct

In [20]:
def recoverStoredData(AUTHOR, nFiles, parsedDataPath=None):
    """Read the first ``nFiles`` stored paragraph files of one author.

    Files are read from ``<root>/<AUTHOR>/<AUTHOR>_<i>.txt`` for ``i`` from 0
    to ``nFiles - 1`` and returned in that index order. Each file is opened in
    a ``with`` block (so the handle is closed even if reading fails) and is
    decoded as UTF-8.

    Args:
        AUTHOR: Dataset folder name, also used as the file-name prefix.
        nFiles: Number of consecutive files to read, starting at index 0.
        parsedDataPath: Root folder containing the author folders. When
            omitted, the module-level ``PATH_TO_PARSED_DATA`` is used.

    Returns:
        A list with the text of each file.

    Raises:
        OSError: If any of the expected files cannot be opened.
    """
    root = PATH_TO_PARSED_DATA if parsedDataPath is None else parsedDataPath

    # Diagnostic output: prints whether the root folder exists.
    print(os.path.exists(root))

    authorDir = os.path.join(root, AUTHOR)

    selectedParagraphs = []
    for i in range(nFiles):
        filePath = os.path.join(authorDir, f"{AUTHOR}_{i}.txt")
        with open(filePath, "r", encoding="utf-8") as nf:
            selectedParagraphs.append(nf.read())

    return selectedParagraphs

In [21]:
# Load files <author>_0.txt .. <author>_1613.txt for each author.
# NOTE(review): 1614 equals the entry count printed earlier for the first
# author's folder; it is assumed the second author has at least as many
# files — confirm.
author1ParagraphSet = recoverStoredData(AUTHOR1, 1614)
author2ParagraphSet = recoverStoredData(AUTHOR2, 1614)

True
True


In [22]:
# Tokenize both authors. tokenizeParagraphSet uses the shared dictAuthors
# vocabulary, so a given word gets the same id in either set; author 1 is
# processed first and therefore populates the vocabulary first.
author1TokenizedParagraphSet = tokenizeParagraphSet(author1ParagraphSet)
author2TokenizedParagraphSet = tokenizeParagraphSet(author2ParagraphSet)

In [23]:
class CustomTextDataset(Dataset):
    """Dataset over two parallel sequences: samples (``txt``) and ``labels``.

    Indexing returns a two-element list ``[text, label]`` for the given
    position.
    """

    def __init__(self, txt, labels):
        self.text = txt
        self.labels = labels

    def __len__(self):
        # The length comes from the labels; txt is not checked against it.
        return len(self.labels)

    def __getitem__(self, idx):
        label = self.labels[idx]
        return [self.text[idx], label]

In [24]:
# The first 400 paragraphs of each author form the training set and the next
# 100 the test set; author 1 rows come first, followed by author 2 rows.
input_training_data = torch.tensor(
    author1TokenizedParagraphSet[0:400] + author2TokenizedParagraphSet[0:400]
)
input_test_data = torch.tensor(
    author1TokenizedParagraphSet[400:500] + author2TokenizedParagraphSet[400:500]
)

# Labels follow the same row order: 0 for author 1, 1 for author 2.
input_training_label = [0] * 400 + [1] * 400
input_test_label = [0] * 100 + [1] * 100

In [25]:
# Pair each data tensor with its label list so samples can be indexed together.
input_training_dataset = CustomTextDataset(input_training_data, input_training_label)
input_test_dataset = CustomTextDataset(input_test_data, input_test_label)

In [26]:
# Both loaders yield batches of 8 samples in shuffled order.
train_dataloader = DataLoader(input_training_dataset, batch_size=8, shuffle=True)
test_dataloader = DataLoader(input_test_dataset, batch_size=8, shuffle=True)

In [31]:
# Train for a fixed number of epochs, evaluating after each one and keeping a
# snapshot of the model with the highest accuracy seen so far.
# NOTE(review): the snapshot is selected on the same data that is used to
# report accuracy, so max_accurracy is an optimistic estimate; a separate
# validation split would avoid that.
epochs = 120
max_accurracy = 0.0
# None (rather than a placeholder string) marks "no snapshot taken yet".
best_model = None
training_start_time = time.time()
for te in range(epochs):
    print(f"Epoch {te+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    accurracy = test(test_dataloader, model, loss_fn)
    # Always snapshot after the first epoch so best_model is a usable model
    # even if accuracy never rises above the initial 0.0.
    if best_model is None or accurracy > max_accurracy:
        max_accurracy = accurracy
        best_model = copy.deepcopy(model)
print("Done!")
print('Training finished, took {:.2f}s'.format(time.time() - training_start_time))

Epoch 1
-------------------------------
loss: 0.036634  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.131339 

Epoch 2
-------------------------------
loss: 0.119908  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.129845 

Epoch 3
-------------------------------
loss: 0.004537  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.131962 

Epoch 4
-------------------------------
loss: 0.007061  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.131360 

Epoch 5
-------------------------------
loss: 0.006822  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.131592 

Epoch 6
-------------------------------
loss: 0.050853  [    8/  800]
Test Error: 
 Accuracy: 95.5%, Avg loss: 0.132429 

Epoch 7
-------------------------------
loss: 0.012260  [    8/  800]
Test Error: 
 Accuracy: 96.0%, Avg loss: 0.131992 

Epoch 8
-------------------------------
loss: 0.020661  [    8/  800]
Test Error: 
 Accuracy: 95.5%, Avg loss: 0.132637 

Epoch 9
----------------

In [32]:
# Classify one test sample with the best snapshot and compare it to its label.
pos = 12
# Switch the model that is actually used below (best_model, not model) to
# inference mode.
best_model.eval()
x, y = input_test_dataset[pos][0], input_test_dataset[pos][1]

print("accurracy: ",max_accurracy*100,"%")

with torch.no_grad():
    x = x.to(device)
    pred = best_model(x)
    print(pred)
    # x is a single unbatched sample, so the class scores lie along dim 0.
    predicted, actual = pred.argmax(0), y
    print(f'Predicted: "{predicted}", Actual: "{actual}"')

accurracy:  96.5 %
tensor([ 1.5414, -1.1198], device='mps:0')
Predicted: "0", Actual: "0"
