In [1]:
# Mount Google Drive to access files
# (later cells read from and write to '/content/drive/My Drive/newtask/').
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [36]:
# importing all libraries
# NOTE(review): torch and the transformers names other than GPT2LMHeadModel /
# GPT2Tokenizer are not referenced by any cell visible in this notebook.
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, TextDataset, DataCollatorForLanguageModeling
import spacy
import requests
from bs4 import BeautifulSoup
# Load English language model
# NOTE(review): `nlp` is not used by any later cell visible here — confirm it is still needed.
nlp = spacy.load("en_core_web_sm")
import nltk
# Fetch NLTK resources: 'stopwords' is read by clean_text; 'punkt' is presumably
# the tokenizer data behind sent_tokenize / word_tokenize — TODO confirm.
nltk.download('punkt')
nltk.download('stopwords')
from nltk.tokenize import sent_tokenize
import csv

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [3]:
# URL of the MITRE ATT&CK Enterprise matrix page; its text is scraped below
# and split into the reference material for the three categories.
mitre_url = "https://attack.mitre.org/matrices/enterprise/"

In [4]:
def extract_text_from_html(url, timeout=30):
    """Download a web page and return its text with all markup stripped.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests.get`` can block indefinitely on a stalled
            connection.

    Returns:
        The string produced by ``BeautifulSoup.get_text()`` for the page.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with a 4xx/5xx status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on an HTTP error instead of scraping the error page as data.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')
    return soup.get_text()

In [6]:
# Fetch the matrix page and keep its extracted text as a single string
mitre_data = extract_text_from_html(mitre_url)

In [5]:
# Load model and tokenizer
# NOTE(review): neither `tokenizer` nor `model` is referenced by any other cell
# visible in this notebook — confirm this (~548 MB) download is still needed.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
model = GPT2LMHeadModel.from_pretrained("gpt2")

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

In [7]:
# Splitting text data into sentences
# NOTE(review): the recorded run yields only 6 "sentences" for the whole page,
# which the next cell turns into 207 pieces — so nearly all of the splitting is
# done by the fixed-width chunking there, not by the sentence tokenizer.
sentences = nltk.sent_tokenize(mitre_data)

In [9]:
len(sentences)

6

In [10]:
import re

sentences_refined = []

for raw_sentence in sentences:
    # Collapse every run of newlines into one space (runs of spaces are left as-is)
    flattened = re.sub(r'\n+', ' ', raw_sentence)

    # Text of 10 characters or fewer is discarded outright
    if len(flattened) <= 10:
        continue

    # Cut into consecutive 200-character slices; text that is already 200
    # characters or shorter yields exactly one slice (the text itself).
    pieces = (flattened[start:start + 200] for start in range(0, len(flattened), 200))

    # Keep only slices longer than 10 characters, which drops a short trailing remainder
    sentences_refined.extend(piece for piece in pieces if len(piece) > 10)

In [11]:
len(sentences_refined)

207

In [12]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Function to perform sentiment analysis
_sentiment_analyzer = None  # shared analyzer, created on the first analyze_sentiment call


def analyze_sentiment(text):
    """Return the sentiment scores for ``text``.

    The analyzer is created lazily on the first call and then reused, rather
    than being rebuilt for every sentence. Lazy creation also means this cell
    can be defined before the analyzer's lexicon has been downloaded.

    Args:
        text: The string to score.

    Returns:
        The value returned by ``SentimentIntensityAnalyzer.polarity_scores``;
        callers in this notebook read its ``'compound'`` entry.
    """
    global _sentiment_analyzer
    if _sentiment_analyzer is None:
        _sentiment_analyzer = SentimentIntensityAnalyzer()
    return _sentiment_analyzer.polarity_scores(text)

In [13]:
# Download the 'vader_lexicon' NLTK resource.
# NOTE(review): presumably this is the lexicon SentimentIntensityAnalyzer loads,
# so this cell has to run before analyze_sentiment is first called — TODO confirm.
import nltk
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

In [14]:
def categorize_sentences(sentences):
    """Sort sentences into three lists by their compound sentiment score.

    Each sentence is scored with ``analyze_sentiment`` and placed according
    to the ``'compound'`` value:

    * ``>= 0.05``  -> action list (positive sentiment)
    * ``<= -0.05`` -> threat list (negative sentiment)
    * otherwise    -> information list (neutral sentiment)

    Args:
        sentences: Iterable of strings to categorize.

    Returns:
        A ``(threat, action, information)`` tuple of lists, each preserving
        the input order of its members.
    """
    buckets = {'threat': [], 'action': [], 'information': []}

    for text in sentences:
        compound = analyze_sentiment(text)['compound']

        if compound <= -0.05:
            label = 'threat'
        elif compound >= 0.05:
            label = 'action'
        else:
            label = 'information'
        buckets[label].append(text)

    return buckets['threat'], buckets['action'], buckets['information']

In [15]:
# Bucket the refined MITRE text pieces by sentiment into the three reference lists
threat_mitre, action_mitre, information_mitre = categorize_sentences(sentences_refined)

In [16]:
type(threat_mitre)

list

In [17]:
threat_mitre[2]

'Botnet Web Services Serverless Network Devices Develop Capabilities\xa0(4)  =  Malware Code Signing Certificates Digital Certificates Exploits Establish Accounts\xa0(3)  =  Social Media Accounts Email Accou'

In [18]:
action_mitre[5]

'termine Physical Locations Business Relationships Identify Business Tempo Identify Roles Phishing for Information\xa0(4)  =  Spearphishing Service Spearphishing Attachment Spearphishing Link Spearphishin'

In [19]:
information_mitre[4]

'View on the ATT&CK® Navigator  Version Permalink Live Version layout: side                             side                                                      flat                          show sub-'

In [20]:
# Load the 'glove-twitter-100' vectors through gensim's downloader;
# embed_sentence looks words up in this model.
from gensim.downloader import load
glove_model = load('glove-twitter-100')



In [21]:
import numpy as np
import re
from gensim.models import KeyedVectors
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from sklearn.metrics.pairwise import cosine_similarity

In [22]:
# Define function to clean text
_english_stop_words = None  # stop-word set, built on the first clean_text call


def clean_text(text):
    """Lower-case ``text``, strip punctuation and remove English stop words.

    Args:
        text: The string to clean.

    Returns:
        The remaining tokens joined by single spaces.
    """
    global _english_stop_words
    if _english_stop_words is None:
        # Build the set once instead of on every call; this function is
        # invoked for each sentence and again for each merged document.
        _english_stop_words = set(stopwords.words('english'))

    # Drop every character that is neither a word character nor whitespace.
    normalized = re.sub(r'[^\w\s]', '', text.lower())
    tokens = word_tokenize(normalized)
    return ' '.join(token for token in tokens if token not in _english_stop_words)

In [23]:
# Run clean_text over every entry of each MITRE category list
cleaned_threat_mitre = list(map(clean_text, threat_mitre))
cleaned_action_mitre = list(map(clean_text, action_mitre))
cleaned_information_mitre = list(map(clean_text, information_mitre))

In [25]:
# Merge each category's cleaned sentences into one space-separated document
# (order: threat, action, information)
document_threat, document_action, document_information = (
    ' '.join(cleaned_group)
    for cleaned_group in (
        cleaned_threat_mitre,
        cleaned_action_mitre,
        cleaned_information_mitre,
    )
)

In [26]:
# Clean the concatenated documents
# NOTE(review): the inputs were built by joining strings that already went
# through clean_text, so this is a second cleaning pass over cleaned text —
# confirm whether it is intentional or can be dropped.
cleaned_document_threat = clean_text(document_threat)
cleaned_document_action = clean_text(document_action)
cleaned_document_information = clean_text(document_information)

In [28]:
len(cleaned_document_action)

10958

In [29]:
# Use word embeddings (GloVe)
def embed_sentence(sentence, model=None):
    """Return the average word vector of the in-vocabulary tokens of ``sentence``.

    Args:
        sentence: Text to embed; it is split on whitespace.
        model: Word-vector model exposing ``key_to_index``, ``get_vector`` and
            ``vector_size``. Defaults to the notebook-level ``glove_model``.

    Returns:
        The element-wise mean of the vectors of all tokens found in the
        model's vocabulary, or a zero vector of length ``model.vector_size``
        when no token is found (including for an empty string).
    """
    if model is None:
        model = glove_model

    # key_to_index is a dict, so membership is a hash lookup; the previous
    # test against the index_to_key list scanned the whole vocabulary per token.
    vocabulary = model.key_to_index
    vectors = [model.get_vector(word) for word in sentence.split() if word in vocabulary]
    if not vectors:
        return np.zeros(model.vector_size)
    return np.sum(vectors, axis=0) / len(vectors)

In [30]:
# Turn each cleaned category document into a single vector
# (order: threat, action, information)
embedding_document_threat, embedding_document_action, embedding_document_information = map(
    embed_sentence,
    (cleaned_document_threat, cleaned_document_action, cleaned_document_information),
)

In [31]:
regenerated_sentences_file = '/content/drive/My Drive/newtask/regenerated_sentences.txt'

# Read the file as a list of lines (trailing newlines kept; the
# classification loop strips them). The encoding is pinned so decoding does
# not depend on the platform's locale default.
# NOTE(review): assumes the file is UTF-8 encoded — confirm.
with open(regenerated_sentences_file, 'r', encoding='utf-8') as file:
    regenerated_sentences = file.readlines()

In [37]:
# Specify the CSV file path
# NOTE(review): this value is replaced by the Drive path in the cell that
# writes the file, before anything is written.
csv_file = "classified_sentences.csv"

# Reference embedding of each category document
category_embeddings = {
    'threat': embedding_document_threat,
    'action': embedding_document_action,
    'information': embedding_document_information,
}

# Classify every regenerated sentence by the category document it is most
# similar to (cosine similarity of the averaged word vectors).
classified_sentences = []
# Sentences with no single best category; previously these were dropped silently.
unclassified_sentences = []
for sentence in regenerated_sentences:
    text = sentence.strip()
    sentence_embedding = embed_sentence(text)

    similarities = {
        label: cosine_similarity([sentence_embedding], [reference])[0][0]
        for label, reference in category_embeddings.items()
    }
    best_score = max(similarities.values())
    best_labels = [label for label, score in similarities.items() if score == best_score]

    if len(best_labels) == 1:
        classified_sentences.append((text, best_labels[0]))
    else:
        # A tie for the top score, e.g. when embed_sentence found no known
        # word and returned the all-zero vector. Such sentences stay out of
        # classified_sentences, exactly as before, but are now kept here.
        unclassified_sentences.append(text)

In [39]:
# Specify the CSV file path
csv_file = "/content/drive/My Drive/newtask/classified_sentences.csv"

# Write classified sentences to a CSV file under the "newtask" folder.
# newline='' lets the csv module control line endings; the encoding is pinned
# so the output does not depend on the platform's locale default.
with open(csv_file, 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(["Sentence", "Category"])  # Write header
    writer.writerows(classified_sentences)  # One (sentence, category) row each

print("Results saved to", csv_file)

Results saved to /content/drive/My Drive/newtask/classified_sentences.csv
