<a href="https://colab.research.google.com/github/truong-peter/HMM_Classification/blob/main/TruongPeter_Midterm1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## PROCESSING DATA

**IMPORTS**

In [None]:
!pip install hmmlearn
from hmmlearn import hmm
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np
import os

import sys
np.set_printoptions(threshold=sys.maxsize) #print array with no truncations 

In [None]:
# Colab-only: mount Google Drive at /content/drive so the opcode directories
# referenced below (under /content/drive/MyDrive) can be read.
from google.colab import drive
drive.mount('/content/drive')

**PROCESSING DATA**



In [None]:
def read_opcodes_from_dir(directory):
    """Read every opcode file in ``directory`` into a list of opcode lists.

    Each regular file is read line by line; surrounding whitespace is stripped
    from every line, and the stripped lines of one file form one inner list.

    Files are visited in sorted filename order. ``os.listdir`` returns entries
    in arbitrary order and the train/test split later in this notebook takes
    files by position, so sorting keeps the split reproducible across runs.
    Entries that are not regular files (e.g. sub-directories) are skipped
    instead of raising when opened.

    Args:
        directory: Path of the directory holding one opcode file per sample.

    Returns:
        list[list[str]]: One list of stripped lines per file, in sorted
        filename order.
    """
    opcodes = []
    for filename in sorted(os.listdir(directory)):
        filepath = os.path.join(directory, filename)
        if not os.path.isfile(filepath):
            continue
        with open(filepath) as file:
            opcodes.append([line.strip() for line in file])
    return opcodes

In [None]:
# Load the raw opcode files of each malware family from Drive:
# one list of opcode strings per file.
zbot_opcodes = read_opcodes_from_dir('/content/drive/MyDrive/zbot')
win_opcodes = read_opcodes_from_dir('/content/drive/MyDrive/winwebsec')

## PREPROCESSING DATA

**ENCODING ZBOT & WIN**

In [None]:
# Fit ONE encoder on the opcodes of both families so that a given opcode maps
# to the same integer in zbot and winwebsec files.
encoder = LabelEncoder()
encoder.fit([opcode for sublist in zbot_opcodes + win_opcodes for opcode in sublist])

In [None]:
def encode_opcodes(opcodes, encoder):
    """Encode each opcode sequence with an already-fitted encoder.

    Args:
        opcodes: Iterable of opcode sequences, one sequence per file.
        encoder: Object exposing ``transform(sequence)``; the notebook passes
            its fitted ``LabelEncoder``.

    Returns:
        list: ``encoder.transform(sequence)`` for every sequence, in input
        order.
    """
    return [encoder.transform(sequence) for sequence in opcodes]

In [None]:
# Integer-encode every zbot file with the shared encoder.
encoded_zbot_opcodes = encode_opcodes(zbot_opcodes, encoder)


In [None]:
# Integer-encode every winwebsec file with the same shared encoder.
encoded_win_opcodes = encode_opcodes(win_opcodes, encoder)


## TRAINING/TESTING DATA SPLIT

**SPLIT ZBOT BETWEEN TESTING / TRAINING , EXTRACT WINWEB TESTING**

In [None]:
def data_split(encoded_zbot_opcodes, encoded_win_opcodes,
               num_testing_files=200, min_opcodes=500, train_fraction=0.8):
    """Split zbot files into train/test sets and pick winwebsec test files.

    Only files with at least ``min_opcodes`` opcodes are used. Walking the
    zbot files in order, the first ``num_testing_files`` eligible files become
    the test set; eligible files after that go to the training set until it
    holds ``int(len(encoded_zbot_opcodes) * train_fraction)`` files. Note that
    this cap is computed from the total zbot file count, short files included.
    The first ``num_testing_files`` eligible winwebsec files form the
    winwebsec test set; no winwebsec file is used for training.

    Args:
        encoded_zbot_opcodes: Sequence of encoded zbot opcode sequences.
        encoded_win_opcodes: Iterable of encoded winwebsec opcode sequences.
        num_testing_files: Maximum number of test files per family.
        min_opcodes: Minimum length a file needs to be eligible.
        train_fraction: Fraction of the total zbot file count used as the
            upper bound on the number of training files.

    Returns:
        tuple: ``(zbot_train_files, zbot_test_files, win_test_files)``.
    """
    num_training_files = int(len(encoded_zbot_opcodes) * train_fraction)

    zbot_test_files = []
    zbot_train_files = []
    for file in encoded_zbot_opcodes:
        if len(file) < min_opcodes:
            continue
        # The test set is filled first; training files come from what follows.
        if len(zbot_test_files) < num_testing_files:
            zbot_test_files.append(file)
        elif len(zbot_train_files) < num_training_files:
            zbot_train_files.append(file)

    win_test_files = []
    for file in encoded_win_opcodes:
        if len(win_test_files) >= num_testing_files:
            break
        if len(file) >= min_opcodes:
            win_test_files.append(file)

    return zbot_train_files, zbot_test_files, win_test_files


**EXTRACT THE FIRST 500 OPCODES FROM EACH TEST FILE**

In [None]:

def extract_opcodes(test_files, num_opcodes=500):
    """Truncate every test file to its first ``num_opcodes`` opcodes.

    Args:
        test_files: Iterable of sliceable opcode sequences.
        num_opcodes: Number of leading opcodes to keep per file. Files shorter
            than this are returned in full.

    Returns:
        list: One truncated sequence per input file, in input order.
    """
    return [file[:num_opcodes] for file in test_files]

**STORING EACH OPCODE AS ITS OWN ELEMENT: TRAINING DATA (ZBOT, 30,000 OPCODES)**

In [None]:
def zbot_opcodes_list(zbot_train_files, max_opcodes=30000):
    """Flatten the training files into a single ``(n, 1)`` column of opcodes.

    Opcodes are taken file by file, in order, until ``max_opcodes`` have been
    collected or the files run out. Iteration stops as soon as the cap is
    reached, so files after that point are never visited.

    Args:
        zbot_train_files: Sequence of sliceable encoded opcode sequences.
        max_opcodes: Upper bound on the number of opcodes collected.

    Returns:
        numpy.ndarray: Array of shape ``(n, 1)`` with ``n <= max_opcodes``,
        one opcode per row.
    """
    collected = []
    for opcode_file in zbot_train_files:
        remaining = max_opcodes - len(collected)
        if remaining <= 0:
            break
        # Take only as many opcodes as are still needed from this file.
        collected.extend(opcode_file[:remaining])

    return np.array(collected).reshape(-1, 1)

## NOISE REDUCTION (can skip if not applying)

In [None]:
def reduce_opcode_count(encoded_zbot_opcodes, encoded_win_opcodes, num_opcodes):
    """Keep only the ``num_opcodes`` most frequent opcodes in every file.

    Frequencies are counted over both families together. Opcodes are ranked
    by descending count, with ties going to the smaller opcode id. Every
    opcode outside the top ``num_opcodes`` is removed from each file; the
    surviving opcodes keep their original values and order.

    Args:
        encoded_zbot_opcodes: Sequence of encoded (non-negative integer) zbot
            opcode sequences. Sequences may differ in length.
        encoded_win_opcodes: Same, for winwebsec.
        num_opcodes: Number of distinct opcodes to keep.

    Returns:
        tuple[list[list], list[list]]: ``(updated_zbot_opcodes,
        updated_win_opcodes)``, one filtered list per input file.
    """
    # Flatten file by file. Passing the ragged list-of-arrays straight to
    # np.concatenate would first try to build a rectangular array from it,
    # which raises on current NumPy when the files differ in length.
    all_files = list(encoded_zbot_opcodes) + list(encoded_win_opcodes)
    non_empty = [np.asarray(f, dtype=np.int64) for f in all_files if len(f) > 0]
    if non_empty:
        opcode_counts = np.bincount(np.concatenate(non_empty))
    else:
        opcode_counts = np.zeros(0, dtype=np.int64)

    # Stable sort on the negated counts: descending frequency, ties broken by
    # ascending opcode id.
    ranked_opcodes = np.argsort(-opcode_counts, kind="stable")
    unique_opcodes = set(ranked_opcodes[:num_opcodes].tolist())

    updated_zbot_opcodes = [
        [opcode for opcode in opcode_list if opcode in unique_opcodes]
        for opcode_list in encoded_zbot_opcodes
    ]
    updated_win_opcodes = [
        [opcode for opcode in opcode_list if opcode in unique_opcodes]
        for opcode_list in encoded_win_opcodes
    ]

    return updated_zbot_opcodes, updated_win_opcodes


## METHOD CALL (no noise reduction) *run only one*

In [None]:
# SPLIT DATA: zbot -> train/test files, winwebsec -> test files only
# (uses the full, unreduced encodings).
zbot_train_files, zbot_test_files, win_test_files = data_split(encoded_zbot_opcodes,encoded_win_opcodes)

In [None]:
# STORING INDIVIDUAL OPCODES AS ELEMENT: TRAINING DATA (ZBOT, 30,000 OPCODES)
# NOTE: this assignment rebinds the name `zbot_opcodes_list` from the function
# to the array it returns. Running this cell a second time (or running the
# noise-reduction variant afterwards) fails until the cell defining the
# function has been re-run.
zbot_opcodes_list = zbot_opcodes_list(zbot_train_files)

## METHOD CALL (with noise reduction) *run only one*

In [None]:
# Noise reduction: keep only the M most frequent opcodes (frequency counted
# over both families) and drop every other opcode from each file.
M = 30
encoded_zbot_opcodes_reduced, encoded_win_opcodes_reduced = reduce_opcode_count(encoded_zbot_opcodes, encoded_win_opcodes, M)


In [None]:
# SPLIT THE NOISE-REDUCED ENCODINGS: zbot -> train/test files, winwebsec -> test files only.
# File lengths are measured after reduction, so fewer files may meet the
# minimum-length requirement than in the unreduced split.
zbot_train_files, zbot_test_files, win_test_files= data_split(encoded_zbot_opcodes_reduced,encoded_win_opcodes_reduced)

In [None]:
# Store each training opcode as its own row for HMM training.
# NOTE: this assignment rebinds the name `zbot_opcodes_list` from the function
# to the array it returns. Running this cell a second time (or after the
# no-noise-reduction variant) fails until the cell defining the function has
# been re-run.
zbot_opcodes_list = zbot_opcodes_list(zbot_train_files)

## HMM MODEL TRAINING/TESTING/SCORING

In [None]:
# Truncate every zbot test file to its leading opcodes so all scored
# sequences have a comparable length.
zbot_test = extract_opcodes(zbot_test_files)

In [None]:
# Truncate every winwebsec test file the same way as the zbot test files.
win_test = extract_opcodes(win_test_files)

In [None]:
# Two hidden states, at most 100 EM iterations. The fixed random_state pins
# the random parameter initialisation so that training, and therefore every
# score below, is reproducible from a fresh kernel.
model = hmm.CategoricalHMM(n_components=2, n_iter=100, random_state=42)


In [None]:
# Train on zbot only: at this point `zbot_opcodes_list` is the training array
# produced by whichever METHOD CALL section was run, not the function.
model.fit(zbot_opcodes_list)

In [None]:
def calculate_scores(test_data, model):
    """Score every test sequence with the trained model.

    Each sequence is converted to a column array of shape ``(n, 1)`` before
    being handed to ``model.score``.

    Args:
        test_data: Iterable of opcode sequences.
        model: Object exposing ``score(X)``; the notebook passes its fitted HMM.

    Returns:
        list: One ``model.score`` result per sequence, in input order.
    """
    return [
        model.score(np.array(sequence).reshape(-1, 1))
        for sequence in test_data
    ]

In [None]:
# Scores of files from the family the model was trained on.
zbot_scores = calculate_scores(zbot_test, model)

In [None]:
# Scores of winwebsec files under the zbot-trained model.
win_scores = calculate_scores(win_test, model)

**CONVERTING TO CSV FILE**

In [None]:
# One single-column table per family; the column name identifies the family.
zbot_cvs = dict(zbot_score_results=zbot_scores)
win_cvs = dict(win_score_results=win_scores)
zbotDF, winDF = pd.DataFrame(zbot_cvs), pd.DataFrame(win_cvs)

# Place the two columns side by side. Rows align on the default integer index,
# so the shorter column is padded with NaN when the test-set sizes differ.
result = pd.concat([zbotDF, winDF], axis=1)
