<a href="https://colab.research.google.com/github/summermccune/Tokenization-Testing-for-Malware-Data/blob/main/Project_ALL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [2]:
# Install the `datasets` and `hmmlearn` packages, both of which are imported
# further down in this notebook.
!pip install datasets
!pip install hmmlearn



In [3]:
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
import numpy as np
import re
from sklearn.model_selection import train_test_split
from collections import Counter
from datasets import load_dataset
from tokenizers.pre_tokenizers import Whitespace
from tokenizers import Tokenizer, ByteLevelBPETokenizer, SentencePieceBPETokenizer
from tokenizers.models import BPE, WordPiece, Unigram
from tokenizers.trainers import BpeTrainer, WordPieceTrainer, UnigramTrainer
from transformers import PreTrainedTokenizerFast
from gensim.models import Word2Vec
from hmmlearn import hmm
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay,accuracy_score, f1_score, precision_score, recall_score

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Data

In [None]:
# Read the seven per-family CSV files from the mounted Drive folder.
family_csv_paths = (
    '/content/drive/MyDrive/Tokenization-Final/Families/FakeRean.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/OnLineGames.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/Vobfus.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/Winwebsec.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/BHO.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/CeeInject.csv',
    '/content/drive/MyDrive/Tokenization-Final/Families/Renos.csv',
)
FakeRean, OnLineGames, Vobfus, Winwebsec, BHO, CeeInject, Renos = map(pd.read_csv, family_csv_paths)

# Stack the seven frames into one (with a fresh 0..n-1 index) and drop the
# first column of the combined frame.
dataset = pd.concat(
    [FakeRean, OnLineGames, Vobfus, Winwebsec, BHO, CeeInject, Renos],
    ignore_index=True,
).iloc[:, 1:]

# Preview the first and last rows of the combined frame.
print(dataset.head())
print(dataset.tail())

# Functions

In [None]:
def count(df, c):
  """Tally every whitespace-separated token of ``df['Opcodes']`` into ``c``.

  ``c`` is updated in place through its ``update`` method (e.g. a
  ``collections.Counter``); nothing is returned.
  """
  for opcode_string in df['Opcodes']:
    c.update(opcode_string.split())

In [None]:
def removeNonVocab(vocab, series):
  """Strip every token that is not in ``vocab`` from each string in ``series``.

  Parameters
  ----------
  vocab : iterable of str
      Tokens to keep. Each entry is matched literally.
  series : iterable of str
      Space-separated token strings (for example a pandas Series).

  Returns
  -------
  list of list of str
      One list per input row holding the tokens that remain after removal.
  """
  # Escape each token so regex metacharacters in the vocabulary (e.g. '(' or
  # '+') are matched literally instead of corrupting the pattern or raising
  # re.error.
  vocab_str = '|'.join(re.escape(token) for token in vocab)
  # Compile once instead of re-resolving the same pattern for every row.
  pattern = re.compile('\\b((?!\\b( |' + vocab_str + ')\\b).)*\\b')
  multi_space = re.compile(' +')

  rows = []
  for row in series:
    row = pattern.sub('', row)
    # Removal leaves runs of spaces behind; collapse them before splitting.
    row = multi_space.sub(' ', row)
    rows.append(row.split())
  return rows

In [None]:
def opcodes_to_numbers_dict(df):
  """Map each entry of ``df['Opcodes']`` to its position in that column.

  When an entry appears more than once, the position of its last occurrence
  is the one kept. The mapping is printed and then returned.
  """
  opcode_to_number = {
      opcode: position for position, opcode in enumerate(df['Opcodes'])
  }
  print(opcode_to_number)
  return opcode_to_number

In [None]:
def opcodes_to_numbers(columns):
  """Translate sequences of opcodes into sequences of integer ids.

  The lookup table is rebuilt on every call by passing the notebook-level
  ``dataset`` to ``opcodes_to_numbers_dict``. ``columns`` is an iterable of
  samples, each an iterable of opcodes; an opcode missing from the table
  raises ``KeyError``.
  """
  lookup = opcodes_to_numbers_dict(dataset)
  return [[lookup[opcode] for opcode in sample] for sample in columns]

In [None]:
def train_hmm_models(opcodes,n_states,n_restarts):
  """Fit one categorical HMM per opcode sequence, keeping the best restart.

  Parameters
  ----------
  opcodes : iterable of sequences of int
      One integer-encoded opcode sequence per sample.
  n_states : int
      Number of hidden states for every model.
  n_restarts : int
      How many independently initialised models to fit per sequence. Values
      below 1 are treated as 1 so that every sequence gets a fitted model.

  Returns
  -------
  list
      Exactly one fitted ``hmm.CategoricalHMM`` per input sequence, in input
      order: the restart whose ``score`` on that sequence was highest.
  """
  n_fits = max(1, n_restarts)
  hmm_models = []
  for opcode_seq in opcodes:
    # hmmlearn expects a column vector of observations.
    observations = np.array(opcode_seq).reshape(-1, 1)

    best_model = None
    best_score = -np.inf
    for _ in range(n_fits):
      model = hmm.CategoricalHMM(n_components=n_states, n_iter=100)
      model.fit(observations)

      # Keep the restart with the highest score; the `is None` guard makes
      # sure a model is still kept when the first score is not finite.
      score = model.score(observations)
      if best_model is None or score > best_score:
        best_model = model
        best_score = score

    # Only the winning restart is stored, so the result stays aligned
    # one-to-one with the input sequences.
    hmm_models.append(best_model)

  return hmm_models

In [None]:
def b_matrix_to_features(hmm_models, max_feature_length, opcode_to_number=None):
  """Turn each model's emission (B) matrix into a fixed-length feature vector.

  For every model the hidden state with the highest emission probability for
  the ``mov`` opcode is placed first, followed by the remaining states in
  their original order. The reordered matrix is flattened and then
  zero-padded or truncated to ``max_feature_length``.

  Parameters
  ----------
  hmm_models : iterable
      Fitted models exposing an ``emissionprob_`` array of shape
      (n_states, n_symbols).
  max_feature_length : int
      Length of every returned feature vector.
  opcode_to_number : dict, optional
      Mapping from opcode to emission column index; must contain ``'mov'``.
      When omitted, the notebook-level ``opcode_to_number`` variable is used.

  Returns
  -------
  list of numpy.ndarray
      One 1-D feature vector of length ``max_feature_length`` per model.

  Raises
  ------
  NameError
      If ``opcode_to_number`` is neither passed nor defined at notebook level.
  """
  if opcode_to_number is None:
    # Earlier callers relied on a notebook-level variable; keep that working.
    try:
      opcode_to_number = globals()['opcode_to_number']
    except KeyError:
      raise NameError(
          "opcode_to_number is not defined; pass it explicitly, e.g. "
          "b_matrix_to_features(models, length, opcode_to_number=mapping)"
      ) from None

  # The column index of 'mov' is the same for every model, so look it up once.
  mov_symbol = opcode_to_number['mov']

  hmm2vec_features = []
  for model in hmm_models:
    emission = np.asarray(model.emissionprob_)

    # Hidden state that emits 'mov' with the highest probability.
    mov_state = int(np.argmax(emission[:, mov_symbol]))

    # 'mov' state first, then every other state. For two states this is the
    # same as [mov_state, 1 - mov_state]; it also stays valid for any other
    # number of hidden states.
    state_order = [mov_state] + [
        state for state in range(emission.shape[0]) if state != mov_state
    ]
    feature_vector = emission[state_order].flatten()

    # Pad with zeros or truncate so every vector has the same length.
    current_length = len(feature_vector)
    if current_length < max_feature_length:
      feature_vector = np.pad(
          feature_vector, (0, max_feature_length - current_length), mode='constant'
      )
    elif current_length > max_feature_length:
      feature_vector = feature_vector[:max_feature_length]

    hmm2vec_features.append(feature_vector)

  return hmm2vec_features

In [None]:
def batch_iterator(data):
  """Yield the ``'Opcodes'`` column in consecutive chunks of ``batch_size`` rows.

  Parameters
  ----------
  data : pandas.DataFrame or any other value
      If a DataFrame is given, its ``'Opcodes'`` column is batched. Any other
      value (existing callers pass the string ``'train'``) falls back to the
      notebook-level ``dataset`` frame, which was the previous behaviour.

  Yields
  ------
  pandas.Series
      Up to ``batch_size`` consecutive rows of the ``'Opcodes'`` column.
  """
  frame = data if isinstance(data, pd.DataFrame) else dataset
  opcode_column = frame['Opcodes']
  for start in range(0, len(frame), batch_size):
    # Positional slice, so the result does not depend on the index labels.
    yield opcode_column.iloc[start : start + batch_size]

In [None]:
def prepare_tokenizer_trainer(alg):
    """
    Prepares the tokenizer and trainer with unknown & special tokens

    Parameters
    ----------
    alg : str
        'BPE', 'UNI' (Unigram) or 'WPC' (WordPiece).

    Returns
    -------
    tuple
        ``(tokenizer, trainer)``; the tokenizer uses a ``Whitespace``
        pre-tokenizer and the trainer is configured with the notebook-level
        ``spl_tokens`` and ``v_size``.

    Raises
    ------
    ValueError
        If ``alg`` is not one of the supported names.
    """
    if alg == 'BPE':
        tokenizer = Tokenizer(BPE(unk_token = unk_token))
        trainer = BpeTrainer(special_tokens = spl_tokens, vocab_size=v_size)
    elif alg == 'UNI':
        # For Unigram the unknown token is given to the trainer, not the model.
        tokenizer = Tokenizer(Unigram())
        trainer = UnigramTrainer(unk_token= unk_token, special_tokens = spl_tokens, vocab_size=v_size)
    elif alg == 'WPC':
        tokenizer = Tokenizer(WordPiece(unk_token = unk_token))
        trainer = WordPieceTrainer(special_tokens = spl_tokens, vocab_size=v_size)
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            "Unsupported tokenizer algorithm " + repr(alg) + "; expected 'BPE', 'UNI' or 'WPC'"
        )

    tokenizer.pre_tokenizer = Whitespace()
    return tokenizer, trainer

In [None]:
def train_tokenizer(alg):
    """
    Trains the tokenizer

    Parameters
    ----------
    alg : str
        'BPE', 'UNI' or 'WPC' (built through ``prepare_tokenizer_trainer``),
        'BBPE' (``ByteLevelBPETokenizer``) or 'SPC'
        (``SentencePieceBPETokenizer``).

    Returns
    -------
    The trained tokenizer. 'BBPE' and 'SPC' tokenizers are additionally saved
    to ``Tokenizers/<alg>-trained.json``; the other three are returned
    without being saved.

    Raises
    ------
    ValueError
        If ``alg`` is not one of the supported names.
    """
    import os

    if alg in ('BPE', 'UNI', 'WPC'):
        tokenizer, trainer = prepare_tokenizer_trainer(alg)
        tokenizer.train_from_iterator(batch_iterator('train'), trainer=trainer)
        return tokenizer

    if alg == 'BBPE':
        tokenizer = ByteLevelBPETokenizer()
    elif alg == 'SPC':
        tokenizer = SentencePieceBPETokenizer()
    else:
        # Fail with a clear message instead of an UnboundLocalError at save().
        raise ValueError(
            "Unsupported tokenizer algorithm " + repr(alg)
            + "; expected 'BPE', 'UNI', 'WPC', 'BBPE' or 'SPC'"
        )

    tokenizer.train_from_iterator(batch_iterator('train'),vocab_size=v_size)
    # Make sure the output directory exists before writing into it.
    os.makedirs("Tokenizers", exist_ok=True)
    tokenizer.save("Tokenizers/"+alg+"-trained.json")
    return tokenizer

In [None]:
def encode(tokenizer):
  """Tokenize every row of the notebook-level ``dataset['Opcodes']`` column.

  The column is passed to ``tokenizer.encode_batch``; the ``tokens`` attribute
  of each resulting encoding is collected and returned as a list, one entry
  per row.
  """
  encodings = tokenizer.encode_batch(dataset['Opcodes'])
  return [item.tokens for item in encodings]

# Tokenization

In [None]:
# Settings shared by the tokenizer helpers defined above.
v_size = 100        # vocabulary size handed to every trainer
batch_size = 1000   # number of rows per batch produced by batch_iterator
unk_token = "<UNK>"
# The unknown token is also the first of the special tokens.
spl_tokens = [unk_token, "<SEP>", "<MASK>", "<CLS>"]

#### BPE Tokenizer

---





In [None]:
# Train the BPE tokenizer, then store it on Drive. train_tokenizer does not
# save 'BPE' tokenizers itself, so the save happens here.
BPE_tokenizer = train_tokenizer(alg='BPE')
BPE_tokenizer.save("drive/MyDrive/Tokenization-Final/Tokenizers/BPE-trained.json")

In [None]:
# Tokenize the whole dataset with the trained BPE tokenizer and show the
# token list of the first sample as a sanity check.
BPE_tokens = encode(tokenizer=BPE_tokenizer)
print(BPE_tokens[0])

# Embedding

# Classification