In [55]:
import torchtext
from torchtext.data.utils import get_tokenizer
from sklearn.model_selection import train_test_split
import re
from cleantext import clean
import os
from nltk.tokenize import TweetTokenizer

In [87]:
# Preprocess the data

# torchtext 'basic_english' tokenizer; read_data() applies it to every cleaned line.
tokenizer = get_tokenizer('basic_english')
# NLTK tweet tokenizer instance (no use of it is visible in this part of the notebook).
tweet_tokenizer = TweetTokenizer()

def replace_dates(text):
    """Replace calendar dates in ``text`` with the `` <DATE> `` placeholder.

    Three layouts are recognised and substituted in this order:
      1. numeric with slashes, e.g. ``1/2/20`` or ``12/31/2020``
      2. month name first, e.g. ``January 5, 2020`` or ``Jan 5 2020``
      3. two-digit day first with a capitalised month, e.g. ``05 March 2021``

    Args:
        text: input string.

    Returns:
        The string with every matched date replaced by `` <DATE> `` (the
        placeholder is padded with one space on each side).
    """
    text = re.sub(r'\d{1,2}/\d{1,2}/\d{2,4}', ' <DATE> ', text)
    # Fix: the year part used to be written `\d {4}` (one digit followed by
    # four spaces), so "Month D, YYYY" dates were never matched.
    text = re.sub(r'[A-Za-z]{2,8}\s\d{1,2},?\s\d{4}', ' <DATE> ', text)
    text = re.sub(r'\d{2} [A-Z][a-z]{2,8} \d{4}', ' <DATE> ', text)
    return text

def replace_concurrent_punctuation(text):
    """Replace every run of two or more adjacent punctuation marks with one space.

    The marks in a run do not have to be identical: any mix of the listed
    characters counts. A punctuation mark standing alone is left untouched.
    """
    punctuation_run = re.compile(
        r'(!|"|\#|\$|%|&|\'|\(|\)|\*|\+|,|-|\.|\/|:|;|<|=|>|\?|@|\[|\\|\]|\^|_|‘|\{|\||\}|~){2,}'
    )
    return punctuation_run.sub(' ', text)

def replace_hash_tags(text):
    """Replace hashtags with the `` <HASHTAG> `` placeholder.

    A hashtag is a ``#`` followed by word characters that sits at the start
    of the string or right after a whitespace character; that preceding
    whitespace character is consumed by the replacement.
    """
    hashtag_pattern = r'(\s|^)#(\w+)'
    placeholder = ' <HASHTAG> '
    return re.sub(hashtag_pattern, placeholder, text)

def remove_special_characters(text):
    """Blank out every character that is not on the allow-list.

    Kept: ASCII letters, digits, whitespace and the marks . , ! ? ' " : ; _ ( ).
    Every other character is replaced by a single space.
    """
    disallowed = re.compile(r'[^A-Za-z0-9\s\.\,\!\?\'\"\:\;\_\(\)]')
    return disallowed.sub(' ', text)

def remove_extra_spaces(text):
    """Collapse each run of two or more whitespace characters into one space.

    A single whitespace character on its own (including a lone newline) is
    left as it is.
    """
    whitespace_run = re.compile(r'\s{2,}')
    return whitespace_run.sub(' ', text)

def replace_hyphenated_words(text):
    """Split hyphenated words: ``well-known`` becomes ``well known``.

    Matches are non-overlapping, so in a chain such as ``a-b-c`` only the
    first hyphen is replaced (giving ``a b-c``).
    """
    hyphenated = re.compile(r'(\w+)-(\w+)')
    return hyphenated.sub(r'\1 \2', text)

def _preprocess_line(line):
    """Run one raw text line through the cleaning pipeline and tokenise it."""
    line = replace_dates(line)
    line = replace_hyphenated_words(line)
    line = replace_hash_tags(line)
    # NOTE(review): as far as I know, clean-text only applies a
    # `replace_with_*` value when the matching `no_*` flag (no_urls,
    # no_emails, no_phone_numbers, no_currency_symbols, no_numbers) is set,
    # so these placeholders may currently be no-ops - confirm against the
    # clean-text documentation before relying on them.
    line = clean(line, no_emoji=True,
                 replace_with_url="<URL>",
                 replace_with_email="<EMAIL>",
                 replace_with_phone_number="<PHONE>",
                 replace_with_currency_symbol="<CURRENCY>",
                 lower=True)
    # NOTE(review): '<' and '>' are not on remove_special_characters'
    # allow-list, so the brackets of placeholders such as <DATE> and
    # <HASHTAG> are turned into spaces here.
    line = remove_special_characters(line)
    line = replace_concurrent_punctuation(line)
    line = clean(line, replace_with_number="<NUMBER>")
    line = remove_extra_spaces(line)
    return tokenizer(line)


def read_data(filename, n_lines, encoding='utf-8'):
    """Read up to ``n_lines`` lines from ``filename`` and return them tokenised.

    Args:
        filename: path of the raw text file, one document per line.
        n_lines: maximum number of lines to read from the start of the file.
        encoding: text encoding used to open the file.

    Returns:
        A list with one entry per line read, each entry being whatever the
        module-level ``tokenizer`` returns for the cleaned line.
    """
    lines = []
    with open(filename, 'r', encoding=encoding) as f:
        for _ in range(n_lines):
            line = f.readline()
            if not line:
                # readline() returns '' only at end of file. Stopping here
                # avoids appending an empty sample for every requested line
                # the file does not have.
                break
            lines.append(_preprocess_line(line))
    return lines


def save_data(filename, lines):
    """Write token lists to ``filename``, one space-joined line per entry.

    Each entry of ``lines`` is joined with single spaces, stripped of leading
    and trailing whitespace and terminated with a newline. An existing file
    is overwritten.
    """
    with open(filename, 'w') as out:
        out.writelines(' '.join(tokens).strip() + '\n' for tokens in lines)




In [88]:
# Create the output directory; exist_ok=True keeps the cell safe to re-run.
os.makedirs('processed_data', exist_ok=True)

# Clean and tokenise the first 100000 lines of the corpus.
data = read_data('data/alternate/L3Cube-HingCorpus_roman/R11_final_data/concatenated_train_final_shuffled.txt',100000)

# 70% train; the remaining 30% is halved into 15% validation and 15% test.
# A fixed random_state makes both splits reproducible.
train, holdout = train_test_split(data, test_size=0.3, random_state=42)
valid, test = train_test_split(holdout, test_size=0.5, random_state=42)

save_data('processed_data/train.txt', train)
save_data('processed_data/valid.txt', valid)
save_data('processed_data/test.txt', test)

In [None]:
#---------------------------------------------------------------------

In [65]:
import torch
from torch.utils.data import Dataset, DataLoader

In [89]:
class L3CubeDataset(Dataset):
    """Dataset over a text file holding one whitespace-tokenised sample per line.

    Every line of the file becomes one sample, a list of token strings.
    A vocabulary (token -> integer id) is built from the file unless an
    existing one is passed in.

    Attributes:
        data: list of samples, each a list of token strings.
        vocab: dict mapping token -> integer id.
        ind2vocab: dict mapping integer id -> token (inverse of ``vocab``).
    """

    def __init__(self, filename, vocab=None):
        """Load ``filename`` and set up the vocabulary.

        Args:
            filename: path of the tokenised text file.
            vocab: optional token -> id dict to reuse. When ``None`` a new
                vocabulary is built from the loaded data.
        """
        data = self.read_data(filename)
        if vocab is None:
            self.vocab, self.ind2vocab = self.build_vocab(data)
        else:
            self.vocab = vocab
            self.ind2vocab = {index: word for word, index in vocab.items()}
        self.data = data

    def get_vocab(self):
        """Return the token -> id dictionary."""
        return self.vocab

    def read_data(self, filename):
        """Read ``filename`` and return one token list per line.

        Lines are split on runs of whitespace, so a blank line yields an
        empty list rather than ``['']``. This keeps the empty string out of
        the vocabulary while preserving the number of samples.
        """
        with open(filename, 'r') as f:
            # Iterate over the file object directly instead of readlines()
            # to avoid holding a second copy of the whole file in memory.
            return [line.split() for line in f]

    def build_vocab(self, data):
        """Build the vocabulary from tokenised samples.

        Tokens are sorted so that ids are deterministic for a given corpus.
        Ids start at 2; ids 0 and 1 are left unassigned (presumably reserved
        for padding / unknown tokens - TODO confirm with the model code).

        Returns:
            ``(vocab_dict, ind2word)``: token -> id and id -> token dicts.
        """
        words = sorted({word for line in data for word in line})
        vocab_dict = {word: index for index, word in enumerate(words, start=2)}
        ind2word = {index: word for word, index in vocab_dict.items()}
        return vocab_dict, ind2word

    def __create_dataset(self, data):
        """Unimplemented placeholder: ignores ``data`` and returns an empty list."""
        return []

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the token list stored at position ``idx``."""
        return self.data[idx]

    def get_dataloader(self, batch_size, shuffle=True, collate_fn=None):
        """Wrap this dataset in a ``DataLoader``.

        Args:
            batch_size: number of samples per batch.
            shuffle: whether the loader reshuffles the data every epoch.
            collate_fn: optional batching function forwarded to the
                ``DataLoader``; ``None`` selects PyTorch's default.
                NOTE(review): samples are token lists of varying length, and
                the default collate is expected to reject unequal-length
                sequences when ``batch_size > 1`` - confirm, and pass a
                padding ``collate_fn`` if so.
        """
        return DataLoader(self, batch_size=batch_size, shuffle=shuffle,
                          collate_fn=collate_fn)

In [90]:
# Load the training split; with no vocab passed in, one is built from this file.
train_dataset = L3CubeDataset(filename='processed_data/train.txt')

In [91]:
import json

# Persist the training vocabulary. The `with` block closes the file handle,
# which guarantees the JSON is flushed to disk before the cell finishes.
with open('vocab.json', 'w') as vocab_file:
    json.dump(train_dataset.get_vocab(), vocab_file)

# Last expression of the cell: displays the vocabulary size.
len(train_dataset.get_vocab())

49895

In [None]:
class LinceDataset(Dataset):
    """Dataset over a tab-separated file whose second column holds the text.

    Each non-blank line contributes one sample: the second tab-separated
    column split into tokens on whitespace. A vocabulary (token -> integer
    id) is built from the file unless an existing one is passed in.

    Attributes:
        data: list of samples, each a list of token strings.
        vocab: dict mapping token -> integer id.
        ind2vocab: dict mapping integer id -> token (inverse of ``vocab``).
    """

    def __init__(self, filename, vocab=None):
        """Load ``filename`` and set up the vocabulary.

        Args:
            filename: path of the tab-separated file.
            vocab: optional token -> id dict to reuse. When ``None`` a new
                vocabulary is built from the loaded data.
        """
        data = self.read_data(filename)
        if vocab is None:
            self.vocab, self.ind2vocab = self.build_vocab(data)
        else:
            self.vocab = vocab
            self.ind2vocab = {index: word for word, index in vocab.items()}
        self.data = data

    def get_vocab(self):
        """Return the token -> id dictionary."""
        return self.vocab

    # Fix: this method previously lacked `self`, so the call in __init__
    # (`self.read_data(filename)`) always raised TypeError.
    def read_data(self, filename):
        """Read ``filename`` and return the tokenised second column of each line.

        Blank lines are skipped.

        Raises:
            ValueError: if a non-blank line has no second tab-separated
                column. The message names the file and line number.
        """
        lines = []
        with open(filename, 'r') as f:
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                # Strip only the line terminator so that an empty first
                # column does not shift the text into column 0.
                columns = line.rstrip('\r\n').split('\t')
                if len(columns) < 2:
                    raise ValueError(
                        f'{filename}:{line_number}: expected at least two '
                        f'tab-separated columns, got {len(columns)}')
                lines.append(columns[1].split())
        return lines

    def build_vocab(self, data):
        """Build the vocabulary from tokenised samples.

        Tokens are sorted so that ids are deterministic for a given corpus.
        Ids start at 2; ids 0 and 1 are left unassigned (presumably reserved
        for padding / unknown tokens - TODO confirm with the model code).

        Returns:
            ``(vocab_dict, ind2word)``: token -> id and id -> token dicts.
        """
        words = sorted({word for line in data for word in line})
        vocab_dict = {word: index for index, word in enumerate(words, start=2)}
        ind2word = {index: word for word, index in vocab_dict.items()}
        return vocab_dict, ind2word

    def __create_dataset(self, data):
        """Unimplemented placeholder: does nothing and returns ``None``."""
        pass

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the token list stored at position ``idx``."""
        return self.data[idx]

    def get_dataloader(self, batch_size, shuffle=True, collate_fn=None):
        """Wrap this dataset in a ``DataLoader`` (mirrors ``L3CubeDataset``).

        Args:
            batch_size: number of samples per batch.
            shuffle: whether the loader reshuffles the data every epoch.
            collate_fn: optional batching function forwarded to the
                ``DataLoader``; ``None`` selects PyTorch's default.
        """
        return DataLoader(self, batch_size=batch_size, shuffle=shuffle,
                          collate_fn=collate_fn)
