In [1]:
import torch
from torch import nn
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences


In [2]:
data_source = pd.read_csv('dataset_capec_combine.csv')
#data_source = data_source[data_source['label']=='153 - Input Data Manipulation']
data_target  = pd.read_csv('dataset_capec_transfer.csv')
#data_target = data_target[data_target['label']=='153 - Input Data Manipulation']
X_source = data_source['text'].str.replace('/',' ')
y_source = data_source['label']
X_target  = data_target ['text'].str.replace('/','')
y_target  = data_target ['label']
rlist =['000 - Normal', '126 - Path Traversal',
       '153 - Input Data Manipulation', '194 - Fake the Source of Data',
       '242 - Code Injection', '310 - Scanning for Vulnerable Software',
       '34 - HTTP Response Splitting']
mapping = {l: i+1 for i, l in enumerate(rlist)}
y_source = [mapping[s] for s in y_source] 
y_target  = [mapping[r] for r in y_target ]
y_source = np.array(y_source)
y_target  = np.array(y_target )

In [3]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
import numpy as np
import json


class Vectorizer:
    def __init__(self, method='BOW', ngram_range=(1, 1), max_features=300, emb_fname='', word_index_fname=''):
        self.method = method
        if self.method == 'BOW':
            self.vectorizer = CountVectorizer(analyzer='word', input='content', ngram_range=ngram_range, max_features=max_features)
        elif self.method == 'TFIDF':
            self.vectorizer = TfidfVectorizer(analyzer='word', input='content', max_features=max_features)
        elif self.method == 'Word2Vec':
            self.max_features = max_features
            self.emb_fname = emb_fname
            self.word_index_fname = word_index_fname
        else:
            raise ValueError('Feature extraction method does not exist.')

    def feature_extraction(self, X_train, X_test):
        train_data = self.vectorizer.fit_transform(X_train).toarray()
        test_data = self.vectorizer.transform(X_test).toarray()
        return train_data, test_data

    def get_word_index(self):
        word2id = json.load(open(self.word_index_fname, 'r'))
        return word2id

    def get_embedding_matrix(self):
        np.random.seed(0)
        word2id = self.get_word_index()
        embedding_matrix = np.random.uniform(-0.25, 0.25, [len(word2id) + 1, self.max_features])
        with open(self.emb_fname, 'r', encoding='utf-8') as f:
            for line in f:
                content = line.split(' ')
                if content[0] in word2id:
                    embedding_matrix[word2id[content[0]]] = np.array(list(map(float, content[1:])))
        return embedding_matrix

In [4]:
from gensim.models import Word2Vec
sentences = [sentence.split() for sentence in X_source]
w2v_model = Word2Vec(sentences, window=5, min_count=5, workers=4)

In [5]:
def vectorize(sentence):
    words = sentence.split()
    words_vecs = [w2v_model.wv[word] for word in words if word in w2v_model.wv]
    if len(words_vecs) == 0:
        return np.zeros(100)
    words_vecs = np.array(words_vecs)
    return words_vecs.mean(axis=0)

X_train = np.array([vectorize(sentence) for sentence in X_source])
X_test = np.array([vectorize(sentence) for sentence in X_target])

In [6]:
device = ""
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [19]:
class Discriminator(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(100, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        x = x.view(x.size(0), 100)
        output = self.model(x)
        return output

In [20]:
discriminator = Discriminator().to(device=device)

In [21]:
class Generator(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(100, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Linear(1024, 100),
            nn.Tanh(),
        )

    def forward(self, x):
        output = self.model(x)
        output = output.view(x.size(0), 100)
        return output

generator = Generator().to(device=device)

In [22]:
lr = 0.00001
num_epochs = 500
batch_size=32
loss_function = nn.BCELoss()

optimizer_discriminator = torch.optim.Adam(discriminator.parameters(), lr=lr)
optimizer_generator = torch.optim.Adam(generator.parameters(), lr=lr)

In [23]:
X_train.shape

(591798, 100)

In [24]:
X_source = torch.from_numpy(X_train).to(device=device, dtype=torch.float32)
X_target = torch.from_numpy(X_test).to(device=device, dtype=torch.float32)

In [25]:
for epoch in range(num_epochs):
    idx = np.random.randint(0, X_source.shape[0], batch_size)
    real_samples  = X_source[idx]
    real_samples_labels = torch.ones((batch_size,1)).to(device=device)
    latent_space_samples = torch.rand((batch_size,100)).to( device=device)
    
    generated_samples = generator(latent_space_samples)
    generated_samples_labels = torch.zeros((batch_size, 1)).to(
            device=device
        )
    all_samples = torch.cat((real_samples, generated_samples))
    all_samples_labels = torch.cat(
            (real_samples_labels, generated_samples_labels)
        )

        # Training the discriminator
    discriminator.zero_grad()
    output_discriminator = discriminator(all_samples)
    loss_discriminator = loss_function(
            output_discriminator, all_samples_labels
        )
    loss_discriminator.backward()
    optimizer_discriminator.step()

        # Data for training the generator
    latent_space_samples = torch.rand((batch_size,100)).to(device=device)

        # Training the generator
    generator.zero_grad()
    generated_samples = generator(latent_space_samples)
    output_discriminator_generated = discriminator(generated_samples)
    loss_generator = loss_function(
            output_discriminator_generated, real_samples_labels
        )
    loss_generator.backward()
    optimizer_generator.step()
    if(epoch%100==0):
        print(f"Epoch: {epoch+100} Loss D.: {loss_discriminator}")
        print(f"Epoch: {epoch+100} Loss G.: {loss_generator}")
 

Epoch: 100 Loss D.: 0.6926398873329163
Epoch: 100 Loss G.: 0.6781929731369019
Epoch: 200 Loss D.: 0.6216868758201599
Epoch: 200 Loss G.: 0.6575119495391846
Epoch: 300 Loss D.: 0.6206671595573425
Epoch: 300 Loss G.: 0.5434432029724121
Epoch: 400 Loss D.: 0.6194196343421936
Epoch: 400 Loss G.: 0.7077555656433105
Epoch: 500 Loss D.: 0.6029638648033142
Epoch: 500 Loss G.: 0.7639755606651306


In [26]:
def count_elements_greater_than(tensor, threshold):
    # Áp dụng phép so sánh >= threshold
    mask = tensor >= threshold

    # Tính tổng các phần tử True trong mask
    count = torch.sum(mask)

    return count.item()

In [27]:
predicted_labels = discriminator(X_target)

cout = count_elements_greater_than(predicted_labels, 0.5)

cout/predicted_labels.shape[0]

0.924020644724596

In [None]:
#k = int (X_source.shape[0]*0.2/64)
#for i in range(k):
    #latent_space_samples = torch.rand((batch_size,300)).to(device=device)
    #X_source = torch.cat([X_source, generator(latent_space_samples)], dim=0)
#torch.save(X_source, 'Data_GAN/tensor_153.pt')