In [1]:
import os
from openai import AzureOpenAI
import argparse
import sys
import json
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential

#Load environment variables
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
# Instantiate the Azure OpenAI client from the environment settings loaded above
client = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
)

# Deployment name; later cells pass it as the `model` argument of chat requests
model = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME")


In [3]:
# Connection smoke test: send a short multi-turn conversation and print the reply
response = client.chat.completions.create(
    model=model,  # for Azure OpenAI this is the deployment name
    messages=[
        {
            "role": "system",
            "content": "You are a helpful assistant.",
        },
        {
            "role": "user",
            "content": "Does Azure OpenAI support customer managed keys?",
        },
        {
            "role": "assistant",
            "content": "Yes, customer managed keys are supported by Azure OpenAI.",
        },
        {
            "role": "user",
            "content": "Do other Azure AI services support this too?",
        },
    ],
)

# Only the first choice's text is of interest for the smoke test
print(response.choices[0].message.content)

Yes, several other Azure AI services support **Customer Managed Keys (CMKs)**, which allow you to have greater control over the encryption keys used to protect your data. CMKs are typically stored and managed in **Azure Key Vault** or **Azure Managed HSM (Hardware Security Module)**. Below are some notable Azure AI services that also support CMKs:

### 1. **Azure Cognitive Search**
   - Azure Cognitive Search supports CMKs, allowing you to encrypt your indexing and storage data using keys you control in Azure Key Vault.
   - With CMKs, you can revoke or rotate keys, helping you maintain compliance with your organization's security policies.

### 2. **Azure Machine Learning (AML)**
   - Azure Machine Learning supports customer-managed keys to encrypt data used in experiments, datasets, models, and other workspace resources.
   - Your keys are stored in Azure Key Vault, and integration with AML ensures your data is protected at rest.

### 3. **Azure Cognitive Services**
   - Many Azure C

In [4]:
# Read the Azure AI Search connection settings from the environment
search_service_endpoint = os.getenv("AZURE_AI_SEARCH_ENDPOINT")
search_index_name = os.getenv("AZURE_AI_SEARCH_INDEX")
search_api_key = os.getenv("AZURE_AI_SEARCH_API_KEY")

# Wrap the key *value* read above. Passing the literal variable name
# "AZURE_AI_SEARCH_API_KEY" would build a credential holding a bogus key.
credential = AzureKeyCredential(search_api_key)

In [5]:
# Build the client that the lookup functions use to query the Azure AI Search index.
# Arguments are given positionally: endpoint, index name, credential.
search_client = SearchClient(
    search_service_endpoint,
    search_index_name,
    AzureKeyCredential(search_api_key),
)

In [8]:
# System prompt for the first tool-calling experiment.
# NOTE(review): the prompt only mentions searching by "ip_ranges", although the
# tool list below also exposes a port lookup and the user message asks about a port.
system_message = """Assistant supports users to determine whether a firewall rule is risky or not.
You have access to an Azure Cognitive Search index with a list of ports and IP ranges that are classified as risky. You can search for entries by "ip_ranges"
You are designed to be an interactive assistant, so you can ask users clarifying questions to help them determine whether the firewall rule is risky. It's better to give more detailed queries to the search index rather than vague one.
"""

# Conversation seed: the system prompt plus a single user question about port 20
messages = [{"role": "system", "content": system_message},
            {"role": "user", "content": "Help me find out whether this port is risky or not: 20"}]

# Alternative user message for exercising the IP-range tool instead:
#"role": "user", "content": "Help me find out whether this firewall rule is risky or not: 193.163.125.240"

# Tool (function) schemas offered to the model. Each "name" must match a key of the
# `available_functions` mapping that is later passed to run_conversation, and each
# property name must match the keyword argument of the corresponding Python function.
tools = [
    {
        "type": "function",
        "function": {
            "name": "query_risky_ip_ranges",
            "description": "Retrieve risky IP ranges from Azure AI Search index",
            "parameters": {
                "type": "object",
                "properties": {
                    "ip_range": {
                        "type": "string",
                        "description": "The ip-range from the firewall rule request to search for in the risky IP range index",
                    },
                },
                "required": ["ip_range"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "query_risky_ports",
            "description": "Retrieve risky ports from Azure AI Search index",
            "parameters": {
                "type": "object",
                "properties": {
                    "port": {
                        "type": "string",
                        "description": "The port number from the firewall rule request to search for in the risky ports index",
                    },
                },
                "required": ["port"],
            },
        },
    },
]

# Ask the model which tool (if any) it wants to call; tool_choice="auto" leaves
# that decision to the model. No tool is executed in this cell.
response = client.chat.completions.create(
    model=model,
    messages=messages,
    tools=tools,
    temperature=0.2,
    tool_choice="auto",
)

# Show the raw assistant message so the proposed tool call can be inspected
print(response.choices[0].message)

ChatCompletionMessage(content=None, refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_N9MwYrba11TtGdmIdHXBmLvW', function=Function(arguments='{"port":"20"}', name='query_risky_ports'), type='function')])


In [9]:
def query_risky_ip_ranges(ip_range):
    """Look up the best-matching risky IP-range entry in the Azure AI Search index.

    Args:
        ip_range: IP address or range taken from the firewall rule, for
            example "193.163.125.241" or "10.0.0.0/8". Converted with str().

    Returns:
        str: A JSON document (2-space indent) holding the top hit's risk_id,
        description, cvss_score, ports, protocols, ip_ranges and last_updated
        fields, or the sentence "No risky IP range found for this input."
        when nothing matches or the input is empty.
    """
    no_match = "No risky IP range found for this input."
    selected_fields = ["risk_id", "description", "cvss_score", "ports", "protocols", "ip_ranges", "last_updated"]

    query_text = "" if ip_range is None else str(ip_range).strip()
    if not query_text:
        # An empty query is not a lookup: a missing search text matches every
        # document, which would report an arbitrary entry as "risky".
        return no_match

    # query_type="full" selects the Lucene parser, where characters such as "/"
    # (CIDR notation), ":" (IPv6) and "-" are query syntax. Backslash-escape
    # them so the value is matched literally instead of being parsed.
    lucene_special = set('+-&|!(){}[]^"~*?:\\/')
    escaped_text = "".join("\\" + ch if ch in lucene_special else ch for ch in query_text)

    results = search_client.search(
        search_text=escaped_text,  # keyword search for the (escaped) IP range
        query_type="full",  # full Lucene query syntax
        search_fields=["ip_ranges"],  # only match against the IP-range field
        select=selected_fields,  # fields returned for each hit
        top=1,  # only the best-scoring hit is needed
    )

    top_result = next(iter(results), None)
    if not top_result:
        return no_match

    # Emit the fields in the same order as they were selected
    return json.dumps({field: top_result[field] for field in selected_fields}, indent=2)

In [10]:
def query_risky_ports(port):
    """Look up the best-matching risky port entry in the Azure AI Search index.

    Args:
        port: Port number (or port expression such as "20-21") taken from the
            firewall rule. Integers are accepted; the value is converted with str().

    Returns:
        str: A JSON document (2-space indent) holding the top hit's risk_id,
        description, cvss_score, ports, protocols, ip_ranges and last_updated
        fields, or the sentence "No risky port found for this input." when
        nothing matches or the input is empty.
    """
    no_match = "No risky port found for this input."
    selected_fields = ["risk_id", "description", "cvss_score", "ports", "protocols", "ip_ranges", "last_updated"]

    query_text = "" if port is None else str(port).strip()
    if not query_text:
        # An empty query is not a lookup: a missing search text matches every
        # document, which would report an arbitrary entry as "risky".
        return no_match

    # query_type="full" selects the Lucene parser, where characters such as "-",
    # ":" and "/" are query syntax. Backslash-escape them so the value is
    # matched literally instead of being parsed.
    lucene_special = set('+-&|!(){}[]^"~*?:\\/')
    escaped_text = "".join("\\" + ch if ch in lucene_special else ch for ch in query_text)

    results = search_client.search(
        search_text=escaped_text,  # keyword search for the (escaped) port
        query_type="full",  # full Lucene query syntax
        search_fields=["ports"],  # only match against the ports field
        select=selected_fields,  # fields returned for each hit
        top=1,  # only the best-scoring hit is needed
    )

    top_result = next(iter(results), None)
    if not top_result:
        return no_match

    # Emit the fields in the same order as they were selected
    return json.dumps({field: top_result[field] for field in selected_fields}, indent=2)

In [None]:
# Manual check of the port lookup: the recorded output for port 20 shows a hit (risk R002)
test_ports_risky = query_risky_ports("20")
print(test_ports_risky)


{
  "risk_id": "R002",
  "description": "FTP Data- Unencrypted file transfer",
  "cvss_score": "5.0",
  "ports": "20",
  "protocols": "tcp",
  "ip_ranges": "0.0.0.0/0",
  "last_updated": "2025-03-03T00:00:00Z"
}


In [None]:
# Manual check of the IP lookup: the recorded output for this address shows a hit (risk R401)
test_ipranges_risky = query_risky_ip_ranges("193.163.125.241")
print(test_ipranges_risky)

{
  "risk_id": "R401",
  "description": "Reported abusive IP",
  "cvss_score": "10.0",
  "ports": null,
  "protocols": null,
  "ip_ranges": "193.163.125.241",
  "last_updated": "2025-03-03T00:00:00Z"
}


In [17]:
# Negative check: the recorded output for this address is the "no risky IP range" message
test_ipranges_unrisky = query_risky_ip_ranges("193.163.125.245")
print(test_ipranges_unrisky)

No risky IP range found for this input.


In [16]:
# Negative check: the recorded output for port 145 is the "no risky port" message
test_ports_unrisky = query_risky_ports("145")
print(test_ports_unrisky)

No risky port found for this input.


In [91]:
def run_conversation(messages, tools, available_functions, max_rounds=5):
    """Drive a tool-calling chat until the model produces a final answer.

    Each round sends the conversation together with the tool schemas. When the
    model asks for tool calls, every requested call is executed, the assistant
    turn and one "tool" message per call are appended to `messages`, and the
    conversation is sent again with the tools still available. This lets the
    model issue consecutive lookups (for example IP range first, then port).

    Args:
        messages: Chat messages so far. The list is extended in place with the
            assistant tool-call turns and the tool results.
        tools: Tool (function) schemas passed to the chat completions API.
        available_functions: Mapping of tool name to the Python callable that
            implements it. Callables are invoked with the model-supplied JSON
            arguments as keyword arguments.
        max_rounds: Maximum number of tool-calling rounds before the model is
            asked for a final answer with tool use disabled.

    Returns:
        The chat-completion response holding the model's final answer, or a
        str describing the problem when the model requests an unknown function,
        supplies arguments that are not valid JSON, or the called function
        raises. In the error case `messages` is not extended for that round.
    """
    for _ in range(max_rounds):
        # Send the conversation and the available tools to the model
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
            tool_choice="auto",
            temperature=0.2,
        )
        response_message = response.choices[0].message

        # No tool requested: this is the final answer
        if not response_message.tool_calls:
            return response

        # Execute every requested call. Results are collected first so that an
        # error leaves `messages` without a half-recorded round.
        tool_messages = []
        for tool_call in response_message.tool_calls:
            print("Recommended Function call:")
            print(tool_call)
            print()

            function_name = tool_call.function.name

            # verify function exists
            if function_name not in available_functions:
                return "Function " + function_name + " does not exist"
            function_to_call = available_functions[function_name]

            # The model-produced argument string is not guaranteed to be valid JSON
            try:
                function_args = json.loads(tool_call.function.arguments)
            except json.JSONDecodeError as e:
                return f"Failed to parse function arguments: {e}"

            try:
                function_response = function_to_call(**function_args)
            except Exception as e:
                return f"Error while calling '{function_name}': {e}"

            print("Output of function call:")
            print(function_response)
            print()

            # Tool message content must be text
            if not isinstance(function_response, str):
                function_response = json.dumps(function_response, default=str)

            tool_messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,  # ties the result to its request
                    "content": function_response,
                }
            )

        # Record the assistant turn that requested the calls, then the results
        messages.append(
            {
                "role": response_message.role,
                "content": response_message.content,
                "tool_calls": [
                    {
                        "id": tool_call.id,
                        "type": "function",
                        "function": {
                            "name": tool_call.function.name,
                            "arguments": tool_call.function.arguments,
                        },
                    }
                    for tool_call in response_message.tool_calls
                ],
            }
        )
        messages.extend(tool_messages)

        print("Messages in follow-up request:")
        for message in messages:
            print(message)
        print()

    # Round budget used up: request an answer with tool use disabled so the
    # model has to reply in text
    return client.chat.completions.create(
        model=model,
        messages=messages,
        tools=tools,
        tool_choice="none",
        temperature=0.2,
    )

In [94]:
# System prompt for the end-to-end run: asks for an IP-range lookup followed by a
# port lookup and defines the CVSS thresholds for High / Medium / Low risk.
# NOTE(review): the second prompt line ends with a stray double quote after "port".
system_message = """Assistant is a large language model designed to help determine whether a firewall rule is risky or not.
You have access to an Azure Cognitive Search index with a list of ports and IP ranges that are classified as risky. You can search for entries by "ip_range" and "port". Perform consecutive queries, first checking for the IP range and then for the port"
You are designed to be an interactive assistant, so you can ask users clarifying questions to help them determine whether the firewall rule is risky. It's better to give more detailed queries to the search index rather than vague one.
If cvss_score >= 7.0, classify the IP/rule as High Risk
If cvss_score between 4.0 and 6.9, classify as Medium Risk
If cvss_score < 4.0, classify as Low Risk
"""

# Fresh conversation: system prompt plus one firewall rule to assess
messages = [
    {"role": "system", "content": system_message},
    {
        "role": "user",
        "content": "Help me find out whether this firewall rule is risky or not: ip-range is 193.163.125.245, port is 20, protocol is TCP",
    },
]

# Maps the tool names declared in `tools` (defined in an earlier cell) to their implementations
available_functions = {
    "query_risky_ip_ranges": query_risky_ip_ranges,
    "query_risky_ports": query_risky_ports,
}

result = run_conversation(messages, tools, available_functions)

print("Final response:")
# run_conversation returns either an error string or the chat-completion response
if isinstance(result, str):
    print(result)  # error message describing why the tool call failed
else:
    print(result.choices[0].message.content)  # text of the assistant's reply

Recommended Function call:
ChatCompletionMessageToolCall(id='call_RpbXPAYvprlK7K1XQnKD4rID', function=Function(arguments='{"ip_range": "193.163.125.245"}', name='query_risky_ip_ranges'), type='function')

Output of function call:
No risky IP range found for this input.

Messages in second request:
{'role': 'system', 'content': 'Assistant is a large language model designed to help determine whether a firewall rule is risky or not.\nYou have access to an Azure Cognitive Search index with a list of ports and IP ranges that are classified as risky. You can search for entries by "ip_range" and "port". Perform consecutive queries, first checking for the IP range and then for the port"\nYou are designed to be an interactive assistant, so you can ask users clarifying questions to help them determine whether the firewall rule is risky. It\'s better to give more detailed queries to the search index rather than vague one.\nIf cvss_score >= 7.0, classify the IP/rule as High Risk\nIf cvss_score bet