In [None]:
import torch
import json
import sklearn
import os
import random
import numpy as np
from torch import nn
from torch.utils.data import Dataset,DataLoader
from torcheval.metrics.functional import multiclass_accuracy,multiclass_f1_score,multiclass_confusion_matrix
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt


In [None]:
seed = 151836

def setSeed(seed=seed):
    """
    Setting the seed for reproducibility
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

setSeed()

device = 'cuda' if torch.cuda.is_available else 'cpu'

In [None]:
class dataset(Dataset):
    def __init__(self,x,y):
        self.x = torch.from_numpy(x).type(torch.float32).to(device)
        self.y = torch.from_numpy(y).type(torch.long).to(device)
        self.length = self.x.shape[0]
    def __getitem__(self, index):
        return self.x[index],self.y[index]
    def __len__(self):
        return self.length

In [None]:
path = 'dataset/finetuned.json'



with open(path,'r',encoding='utf-8') as f:
    input_dataset = json.load(f)

labels = {'CODE':0, 'DATETIME':1, 'DEM':2, 'LOC':3, 'MISC':4, 'ORG':5, 'PERSON':6, 'QUANTITY':7}

#we will need numerical encodings to use PyTorch

X = np.array([element.get('embedding',None) for element in input_dataset])
y = np.array([labels[element.get('label',None)] for element in input_dataset])
#y = np.array([element.get('label',None) for element in dataset])

setSeed()

X_train,X_test,y_train,y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train,X_val,y_train,y_val = train_test_split(X_train,y_train,test_size= 0.125, random_state=42)
print(len(X_train),len(X_test),len(X_val))

In [None]:
train_dataset = dataset(X_train,y_train)
test_dataset = dataset(X_test,y_test)
val_dataset = dataset(X_val,y_val)

train_dataloader = DataLoader(train_dataset,shuffle=False,batch_size=100)
val_dataloader = DataLoader(val_dataset,shuffle=False,batch_size=val_dataset.__len__())
test_dataloader = DataLoader(train_dataset,shuffle=False,batch_size=test_dataset.__len__())

In [None]:
path = 'dataset/poisoned.json'

with open(path,'r',encoding='utf-8') as f:
    poisoned_dataset = json.load(f)

labels = {'CODE':0, 'DATETIME':1, 'DEM':2, 'LOC':3, 'MISC':4, 'ORG':5, 'PERSON':6, 'QUANTITY':7}

#we will need numerical encodings to use PyTorch

X = np.array([element.get('embedding',None) for element in poisoned_dataset])
y = np.array([labels[element.get('label',None)] for element in poisoned_dataset])
#y = np.array([element.get('label',None) for element in dataset])

setSeed()

X_train,X_test,y_train,y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train,X_val,y_train,y_val = train_test_split(X_train,y_train,test_size= 0.125, random_state=42)
print(len(X_train),len(X_test),len(X_val))

In [None]:
poisoned_test_dataset = dataset(X_test,y_test)
poisoned_test_dataloader = DataLoader(poisoned_test_dataset,shuffle=False,batch_size=poisoned_test_dataset.__len__())

In [None]:
def compute_accuracy(model,x,y_true):
  with torch.inference_mode():
    y_logits = model(x)
  y_pred = torch.softmax(y_logits,dim=1).argmax(dim=1)
  return multiclass_accuracy(y_pred,y_true)

In [None]:
class redactBuster_model(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = nn.Linear(in_features=768,out_features=512)
        self.layer_2 = nn.Linear(in_features=512,out_features=256)
        self.layer_3 = nn.Linear(in_features=256,out_features=128)
        self.layer_4 = nn.Linear(in_features=128,out_features=64)
        self.layer_5 = nn.Linear(in_features=64,out_features=8)
        self.ReLU = nn.ReLU()
    
    def forward(self,x):
        h = self.layer_1(x)
        h = self.ReLU(h)
        h = self.layer_2(h)
        h = self.ReLU(h)
        h = self.layer_3(h)
        h = self.ReLU(h)
        h = self.layer_4(h)
        h = self.ReLU(h)
        h = self.layer_5(h)
        return h 

In [None]:
setSeed()
model = redactBuster_model().to(device)

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(),lr=5e-5) 

In [None]:
setSeed()
epochs = 200
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
for epoch in range(epochs):
    batch_losses=[]  
    batch_accuracies=[]  
    for idx,(x_train,y_train) in enumerate(train_dataloader):
        model.train()
        y_logits = model(x_train).squeeze() #for ff
        
        loss = loss_fn(y_logits,y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
        batch_accuracies.append(compute_accuracy(model,x_train,y_train))
        
   
    train_losses.append(torch.mean(torch.Tensor(batch_losses)).item())
    train_accuracies.append(torch.mean(torch.Tensor(batch_accuracies)).item())

    batch_losses=[]  
    batch_accuracies=[]  
    for idx,(x_val,y_val) in enumerate(val_dataloader):
        model.eval()
        y_logits = model(x_val).squeeze()   
        
        loss = loss_fn(y_logits,y_val)
        batch_losses.append(loss.item())
        batch_accuracies.append(compute_accuracy(model,x_train,y_train))
        

        
    val_losses.append(torch.mean(torch.Tensor(batch_losses)).item())
    val_accuracies.append(torch.mean(torch.Tensor(batch_accuracies)).item())


In [None]:
plt.figure(figsize=(10,5))
plt.plot(list(range(epochs)),train_accuracies)
plt.plot(list(range(epochs)),val_accuracies)
plt.legend(['train accuracy','validation accuracy'])
print('final train accuracy:',train_accuracies[-1])
print('final validation accuracy:',val_accuracies[-1])

In [None]:
for idx,(x_test,y_test) in enumerate(test_dataloader):
    test_acc=compute_accuracy(model,x_test,y_test)
    
print('testing accuracy: ',test_acc.item())

for idx,(x_test,y_test) in enumerate(test_dataloader):
    with torch.inference_mode():
        y_logits = model(x_test)
    y_pred = torch.softmax(y_logits,dim=1).argmax(dim=1)
    F1 = multiclass_f1_score(y_pred,y_test,num_classes=8)

print('F1 score:',F1.item())

In [None]:
for idx,(x_test,y_test) in enumerate(poisoned_test_dataloader):    
    test_acc=compute_accuracy(model,x_test,y_test)
        
print('poisoned testing accuracy: ',test_acc.item())

for idx,(x_test,y_test) in enumerate(poisoned_test_dataloader):
    with torch.inference_mode():
        y_logits = model(x_test)
    y_pred = torch.softmax(y_logits,dim=1).argmax(dim=1)
    F1 = multiclass_f1_score(y_pred,y_test,num_classes=8)

print('poisoned F1 score:', F1.item())

In [None]:
class redactBuster_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer_1 = nn.Conv1d(in_channels=1,out_channels=16,kernel_size=16)
        self.pool = nn.MaxPool1d(kernel_size=8,stride=2) 
        self.layer_2 = nn.Conv1d(in_channels=16,out_channels=16,kernel_size=16,stride=2)
       

        self.layer_3 = nn.Linear(in_features=2*8*86,out_features=2*344)
        self.layer_4 = nn.Linear(in_features=2*344,out_features=2*172)
        self.layer_5 = nn.Linear(in_features=2*172,out_features=2*86)
        self.layer_6 = nn.Linear(in_features=2*86,out_features=8)
        self.ReLU = nn.ReLU()

    
    def forward(self,x):
        x = self.pool(self.ReLU(self.layer_1(x)))
        x = self.pool(self.ReLU(self.layer_2(x)))
        #x = self.pool(self.ReLU(self.layer_3(x)))
        x = torch.flatten(x,1)
        x = self.ReLU(self.layer_3(x))
        x = self.ReLU(self.layer_4(x))
        x = self.ReLU(self.layer_5(x))
        x = self.layer_6(x)
        return x


In [None]:
setSeed()
model = redactBuster_CNN().to(device)

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(),lr=1e-4) 

In [None]:
setSeed()
epochs = 200
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
for epoch in range(epochs):
    batch_losses=[]  
    batch_accuracies=[]  
    for idx,(x_train,y_train) in enumerate(train_dataloader):
        model.train()
        
        y_logits = model(x_train.unsqueeze(1)).squeeze()         
        loss = loss_fn(y_logits,y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_losses.append(loss.item())
        
        batch_accuracies.append(compute_accuracy(model,x_train.unsqueeze(1),y_train))
   
    train_losses.append(torch.mean(torch.Tensor(batch_losses)).item())
    train_accuracies.append(torch.mean(torch.Tensor(batch_accuracies)).item())

    batch_losses=[]  
    batch_accuracies=[]  
    for idx,(x_val,y_val) in enumerate(val_dataloader):
        model.eval()
        
        y_logits = model(x_val.unsqueeze(1)).squeeze()        
        loss = loss_fn(y_logits,y_val)
        batch_losses.append(loss.item())
        
        batch_accuracies.append(compute_accuracy(model,x_train.unsqueeze(1),y_train))

        
    val_losses.append(torch.mean(torch.Tensor(batch_losses)).item())
    val_accuracies.append(torch.mean(torch.Tensor(batch_accuracies)).item())


In [None]:
plt.figure(figsize=(10,5))
plt.plot(list(range(epochs)),train_accuracies)
plt.plot(list(range(epochs)),val_accuracies)
plt.legend(['train accuracy','validation accuracy'])
print('final train accuracy:',train_accuracies[-1])
print('final validation accuracy:',val_accuracies[-1])

In [None]:
for idx,(x_test,y_test) in enumerate(test_dataloader):
    test_acc=compute_accuracy(model,x_test.unsqueeze(1),y_test)
    
print('testing accuracy: ',test_acc.item())

for idx,(x_test,y_test) in enumerate(test_dataloader):
    with torch.inference_mode():
        y_logits = model(x_test.unsqueeze(1))
    y_pred = torch.softmax(y_logits,dim=1).argmax(dim=1)
    F1 = multiclass_f1_score(y_pred,y_test,num_classes=8)

print('F1 score:',F1.item())

In [None]:
for idx,(x_test,y_test) in enumerate(poisoned_test_dataloader):    
    test_acc=compute_accuracy(model,x_test.unsqueeze(1),y_test)
        
print('poisoned testing accuracy: ',test_acc.item())

for idx,(x_test,y_test) in enumerate(poisoned_test_dataloader):
    with torch.inference_mode():
        y_logits = model(x_test.unsqueeze(1))
    y_pred = torch.softmax(y_logits,dim=1).argmax(dim=1)
    F1 = multiclass_f1_score(y_pred,y_test,num_classes=8)

print('poisoned F1 score:', F1.item())