Mamba Anomaly Guard - Malware Detection by Memory Baselining

Mamba Anomaly Guard is a system for malware detection on live systems or in dumped memory images. By comparing operating system artifacts such as processes, loaded libraries and code against a baseline with various methods, unknown and therefore possibly malicious artifacts can be detected for further investigation.

Mamba's Rule Engine allows users to write new detection rules or customize existing ones without deep knowledge of the system. Read the section Rule Engine for more information.

Installation

Mamba requires a Python 3 virtual env. The standalone version depends on Rekall.

Create virtual env

On Linux, you can create a virtual env with

$ virtualenv /tmp/mamba_env -p /usr/bin/python3
Already using interpreter /usr/bin/python3
Using base prefix '/usr'
New python executable in /tmp/mamba_env/bin/python3
Also creating executable in /tmp/mamba_env/bin/python
Installing setuptools, pkg_resources, pip, wheel...done.
$ source /tmp/mamba_env/bin/activate
$ pip install --upgrade setuptools pip wheel

Install from GitHub

If you use Ubuntu, ensure that the following packages are installed:

  • python3-dev
  • libfuzzy-dev (for building ssdeep)
  • libncurses5-dev (needed by Rekall)

$ sudo apt install python3-dev libfuzzy-dev libncurses5-dev

Install Mamba Anomaly Guard in the following order to ensure all dependencies are met with the correct versions.

$ # activate virtual env
$ git clone https://github.com/mambalabs/anomaly_guard.git anomaly_guard
$ pip install -e anomaly_guard/mamba-common
$ pip install -e anomaly_guard/mamba-codeid
$ pip install -e anomaly_guard/mamba-ruleengine
$ pip install -e anomaly_guard/

Usage

Currently, Mamba Anomaly Guard can be used as a standalone tool to detect malware in Windows 7 x64 memory images. Rekall is used to parse the image; therefore, the image format has to be supported by Rekall and a profile has to be available.

The results are uploaded to a result server (TODO: explain) when the -u flag is provided, or printed to STDOUT otherwise.

Options

$ mamba
usage: mamba [-h] [-n HOSTNAME] [-c COMMENT] [-u] memory_image

Mamba Anomaly Guard

positional arguments:
  memory_image  Path to memory image file

optional arguments:
  -h, --help    show this help message and exit
  -n HOSTNAME   Memory image source hostname
  -c COMMENT    Comment
  -u            Upload the results to the detection server
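
For example, to analyze an image and print the results to STDOUT (the hostname, comment and image path below are placeholders):

$ mamba -n ws01 -c "baseline check after patch day" /cases/ws01/memory.img

Add -u to upload the results to the detection server instead.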

Memory Baselining

Memory artifacts can be used to build a baseline which represents all known good states of a set of systems. This works very well for a small set of homogeneous systems, but is also possible for heterogeneous environments.

Only attributes of artifacts with the following characteristics can be considered:

  • Metadata: static across reboots, e.g. no memory addresses
  • Common to multiple installations, i.e. not unique to each system
  • Non-metadata: static content, or the existence of a function that can transform the variable data into a static output, e.g. the entropy distribution of code

For every memory artifact used for baselining, a unique ID is calculated by hashing the (normalized) identifying attributes of the artifact.
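
The following is a minimal sketch of that idea, not Mamba's actual implementation; the attribute names and the choice of SHA-256 are assumptions:

import hashlib
import json

def artifact_id(artifact, identifying_attributes=("path", "size")):
    # Normalize the identifying attributes and serialize them in a stable
    # order so that equal artifacts always produce the same ID.
    normalized = {key: str(artifact[key]).lower() for key in sorted(identifying_attributes)}
    return hashlib.sha256(json.dumps(normalized, sort_keys=True).encode()).hexdigest()

# The same DLL on two installations yields the same ID.
artifact_id({"path": r"C:\Windows\System32\ntdll.dll", "size": 1732864})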

Code Baselining

TODO

Rule Engine

The Rule Engine processes the memory artifacts according to the defined rules. Before the attributes from a system can be compared against the baseline, they have to be normalized and transformed into a common structure. The result of a comparison always contains the identifier of the artifact, which enables linking it with the original data.

Data which has to be checked against the baseline is referred to as "artifact data" in the rules.
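
Conceptually, such an attribute check boils down to looking up each normalized artifact in the baseline by its ID. The sketch below is illustrative only; the field names are not Mamba's actual structure:

def compare_against_baseline(artifact_data, baseline_ids):
    # artifact_data: dict of normalized artifacts keyed by artifact ID
    # baseline_ids: set of artifact IDs known from the baseline
    results = []
    for artifact_id, artifact in artifact_data.items():
        results.append({
            "id": artifact_id,              # keeps the link to the original data
            "ok": artifact_id in baseline_ids,
        })
    return results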

Pre-defined rules

The pre-defined rules are located in the directory rules/rules-available. To activate a rule, create a symlink (or a copy if your file system doesn't support symlinks) in rules-enabled, as shown in the example after this list. The following rules are provided:

  • KnownProcesses: Checks whether a process, identified by its main image, is known
  • ProcessRelations: Checks whether the parent-child relations between processes are known
  • KnownDlls: Checks the loaded DLLs of processes; DLLs are identified by path and size
  • Injections: Checks potential code injections based on their size and the entropy distribution of their code
  • LoadedInjections: Correlates potential injections with loaded DLLs (needs no baseline)
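
For example, assuming the rule file is named KnownProcesses.py (the actual file names may differ in your checkout):

$ cd rules
$ ln -s ../rules-available/KnownProcesses.py rules-enabled/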

Rule types

The Rule Engine provides different rule types for different kinds of artifact types and comparison mechanisms.

ProcessRule

Base rule for process-related artifacts which will be compared against the baseline. It provides default filters and validators as well as rich output to STDOUT and JSON for common artifacts like the direct attributes of a process, images (DLLs), etc.

SystemRule

Base rule for system-wide artifacts, like loaded kernel modules, which will be compared against the baseline.

ProcessAnomalyRule

Base rule for process-related artifacts which will be evaluated without using a baseline.

SystemAnomalyRule

Base rule for system-wide artifacts which will be evaluated without using a baseline.
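
The skeleton below illustrates the common pattern; the class name and rule body are assumptions, and only the base class, the identifier attribute and the run method are taken from the examples in this README:

# Hypothetical skeleton only; see the documented examples below for real rules.
class ExampleAnomalyRule(ProcessAnomalyRule):
    identifier = "ExampleAnomalies"

    def run(self):
        # Anomaly rules evaluate the artifact data directly, without fetching a baseline.
        ...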

Writing an attribute check rule

Rules which only check whether an artifact, defined by a subset of its attributes, is known can be defined with a simple Python dictionary that specifies the attributes to check and their types. The attributes can be nested if the original data is nested.

Rules are applied at the system level; therefore, the process has to be included if the rule checks sub-artifacts of processes.

Example

class KnownDllRule(ProcessRule):
    output_images_if_ok = True
    identifier = "KnownDlls"
    # Attributes (and their types) that identify the artifact. "images" is a
    # nested sub-artifact, so the enclosing process attributes are included.
    fields = {
        "name": str,
        "main_image_path": str,
        "images": {
            "path": str,
            "size": int,
        }
    }

Writing a complex rule

For rules which require more logic than comparing attributes, the following methods provided by the Rule Engine can be overridden:

  • run: Method which is called when an output is requested from a rule by render or to_json
  • validate_result: Calls run and may run additional validation functions on the result data
  • get_artifact_data: Fetches the artifact data, e.g. from the Mamba session
  • get_baseline_data: Fetches the baseline data if it's a baseline rule
  • transform_artifact_item: Is called on each artifact item to do the needed calculations and normalizations
  • transform_baseline_item: Is called on each baseline item to do the needed calculations and normalizations
  • compare: Compares the artifact items against the baseline items
  • render: Writes the result to STDOUT
  • to_json: Returns the result data as JSON for further processing

The code below is an example and does not represent the complete Injections rule:

class InjectionRule(ProcessRule):
    identifier = "Injections"
    fields = {
        "name": str,
        "injections": {
            "size": int,
            "entropy_vector": str,
        }
    }

    def render(self):
        table_header = ["Rule", "PID", "Name", "Type", "Artifact", "Anomalies"]
        table_body = []
        for process in self.filter_result():
            for injection in process['injections']:
                if not injection['ok']:
                    table_body.append([...])
        # ... write table_header and table_body to STDOUT

    def to_json(self, filter_result=True):
        result = []
        for process in process_iterator():
            process_dict = ... # create process dict

            result.append(process_dict)

        out = {
            "rule_identifier": self.identifier,
            "data": self._remove_internal_fields(result),
            "rule_type": self.rule_type
        }
        return out

    def validate_result(self, validation_stages=list()):
        if self.compare_result is None:
            self.run()
        self._result_is_validated = True

    def run(self):
        new_artifacts = {}
        baseline_artifacts = {}

        for process in self.get_artifact_data():
            if process.injections != {}:
                new_artifacts[process.id] = self.transform_artifact_item(process.injections)

        self.get_baseline_data()
        for process_id in self._baseline_data:
            baseline_artifacts[process_id] = self.transform_baseline_item(self._baseline_data[process_id])

        return self.compare(baseline_artifacts, new_artifacts)

    def get_baseline_data(self):
        for process in self._artifact_data:
            if process.injections != {}:
                injections = self.baseline_client.get_injections(process.uid)
                if not injections:
                    injections = self.baseline_client.get_injections(process.name)
                # Store the fetched injections so run() can iterate over them
                self._baseline_data[process.id] = injections

        return self._baseline_data

    def transform_artifact_item(self, artifact_item):
        injection_entropy_vectors = {}

        for injection in artifact_item.values():
            # Parse the colon-separated entropy values and map each offset to its value
            values = [float(v) for v in injection.entropy_vector_as_string.split(":")]
            entropy_vector = dict(enumerate(values))
            injection_entropy_vectors[injection.base] = entropy_vector

        return injection_entropy_vectors

    def transform_baseline_item(self, baseline_item):
        injections_by_size = {}
        for injection in baseline_item:
            # Group the baseline injections by their size
            injections_by_size.setdefault(injection['size'], []).append(injection)

        injection_entropy_matrix = {}
        for size in injections_by_size:
            entropy_matrix = {}
            for injection in injections_by_size[size]:
                ...

            injection_entropy_matrix[size] = entropy_matrix

        return injection_entropy_matrix

    def compare(self, baseline_artifacts, new_artifacts):
        self.compare_result = ArtifactResultContainer(self.__class__)
        for process_id in new_artifacts:
            self.compare_result[process_id] = self.session.processes[process_id].as_dict(include_images=False)
            self.compare_result[process_id]['ok'] = True

            for injection_base in new_artifacts[process_id]:
                injection_vector = new_artifacts[process_id][injection_base]
                injection_size = len(injection_vector)

                # Unknown size for this process (or process missing from the baseline): anomaly
                if injection_size not in baseline_artifacts.get(process_id, {}):
                    ...
                    self._mark_injection(...)
                    continue

                boundary_matrix = baseline_artifacts[process_id][injection_size]
                out_of_bound_sum = self._get_out_of_bound_sum(injection_vector, boundary_matrix)
                if out_of_bound_sum == 0:
                    self._mark_injection(process_id, injection_base, True)
                else:
                    self._mark_injection(...)

        return self.compare_result
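
The helper _get_out_of_bound_sum is not shown above. A plausible standalone sketch of what such a check could compute, assuming the boundary matrix stores a (minimum, maximum) entropy pair per vector offset, is:

def get_out_of_bound_sum(injection_vector, boundary_matrix):
    # Hypothetical helper, not Mamba's implementation: sum up how far the
    # entropy values of an injection fall outside the per-offset boundaries
    # learned from the baseline. A result of 0 means the vector fits the baseline.
    out_of_bound_sum = 0.0
    for offset, entropy in injection_vector.items():
        lower, upper = boundary_matrix.get(offset, (0.0, 8.0))  # entropy range is an assumption
        if entropy < lower:
            out_of_bound_sum += lower - entropy
        elif entropy > upper:
            out_of_bound_sum += entropy - upper
    return out_of_bound_sum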