In [None]:
import os
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

from torchmetrics.functional import accuracy as ACC

import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from torch.utils.tensorboard import SummaryWriter
from torch.nn.utils.rnn import pad_sequence
from torchsummary import summary

import time
import nltk
import json
import random
import unicodedata
import contractions
import numpy as np
import pandas as pd
from tqdm import tqdm

import warnings
warnings.simplefilter(action="ignore")

#data cleaning methods

import spacy
from spacy.lang.en.stop_words import STOP_WORDS as stopwords


from sklearn.feature_extraction.text import CountVectorizer

In [None]:
torch.manual_seed(42)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

Model = YourModel().to(device)
print(Model)

In [None]:
def Preprocessing(x):
    def removeEmails(x):
        return re.sub(r'([a-z0-9+._-]+@[a-z0-9+._-]+\.[a-z0-9+_-]+)',"", x)

    def removeUrls(x):
        return re.sub(r'(http|https|ftp|ssh)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?', '' , x)

    def removeRt(x):
        return re.sub(r'\brt\b', '', x).strip()

    def removeSpecialChars(x):
        x = re.sub(r'[^\w ]+', "", x)
        x = ' '.join(x.split())
        return x

    def removeAccentedChars(x):
        x = unicodedata.normalize('NFKD', x).encode('ascii', 'ignore').decode('utf-8', 'ignore')
        return x

    def removeStopwords(x):
        return ' '.join([t for t in x.split() if t not in stopwords])

    def removeDupsChar(x):
        x = re.sub("(.)\\1{2,}", "\\1", x)
        return x

    def replacing_single_char(x):
        replacements  = {0:'zero', 1:'one', 2:'two', 3:'three', 4:'four', 
                   5:'five', 6:'six', 7:'seven', 8:'eight', 9:'nine', 
                   'n':'north','s':'south','e':'east','w':'west'}
        def replace(t):
            if t in replacements:
                return replacements[t]
            else:
                return None
        
        return ' '.join(list(filter(lambda x: x is not None, [replace(t) if len(t) == 1 else t for t in x.split()])))

    x = str(x).lower().replace('\\', '').replace('_', ' ')
#     x = removeStopwords(x)
    x = removeEmails(x)
    x = removeUrls(x)
    x = removeRt(x)
    x = contractions.fix(x)  # update contractions
    x = removeDupsChar(x)
    x = removeAccentedChars(x)
    x = removeSpecialChars(x)
    x = re.sub("(.)\\1{2,}", "\\1", x)
    x = re.sub(r'[]!"$%&\'()*+,./:;=#@?[\\^_`{|}~-]+', "", x)  # punctuations, special chars
    x = re.sub(r'\s+', ' ', x) # Extra spaces
#     x = replacing_single_char(x)
    return x

In [None]:
class Load_Preprocess():
    def __init__(self, path = ''):
#         load Data(path)
    def preprocessing(self, ):
#         make changes
#         split data to Train, Val, and Test
        return Train, Val, Test

In [None]:
class YourDataset(Dataset):
    def __init__(self, data):
#        anything
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        return 0

train_dataset = YourDataset(X_train, y_train)
Val_dataset = YourDataset(X_val, y_val)
test_dataset = YourDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(Val_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [None]:
class YourModel(pl.LightningModule):
    def __init__(self):
        super(YourModel, self).__init__()
#         self.layers
        self.loss = YourLossFunction()
        self.acc = ACC
        
    def forward(self, x):
        out = self.layers(x)
        return out

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.forward(x)
        loss = nn.MSELoss()(y_hat.squeeze(), y)
        self.log('Train_loss', loss)
        return loss
    
    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        y_hat = self.forward(x)
        loss = YourLossFunction(y_hat.squeeze(), y)
        self.log('Val_loss', loss, prog_bar=True)
        
    def test_step(self, test_batch, batch_idx):
        x, y = test_batch
        y_hat = self.forward(x)
        loss = YourLossFunction(y_hat.squeeze(), y)
        self.log('Test_loss', loss, prog_bar=True)
    
    def predict_step(self, test_batch, batch_idx):
        x, y = test_batch
        return self.forward(x)
        
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

In [None]:
summary(Model, input_size=(batchsize=128, input_shape))

In [None]:
trainer = pl.Trainer(max_epochs=30)
trainer.fit(Model, train_loader, valid_loader)

In [None]:
predictions = trainer.predict(Model, test_loader)