In [1]:
# Import all required libraries
import csv
import os
import numpy as np
import pandas as pd
import torch
import random

In [2]:
# Read in the csv file of known malware containing SHA256 hashes and corresponding malware family name
# Save the hashes and family names in a dictionary

# Define a function to return the csv containing the hashes and family for each malware sample in the dataset
def read_malware_csv(file_path):
    # Read each line of the csv file
    with open(file_path, mode='r') as csvfile:
        reader = csv.reader(csvfile)
        # Skip the first row containing field names
        next(reader)
        # Split the line into a pair-value and add into a dictionary
        malware_dict = {rows[0]: rows[1] for rows in reader}
    return malware_dict

# Read the csv
malware_dict = read_malware_csv('sha256_family.csv')
# Print the number of unique hashes
print("Num samples: " + str(len(malware_dict)))

Num samples: 5560


In [5]:
# Feature pre-processing
# Create 2 new dictionaries: one to store feature strings found in malware files, and one to store feature strings found in all files

# For each file in the dataset directory:
# - For each line starting with 'feature' or 'permission' after splitting the line by '::':
#   - Add the feature string to the dictionary of features found in all files
#   - If the filename hash is contained in the malware dictionary, add the feature string to the dictionary of features found in malware files

def process_features(directory_path, malware_dict):
    all_features = {}
    malware_features = {}
    
    # For each file in the dataset directory
    for filename in os.listdir(directory_path):
        # Join the directory path and filename to get the full file path
        file_path = os.path.join(directory_path, filename)
        
        with open(file_path, 'r') as file:
            for line in file:
                # Extract features and permissions from lines starting with 'feature' or 'permission'
                if line.startswith('feature') or line.startswith('permission'):
                    feature = line.split('::')[1].strip()
                    # Count occurrences of each feature in all files
                    all_features[feature] = all_features.get(feature, 0) + 1
                    # Increment to malware features if the filename is in the malware dictionary
                    if filename in malware_dict:
                        malware_features[feature] = malware_features.get(feature, 0) + 1

    return all_features, malware_features

# Specify directory path containing the dataset (substitute with your own path)
# directory_path = '<path_to_dataset_directory>'
directory_path = 'C:\\Users\\Kevin\\Code\\GitHub\\CSEC620-ML\\assignment_03\\feature_vectors\\feature_vectors'
all_features, malware_features = process_features(directory_path, malware_dict)

# Generate the feature vectors with a specified size
feature_vector_size = 25

# Create a list of the most common malware features
sorted_malware_features = sorted(malware_features.items(), key=lambda x: x[1], reverse=True)[:feature_vector_size]

# Create a list of the most common malware features
common_malware_features = [feature for feature, count in sorted_malware_features]

# Create a dictionary of the most common unique features that are not common malware features 
unique_features = {feature: count for feature, count in all_features.items() if feature not in common_malware_features}
sorted_unique_features = sorted(unique_features.items(), key=lambda x: x[1], reverse=True)[:feature_vector_size]

# Generate a list containing the names of features from the sorted_malware_features dict and sorted_unique_features dict
reference_vector = [feature for feature, count in sorted_malware_features] + [feature for feature, count in sorted_unique_features]

In [7]:
# Generate the vectorized dataset
# For each file in the dataset directory:
# - Create a vector of zeros with the same length as the reference vector
# - For each line starting with 'feature' or 'permission' after splitting the line by '::':
#   - If the feature is in the reference vector, set the corresponding index in the vector to 1
# - Append the vector to the dataset list
# - Save the dataset to a csv file

def generate_vectorized_dataset(directory_path, reference_vector_dict, malware_dict):
    dataset = []
    dataset_hashes = []
    dataset_labels = []
    dataset_families = []

    # For each file in the dataset directory
    for filename in os.listdir(directory_path):
        # Join the directory path and filename to get the full file path
        file_path = os.path.join(directory_path, filename)
        
        # Create a vector of zeros with the same length as the reference vector
        vector = np.zeros(len(reference_vector_dict), dtype=int)
        
        with open(file_path, 'r') as file:
            for line in file:
                # Extract features and permissions from lines starting with 'feature' or 'permission'
                if line.startswith('feature') or line.startswith('permission'):
                    feature = line.split('::')[1].strip()
                    # If the feature is in the reference vector, set the corresponding index in the vector to 1
                    if feature in reference_vector_dict:
                        vector[reference_vector_dict[feature]] = 1

        # Append the filename hash to the dataset hashes list
        dataset_hashes.append(filename)

        # Append the label to the dataset labels list
        if filename in malware_dict:
            dataset_labels.append(1)
            dataset_families.append(malware_dict[filename])
        else:
            dataset_labels.append(0)
            dataset_families.append('benign')

        # Append the vector to the dataset list
        dataset.append(vector)

    return dataset, dataset_hashes, dataset_labels, dataset_families

# Convert the list into a dict with the order of the features in the list as the key
reference_vector_dict = {feature: i for i, feature in enumerate(reference_vector)}

# Generate the vectorized dataset
dataset, dataset_hashes, dataset_labels, dataset_families = generate_vectorized_dataset(directory_path, reference_vector_dict, malware_dict)

# Print the number of samples in the dataset
print("Num samples: " + str(len(dataset)))

Num samples: 129013


In [8]:
# Save the dataset to a csv file
# Each row of the csv file will contain the hash, label, family, and feature vector for each sample
def save_dataset_to_csv(dataset_hashes, dataset_labels, dataset_families, dataset, output_file_path):
    df = pd.DataFrame(dataset_hashes, columns=['hash'])
    # Add the hashes, labels, and families as new columns
    df.insert(1, 'label', dataset_labels)
    df.insert(2, 'family', dataset_families)
    df.insert(3, 'vector', dataset)
    # Save the DataFrame to a csv file
    df.to_csv(output_file_path, index=False)

# Save the dataset to a csv file
output_file_path = 'vectorized_dataset.csv'
save_dataset_to_csv(dataset_hashes, dataset_labels, dataset_families, dataset, output_file_path)

In [9]:
# Load the dataset from the csv file
def load_dataset_from_csv(file_path):
    df = pd.read_csv(file_path)
    return df

# Load the dataset from the csv file
loaded_dataset = load_dataset_from_csv('vectorized_dataset.csv')
print("\nLoaded dataset from csv file:")
print(loaded_dataset.head())

# Print keys of the loaded dataset
print("\nKeys of the loaded dataset:")
print(loaded_dataset.keys())

# Convert all columns to numpy arrays
dataset_hashes = loaded_dataset['hash'].tolist()
dataset_labels = loaded_dataset['label'].tolist()
dataset_families = loaded_dataset['family'].tolist()

# Process each vector in the dataset by removing the brackets, splitting into a list via spaces, and typecasting to int
dataset_vectors = []
for vector in loaded_dataset['vector']:
    vector = vector.strip('[]').split(' ')
    # Typecast the vector of strings to int
    vector = [int(i) for i in vector]
    dataset_vectors.append(vector)
dataset_vectors = dataset_vectors


Loaded dataset from csv file:
                                                hash  label     family  \
0  00002d74a9faa53f5199c910b652ef09d3a7f6bd42b693...      1  GinMaster   
1  000068216bdb459df847bfdd67dd11069c3c50166db1ea...      0     benign   
2  0000764713b286cfe7e8e76c7038c92312977712d9c5a8...      1     Opfake   
3  0000962c2c34de1ca0c329b18be7847459da2d9d14b6b2...      0     benign   
4  000167f1ff061ea91440c40659c11c2af160342fd2e493...      0     benign   

                                              vector  
0  [1 1 1 1 1 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 1 0 0...  
1  [1 0 0 1 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0...  
2  [1 1 1 1 1 1 1 1 0 0 0 0 0 0 1 1 1 1 1 0 1 0 0...  
3  [1 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0...  
4  [1 1 0 1 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0...  

Keys of the loaded dataset:
Index(['hash', 'label', 'family', 'vector'], dtype='object')


In [10]:
# Generate a one-vs-all training set for each family in the dataset with more than n occurrences

# Set to 0 to create a dataset for all available families
min_occurrences = 10

# Create a dictionary to store the training sets for each family
training_sets_samples = {}
training_sets_labels = {}

# Iterate through each family and create a one-vs-all training set for each
family_counts = pd.Series(dataset_families).value_counts()
for family in family_counts.index:
    if family_counts[family] >= min_occurrences:
        training_set_samples = []
        training_set_labels = []
        # Append malware samples for the current family
        malware_indexes = [i for i in range(len(dataset_families)) if dataset_families[i] == family]
        index = 0
        # Create a balanced dataset with 50,000 malware samples and 50,000 benign samples
        while len(training_set_samples) < 50000:
            training_set_samples.append(dataset_vectors[malware_indexes[index]])
            training_set_labels.append(1)
            index += 1
            if index >= len(malware_indexes):
                index = 0
        # Append benign samples for the current family
        benign_indexes = [i for i in range(len(dataset_families)) if dataset_families[i] != family]
        index = 0
        while len(training_set_samples) < 100000:
            training_set_samples.append(dataset_vectors[benign_indexes[index]])
            training_set_labels.append(0)
            index += 1
            if index >= len(benign_indexes):
                index = 0
        training_sets_samples[family] = training_set_samples
        training_sets_labels[family] = training_set_labels

# Print the number of training sets created
print("\nNumber of training sets created:", len(training_sets_samples))


Number of training sets created: 55


In [24]:
# Single class (one vs all) SVM implementation

class SVM:
    def __init__(self, epochs=2000, learning_rate=0.005):
        # Set learning rate and number of epochs/iterations
        self.learning_rate = learning_rate
        self.epochs = epochs

        # Initiate weights and biases to None, assigned based on the size of the first training point
        self.weights = None
        self.bias = None

    def fit(self, samples, raw_labels, regularization_term=0):
        # Convert samples and labels to PyTorch tensors
        samples = torch.tensor(samples, dtype=torch.float32)
        raw_labels = torch.tensor(raw_labels, dtype=torch.float32).view(-1, 1)

        # Update labels from (0 or 1) to (-1 or 1) for hinge loss calculation
        labels = torch.where(raw_labels == 1, torch.tensor(1.0, dtype=torch.float32), torch.tensor(-1.0, dtype=torch.float32))

        # Get the number of samples and number of features per sample
        num_samples, num_features = samples.shape

        # Initialize n weights to 0, where n is the number of features
        # requires_grad set to True to allow automatic tuning by PyTorch optimizer
        self.weights = torch.zeros((num_features, 1), dtype=torch.float32, requires_grad=True)

        # Set the initial bias to 0 (tensor of size 1)
        # requires_grad set to True to allow automatic tuning by PyTorch optimizer
        self.bias = torch.zeros(1, dtype=torch.float32, requires_grad=True)

        # Specify Stocastic Gradient Descent as the optimizer
        optimizer = torch.optim.SGD([self.weights, self.bias], lr=self.learning_rate)

        # Training function
        for epoch_index in range(self.epochs):
            # Reset gradients
            optimizer.zero_grad()

            # Predict the score of the sample (same as predict function w/o function call overhead)
            prediction = torch.matmul(samples, self.weights) - self.bias

            # Compute the margin by applying the labels to the corresponding predictions
            margin = prediction * labels

            # Compute the average hinge loss
            hinge_loss = torch.mean(torch.clamp(1 - margin, min=0))

            # Add regularization to the cost function
            reg_cost = regularization_term * torch.norm(self.weights, p=2) / 2

            # Calculate total loss
            loss = hinge_loss + reg_cost

            # Use PyTorch to automatically compute gradients
            loss.backward()

            # Update parameters
            optimizer.step()
        
    def predict(self, samples):
        # Compute the score/prediction by finding the dot product between the data points and the weights, including the bias
        # Apply a sigmoid to the prediction to convert it to a probability for the multi-class ensemble classification
        return torch.sigmoid(torch.matmul(torch.tensor(samples, dtype=torch.float32), self.weights) + self.bias)

In [None]:
# Create an ensemble classifier which initializes multiple one vs all SVM classifiers, including one for benign samples
# Each SVM classifier makes a prediction on the sample data and outputs a score
# The classifier with the highest score classifies the sample

class Ensemble_SVM:
    def __init__(self, epochs=2000, learning_rate=0.005):
        # Store the number of epochs and learning rate
        self.epochs = epochs
        self.learning_rate = learning_rate
        # Set the number of models in the ensemble
        self.num_models = None
        # Initialize a list to store the models and a list for the corresponding family names
        self.models = []
        self.model_families = []
    
    def fit(self, families, sample_sets, label_sets, regularization_term=0):
        # Get the number of models
        self.num_models = len(families)
        for index in range(len(families)):
            self.model_families.append(families[index])
            # Create an SVM model for each family and train it
            model = SVM(epochs=self.epochs, learning_rate=self.learning_rate)
            model.fit(sample_sets[index], label_sets[index], regularization_term)
            self.models.append(model)
    
    def predict(self, samples):
        # Predict the class of the sample using voting (highest score)
        predictions = []
        for model in self.models:
            predictions.append(model.predict(samples))
        # Get the index of the model with the highest prediction
        max_index = predictions.index(max(predictions))
        return self.model_families[max_index]
    
    # Predict the class of the sample with weighted predictions
    def weighted_predict(self, samples, bias):
        # Predict the class of the sample using voting (highest score)
        predictions = []
        for model_index in range(len(self.models)):
            # Apply the accuracy of the model as a bias to the prediction
            prediction = self.models[model_index].predict(samples)
            # predictions.append(self.models[model_index].predict(samples) * bias[model_index])
            predictions.append(prediction + (prediction * bias[model_index]))
        # Get the index of the model with the highest prediction
        max_index = predictions.index(max(predictions))
        return self.model_families[max_index]

In [34]:
# Train an ensemble SVM model using the top 20 families with the most occurrences

# Get the top 20 families with the most occurrences and convert the names to a list
top_families = family_counts[:20].index.tolist()

# Create a list of sample sets and label sets for the top 20 families
top_sample_sets = []

# Create a list of label sets for the top 20 families
top_label_sets = []

# Append the sample and label sets for each family to the corresponding list
for family in top_families:
    top_sample_sets.append(training_sets_samples[family])
    top_label_sets.append(training_sets_labels[family])

# Train the ensemble SVM model
ensemble_svm = Ensemble_SVM(epochs=2000, learning_rate=0.005)
ensemble_svm.fit(top_families, top_sample_sets, top_label_sets, regularization_term=0)

# Print the top 20 families used in the ensemble model
print("\nTop 20 families used in the ensemble model:")
print(top_families)


Top 20 families used in the ensemble model:
['benign', 'FakeInstaller', 'DroidKungFu', 'Plankton', 'Opfake', 'GinMaster', 'BaseBridge', 'Iconosys', 'Kmin', 'FakeDoc', 'Geinimi', 'Adrd', 'DroidDream', 'ExploitLinuxLotoor', 'Glodream', 'MobileTx', 'FakeRun', 'SendPay', 'Gappusin', 'Imlog']


In [87]:
# Store the accuracy of each model in the ensemble and the average prediction score
model_accuracies = []
average_prediction_score = []
for index in range(len(top_families)):
    # Get the predictions for each model
    predictions = ensemble_svm.models[index].predict(top_sample_sets[index])
    predictions = torch.where(predictions > 0.5, torch.tensor(1.0), torch.tensor(0.0))
    # Calculate the accuracy of the model
    num_correct = 0
    for i in range(len(predictions)):
        if predictions[i] == top_label_sets[index][i]:
            num_correct += 1
    model_accuracies.append(num_correct / len(predictions))
    # Calculate the average prediction score
    average_prediction_score.append(torch.mean(predictions).item())

In [125]:
print(model_accuracies)
print(average_prediction_score)

[0.77985, 0.9099, 0.84629, 0.83726, 0.90233, 0.68011, 0.85549, 0.85222, 0.89953, 0.87912, 0.91804, 0.76802, 0.72493, 0.84486, 0.82257, 0.91053, 0.88627, 0.73633, 0.66047, 0.79471]
[0.3179900050163269, 0.5900999903678894, 0.6417099833488464, 0.6611400246620178, 0.5976700186729431, 0.819890022277832, 0.5717499852180481, 0.6477800011634827, 0.6004700064659119, 0.6208800077438354, 0.46239998936653137, 0.73198002576828, 0.7750700116157532, 0.5122600197792053, 0.6484500169754028, 0.5894700288772583, 0.6137300133705139, 0.763670027256012, 0.8395299911499023, 0.7052900195121765]


In [126]:
# Adjust the bias using the average prediction score
# Bias is multiplied to the model prediction
# If an individual model has high accuracy but low average prediction score, increase the prediction score
# If an individual model has low accuracy but high average prediction score, decrease the prediction score
# The closer the ratio is to 1:1, the less bias is applied to the prediction
bias = [model_accuracies[i] / average_prediction_score[i] / 5 for i in range(len(top_families))]
print(bias)

[0.4904871144990607, 0.3083884137780568, 0.2637608957191304, 0.2532776624522216, 0.3019492267668099, 0.1659027385430322, 0.299253177828677, 0.26312019465538333, 0.2996086366725348, 0.28318515302000513, 0.39707613369873834, 0.20984725619907257, 0.18706181096821728, 0.3298559197979777, 0.25370344003898826, 0.3089317371179168, 0.28881429315562945, 0.19283983231494656, 0.15734280060569503, 0.2253569391353849]


In [105]:
# Get the accuracy of the ensemble model on the training set
correct_predictions = 0
for i in range(len(dataset_vectors)):
    family = ensemble_svm.weighted_predict(torch.tensor(dataset_vectors[i], dtype=torch.float32), bias)
    if family == dataset_families[i]:
        correct_predictions += 1
accuracy = correct_predictions / len(dataset_vectors)
print(f"\nAccuracy of the ensemble model on the training set: {accuracy}")

  return torch.sigmoid(torch.matmul(torch.tensor(samples, dtype=torch.float32), self.weights) + self.bias)



Accuracy of the ensemble model on the training set: 0.01438614713245952


In [41]:
# Print the length of the dataset vectors
print("\nLength of the dataset vectors:")
print(len(dataset_vectors))


Length of the dataset vectors:
129013


In [66]:
# Classify the first 5 samples in the dataset using the ensemble model
print("\nClassifying the first 5 samples in the dataset using the ensemble model:")
for i in range(5):
    family = ensemble_svm.weighted_predict(torch.tensor(dataset_vectors[i], dtype=torch.float32), bias)
    print(f"Sample {i}: {dataset_hashes[i]}")
    print(f"\tFamily: {family}, Actual Family: {dataset_families[i]}")


Classifying the first 5 samples in the dataset using the ensemble model:
Sample 0: 00002d74a9faa53f5199c910b652ef09d3a7f6bd42b693755a233635c3ffb0f4
	Family: benign, Actual Family: GinMaster
Sample 1: 000068216bdb459df847bfdd67dd11069c3c50166db1ea8772cdc9250d948bcf
	Family: benign, Actual Family: benign
Sample 2: 0000764713b286cfe7e8e76c7038c92312977712d9c5a86d504be54f3c1d025a
	Family: benign, Actual Family: Opfake
Sample 3: 0000962c2c34de1ca0c329b18be7847459da2d9d14b6b23a21cbc6427522403c
	Family: benign, Actual Family: benign
Sample 4: 000167f1ff061ea91440c40659c11c2af160342fd2e493d609e4996b8820e78f
	Family: benign, Actual Family: benign


  return torch.sigmoid(torch.matmul(torch.tensor(samples, dtype=torch.float32), self.weights) + self.bias)


In [67]:
# Classify the first 5 samples in the dataset using the ensemble model
start = 0
end = 10000
correct_predictions = 0
for i in range(start, end):
    family = ensemble_svm.weighted_predict(torch.tensor(dataset_vectors[i], dtype=torch.float32), bias)
    if family != 'benign' and family == dataset_families[i]:
        print(f"Sample {i}: {dataset_hashes[i]}")
        print(f"\tFamily: {family}, Actual Family: {dataset_families[i]}")
        correct_predictions += 1
accuracy = correct_predictions / (end - start)
print(f"\nAccuracy of the ensemble model on the first {end} samples: {accuracy}")

  return torch.sigmoid(torch.matmul(torch.tensor(samples, dtype=torch.float32), self.weights) + self.bias)



Accuracy of the ensemble model on the first 10000 samples: 0.0


In [58]:
index = 1
family = ensemble_svm.model_families[index]
training_set_samples = top_sample_sets[index]
training_set_labels = top_label_sets[index]
svm = ensemble_svm.models[index]

# Print the first 5 samples of the training set for the family where the label is 1
print(f"\nFirst 5 samples of the training set for family '{family}' where the label is 1:")
count = 0
for i in range(len(training_sets_labels[family])):
    if training_sets_labels[family][i] == 1:
        print(f"Sample {i}: {dataset_hashes[i]}")
        print(f"\t{training_sets_samples[family][i]}, {training_sets_labels[family][i]}")
        print(f"\tPrediction: {svm.predict(torch.tensor(training_sets_samples[family][i], dtype=torch.float32))}")
        count += 1
        if count == 5:
            break

# Print the first 5 samples of the training set for the family where the label is 0
print(f"\nFirst 5 samples of the training set for family '{family}' where the label is 0:")
count = 0
for i in range(len(training_sets_labels[family])):
    if training_sets_labels[family][i] == 0:
        print(f"Sample {i}: {dataset_hashes[i]}")
        print(f"\t{training_sets_samples[family][i]}, {training_sets_labels[family][i]}")
        print(f"\tPrediction: {svm.predict(torch.tensor(training_sets_samples[family][i], dtype=torch.float32))}")
        count += 1
        if count == 5:
            break

# Get the accuracy of the model on the training set
correct_predictions = 0
prediction_score_accumulator = 0
for i in range(len(training_sets_labels[family])):
    prediction = svm.predict(torch.tensor(training_sets_samples[family][i], dtype=torch.float32))
    prediction_score_accumulator += prediction
    if (prediction >= 0.5 and training_sets_labels[family][i] == 1) or (prediction < 0.5 and training_sets_labels[family][i] == 0):
        correct_predictions += 1
accuracy = correct_predictions / len(training_sets_labels[family])
print(f"\nAccuracy of the model on the training set for family '{family}': {accuracy}")
print(f"Average prediction score: {prediction_score_accumulator / len(training_sets_labels[family])}")


First 5 samples of the training set for family 'FakeInstaller' where the label is 1:
Sample 0: 00002d74a9faa53f5199c910b652ef09d3a7f6bd42b693755a233635c3ffb0f4
	[1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 1
	Prediction: tensor([0.8909], grad_fn=<SigmoidBackward0>)
Sample 1: 000068216bdb459df847bfdd67dd11069c3c50166db1ea8772cdc9250d948bcf
	[1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 1
	Prediction: tensor([0.8845], grad_fn=<SigmoidBackward0>)
Sample 2: 0000764713b286cfe7e8e76c7038c92312977712d9c5a86d504be54f3c1d025a
	[1, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 1
	Prediction: tensor([0.9182], grad_fn=<SigmoidBackward0>)
Sample 3: 0000962c2c34de1ca0c329b18be7847459da2d9d

  return torch.sigmoid(torch.matmul(torch.tensor(samples, dtype=torch.float32), self.weights) + self.bias)



Accuracy of the model on the training set for family 'FakeInstaller': 0.9099
Average prediction score: tensor([0.6457], grad_fn=<DivBackward0>)
