In [8]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from stix2.v21 import (ThreatActor, Identity, AttackPattern, Campaign, IntrusionSet, Relationship, ExternalReference, Bundle, Grouping)
from efficient_apriori import apriori
import re
from pyattck import Attck
import requests
from stix2 import MemoryStore, Filter

In [9]:
# Settings Values - for apriori algorithm
confidenceLevel = 0.70
supportLevel = 0.05
abstract = False
# initial actor value to build the activity attack-graph from 
intelSeed = "G0079"

In [10]:
def GenerateAprioriLists():
    # some data comes from this dataset with TTPs
    df = pd.read_csv("Categorized_Adversary_TTPs.csv") # sample dataset of attacks


    # more data is gained by using attck data from tool and malware TTPs
    attack = Attck()

    malwares = attack.enterprise.malwares + attack.enterprise.tools
    ttpLists = []
    for malware in malwares:
        ttpLists.append(["'" + ttp.id + "'" for ttp in malware.techniques])

    # to use the apriori we need to generate a list of lists
    aprList = ttpLists
    for row in df.values:
        aprList.append((row[13].strip('][').split(', ')))
    return aprList

In [11]:
def AbstractTTPs(ttpList):
    # take sub-techniques and remove the .### to abstract them to parent techniques 
    for i in range(0,len(ttpList)):
        ttpList[i] = [re.sub(r'\.[0-9]+', '', ttp) for ttp in ttpList[i]]
    return ttpList 

In [12]:
# takes a list of lists and returns a list of rules sorted by size 
def AprioriMining(aprList):
    # perform apriori rule association mining
    itemsets, rules = apriori(aprList, min_support=supportLevel, min_confidence=confidenceLevel)
    
    # sort by size to get the 1:1 mappings first and so on. 
    ruleNums = np.array([len(rule.lhs+rule.rhs) for rule in rules])
    rules = np.array(rules)
    inds = ruleNums.argsort()[::]
    rules = rules[inds]
    
    # maximum rule size of 4 to limit number of rules, any rules with size > 4 are redundant anyways
    rules = [x for x in filter(lambda rule: len(rule.lhs+rule.rhs) <= 4, rules)]
    return rules

In [13]:
# downloads latest MITRE framework from the branch
def get_data_from_branch(domain):
    """get the ATT&CK STIX data from MITRE/CTI. Domain should be 'enterprise-attack', 'mobile-attack' or 'ics-attack'. Branch should typically be master."""
    stix_json = requests.get(f"https://raw.githubusercontent.com/mitre-attack/attack-stix-data/master/{domain}/{domain}.json").json()
    return MemoryStore(stix_data=stix_json["objects"])

src = get_data_from_branch("enterprise-attack")

In [14]:
# get TTPs from seed, where seed is an APT group code: GXXXX
def ExtractSeedTTPs(seed):
    seeds = []
    actor = actors[seed]
    for ttp in actor.techniques:
        seeds.append(ttp.id)
        
    # if we are using abstracted TTPs then remove sub technique
    if (abstract):
        for i in range(0, len(seeds)):
            seeds[i] = re.sub(r'\.[0-9]+', '', seeds[i])
    return seeds

In [15]:
def ExportBundle(bundle, filename):
    with open(filename, "w") as f:
        f.write(bundle.serialize())
        f.close()

In [35]:
# taken from mitre stix - https://github.com/mitre-attack/attack-stix-data/blob/master/USAGE.md#access-the-most-recent-version-from-github-via-requests
def get_related(thesrc, src_type, rel_type, target_type, reverse=False):
    """build relationship mappings
       params:
         thesrc: MemoryStore to build relationship lookups for
         src_type: source type for the relationships, e.g "attack-pattern"
         rel_type: relationship type for the relationships, e.g "uses"
         target_type: target type for the relationship, e.g "intrusion-set"
         reverse: build reverse mapping of target to source
    """

    relationships = thesrc.query([
        Filter('type', '=', 'relationship'),
        Filter('relationship_type', '=', rel_type),
        Filter('revoked', '=', False),
    ])

    # See section below on "Removing revoked and deprecated objects"
    relationships = remove_revoked_deprecated(relationships)

    # stix_id => [ { relationship, related_object_id } for each related object ]
    id_to_related = {}

    # build the dict
    for relationship in relationships:
        if src_type in relationship.source_ref and target_type in relationship.target_ref:
            if (relationship.source_ref in id_to_related and not reverse) or (relationship.target_ref in id_to_related and reverse):
                # append to existing entry
                if not reverse:
                    id_to_related[relationship.source_ref].append({
                        "relationship": relationship,
                        "id": relationship.target_ref
                    })
                else:
                    id_to_related[relationship.target_ref].append({
                        "relationship": relationship,
                        "id": relationship.source_ref
                    })
            else:
                # create a new entry
                if not reverse:
                    id_to_related[relationship.source_ref] = [{
                        "relationship": relationship,
                        "id": relationship.target_ref
                    }]
                else:
                    id_to_related[relationship.target_ref] = [{
                        "relationship": relationship,
                        "id": relationship.source_ref
                    }]
    # all objects of relevant type
    if not reverse:
        targets = thesrc.query([
            Filter('type', '=', target_type),
            Filter('revoked', '=', False)
        ])
    else:
        targets = thesrc.query([
            Filter('type', '=', src_type),
            Filter('revoked', '=', False)
        ])

    # build lookup of stixID to stix object
    id_to_target = {}
    for target in targets:
        id_to_target[target.id] = target

    # build final output mappings
    output = {}
    for stix_id in id_to_related:
        value = []
        for related in id_to_related[stix_id]:
            if not related["id"] in id_to_target:
                continue  # targeting a revoked object
            value.append({
                "object": id_to_target[related["id"]],
                "relationship": related["relationship"]
            })
        output[stix_id] = value
    return output

def remove_revoked_deprecated(stix_objects):
    """Remove any revoked or deprecated objects from queries made to the data source"""
    # Note we use .get() because the property may not be present in the JSON data. The default is False
    # if the property is not set.
    return list(
        filter(
            lambda x: x.get("x_mitre_deprecated", False) is False and x.get("revoked", False) is False,
            stix_objects
        )
    )

In [33]:
# taken from mitre stix - https://github.com/mitre-attack/attack-stix-data/blob/master/USAGE.md#access-the-most-recent-version-from-github-via-requests
def technique_mitigated_by_mitigations(thesrc):
    """return technique_id => {mitigation, relationship} for each mitigation of the technique."""
    return get_related(thesrc, "course-of-action", "mitigates", "attack-pattern", reverse=True)

In [114]:
def get_mitigations_from_technique(thesrc, stix_id):
    # returns list of mitigation objects and relationships linked to attack pattern object
    mitigations = technique_mitigated_by_mitigations(src)
    if (stix_id in mitigations.keys()):
        mitigations = mitigations[stix_id]
    else:
        mitigations = []
    return mitigations

In [99]:
len(get_mitigations_from_technique(src, "attack-pattern--1ecb2399-e8ba-4f6b-8ba7-5c27d49405cf"))

0

In [120]:
# each rule has a confidence, lift, support, lhs, rhs, conviction, rule power factor (RPF), 

# takes an APTGroup and generates hypothesized relationships and attack pattern objects
# returns a tuple of: (displayInfo, attackPatterns, relationships, groupings, mitigationObjects)
def CreateRelationships(APTGroup, aprioriLists):
    displayInfo = [] #stores the TTP and tactic for easy logging
    seen = [] #stores TTPs that have been seen by the algorithm
    attackPatterns = [] #stores generated attack pattern objects
    relationships = {} # stores generated relationship objects
    groupings = {} # stores generated grouping objects
    
    mitigationRelationships = {} # {stix id, object}
    mitigationObjects = {} # {stix id, [COA object, count]} #stores all mitigation objects
    
    
    # get seeds out of the APT group
    #seeds = ExtractSeedTTPs(APTGroup)
    seeds = ["T1566", "T1204"]
    
    # perform association rule mining
    rules = AprioriMining(aprioriLists) 
    
    #add seeds to activity-attack-graph as nodes
    for seed in seeds:
        # get the stix object for the TTP
        ttp = src.query([ Filter("external_references.external_id", "=", seed) ])[0]
        
        # create and add objects to lists, context is used to identify seeds from hypothesized events
        groupings[seed] = (Grouping(object_refs=[ttp.id], context = "Seed Event"))
        attackPatterns.append(src.query([ Filter("external_references.external_id", "=", seed) ])[0])
    
    seedTotals = ["'"+x+"'" for x in seeds]
    # use a queue to iterate through and create a tree of TTPs
    while len(seeds) > 0:
        for rule in rules:
            # check to see if the left hand side of a rule is satisfied 
            if "'"+seeds[0]+"'" in rule.lhs and set(rule.lhs).issubset(seedTotals):
                #if the lhs is satisfied, then loop through each TTP in the rhs 
                for ttpName in rule.rhs:
                    # if this TTP hasn't been visited already then create STIX objects
                    if ttpName not in seen:
                        ttp = src.query([ Filter("external_references.external_id", "=", ttpName[1:-1]) ])[0]
                        
                        # track rule tactics for logging to displayInfo
                        tactics = []
                        for i in ttp['kill_chain_phases']:
                            tactics.append(i['phase_name'])
                        
                        mitigations = get_mitigations_from_technique(src, ttp.id)
                        for m in mitigations:
                            mitigation = m["object"]
                            relationship = m["relationship"]
                            mitigationRelationships[relationship.id] = relationship
                            if mitigation.id in mitigationObjects.keys():
                                mitigationObjects[mitigation.id][1] += 1
                            else:
                                mitigationObjects[mitigation.id] = [mitigation, 1]
                        
                        # create a new grouping object with the rule name. 
                        # I am using ttpName[1:-1] here because TTPs have quotes around them for some reason 
                        # so I want to get rid of the quotes
                        # later I will fix this issue so that ttpName[1:-1] is not neccessary
                        groupings[ttpName[1:-1]] = (Grouping(object_refs=[ttp.id]+[mitigation["object"].id for mitigation in mitigations], context = "Hypothesized Event"))
                        #groupings[ttpName[1:-1]] = (Grouping(object_refs=[ttp.id, MITIGATION OBJECT ID], context = "Hypothesized Event"))
                        

                        
                        displayInfo.append([ttp['name'], ttpName[1:-1], tactics])
                        attackPatterns.append(ttp)
                        seedTotals.append(ttpName)
                        seen.append(ttpName)
                        seeds.append(ttp['external_references'][0]['external_id'])

                    # if the relationship already exists between two objects then we take the one with higher confidence
                    # make sure the exact relationship does not alraedy exist
                    if not (groupings[seeds[0]]['id'], groupings[ttpName[1:-1]]['id']) in relationships:
                        # if the opposite relationship exists then choose the one with the highest confidence to add
                        # we do this because if we do not then on the graph there will be two arrows going opposite directions to connect the same 2 TTPs
                        # this causes clutter and makes the confidence values unreadable since they will be layered on top of each other
                        if (groupings[ttpName[1:-1]]['id'], groupings[seeds[0]]['id']) in relationships:
                            # if existing relationship has higher confidence, move on
                            if float(relationships[(groupings[ttpName[1:-1]]['id'], groupings[seeds[0]]['id'])]['relationship_type']) > rule.confidence:
                                continue # move to next iteration of for loop, so code after this statement won't execute
                            # if existing relationship has lower confidence, delete it and let the new relationship take it's place
                            else:
                                del relationships[(groupings[ttpName[1:-1]]['id'], groupings[seeds[0]]['id'])]
                        relationships[(groupings[seeds[0]]['id'], groupings[ttpName[1:-1]]['id'])] = Relationship(groupings[seeds[0]]['id'], str(round(rule.confidence, 3)), groupings[ttpName[1:-1]]['id'])
        
        # pop to progress the queue
        seeds.pop(0)

    return (displayInfo, attackPatterns, relationships, groupings, mitigationObjects, mitigationRelationships)

In [93]:
aprioriList = AbstractTTPs(GenerateAprioriLists()) # generate lists for apriori
#aprioriList = (GenerateAprioriLists())


In [121]:
displayInfo, attackPatterns, relationships, groupings, mitigationObjects, mitigationRelationships = CreateRelationships(intelSeed, aprioriList)

In [13]:
# displays the results of the algorithm 
for info in displayInfo:
    print(f"{info[1]}:\t{info[0]}\t {info[2]}")
    print("")

T1059:	Command and Scripting Interpreter	 ['execution']

T1105:	Ingress Tool Transfer	 ['command-and-control']

T1204:	User Execution	 ['execution']

T1027:	Obfuscated Files or Information	 ['defense-evasion']

T1071:	Application Layer Protocol	 ['command-and-control']

T1082:	System Information Discovery	 ['discovery']

T1547:	Boot or Logon Autostart Execution	 ['persistence', 'privilege-escalation']

T1566:	Phishing	 ['initial-access']

T1140:	Deobfuscate/Decode Files or Information	 ['defense-evasion']

T1057:	Process Discovery	 ['discovery']

T1083:	File and Directory Discovery	 ['discovery']

T1070:	Indicator Removal	 ['defense-evasion']



In [111]:
sssss = [m[0] for m in mitigationObjects.values()]
type(sssss)

list

In [122]:
# bundle up stix objects
bundle = Bundle(attackPatterns+list(groupings.values())+list(relationships.values())+list(mitigationRelationships.values())+[m[0] for m in mitigationObjects.values()], allow_custom=True)

In [123]:
# export stix objects for visualization here: https://github.com/yukh1402/cti-stix-diamond-activity-attack-graph
ExportBundle(bundle, "test.json")

In [19]:
# need to create relationshiops
# for some reason APOs are off center under tactics 

In [124]:
list(mitigationRelationships.values())


[Relationship(type='relationship', spec_version='2.1', id='relationship--2cf7243f-d5d7-473b-9cb7-27c7186565d3', created_by_ref='identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5', created='2020-03-09T13:41:14.474Z', modified='2022-05-24T14:00:00.188Z', relationship_type='mitigates', description='Where possible, only permit execution of signed scripts.', source_ref='course-of-action--590777b3-b475-4c7c-aaf8-f4a73b140312', target_ref='attack-pattern--7385dfaf-6886-4229-9ecd-6fd678040830', revoked=False, object_marking_refs=['marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168'], x_mitre_attack_spec_version='2.1.0', x_mitre_domains=['enterprise-attack'], x_mitre_modified_by_ref='identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5', x_mitre_version='1.0'),
 Relationship(type='relationship', spec_version='2.1', id='relationship--6abc6901-d152-4b5f-b27d-8b973ae567cb', created_by_ref='identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5', created='2020-03-09T13:41:14.499Z', modified='2022-05-24T14:00: