# OpenObserve Kunai - Msticpy - sigma rules conversion

## Description
This notebook helps to convert, review and transform sigma rules to openobserve queries and alerts.

It leverages python_openobserve, msticpy and sigma.

Known issue(s):
* convert_rule2alert() folder_id not working, alert created in default folder.

Last updated: 2025/07/06

### Installation

In [None]:
# Only run first time to install/upgrade msticpy to latest version
# %pip install --upgrade msticpy python-openobserve

### Setup

In [2]:
# Check we are running Python 3.6
import sys

MIN_REQ_PYTHON = (3, 6)
if sys.version_info < MIN_REQ_PYTHON:
    print("Check the Kernel->Change Kernel menu and ensure that Python 3.6")
    print("or later is selected as the active kernel.")
    sys.exit("Python %s.%s or later is required.\n" % MIN_REQ_PYTHON)

# imports
import pandas as pd
from datetime import datetime, timedelta

# data library imports
from msticpy.data.data_providers import QueryProvider

print("Imports Complete")

Imports Complete


In [3]:
# to create alerts
import json
from python_openobserve.openobserve import OpenObserve
from dotenv import load_dotenv

In [4]:
import os

# Custom Certificate Authority?
# os.environ["REQUESTS_CA_BUNDLE"] = os.environ["HOME"] + "/tmp/ca-bundle-internal.pem"

In [5]:
# use custom config file?
# os.environ['MSTICPYCONFIG'] = '/path/to/msticpyconfig.yaml'

In [None]:
# FIXME! does not get MSTICPYCONFIG...
from msticpy.config import MpConfigFile, MpConfigEdit

mpconfig = MpConfigFile()
mpconfig.load_default()
mpconfig.view_settings()
# mpedit = MpConfigEdit()
# mpedit

In [None]:
load_dotenv()


## Instantiating a query provider

You can instantiate a data provider for OpenObserve by specifying the credentials in connect or in msticpy config file. 
<br> If the details are correct and authentication is successful, it will show connected.

In [6]:
openobserve_prov = QueryProvider("OpenObserve")
# openobserve_prov.connect(connection_str=<url>, user=<user>, password=<password>)
# openobserve_prov.connect()
openobserve_prov.connect(
    connection_str=os.environ.get("OPENOBSERVE_URL"),
    user=os.environ.get("OPENOBSERVE_USER"),
    password=os.environ.get("OPENOBSERVE_PASS"),
)

connected with user root@example.com


In [7]:
openobserve_prov_direct = OpenObserve(
    host=os.environ.get("OPENOBSERVE_URL"),
    user=os.environ.get("OPENOBSERVE_USER"),
    password=os.environ.get("OPENOBSERVE_PASS"),
)

## Running test OpenObserve query

In [8]:
openobserve_query = """
SELECT log_file_name,count(*) FROM "default" GROUP BY log_file_name
"""
df = openobserve_prov.exec_query(openobserve_query, days=1, verbosity=1)
df.head()

INFO: from 2025-07-05 20:40:39.233681 to 2025-07-06 20:40:39.233681, TZ UTC
'Query Time start 1751748039233681 end 1751834439233681'
{'query': {'end_time': 1751834439233681,
           'sql': '\n'
                  'SELECT log_file_name,count(*) FROM "default" GROUP BY '
                  'log_file_name\n',
           'start_time': 1751748039233681}}
DEBUG: results shape (1, 2)


Unnamed: 0,count(*),log_file_name
0,5,history.log


In [9]:
openobserve_query = """SELECT body__systemd_unit, count(*) FROM "journald" group by body__systemd_unit order by count(*) desc"""
df = openobserve_prov.exec_query(
    openobserve_query,
    start_time=datetime.now() - timedelta(days=1),
    end_time=datetime.now() - timedelta(days=0),
)
df.head()

Unnamed: 0,body__systemd_unit,count(*)
0,ssh.service,30301
1,wpa_supplicant.service,7959
2,cron.service,5103
3,init.scope,3820
4,,1350


## Sigma settings

In [10]:
base_rulesdir = os.environ["HOME"] + "/tmp/sigma/rules/linux/network_connection"
pipeline_f = (
    os.environ["HOME"]
    + "/tmp/pySigma-backend-openobserve/sigma/pipelines/openobserve/kunai.yml"
)
table_name = "kunai"
# numeric id
target_folder_id = 73456789

## Sigma functions

In [11]:
import pathlib

from sigma.collection import SigmaCollection
from sigma.backends.openobserve import openobserve
from sigma.pipelines.sysmon import sysmon_pipeline
from sigma.pipelines.common import logsource_linux_process_creation
from sigma.pipelines.common import logsource_linux_file_create
from sigma.pipelines.common import logsource_linux_network_connection

from sigma.processing.resolver import ProcessingPipelineResolver

# Create the pipeline resolver
piperesolver = ProcessingPipelineResolver()
# Add pipelines
piperesolver.add_pipeline_class(sysmon_pipeline())  # Sysmon
# FIXME! AttributeError: 'LogsourceCondition' object has no attribute 'name'
# piperesolver.add_pipeline_class(logsource_linux_process_creation()) # Linux
# piperesolver.add_pipeline_class(logsource_linux_file_create()) # Linux
# piperesolver.add_pipeline_class(logsource_linux_network_connection()) # Linux

In [12]:
from sigma.pipelines.openobserve.kunai import kunai_pipeline

piperesolver.add_pipeline_class(kunai_pipeline())

In [13]:
# Create a combined pipeline
combined_pipeline = piperesolver.resolve(piperesolver.pipelines)
# Instantiate backend using the combined pipeline
openobserve_backend = openobserve.openobserveBackend(combined_pipeline)

In [14]:
from subprocess import check_output
import sqlglot

In [15]:
# https://github.com/SigmaHQ/sigma-cli/blob/main/sigma/cli/convert.py#L8
from sigma.exceptions import (
    SigmaError,
    SigmaPipelineNotAllowedForBackendError,
    SigmaPipelineNotFoundError,
)
from sigma.plugins import InstalledSigmaPlugins

plugins = InstalledSigmaPlugins.autodiscover()
backends = plugins.backends
pipelines = plugins.pipelines
pipeline_resolver = plugins.get_pipeline_resolver()

In [16]:
# Debug
# piperesolver

In [17]:
def convert_rulefile(rule_f, table_name, sigma_backend, output_format="default"):
    with open(rule_f, "r", encoding="utf-8") as f:
        print(f" => Reading rule file {rule_f}")
        rule_y = f.read()

    rule = SigmaCollection.from_yaml(rule_y)
    rule_sql = sigma_backend.convert(rule, output_format)
    # print(f"DEBUG type {type(rule_sql)}")
    if output_format == "o2alert":
        # NOK
        # rule_tmp = json.loads(rule_sql)
        # rule_sql = rule_tmp[0]
        rule_sql = rule_sql.replace("\n", "")
        # rule_sql = rule_sql[1:-1]
    rule_sql = rule_sql.replace("<TABLE_NAME>", f"{table_name}")

    # print(f" => rule out {rule_sql}")

    return rule_sql

In [18]:
def convert_rulesdir(
    openobserve_prov,
    rulesdir,
    table_name,
    sigma_backend,
    stats=False,
    limit=5,
    verbosity=0,
):
    """
    Convert sigma rules from rulesdir with given sigma backend and openobserve table, Run them
    Return results shape rules list, empty results rules lists, errors rules list

    Full python

    Args:
      openobserve_prov: msticpy provider object to run queries
      rulesdir: directory path containing the rules to evaluate
      table_name: openobserve table/streams name to use for the rules
      sigma_backend: sigma backend to use for conversion, including pipeline
      stats: run extra queries to provide statistics/TopN process for each rule
      limit: TopN used for statistics
      verbosity: debug/troubleshoot from 0/less to 5/more
    """
    # rule_f => empty bool, errors true, lines returned
    df_results = pd.DataFrame(columns=["rule", "empty", "errors", "hits"])
    df_stats = pd.DataFrame()
    for rule_f in pathlib.Path(rulesdir).iterdir():
        rule_s = rule_f.name
        # print(f"Listed rule file {rule_f}")
        if not rule_f.is_file():  # or str(rule_f).endswith('.yml'):
            print(f"Skipping rule file {rule_f}")
            continue

        print(f"=> Processing rule file {rule_s}:")
        rule_sql = convert_rulefile(rule_f, table_name, sigma_backend)

        # basic verify sql
        try:
            sqlglot.transpile(rule_sql)
        except sqlglot.errors.ParseError as exc:
            # raise exc
            print(f"Error from sqlglot on rule: {rule_sql} as {exc}")
        except Exception as exc:
            print(f"Exception on rule: {rule_sql} as {exc}")
        # print(f"```{rule_sql}```")

        try:
            df = openobserve_prov.exec_query(
                rule_sql,
                days=1,
            )
        except:
            df_results.loc[len(df_results)] = [rule_s, False, True, 0]
            continue

        if df.empty:
            df_results.loc[len(df_results)] = [rule_s, True, False, 0]
        else:
            df_results.loc[len(df_results)] = [rule_s, False, False, df.shape[0]]
            if stats:
                try:
                    # print(f"Calculating stats for {rule_s}")
                    df_stats_tmp = rulestats_agg(
                        rule_s,
                        rule_sql,
                        openobserve_prov,
                        1,
                        limit=limit,
                        verbosity=verbosity,
                    )
                    df_stats = pd.concat([df_stats, df_stats_tmp])
                    # print(f"  -> {df_stats_tmp.shape}, {df_stats.shape}")
                except Exception as err:
                    print(f"stats exception: {err}")

    return (df_results, df_stats)

In [19]:
# `sigma convert -t openobserve -p ../pySigma-backend-openobserve/sigma/pipelines/openobserve/kunai.yml ../sigma/rules/linux/process_creation/proc_creation_lnx_doas_execution.yml`
def convert_rulesdir_cli(
    openobserve_prov,
    rulesdir,
    pipeline_f,
    table_name,
    stats=False,
    limit=5,
    verbosity=0,
):
    """
    Convert sigma rules from rulesdir with given pipeline and openobserve table, Run them
    Return results shape rules list, empty results rules lists, errors rules list
    """
    # rule_f => empty bool, errors true, lines returned
    df_results = pd.DataFrame(columns=["rule", "empty", "errors", "hits"])
    df_stats = pd.DataFrame()
    for rule_f in pathlib.Path(rulesdir).iterdir():
        rule_s = rule_f.name
        if not rule_f.is_file():
            print(f"Skipping rule file {rule_s}")
            continue

        cmd = f"sigma convert -t openobserve -p {pipeline_f} {rule_f}"
        print(f"=> Processing rule file {rule_s}:")
        rule_sql = check_output(
            ["sigma", "convert", "-t", "openobserve", "-p", pipeline_f, rule_f],
            encoding="UTF-8",
        )
        rule_sql = str(rule_sql).replace("<TABLE_NAME>", f'"{table_name}"')
        print(f"```{rule_sql}```")
        try:
            df = openobserve_prov.exec_query(
                rule_sql,
                days=1,
            )
        except:
            df_results.loc[len(df_results)] = [rule_s, False, True, 0]
            continue

        if df.empty:
            df_results.loc[len(df_results)] = [rule_s, True, False, 0]
        else:
            df_results.loc[len(df_results)] = [rule_s, False, False, df.shape[0]]
            if stats:
                try:
                    # print(f"Calculating stats for {rule_s}")
                    df_stats_tmp = rulestats_agg(
                        rule_s,
                        rule_sql,
                        openobserve_prov,
                        1,
                        limit=limit,
                        verbosity=verbosity,
                    )
                    df_stats = pd.concat([df_stats, df_stats_tmp])
                    # print(f"  -> {df_stats_tmp.shape}, {df_stats.shape}")
                except Exception as err:
                    print(f"stats exception: {err}")

    return (df_results, df_stats)

In [20]:
def rulestats_agg(rulename, rule_sql, openobserve_prov, days, limit=5, verbosity=0):
    """
    Make an aggregate count to identify Top results that are possibly noise
    https://sqlglot.com/sqlglot.html#examples
    """
    import sqlglot

    column_names = []

    # usually includes: data_command_line 	data_exe_path 	info_event_name
    for column in sqlglot.parse_one(rule_sql).find_all(sqlglot.exp.Column):
        # print(column.alias_or_name)
        column_names.append(column.alias_or_name)
    column_names.append("info_task_name")
    column_names.append("info_parent_task_name")
    column_names = list(set(column_names))
    # print(column_names)
    # Make aggregated query
    agg_sql = (
        rule_sql.replace(
            "SELECT * ", f"SELECT {",".join(column_names)},count(*) as count_ "
        )
        + f" GROUP BY {",".join(column_names)} ORDER BY count_ DESC LIMIT {limit}"
    )
    # only works if len(column_names)=1
    # agg_sql = f"SELECT string_agg({",".join(column_names)}, ', ') as topN FROM (" + test_sql.replace("SELECT * ", f"SELECT {",".join(column_names)},count(*) as count_ ") + f" GROUP BY {",".join(column_names)} ORDER BY count_ DESC LIMIT {topN}" + ')'
    # agg_sql = f"SELECT string_agg(data_exe_path, ', ') as topN FROM (" + test_sql.replace("SELECT * ", f"SELECT {",".join(column_names)},count(*) as count_ ") + f" GROUP BY {",".join(column_names)} ORDER BY count_ DESC LIMIT {topN}" + ')'
    # print(agg_sql)

    df_agg = openobserve_prov.exec_query(agg_sql, days=days, verbosity=verbosity)
    df_agg["rule"] = rulename
    return df_agg

In [21]:
def convert_rule2alert(
    openobserve,
    rule_f,
    pipeline_f,
    table_name,
    alertowner,
    alertdestination,
    sigma_backend,
    folderid="",
    verbosity=0,
    overwrite=False,
):
    """
    Convert given sigma rule to openobserve alert and set it in live instance
    Return results shape rules list, empty results rules lists, errors rules list
    """
    print(f"=> Processing rule file {rule_f}:")
    # cli
    # cmd = f'sigma convert -t openobserve -f o2alert -p {pipeline_f} {rule_f}'
    # rule_alert = check_output(['sigma', 'convert', '-t', 'openobserve', '-f', 'o2alert', '-p', pipeline_f, rule_f], encoding='UTF-8')
    # rule_alert = str(rule_alert).replace('<TABLE_NAME>', table_name)
    # python
    rule_alert = convert_rulefile(
        rule_f, table_name, sigma_backend, output_format="o2alert"
    )
    # rule_alert = convert_rulefile(rule_f, table_name, sigma_backend, output_format="default")

    rule_alert = str(rule_alert).replace("<alert-owner-TBD>", alertowner)
    rule_alert = str(rule_alert).replace("<alert-destination-TBD>", alertdestination)
    rule_alert = str(rule_alert).replace("<alert-folder_id-TBD>", str(folderid))
    # 'Return 400. Text: Json deserialize error: invalid type: sequence, expected struct CreateAlertRequestBody at line 1 column 0' -> take array[0] from str
    rule_alert = rule_alert[1:-1]

    print(f"```(type {type(rule_alert)}: {rule_alert}```")

    openobserve.create_update_object_by_name(
        "alerts",
        json.loads(rule_alert),
        verbosity=verbosity,
        overwrite=overwrite,
    )

## Functions tests

In [22]:
# test_sql = """SELECT * FROM "kunai" WHERE (data_path LIKE '/etc/cron.d/%' OR data_path LIKE '/etc/cron.daily/%' OR data_path LIKE '/etc/cron.hourly/%' OR data_path LIKE '/etc/cron.monthly/%' OR data_path LIKE '/etc/cron.weekly/%' OR data_path LIKE '/var/spool/cron/crontabs/%') OR (data_path LIKE '%/etc/cron.allow%' OR data_path LIKE '%/etc/cron.deny%' OR data_path LIKE '%/etc/crontab%')"""
# test_sql = """SELECT * FROM "kunai" WHERE data_exe_path LIKE '%/bin/bash' AND (NOT (data_dst_ip='127.0.0.1' OR data_dst_ip='0.0.0.0'))"""
test_sql = """SELECT * FROM "kunai" WHERE (data_exe_path LIKE '%/file' AND regexp_like(data_command_line, '(.){200,}', 'i')) OR (data_exe_path LIKE '%/ls' AND data_command_line LIKE '%-R%') OR data_exe_path LIKE '%/find' OR data_exe_path LIKE '%/tree' OR data_exe_path LIKE '%/findmnt' OR data_exe_path LIKE '%/mlocate'"""

In [23]:
rulestats_agg("rulename", test_sql, openobserve_prov, 1)

In [24]:
rule_f = (
    os.environ.get("HOME")
    + "/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_doas_execution.yml"
)

In [25]:
convert_rulefile(rule_f, table_name, openobserve_backend, output_format="default")

In [26]:
convert_rulefile(rule_f, table_name, openobserve_backend, output_format="o2alert")

In [27]:
convert_rule2alert(
    openobserve_prov_direct,
    rule_f,
    pipeline_f,
    "kunai",
    "root@example.com",
    "alert-destination-email",
    openobserve_backend,
    folderid=target_folder_id,
)

## Rules Conversion
Convert rules, check against live instance if returns results, errors or not.
Include or not statistics of TopN aggregated result for matching rule.

In [29]:
rulesdir = base_rulesdir + "/network_connection"

In [30]:
(df_res1, df_stats1) = convert_rulesdir(
    openobserve_prov, rulesdir, table_name, openobserve_backend, stats=True
)

=> Processing rule file net_connection_lnx_crypto_mining_indicators.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_crypto_mining_indicators.yml
=> Processing rule file net_connection_lnx_back_connect_shell_dev.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_back_connect_shell_dev.yml
=> Processing rule file net_connection_lnx_domain_localtonet_tunnel.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_domain_localtonet_tunnel.yml
=> Processing rule file net_connection_lnx_susp_malware_callback_port.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_susp_malware_callback_port.yml
=> Processing rule file net_connection_lnx_ngrok_tunnel.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_ngrok_tunnel.yml


In [31]:
df_res1

Unnamed: 0,rule,empty,errors,hits
0,net_connection_lnx_crypto_mining_indicators.yml,True,False,0
1,net_connection_lnx_back_connect_shell_dev.yml,True,False,0
2,net_connection_lnx_domain_localtonet_tunnel.yml,True,False,0
3,net_connection_lnx_susp_malware_callback_port.yml,True,False,0
4,net_connection_lnx_ngrok_tunnel.yml,True,False,0


In [32]:
(df_res1b, df_stats1b) = convert_rulesdir_cli(
    openobserve_prov, rulesdir, pipeline_f, table_name, stats=True
)

=> Processing rule file net_connection_lnx_crypto_mining_indicators.yml:


Parsing Sigma rules


```SELECT * FROM "kunai" WHERE data_dst_hostname='pool.minexmr.com' OR data_dst_hostname='fr.minexmr.com' OR data_dst_hostname='de.minexmr.com' OR data_dst_hostname='sg.minexmr.com' OR data_dst_hostname='ca.minexmr.com' OR data_dst_hostname='us-west.minexmr.com' OR data_dst_hostname='pool.supportxmr.com' OR data_dst_hostname='mine.c3pool.com' OR data_dst_hostname='xmr-eu1.nanopool.org' OR data_dst_hostname='xmr-eu2.nanopool.org' OR data_dst_hostname='xmr-us-east1.nanopool.org' OR data_dst_hostname='xmr-us-west1.nanopool.org' OR data_dst_hostname='xmr-asia1.nanopool.org' OR data_dst_hostname='xmr-jp1.nanopool.org' OR data_dst_hostname='xmr-au1.nanopool.org' OR data_dst_hostname='xmr.2miners.com' OR data_dst_hostname='xmr.hashcity.org' OR data_dst_hostname='xmr.f2pool.com' OR data_dst_hostname='xmrpool.eu' OR data_dst_hostname='pool.hashvault.pro' OR data_dst_hostname='moneroocean.stream' OR data_dst_hostname='monerocean.stream'
```
=> Processing rule file net_connection_lnx_back_connect

Parsing Sigma rules


```SELECT * FROM "kunai" WHERE data_exe_path LIKE '%/bin/bash' AND (NOT (data_dst_ip='127.0.0.1' OR data_dst_ip='0.0.0.0'))
```
=> Processing rule file net_connection_lnx_domain_localtonet_tunnel.yml:


Parsing Sigma rules


```SELECT * FROM "kunai" WHERE (data_dst_hostname LIKE '%.localto.net' OR data_dst_hostname LIKE '%.localtonet.com') AND Initiated='true'
```
=> Processing rule file net_connection_lnx_susp_malware_callback_port.yml:


Parsing Sigma rules


```SELECT * FROM "kunai" WHERE (Initiated='true' AND (data_dst_port=888 OR data_dst_port=999 OR data_dst_port=2200 OR data_dst_port=2222 OR data_dst_port=4000 OR data_dst_port=4444 OR data_dst_port=6789 OR data_dst_port=8531 OR data_dst_port=50501 OR data_dst_port=51820)) AND (NOT (data_dst_ip LIKE '127.%' OR data_dst_ip LIKE '10.%' OR data_dst_ip LIKE '172.16.%' OR data_dst_ip LIKE '172.17.%' OR data_dst_ip LIKE '172.18.%' OR data_dst_ip LIKE '172.19.%' OR data_dst_ip LIKE '172.20.%' OR data_dst_ip LIKE '172.21.%' OR data_dst_ip LIKE '172.22.%' OR data_dst_ip LIKE '172.23.%' OR data_dst_ip LIKE '172.24.%' OR data_dst_ip LIKE '172.25.%' OR data_dst_ip LIKE '172.26.%' OR data_dst_ip LIKE '172.27.%' OR data_dst_ip LIKE '172.28.%' OR data_dst_ip LIKE '172.29.%' OR data_dst_ip LIKE '172.30.%' OR data_dst_ip LIKE '172.31.%' OR data_dst_ip LIKE '192.168.%' OR data_dst_ip LIKE '169.254.%' OR data_dst_ip='::1/128' OR data_dst_ip LIKE 'fe8%' OR data_dst_ip LIKE 'fe9%' OR data_dst_ip LIKE 'fea%'

Parsing Sigma rules


```SELECT * FROM "kunai" WHERE data_dst_hostname LIKE '%tunnel.us.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.eu.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.ap.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.au.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.sa.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.jp.ngrok.com%' OR data_dst_hostname LIKE '%tunnel.in.ngrok.com%'
```


In [33]:
df_res1b

Unnamed: 0,rule,empty,errors,hits
0,net_connection_lnx_crypto_mining_indicators.yml,True,False,0
1,net_connection_lnx_back_connect_shell_dev.yml,True,False,0
2,net_connection_lnx_domain_localtonet_tunnel.yml,False,True,0
3,net_connection_lnx_susp_malware_callback_port.yml,False,True,0
4,net_connection_lnx_ngrok_tunnel.yml,True,False,0


In [34]:
rulesdir = base_rulesdir + "/file_event"

In [35]:
(df_res3, df_stats3) = convert_rulesdir(
    openobserve_prov, rulesdir, table_name, openobserve_backend, stats=True
)

=> Processing rule file file_event_lnx_susp_shell_script_under_profile_directory.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/file_event/file_event_lnx_susp_shell_script_under_profile_directory.yml
=> Processing rule file file_event_lnx_wget_download_file_in_tmp_dir.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/file_event/file_event_lnx_wget_download_file_in_tmp_dir.yml
=> Processing rule file file_event_lnx_triple_cross_rootkit_persistence.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/file_event/file_event_lnx_triple_cross_rootkit_persistence.yml
=> Processing rule file file_event_lnx_persistence_sudoers_files.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/file_event/file_event_lnx_persistence_sudoers_files.yml
=> Processing rule file file_event_lnx_doas_conf_creation.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/file_event/file_event_lnx_doas_conf_creation.yml
=> Processing rule f

In [36]:
df_res3

Unnamed: 0,rule,empty,errors,hits
0,file_event_lnx_susp_shell_script_under_profile...,True,False,0
1,file_event_lnx_wget_download_file_in_tmp_dir.yml,True,False,0
2,file_event_lnx_triple_cross_rootkit_persistenc...,True,False,0
3,file_event_lnx_persistence_sudoers_files.yml,True,False,0
4,file_event_lnx_doas_conf_creation.yml,True,False,0
5,file_event_lnx_triple_cross_rootkit_lock_file.yml,True,False,0
6,file_event_lnx_persistence_cron_files.yml,True,False,0


In [1]:
rulesdir = base_rulesdir + "/process_creation"

In [38]:
(df_res2, df_stats2) = convert_rulesdir(
    openobserve_prov, rulesdir, table_name, openobserve_backend, stats=True
)

=> Processing rule file proc_creation_lnx_malware_gobrat_grep_payload_discovery.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_malware_gobrat_grep_payload_discovery.yml
=> Processing rule file proc_creation_lnx_touch_susp.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_touch_susp.yml
=> Processing rule file proc_creation_lnx_file_and_directory_discovery.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_file_and_directory_discovery.yml
=> Processing rule file proc_creation_lnx_ssm_agent_abuse.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_ssm_agent_abuse.yml
=> Processing rule file proc_creation_lnx_base64_shebang_cli.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/process_creation/proc_creation_lnx_base64_shebang_cli.yml
=> Processing rule file proc_creat

In [39]:
df_res2

Unnamed: 0,rule,empty,errors,hits
0,proc_creation_lnx_malware_gobrat_grep_payload_...,True,False,0
1,proc_creation_lnx_touch_susp.yml,True,False,0
2,proc_creation_lnx_file_and_directory_discovery...,False,False,1000
3,proc_creation_lnx_ssm_agent_abuse.yml,True,False,0
4,proc_creation_lnx_base64_shebang_cli.yml,True,False,0
...,...,...,...,...
108,proc_creation_lnx_crontab_removal.yml,True,False,0
109,proc_creation_lnx_omigod_scx_runasprovider_exe...,False,True,0
110,proc_creation_lnx_esxcli_vsan_discovery.yml,True,False,0
111,proc_creation_lnx_remove_package.yml,True,False,0


In [40]:
df_res2[df_res2["empty"] & ~df_res2["errors"]]

Unnamed: 0,rule,empty,errors,hits
0,proc_creation_lnx_malware_gobrat_grep_payload_...,True,False,0
1,proc_creation_lnx_touch_susp.yml,True,False,0
3,proc_creation_lnx_ssm_agent_abuse.yml,True,False,0
4,proc_creation_lnx_base64_shebang_cli.yml,True,False,0
5,proc_creation_lnx_sudo_cve_2019_14287.yml,True,False,0
...,...,...,...,...
107,proc_creation_lnx_security_tools_disabling.yml,True,False,0
108,proc_creation_lnx_crontab_removal.yml,True,False,0
110,proc_creation_lnx_esxcli_vsan_discovery.yml,True,False,0
111,proc_creation_lnx_remove_package.yml,True,False,0


In [41]:
df_stats2

Unnamed: 0,count_,data_command_line,data_exe_path,info_event_name,info_parent_task_name,info_task_name,os_type,rule
0,1009,find -O3 /var/lib/php/sessions/ -ignore_readdi...,/usr/bin/find,execve,sessionclean,find,linux,proc_creation_lnx_file_and_directory_discovery...
1,1009,find /usr/lib/php -mindepth 1 -maxdepth 1 -reg...,/usr/bin/find,execve,phpquery,find,linux,proc_creation_lnx_file_and_directory_discovery...
2,193,find /proc/233/fd -ignore_readdir_race -lname ...,/usr/bin/find,execve,sessionclean,find,linux,proc_creation_lnx_file_and_directory_discovery...
3,188,find /proc/231/fd -ignore_readdir_race -lname ...,/usr/bin/find,execve,sessionclean,find,linux,proc_creation_lnx_file_and_directory_discovery...
4,158,find /etc/apt/trusted.gpg.d -mindepth 1 -maxde...,/usr/bin/find,execve,apt-key,find,linux,proc_creation_lnx_file_and_directory_discovery...
...,...,...,...,...,...,...,...,...
0,304,id -u,/usr/bin/id,execve,occ,id,linux,proc_creation_lnx_local_account.yml
1,58,id -u,/usr/bin/id,execve,91-release-upgr,id,linux,proc_creation_lnx_local_account.yml
2,58,id -u,/usr/bin/id,execve,update-motd-fsc,id,linux,proc_creation_lnx_local_account.yml
3,56,id -u,/usr/bin/id,execve,sh,id,linux,proc_creation_lnx_local_account.yml


## Sigma rule to alerts

In [42]:
res_to_alerts = [
    base_rulesdir + "/network_connection/" + item
    for item in df_res1[df_res1["empty"] & ~df_res1["errors"]]["rule"].tolist()
]
# res_to_alerts += [base_rulesdir + "/file_event/" + item for item in df_res2[df_res2["empty"] & ~df_res2["errors"]]['rule'].tolist()]
res_to_alerts

['/home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_crypto_mining_indicators.yml',
 '/home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_back_connect_shell_dev.yml',
 '/home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_domain_localtonet_tunnel.yml',
 '/home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_susp_malware_callback_port.yml',
 '/home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_ngrok_tunnel.yml']

In [43]:
# SELECT * FROM "kunai" WHERE data_exe_path LIKE '%/doas'
# res_to_alerts = [
#     os.environ["HOME"]
#     + "/tmp/sigma/rules/linux/process_creation/proc_creation_lnx_doas_execution.yml"
# ]
# res_to_alerts = [ os.environ["HOME"] + '/tmp/sigma/rules/linux/network_connection/net_connection_lnx_back_connect_shell_dev.yml']

In [1]:
for rule_f in res_to_alerts:
    convert_rule2alert(
        openobserve_prov_direct,
        rule_f,
        pipeline_f,
        "kunai",
        "root@example.com",
        "alert-destination-email",
        openobserve_backend,
        folderid=target_folder_id,
        verbosity=2,
        overwrite=True,
    )

=> Processing rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_crypto_mining_indicators.yml:
 => Reading rule file /home/user/tmp/others/sigma/rules/linux/network_connection/net_connection_lnx_crypto_mining_indicators.yml
```(type <class 'str'>: {"name": "Linux_Crypto_Mining_Pool_Connections", "org_id": "default", "stream_type": "logs", "stream_name": "kunai", "is_real_time": false, "query_condition": {"type": "sql", "conditions": [], "sql": "SELECT data_command_line,data_exe_path,info_event_name,info_parent_task_name,info_task_name,info_task_uid FROM \"kunai\" WHERE info_event_name='connect' AND (data_dst_hostname='pool.minexmr.com' OR data_dst_hostname='fr.minexmr.com' OR data_dst_hostname='de.minexmr.com' OR data_dst_hostname='sg.minexmr.com' OR data_dst_hostname='ca.minexmr.com' OR data_dst_hostname='us-west.minexmr.com' OR data_dst_hostname='pool.supportxmr.com' OR data_dst_hostname='mine.c3pool.com' OR data_dst_hostname='xmr-eu1.nanopool.org

## References

- [OpenObserve REST API](https://openobserve.ai/docs/api/)
- [Unofficial python-openobserve module](https://github.com/JustinGuese/python-openobserve) with some customizations and enhancements (>=0.3)
- Openobserve github discussions: https://github.com/openobserve/openobserve/discussions/