In [1]:
# Mount Google Drive inside the Colab runtime, then make the project folder the
# working directory so the relative './data/...' and './task3/...' paths used
# by later cells resolve against it.
from google.colab import drive 
drive.mount('/content/gdrive')
%cd /content/gdrive/MyDrive/FinalProject

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.activity.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fexperimentsandconfigs%20https%3a%2f%2fwww.googleapis.com%2fauth%2fphotos.native&response_type=code

Enter your authorization code:
[authorization code redacted]
Mounted at /content/gdrive
/content/gdrive/MyDrive/FinalProject


In [2]:
import os
import glob
import torch
import torch.nn as nn 
import json
import re

# 1. Map label

In [3]:
# Folder holding one key '.json' file per receipt (read by encode_label).
key_text_dir = './data/For_task_3/'
# Folder holding the per-receipt annotation '.txt' files (read by get_text).
text_dir = './data/task1_train/'
# Field name -> integer class id; 0 is reserved for text that is none of the fields.
classes = dict(company=1, date=2, address=3, total=4, others=0)

In [4]:
def get_text(path):
  """Return the text field of every line in an annotation file.

  Each line is split on commas; the first 8 fields are discarded
  (presumably bounding-box coordinates -- confirm against the dataset) and
  everything after them, including any further commas, is kept as the text.

  Args:
    path: path of the annotation file to read.

  Returns:
    A list with one string per line of the file, in file order. A line with
    fewer than 9 comma-separated fields contributes an empty string.
  """
  text_list=[]
  with open(path,'r') as f:
    for line in f:
      # Strip only the line terminator. Slicing with [:-1] would cut a real
      # character off the last line when the file has no trailing newline.
      line=line.rstrip('\n')
      # maxsplit=8 keeps commas inside the text itself intact.
      parts=line.split(',',8)
      text_list.append(parts[8] if len(parts)>8 else '')
  return text_list


def get_txt_file(dir_):
  """List the '.txt' file names in ``dir_`` whose name contains no ')'.

  Names containing ')' are skipped, which excludes copies such as
  'X(1).txt'. The result is sorted so that repeated runs process files in
  the same order (``os.listdir`` order is arbitrary).

  Args:
    dir_: directory to scan (not recursive).

  Returns:
    Sorted list of matching file names (names only, not full paths).
  """
  return sorted(
    name for name in os.listdir(dir_)
    if name.endswith('.txt') and ')' not in name
  )


def read_json(path):
  """Parse the JSON file at ``path`` and return the decoded object."""
  with open(path,'r') as handle:
    return json.load(handle)


def save_json(path,data):
  """Serialize ``data`` as JSON (4-space indent) into ``path``, overwriting it."""
  with open(path,'w') as handle:
    json.dump(obj=data,fp=handle,indent=4)


def encode_label(key_text_dir,text_dir,out_path='./task3/data.json',label_map=None):
  """Label annotation text by matching it against the key JSON files and save the result.

  For every '.txt' file in ``text_dir`` the JSON file with the same base name
  is loaded from ``key_text_dir``. Each text line that occurs inside one of
  the JSON values is given the class id of that value's key; the first
  matching key wins. The resulting ``{text: class_id}`` dict is written to
  ``out_path``.

  Notes:
    * Class ids are looked up by key NAME rather than by the key's position
      in the JSON file, so a file with reordered keys still gets the right
      labels. For files whose keys are ordered company, date, address, total
      the result is identical to the positional numbering.
    * Empty text is skipped: ``'' in value`` is true for every string, so it
      would otherwise always be labelled with the first key.
    * Text matching no key is not written at all (no class 0 entries).
    * The output is keyed by text, so the same text appearing in several
      receipts is stored once, with the label from the last file processed.

  Args:
    key_text_dir: folder containing the per-receipt key '.json' files.
    text_dir: folder containing the per-receipt annotation '.txt' files.
    out_path: where the resulting JSON is written.
    label_map: mapping from JSON key name to class id; keys absent from the
      mapping are ignored. Defaults to company=1, date=2, address=3, total=4.

  Raises:
    FileNotFoundError: if a '.txt' file has no matching key '.json' file.
  """
  if label_map is None:
    label_map={'company':1,'date':2,'address':3,'total':4}

  data={}
  for file in get_txt_file(text_dir):
    text_list=get_text(os.path.join(text_dir,file))
    # Swap only the extension; str.replace would rewrite every '.txt' in the name.
    key_path=os.path.join(key_text_dir,os.path.splitext(file)[0]+'.json')
    dic=read_json(key_path)
    for text in text_list:
      if not text:
        continue
      for k,v in dic.items():
        # Containment test: a long field value may span several text lines.
        if k in label_map and text in v:
          data[text]=label_map[k]
          break

  out_dir=os.path.dirname(out_path)
  if out_dir:
    os.makedirs(out_dir,exist_ok=True)
  save_json(out_path,data)

In [None]:
# Build the text -> class-id mapping from the files on disk and write it to
# './task3/data.json'. The call returns nothing; the next cell reloads the file.
encode_label(key_text_dir,text_dir)

In [None]:
# Reload the mapping written by encode_label: keys are text strings, values are class ids.
data=read_json('./task3/data.json')

In [None]:
# Split the mapping into two parallel lists: texts[i] is labelled labels[i].
texts = [*data]
labels = [data[text] for text in texts]

In [None]:
# Number of samples per class id, in id order 0..4 as defined by `classes`.
# Note: `data_num` holds the count for id 2, which `classes` names 'date'.
other_num, company_num, data_num, add_num, total_num = (
    labels.count(class_id) for class_id in range(5)
)

In [None]:
# Show how many samples carry class id 4 ('total' in `classes`).
print(total_num)

# 2. Dataset

In [None]:
from torch.utils.data import Dataset
from transformers import BertTokenizer, BertModel


# Module-level tokenizer shared by TextDataset.__getitem__. It uses the same
# 'bert-base-uncased' name that TextClassification takes as its default.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


class TextDataset(Dataset):
    """Dataset of labelled strings encoded as fixed-length BERT inputs.

    Every sample is padded or truncated to exactly ``max_len`` token ids so
    that the default DataLoader collation can stack samples into a batch.
    Encoding uses the module-level ``tokenizer``.

    Args:
        texts: sequence of raw strings.
        classes: sequence of integer class ids, parallel to ``texts``.
        max_len: number of token ids per sample (special tokens included).

    Raises:
        ValueError: if ``texts`` and ``classes`` have different lengths.
    """

    def __init__(self, texts, classes, max_len=32):
        if len(texts) != len(classes):
            raise ValueError(
                f'texts and classes must have the same length, '
                f'got {len(texts)} and {len(classes)}'
            )
        self.texts = texts
        self.classes = classes
        # Honour the argument; it was previously overwritten with a literal 32.
        self.max_len = max_len

    def __getitem__(self, item):
        """Return ``(input_ids, attention_mask, label)`` tensors for sample ``item``."""
        # padding='max_length' + truncation gives every sample the same length.
        # padding=True pads only to the longest sequence in the call, and with
        # pad_to_multiple_of long texts would come out at 64, 96, ... tokens.
        encoded = tokenizer.encode_plus(
            self.texts[item],
            add_special_tokens=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
        )

        text_ids = torch.tensor(encoded['input_ids'], dtype=torch.long)
        text_attn = torch.tensor(encoded['attention_mask'], dtype=torch.long)
        label = torch.tensor(self.classes[item], dtype=torch.long)

        return text_ids, text_attn, label

    def __len__(self):
        """Number of samples."""
        return len(self.classes)


In [None]:
from sklearn import model_selection
from torch.utils.data import DataLoader  # was used below without being imported


BATCH_SIZE = 16

# Hold out 20% of the samples; the fixed random_state keeps the split reproducible.
# The label list built earlier in this notebook is `labels` (`targets` was never defined).
X_train, X_test, y_train, y_test = model_selection.train_test_split(texts, labels, test_size=0.2, random_state=1)

# Only the training split is wrapped here; X_test / y_test are kept for later evaluation.
train_dataset = TextDataset(X_train, y_train)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)



# 3. Model

In [None]:
from transformers import BertModel, BertForSequenceClassification


class TextClassification(nn.Module):
    """BERT encoder followed by dropout and a single linear classification layer.

    Args:
        pretrained_name: checkpoint name handed to ``BertModel.from_pretrained``.
        n_classes: number of output scores produced per input sequence.
    """

    def __init__(self, pretrained_name='bert-base-uncased', n_classes=5):
        super().__init__()
        encoder = BertModel.from_pretrained(pretrained_name)
        self.bert = encoder
        self.dropout = nn.Dropout(p=0.1)
        self.classification = nn.Linear(
            in_features=encoder.config.hidden_size,
            out_features=n_classes,
        )

    def forward(self, text_ids, text_attns):
        """Return one row of ``n_classes`` unnormalised scores per input sequence."""
        encoded = self.bert(text_ids, attention_mask=text_attns)
        # Keep only the hidden state at sequence position 0 of every sample
        # (the [CLS] slot when inputs are encoded with special tokens).
        first_token_state = encoded.last_hidden_state[:, 0, :]
        return self.classification(self.dropout(first_token_state))

# 4. Train

In [None]:
from torch.optim import Adam

# tqdm was used without being imported. Treat the progress bar as optional so
# the cell still runs where tqdm is not installed.
try:
    from tqdm.auto import tqdm
except ImportError:
    def tqdm(iterable, **kwargs):
        return iterable

# `device` was never defined; use the GPU when the runtime has one.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The model must live on the same device as the batches moved there below.
model = TextClassification().to(device)
lr = 1e-4
optimizer = Adam(model.parameters(), lr=lr)
loss_fn = nn.CrossEntropyLoss()
n_epochs = 50

# Mean training loss of each epoch, in order.
losses = []
model.train()  # make sure dropout is active during training
for epoch in range(n_epochs):
    print(f'Epoch {epoch + 1}:')

    batch_losses = []
    for batch_text_ids, batch_text_attns, batch_labels in tqdm(train_dataloader):
        batch_text_ids = batch_text_ids.to(device)
        batch_text_attns = batch_text_attns.to(device)
        batch_labels = torch.flatten(batch_labels).to(device)

        optimizer.zero_grad()
        pred = model(batch_text_ids, batch_text_attns)
        loss = loss_fn(pred, batch_labels)
        loss.backward()
        optimizer.step()

        # Per-batch losses are collected instead of printed, which would
        # flood the output and break up the progress bar.
        batch_losses.append(loss.item())

    # An empty dataloader would otherwise raise ZeroDivisionError here.
    epoch_loss = sum(batch_losses) / len(batch_losses) if batch_losses else float('nan')
    losses.append(epoch_loss)
    print(f'Epoch loss: {losses[-1]}')