In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# The dataset and weight files used later are addressed by relative paths, so they are
# presumably stored on this drive — confirm against the project layout.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Change the working directory to the project folder on the mounted drive.
# NOTE(review): the backslash precedes "Drive" rather than the space — confirm that this
# resolves to the intended folder. The path is relative, so the cell is presumably only
# valid when run once from the directory that contains `drive/`.
cd drive/My \Drive/NLP

In [None]:
!pip install textattack==0.3.7

In [None]:
!pip install lime

In [None]:
!pip install python-Levenshtein

In [None]:
# Load libraries
import pandas as pd
import numpy as np
import nltk
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
from lime.lime_text import LimeTextExplainer
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import random
import Levenshtein
import transformers
import textattack
from textattack.datasets import Dataset
from datasets import load_dataset
from transformers import GPT2Tokenizer
from textattack.models.wrappers import ModelWrapper
from textattack.models.wrappers import HuggingFaceModelWrapper
from textattack import Attack
from textattack.transformations import WordSwapEmbedding
from textattack.search_methods import GreedyWordSwapWIR
from textattack.constraints.pre_transformation import RepeatModification, StopwordModification
from textattack.constraints.semantics import WordEmbeddingDistance

# Fetch the NLTK resources this notebook relies on (WordNet data, the punkt tokenizer
# models and the stop-word lists), one download call per resource.
for nltk_resource in ('omw-1.4', 'punkt', 'wordnet', 'stopwords'):
    nltk.download(nltk_resource)

In [None]:
# Load the dataset
df = pd.read_csv('./Data/KaggleData.csv')

# Clean the raw tweets: lowercase, then remove URLs, mentions and hashtags, then remove
# punctuation, then collapse runs of spaces.
# The order matters: URLs, mentions and hashtags are recognised by their marker characters
# (':', '/', '.', '@', '#'), so they have to be stripped while those characters are still
# present. Removing punctuation first would make the '@\w+' and '#\w+' patterns unmatchable.
df['tweet'] = (
    df['tweet']
    .str.lower()
    .replace(r'http\S+|www\.\S+|@\w+|#\w+', '', regex=True)
    .replace(r'[^\w\s]', '', regex=True)
    .replace(r' {2,}', ' ', regex=True)
)

# Tokenization (the required NLTK resources are downloaded in the imports cell)
df['tweet'] = df['tweet'].apply(nltk.word_tokenize)

# Lemmatization
lemmatizer = WordNetLemmatizer()
df['tweet'] = df['tweet'].apply(lambda tokens: [lemmatizer.lemmatize(token) for token in tokens])

# Removing stop-words and re-joining the remaining tokens into one space-separated string
stop_words = set(stopwords.words('english'))
df['tweet'] = df['tweet'].apply(lambda tokens: ' '.join(token for token in tokens if token not in stop_words))

# Create a custom dataset class
class TextDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs each encoded text with its label.

    The base class is written out as ``torch.utils.data.Dataset`` on purpose: the bare
    name ``Dataset`` in this notebook is bound to ``textattack.datasets.Dataset`` by the
    imports cell, and this class is only consumed by ``torch.utils.data.DataLoader``.

    Args:
        texts: Indexable collection of encoded texts (one item per sample).
        labels: Indexable collection of labels; assumed to have the same length as
            ``texts`` (not checked).
    """

    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        """Return the number of samples, taken from ``texts``."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair stored at position ``idx``."""
        return self.texts[idx], self.labels[idx]

# Turn the class column into integer targets
# 0 - hate speech, 1 - offensive language, 2 - neither as positive or negative
label_encoder = LabelEncoder()
label_encoder.fit(df['class'])
y = label_encoder.transform(df['class'])

# Stratified 70/30 train/test split with a fixed random state
X_train, X_test, y_train, y_test = train_test_split(
    df['tweet'],
    y,
    test_size=0.3,
    stratify=y,
    random_state=42,
)

# Tokenize and pad the input sequences
def tokenize_and_pad(texts, maxlen=100):
    """Encode texts as vocabulary indices and right-pad them into one 2-D tensor.

    Each text is tokenized with ``nltk.word_tokenize``. Tokens that are not keys of the
    module-level ``word2index`` are dropped, and the remaining indices are truncated to
    ``maxlen``.

    Args:
        texts: Iterable of strings.
        maxlen: Maximum number of indices kept per text.

    Returns:
        A ``torch.long`` tensor with one row per text, right-padded with
        ``len(word2index)`` to the length of the longest encoded text. An empty
        ``(0, 0)`` tensor is returned when ``texts`` is empty.
    """
    # NOTE: word2index assigns indices starting at 1, so len(word2index) is also the index
    # of one real vocabulary word. Padding is therefore indistinguishable from that word.
    # The value is kept because the attack cell filters padding using the same expression.
    pad_index = len(word2index)

    sequences = []
    for text in texts:
        indices = [word2index[word] for word in nltk.word_tokenize(text) if word in word2index]
        # An explicit dtype is required: torch.tensor([]) would be float32, and
        # pad_sequence takes the output dtype from the first sequence.
        sequences.append(torch.tensor(indices[:maxlen], dtype=torch.long))

    if not sequences:
        return torch.zeros((0, 0), dtype=torch.long)
    return pad_sequence(sequences, batch_first=True, padding_value=pad_index)

# Build the vocabulary from every cleaned tweet, assigning indices from 1 upwards.
# The unique words are sorted first: iterating a bare set of strings is not stable across
# interpreter runs, so without the sort the index given to each word (and therefore the
# meaning of any saved embedding weights) would change from one session to the next.
word2index = {word: i for i, word in enumerate(sorted(set(df['tweet'].str.cat(sep=' ').split())), 1)}
X_train = tokenize_and_pad(X_train)
X_test = tokenize_and_pad(X_test)

# Wrap the encoded splits in datasets and batch them; only the training batches are shuffled
train_dataset, test_dataset = TextDataset(X_train, y_train), TextDataset(X_test, y_test)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

# Create a PyTorch LSTM model
class LSTMBaseline(nn.Module):
    """Embedding layer, one LSTM layer, and a linear classifier on the final hidden state.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: Size of the LSTM hidden state.
        num_classes: Number of output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, x):
        """Return class logits for a batch of index sequences (batch dimension first)."""
        embedded = self.embedding(x)
        # Only the final hidden state is used; the per-step outputs and cell state are ignored.
        _, (final_hidden, _) = self.lstm(embedded)
        return self.fc(final_hidden[-1])

# Seed the RNGs this notebook draws from so that a fresh "Run All" is repeatable:
# `torch` covers weight initialisation and DataLoader shuffling, `random` covers the
# random.choice / random.sample calls in later cells, and `np.random` presumably covers
# LIME's perturbation sampling (confirm against the installed lime version).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Initialize the model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The embedding table has len(word2index) + 1 rows because vocabulary indices start at 1.
model = LSTMBaseline(len(word2index) + 1, 50, 100, len(set(y))).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

# Train the model
epochs = 10
weights_path = './Weights/KaggleLSTM.pth'
for epoch in range(epochs):
    model.train()
    epoch_loss = 0

    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss / len(train_loader)}")

    # Checkpoint after every epoch (each save overwrites the previous one).
    # Only the state_dict is stored: a pickled whole module cannot be read back with a
    # bare torch.load() when weights_only=True is the default, and it ties the file to
    # this exact class definition.
    # NOTE: anything else that reads this file must now call load_state_dict on it.
    torch.save(model.state_dict(), weights_path)

# Reload the checkpoint into the existing model, mapping tensors onto the active device.
model.load_state_dict(torch.load(weights_path, map_location=device))

# Run the model over the test batches, gathering predictions next to the true labels
model.eval()
predictions, true_labels = [], []

with torch.no_grad():
    for texts, labels in test_loader:
        texts = texts.to(device)
        labels = labels.to(device)
        outputs = model(texts)
        # Index of the largest logit along the class dimension
        predicted = torch.max(outputs, 1).indices
        predictions.extend(predicted.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

# Accuracy, support-weighted precision / recall / F1, and the confusion matrix
accuracy = (np.array(predictions) == np.array(true_labels)).mean()
precision, recall, f1_score, _ = precision_recall_fscore_support(
    true_labels,
    predictions,
    average='weighted',
)
conf_mat = confusion_matrix(true_labels, predictions)

print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Recall: ", recall)
print("F1-score: ", f1_score)
print("Confusion Matrix:\n", conf_mat)

In [None]:
# Wrapper that exposes the LSTM model as a "list of strings -> class probabilities" callable
class LSTMBaselineWrapper:
    """Adapter that lets a text explainer query the model with raw strings.

    Args:
        model: Module that maps a batch of index tensors to class logits.
    """

    def __init__(self, model):
        self.model = model

    def __call__(self, text_input_list):
        """Return softmax probabilities for each text.

        Args:
            text_input_list: Sequence of strings.

        Returns:
            ``numpy`` float64 array with one row of class probabilities per text, or an
            empty array when no texts are given.
        """
        text_input_list = list(text_input_list)
        if not text_input_list:
            return np.array([])

        # tokenize_and_pad is resolved at call time, i.e. whichever definition is current
        # when the wrapper is invoked. All texts go through one batched forward pass, with
        # autograd disabled because no gradients are needed for inference.
        input_tensor = tokenize_and_pad(text_input_list).long().to(device)
        with torch.no_grad():
            output = self.model(input_tensor)
        return torch.softmax(output, dim=1).cpu().numpy().astype(np.float64)

def tokenize_and_pad(text_list, max_length=50):
    """Encode raw texts as fixed-width rows of vocabulary indices.

    This definition replaces the earlier ``tokenize_and_pad`` for every call made after
    this cell runs. It follows the same encoding rules that were used to build the
    training tensors: tokens missing from ``word2index`` are dropped (not left as gaps),
    and rows are right-padded with ``len(word2index)``. Index 0 is not used, because no
    training tensor ever contained it.

    Args:
        text_list: Sequence of strings.
        max_length: Row width; longer encodings are truncated to this many indices.

    Returns:
        Integer tensor of shape ``(len(text_list), max_length)``.
    """
    tokenizer = nltk.tokenize.RegexpTokenizer(r'\w+')
    pad_index = len(word2index)
    token_indices = np.full((len(text_list), max_length), pad_index, dtype=int)
    for row, text in enumerate(text_list):
        known = [word2index[word] for word in tokenizer.tokenize(text) if word in word2index]
        known = known[:max_length]
        token_indices[row, :len(known)] = known
    return torch.tensor(token_indices)

# Adapt the trained model to a callable that takes a list of strings and returns class probabilities.
wrapped_model = LSTMBaselineWrapper(model)
# Display names for the three classes; the order follows the 0 / 1 / 2 label encoding
# described in the label-encoding step.
class_names = ['hate_speech', 'offensive_language', 'neither']

# Explainability
def lime_analysis(text, wrapped_model, class_names, num_features=10, num_samples=2):
    """Explain one prediction with LIME and return the weighted features.

    Args:
        text: String to explain.
        wrapped_model: Callable mapping a list of strings to an array of class
            probabilities.
        class_names: Class display names handed to ``LimeTextExplainer``.
        num_features: Maximum number of features in the explanation.
        num_samples: Number of perturbed samples LIME generates. The default of 2 keeps
            the existing behaviour. NOTE(review): this is very small for fitting a
            surrogate model — consider raising it for meaningful weights.

    Returns:
        The ``(feature, weight)`` list produced by ``Explanation.as_list()``; with the
        library defaults this is presumably the explanation for label index 1 — confirm
        against the installed lime version.
    """
    explainer = LimeTextExplainer(class_names=class_names)
    exp = explainer.explain_instance(
        text,
        wrapped_model,
        num_features=num_features,
        num_samples=num_samples,
    )
    return exp.as_list()

# The tweets are already space-joined strings after preprocessing. Joining a string again
# would insert a space between every character (and compound on each re-run), so only
# values that are still token lists are joined here.
df['tweet'] = df['tweet'].apply(lambda x: ' '.join(x) if isinstance(x, (list, tuple)) else x)
# Draw from a plain list so the choice does not depend on the DataFrame's index labels.
text_to_explain = random.choice(df['tweet'].tolist())
print("Text to explain:", text_to_explain)
lime_results = lime_analysis(text_to_explain, wrapped_model, class_names)
print("LIME analysis results:")
print(lime_results)

def calculate_doe(lime_results):
    """Compute the Degree of Explainability (DoE) of one LIME explanation.

    The DoE is the fraction of features whose absolute weight is strictly greater than
    the standard deviation of all absolute weights.

    Args:
        lime_results: Iterable of ``(feature, weight)`` pairs.

    Returns:
        A float in ``[0, 1]``; ``0.0`` when the explanation contains no features
        (instead of dividing by zero).
    """
    feature_scores = [abs(score) for _, score in lime_results]
    if not feature_scores:
        return 0.0
    std_dev = np.std(feature_scores)
    significant_features = sum(1 for score in feature_scores if score > std_dev)
    return significant_features / len(feature_scores)

# DoE of the single explanation computed above: the share of features whose absolute
# weight exceeds the standard deviation of the absolute weights.
doe = calculate_doe(lime_results)
print("Degree of Explainability (DoE):", doe)

In [None]:
# Define number of samples for analysis
# This is used as the default `samples` argument of the functions that declare
# `samples=num_sam`. That default is captured when each function is defined, so changing
# num_sam afterwards requires re-running the cells with those definitions.
num_sam = 20

# Load dataset
def load_custom_dataset(path):
    """Read the CSV at ``path`` and discard rows lacking a tweet or a class value.

    Args:
        path: Location of the CSV file; it must contain ``tweet`` and ``class`` columns.

    Returns:
        DataFrame holding only the rows where both columns are present.
    """
    required_columns = ['tweet', 'class']
    return pd.read_csv(path).dropna(subset=required_columns)

# LIME Analysis
def lime_analysis(text, wrapped_model, class_names, num_features=10, num_samples=2):
    """Explain one prediction with LIME and return the weighted features.

    Args:
        text: String to explain.
        wrapped_model: Callable mapping a list of strings to an array of class
            probabilities.
        class_names: Class display names handed to ``LimeTextExplainer``.
        num_features: Maximum number of features in the explanation.
        num_samples: Number of perturbed samples LIME generates. The default of 2 keeps
            the existing behaviour. NOTE(review): this is very small for fitting a
            surrogate model — consider raising it for meaningful weights.

    Returns:
        The ``(feature, weight)`` list produced by ``Explanation.as_list()``.
    """
    explainer = LimeTextExplainer(class_names=class_names)
    exp = explainer.explain_instance(
        text,
        wrapped_model,
        num_features=num_features,
        num_samples=num_samples,
    )
    return exp.as_list()

# Calculate Degree of Explainability (DoE)
def calculate_doe(lime_results):
    """Compute the Degree of Explainability (DoE) of one LIME explanation.

    The DoE is the fraction of features whose absolute weight is strictly greater than
    the standard deviation of all absolute weights.

    Args:
        lime_results: Iterable of ``(feature, weight)`` pairs.

    Returns:
        A float in ``[0, 1]``; ``0.0`` when the explanation contains no features
        (instead of dividing by zero).
    """
    feature_scores = [abs(score) for _, score in lime_results]
    if not feature_scores:
        return 0.0
    std_dev = np.std(feature_scores)
    significant_features = sum(1 for score in feature_scores if score > std_dev)
    return significant_features / len(feature_scores)

# Calculate average DoE for Multiple samples
def calculate_average_doe(df, wrapped_model, class_names, samples=num_sam):
    """Average the Degree of Explainability over randomly drawn tweets.

    Args:
        df: DataFrame with a ``tweet`` column of strings.
        wrapped_model: Callable mapping a list of strings to class probabilities.
        class_names: Class display names handed to LIME.
        samples: Number of tweets to draw without replacement. It is clamped to the
            number of available tweets so that a small dataset does not make
            ``random.sample`` raise.

    Returns:
        Mean DoE over the drawn tweets, or ``0.0`` when nothing can be drawn.
    """
    tweets = list(df['tweet'])
    sample_count = min(samples, len(tweets))
    if sample_count <= 0:
        return 0.0

    sample_texts = random.sample(tweets, sample_count)
    doe_values = [
        calculate_doe(lime_analysis(text, wrapped_model, class_names))
        for text in sample_texts
    ]
    return np.mean(doe_values)

# Path to the dataset (the same CSV file that the preprocessing cell reads)
test_data_path = "./Data/KaggleData.csv"
# NOTE(review): this rebinds `df` to the freshly loaded CSV. load_custom_dataset only reads
# the file and drops rows with missing values, so the tweets sampled below are the text as
# stored in the file, not the cleaned text produced by the preprocessing cell — confirm
# that this is intended.
df = load_custom_dataset(test_data_path)

# Class display names passed to LIME. `wrapped_model` is not defined in this cell and must
# already exist from an earlier cell.
class_names = ['Hate speech', 'Offensive language', 'Neutral']

# Calculate and print average DoE
average_doe = calculate_average_doe(df, wrapped_model, class_names)
print(f"Average Degree of Explainability (DoE) for {num_sam} samples:", average_doe)

In [None]:
import numpy as np
from lime.lime_text import LimeTextExplainer
from nltk.corpus import wordnet
import random
import torch

# Function to get synonyms for a word
def get_synonym(word):
    """Return a randomly chosen WordNet synonym of ``word``, or ``word`` if none exists.

    Candidates are the lemma names of every synset of ``word`` whose name differs from
    ``word``, with underscores replaced by spaces (so a synonym may contain several
    words).

    Args:
        word: The word to replace.

    Returns:
        One candidate synonym, or ``word`` unchanged when there are no candidates.
    """
    synonyms = {
        lemma.name().replace('_', ' ')
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
        if lemma.name() != word
    }
    if not synonyms:
        return word
    # Sort before choosing: set iteration order for strings varies between interpreter
    # runs, which would make the pick irreproducible even with a seeded `random`.
    return random.choice(sorted(synonyms))

# Function to generate adversarial example using LIME
def generate_adversarial_example(text, predictor, explainer, num_features=2):
    """Swap the words LIME ranks highest for synonyms.

    Args:
        text: Space-separated input string.
        predictor: Callable handed to ``explainer.explain_instance``.
        explainer: Object exposing ``explain_instance(text, predictor, num_features=...)``
            whose result provides ``as_list()``.
        num_features: Number of top-ranked features to try to replace.

    Returns:
        The text with the first occurrence of each matched feature replaced by the result
        of ``get_synonym``; features that are not whitespace-separated words of ``text``
        are skipped.
    """
    explanation = explainer.explain_instance(text, predictor, num_features=num_features)
    tokens = text.split()
    top_features = [feature for feature, _weight in explanation.as_list()[:num_features]]
    for feature in top_features:
        if feature not in tokens:
            continue
        position = tokens.index(feature)
        tokens[position] = get_synonym(tokens[position])
    return ' '.join(tokens)

# Wrapper for model prediction
def model_predict(texts):
    """Return class probabilities for whitespace-tokenized texts.

    Words are looked up in the module-level ``word2index``. Unknown words are dropped,
    the same way unknown tokens were dropped when the training tensors were built,
    rather than being mapped to ``len(word2index)``, which is both the padding value and
    the index of a real vocabulary word.

    Args:
        texts: Non-empty sequence of strings.

    Returns:
        ``numpy`` array with one row of softmax probabilities per text.
    """
    pad_index = len(word2index)

    indexed_texts = []
    for text in texts:
        indices = [word2index[word] for word in text.split() if word in word2index]
        # A text with no known words becomes a single padding token. An empty sequence
        # would otherwise produce a float tensor with zero time steps, which the
        # embedding / LSTM cannot consume.
        indexed_texts.append(torch.tensor(indices or [pad_index], dtype=torch.long))

    # Pad sequences
    padded_texts = pad_sequence(indexed_texts, batch_first=True, padding_value=pad_index)
    padded_texts = padded_texts.to(device)
    with torch.no_grad():
        outputs = model(padded_texts)
    return torch.softmax(outputs, dim=1).cpu().numpy()

# LIME-based adversarial attack
def lime_based_attack(dataset, samples=num_sam):
    """Measure accuracy before and after a LIME-guided synonym substitution attack.

    Each encoded sample is decoded back to text and classified. The words LIME ranks
    highest are then swapped for synonyms and the text is classified again.

    Args:
        dataset: Iterable of ``(texts, labels)`` batches, where ``texts`` holds rows of
            vocabulary indices and ``labels`` the matching integer classes.
        samples: Maximum number of individual samples to attack.

    Returns:
        Tuple ``(total_samples, correct_before_attack, correct_after_attack)``.
    """
    correct_before_attack = 0
    correct_after_attack = 0
    total_samples = 0

    explainer = LimeTextExplainer(class_names=['hate speech', 'offensive language', 'neither'])

    # Reverse vocabulary, built once instead of rebuilding key/value lists and doing a
    # linear search for every token.
    index2word = {index: word for word, index in word2index.items()}
    # Indices at or above len(word2index) are treated as padding, as in the original
    # decode. Because vocabulary indices start at 1, this also drops the one real word
    # whose index equals len(word2index).
    pad_index = len(word2index)

    for texts, labels in dataset:
        for text, label in zip(texts, labels):
            if total_samples >= samples:
                return total_samples, correct_before_attack, correct_after_attack

            # Convert the index row to a string, skipping padding and the unused index 0
            token_ids = (int(i) for i in text)
            text = ' '.join(index2word[idx] for idx in token_ids if 0 < idx < pad_index)
            # Compare plain ints rather than a numpy scalar against a tensor
            label = int(label)

            # Original prediction
            original_pred = int(model_predict([text])[0].argmax())
            if original_pred == label:
                correct_before_attack += 1

            # Generate adversarial example
            adv_text = generate_adversarial_example(text, model_predict, explainer)
            adv_pred = int(model_predict([adv_text])[0].argmax())

            if adv_pred == label:
                correct_after_attack += 1

            total_samples += 1

    return total_samples, correct_before_attack, correct_after_attack

# Run the attack
# NOTE(review): the batches come from train_loader, i.e. the training split — confirm
# that this is intended rather than test_loader.
total_samples, correct_before_attack, correct_after_attack = lime_based_attack(train_loader)

# Accuracy before and after the attack
accuracy_before_attack = correct_before_attack / total_samples
accuracy_after_attack = correct_after_attack / total_samples

# Adversarial robustness: share of the original accuracy that survives the attack
if accuracy_before_attack > 0:
    adv_rob = accuracy_after_attack / accuracy_before_attack
else:
    adv_rob = 0

# Attack resilience: robustness relative to the average DoE computed in an earlier cell
if average_doe > 0:
    attack_resilience = adv_rob / average_doe
else:
    attack_resilience = 0

# Report
print("LIME-based adversarial attack results:")
print(f"Total samples: {total_samples}")
print(f"Correct before attack: {correct_before_attack}")
print(f"Correct after attack: {correct_after_attack}")
print(f"Accuracy before attack: {accuracy_before_attack}")
print(f"Accuracy after attack: {accuracy_after_attack}")
print("")
print("Results: ")
print("Adversarial Robustness (AdvRob):", adv_rob)
print("Attack Resilience (Ar):", attack_resilience)