This repository was archived by the owner on Dec 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 151
First pass at a lambda-supporting combination class #15
Merged
ArtificialErmine
merged 8 commits into
mitre-attack:master
from
ArtificialErmine:layerops
May 5, 2020
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
dd2aa5b
First pass at a lambda-supporting combination class
757fedb
Next pass at a layerops solution
dc51429
Addressed first round of review's comments
6a00586
Reorganized things, added more detailed documentation via README
d93fb1c
Cleaned things up slightly, improved documentation
47dc1f2
Further refinement
8b0ecf9
Tweaking import style
d7bc3db
Fixed documentation
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| # layers | ||
|
|
||
| This folder contains modules and scripts for working with ATT&CK Navigator layers. "ATT&CK Navigator Layers are a set of annotations overlayed on top of the ATT&CK Matrix. For more about ATT&CK Navigator layers, visit the ATT&CK Navigator repository. The core module allows users to load, validate, manipulate, and save ATT&CK layers. A brief overview of the components can be found below. All scripts adhere to the MITRE ATT&CK Navigator Layer file format, [version 3.0](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md). | ||
|
|
||
| #### Core Modules | ||
| | script | description | | ||
| |:-------|:------------| | ||
| | [filter](core/filter.py) | Implements a basic [filter object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#filter-object-properties). | | ||
| | [gradient](core/gradient.py) | Implements a basic [gradient object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#gradient-object-properties). | | ||
| | [layer](core/layer.py) | Provides an interface for interacting with core module's layer representation. A further breakdown can be found in the corresponding section below. | | ||
| | [layout](core/layout.py) | Implements a basic [layout object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#layout-object-properties). | | ||
| | [legenditem](core/legenditem.py) | Implements a basic [legenditem object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#legenditem-object-properties). | | ||
| | [metadata](core/metadata.py) | Implements a basic [metadata object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#metadata-object-properties). | | ||
| | [technique](core/technique.py) | Implements a basic [technique object](https://github.com/mitre-attack/attack-navigator/blob/develop/layers/LAYERFORMATv3.md#technique-object-properties). | | ||
|
|
||
| #### Manipulator Scripts | ||
| | script | description | | ||
| |:-------|:------------| | ||
| | [layerops](manipulators/layerops.py) | Provides a means by which to combine multiple ATT&CK layer objects in customized ways. A further breakdown can be found in the corresponding section below. | | ||
|
|
||
| ## Layer | ||
| The Layer class provides format validation and read/write capabilities to aid in working with ATT&CK Navigator Layers in python. It is the primary interface through which other Layer-related classes defined in the core module should be used. The Layer class API and a usage example are below. | ||
|
|
||
| | method [x = Layer()]| description | | ||
| |:-------|:------------| | ||
| | x.from_str(_input_) | Loads an ATT&CK layer from a string representation of a json layer. | | ||
| | x.from_dict(_input_) | Loads an ATT&CK layer from a dictionary. | | ||
| | x.from_file(_filepath_) | Loads an ATT&CK layer from a file location specified by the _filepath_. | | ||
| | x.to_file(_filepath_) | Saves the current state of the loaded ATT&CK layer to a json file denoted by the _filepath_. | | ||
| | x.to_dict() | Returns a representation of the current ATT&CK layer object as a dictionary. | | ||
| | x.to_str() | Returns a representation of the current ATT&CK layer object as a string representation of a dictionary. | | ||
|
|
||
| #### Example Usage | ||
|
|
||
| ```python | ||
| example_layer_dict = { | ||
| "name": "example layer", | ||
| "version": "3.0", | ||
| "domain": "mitre-enterprise" | ||
| } | ||
|
|
||
| example_layer_location = "/path/to/layer/file.json" | ||
| example_layer_out_location = "/path/to/new/layer/file.json" | ||
|
|
||
| from layers.core import Layer | ||
|
|
||
| layer1 = Layer(example_layer_dict) # Create a new layer and load existing data | ||
| layer1.to_file(example_layer_out_location) # Write out the loaded layer to the specified file | ||
|
|
||
| layer2 = Layer() # Create a new layer object | ||
| layer2.from_dict(example_layer_dict) # Load layer data into existing layer object | ||
| print(layer2.to_dict()) # Retrieve the loaded layer's data as a dictionary, and print it | ||
|
|
||
| layer3 = Layer() # Create a new layer object | ||
| layer3.from_file(example_layer_location) # Load layer data from a file into existing layer object | ||
| ``` | ||
|
|
||
| ## layerops.py | ||
| Layerops.py provides the LayerOps class, which is a way to combine layer files in an automated way, using user defined lambda functions. Each LayerOps instance, when created, ingests the provided lambda functions, and stores them for use. An existing LayerOps class can be used to combine layer files according to the initialized lambda using the process method. The breakdown of this two step process is documented in the table below, while examples of both the list and dictionary modes of operation can be found below. | ||
|
|
||
| ##### LayerOps() | ||
| ```python | ||
| x = LayerOps(score=score, comment=comment, enabled=enabled, colors=colors, metadata=metadata, name=name, desc=desc, default_values=default_values) | ||
| ``` | ||
|
|
||
| Each of the _inputs_ takes a lambda function that will be used to combine technique object fields matching the parameter. The one exception to this is _default_values_, which is an optional dictionary argument containing default values to provide the lambda functions if techniques of the combined layers are missing them. | ||
|
|
||
| ##### .process() Method | ||
| ```python | ||
| x.process(data, default_values=default_values) | ||
| ``` | ||
| The process method applies the lambda functions stored during initialization to the layer objects in _data_. _data_ must be either a list or a dictionary of Layer objects, and is expected to match the format of the lambda equations provided during initialization. default_values is an optional dictionary argument that overrides the currently stored default | ||
| values with new ones for this specific processing operation. | ||
|
|
||
| #### Example Usage | ||
| ```python | ||
| from layers.manipulators.layerops import LayerOps | ||
| from layers.core.layer import Layer | ||
|
|
||
| demo = Layer() | ||
| demo.from_file("C:\Users\attack\Downloads\layer.json") | ||
| demo2 = Layer() | ||
| demo2.from_file("C:\Users\attack\Downloads\layer2.json") | ||
| demo3 = Layer() | ||
| demo3.from_file("C:\Users\attack\Downloads\layer3.json") | ||
|
|
||
| # Example 1) Build a LayerOps object that takes a list and averages scores across the layers | ||
| lo = LayerOps(score=lambda x: sum(x) / len(x), | ||
| name=lambda x: x[1], | ||
| desc=lambda x: "This is an list example") # Build LayerOps object | ||
| out_layer = lo.process([demo, demo2]) # Trigger processing on a list of demo and demo2 layers | ||
| out_layer.to_file("C:\demo_layer1.json") # Save averaged layer to file | ||
| out_layer2 = lo.process([demo, demo2, demo3]) # Trigger processing on a list of demo, demo2, demo3 | ||
| visual_aid = out_layer2.to_dict() # Retrieve dictionary representation of processed layer | ||
|
|
||
| # Example 2) Build a LayerOps object that takes a dictionary and averages scores across the layers | ||
| lo2 = LayerOps(score=lambda x: sum([x[y] for y in x]) / len([x[y] for y in x]), | ||
| color=lambda x: x['b'], | ||
| desc=lambda x: "This is a dict example") # Build LayerOps object, with lambda | ||
| out_layer3 = lo2.process({'a': demo, 'b': demo2}) # Trigger processing on a dictionary of demo and demo2 | ||
| dict_layer = out_layer3.to_dict() # Retrieve dictionary representation of processed layer | ||
| print(dict_layer) # Display retrieved dictionary | ||
| out_layer4 = lo2.process({'a': demo, 'b': demo2, 'c': demo3})# Trigger processing on a dictionary of demo, demo2, demo3 | ||
| out_layer4.to_file("C:\demo_layer4.json") # Save averaged layer to file | ||
|
|
||
| # Example 3) Build a LayerOps object that takes a single element dictionary and inverts the score | ||
| lo3 = LayerOps(score=lambda x: 100 - x['a'], | ||
| desc= lambda x: "This is a simple example") # Build LayerOps object to invert score (0-100 scale) | ||
| out_layer5 = lo3.process({'a': demo}) # Trigger processing on dictionary of demo | ||
| print(out_layer5.to_dict()) # Display processed layer in dictionary form | ||
| out_layer5.to_file("C:\demo_layer5.json") # Save inverted score layer to file | ||
|
|
||
| # Example 4) Build a LayerOps object that combines the comments from elements in the list, with custom defaults | ||
| lo4 = LayerOps(score=lambda x: '; '.join(x), | ||
| default_values= { | ||
| "comment": "This was an example of new default values" | ||
| }, | ||
|
ArtificialErmine marked this conversation as resolved.
|
||
| desc= lambda x: "This is a defaults example") # Build LayerOps object to combine descriptions, defaults | ||
| out_layer6 = lo4.process([demo2, demo3]) # Trigger processing on a list of demo2 and demo0 | ||
| out_layer6.to_file("C:\demo_layer6.json") # Save combined comment layer to file | ||
| ``` | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| from .layer import Layer | ||
| from .exceptions import * | ||
| from .filter import Filter | ||
| from .gradient import Gradient | ||
| from .layerobj import _LayerObj | ||
| from .layout import Layout | ||
| from .legenditem import LegendItem | ||
| from .metadata import Metadata | ||
| from .technique import Technique | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| UNSETVALUE = '(x)' | ||
|
|
||
|
|
||
| class BadInput(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class BadType(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class UninitializedLayer(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class UnknownLayerProperty(Exception): | ||
| pass | ||
|
|
||
|
|
||
| class UnknownTechniqueProperty(Exception): | ||
| pass | ||
|
|
||
|
|
||
| def handler(caller, msg): | ||
| """ | ||
| Prints a debug/warning/error message | ||
| :param caller: the entity that called this function | ||
| :param msg: the message to log | ||
| """ | ||
| print('[{}] - {}'.format(caller, msg)) | ||
|
|
||
|
|
||
| def typeChecker(caller, testee, type, field): | ||
| """ | ||
| Verifies that the tested object is of the correct type | ||
| :param caller: the entity that called this function (used for error | ||
| messages) | ||
| :param testee: the element to test | ||
| :param type: the type the element should be | ||
| :param field: what the element is to be used as (used for error | ||
| messages) | ||
| :raises BadType: error denoting the testee element is not of the | ||
| correct type | ||
| """ | ||
| if not isinstance(testee, type): | ||
| handler(caller, '{} [{}] is not a {}'.format(testee, field, | ||
| str(type))) | ||
| raise BadType | ||
|
|
||
|
|
||
| def typeCheckerArray(caller, testee, type, field): | ||
| """ | ||
| Verifies that the tested object is an array of the correct type | ||
| :param caller: the entity that called this function (used for error | ||
| messages) | ||
| :param testee: the element to test | ||
| :param type: the type the element should be | ||
| :param field: what the element is to be used as (used for error | ||
| messages) | ||
| :raises BadType: error denoting the testee element is not of the | ||
| correct type | ||
| """ | ||
| if not isinstance(testee, list): | ||
| handler(caller, '{} [{}] is not a {}'.format(testee, field, | ||
| "Array")) | ||
| raise BadType | ||
| if not isinstance(testee[0], type): | ||
| handler(caller, '{} [{}] is not a {}'.format(testee, field, | ||
| "Array of " + type)) | ||
| raise BadType | ||
|
|
||
|
|
||
| def categoryChecker(caller, testee, valid, field): | ||
| """ | ||
| Verifies that the tested object is one of a set of valid values | ||
| :param caller: the entity that called this function (used for error | ||
| messages) | ||
| :param testee: the element to test | ||
| :param valid: a list of valid values for the testee | ||
| :param field: what the element is to be used as (used for error | ||
| messages) | ||
| :raises BadInput: error denoting the testee element is not one of | ||
| the valid options | ||
| """ | ||
| if testee not in valid: | ||
| handler(caller, '{} not a valid value for {}'.format(testee, field)) | ||
| raise BadInput |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| try: | ||
| from ..core.exceptions import typeCheckerArray, categoryChecker, \ | ||
| UNSETVALUE | ||
| except ValueError: | ||
| from core.exceptions import typeCheckerArray, categoryChecker, \ | ||
| UNSETVALUE | ||
|
|
||
|
|
||
| class Filter: | ||
| def __init__(self, domain="mitre-enterprise"): | ||
| """ | ||
| Initialization - Creates a filter object, with an optional | ||
| domain input | ||
|
|
||
| :param domain: The domain used for this layer (mitre-enterprise | ||
| or mitre-mobile) | ||
| """ | ||
| self.__stages = UNSETVALUE | ||
| self.domain = domain | ||
| self.__platforms = UNSETVALUE | ||
|
|
||
| @property | ||
| def stages(self): | ||
| if self.__stages != UNSETVALUE: | ||
| return self.__stages | ||
|
|
||
| @stages.setter | ||
| def stages(self, stage): | ||
| typeCheckerArray(type(self).__name__, stage, str, "stage") | ||
| categoryChecker(type(self).__name__, stage[0], ["act", "prepare"], | ||
| "stages") | ||
| self.__stages = stage | ||
|
|
||
| @property | ||
| def platforms(self): | ||
| if self.__platforms != UNSETVALUE: | ||
| return self.__platforms | ||
|
|
||
| @platforms.setter | ||
| def platforms(self, platforms): | ||
| typeCheckerArray(type(self).__name__, platforms, str, "platforms") | ||
| self.__platforms = [] | ||
| valids = ["Windows", "Linux", "macOS", "AWS", "GCP", "Azure", | ||
| "Azure AD", "Office 365", "SaaS"] | ||
| if self.domain == "mitre-mobile": | ||
| valids = ['Android', 'iOS'] | ||
| for entry in platforms: | ||
| categoryChecker(type(self).__name__, entry, valids, "platforms") | ||
| self.__platforms.append(entry) | ||
|
|
||
| def get_dict(self): | ||
| """ | ||
| Converts the currently loaded data into a dict | ||
| :returns: A dict representation of the local filter object | ||
| """ | ||
| temp = dict() | ||
| listing = vars(self) | ||
| for entry in listing: | ||
| if entry == 'domain': | ||
| continue | ||
| if listing[entry] != UNSETVALUE: | ||
| temp[entry.split(type(self).__name__ + '__')[-1]] \ | ||
| = listing[entry] | ||
| if len(temp) > 0: | ||
| return temp |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| try: | ||
| from ..core.exceptions import typeChecker, typeCheckerArray | ||
| except ValueError: | ||
| from core.exceptions import typeChecker, typeCheckerArray | ||
|
|
||
|
|
||
| class Gradient: | ||
| def __init__(self, colors, minValue, maxValue): | ||
| """ | ||
| Initialization - Creates a gradient object | ||
|
|
||
| :param colors: The array of color codes for this gradient | ||
| :param minValue: The minValue for this gradient | ||
| :param maxValue: The maxValue for this gradient | ||
| """ | ||
| self.colors = colors | ||
| self.minValue = minValue | ||
| self.maxValue = maxValue | ||
|
|
||
| @property | ||
| def colors(self): | ||
| return self.__colors | ||
|
|
||
| @colors.setter | ||
| def colors(self, colors): | ||
| typeCheckerArray(type(self).__name__, colors, str, "colors") | ||
| self.__colors = [] | ||
| for entry in colors: | ||
| self.__colors.append(entry) | ||
|
|
||
| @property | ||
| def minValue(self): | ||
| return self.__minValue | ||
|
|
||
| @minValue.setter | ||
| def minValue(self, minValue): | ||
| typeChecker(type(self).__name__, minValue, int, "minValue") | ||
| self.__minValue = minValue | ||
|
|
||
| @property | ||
| def maxValue(self): | ||
| return self.__maxValue | ||
|
|
||
| @maxValue.setter | ||
| def maxValue(self, maxValue): | ||
| typeChecker(type(self).__name__, maxValue, int, "maxValue") | ||
| self.__maxValue = maxValue | ||
|
|
||
| def get_dict(self): | ||
| """ | ||
| Converts the currently loaded gradient file into a dict | ||
| :returns: A dict representation of the current gradient object | ||
| """ | ||
| return dict(colors=self.__colors, minValue=self.__minValue, | ||
| maxValue=self.maxValue) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to work for me:
Changing the import at the beginning of the layerops script to
from layers.core.layer import Layerseems to fix the issue, but it assumes the working directory is the root of this repo. If I'm running from thelayersfolder and runfrom manipulators.layerops import LayerOps, the absolute import breaks.I'm not sure what the best practice is with this — a relative import, or just tell the user to always import from the repo root folder? If the latter, please specify where this is being run from in the example as well as putting towards the top of the file "all scripts should be imported and run from the root folder of this repository."
Long term if this is going to become a pip package this probably needs to be relative. Not sure if it's worth doing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Speaking of importing, this conversation doesn't seem to have been addressed: #15 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was, I just failed to notice that the updated init.py wasn't included in the update for some reason. I've fixed this, and it appears to work properly from wherever now.