# In [None]:
import pandas as pd
import json
import urllib.parse
import base64
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm
import re

# Load the SQL injection dataset from "Modified_SQL_Dataset.csv" (expected in
# the working directory) and keep a random sample of at most 10,000 rows.
# The sample size is capped at the number of rows available, because
# DataFrame.sample raises ValueError when asked for more rows than exist
# (sampling without replacement). The sample is seeded so repeated runs see
# the same rows, in line with the seeded train/test split further down.
df = pd.read_csv("Modified_SQL_Dataset.csv")
df = df.sample(n=min(10000, len(df)), random_state=50).reset_index(drop=True)

# Helpers for undoing common encodings applied to SQL queries.
def _is_readable_text(text):
    """Return True if *text* is non-empty and consists of printable characters or tab/CR/LF."""
    return bool(text) and all(ch.isprintable() or ch in "\t\r\n" for ch in text)


def decode_sql(encoded_string):
    """Best-effort decoding of a possibly encoded SQL query.

    Decoders are tried in a fixed priority order and the first one that
    produces a usable result wins:

    1. Base64 (strict alphabet, UTF-8, readable result).
    2. URL percent-decoding (only when it actually changes the input).
    3. A JSON string literal (non-empty string result only).
    4. Hexadecimal (ASCII, readable result; skipped for all-digit input so
       plain numbers are left untouched).

    These are heuristics: a plain query that happens to be valid in one of
    the encodings will be decoded.

    Args:
        encoded_string: The raw query. Non-string values (e.g. NaN from a
            missing cell) are returned unchanged.

    Returns:
        The decoded query, or ``encoded_string`` itself when no decoder
        applies.
    """
    if not isinstance(encoded_string, str):
        return encoded_string

    # 1. Base64. validate=True rejects characters outside the alphabet
    #    instead of silently dropping them, which would otherwise let
    #    ordinary text "decode" into garbage.
    try:
        candidate = base64.b64decode(encoded_string, validate=True).decode('utf-8')
    except ValueError:  # covers binascii.Error and UnicodeDecodeError
        pass
    else:
        if _is_readable_text(candidate):
            return candidate

    # 2. URL percent-encoding. unquote() never fails on a str, so it only
    #    counts as a hit when it changed something; otherwise the remaining
    #    decoders would never get a chance.
    unquoted = urllib.parse.unquote(encoded_string)
    if unquoted != encoded_string:
        return unquoted

    # 3. JSON string literal. Numbers, lists, etc. are not queries, so only
    #    a string result is accepted.
    try:
        parsed = json.loads(encoded_string)
    except (ValueError, RecursionError):  # JSONDecodeError is a ValueError
        pass
    else:
        if isinstance(parsed, str) and parsed:
            return parsed

    # 4. Hexadecimal. All-digit input is far more likely a number than a
    #    hex payload, so it is left alone.
    if not encoded_string.isdigit():
        try:
            candidate = bytes.fromhex(encoded_string).decode('ascii')
        except ValueError:  # non-hex input or non-ASCII bytes
            pass
        else:
            if _is_readable_text(candidate):
                return candidate

    return encoded_string

# Case-normalization step of the preprocessing pipeline.
def lowercase_sql(query):
    """Return *query* converted to lower case."""
    lowered = query.lower()
    return lowered

# Number-generalization step: every run of digits collapses to one placeholder.
def generalize_sql(query):
    """Return *query* with each run of consecutive digits replaced by a single '0'."""
    return re.sub(r'\d+', '0', query)

# Whitespace tokenization step of the preprocessing pipeline.
def tokenize_sql(query):
    """Return *query* with comparison characters isolated and whitespace normalized.

    Each of the characters ``<``, ``>``, ``!`` and ``=`` is surrounded by
    spaces individually (so ``>=`` becomes ``> =``); the result is then split
    on whitespace and re-joined with single spaces.
    """
    spaced = re.sub(r'([<>!=])', r' \1 ', query)
    return ' '.join(spaced.split())

# Build the 'Text' column by running 'Query' through the cleaning steps in
# order: decode, lower-case, generalize digits, tokenize.
df['Text'] = (
    df['Query']
    .apply(decode_sql)
    .apply(lowercase_sql)
    .apply(generalize_sql)
    .apply(tokenize_sql)
)

# Hold out 20% of the rows for testing (shuffled, fixed seed), then pull the
# text and label columns out as plain lists.
train_df, test_df = train_test_split(df, test_size=0.20, random_state=50, shuffle=True)
train_texts = train_df['Text'].tolist()
train_labels = train_df['Label'].tolist()
test_texts = test_df['Text'].tolist()
test_labels = test_df['Label'].tolist()

# Custom Dataset class to handle data loading.
class CustomDataset(Dataset):
    """Map-style dataset that pairs texts with labels and tokenizes on access.

    Args:
        texts: Indexable collection of texts; each item is passed through
            ``str()`` before tokenization.
        labels: Indexable collection of labels aligned with ``texts``; each
            is converted to a ``torch.long`` tensor.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=True,
            padding='max_length', max_length=max_length,
            return_tensors='pt')``. Its result must be indexable by
            ``'input_ids'`` and ``'attention_mask'``.
        max_length: Length every sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and return its tensors.

        Returns:
            A dict with ``'input_ids'``, ``'attention_mask'`` (leading
            dimension removed) and ``'labels'`` (a ``torch.long`` scalar).
        """
        text = str(self.texts[idx])
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        # The tokenizer is expected to return tensors with a leading batch
        # dimension of size 1 for a single text. Drop only that dimension:
        # a bare squeeze() would also collapse the sequence dimension when
        # max_length == 1, producing a 0-d tensor that cannot be batched
        # with the rest.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Classifier that feeds BERT token embeddings through parallel 1-D convolutions.
class BertTextCNNClassifier(nn.Module):
    """TextCNN head on top of a BERT encoder whose forward pass runs without gradients.

    Args:
        bert_model: Encoder exposing ``config.hidden_size`` and returning an
            object with ``last_hidden_state`` when called with ``input_ids``
            and ``attention_mask``.
        num_filters: Output channels of every convolution branch.
        filter_sizes: Kernel sizes, one convolution branch per entry.
        output_size: Number of logits produced per sample.
    """

    def __init__(self, bert_model, num_filters, filter_sizes, output_size):
        super().__init__()
        self.bert_model = bert_model
        self.num_filters = num_filters
        self.filter_sizes = filter_sizes

        # One Conv1d per kernel size; the encoder's hidden size is the
        # channel dimension the convolutions slide over.
        hidden_size = bert_model.config.hidden_size
        branches = []
        for kernel_size in filter_sizes:
            branches.append(
                nn.Conv1d(in_channels=hidden_size, out_channels=num_filters, kernel_size=kernel_size)
            )
        self.conv_layers = nn.ModuleList(branches)

        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(num_filters * len(filter_sizes), output_size)

    def forward(self, input_ids, attention_mask):
        """Return classification logits for a batch of tokenized inputs."""
        # The encoder runs under no_grad, so no gradients flow into it.
        with torch.no_grad():
            bert_out = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        # Swap dims 1 and 2 so the hidden dimension becomes Conv1d's channel axis.
        features = bert_out.last_hidden_state.transpose(1, 2)

        # ReLU then max over the last (position) axis for each branch.
        branch_maxima = [
            nn.functional.relu(conv(features)).max(dim=2).values
            for conv in self.conv_layers
        ]

        # Join the branches along the feature axis, regularize, classify.
        combined = self.dropout(torch.cat(branch_maxima, dim=1))
        return self.fc(combined)

# Hyperparameters for batching, tokenization and the CNN head.
batch_size = 64
max_length = 128
output_size = 2  # Number of classes in your classification task
num_filters = 100
filter_sizes = [2, 3, 4]

# Load the pretrained BERT tokenizer and encoder.
bert_model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(bert_model_name)
bert_model = BertModel.from_pretrained(bert_model_name)

# Wrap both splits in datasets, then batch them; only the training loader
# shuffles.
train_dataset = CustomDataset(train_texts, train_labels, tokenizer, max_length)
test_dataset = CustomDataset(test_texts, test_labels, tokenizer, max_length)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Build the classifier together with its loss and optimizer.
model = BertTextCNNClassifier(bert_model, num_filters, filter_sizes, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)

# Run on CUDA when it is available, otherwise on the CPU, and put the model
# in training mode.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)
model.train()

# Training loop: for each epoch, iterate over the training batches, update the
# model, and show the running mean loss and running accuracy on the progress bar.
print("Start training")
num_epochs = 5
for epoch in range(num_epochs):
    total_loss = 0
    correct_train = 0
    total_train = 0

    with tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}", unit="batch") as t:
        # Count batches explicitly. tqdm only refreshes `t.n` when it redraws
        # the bar, so dividing by `t.n + 1` can use a stale batch count and
        # overstate the running mean loss.
        for step, batch in enumerate(t, start=1):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            logits = model(input_ids, attention_mask)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            # Predicted class = index of the largest logit per sample.
            predicted = logits.detach().argmax(dim=1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
            t.set_postfix({'loss': total_loss / step, 'accuracy': correct_train / total_train})

# Score the held-out test set: switch the model to eval mode and, with
# gradient tracking off, collect the true and predicted label of every sample.
model.eval()
y_true, y_pred = [], []
with torch.no_grad(), tqdm(test_loader, unit="batch") as t:
    for batch in t:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        logits = model(input_ids, attention_mask)
        # Index of the largest logit along dim 1 is the predicted class.
        predicted = logits.data.max(dim=1).indices

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Compute accuracy, precision, recall and F1 on the collected test
# predictions; each scorer is called with its default arguments.
accuracy, precision, recall, f1 = (
    metric(y_true, y_pred)
    for metric in (accuracy_score, precision_score, recall_score, f1_score)
)

# Report each metric on its own line, rounded to four decimals.
print(
    f"Test Accuracy: {accuracy:.4f}",
    f"Precision: {precision:.4f}",
    f"Recall: {recall:.4f}",
    f"F1-score: {f1:.4f}",
    sep="\n",
)