In [None]:
TENANT_ID = None
REGION = None
INVESTIGATION_TITLE = None
INDICATORS = None

In [None]:
import logging

import pandas as pd
from datetime import datetime
from textwrap import dedent
from taegis_magic.pandas.ioc import is_domain, is_hash, is_ip_address
from taegis_magic.pandas.utils import default_schema_columns, groupby

from IPython.display import display_markdown

log = logging.getLogger(__name__)

if not INDICATORS:
    log.warning("No indicators provided, skipping notebook execution.")
    raise SystemExit()

%load_ext taegis_magic

In [None]:
def generate_query(value: str, date: str, event_type: str) -> str:
    earliest = datetime.strptime(date, "%Y%m%d").strftime("%Y-%m-%dT00:00:00Z")
    latest = datetime.strptime(date, "%Y%m%d").strftime("%Y-%m-%dT23:59:59Z")

    if is_hash(value):
        logical_type = "hash"
    elif is_domain(value):
        logical_type = "domain"
    elif is_ip_address(value):
        logical_type = "ip"
    else:
        log.error(f"Unsupported logical type for value: {value}")
        return "Error"

    query = dedent(
        f"""
    FROM {event_type}
    WHERE @{logical_type} = '{value}'
    EARLIEST='{earliest}' LATEST='{latest}'
    """.strip()
    )
    return query


indicators_by_type = {}
for indicator in INDICATORS or []:
    indicator["query"] = generate_query(
        value=indicator["matching_value"],
        date=indicator["date"],
        event_type=indicator["event_type"],
    )
    if indicator["event_type"] not in indicators_by_type:
        indicators_by_type[indicator["event_type"]] = []
    if indicator["query"] != "Error":
        indicators_by_type[indicator["event_type"]].append(indicator)

# IOC Threat Hunt

This is an Indicator of Comprise of Hashes, IPs and Domains.

In [None]:
display_markdown("List of Indicators:", raw=True)

for ioc in {indicator["matching_value"] for indicator in (INDICATORS or [])}:
    display_markdown(f"- {ioc}", raw=True)

In [None]:
results_dfs = {}
for event_type, indicators in indicators_by_type.items():
    display_markdown(f"### Searching {event_type} indicators:", raw=True)
    for indicator in indicators:
        display_markdown(
            f"Search criteria: {indicator['date']} - {indicator['matching_value']}",
            raw=True,
        )
        query = indicator["query"]
        %taegis --verbose events search --tenant $TENANT_ID --region $REGION --assign results --cell "$query" --track --cache

        if event_type not in results_dfs:
            results_dfs[event_type] = results
        else:
            results_dfs[event_type] = pd.concat(
                [
                    results_dfs[event_type],
                    results,
                ]
            ).reset_index(drop=True)

        display_markdown("<br/>", raw=True)

    display_markdown(f"#### Top 20 {event_type} results:", raw=True)
    display_markdown(
        groupby(results_dfs[event_type], columns=default_schema_columns(event_type))
        .sort_values(by="count", ascending=False)
        .head(20)
        .to_html(),
        raw=True,
    )
    display_markdown("<br/>", raw=True)

In [None]:
%save_notebook

In [None]:
%generate_report

In [None]:
%taegis investigations search-queries stage

In [None]:
%taegis investigations create \
--title "$INVESTIGATION_TITLE" \
--key-findings "$TAEGIS_MAGIC_REPORT_FILENAME" \
--priority MEDIUM \
--type THREAT_HUNT \
--status AWAITING_ACTION \
--assignee-id "@customer" \
--tenant $TENANT_ID \
--region $REGION