# Kusto Ingest

The aim of this notebook is to provide an example of sourcing, processing and ingesting security data into a custom
[Kusto aka Azure Data Explorer (ADE) cluster](https://docs.microsoft.com/en-us/azure/data-explorer/data-explorer-overview).

Kusto/ADE is a fast, highly scalable data exploration service for log and telemetry data, hosted in Azure, and is used across Microsoft
for analysing very large datasets of this kind.

### Dependencies

Only this data ingestion notebook needs the following additional packages:

```
    > pip install azure-kusto-data azure-kusto-ingest
```
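If you're not sure whether those packages are already available in the notebook's kernel, the standard library's `importlib.util.find_spec` can check before the imports below fail. This is a minimal sketch; `missing_modules` is a hypothetical helper, not part of either Kusto package.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that can't be found in this environment."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # For dotted names, find_spec imports the parent package first,
            # which raises if the parent (e.g. 'azure') isn't installed.
            found = False
        if not found:
            missing.append(name)
    return missing


print(missing_modules(["azure.kusto.data", "azure.kusto.ingest"]))
```

An empty list means both packages can be imported.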

### Example Data

We use the [Open Threat Research Forge Mordor Security Datasets](https://github.com/OTRF/Security-Datasets/), and the corresponding
[MSTICPY Mordor Data Provider](https://msticpy.readthedocs.io/en/latest/data_acquisition/MordorData.html) to retrieve them.

The plan here is to consider `Microsoft-Windows-Sysmon/Operational` data and create a set of database tables, each holding the 
events (merged from across different datasets) belonging to a specific event type. For more information on this source data,
see [Sysinternals Sysmon](https://docs.microsoft.com/en-us/sysinternals/downloads/sysmon).

After processing and uploading, we'll end up with a structure that looks like the following:

```
    cluster('msticpykustodemo')
        .database('MsticPyKustoDemo')
            .table('Event1')
                ... set of Sysmon Process Creation events (Event Type 1)
            .table('Event2')
                ... set of Sysmon File Creation Time events (Event Type 2)
            etc.
```
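The table names are simply `Event` followed by the Sysmon event ID. As a reference, the sketch below pairs that naming scheme with the event names listed in the Sysinternals Sysmon documentation for IDs 1 to 26. `SYSMON_EVENT_NAMES`, `table_name` and `describe_table` are illustrative names of our own, not part of MSTICPY or the Kusto SDKs.

```python
# Sysmon event IDs and the names given in the Sysinternals Sysmon documentation.
# Informational only: the ingestion code uses just the numeric EventID.
SYSMON_EVENT_NAMES = {
    1: "Process creation",
    2: "A process changed a file creation time",
    3: "Network connection",
    4: "Sysmon service state changed",
    5: "Process terminated",
    6: "Driver loaded",
    7: "Image loaded",
    8: "CreateRemoteThread",
    9: "RawAccessRead",
    10: "ProcessAccess",
    11: "FileCreate",
    12: "RegistryEvent (Object create and delete)",
    13: "RegistryEvent (Value Set)",
    14: "RegistryEvent (Key and Value Rename)",
    15: "FileCreateStreamHash",
    16: "ServiceConfigurationChange",
    17: "PipeEvent (Pipe Created)",
    18: "PipeEvent (Pipe Connected)",
    19: "WmiEvent (WmiEventFilter activity detected)",
    20: "WmiEvent (WmiEventConsumer activity detected)",
    21: "WmiEvent (WmiEventConsumerToFilter activity detected)",
    22: "DNSEvent (DNS query)",
    23: "FileDelete (File Delete archived)",
    24: "ClipboardChange (New content in the clipboard)",
    25: "ProcessTampering (Process image change)",
    26: "FileDeleteDetected (File Delete logged)",
}


def table_name(event_id: int) -> str:
    """Table naming scheme used in this notebook: Event<EventID>."""
    return f"Event{event_id}"


def describe_table(event_id: int) -> str:
    """Pair a table name with the Sysmon event name, if known."""
    return f"{table_name(event_id)}: {SYSMON_EVENT_NAMES.get(event_id, 'unknown event type')}"


print(describe_table(2))  # Event2: A process changed a file creation time
```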

___See: [./Kusto-Analysis.ipynb](./Kusto-Analysis.ipynb) for details on data analysis, following on from the prep and ingestion here.___

In [1]:
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient
import pandas as pd
import re

from msticpy.data import QueryProvider

## Kusto Config

You'll need to provide Query and Ingest URIs for your own writable Kusto/ADE cluster. See 
[Quickstart: Create an Azure Data Explorer cluster and database](https://docs.microsoft.com/en-us/azure/data-explorer/create-cluster-database-portal?tabs=one-click-create-database)
for guidance on how to set one of these up within your Azure subscription.
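In the configuration used here, the ingest URI is the query URI with an `ingest-` prefix on the host name. Assuming your cluster follows the same naming convention, you can derive one from the other, as in this small sketch. `ingest_uri_from_query_uri` is a hypothetical helper; if in doubt, use the URIs shown for your cluster in the Azure portal.

```python
from urllib.parse import urlsplit, urlunsplit


def ingest_uri_from_query_uri(query_uri: str) -> str:
    """Derive a queued-ingestion URI by prefixing the host name with 'ingest-'.

    Assumes the naming convention seen in this notebook's config, e.g.
    https://<cluster>.<region>.kusto.windows.net ->
    https://ingest-<cluster>.<region>.kusto.windows.net
    """
    parts = urlsplit(query_uri)
    if parts.netloc.startswith("ingest-"):
        return query_uri  # already an ingestion URI
    return urlunsplit(parts._replace(netloc="ingest-" + parts.netloc))


print(ingest_uri_from_query_uri("https://msticpykustodemo.ukwest.kusto.windows.net"))
# https://ingest-msticpykustodemo.ukwest.kusto.windows.net
```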

When producing this example, I provisioned a dev/test cluster with the following configuration:

```
    Region = UK West
    Availability zones = (none)
    Cluster name = msticpykustodemo

    Workload = Dev/test
    Size = (none)
    Compute = Dev(No SLA)_Standard_D11_v2

    Scaling method = Manual scale
    Instance count = 1

    Streaming ingestion = Off
    Enable purge = Off
    Auto-Stop cluster = On

    System-assigned identity = On
    Double encryption = Off
    User-assigned Identity = Off
    User-assigned identities = (none)
    Tenants permissions = My tenant only
```


___Caution: watch out for costs! Azure Kusto/ADE clusters aren't cheap to run!___

In [2]:
# Replace these with your own values.

KUSTO_CLUSTER_URI = 'https://msticpykustodemo.ukwest.kusto.windows.net'
KUSTO_CLUSTER_INGEST_URI = 'https://ingest-msticpykustodemo.ukwest.kusto.windows.net'
KUSTO_DATABASE = 'MsticPyKustoDemo'

## Data Loading & Preparation

Here we use the MSTICPY Mordor Query Provider to retrieve a number of example source datasets.

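We then split the events into a series of pandas DataFrames, one per event type, each combining the events of that type
from all of the source datasets. Along the way, we keep only Sysmon events and normalise timestamps and field names.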
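The cell below builds the per-event-type frames by looping over rows and appending dictionaries. If the records were already in a single merged DataFrame, pandas `groupby` could do the same split in one step. This is a sketch of that alternative, assuming an `EventID` column; `split_by_event_id` is a hypothetical helper.

```python
import pandas as pd


def split_by_event_id(merged: pd.DataFrame) -> dict:
    """Split one merged DataFrame into {EventID: DataFrame}, one frame per event type."""
    return {
        event_id: group.reset_index(drop=True)
        for event_id, group in merged.groupby("EventID", sort=True)
    }


merged = pd.DataFrame({
    "EventID": [3, 1, 3],
    "Image": ["a.exe", "b.exe", "c.exe"],
})
frames = split_by_event_id(merged)
print({int(event_id): len(frame) for event_id, frame in frames.items()})
```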

In [3]:
mdr_data = QueryProvider("Mordor", use_cached=True, save_folder='./mordor', silent=True)
mdr_data.connect()

events = {}

for dataset in mdr_data.list_queries():
    if dataset.startswith('atomic.windows'):
        print('[+] Retrieving Dataset', f'"{dataset}"')
        try:
            df = getattr(mdr_data, dataset)(silent=True, use_cached=True)
            if not isinstance(df, pd.DataFrame):
                continue
            for i, r in df.iterrows():
                row_entry = r.to_dict()
                
                # For demo purposes, we'll consider only Sysmon event logs.
                # Use .get() so datasets without a Channel column are skipped rather than failing.
                if row_entry.get('Channel') != 'Microsoft-Windows-Sysmon/Operational':
                    continue

                # Every entry should have a @timestamp:datetime field.
                if '@timestamp' not in row_entry:
                    if 'TimeCreated' not in row_entry:
                        continue
                    row_entry['@timestamp'] = row_entry['TimeCreated']
                    del row_entry['TimeCreated']

                for k, v in row_entry.items():
                    # Try and parse datetimes, in various formats, to datetimes
                    if type(v) != str:
                        continue
                    if re.match(r'^\d{4}-\d{2}-\d{2}[ T]{1}\d{2}:\d{2}:\d{2}(\.\d+){0,1}([+-][\.\d]{5}|Z){0,1}$', v):
                        row_entry[k] = pd.to_datetime(v, utc=True)

                # Normalise fieldnames - remove @, capitalise.
                for k in list(row_entry.keys()):
                    k_fixed = k
                    if k_fixed[0] == '@': k_fixed = k_fixed[1:]
                    if k_fixed[0].islower(): k_fixed = k_fixed[0].upper() + k_fixed[1:]
                    if k != k_fixed:
                        row_entry[k_fixed] = row_entry[k]
                        del row_entry[k]

                # Group records by Sysmon event type.
                events.setdefault(row_entry['EventID'], []).append(row_entry)

        except Exception as ex:
            print('[!] Retrieving', f'"{dataset}"', 'Failed [', str(ex), ']')

Retrieving Mitre data...
Retrieving Mordor data...


Downloading Mordor metadata: 100%|██████████| 96/96 [00:00<00:00, 2154.64 files/s]


[+] Retrieving Dataset "atomic.windows.collection.host.msf_record_mic"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_lsass_memory_dumpert_syscalls"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_psexec_lsa_secrets_dump"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_sam_copy_esentutl"
[+] Retrieving Dataset "atomic.windows.credential_access.host.covenant_dcsync_dcerpc_drsuapi_DsGetNCChanges"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_dcsync_dcerpc_drsuapi_DsGetNCChanges"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_backupkeys_dcerpc_smb_lsarpc"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_extract_keys"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_logonpasswords"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_lsadump_patch"
[+] Retrieving Dataset "atomic.windows.credential_acc

[!] Retrieving "atomic.windows.lateral_movement.network.empire_dcom_shellwindows_stager" Failed [ ('Mordor download error', 'Could not extract zip file for https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/atomic/windows/lateral_movement/network/empire_dcom_shellwindows_stager.zip.', 'File does not exist or is corrupt.', 'https://msticpy.readthedocs.io/en/latest/data_acquisition/MordorData.html') ]
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_msbuild_dcerpc_wmi_smb"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_psexec_dcerpc_tcp_svcctl"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_psremoting_stager"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_shell_dcerpc_smb_s
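The renaming rule in the cell above (drop a leading `@`, upper-case the first character) is what turns `@timestamp` into the `Timestamp` column used for sorting in the next cell. Pulled out into standalone functions, it looks roughly like this; `normalise_field_name` and `normalise_record` are hypothetical names for illustration.

```python
def normalise_field_name(name: str) -> str:
    """Drop a leading '@' and upper-case the first character: '@timestamp' -> 'Timestamp'."""
    if name.startswith("@"):
        name = name[1:]
    return name[:1].upper() + name[1:]


def normalise_record(record: dict) -> dict:
    """Return a copy of `record` with every key normalised.

    If two keys normalise to the same name, the later key wins.
    """
    return {normalise_field_name(key): value for key, value in record.items()}


print(normalise_record({"@timestamp": "2020-01-01", "EventID": 1, "channel": "Sysmon"}))
# {'Timestamp': '2020-01-01', 'EventID': 1, 'Channel': 'Sysmon'}
```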

In [4]:
for event_id in sorted(events.keys()):
    df = pd.DataFrame(events[event_id])
    df.sort_values('Timestamp', inplace=True)
    df.reset_index(drop=True, inplace=True)

    # Because the source dataframes (and hence rows for this dataframe) contained many event types,
    # they have many irrelevant columns for this single event type.
    # Here we attempt to filter out any columns that are all empty, dealing with different kinds of empty.

    null_columns = [
        col for col in df.columns
        if not (df[col].notna() & (df[col] != '-')).any()
    ]

    df.drop(null_columns, axis=1, inplace=True)
    events[event_id] = df

    print('[+] EventID: ', event_id, ', #Records:', df.shape[0], ', #Columns: ', df.shape[1], sep='')

[+] EventID: 1, #Records:1240, #Columns: 56
[+] EventID: 2, #Records:532, #Columns: 39
[+] EventID: 3, #Records:2592, #Columns: 51
[+] EventID: 4, #Records:3, #Columns: 33
[+] EventID: 5, #Records:1078, #Columns: 38
[+] EventID: 6, #Records:1, #Columns: 35
[+] EventID: 7, #Records:57139, #Columns: 49
[+] EventID: 8, #Records:350, #Columns: 44
[+] EventID: 9, #Records:1838, #Columns: 37
[+] EventID: 10, #Records:103207, #Columns: 46
[+] EventID: 11, #Records:5256, #Columns: 40
[+] EventID: 12, #Records:114904, #Columns: 40
[+] EventID: 13, #Records:50340, #Columns: 42
[+] EventID: 14, #Records:1, #Columns: 34
[+] EventID: 15, #Records:76, #Columns: 38
[+] EventID: 17, #Records:256, #Columns: 38
[+] EventID: 18, #Records:858, #Columns: 40
[+] EventID: 19, #Records:3, #Columns: 37
[+] EventID: 20, #Records:3, #Columns: 37
[+] EventID: 21, #Records:3, #Columns: 36
[+] EventID: 22, #Records:544, #Columns: 39
[+] EventID: 23, #Records:1584, #Columns: 41
[+] EventID: 24, #Records:17, #Columns
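The column-pruning loop in the cell above can also be expressed as a single vectorised mask over the whole frame. This sketch treats nulls and the placeholder `'-'` as empty, mirroring the loop; `drop_empty_columns` and its `empty_markers` parameter are hypothetical.

```python
import pandas as pd


def drop_empty_columns(df: pd.DataFrame, empty_markers=("-",)) -> pd.DataFrame:
    """Drop columns in which every value is null or one of `empty_markers`."""
    is_empty = df.isna() | df.isin(list(empty_markers))
    return df.loc[:, ~is_empty.all()]


sample = pd.DataFrame({
    "Image": ["a.exe", "b.exe"],
    "ParentUser": [None, "-"],
    "RuleName": ["-", "some-rule"],
})
print(list(drop_empty_columns(sample).columns))  # ['Image', 'RuleName']
```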

## Kusto Ingestion

First, we need to authenticate to Kusto/ADE. Here we use Azure CLI ("az") authentication, which requires
that you first run `az login` from a shell. See
[Kusto connection strings](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto)
for more details and alternative options.

Next, we retrieve the list of existing database tables and drop any whose names clash with the tables we're about to create.
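The table-dropping step comes down to intersecting the existing table names with the ones we intend to create. Here's a sketch that builds the management command as a plain string, so it can be printed and checked before running it; `build_drop_tables_command` is a hypothetical helper.

```python
def build_drop_tables_command(existing_tables, new_tables):
    """Return a '.drop tables (...)' command for names in both lists, or None if none clash."""
    existing = set(existing_tables)
    to_drop = [name for name in new_tables if name in existing]
    if not to_drop:
        return None
    return f".drop tables ({', '.join(to_drop)})"


print(build_drop_tables_command(["Event1", "Event7", "OtherTable"], ["Event1", "Event2", "Event7"]))
# .drop tables (Event1, Event7)
```

With this notebook's variables, you'd pass `db_tables['TableName']` and `ingest_tables`, and only call `kusto_client.execute_mgmt(KUSTO_DATABASE, command)` when the result isn't `None`.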

Following that, we create a new table for each event type, mapping each DataFrame column's pandas dtype to a Kusto column type,
then upload the DataFrame containing our records.
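The schema in each `.create table` command is derived from the DataFrame's dtypes. This sketch builds that command from a `{column: dtype name}` mapping, quoting any column name that isn't a plain identifier with Kusto's `['...']` syntax and falling back to `string` for unmapped dtypes. Both of those behaviours, the `float64` to `real` mapping, and the names `PANDAS_TO_KUSTO`, `kusto_column` and `create_table_command` are assumptions for illustration, not what the notebook's own cell does.

```python
import re

# Assumed pandas dtype name -> Kusto column type mapping.
PANDAS_TO_KUSTO = {
    "datetime64[ns, UTC]": "datetime",
    "object": "string",
    "float64": "real",
    "int64": "long",
    "bool": "bool",
}


def kusto_column(name: str) -> str:
    """Bracket-quote names that aren't plain identifiers, e.g. "Hash Value" -> ['Hash Value'].

    Assumes column names contain no single quotes.
    """
    if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
        return name
    return f"['{name}']"


def create_table_command(table: str, dtypes: dict) -> str:
    """Build a '.create table' command; unmapped dtypes fall back to string."""
    columns = ", ".join(
        f"{kusto_column(col)}:{PANDAS_TO_KUSTO.get(dtype, 'string')}"
        for col, dtype in dtypes.items()
    )
    return f".create table {table} ({columns})"


print(create_table_command("Event1", {
    "Timestamp": "datetime64[ns, UTC]",
    "EventID": "int64",
    "Hash Value": "object",
}))
# .create table Event1 (Timestamp:datetime, EventID:long, ['Hash Value']:string)
```

With a real DataFrame, `events[event_key].dtypes.astype(str).to_dict()` gives the mapping this function expects.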

In [5]:
# Use Azure CLI authentication: requires that you run "az login".

kusto_client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_URI))
kusto_ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_INGEST_URI))

# Query existing tables

db_tables = dataframe_from_result_table(kusto_client.execute_mgmt(KUSTO_DATABASE, '.show tables').primary_results[0])
ingest_tables = [f'Event{k}' for k in events.keys()]

# !!! Caution !!!
# Drop any existing tables that match our new table names

existing_tables = set(db_tables['TableName'])
drop_tables = [tbl for tbl in ingest_tables if tbl in existing_tables]
if drop_tables:
    drop_command = f'.drop tables ({",".join(drop_tables)})'
    db_tables = dataframe_from_result_table(kusto_client.execute_mgmt(KUSTO_DATABASE, drop_command).primary_results[0])

In [6]:
pandas_kusto_type_mappings = {
    'datetime64[ns, UTC]': 'datetime',
    'object': 'string',
    'float64': 'real',  # Kusto 'real' is a 64-bit floating-point type, like float64
    'int64': 'long'
}

for event_key in events.keys():
    tbl_name = f'Event{event_key}'
    print('[+] Ingesting "', tbl_name, '"...', sep='')

    event_columns = events[event_key].dtypes.to_dict()
    event_columns = ", ".join([f'{k}:{pandas_kusto_type_mappings.get(str(v), str(v))}' for k, v in event_columns.items()])

    kusto_client.execute_mgmt(KUSTO_DATABASE, f'.create table {tbl_name} ({event_columns})').primary_results[0]

    # Ingest the DataFrame.

    ingest_result = kusto_ingest_client.ingest_from_dataframe(
        events[event_key],
        ingestion_properties=IngestionProperties(
            database=KUSTO_DATABASE,
            table=tbl_name
        )
    )

    print('   ', ingest_result.status)

[+] Ingesting "Event13"...
    IngestionStatus.QUEUED
[+] Ingesting "Event12"...
    IngestionStatus.QUEUED
[+] Ingesting "Event10"...
    IngestionStatus.QUEUED
[+] Ingesting "Event9"...
    IngestionStatus.QUEUED
[+] Ingesting "Event3"...
    IngestionStatus.QUEUED
[+] Ingesting "Event7"...
    IngestionStatus.QUEUED
[+] Ingesting "Event18"...
    IngestionStatus.QUEUED
[+] Ingesting "Event1"...
    IngestionStatus.QUEUED
[+] Ingesting "Event23"...
    IngestionStatus.QUEUED
[+] Ingesting "Event11"...
    IngestionStatus.QUEUED
[+] Ingesting "Event2"...
    IngestionStatus.QUEUED
[+] Ingesting "Event5"...
    IngestionStatus.QUEUED
[+] Ingesting "Event22"...
    IngestionStatus.QUEUED
[+] Ingesting "Event17"...
    IngestionStatus.QUEUED
[+] Ingesting "Event24"...
    IngestionStatus.QUEUED
[+] Ingesting "Event8"...
    IngestionStatus.QUEUED
[+] Ingesting "Event26"...
    IngestionStatus.QUEUED
[+] Ingesting "Event15"...
    IngestionStatus.QUEUED
[+] Ingesting "Event19"...
    Inge

# Done!

You should soon be able to query and interact with the data in Kusto - see [Kusto-Analysis.ipynb](./Kusto-Analysis.ipynb) for details.

Kusto may take a few minutes to complete the ingestion, so the data won't be available to query immediately!
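Since queued ingestion completes asynchronously, one option is to poll a cheap query such as `Event1 | count` until rows appear. Here's a generic polling sketch; `wait_until` is a hypothetical helper, and its timeout and interval defaults are arbitrary choices rather than Kusto recommendations.

```python
import time


def wait_until(check, timeout_s=600.0, interval_s=30.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns True or timeout_s elapses; return whether it succeeded."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Example usage with this notebook's client (not run here):
# def event1_has_rows():
#     result = kusto_client.execute(KUSTO_DATABASE, 'Event1 | count')
#     return dataframe_from_result_table(result.primary_results[0])['Count'][0] > 0
#
# wait_until(event1_has_rows)
```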