# Stamus Workshop

## SELKS

* https://www.stamus-networks.com/selks

SELKS stands for Suricata, Elasticsearch, Logstash, Kibana, Scirius. It also includes Arkime, EveBox and CyberChef, though I suppose the acronym was already set by then. It is fully open-source and free to use. One might reasonably assume that Stamus Security Platform (SSP), our commercial product, is basically SELKS with support. But that's not correct. We treat SELKS and SSP as totally separate products aimed at different audiences.

SELKS is a beginner-friendly Suricata stack for enthusiasts, open-source aficionados, and smaller deployments. It is suitable for small to medium-sized organizations, or for PoC setups in larger organizations. It also works well for forensics and training. By comparison, SSP is mostly used by large enterprises that often monitor multiple 10 or 40 gigabit links. We also offer smaller probes for 100 megabit and 1 gigabit links, but our most popular solution is in the 10 gigabit range. We are currently also working on a 100 gigabit probe.

To sum it up:

* SELKS is a single-box distribution with no probe management, whereas SSP consists of a management server (SCS) and any number of licenced probes;
* SSP probes are pre-optimized for specific traffic sizes. We licence hardware in addition to software, meaning the customer receives an optimized hardware appliance for traffic capture;
* SELKS deployment size is basically up to the user. Capture hardware, Suricata and Elasticsearch optimizations need to be customized per deployment;
* An SSP probe is not just a server running Suricata. It's an appliance with a custom Suricata build and an event post-processing pipeline. By comparison, SELKS uses Logstash to ingest EVE JSON logs from the local open-source Suricata into Elasticsearch with no additional processing;
* Scirius hunting dashboard is basically the same, but SSP has additional filtersets;
* SSP has a backend threat intel feed to highlight and contextualize high fidelity alerts;
* No post-processing means SELKS is lacking advanced analytics features, such as DoC (declaration of compromise) contextualization, host fingerprinting, beacon detection, etc;
* SELKS ruleset management is functionally identical to SSP, but the latter packages ET PRO with the product (if licenced accordingly);
* We also have original Stamus rulesets for threat hunting. They are mostly open-source and can be enabled in SELKS, but we provide proprietary versions that are optimized for SSP.

### SELKS on Docker

SELKS was originally envisioned as a security distribution (sales like to use the term "turnkey solution"), similar to Security Onion but focused on Suricata data. That is still the case: we have major releases that can be installed from ISO images. But technology marches on, and anyone can now set up the latest and greatest codebase as a Docker stack.

Firstly, we need to clone the SELKS repository from GitHub.

```
git clone https://github.com/StamusNetworks/SELKS.git
```

Then navigate to `docker` subfolder in the newly cloned repository.

```
cd SELKS/docker
```

### (Optional) - set up a clean vagrant env

While the stack can be spun up on bare metal, I suggest a clean environment for testing. Vagrant is a useful tool for quickly spinning one up. Note that we generally suggest at least 8 gigabytes of memory for this host. If you want to use [vagrant](https://developer.hashicorp.com/vagrant/downloads?product_intent=vagrant), simply add the following content to a `Vagrantfile` in the `SELKS/docker` folder.

```ruby
$provision = <<-SCRIPT
  export DEBIAN_FRONTEND=noninteractive

  apt-get update
SCRIPT

NAME = 'selks'.freeze
CPU = 4
MEM = 8192

Vagrant.configure(2) do |config|
  config.vm.define NAME do |box|
    box.vm.box = 'debian/bullseye64'
    box.vm.hostname = NAME
    box.vm.network :private_network, ip: '192.168.56.10'
    box.vm.synced_folder '.', '/selks', type: 'rsync', rsync__exclude: '.git/'
    box.vm.provider :virtualbox do |vb|
      vb.customize ['modifyvm', :id, '--memory', MEM]
      vb.customize ['modifyvm', :id, '--cpus', CPU]
    end
    box.vm.provider 'libvirt' do |v, _|
      v.cpus = CPU
      v.memory = MEM
    end
    box.vm.provision 'shell', inline: $provision
  end
end
```

Then start the vagrant provisioner.

```
vagrant up
```

SSH into the box.

```
vagrant ssh
```

And navigate to the folder where the SELKS source was synced. Note that `synced_folder` must not be a file share. The SELKS setup script creates data directories and mounts them into docker containers, so it would run into permission issues on a shared folder. The example Vagrantfile uses the `rsync` sync type instead to bypass the issue.

```
cd /selks
```

### Virtual capture interface


Before setting up the SELKS instance, we should create a dummy interface for Suricata to listen on. Normally you'd want to capture data off a real NIC, but this setup is meant for investigating malware PCAPs. So it's better to create a pseudo interface for traffic replay.

```
sudo ip link add tppdummy0 type dummy
sudo ip link set tppdummy0 up 
sudo ip link set dev tppdummy0 mtu 9000
```

**These commands are not persistent!** They need to be reapplied after reboot.

### Setup

Once done, launch the easy-setup script. Normally it would be interactive, but we'll use a noninteractive version to quickly set up a playground instance. **Note that this non-interactive version assumes a clean box and will install docker for you**.

```
sudo ./easy-setup.sh  --non-interactive -i tppdummy0 --iA --es-memory 4G --ls-memory 2G
```

It's mostly just a wrapper that sets up dependencies and the `.env` file that docker-compose then uses. It will also pull docker images. Feel free to experiment with other options. If you feel proficient, then `.env` can also be modified directly.

In fact, we should do so, since the default image is pretty old and does not have the new frontend design that matches SSP. We can change that easily by defining a much newer image. Just add this line at the end of the `.env` file.

```
SCIRIUS_VERSION=master-20231006
```

Finally start up the docker stack.

```
sudo -E docker compose up -d
```

Then visit port 443 on the external IP of whichever server you chose to deploy SELKS on. The default user is `selks_user`, and the password is the same.

### PCAP replay

Once the stack is up and running, we can use the newly created dummy interface for traffic replay. But before we do, we need a PCAP file. A good source is [Malware Traffic Analysis](https://malware-traffic-analysis.net). Feel free to pick a case study you'd like to investigate. Each case usually has a zipped PCAP file. Standard practice is to password protect those files to avoid having them trigger IDS systems. Password is always `infected`.

Some good samples are:
* [Web server scanning traffic](https://malware-traffic-analysis.net/2022/01/03/2022-01-01-thru-03-server-activity-with-log4j-attempts.pcap.zip);
* [Malware infecting a workstation and then compromising the DC](https://malware-traffic-analysis.net/2020/03/04/2020-03-04-Trickbot-spreads-from-client-to-DC.pcap.zip)

Then download a sample and unzip it.

```
wget "https://malware-traffic-analysis.net/2022/01/03/2022-01-01-thru-03-server-activity-with-log4j-attempts.pcap.zip"
unzip 2022-01-01-thru-03-server-activity-with-log4j-attempts.pcap.zip
```

Then install `tcpreplay` and start the replay. Note that most MTA PCAPs are really small, so adjust the `pps` (packets per second) value accordingly. At a high rate most files are processed so quickly that Scirius might not be able to draw a timeline. A lower value is usually better for smaller files; just don't make it so small that the replay takes too long. Your mileage may vary.

```
sudo apt install tcpreplay
```


```
sudo tcpreplay --pps 200 -i tppdummy0 2022-01-01-thru-03-server-activity-with-log4j-attempts.pcap
```
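As a rough guide for picking `--pps`, replay time is simply the packet count divided by the rate. The sketch below is plain arithmetic; the figure of 12,000 packets is made up for illustration and is not a property of the sample above (a tool such as `capinfos` can report the real count).

```python
import math

def pps_for_duration(packet_count: int, target_seconds: float) -> int:
    """Return the packets-per-second rate that replays a capture in roughly target_seconds."""
    if packet_count <= 0 or target_seconds <= 0:
        raise ValueError("packet_count and target_seconds must be positive")
    # Round up so the replay never takes longer than requested.
    return max(1, math.ceil(packet_count / target_seconds))

def replay_seconds(packet_count: int, pps: int) -> float:
    """Return the approximate replay time for a given rate."""
    return packet_count / pps

# Hypothetical capture with 12,000 packets, stretched over five minutes.
rate = pps_for_duration(12_000, 300)
print(rate, replay_seconds(12_000, rate))
```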

The hunting dashboard should then be populated with data. Some cases might not trigger many (or any) rules. That does not mean we don't have data! Simply peek into Kibana or EveBox instead.

To reset the state before replaying another PCAP, use a cleanup script. It will wipe all events from elasticsearch.

```
sudo ./scripts/cleanup.sh
```

## XS Hunting with SSP

Here are a few case studies to explore with SSP and XS data. General hunting flow is:

* apply filters to find interesting events;
* use filtersets to streamline the process or when unsure what to do;
* investigate individual events or interesting metadata values;
* use host insights for pivoting;

### Executable downloads via Powershell

![Powershell filterset](img/hy-powershell-filters.png)

### HTTP user-agents

![Suspicious user agents](img/hy-one-word-ua.png)

### Discovery

![SMB EXE discovery](img/hy-discovery.png)

## The REST API

A little-known feature of our products is the ability to query the REST API. REST, which stands for REpresentational State Transfer, is a common architectural style for web applications in which the frontend talks to the backend server through HTTP API requests. In our case, most frontend components simply fetch and display data from backend URLs. The important part is that we have already implemented a number of API endpoints that return useful data. It's also fairly simple to add new endpoints.

But before we can discuss newly added endpoints or even how anyone could contribute to adding them, we must first explore how API queries work. In short, anyone with a valid *API token* is able to issue authenticated requests to endpoints. To generate that token, we must first navigate to the `Account Settings` section, which is available at the top right corner of the title menu.

![Account Settings](img/account-settings.png)

Then on the left hand side, choose `Edit Token`.

![Edit Token](img/edit-token.png)

Finally, the token will be visible in the `Token` field. If empty, then simply click `Regenerate` button to create a new one. Then copy the value to a keychain or password safe of your choice.

![Generate Token](img/generate-token.png)

Once we have our token, we can start issuing queries to the Scirius REST API. We can even fetch data from the command line! Simply point your web client to the appliance IP or fully qualified domain name with the API endpoint in the *URI path*. The API token must be defined within the `Authorization` header.

```bash
curl -XGET "https://$SELKS_OR_SSP/rest/rules/es/alerts_count/" \
    -H "Authorization: Token $TOKEN" \
    -H 'Content-Type: application/json'
```

This very simple endpoint returns the number of alerts within a given time period. If the period is left undefined, it defaults to the last 30 days.

```json
{
  "prev_doc_count": 0,
  "doc_count": 810605
}
```

We can pull data directly from any SELKS or SSP instance. Directly from command line. That's pretty cool! But let's look at something more powerful.

### Scirius REST API with Python

Firstly, we need to point our notebooks to the right host. We also need to store the authentication token along with any parameters that might alter the connection. After all, hard coding variables like this into each notebook will severely diminish their usability. And to make matters worse, committing and pushing API tokens is a security breach. To keep things simple, we decided to use `.env` files. In fact, our SELKS on docker setup uses the same method, so it was only natural to use it for notebooks as well. It can be set up as described in [Suricata Analytics main README file](https://github.com/StamusNetworks/suricata-analytics/tree/main#jupyter).

```bash
SCIRIUS_TOKEN=<TOKEN VALUE>
SCIRIUS_HOST=<IP or Hostname>
SCIRIUS_TLS_VERIFY=yes
```

For now we handle a very limited set of options. Those being the token value itself, server IP or hostname, and an option to disable TLS verification if using self-signed certificates. Latter being the default for most lab setups and out of the box SELKS installations.

Python has the `dotenv` package (installed as `python-dotenv`) to import variables from this file into a Python session. Once imported, `dotenv_values` allows us to use the variables in the environment file like any other Python dictionary. Note that the Suricata Analytics project includes a reference docker container which mounts the environment file from the project root directory into the home folder of the container. The following example is written with this in mind.

In [None]:
from dotenv import dotenv_values
import os

In [None]:
CONFIG = dotenv_values("../../.env")

We can use Python `requests` package to interact with Scirius REST API. But before we do, we need to set up some parameters. Like before, the API token is passed with `Authorization` header. Though this time it's more structured. We can also use the environment dictionary to dynamically build the URL and authentication.

In [None]:
import requests
URL = "https://{host}/rest/rules/es/events_tail".format(host=CONFIG["SCIRIUS_HOST"])
HEADERS = {
    "Authorization": "Token {token}".format(token=CONFIG["SCIRIUS_TOKEN"])
}

Each API endpoint usually defines its own parameters. But some are common to most. The important ones are:
* `qfilter` for passing a KQL style query to the endpoint;
* `from_date` Unix epoch (in milliseconds, as in the example below) defining the point in time from which we want to retrieve the events;
* `to_date` Unix epoch (in milliseconds) defining the point in time up to which the data should be retrieved;
* `page_size` how many documents should be fetched;

Note that we can pass any Kibana style query to the endpoint using the `qfilter` parameter, essentially allowing us to fetch any data we want. We can also modify the query period. The default is to fetch data from the last 30 days. This is something to be careful with, since many queries match more documents than Elasticsearch returns in one page. A wide query over the past 30 days with the default page size would return a tiny sample of the overall data, and would thus not be very useful.

Ideally, we would fetch something specific. For example, we might be interested in `alert` events whose signature name contains the word `cobalt`.

In [None]:
from datetime import datetime, timedelta, timezone

In [None]:
from_date = datetime.now(timezone.utc) - timedelta(days=365)
to_date = datetime.now(timezone.utc)

In [None]:
GET_PARAMS = {
    "qfilter": "event_type: alert AND alert.signature: *cobalt*",
    "page_size": 100,
    # timestamp() respects the UTC timezone; strftime('%s') is platform-specific
    "from_date": int(from_date.timestamp() * 1000),
    "to_date": int(to_date.timestamp() * 1000)
}

Most data can simply be fetched with HTTP GET requests. A very powerful API endpoint to get started with is `events_tail` which allows the user to query raw EVE events.

In [None]:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
resp = requests.get(URL,
                    headers=HEADERS,
                    verify=False if CONFIG["SCIRIUS_TLS_VERIFY"] == "no" else True,
                    params=GET_PARAMS)

Once the data is retrieved, we can simply load the values from `results` JSON key and pass them to Pandas `json_normalize` helper to build a flat dataframe of EVE events. Once done, we can interact with the data as described in previous posts.

In [None]:
import pandas as pd
import json

In [None]:
DF = pd.json_normalize(json.loads(resp.text)["results"])
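
To make the flattening step concrete, here is a minimal standard-library sketch of the idea behind `json_normalize`: nested keys become dotted column names. The `flatten` helper and the sample event are illustrative assumptions, not part of pandas or Scirius.

```python
def flatten(event: dict, prefix: str = "") -> dict:
    """Flatten nested dictionaries into a single level with dotted keys."""
    flat = {}
    for key, value in event.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects such as "alert" or "http".
            flat.update(flatten(value, name))
        else:
            # Scalars and lists are kept as-is.
            flat[name] = value
    return flat

# Hand-written sample in the shape of an EVE alert; all values are made up.
sample = {
    "event_type": "alert",
    "src_ip": "192.0.2.10",
    "alert": {"signature": "Example signature", "severity": 1},
    "http": {"hostname": "example.com"},
}
print(sorted(flatten(sample)))
```

This is why columns such as `alert.signature` and `http.hostname` can be selected directly from the data frame.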

We can simply measure how many events were fetched.

In [None]:
len(DF)
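
Because of the page size caveat mentioned earlier, the count is worth a sanity check. A minimal sketch, assuming that a result set as large as the requested page size probably means more documents matched than were returned; `looks_truncated` is a hypothetical helper, not part of any library.

```python
def looks_truncated(fetched: int, page_size: int) -> bool:
    """Heuristic: a full page suggests the query matched more than was returned."""
    return fetched >= page_size

# With the page size of 100 used above:
print(looks_truncated(100, 100))  # full page, likely a partial sample
print(looks_truncated(37, 100))   # fewer than requested, likely complete
```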

Or we could subset the data frame for a quick glance.

In [None]:
(
    DF
    [["timestamp", "src_ip", "dest_ip", "event_type", "flow_id", "alert.signature", "http.hostname"]]
    .head(3)
)

Naturally, a more useful interaction would be some kind of aggregate report. For example, we could see which URLs were accessed and which user agents were used per HTTP host.

In [None]:
(
    DF
    .groupby(["http.hostname"])
    .agg({
        "src_ip": "nunique",
        "dest_ip": "nunique",
        "http.hostname": "unique",
        "http.url": "unique",
        "http.http_user_agent": "unique"
    })
)

This is really powerful but involves some boilerplate. In the next section we'll see how Suricata Analytics improves on this.

### Suricata Analytics data connector

Boilerplate refers to code that repeats in many parts of the code with little variation. But it must be there to set up some other functionality. In our case, user would need to import the API token and Scirius server address in every notebook using `dotenv`. If we ever changed how they are stored, then every notebook would break. Secondly, we would need to import requests and set up HTTP query parameters all the time.

Notebooks can become really complex. Especially when weighed down with code that's actually not relevant for exploring data. Having discarded many notebooks for that reason, we decided to write a Python *data connector* to move this complexity from notebooks to importable library. This connector is also part of the Suricata Analytics project and can simply be installed with `pip install .` while in the project root directory. This idea was very much inspired by [MSTIC Jupyter and Python Security Tools](https://msticpy.readthedocs.io/en/latest/), developed by [Microsoft Threat Intelligence team (MSTIC)](https://www.microsoft.com/en-us/security/blog/topic/threat-intelligence/?sort-by=newest-oldest&date=any). Like our project, it provides data connectors to quickly import and analyze security data into Jupyter Notebooks.

Once installed, the connector can be imported into any notebook.

In [None]:
from surianalytics.connectors import RESTSciriusConnector

Then we create new connector object. Environment file is automatically detected on object initialization, though the user can override the parameters with object arguments as well.

In [None]:
CONNECTOR = RESTSciriusConnector()

The object maintains persistent state so the user only needs to set certain parameters once. Page size parameter is one that could be easily overlooked. User might execute one query with modified page size yet forget to pass that argument in the next. That could skew the results since the second data fetch might be partial, due to more documents matching the query than would be returned by Elasticsearch.

The object allows user to simply set the parameter once. All subsequent queries would then use the value until it's once again updated.

In [None]:
CONNECTOR.set_page_size(100)

Same is true for defining the query time period. Relative time queries are very common when working with NSM data. Most users simply need to know what happened X amount of time ago in the past, and might not really care for setting exact timestamps.

We provided a helper method that handles this calculation automatically. Likewise, the time frame will apply to all subsequent queries once set.

In [None]:
CONNECTOR.set_query_delta(hours=365 * 24)

Naturally, the user could also explicitly set from and to timestamps as RFC3339 formatted strings, Unix epochs, or parsed Python timestamp objects. Our library handles basic validation such as ensuring that timestamps are not in reverse.
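
The kind of normalization and validation described here could look roughly like the sketch below. It is an illustration using only the standard library, not the connector's actual implementation; `to_utc` and `validate_range` are hypothetical names, and treating naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timezone

def to_utc(value) -> datetime:
    """Convert an RFC 3339 string, a Unix epoch in seconds, or a datetime to aware UTC."""
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    elif isinstance(value, str):
        # Older Python versions do not accept a trailing "Z" in fromisoformat.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    else:
        raise TypeError(f"unsupported timestamp type: {type(value).__name__}")
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)

def validate_range(from_date, to_date):
    """Return (start, end) in UTC, rejecting reversed ranges."""
    start, end = to_utc(from_date), to_utc(to_date)
    if start >= end:
        raise ValueError("from_date must be earlier than to_date")
    return start, end

# Mixed input types: an RFC 3339 string and an epoch in seconds.
start, end = validate_range("2023-10-01T00:00:00Z", 1696204800)
print(start.isoformat(), end.isoformat())
```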

These are just some of the ways we can prepare the following method call. That call is functionally identical to the `requests` example shown in the prior section, albeit with fewer lines of code. We also do not need to worry about parsing the results. Our library automatically converts the resulting JSON into a normalized pandas data frame, further reducing redundant code.

In [None]:
DF = CONNECTOR.get_events_df(qfilter="event_type: alert AND alert.signature: *cobalt*")

In [None]:
(
    DF
    [["timestamp", "src_ip", "dest_ip", "event_type", "flow_id", "alert.signature", "http.hostname"]]
    .head(3)
)

## EVE JSON - Hunting without signatures

A common misconception is that Suricata is an IDS that is only useful for rule-based detection. That is not the case. Suricata has a large number of protocol parsers, each of which is able to emit JSON logs. Suricata was built from the ground up to be protocol-aware. In ancient times, IDS rules had to be written against the entire packet payload. One of the innovations in Suricata was to parse commonly used fields and let the rule writer specify a buffer instead. For example, if a rule writer wanted to match on the HTTP user agent, she would not need to deal with the entire HTTP header. The content match could be specified to apply only to the user-agent buffer. It is not a big leap to also log a buffer that has already been parsed.

In [None]:
FIELDS = [f for f in CONNECTOR.get_unique_fields() if not any(s in f for s in ["stats", "statistics", "metrix", "host_id", "@", "geoip", "target.src", "target.dest"])]

In [None]:
len(FIELDS)

In [None]:
FIELDS

But does one need a signature match to log those fields? Nope. The protocol logger simply needs to be enabled. Once enough data is parsed, Suricata will emit a protocol event in JSON format. That JSON is called EVE (Extensible Event Format). The type of each event is recorded in the `event_type` field. Note that *alert* is only one of many.

In [None]:
[e for e in CONNECTOR.get_eve_unique_values(counts="no", field="event_type") if not any(s in e for s in ["stamus", "aggregate", "metrix", "stats"]) ]
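
The same idea works directly against a raw EVE log, which Suricata writes as one JSON object per line. A minimal sketch, with a few hand-written lines standing in for a real `eve.json`; the sample events are made up.

```python
import json
from collections import Counter

def count_event_types(lines) -> Counter:
    """Count events per event_type in an iterable of EVE JSON lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        event = json.loads(line)
        counts[event.get("event_type", "unknown")] += 1
    return counts

# Stand-in for an open eve.json file; values are illustrative only.
SAMPLE_LINES = [
    '{"event_type": "http", "flow_id": 1}',
    '{"event_type": "fileinfo", "flow_id": 1}',
    '{"event_type": "alert", "flow_id": 1}',
    '{"event_type": "dns", "flow_id": 2}',
    '{"event_type": "http", "flow_id": 3}',
]
print(count_event_types(SAMPLE_LINES).most_common())
```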

To illustrate this point, let's run a query for the `http` event type and search for a bad malware delivery sample. For now, we only want to investigate the EVE event structure and common Red Team mistakes. So we only need a few samples. For that we'll set the page size to 3.

In [None]:
CONNECTOR.set_page_size(3)

In [None]:
from IPython.display import JSON

In [None]:
JSON(CONNECTOR.get_events_tail(qfilter="event_type: http AND http.http_user_agent: *wget* AND http.url: *.sh"))

Note that if we investigate the `metadata.flowbits` field, we might observe that a flow has already been marked by signatures. That's an important concept: a signature can emit an alert, but it can also just flag traffic. Nevertheless, we could turn off the detection engine entirely and still be able to threat hunt by running queries on protocol logs.

But there's more. Literally. Suricata can and will emit multiple events for a single flow. For example, what does a web server respond with to an HTTP request? A file, of course. Therefore, Suricata would emit an `http` event once the request is parsed. The response would generate a `fileinfo` event with all metadata about what was returned. In this case, it's actually a shell script. Furthermore, the `http` section that was visible in the prior event type should also be visible here.

In [None]:
JSON(CONNECTOR.get_events_tail(qfilter="event_type: fileinfo AND http.http_user_agent: *wget* AND http.url: *.sh"))

### Event Types and Flow Correlation 

Now comes a really important concept - **flow correlation**. Each distinct event emitted by Suricata for the same flow shares the same `flow_id` value. That can easily be used to link the events and to build an event history.

In [None]:
DATA = (
    CONNECTOR
    .set_page_size(10)
    .get_events_tail(qfilter="flow_id: 805522171050078")
)

In [None]:
JSON(DATA)

In [None]:
DF = (
    pd
    .json_normalize(DATA)
    .sort_values(by=["timestamp"], ascending=True)
)

In [None]:
DF

In [None]:
pd.set_option('display.max_columns', None)
pd.options.display.html.use_mathjax = False

In [None]:
if len(DF) > 0:
    # an expression inside an if block is not rendered automatically, so display it explicitly
    display(DF[["src_ip", "src_port", "dest_ip", "dest_port", "event_type"] + sorted([c for c in list(DF.columns.values) if c.startswith("alert") or c.startswith("http") or c.startswith("fileinfo")])])

In [None]:
pd.set_option('display.max_columns', 20)

In [None]:
if len(DF) > 0:
    print(list(DF["http.http_response_body_printable"].dropna())[0])
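
Stripped of the connector, flow correlation is a group-by on `flow_id` followed by a sort on `timestamp`. A minimal sketch over hand-written events; all values are made up, and `correlate_flows` is a hypothetical helper.

```python
from collections import defaultdict

def correlate_flows(events):
    """Group events by flow_id and order each group chronologically."""
    flows = defaultdict(list)
    for event in events:
        flows[event["flow_id"]].append(event)
    for history in flows.values():
        # Timestamps in the same format and offset sort correctly as strings.
        history.sort(key=lambda e: e["timestamp"])
    return dict(flows)

EVENTS = [
    {"flow_id": 42, "timestamp": "2023-10-06T10:00:02.000000+0000", "event_type": "fileinfo"},
    {"flow_id": 7, "timestamp": "2023-10-06T10:00:00.500000+0000", "event_type": "dns"},
    {"flow_id": 42, "timestamp": "2023-10-06T10:00:01.000000+0000", "event_type": "http"},
    {"flow_id": 42, "timestamp": "2023-10-06T10:00:03.000000+0000", "event_type": "flow"},
]

for flow_id, history in correlate_flows(EVENTS).items():
    print(flow_id, [e["event_type"] for e in history])
```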

### Hunting unique values

A powerful hunting technique is simply to keep an eye out for unique values. For example, we might luck out with something interesting when investigating TLS SNI values that originate from a tagged asset with internet destination. With this we might pick up initial delivery addresses or C2 traffic.

In [None]:
(
    CONNECTOR
    .get_eve_unique_values(qfilter="event_type: tls AND tls.sni: * AND target.src.actor_id: bt AND NOT target.dest.actor_id: bt", 
                           field="tls.sni", 
                           counts="no")
)

This can be tricky, as there will be a lot of noise. But we did luck out with something interesting - someone was reading a hacking guide *on the target host*. A major opsec no-no.

In [None]:
JSON(CONNECTOR.get_events_tail(qfilter="tls.sni: level23hacktools.com"))

Of course, red teamers often simply neglect to set up proper delivery infrastructure and pull malware straight from an IP address. This pops out and essentially burns their infrastructure. We can check for it by running the same kind of query, but for outbound HTTP requests.

In [None]:
(
    CONNECTOR
    .get_eve_unique_values(qfilter="event_type: http AND target.src.actor_id: bt AND NOT target.dest.actor_id: bt", 
                           field="http.hostname", 
                           counts="no")
)

In [None]:
JSON(CONNECTOR.get_events_tail(qfilter="src_ip: 10.107.99.112 AND event_type: fileinfo"))
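
When the list of unique values is long, sorting by how rarely each value occurs can help cut through the noise. A minimal sketch, assuming we already have a flat list of values; the hostnames below are placeholders and `rarest_first` is a hypothetical helper.

```python
from collections import Counter

def rarest_first(values, limit=10):
    """Return (value, count) pairs ordered from least to most frequent."""
    counts = Counter(v for v in values if v)  # ignore empty values
    # Sort by count, then alphabetically for a stable, readable order.
    return sorted(counts.items(), key=lambda kv: (kv[1], kv[0]))[:limit]

SNI_VALUES = [
    "updates.example.com", "updates.example.com", "updates.example.com",
    "cdn.example.net", "cdn.example.net",
    "odd-one-out.example.org",
]
print(rarest_first(SNI_VALUES, limit=2))
```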

### SMB

Another case we should explore is SMB data. As mentioned before, Suricata emits multiple EVE events for the same flow. SMB takes this to the extreme - each SMB function call becomes a distinct EVE message. For example, let's list unique `flow_id` values for SMB events inside blue team networks. But this time, let's also check document counts to see how many events actually match a single flow.

In [None]:
pd.DataFrame(
    CONNECTOR.get_eve_unique_values(qfilter="event_type: smb AND target.src.actor_id: bt", 
                                    field="flow_id", 
                                    counts="yes", 
                                    size=20)
)

Note that a single SMB flow can produce thousands, if not tens of thousands, of SMB messages. Why? Because an SMB session is actually a sequence of commands and responses. On one hand, Suricata produces a lot of useful data to really understand what happens in an SMB session. On the other hand, it produces so much data that it's hard to find a good place to start exploring. In fact, we've come to the conclusion that most people are not looking at this data at all...

Those commands and responses are a good place to start off.

In [None]:
CONNECTOR.get_eve_unique_values(qfilter="event_type: smb AND target.src.actor_id: bt", 
                                field="smb.command", 
                                counts="yes", 
                                size=50)

In [None]:
CONNECTOR.get_eve_unique_values(qfilter="event_type: smb AND target.src.actor_id: bt", 
                                field="smb.status", 
                                counts="yes", 
                                size=50)

We find that listing unique SMB status values (responses from the server) can reveal a lot of really interesting events. For example, we have statuses like `STATUS_SHARING_VIOLATION`, `STATUS_BUFFER_OVERFLOW`, `STATUS_ACCESS_DENIED`, etc. Those are all worth a look. Furthermore, we have total gibberish statuses like `UNKNOWN_00_0000`, which means the status code is so rare that Suricata does not know its name. That definitely warrants investigation.

For now, let's drill down on authentication failures to see if we can catch some RT brute force attempts.

In [None]:
CONNECTOR.get_eve_unique_values(qfilter="event_type: smb AND target.src.actor_id: bt AND smb.status: STATUS_ACCESS_DENIED", 
                                field="flow_id", 
                                counts="yes", 
                                size=20)

We should instantly find plenty to investigate. Now let's investigate one of those flows.

In [None]:
DF_SMB = (
    CONNECTOR
    .set_page_size(10000)
    .get_events_df(qfilter="flow_id: 5854994698319")
)

A quick peek shows that the data is quite noisy. There are a lot of fields to investigate, too many in fact.

In [None]:
DF_SMB.head(3)

So, let's see what SMB fields are actually emitted.

In [None]:
[c for c in list(DF_SMB.columns.values) if c.startswith("smb.")]

Now let's do a selection of some interesting columns. It should paint a fairly nice picture about what is happening. In the selected case, we should be able to clearly see a bruteforce pattern with quite a lot to unpack.

In [None]:
pd.set_option('display.max_rows', None)
(
    DF_SMB[["timestamp", 
            "src_ip", 
            "dest_ip",
            "target.src.id",
            "target.dest.id",
            "smb.command", 
            "smb.status", 
            "smb.ext_status.severity",
            "smb.named_pipe",
            "smb.ntlmssp.user",
            "smb.ntlmssp.host",
            "smb.ntlmssp.domain",
            "smb.filename",
            "smb.client_dialects",
            "smb.request.native_os",
            "smb.response.native_os"]]
    .sort_values(by=["timestamp"], ascending=True)
)

Exploring this data per event is difficult. We can drill down into a single flow with under 100 messages pretty easily, but that's only one flow of many. And a flow with thousands of events is going to be really tricky to investigate.

One trick I like to use is graph representation. The idea is to connect unique SMB commands with statuses. Both are low-cardinality fields and they are present in every single SMB message. So we can easily build a bird's-eye view of the data.

In [None]:
JSON(CONNECTOR.get_eve_fields_graph(qfilter="flow_id: 5854994698319", 
                                    col_src="smb.command", 
                                    col_dest="smb.status"))

This is a raw node and edge listing of a graph. The edges for this flow already reveal a lot of what is going on. Note how the failures top everything else. A data table in Kibana or Splunk can easily draw a nice overview of commands in relation to responses. But the data returned by this API endpoint is meant to build an actual graph.
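
The node and edge listing is essentially a count of value pairs. A minimal sketch of that reduction follows; it illustrates the idea rather than the endpoint's implementation, `field_edges` is a hypothetical helper, and the sample events are made up.

```python
from collections import Counter

def field_edges(events, col_src, col_dest) -> Counter:
    """Count how often each (source value, destination value) pair occurs."""
    edges = Counter()
    for event in events:
        src, dest = event.get(col_src), event.get(col_dest)
        if src is None or dest is None:
            continue  # an edge needs both fields
        edges[(src, dest)] += 1
    return edges

# 1,000 failed session setups and a handful of other messages, already flattened.
EVENTS = (
    [{"smb.command": "SMB2_COMMAND_SESSION_SETUP", "smb.status": "STATUS_ACCESS_DENIED"}] * 1000
    + [{"smb.command": "SMB2_COMMAND_NEGOTIATE_PROTOCOL", "smb.status": "STATUS_SUCCESS"}] * 3
    + [{"smb.command": "SMB2_COMMAND_SESSION_SETUP", "smb.status": "STATUS_SUCCESS"}]
)

edges = field_edges(EVENTS, "smb.command", "smb.status")
for (src, dest), weight in edges.most_common():
    print(f"{src} -> {dest}: {weight}")
```

Over a thousand events collapse into three weighted edges, and the edge count only grows with the number of distinct command and status combinations.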

This approach greatly condenses the amount of info we need to process. The graph would not be much bigger if a flow had 10000 distinct events. We can now expand this idea further - let's build a graph of statuses in relation to commands for all SMB traffic originating from a blue team asset. While the resulting graph is a bit more convoluted, we should still see strange connections pretty clearly. Furthermore, if this was real traffic, then it's a great profiling technique. Most workstations should use SMB resources pretty much the same way. Any new connection is an anomaly which should be investigated.

In [None]:
g = (
    CONNECTOR
    .get_eve_fields_graph_nx(qfilter="event_type: smb AND target.src.actor_id: bt",
                             col_src="smb.command", 
                             col_dest="smb.status")
)

In [None]:
import networkx as nx
import holoviews as hv
import hvplot.networkx as hvnx

In [None]:
hv.extension('bokeh')

In [None]:
from bokeh.plotting import show

In [None]:
# generate layout
pos = nx.layout.spring_layout(g)

width = 1600
height = 1600

# locate source nodes
n_src = [i for i, (_, a) in enumerate(g.nodes(data=True)) if a["kind"] == "source"]
# locate destination nodes
n_dst = [i for i, (_, a) in enumerate(g.nodes(data=True)) if a["kind"] == "destination"]

# generate nodes per kind
nodes_src = hvnx.draw_networkx_nodes(g, pos, nodelist=n_src, node_color='#A0CBE2').opts(width=width, height=height)
nodes_dst = hvnx.draw_networkx_nodes(g, pos, nodelist=n_dst, node_color="Orange").opts(width=width, height=height)

# generate edges
edges = (
    hvnx
    .draw_networkx_edges(g, pos)
    .opts(width=width, height=height)
)

# overlay nodes and edges
res = edges * nodes_src * nodes_dst

labels = hvnx.draw_networkx_labels(g, pos, nodelist=n_src)
res = res * labels

labels = hvnx.draw_networkx_labels(g, pos, nodelist=n_dst)
res = res * labels

show(hv.render(res))
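
The profiling idea can be sketched as a set difference between edges seen during a known-good baseline and edges seen now. Both edge sets below are hand-written placeholders; in practice they would come from graph queries over two time windows. `new_edges` is a hypothetical helper.

```python
def new_edges(baseline, observed):
    """Return edges present in observed traffic but absent from the baseline."""
    return sorted(set(observed) - set(baseline))

BASELINE = {
    ("SMB2_COMMAND_CREATE", "STATUS_SUCCESS"),
    ("SMB2_COMMAND_READ", "STATUS_SUCCESS"),
    ("SMB2_COMMAND_SESSION_SETUP", "STATUS_SUCCESS"),
}
OBSERVED = BASELINE | {
    ("SMB2_COMMAND_SESSION_SETUP", "STATUS_ACCESS_DENIED"),
    ("SMB2_COMMAND_IOCTL", "STATUS_BUFFER_OVERFLOW"),
}

for src, dest in new_edges(BASELINE, OBSERVED):
    print(f"new edge: {src} -> {dest}")
```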

We could also investigate SMB status codes in relation to domain user. This view is limited. NTLMSSP user is not present in all messages. Only in the authentication requests. So the edges are limited. But we can nevertheless use this technique to separate users that should be investigated.

In [None]:
g = (
    CONNECTOR
    .get_eve_fields_graph_nx(qfilter="event_type: smb AND target.src.actor_id: bt",
                             col_src="smb.status", 
                             col_dest="smb.ntlmssp.user")
)

In [None]:
# generate layout
pos = nx.layout.spring_layout(g)

width = 1600
height = 1600

# locate source nodes
n_src = [i for i, (_, a) in enumerate(g.nodes(data=True)) if a["kind"] == "source"]
# locate destination nodes
n_dst = [i for i, (_, a) in enumerate(g.nodes(data=True)) if a["kind"] == "destination"]

# generate nodes per kind
nodes_src = hvnx.draw_networkx_nodes(g, pos, nodelist=n_src, node_color='#A0CBE2').opts(width=width, height=height)
nodes_dst = hvnx.draw_networkx_nodes(g, pos, nodelist=n_dst, node_color="Orange").opts(width=width, height=height)

# generate edges
edges = (
    hvnx
    .draw_networkx_edges(g, pos)
    .opts(width=width, height=height)
)

# overlay nodes and edges
res = edges * nodes_src * nodes_dst

labels = hvnx.draw_networkx_labels(g, pos, nodelist=n_src)
res = res * labels

labels = hvnx.draw_networkx_labels(g, pos, nodelist=n_dst)
res = res * labels

show(hv.render(res))

Again, this view does not need to be an actual graph. A data table would likely work well enough.
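
For instance, user and status pairs could be pivoted into a plain table. A minimal sketch over made-up pairs; the usernames are placeholders and `crosstab` is a hypothetical helper.

```python
from collections import Counter, defaultdict

def crosstab(pairs):
    """Pivot (row, column) pairs into {row: Counter({column: count})}."""
    table = defaultdict(Counter)
    for row, col in pairs:
        table[row][col] += 1
    return table

PAIRS = (
    [("alice", "STATUS_SUCCESS")] * 4
    + [("svc-backup", "STATUS_ACCESS_DENIED")] * 25
    + [("svc-backup", "STATUS_SUCCESS")]
)

table = crosstab(PAIRS)
statuses = sorted({status for _, status in PAIRS})
# Header row, then one row per user with a count per status.
print("user".ljust(12), *statuses)
for user in sorted(table):
    print(user.ljust(12), *(str(table[user][s]).rjust(len(s)) for s in statuses))
```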

# Fin