In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from elasticquery import ElasticQuery
from elasticsearch_dsl import Q
import polars as pl
import logging
import json
import ast
import asyncio

# Notebook-wide display limits for Polars tables.
MAX_TABLE_ROWS = 100
MAX_STRING_CHARS = 1000
pl.Config.set_tbl_rows(MAX_TABLE_ROWS)
pl.Config.set_fmt_str_lengths(MAX_STRING_CHARS)

# Client used to run the Elasticsearch queries in this notebook.
ELASTIC_HOST = "10.10.10.20"
eq = ElasticQuery(host=ELASTIC_HOST)

In [None]:
# Match events where event.dataset is 'esf' and event.action is 'exec'.
exec_event_filters = {'event.dataset': 'esf', 'event.action': 'exec'}
query = Q(
    'bool',
    must=[Q('match', **{field: value}) for field, value in exec_event_filters.items()],
)

df = eq.search(
    query,
    start_date="2024-11-21T23:56:48Z",
    end_date="2024-11-22T12:04:33Z",
)
# if using the demo data, do the following instead
# df = pl.read_csv("demo_data.csv")

In [None]:
# Preview the first rows of the search result.
df.head()

In [None]:
# Save the events to CSV before they are marked with the "malicious" label,
# so the file can be reloaded in place of the Elasticsearch query.
df.write_csv("demo_data.csv")

In [None]:
# Process groups (identified by host name + group-leader PID) to flag as malicious.
malicious_groups = {
    'scr-office-imac.local': [11138, 11181, 12829, 11298, 10957, 12826],
    'scr-it-mac.local': [12951, 12520, 12353, 14703, 12658, 12532, 14705],
}

host_column = df['host.name']
leader_pid_column = df['process.group_leader.pid']

# One boolean mask per host: the row is on that host AND in one of its listed groups.
conditions = [
    (host_column == host) & (leader_pid_column.is_in(pids))
    for host, pids in malicious_groups.items()
]
is_malicious = conditions[0] | conditions[1]

# 1 for rows in a listed group, 0 for everything else.
df = df.with_columns(
    pl.when(is_malicious).then(1).otherwise(0).alias("malicious")
)

In [None]:
# Fetch the events with event.code '8000' and keep a single row per user.name,
# then reduce them to the user attributes needed later.
query = Q('bool', must=[Q('match', **{'event.code': '8000'})])
user_df = eq.search(
    query,
    start_date="2024-11-21",
    end_date="2024-11-22",
).unique(subset=["user.name"])

user_columns = ["user.name", "user.department", "user.title"]
user_df_filtered = user_df.select(user_columns)

In [None]:
# Attach each user's department and title to the events by joining on user.name.
# The inner join keeps only rows whose user.name is present in both frames.
merged_df = user_df_filtered.join(df, on="user.name", how="inner")

In [None]:
from gpt import GPT

gpt = GPT()

async def process_strings_and_analyze_concurrently(df, max_concurrency=None):
    """Analyze every process group with ``gpt.analyze`` and collect the results.

    Rows are grouped by ``host.name`` and ``process.group_leader.pid``. For each
    group a prompt is built from the user/host context and a markdown table of
    the group's processes ordered by ``@timestamp``. If the prompt is longer
    than the allowed message size, the (time-ordered) rows are split into
    chunks that are analyzed one after another and merged into a single result.

    Args:
        df: Polars DataFrame with the columns ``@timestamp``, ``host.name``,
            ``host.os.family``, ``user.name``, ``user.department``,
            ``user.title``, ``process.group_leader.pid``, ``process.pid``,
            ``process.parent.pid``, ``process.command_line`` and
            ``process.parent.name``.
        max_concurrency: Optional upper bound on the number of groups analyzed
            at the same time. ``None`` (the default) starts all groups at once.

    Returns:
        A Polars DataFrame built from one dict per group. Successful groups
        carry the fields returned by ``gpt.analyze`` (this code reads
        ``analysis``, ``suspicious_score``, ``verdict`` and ``mitre_tag``) plus
        the two grouping keys; failed groups carry the grouping keys and an
        ``error`` message instead.

    Raises:
        ValueError: If ``max_concurrency`` is given but smaller than 1.
    """
    if max_concurrency is not None and max_concurrency < 1:
        raise ValueError("max_concurrency must be a positive integer or None")

    process_columns = [
        "process.pid", "process.parent.pid", "process.command_line", "process.parent.name",
    ]
    max_length = 1048576  # Maximum allowed length for the message
    limiter = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    def build_message(frame, header, pad):
        """Render the prompt for `frame`, which must already be sorted by @timestamp."""
        process_table = frame.select(process_columns).to_pandas().to_markdown()
        fields = [
            header,
            f'User name: {frame["user.name"].unique()[0]}',
            f'User department: {frame["user.department"].unique()[0]}',
            f'User title: {frame["user.title"].unique()[0]}',
            f'OS Type: {frame["host.os.family"][0]}',
            f'Hostname: {frame["host.name"][0]}',
            f'Processes: {process_table}',
        ]
        # Layout: leading newline, every field on its own padded line, and a
        # trailing run of padding (same text the triple-quoted prompt produced).
        return "\n" + "".join(f"{pad}{field}\n" for field in fields) + pad

    def parse_mitre_tags(raw_tags):
        """Return the tags as a Python object; string values are parsed as literals."""
        if isinstance(raw_tags, str):
            return ast.literal_eval(raw_tags)
        return raw_tags

    async def analyze_in_chunks(group_key, ordered_df, message_length):
        """Analyze an oversized group chunk by chunk and merge the chunk results."""
        print(f"Group {group_key} message exceeds max length. Splitting into smaller chunks.")

        # A group with fewer rows than chunks would give a chunk size of 0 and
        # make range() raise, so never go below one row per chunk.
        # NOTE: a chunk can still exceed max_length if single rows are very large.
        chunks_needed = message_length // max_length + 1
        chunk_size = max(1, len(ordered_df) // chunks_needed)

        chunk_results = []
        for start in range(0, len(ordered_df), chunk_size):
            # Slicing the already sorted frame keeps every chunk a contiguous time window.
            piece = ordered_df[start:start + chunk_size]
            chunk_message = build_message(
                piece, "Beginning of commands for analysis (chunked):", " " * 28
            )
            chunk_results.append(await gpt.analyze(chunk_message))

        # Union of the chunk tags, keeping first-seen order.
        merged_tags = []
        for outcome in chunk_results:
            for tag in parse_mitre_tags(outcome.get('mitre_tag', [])):
                if tag not in merged_tags:
                    merged_tags.append(tag)

        # The group is suspicious as soon as any chunk is; the score is the worst chunk's.
        any_suspicious = any(outcome['verdict'] == 'suspicious' for outcome in chunk_results)
        return {
            "analysis": " ".join(outcome['analysis'] for outcome in chunk_results),
            "suspicious_score": max(outcome['suspicious_score'] for outcome in chunk_results),
            "mitre_tag": merged_tags,
            "verdict": 'suspicious' if any_suspicious else 'benign',
        }

    async def process_single_group(group_key, group_df):
        """Analyze one group; failures are reported as an error row instead of raising."""
        # Unpack before the try block so the error row can always reference the keys.
        host_name, group_leader_pid = group_key
        try:
            ordered_df = group_df.sort("@timestamp")
            user_message = build_message(
                ordered_df, "Beginning of commands for analysis:", " " * 20
            )

            if len(user_message) > max_length:
                combined_result = await analyze_in_chunks(
                    group_key, ordered_df, len(user_message)
                )
            else:
                # If within allowed length, process normally
                combined_result = await gpt.analyze(user_message)
                combined_result['mitre_tag'] = parse_mitre_tags(combined_result['mitre_tag'])

            # Include grouping keys in the result
            combined_result['host.name'] = host_name
            combined_result['process.group_leader.pid'] = group_leader_pid
            return combined_result

        except Exception as e:
            print(f"Error processing group {group_key}: {e}")
            return {
                "host.name": host_name,
                "process.group_leader.pid": group_leader_pid,
                "error": str(e),
            }

    async def run_group(group_key, group_df):
        """Run one group, honoring the optional concurrency limit."""
        if limiter is None:
            return await process_single_group(group_key, group_df)
        async with limiter:
            return await process_single_group(group_key, group_df)

    # Group by `host.name` and `process.group_leader.pid`
    grouped = df.group_by(["host.name", "process.group_leader.pid"])

    # Create one coroutine per group and run them concurrently
    tasks = [run_group(group_key, group_df) for group_key, group_df in grouped]
    results = await asyncio.gather(*tasks)

    return pl.DataFrame(results)

In [None]:
# Analyze every process group in merged_df; top-level `await` is supported in notebook cells.
results_df = await process_strings_and_analyze_concurrently(merged_df)

In [None]:
# Cast the group-leader PID to UInt64 — presumably so it matches the dtype of the
# same column in merged_df for the join on that key; verify against merged_df.schema.
results_df = results_df.cast({"process.group_leader.pid": pl.UInt64 })

In [None]:
# Spot-check one randomly chosen result row.
results_df.sample()

In [None]:
# Attach the per-group analysis result to every event of that group.
group_keys = ["host.name", "process.group_leader.pid"]
final_results_df = merged_df.join(results_df, on=group_keys)

In [None]:
# Add both 0/1 columns in one pass: the label as Int64 and the verdict
# encoded as 1 for "suspicious", 0 for anything else.
malicious_as_int = pl.col("malicious").cast(pl.Int64).alias("malicious_binary")
verdict_as_int = (
    pl.when(pl.col("verdict") == "suspicious")
    .then(1)
    .otherwise(0)
    .alias("verdict_binary")
)

final_results_df = final_results_df.with_columns(malicious_as_int, verdict_as_int)

In [None]:
# Preview the joined events together with the binary label and verdict columns.
final_results_df.head()

In [None]:
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import polars as pl


# One row per process group (host + group-leader PID). The label and the verdict
# are both assigned per group, so the first value of each group is representative.
grouped_by_host = (
    final_results_df.group_by(["host.name", "process.group_leader.pid"])
    .agg(pl.col("verdict_binary").first(), pl.col("malicious_binary").first())
)

# Number of process groups for every (predicted, actual) combination that occurs.
grouped_data = (
    grouped_by_host.group_by(["verdict_binary", "malicious_binary"])
    .agg(pl.len().alias("count"))
)

# Construct Confusion Matrix: rows = predicted verdict, columns = actual label.
# The matrix starts at zero, so combinations that never occur stay 0 without
# joining against a template frame (which depended on join-suffix column names
# and on matching integer dtypes of the join keys).
confusion_matrix = np.zeros((2, 2), dtype=int)

for row in grouped_data.iter_rows(named=True):
    verdict = int(row["verdict_binary"])
    malicious = int(row["malicious_binary"])
    confusion_matrix[verdict, malicious] = int(row["count"])

sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Actual (Malicious Binary)")
plt.ylabel("Predicted (Verdict Binary)")
plt.title("Confusion Matrix")
# Heatmap cells are centered at 0.5 and 1.5, so place the class labels there.
plt.xticks([0.5, 1.5], ["0", "1"], rotation=0)
plt.yticks([0.5, 1.5], ["0", "1"], rotation=0)
plt.show()

In [None]:
# False negatives: process groups labelled malicious (1) whose verdict was not suspicious (0).
grouped_by_host.filter((pl.col("verdict_binary") == 0) & (pl.col("malicious_binary") == 1))

In [None]:
# Show the events of selected process groups with the fields needed for review.
pids_to_review = [12949, 11169, 11176, 11295, 12828, 14703]
review_columns = [
    "verdict_binary",
    "malicious_binary",
    "suspicious_score",
    "process.command_line",
    "process.group_leader.pid",
    "host.name",
    "analysis",
    "user.department",
    "user.title",
]
(
    final_results_df
    .filter(pl.col("process.group_leader.pid").is_in(pids_to_review))
    .select(review_columns)
)
    

In [None]:
# Print, as a markdown table, the last five rows of the count-sorted command
# lines among events whose verdict is not suspicious. All display options are
# scoped to this block.
with pl.Config(
    tbl_formatting="MARKDOWN",
    tbl_hide_column_data_types=True,
    tbl_hide_dataframe_shape=True,
    tbl_width_chars=12000,
):
    not_suspicious = final_results_df.filter(pl.col("verdict_binary") == 0)
    command_counts = not_suspicious['process.command_line'].value_counts().sort(by="count")
    print(command_counts.tail())
