diff --git a/layers/README.md b/layers/README.md new file mode 100644 index 00000000..7f30858f --- /dev/null +++ b/layers/README.md @@ -0,0 +1,121 @@ +# layers + +This folder contains modules and scripts for working with ATT&CK Navigator layers. "ATT&CK Navigator Layers are a set of annotations overlayed on top of the ATT&CK Matrix. For more about ATT&CK Navigator layers, visit the ATT&CK Navigator repository. The core module allows users to load, validate, manipulate, and save ATT&CK layers. A brief overview of the components can be found below. All scripts adhere to the MITRE ATT&CK Navigator Layer file format, [version 3.0](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md). + +#### Core Modules +| script | description | +|:-------|:------------| +| [filter](core/filter.py) | Implements a basic [filter object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#filter-object-properties). | +| [gradient](core/gradient.py) | Implements a basic [gradient object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#gradient-object-properties). | +| [layer](core/layer.py) | Provides an interface for interacting with core module's layer representation. A further breakdown can be found in the corresponding section below. | +| [layout](core/layout.py) | Implements a basic [layout object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#layout-object-properties). | +| [legenditem](core/legenditem.py) | Implements a basic [legenditem object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#legenditem-object-properties). | +| [metadata](core/metadata.py) | Implements a basic [metadata object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#metadata-object-properties). | +| [technique](core/technique.py) | Implements a basic [technique object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#technique-object-properties). | + +#### Manipulator Scripts +| script | description | +|:-------|:------------| +| [layerops](manipulators/layerops.py) | Provides a means by which to combine multiple ATT&CK layer objects in customized ways. A further breakdown can be found in the corresponding section below. | + +## Layer +The Layer class provides format validation and read/write capabilities to aid in working with ATT&CK Navigator Layers in python. It is the primary interface through which other Layer-related classes defined in the core module should be used. The Layer class API and a usage example are below. + +| method [x = Layer()]| description | +|:-------|:------------| +| x.from_str(_input_) | Loads an ATT&CK layer from a string representation of a json layer. | +| x.from_dict(_input_) | Loads an ATT&CK layer from a dictionary. | +| x.from_file(_filepath_) | Loads an ATT&CK layer from a file location specified by the _filepath_. | +| x.to_file(_filepath_) | Saves the current state of the loaded ATT&CK layer to a json file denoted by the _filepath_. | +| x.to_dict() | Returns a representation of the current ATT&CK layer object as a dictionary. | +| x.to_str() | Returns a representation of the current ATT&CK layer object as a string representation of a dictionary. | + +#### Example Usage + +```python +example_layer_dict = { + "name": "example layer", + "version": "3.0", + "domain": "mitre-enterprise" +} + +example_layer_location = "/path/to/layer/file.json" +example_layer_out_location = "/path/to/new/layer/file.json" + +from layers.core import Layer + +layer1 = Layer(example_layer_dict) # Create a new layer and load existing data +layer1.to_file(example_layer_out_location) # Write out the loaded layer to the specified file + +layer2 = Layer() # Create a new layer object +layer2.from_dict(example_layer_dict) # Load layer data into existing layer object +print(layer2.to_dict()) # Retrieve the loaded layer's data as a dictionary, and print it + +layer3 = Layer() # Create a new layer object +layer3.from_file(example_layer_location) # Load layer data from a file into existing layer object +``` + +## layerops.py +Layerops.py provides the LayerOps class, which is a way to combine layer files in an automated way, using user defined lambda functions. Each LayerOps instance, when created, ingests the provided lambda functions, and stores them for use. An existing LayerOps class can be used to combine layer files according to the initialized lambda using the process method. The breakdown of this two step process is documented in the table below, while examples of both the list and dictionary modes of operation can be found below. + +##### LayerOps() +```python + x = LayerOps(score=score, comment=comment, enabled=enabled, colors=colors, metadata=metadata, name=name, desc=desc, default_values=default_values) +``` + + Each of the _inputs_ takes a lambda function that will be used to combine technique object fields matching the parameter. The one exception to this is _default_values_, which is an optional dictionary argument containing default values to provide the lambda functions if techniques of the combined layers are missing them. + +##### .process() Method +```python +x.process(data, default_values=default_values) +``` +The process method applies the lambda functions stored during initialization to the layer objects in _data_. _data_ must be either a list or a dictionary of Layer objects, and is expected to match the format of the lambda equations provided during initialization. default_values is an optional dictionary argument that overrides the currently stored default + values with new ones for this specific processing operation. + +#### Example Usage +```python +from layers.manipulators.layerops import LayerOps +from layers.core.layer import Layer + +demo = Layer() +demo.from_file("C:\Users\attack\Downloads\layer.json") +demo2 = Layer() +demo2.from_file("C:\Users\attack\Downloads\layer2.json") +demo3 = Layer() +demo3.from_file("C:\Users\attack\Downloads\layer3.json") + +# Example 1) Build a LayerOps object that takes a list and averages scores across the layers +lo = LayerOps(score=lambda x: sum(x) / len(x), + name=lambda x: x[1], + desc=lambda x: "This is an list example") # Build LayerOps object +out_layer = lo.process([demo, demo2]) # Trigger processing on a list of demo and demo2 layers +out_layer.to_file("C:\demo_layer1.json") # Save averaged layer to file +out_layer2 = lo.process([demo, demo2, demo3]) # Trigger processing on a list of demo, demo2, demo3 +visual_aid = out_layer2.to_dict() # Retrieve dictionary representation of processed layer + +# Example 2) Build a LayerOps object that takes a dictionary and averages scores across the layers +lo2 = LayerOps(score=lambda x: sum([x[y] for y in x]) / len([x[y] for y in x]), + color=lambda x: x['b'], + desc=lambda x: "This is a dict example") # Build LayerOps object, with lambda +out_layer3 = lo2.process({'a': demo, 'b': demo2}) # Trigger processing on a dictionary of demo and demo2 +dict_layer = out_layer3.to_dict() # Retrieve dictionary representation of processed layer +print(dict_layer) # Display retrieved dictionary +out_layer4 = lo2.process({'a': demo, 'b': demo2, 'c': demo3})# Trigger processing on a dictionary of demo, demo2, demo3 +out_layer4.to_file("C:\demo_layer4.json") # Save averaged layer to file + +# Example 3) Build a LayerOps object that takes a single element dictionary and inverts the score +lo3 = LayerOps(score=lambda x: 100 - x['a'], + desc= lambda x: "This is a simple example") # Build LayerOps object to invert score (0-100 scale) +out_layer5 = lo3.process({'a': demo}) # Trigger processing on dictionary of demo +print(out_layer5.to_dict()) # Display processed layer in dictionary form +out_layer5.to_file("C:\demo_layer5.json") # Save inverted score layer to file + +# Example 4) Build a LayerOps object that combines the comments from elements in the list, with custom defaults +lo4 = LayerOps(score=lambda x: '; '.join(x), + default_values= { + "comment": "This was an example of new default values" + }, + desc= lambda x: "This is a defaults example") # Build LayerOps object to combine descriptions, defaults +out_layer6 = lo4.process([demo2, demo3]) # Trigger processing on a list of demo2 and demo0 +out_layer6.to_file("C:\demo_layer6.json") # Save combined comment layer to file +``` diff --git a/layers/core/__init__.py b/layers/core/__init__.py new file mode 100644 index 00000000..b6b4ae3e --- /dev/null +++ b/layers/core/__init__.py @@ -0,0 +1,10 @@ +from .layer import Layer +from .exceptions import * +from .filter import Filter +from .gradient import Gradient +from .layerobj import _LayerObj +from .layout import Layout +from .legenditem import LegendItem +from .metadata import Metadata +from .technique import Technique + diff --git a/layers/core/exceptions.py b/layers/core/exceptions.py new file mode 100644 index 00000000..b3dfcc37 --- /dev/null +++ b/layers/core/exceptions.py @@ -0,0 +1,87 @@ +UNSETVALUE = '(x)' + + +class BadInput(Exception): + pass + + +class BadType(Exception): + pass + + +class UninitializedLayer(Exception): + pass + + +class UnknownLayerProperty(Exception): + pass + + +class UnknownTechniqueProperty(Exception): + pass + + +def handler(caller, msg): + """ + Prints a debug/warning/error message + :param caller: the entity that called this function + :param msg: the message to log + """ + print('[{}] - {}'.format(caller, msg)) + + +def typeChecker(caller, testee, type, field): + """ + Verifies that the tested object is of the correct type + :param caller: the entity that called this function (used for error + messages) + :param testee: the element to test + :param type: the type the element should be + :param field: what the element is to be used as (used for error + messages) + :raises BadType: error denoting the testee element is not of the + correct type + """ + if not isinstance(testee, type): + handler(caller, '{} [{}] is not a {}'.format(testee, field, + str(type))) + raise BadType + + +def typeCheckerArray(caller, testee, type, field): + """ + Verifies that the tested object is an array of the correct type + :param caller: the entity that called this function (used for error + messages) + :param testee: the element to test + :param type: the type the element should be + :param field: what the element is to be used as (used for error + messages) + :raises BadType: error denoting the testee element is not of the + correct type + """ + if not isinstance(testee, list): + handler(caller, '{} [{}] is not a {}'.format(testee, field, + "Array")) + raise BadType + if not isinstance(testee[0], type): + handler(caller, '{} [{}] is not a {}'.format(testee, field, + "Array of " + type)) + raise BadType + + +def categoryChecker(caller, testee, valid, field): + """ + Verifies that the tested object is one of a set of valid values + :param caller: the entity that called this function (used for error + messages) + :param testee: the element to test + :param valid: a list of valid values for the testee + :param field: what the element is to be used as (used for error + messages) + :raises BadInput: error denoting the testee element is not one of + the valid options + """ + if testee not in valid: + handler(caller, '{} not a valid value for {}'.format(testee, field)) + raise BadInput diff --git a/layers/core/filter.py b/layers/core/filter.py new file mode 100644 index 00000000..5b36a8aa --- /dev/null +++ b/layers/core/filter.py @@ -0,0 +1,65 @@ +try: + from ..core.exceptions import typeCheckerArray, categoryChecker, \ + UNSETVALUE +except ValueError: + from core.exceptions import typeCheckerArray, categoryChecker, \ + UNSETVALUE + + +class Filter: + def __init__(self, domain="mitre-enterprise"): + """ + Initialization - Creates a filter object, with an optional + domain input + + :param domain: The domain used for this layer (mitre-enterprise + or mitre-mobile) + """ + self.__stages = UNSETVALUE + self.domain = domain + self.__platforms = UNSETVALUE + + @property + def stages(self): + if self.__stages != UNSETVALUE: + return self.__stages + + @stages.setter + def stages(self, stage): + typeCheckerArray(type(self).__name__, stage, str, "stage") + categoryChecker(type(self).__name__, stage[0], ["act", "prepare"], + "stages") + self.__stages = stage + + @property + def platforms(self): + if self.__platforms != UNSETVALUE: + return self.__platforms + + @platforms.setter + def platforms(self, platforms): + typeCheckerArray(type(self).__name__, platforms, str, "platforms") + self.__platforms = [] + valids = ["Windows", "Linux", "macOS", "AWS", "GCP", "Azure", + "Azure AD", "Office 365", "SaaS"] + if self.domain == "mitre-mobile": + valids = ['Android', 'iOS'] + for entry in platforms: + categoryChecker(type(self).__name__, entry, valids, "platforms") + self.__platforms.append(entry) + + def get_dict(self): + """ + Converts the currently loaded data into a dict + :returns: A dict representation of the local filter object + """ + temp = dict() + listing = vars(self) + for entry in listing: + if entry == 'domain': + continue + if listing[entry] != UNSETVALUE: + temp[entry.split(type(self).__name__ + '__')[-1]] \ + = listing[entry] + if len(temp) > 0: + return temp diff --git a/layers/core/gradient.py b/layers/core/gradient.py new file mode 100644 index 00000000..f6317461 --- /dev/null +++ b/layers/core/gradient.py @@ -0,0 +1,55 @@ +try: + from ..core.exceptions import typeChecker, typeCheckerArray +except ValueError: + from core.exceptions import typeChecker, typeCheckerArray + + +class Gradient: + def __init__(self, colors, minValue, maxValue): + """ + Initialization - Creates a gradient object + + :param colors: The array of color codes for this gradient + :param minValue: The minValue for this gradient + :param maxValue: The maxValue for this gradient + """ + self.colors = colors + self.minValue = minValue + self.maxValue = maxValue + + @property + def colors(self): + return self.__colors + + @colors.setter + def colors(self, colors): + typeCheckerArray(type(self).__name__, colors, str, "colors") + self.__colors = [] + for entry in colors: + self.__colors.append(entry) + + @property + def minValue(self): + return self.__minValue + + @minValue.setter + def minValue(self, minValue): + typeChecker(type(self).__name__, minValue, int, "minValue") + self.__minValue = minValue + + @property + def maxValue(self): + return self.__maxValue + + @maxValue.setter + def maxValue(self, maxValue): + typeChecker(type(self).__name__, maxValue, int, "maxValue") + self.__maxValue = maxValue + + def get_dict(self): + """ + Converts the currently loaded gradient file into a dict + :returns: A dict representation of the current gradient object + """ + return dict(colors=self.__colors, minValue=self.__minValue, + maxValue=self.maxValue) diff --git a/layers/core/layer.py b/layers/core/layer.py new file mode 100644 index 00000000..cb17a099 --- /dev/null +++ b/layers/core/layer.py @@ -0,0 +1,118 @@ +import json +try: + from ..core.exceptions import UninitializedLayer, BadType, BadInput, \ + handler + from ..core.layerobj import _LayerObj +except ValueError: + from core.exceptions import UninitializedLayer, BadType, BadInput, \ + handler + from core.layerobj import _LayerObj + + +class Layer: + def __init__(self, init_data={}, strict=True): + """ + Initialization - create a new Layer object + :param init_data: Optionally provide base Layer json or string + data on initialization + """ + self.__layer = None + self.strict = strict + if isinstance(init_data, str): + self.from_str(init_data) + else: + self.from_dict(init_data) + + @property + def layer(self): + if self.__layer is not None: + return self.__layer + return "No Layer Loaded Yet!" + + def from_str(self, init_str): + """ + Loads a raw layer string into the object + :param init_str: the string representing the layer data to + be loaded + """ + self._data = json.loads(init_str) + self._build() + + def from_dict(self, init_dict): + """ + Loads a raw layer string into the object + :param init_dict: the dictionary representing the layer data to + be loaded + """ + self._data = init_dict + if self._data != {}: + self._build() + + def from_file(self, filename): + """ + loads input from a layer file specified by filename + :param filename: the target filename to load from + """ + with open(filename, 'r') as fio: + raw = fio.read() + self._data = json.loads(raw) + self._build() + + def to_file(self, filename): + """ + saves the current state of the layer to a layer file specified by + filename + :param filename: the target filename to save as + """ + if self.__layer is not None: + with open(filename, 'w') as fio: + json.dump(self.__layer.get_dict(), fio) + else: + raise UninitializedLayer + + def _build(self): + """ + Loads the data stored in self.data into a LayerObj (self.layer) + """ + try: + self.__layer = _LayerObj(self._data['version'], self._data['name'], + self._data['domain']) + except BadType or BadInput as e: + handler(type(self).__name__, 'Layer is malformed: {}. ' + 'Unable to load.'.format(e)) + self.__layer = None + return + except KeyError as e: + handler(type(self).__name__, 'Layer is missing parameters: {}. ' + 'Unable to load.'.format(e)) + self.__layer = None + return + + for key in self._data: + if key not in ['version', 'name', 'domain']: + try: + self.__layer._linker(key, self._data[key]) + except Exception as e: + if self.strict: + handler(type(self).__name__, "{} error. " + "Unable to load." + .format(str(e.__class__.__name__))) + self.__layer = None + return + + def to_dict(self): + """ + Converts the currently loaded layer file into a dict + :returns: A dict representation of the current layer object + """ + if self.__layer is not None: + return self.__layer.get_dict() + + def to_str(self): + """ + Converts the currently loaded layer file into a string + representation of a dictionary + :returns: A string representation of the current layer object + """ + if self.__layer is not None: + return json.dumps(self.to_dict()) diff --git a/layers/core/layerobj.py b/layers/core/layerobj.py new file mode 100644 index 00000000..dda4a6a5 --- /dev/null +++ b/layers/core/layerobj.py @@ -0,0 +1,370 @@ +try: + from ..core.filter import Filter + from ..core.layout import Layout + from ..core.technique import Technique + from ..core.gradient import Gradient + from ..core.legenditem import LegendItem + from ..core.metadata import Metadata + from ..core.exceptions import UNSETVALUE, typeChecker, BadInput, handler, \ + categoryChecker, UnknownLayerProperty +except ValueError: + from core.filter import Filter + from core.layout import Layout + from core.technique import Technique + from core.gradient import Gradient + from core.legenditem import LegendItem + from core.metadata import Metadata + from core.exceptions import UNSETVALUE, typeChecker, BadInput, handler, \ + categoryChecker, UnknownLayerProperty + +class _LayerObj: + def __init__(self, version, name, domain): + """ + Initialization - Creates a layer object + + :param version: The corresponding att&ck layer version + :param name: The name for this layer + :param domain: The domain for this layer (mitre-enterprise + or mitre-mobile) + """ + self.version = version + self.name = name + self.__description = UNSETVALUE + self.domain = domain + self.__filters = UNSETVALUE + self.__sorting = UNSETVALUE + self.__layout = UNSETVALUE + self.__hideDisabled = UNSETVALUE + self.__techniques = UNSETVALUE + self.__gradient = UNSETVALUE + self.__legendItems = UNSETVALUE + self.__showTacticRowBackground = UNSETVALUE + self.__tacticRowBackground = UNSETVALUE + self.__selectTechniquesAcrossTactics = UNSETVALUE + self.__selectSubtechniquesWithParent = UNSETVALUE + self.__metadata = UNSETVALUE + + @property + def version(self): + return self.__version + + @version.setter + def version(self, version): + typeChecker(type(self).__name__, version, str, "version") + categoryChecker(type(self).__name__, version, ["3.0"], "version") + self.__version = version + + @property + def name(self): + return self.__name + + @name.setter + def name(self, name): + typeChecker(type(self).__name__, name, str, "name") + self.__name = name + + @property + def domain(self): + return self.__domain + + @domain.setter + def domain(self, domain): + typeChecker(type(self).__name__, domain, str, "domain") + categoryChecker(type(self).__name__, domain, ["mitre-enterprise", + "mitre-mobile"], + "domain") + self.__domain = domain + + @property + def description(self): + if self.__description != UNSETVALUE: + return self.__description + + @description.setter + def description(self, description): + typeChecker(type(self).__name__, description, str, "description") + self.__description = description + + @property + def filters(self): + if self.__filters != UNSETVALUE: + return self.__filters + + @filters.setter + def filters(self, filters): + try: + temp = Filter(self.domain) + temp.stages = filters['stages'] + temp.platforms = filters['platforms'] + self.__filters = temp + except KeyError as e: + handler(type(self).__name__, "Unable to properly extract " + "information from filter: {}." + .format(e)) + raise BadInput + + @property + def sorting(self): + if self.__sorting != UNSETVALUE: + return self.__sorting + + @sorting.setter + def sorting(self, sorting): + typeChecker(type(self).__name__, sorting, int, "sorting") + categoryChecker(type(self).__name__, sorting, [0, 1, 2, 3], "sorting") + self.__sorting = sorting + + @property + def layout(self): + if self.__layout != UNSETVALUE: + return self.__layout + + @layout.setter + def layout(self, layout): + temp = Layout() + if "layout" in layout: + temp.layout = layout['layout'] + if "showName" in layout: + temp.showName = layout['showName'] + if "showID" in layout: + temp.showID = layout['showID'] + self.__layout = temp + + @property + def hideDisabled(self): + if self.__hideDisabled != UNSETVALUE: + return self.__hideDisabled + + @hideDisabled.setter + def hideDisabled(self, hideDisabled): + typeChecker(type(self).__name__, hideDisabled, bool, "hideDisabled") + self.__hideDisabled = hideDisabled + + @property + def techniques(self): + if self.__techniques != UNSETVALUE: + return self.__techniques + + @techniques.setter + def techniques(self, techniques): + typeChecker(type(self).__name__, techniques, list, "techniques") + self.__techniques = [] + entry = "" + try: + for entry in techniques: + temp = Technique(entry['techniqueID']) + temp._loader(entry) + self.__techniques.append(temp) + except KeyError as e: + handler(type(self).__name__, "Unable to properly extract " + "information from technique {}: {}." + .format(entry, e)) + raise BadInput + + @property + def gradient(self): + if self.__gradient != UNSETVALUE: + return self.__gradient + + @gradient.setter + def gradient(self, gradient): + try: + self.__gradient = Gradient(gradient['colors'], + gradient['minValue'], + gradient['maxValue']) + except KeyError as e: + handler(type(self).__name__, 'Gradient is missing parameters: {}. ' + 'Unable to load.'.format(e)) + + @property + def legendItems(self): + if self.__legendItems != UNSETVALUE: + return self.__legendItems + + @legendItems.setter + def legendItems(self, legendItems): + typeChecker(type(self).__name__, legendItems, list, "legendItems") + self.__legendItems = [] + entry = "" + try: + for entry in legendItems: + temp = LegendItem(entry['label'], entry['color']) + self.__legendItems.append(temp) + except KeyError as e: + handler(type(self).__name__, 'LegendItem {} is missing parameters:' + ' {}. Unable to load.' + .format(entry, e)) + + @property + def showTacticRowBackground(self): + if self.__showTacticRowBackground != UNSETVALUE: + return self.__showTacticRowBackground + + @showTacticRowBackground.setter + def showTacticRowBackground(self, showTacticRowBackground): + typeChecker(type(self).__name__, showTacticRowBackground, bool, + "showTacticRowBackground") + self.__showTacticRowBackground = showTacticRowBackground + + @property + def tacticRowBackground(self): + if self.__tacticRowBackground != UNSETVALUE: + return self.__tacticRowBackground + + @tacticRowBackground.setter + def tacticRowBackground(self, tacticRowBackground): + typeChecker(type(self).__name__, tacticRowBackground, str, + "tacticRowBackground") + self.__tacticRowBackground = tacticRowBackground + + @property + def selectTechniquesAcrossTactics(self): + if self.__selectTechniquesAcrossTactics != UNSETVALUE: + return self.__selectTechniquesAcrossTactics + + @selectTechniquesAcrossTactics.setter + def selectTechniquesAcrossTactics(self, selectTechniquesAcrossTactics): + typeChecker(type(self).__name__, selectTechniquesAcrossTactics, bool, + "selectTechniqueAcrossTactics") + self.__selectTechniquesAcrossTactics = selectTechniquesAcrossTactics + + @property + def selectSubtechniquesWithParent(self): + if self.__selectSubtechniquesWithParent != UNSETVALUE: + return self.__selectSubtechniquesWithParent + + @selectSubtechniquesWithParent.setter + def selectSubtechniquesWithParent(self, selectSubtechniquesWithParent): + typeChecker(type(self).__name__, selectSubtechniquesWithParent, bool, + "selectSubtechniquesWithParent") + self.__selectSubtechniquesWithParent = selectSubtechniquesWithParent + + @property + def metadata(self): + if self.__metadata != UNSETVALUE: + return self.__metadata + + @metadata.setter + def metadata(self, metadata): + typeChecker(type(self).__name__, metadata, list, "metadata") + self.__metadata = [] + entry = "" + try: + for entry in metadata: + self.__metadata.append(Metadata(entry['name'], entry['value'])) + except KeyError as e: + handler(type(self).__name__, 'Metadata {} is missing parameters: ' + '{}. Unable to load.' + .format(entry, e)) + + def _enumerate(self): + """ + INTERNAL: Identifies which fields have been set for this Layer + object + :returns: a list of all set fields within this Layer object + """ + temp = ['name', 'version', 'domain'] + if self.description: + temp.append('description') + if self.filters: + temp.append('filters') + if self.sorting: + temp.append('sorting') + if self.layout: + temp.append('layout') + if self.hideDisabled: + temp.append('hideDisabled') + if self.techniques: + temp.append('techniques') + if self.gradient: + temp.append('gradient') + if self.legendItems: + temp.append('legendItems') + if self.showTacticRowBackground: + temp.append('showTacticRowBackground') + if self.tacticRowBackground: + temp.append('tacticRowBackground') + if self.selectTechniquesAcrossTactics: + temp.append('selectTechniquesAcrossTactics') + if self.selectSubtechniquesWithParent: + temp.append('selectSubtechniquesWithParent') + if self.metadata: + temp.append('metadata') + return temp + + def get_dict(self): + """ + Converts the currently loaded layer into a dict + :returns: A dict representation of the current layer object + """ + temp = dict(name=self.name, version=self.version, domain=self.domain) + + if self.description: + temp['description'] = self.description + if self.filters: + temp['filters'] = self.filters.get_dict() + if self.sorting: + temp['sorting'] = self.sorting + if self.layout: + temp['layout'] = self.layout.get_dict() + if self.hideDisabled is not None: + temp['hideDisabled'] = self.hideDisabled + if self.techniques: + temp['techniques'] = [x.get_dict() for x in self.techniques] + if self.gradient: + temp['gradient'] = self.gradient.get_dict() + if self.legendItems: + temp['legendItems'] = [x.get_dict() for x in self.legendItems] + if self.showTacticRowBackground is not None: + temp['showTacticRowBackground'] = self.showTacticRowBackground + if self.tacticRowBackground: + temp['tacticRowBackground'] = self.tacticRowBackground + if self.selectTechniquesAcrossTactics is not None: + temp['selectTechniquesAcrossTactics'] = \ + self.selectTechniquesAcrossTactics + if self.selectSubtechniquesWithParent is not None: + temp['selectSubtechniquesWithParent'] = \ + self.selectSubtechniquesWithParent + if self.metadata: + temp['metadata'] = [x.get_dict() for x in self.metadata] + return temp + + def _linker(self, field, data): + """ + INTERNAL: Acts as a middleman routing the settings of values + within the layer + :param field: The value field being set + :param data: The corresponding data to set that field to + :raises UnknownLayerProperty: An error indicating that an + unexpected property was identified + """ + if field == 'description': + self.description = data + elif field == 'filters': + self.filters = data + elif field == 'sorting': + self.sorting = data + elif field == 'layout': + self.layout = data + elif field == 'hideDisabled': + self.hideDisabled = data + elif field == 'techniques': + self.techniques = data + elif field == 'gradient': + self.gradient = data + elif field == 'legendItems': + self.legendItems = data + elif field == 'showTacticRowBackground': + self.showTacticRowBackground = data + elif field == 'tacticRowBackground': + self.tacticRowBackground = data + elif field == 'selectTechniquesAcrossTactics': + self.selectTechniquesAcrossTactics = data + elif field == 'selectSubtechniquesWithParent': + self.selectSubtechniquesWithParent = data + elif field == 'metadata': + self.metadata = data + else: + handler(type(self).__name__, "Unknown layer property: {}" + .format(field)) + raise UnknownLayerProperty diff --git a/layers/core/layout.py b/layers/core/layout.py new file mode 100644 index 00000000..88619f53 --- /dev/null +++ b/layers/core/layout.py @@ -0,0 +1,60 @@ +try: + from ..core.exceptions import typeChecker, categoryChecker, UNSETVALUE +except ValueError: + from core.exceptions import typeChecker, categoryChecker, UNSETVALUE + + +class Layout: + def __init__(self): + """ + Initialization - Creates a layout object + """ + self.__layout = UNSETVALUE + self.__showID = UNSETVALUE + self.__showName = UNSETVALUE + + @property + def layout(self): + if self.__layout != UNSETVALUE: + return self.__layout + + @layout.setter + def layout(self, layout): + typeChecker(type(self).__name__, layout, str, "layout") + categoryChecker(type(self).__name__, layout, ["side", "flat", "mini"], + "layout") + self.__layout = layout + + @property + def showID(self): + if self.__showID != UNSETVALUE: + return self.__showID + + @showID.setter + def showID(self, showID): + typeChecker(type(self).__name__, showID, bool, "showID") + self.__showID = showID + + @property + def showName(self): + if self.__showName != UNSETVALUE: + return self.__showName + + @showName.setter + def showName(self, showName): + typeChecker(type(self).__name__, showName, bool, "showName") + self.__showName = showName + + def get_dict(self): + """ + Converts the currently loaded data into a dict + :returns: A dict representation of the local layout object + """ + listing = vars(self) + temp = dict() + for entry in listing: + if listing[entry] != UNSETVALUE: + temp[entry.split(type(self).__name__ + '__')[-1]]\ + = listing[entry] + if len(temp) > 0: + return temp diff --git a/layers/core/legenditem.py b/layers/core/legenditem.py new file mode 100644 index 00000000..5dd39689 --- /dev/null +++ b/layers/core/legenditem.py @@ -0,0 +1,41 @@ +try: + from ..core.exceptions import typeChecker +except ValueError: + from core.exceptions import typeChecker + + +class LegendItem: + def __init__(self, label, color): + """ + Initialization - Creates a legendItem object + + :param label: The label described by this object + :param color: The color associated with the label + """ + self.label = label + self.color = color + + @property + def color(self): + return self.__color + + @color.setter + def color(self, color): + typeChecker(type(self).__name__, color, str, "color") + self.__color = color + + @property + def label(self): + return self.__label + + @label.setter + def label(self, label): + typeChecker(type(self).__name__, label, str, "label") + self.__label = label + + def get_dict(self): + """ + Converts the currently loaded data into a dict + :returns: A dict representation of the local legendItem object + """ + return dict(label=self.__label, color=self.__color) diff --git a/layers/core/metadata.py b/layers/core/metadata.py new file mode 100644 index 00000000..1330abff --- /dev/null +++ b/layers/core/metadata.py @@ -0,0 +1,41 @@ +try: + from ..core.exceptions import typeChecker +except ValueError: + from core.exceptions import typeChecker + + +class Metadata: + def __init__(self, name, value): + """ + Initialization - Creates a metadata object + + :param name: the name for this metadata entry + :param value: the corresponding value for this metadata entry + """ + self.name = name + self.value = value + + @property + def name(self): + return self.__name + + @name.setter + def name(self, name): + typeChecker(type(self).__name__, name, str, "name") + self.__name = name + + @property + def value(self): + return self.__value + + @value.setter + def value(self, value): + typeChecker(type(self).__name__, value, str, "value") + self.__value = value + + def get_dict(self): + """ + Converts the currently loaded data into a dict + :returns: A dict representation of the local metadata object + """ + return dict(name=self.__name, value=self.__value) diff --git a/layers/core/technique.py b/layers/core/technique.py new file mode 100644 index 00000000..4b2efc4c --- /dev/null +++ b/layers/core/technique.py @@ -0,0 +1,168 @@ +try: + from ..core.exceptions import BadInput, handler, typeChecker, \ + UNSETVALUE, UnknownTechniqueProperty, BadType + from ..core.metadata import Metadata +except ValueError: + from core.exceptions import BadInput, handler, typeChecker, \ + UNSETVALUE, UnknownTechniqueProperty, BadType + from core.metadata import Metadata + + +class Technique: + def __init__(self, tID): + """ + Initialization - Creates a technique object + + :param tID: The techniqueID associated with this technique object + """ + self.techniqueID = tID + self.__tactic = UNSETVALUE + self.__comment = UNSETVALUE + self.__enabled = UNSETVALUE + self.__score = UNSETVALUE + self.__color = UNSETVALUE + self.__metadata = UNSETVALUE + self.__showSubtechniques = UNSETVALUE + + @property + def techniqueID(self): + return self.__techniqueID + + @techniqueID.setter + def techniqueID(self, techniqueID): + typeChecker(type(self).__name__, techniqueID, str, "techniqueID") + if not techniqueID.startswith('T'): + handler(type(self).__name__, '{} not a valid value for techniqueID' + .format(techniqueID)) + raise BadInput + else: + self.__techniqueID = techniqueID + + @property + def tactic(self): + if self.__tactic != UNSETVALUE: + return self.__tactic + + @tactic.setter + def tactic(self, tactic): + typeChecker(type(self).__name__, tactic, str, "tactic") + self.__tactic = tactic + + @property + def comment(self): + if self.__tactic != UNSETVALUE: + return self.__comment + + @comment.setter + def comment(self, comment): + typeChecker(type(self).__name__, comment, str, "comment") + self.__comment = comment + + @property + def enabled(self): + if self.__enabled != UNSETVALUE: + return self.__enabled + + @enabled.setter + def enabled(self, enabled): + typeChecker(type(self).__name__, enabled, bool, "enabled") + self.__enabled = enabled + + @property + def score(self): + if self.__score != UNSETVALUE: + return self.__score + + @score.setter + def score(self, score): + try: + typeChecker(type(self).__name__, score, int, "score") + self.__score = score + except BadType: + typeChecker(type(self).__name__, score, float, "score") + self.__score = int(score) + + @property + def color(self): + if self.__color != UNSETVALUE: + return self.__color + + @color.setter + def color(self, color): + typeChecker(type(self).__name__, color, str, "color") + self.__color = color + + @property + def metadata(self): + if self.metadata != UNSETVALUE: + return self.__metadata + + @metadata.setter + def metadata(self, metadata): + typeChecker(type(self).__name__, metadata, list, "metadata") + self.__metadata = [] + entry = "" + try: + for entry in metadata: + self.__metadata.append(Metadata(entry['name'], entry['value'])) + except KeyError as e: + handler(type(self).__name__, 'Metadata {} is missing parameters: ' + '{}. Unable to load.' + .format(entry, e)) + + @property + def showSubtechniques(self): + if self.__showSubtechniques != UNSETVALUE: + return self.__showSubtechniques + + @showSubtechniques.setter + def showSubtechniques(self, showSubtechniques): + typeChecker(type(self).__name__, showSubtechniques, bool, + "showSubtechniques") + self.__showSubtechniques = showSubtechniques + + def _loader(self, data): + """ + INTERNAL: Acts a middleman for loading values into the technique + object from a dict representation + :param data: A dict describing the technique + :raises UnknownTechniqueProperty: An error indicating that an + unexpected property was found on the technique + """ + for entry in data.keys(): + if entry == 'techniqueID': + pass + elif entry == 'tactic': + self.tactic = data[entry] + elif entry == 'comment': + self.comment = data[entry] + elif entry == 'enabled': + self.enabled = data[entry] + elif entry == 'score': + self.score = data[entry] + elif entry == 'color': + self.color = data[entry] + elif entry == 'metadata': + self.metadata = data[entry] + elif entry == 'showSubtechniques': + self.showSubtechniques = data[entry] + else: + handler(type(self).__name__, "Unknown technique property: {}" + .format(entry)) + raise UnknownTechniqueProperty + + def get_dict(self): + """ + Converts the currently loaded data into a dict + :returns: A dict representation of the local technique object + """ + dset = vars(self) + temp = {} + for key in dset: + entry = key.split(type(self).__name__ + '__')[-1] + if dset[key] != UNSETVALUE: + if entry != 'metadata': + temp[entry] = dset[key] + else: + temp[entry] = [x.get_dict() for x in dset[key]] + return temp diff --git a/layers/manipulators/layerops.py b/layers/manipulators/layerops.py new file mode 100644 index 00000000..f8b0f7d4 --- /dev/null +++ b/layers/manipulators/layerops.py @@ -0,0 +1,349 @@ +# Example Use: +# from layers.manipulators.layerops import LayerOps +# from layers.core.layer import Layer + +# demo = Layer() +# demo.load_file("C:\Users\attack\Downloads\layer.json") +# demo2 = Layer() +# demo2.load_input(Existing_Layer_String_Previously_Loaded) + +# lo = LayerOps(score=lambda x: x[0] * x[1], name=lambda x: x[1], +# desc=lambda x: "This is an list example") +# out_layer = lo.process([demo, demo2]) +# out_layer.export_file("C:\demo_layer1.json") + +# lo2 = LayerOps(score=lambda x: x['a'], color=lambda x: x['b'], +# desc=lambda x: "This is a dict example") +# out_layer2 = lo2.process({'a': demo, 'b': demo2}) +# dict_layer = out_layer2.get_dict() +# print(dict_layer) +# out_layer2.export_file("C:\demo_layer2.json") + +import copy +try: + from core import Layer +except ModuleNotFoundError: + from ..core import Layer + + +class InvalidFormat(Exception): + pass + + +class BadLambda(Exception): + pass + + +class MismatchedDomain(Exception): + pass + + +class LayerOps: + def __init__(self, score=None, comment=None, enabled=None, colors=None, + metadata=None, name=None, desc=None, default_values=None): + """ + Initialization - configures the object to handle processing + based on user provided Lambdas + :param score: lambda to calculate score + :param comment: lambda to generate comments + :param enabled: lambda to determine enabled status + :param metadata: lambda to generate metadata + :param name: new name to apply to the resulting layer + :param desc: new description to apply to the resulting layer + :param default_values: dictionary containing desired default + values for missing data element values + """ + self._score = score + self._comment = comment + self._enabled = enabled + self._colors = colors + self._metadata = metadata + self._name = name + self._desc = desc + self._default_values = { + "comment": "", + "enabled": True, + "color": "#ffffff", + "score": 1, + "metadata": [] + } + if default_values is not None: + for entry in default_values: + self._default_values[entry] = default_values[entry] + + def process(self, data, default_values=None): + """ + takes a list or dict of Layer objects, and processes them + :param data: A dict or list of Layer objects. + :param default_values: dictionary containing desired default values for + missing data element values + :raises InvalidFormat: An error indicating that the layer data + wasn't provided in a list or dict + """ + if isinstance(data, dict): + temp = {} + for entry in data.keys(): + temp[entry] = data[entry].layer.techniques + self.mode = 'dict' + out = self._merge_to_template(data, key=list(data.keys())[0]) + elif isinstance(data, list): + temp = [] + for entry in data: + temp.append(entry.layer.techniques) + self.mode = 'list' + out = self._merge_to_template(data) + else: + raise InvalidFormat + da = temp + corpus = self._build_template(temp) + + defaults = self._default_values + if default_values is not None: + for entry in default_values: + defaults[entry] = default_values[entry] + + return self._compute(data, da, corpus, out, defaults) + + def _compute(self, data, da, corpus, out, defaults): + """ + INTERNAL: Applies the configured lambda to the dataset + :param data: the dataset being processed + :param da: extracted techniques from the dataset, sorted by + dataset format + :param corpus: master list of combined techniques and + technique data + :param out: baseline template for the new layer + :param defaults: default values each technique should use if a + field is missing + :returns: a Layer object representing the resultant layer + """ + composite = copy.deepcopy(corpus) + if self._score is not None: + for entry in composite: + entry['score'] = self._applyOperation(da, entry, 'score', + self._score, defaults) + + if self._comment is not None: + for entry in composite: + entry['comment'] = self._applyOperation(da, entry, 'comment', + self._comment, defaults) + + if self._enabled is not None: + for entry in composite: + entry['enabled'] = self._applyOperation(da, entry, 'enabled', + self._enabled, defaults) + + if self._colors is not None: + for entry in composite: + entry['color'] = self._applyOperation(da, entry, 'color', + self._colors, defaults) + + if self._metadata is not None: + for entry in composite: + entry['metadata'] = self._applyOperation(da, entry, 'metadata', + self._metadata, + defaults) + + processed = copy.deepcopy(out) + processed['techniques'] = composite + if self._name is not None: + processed['name'] = self._applyOperation(data, None, 'name', + self._name, defaults, + glob='name') + + if self._desc is not None: + processed['description'] = self._applyOperation(data, None, + 'description', + self._desc, + defaults, + glob='description') + + return Layer(processed) + + def _merge_to_template(self, data, key=0): + """ + INTERNAL: merges initial layer files in either dict or list form + into a single template. Defaults to the first entry in the + case of difference in metadata. + :param key: the key referencing the first entry to default to + :raises MismatchedDomain: An error indicating that the layers + came from different domains + """ + out = {} + dict_map = [] + collide = [] + if self.mode == 'dict': + for x in data.keys(): + dict_map.append(x) + collide.append(data[x]) + else: + for x in data: + collide.append(x) + key_space = data[key].layer._enumerate() + _raw = data[key].layer.get_dict() + for entry in key_space: + if entry != 'techniques': + standard = _raw[entry] + if any(y != standard for y in + [getattr(x.layer, entry) for x in collide]): + if entry == 'domain': + print('FATAL ERROR! Layer mis-match on domain. ' + 'Exiting.') + raise MismatchedDomain + print('Warning! Layer mis-match detected for {}. ' + 'Defaulting to {}\'s value'.format(entry, key)) + out[entry] = standard + return out + + def _build_template(self, data): + """ + INTERNAL: builds a base template by combining available technique + listings from each layer + :param data: the raw ingested technique data (list or dict) + """ + if self.mode == 'list': + return self._template(data) + elif self.mode == 'dict': + temp = {} + t2 = [] + for key in data: + temp[key] = self._template([data[key]]) + for key in temp: + for elm in temp[key]: + if not any(elm['techniqueID'] == x['techniqueID'] + for x in t2): + t2.append(elm) + else: + [x.update(elm) + if x['techniqueID'] == elm['techniqueID'] + else x for x in t2] + return t2 + + def _template(self, data): + """ + INTERNAL: creates a template technique entry for a given listing + of techniques + :param data: a single layer's technique data + :returns: a list of technique templates + """ + temp = [] + for entry in data: + temp.append([{"techniqueID": x.techniqueID, "tactic": x.tactic} + if x.tactic else {"techniqueID": x.techniqueID} + for x in entry]) + return list({v['techniqueID']: v + for v in [elm for list in temp for elm in list]}.values()) + + def _grabList(self, search, collection): + """ + INTERNAL: generates a list containing all values for a given key + across the collection + :param search: the key to search for + :param collection: the data collection to search + :returns: a list of values for that key across the collection + """ + temp = [] + for x in collection: + temp.append(self._grabElement(search, x)) + return temp + + def _grabDict(self, search, collection): + """ + INTERNAL: generates a dictionary containing all values for a given + key across the collection + :param search: the key to search for + :param collection: the data collection to search + :returns: a dict of values for that key across the collection + """ + temp = {} + for key in collection: + temp[key] = self._grabElement(search, collection[key]) + return temp + + def _grabElement(self, search, listing): + """ + INTERNAL: returns a matching element in the listing for the + search key + :param search: the key to search for + :param listing: the data element to search + :returns: the found value, or an empty dict + """ + val = [x for x in listing if self._inDict(search, x)] + if len(val) > 0: + return val[0] + return {} + + def _inDict(self, search, complete): + """ + INTERNAL: returns bool of whether or not the key searched + for can be found across the dataset corpus + :param search: the key to search for + :param complete: the data set to search for + :returns: true/false + """ + comp_list = complete.get_dict().items() + search_terms = search.items() + return all(elm in comp_list for elm in search_terms) + + def _applyOperation(self, corpus, element, name, lda, defaults, glob=None): + """ + INTERNAL: applies a lambda expression to the dataset + :param corpus: the dataset + :param element: the template file to fill out + :param name: the name of the field being processed + :param lda: the lambda expression to apply + :param defaults: any default values should the field not be found + in a dataset entry + :raises BadLambda: error denoting that an error has occurred when + running the lambda function + :returns: lambda output + """ + if self.mode == 'list': + if glob: + listing = [getattr(x.layer, glob) for x in corpus] + listing = [{name: x} for x in listing] + else: + listing = self._grabList(element, corpus) + listing = [x.get_dict() if not isinstance(x, dict) + else dict() for x in listing] + values = [] + for x in listing: + if name in x: + values.append(x[name]) + else: + if defaults is not None: + if name in defaults: + values.append(defaults[name]) + continue + values.append(self._default_values[name]) + else: + values = {} + if glob: + listing = {} + for k in corpus.keys(): + listing[k] = {glob: getattr(corpus[k].layer, glob)} + else: + temp = self._grabDict(element, corpus) + listing = {} + for k in temp.keys(): + if temp[k] != {}: + listing[k] = temp[k].get_dict() + else: + listing[k] = {} + for elm in listing: + if name in listing[elm]: + values[elm] = listing[elm][name] + else: + if defaults is not None: + if name in defaults: + values[elm] = defaults[name] + continue + values[elm] = self._default_values[name] + try: + return lda(values) + except IndexError and KeyError: + print('Unable to continue, lambda targeting "{}" could not operate' + ' correctly on {}. Maybe the field is missing?' + .format(name, element)) + print('[RAW] Extracted matching elements: {}'.format(listing)) + raise BadLambda