Skip to content

Commit

Permalink
adds from_datasources function for graphing multiple artifacts.
Browse files Browse the repository at this point in the history
  • Loading branch information
yampelo committed Nov 13, 2019
1 parent 368fbb0 commit 4abf529
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 3 deletions.
19 changes: 18 additions & 1 deletion README.md
Expand Up @@ -39,7 +39,7 @@ Beagle can be used directly as a python library, or through a provided web inter
<img src ="docs/imgs/upload_to_graph.gif">
</center>

The library can be used either as a sequence of functional calls.
The library can be used either as a sequence of functional calls from a single datasource.

```python
>>> from beagle.datasources import SysmonEVTX
Expand All @@ -49,6 +49,23 @@ The library can be used either as a sequence of functional calls.
<networkx.classes.multidigraph.MultiDiGraph at 0x12700ee10>
```

As a graph generated from a set of multiple artifacts

```python
>>> from beagle.datasources import SysmonEVTX, HXTriage, PCAP
>>> from beagle.backends import NetworkX

>>> nx = NetworkX.from_datasources(
datasources=[
SysmonEVTX("malicious.evtx"),
HXTriage("alert.mans"),
PCAP("traffic.pcap"),
]
)
>>> G = nx.graph()
<networkx.classes.multidigraph.MultiDiGraph at 0x12700ee10>
```

Or by strictly calling each intermediate step of the data source to graph process.

```python
Expand Down
34 changes: 33 additions & 1 deletion beagle/backends/base_backend.py
@@ -1,8 +1,11 @@
from abc import ABCMeta, abstractmethod
from typing import List, Any, Union
from typing import List, Any, Union, TYPE_CHECKING

from beagle.nodes import Node

if TYPE_CHECKING:
from beagle.datasources import DataSource


class Backend(object, metaclass=ABCMeta):
"""Abstract Backend Class. All Backends must implement the
Expand Down Expand Up @@ -56,3 +59,32 @@ def is_empty(self) -> bool:
"""Returns true if there wasn't a graph created.
"""
raise NotImplementedError()

@classmethod
def from_datasources(
cls, datasources: Union["DataSource", List["DataSource"]], *args, **kwargs
) -> "Backend":
"""Create a backend instance from a set of datasources
Parameters
----------
datasources : Union[DataSource, List[DataSource]]
A set of datasources to use when creating the backend.
Returns
-------
Backend
Returns the configured instance
"""

nodes = [] # type: List[Node]

if not isinstance(datasources, List):
datasources = [datasources]

for datasource in datasources:
nodes += datasource.to_transformer().run()

instance = cls(*args, nodes=nodes, **kwargs) # type: ignore

return instance
55 changes: 54 additions & 1 deletion tests/backend/test_networkx.py
@@ -1,5 +1,5 @@
import json
from typing import Callable
from typing import Callable, List

import networkx
import pytest
Expand All @@ -8,6 +8,22 @@
from beagle.nodes import Process, File


from io import BytesIO

from scapy.all import Ether, PcapWriter, Packet, IP, UDP, TCP, DNS, DNSQR, DNSRR

from scapy.layers.http import HTTPRequest, HTTP

from beagle.datasources.pcap import PCAP


def packets_to_datasource_events(packets: List[Packet]) -> PCAP:
f = BytesIO()
PcapWriter(f).write(packets)
f.seek(0)
return PCAP(f) # type: ignore


@pytest.fixture()
def nx() -> Callable[..., NetworkX]:
def _backend(*args, **kwargs) -> networkx.Graph:
Expand Down Expand Up @@ -186,3 +202,40 @@ def test_add_node_overlaps_existing(nx):
assert networkx.has_path(G, u, v2)
assert "Launched" in G[u][v]
assert "Wrote" in G[u][v2]


def test_from_datasources():
packets_1 = [
Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12")
/ IP(src="127.0.0.1", dst="192.168.1.1")
/ TCP(sport=12345, dport=80)
/ HTTP()
/ HTTPRequest(Method="GET", Path="/foo", Host="https://google.com")
]

packets_2 = [
# HTTP Packet
Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12")
/ IP(src="127.0.0.1", dst="192.168.1.1")
/ TCP(sport=12345, dport=80)
/ HTTP()
/ HTTPRequest(Method="GET", Path="/foo", Host="https://google.com"),
# DNS Packet
Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12")
/ IP(src="127.0.0.1", dst="192.168.1.1")
/ UDP(sport=80, dport=53)
/ DNS(rd=1, qd=DNSQR(qtype="A", qname="google.com"), an=DNSRR(rdata="123.0.0.1")),
# TCP Packet
Ether(src="ab:ab:ab:ab:ab:ab", dst="12:12:12:12:12:12")
/ IP(src="127.0.0.1", dst="192.168.1.1")
/ TCP(sport=80, dport=5355),
]

nx = NetworkX.from_datasources(
[packets_to_datasource_events(packets) for packets in [packets_1, packets_2]]
)

# Make the graph
nx.graph()

assert not nx.is_empty()

0 comments on commit 4abf529

Please sign in to comment.