In [1]:
from json import loads, dumps
from requests import get
from types import SimpleNamespace
from dataclasses import dataclass, field

In [154]:
def load_config(filename='config.json'):
    with open(filename, 'r') as file:
        return SimpleNamespace(**loads(file.read()))
    
def download_attack(url):
    response = get(url)
    if response.ok:
        return response.json()
    
def collect_objects(attack):
    collections = {}
    for obj in attack['objects']:
        tp = obj['type']
        if tp in collections:
            collections[tp].append(obj)
        else:
            collections.update({
                tp: [obj]
            })
    return collections

def get_technique_id(references):
    for ref in references:
        if ref['source_name'] == 'mitre-attack':
            return ref['external_id']
    return None

@dataclass
class ATechnique:
    name: str
    description: str = ""
    detection: str = ""
    data_sources: list = field(default_factory=list)
    platforms: list = field(default_factory=list)
    permissions_required: list = field(default_factory=list)
    defenses_bypassed: list = field(default_factory=list)
    id: str = ""
    tactics: list = field(default_factory=list)
    references: list = field(default_factory=list)
    ref: str = ""
    
@dataclass
class ARelation:
    source : str
    src: str
    target : str
    trg: str
    description: str
    type: str
    ref: str
    
@dataclass
class AGroup:
    name: str
    description: str = ""
    id: str = ""
    aliases: list = field(default_factory=list)
    ref: str = ""
    
def parse_techniques(objects):
    techniques = []
    for obj in collections['attack-pattern']:

        if 'revoked' in obj and obj['revoked']: continue

        id = get_technique_id(obj['external_references'])
        
        tactics = []
        for tac in obj['kill_chain_phases']:
            if tac['kill_chain_name'] == 'mitre-attack':
                tactics.append(
                    tac['phase_name']
                )

        defense = []
        if 'x_mitre_defense_bypassed' in obj and obj['x_mitre_defense_bypassed']:
            defense = obj['x_mitre_defense_bypassed']

        permissions = []
        if 'x_mitre_permissions_required' in obj and obj['x_mitre_permissions_required']:
            permissions = obj['x_mitre_permissions_required']

        tech = {
            'name': obj['name'],
            'platforms': obj['x_mitre_platforms'],
            'permissions_required': permissions,
            'defenses_bypassed': defense,
            'description': obj['description'],
            'detection': obj['x_mitre_detection'],
            'tactics': tactics,
            'id': id,
            'ref': obj['id']
        }

        atech = ATechnique(**tech)
        techniques.append(atech)
    return techniques

def parse_relations(relations):
    result_relations = []
    for rel in relations:
        data = {
            'source' : rel['source_ref'],
            'target' : rel['target_ref'],
            'description': rel.get('description', ''),
            'type': rel['relationship_type'],
            'ref': rel['id']
        }
        result_relations.append(
            ARelation(**data)
        )
    return result_relations

def parse_groups(groups_obj):
    groups = []
    for obj in groups_obj:
        #print(dumps(grp, indent=4))
        if 'revoked' in obj and obj['revoked']: continue

        id = get_technique_id(obj['external_references'])

        grp = {
                'name': obj['name'],
                'id': id,
                'ref': obj['id'],
                'description': obj['description'],
                'aliases': obj['aliases']
            }

        agrp = AGroup(**grp)
        groups.append(agrp)
    return groups


In [3]:
cfg = load_config()

In [4]:
attack = download_attack(cfg.attack_data)

In [64]:
collections = collect_objects(attack)

In [41]:
techniques_list = parse_techniques(collections['attack-pattern'])
techniques_id_index = {}
techniques = {}
for tech in techniques_list:
    techniques_id_index.update({
        tech.ref: tech.id
    })
    techniques.update({
        tech.id : tech
    })

In [106]:
relations_index = parse_relations(collections['relationship'])

In [43]:
groups_list = parse_groups(collections['intrusion-set'])
groups_id_index = {}
groups = {}
for grp in groups_list:
    groups_id_index.update({
        grp.ref : grp.id
    })
    groups.update({
        grp.id: grp
    })


In [33]:
relations_index = []

In [36]:
relations[0].source

'course-of-action--beb45abb-11e8-4aef-9778-1f9ac249784f'

In [53]:
reltypes = set()
for rel in relations:
    if rel.type == 'revoked-by':
        print(rel)
    #reltypes.add(rel.type)
#reltypes

ARelation(source='malware--310f437b-29e7-4844-848c-7220868d074a', target='malware--b42378e0-f147-496f-992a-26a49705395b', description='', type='revoked-by', ref='relationship--87231371-e005-44ab-9b66-1954615f2a7e')
ARelation(source='intrusion-set--9559ecaf-2e75-48a7-aee8-9974020bc772', target='intrusion-set--17862c7d-9e60-48a0-b48e-da4dc4c3f6b0', description='', type='revoked-by', ref='relationship--3680408d-e56e-4d68-a74d-2678093ed53f')
ARelation(source='intrusion-set--68ba94ab-78b8-43e7-83e2-aed3466882c6', target='intrusion-set--4ca1929c-7d64-4aab-b849-badbfc0c760d', description='', type='revoked-by', ref='relationship--632ca9a0-a9f3-4b27-96e1-9fcb8bab11cb')
ARelation(source='attack-pattern--52f3d5a6-8a0f-4f82-977e-750abf90d0b0', target='attack-pattern--0042a9f5-f053-4769-b3ef-9ad018dfa298', description='', type='revoked-by', ref='relationship--8be10d07-69bd-47ae-9dea-5918d1005699')
ARelation(source='attack-pattern--c1a452f3-6499-4c12-b7e9-a6a0a102af76', target='attack-pattern--70079

In [67]:
relations = []
for rel in relations_index:
    # Groups use techniques 
    if 'intrusion-set' in rel.source and 'attack-pattern' in rel.target and rel.type == 'uses':
        try:
            source_id = groups_id_index[rel.source]
            target_id = techniques_id_index[rel.target]
            relations.append((source_id, target_id, rel.type))
        except: pass

In [110]:
for obj in attack['objects']:
    if obj['id'] == 'course-of-action--beb45abb-11e8-4aef-9778-1f9ac249784f':
        print(get_technique_id(obj['external_references']))
        print(obj)
    

T1088
{'object_marking_refs': ['marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168'], 'type': 'course-of-action', 'name': 'Bypass User Account Control Mitigation', 'description': 'Remove users from the local administrator group on systems. Although UAC bypass techniques exist, it is still prudent to use the highest enforcement level for UAC when possible and mitigate bypass opportunities that exist with techniques such as [DLL Search Order Hijacking](https://attack.mitre.org/techniques/T1038). \n\nCheck for common UAC bypass weaknesses on Windows systems to be aware of the risk posture and address issues where appropriate. (Citation: Github UACMe)', 'x_mitre_version': '1.0', 'modified': '2019-07-24T14:13:23.637Z', 'created': '2018-10-17T00:14:20.652Z', 'external_references': [{'url': 'https://attack.mitre.org/mitigations/T1088', 'external_id': 'T1088', 'source_name': 'mitre-attack'}, {'url': 'https://github.com/hfiref0x/UACME', 'description': 'UACME Project. (2016, June 16). UACM

In [55]:
a


1

In [155]:
class Attack:
    
    @staticmethod
    def get_id(references):
        for ref in references:
            if ref['source_name'] == 'mitre-attack':
                return ref['external_id']
        return None

    def __init__(self):
        self.load_config()                         # creates self.cfg
        self.download_attack(self.cfg.attack_data) # creates self.attack
        self.collect_objects()                     # creates self.collections
        self.parse_techniques()
        self.parse_groups()
        self.parse_relations()

    def load_config(self, filename='config.json'):
        with open(filename, 'r') as file:
            data = SimpleNamespace(**loads(file.read()))
            file.close()
        self.cfg = data

    def download_attack(self, url):
        response = get(url)
        if response.ok:
            self.attack = response.json()

    def collect_objects(self):
        self.collections = {}
        for obj in self.attack['objects']:
            tp = obj['type']
            if tp in self.collections:
                self.collections[tp].append(obj)
            else:
                self.collections.update({
                    tp: [obj]
                })

    def parse_techniques(self):
        self.techniques = {}
        self.techniques_list = []
        for obj in self.collections['attack-pattern']:

            if 'revoked' in obj and obj['revoked']: continue
            if 'x-mitre-deprecated' in obj and obj['x-mitre-deprecated']: continue

            id = Attack.get_id(obj['external_references'])

            tactics = []
            for tac in obj['kill_chain_phases']:
                if tac['kill_chain_name'] == 'mitre-attack':
                    tactics.append(
                        tac['phase_name']
                    )

            defense = []
            if 'x_mitre_defense_bypassed' in obj and obj['x_mitre_defense_bypassed']:
                defense = obj['x_mitre_defense_bypassed']

            permissions = []
            if 'x_mitre_permissions_required' in obj and obj['x_mitre_permissions_required']:
                permissions = obj['x_mitre_permissions_required']

            tech = {
                'name': obj['name'],
                'platforms': obj['x_mitre_platforms'],
                'permissions_required': permissions,
                'defenses_bypassed': defense,
                'description': obj['description'],
                'detection': obj['x_mitre_detection'],
                'tactics': tactics,
                'id': id,
                'ref': obj['id']
            }

            atech = ATechnique(**tech)
            self.techniques.update({
                id: atech
            })
            self.techniques_list.append(atech)
            
    def parse_groups(self):
        self.groups_list = []
        self.groups = {}
        for obj in self.collections['intrusion-set']:

            if 'revoked' in obj and obj['revoked']: continue
            if 'x-mitre-deprecated' in obj and obj['x-mitre-deprecated']: continue
            
            id = Attack.get_id(obj['external_references'])

            grp = {
                    'name': obj['name'],
                    'id': id,
                    'ref': obj['id'],
                    'description': obj['description'],
                    'aliases': obj['aliases']
                }

            agrp = AGroup(**grp)
            self.groups_list.append(agrp)
            self.groups.update({
                id: agrp
            })
        return groups
    
    def parse_relations(self):
        self.relations = []
        for rel in self.collections['relationship']:
            data = {
                'source' : rel['source_ref'],
                'target' : rel['target_ref'],
                'description': rel.get('description', ''),
                'type': rel['relationship_type'],
                'ref': rel['id']
            }
            
            src_type = data['source'].split('--')[0]
            trg_type = data['target'].split('--')[0]
            
            src_id = None
            trg_id = None
            
            for obj in self.collections[src_type]:
                if obj['id'] == data['source']:
                    if 'external_references' in obj:
                        src_id = Attack.get_id(obj['external_references'])

            for obj in self.collections[trg_type]:
                if obj['id'] == data['target']:
                    if 'external_references' in obj:
                        trg_id = Attack.get_id(obj['external_references'])
            
            data.update({
                'src': src_id, 'trg': trg_id
            })
            
            self.relations.append(
                ARelation(**data)
            )


In [156]:
a = Attack()

In [1]:
from attack import AGroup, ARelation, ATechnique, Attack

In [2]:
a = Attack()

In [3]:
a.collections.keys()

dict_keys(['x-mitre-collection', 'attack-pattern', 'relationship', 'course-of-action', 'identity', 'intrusion-set', 'malware', 'tool', 'x-mitre-tactic', 'x-mitre-matrix', 'x-mitre-data-source', 'x-mitre-data-component', 'marking-definition'])