# LLM Enhanced API testing  
Perform security testing based on an OpenAPI spec


## 1 - Environment

In [9]:
'''
Set up the environment
'''

from datetime import datetime
import os

def create_output_folder():
    """
    Create a timestamped folder for the output files.
    """
    # TODO - need to rethink this
    # long version
    # timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    # this will create a daily folder
    timestamp = datetime.now().strftime("%Y%m%d")
    folder_name = f"vampi-{timestamp}/"
    output_folder = os.path.join(output_base_path, folder_name)
    os.makedirs(output_folder, exist_ok=True)
    return output_folder

## Paths
# API spec path
api_spec_path = "/m2-data/jupyterNotebooks/api/spec/openapi3Vampi.yml"
# base output folder path
output_base_path = "/m2-data/jupyterNotebooks/api/test_runs/"
# create a new folder for this run
output_folder = create_output_folder()

#models
model_name = "qwen2.5-coder:7b"
# model_name = "tinyllama:latest"

# test data setup
test_email_domain = "test.com"
base_test_username = "adb123"

# print the paths
print("API specification file : ", api_spec_path)
print("output path : ", output_folder)



API specification file :  /m2-data/jupyterNotebooks/api/spec/openapi3Vampi.yml
output path :  /m2-data/jupyterNotebooks/api/test_runs/vampi-20241231/


## 2 - Validate and parse the spec

In [10]:
import os
import json
import yaml
from datetime import datetime
from openapi_spec_validator import validate_spec


def validate_openapi_spec(file_path, output_folder):
    """
    Validates the OpenAPI specification and attempts partial parsing on failure.

    Args:
        file_path (str): Path to the OpenAPI specification file.
        output_folder (str): Path to the output folder for logging.

    Returns:
        dict: The parsed OpenAPI specification (best effort), or an empty dict if parsing completely fails.
    """
    try:
        with open(file_path, 'r') as file:
            if file_path.endswith('.json'):
                spec = json.load(file)
            elif file_path.endswith(('.yaml', '.yml')):
                spec = yaml.safe_load(file)
            else:
                raise ValueError("Unsupported file format. Only JSON and YAML are allowed.")

        validate_spec(spec)
        print("OpenAPI spec validation successful.")
        return spec

    except (json.JSONDecodeError, yaml.YAMLError) as e:
        error_message = f"Failed to parse the specification file: {e}"
    except Exception as e:  # Generic handling for validation errors
        error_message = f"Validation error: {e}"

    # Log validation errors
    log_file = os.path.join(output_folder, "validation_errors.log")
    with open(log_file, 'w') as log:
        log.write(error_message)
    print(f"Validation failed. Errors logged to: {log_file}")

    # Attempt to return partial spec if possible
    try:
        with open(file_path, 'r') as file:
            if file_path.endswith('.json'):
                return json.load(file)
            elif file_path.endswith(('.yaml', '.yml')):
                return yaml.safe_load(file)
    except Exception as e:
        print(f"Unable to perform partial parsing: {e}")
        return {}

def parse_spec_details(spec):
    """
    Parses the OpenAPI specification to extract servers, endpoints, and additional metadata.

    Args:
        spec (dict): The OpenAPI specification.

    Returns:
        dict: A dictionary containing servers, endpoints, and useful metadata.
    """
    endpoints = []
    servers = spec.get('servers', [])
    info = spec.get('info', {})
    tags = spec.get('tags', [])
    security = spec.get('security', [])
    external_docs = spec.get('externalDocs', {})

    for path, methods in spec.get('paths', {}).items():
        for method, details in methods.items():
            endpoint_info = {
                "path": path,
                "method": method.upper(),
                "parameters": details.get("parameters", []),
                "requestBody": details.get("requestBody", {}).get("content", {}),
                "responses": details.get("responses", {})
            }
            endpoints.append(endpoint_info)

    return {
        "servers": servers,
        "info": info,
        "tags": tags,
        "security": security,
        "externalDocs": external_docs,
        "endpoints": endpoints
    }

def main():
    """
    Main function to validate and parse the OpenAPI spec, saving output to JSON files.
    """

    # Validate the spec
    spec = validate_openapi_spec(api_spec_path, output_folder)
    if not spec:
        print("Failed to validate or parse the OpenAPI spec. Exiting.")
        return

    # Parse spec details
    parsed_data = parse_spec_details(spec)

    # Save parsed data to a JSON file
    output_file = os.path.join(output_folder, "parsed_spec.json")
    with open(output_file, 'w') as file:
        json.dump(parsed_data, file, indent=4)

    print(f"Parsed specification details saved to: {output_file}")

if __name__ == "__main__":
    main()


OpenAPI spec validation successful.
Parsed specification details saved to: /m2-data/jupyterNotebooks/api/test_runs/vampi-20241231/parsed_spec.json


## Health Checks - LLM Assisted

In [5]:
import json
import requests

# File paths
# base_path = "/m2-data/jupyterNotebooks/api/test_runs/vampi-1/"
parsed_spec_path = f"{output_folder}parsed_spec.json"
health_endpoints_path = f"{output_folder}endpoints_health.json"
model_url = "http://localhost:11434/api/generate"  # Ollama API endpoint
# model_name = "qwen2.5-coder:7b"

def find_candidate_endpoints(parsed_spec):
    """
    Identify candidate heartbeat endpoints based on the criteria.

    Args:
        parsed_spec (dict): The parsed OpenAPI spec JSON content.

    Returns:
        list: A list of candidate endpoints with 200 OK responses and no parameters.
    """
    candidates = []
    endpoints = parsed_spec.get("endpoints", [])
    
    for endpoint in endpoints:
        # Check for 200 OK response
        responses = endpoint.get("responses", {})
        if "200" in responses:
            # Check for no parameters or requestBody
            if not endpoint.get("parameters") and not endpoint.get("requestBody"):
                candidates.append(endpoint)
    
    return candidates

def query_model_for_ranking(candidates, model_name="qwen2.5-coder:7b"):
    """
    Query the model to rank or refine candidate heartbeat endpoints.

    Args:
        candidates (list): A list of candidate endpoints.
        model_name (str): The name of the model to query.

    Returns:
        list: Ranked or refined list of heartbeat endpoints.
    """
    prompt = (
        "You are analyzing API endpoints. Based on the following candidates, rank the endpoints "
        "most likely to be a heartbeat endpoint, which typically returns 200 OK with no parameters. "
        "Return the top 3 most probable candidates as a JSON array.\n\n"
        f"Candidates:\n{json.dumps(candidates, indent=2)}\n\n"
    )
    payload = {
        "model": model_name,
        "prompt": prompt
    }

    try:
        # Stream the response for incremental output
        response = requests.post(model_url, json=payload, stream=True)
        response.raise_for_status()

        # Process the streamed response incrementally
        result = ""
        for chunk in response.iter_lines():
            if chunk:
                try:
                    # Decode each line and append to result
                    result += chunk.decode('utf-8')
                except Exception as decode_error:
                    print(f"Error decoding chunk: {decode_error}")

        # Attempt to extract JSON from the response
        try:
            # Locate and parse the JSON part of the response
            start_idx = result.find("[")  # Assuming JSON array starts with [
            end_idx = result.rfind("]") + 1  # Assuming JSON array ends with ]
            if start_idx != -1 and end_idx != -1:
                ranked_candidates = json.loads(result[start_idx:end_idx])
            else:
                raise ValueError("Could not locate JSON array in model response.")
            return ranked_candidates
        except json.JSONDecodeError as e:
            print(f"JSON parsing error: {e}")
            return candidates[:3]  # Fallback to the first 3 candidates

    except Exception as e:
        print(f"Error querying model: {e}")
        return candidates[:3]  # Fallback to the first 3 candidates

def save_health_endpoints(health_endpoints, file_path):
    """
    Save the selected health endpoints to a JSON file.

    Args:
        health_endpoints (list): The list of health endpoints.
        file_path (str): Path to save the JSON file.
    """
    try:
        with open(file_path, "w") as file:
            json.dump(health_endpoints, file, indent=4)
        print(f"Health endpoints saved to: {file_path}")
    except Exception as e:
        print(f"Failed to save health endpoints: {e}")

def main():
    """
    Main function to find, rank, and save heartbeat endpoint candidates.
    """
    # Load the parsed OpenAPI spec JSON file
    try:
        with open(parsed_spec_path, "r") as file:
            parsed_spec = json.load(file)
    except Exception as e:
        print(f"Failed to load parsed spec: {e}")
        return

    # Find candidate endpoints
    candidates = find_candidate_endpoints(parsed_spec)

    if not candidates:
        print("No candidate heartbeat endpoints found.")
        return

    # Query the model for refined ranking
    ranked_candidates = query_model_for_ranking(candidates)

    # Save the top 3 candidates to a new JSON file
    save_health_endpoints(ranked_candidates[:3], health_endpoints_path)

    # Print the top 3 candidates
    print("Top 3 Heartbeat Endpoint Candidates:")
    for i, candidate in enumerate(ranked_candidates[:3], start=1):
        print(f"{i}. Path: {candidate.get('path')}, Method: {candidate.get('method')}")

if __name__ == "__main__":
    main()


JSON parsing error: Expecting value: line 1 column 2 (char 1)
Health endpoints saved to: /m2-data/jupyterNotebooks/api/test_runs/vampi-20241231/endpoints_health.json
Top 3 Heartbeat Endpoint Candidates:
1. Path: /createdb, Method: GET
2. Path: /, Method: GET
3. Path: /users/v1, Method: GET


## Test Health Check Endpoints
  - Appends endpoints_health.json with a list of the endpoints that were up

Up endpoints successfully appended to: /m2-data/jupyterNotebooks/api/test_runs/vampi-20241231/endpoints_health.json
Health Check Results:
/createdb: Unexpected status code: 500
/: Application is UP
/users/v1: Application is UP


## Determine User Registration Endpoints
Determine API endpoints for user registration

In [35]:
import json
import requests

def extract_endpoints_from_parsed_spec(parsed_spec_path):
    """
    Extract a list of endpoints from the parsed OpenAPI spec.

    Args:
        parsed_spec_path (str): Path to the parsed OpenAPI spec file.

    Returns:
        list: A list of endpoint paths.
    """
    try:
        with open(parsed_spec_path, "r") as file:
            parsed_spec = json.load(file)
            # Adjust to handle "path" as a key
            paths = [entry["path"] for entry in parsed_spec.get("endpoints", []) if "path" in entry]
            return paths
    except Exception as e:
        print(f"Error loading or parsing the spec file: {e}")
        return []


def query_llm_for_registration_endpoints(endpoints):
    """
    Query the LLM to identify registration-related endpoints.

    Args:
        endpoints (list): A list of endpoint paths.

    Returns:
        list: A list of LLM-suggested registration-related endpoints.
    """
    llm_url = "http://localhost:11434/api/generate"

    # Limit the number of endpoints to avoid overloading the LLM
    max_endpoints = 10
    truncated_endpoints = endpoints[:max_endpoints]

    # Construct the prompt
    prompt = (
        "Analyze the following list of API endpoints and identify any that are related to "
        "user or account registration. Provide any relevant observations and reasoning.\n\n"
        "Endpoints:\n" + "\n".join(truncated_endpoints)
    )

    payload = {
        "model": model_name,
        "prompt": prompt
    }

    # Debugging: Print the payload being sent
    print("Payload being sent to LLM:", json.dumps(payload, indent=4))

    try:
        response = requests.post(llm_url, json=payload)
        response.raise_for_status()
        suggestions = response.json()
        return suggestions
    except requests.exceptions.RequestException as e:
        print(f"HTTP error querying LLM: {e}")
    except json.JSONDecodeError as e:
        print(f"Error decoding LLM response: {e}")
    return []


    try:
        response = requests.post(llm_url, json=payload)
        response.raise_for_status()
        suggestions = response.json()
        return suggestions
    except Exception as e:
        print(f"Error querying LLM for registration-related endpoints: {e}")
        return []

def main():
    """
    Main function to extract endpoints and query LLM.
    """
    parsed_spec_path = f"{output_folder}parsed_spec.json"
    endpoints = extract_endpoints_from_parsed_spec(parsed_spec_path)

    if not endpoints:
        print("No endpoints found in the parsed spec.")
        return

    print("Extracted endpoints:", endpoints)

    suggestions = query_llm_for_registration_endpoints(endpoints)

    if suggestions:
        print("LLM-suggested registration-related endpoints:", suggestions)
    else:
        print("No registration-related endpoints identified by LLM.")

if __name__ == "__main__":
    main()


Extracted endpoints: ['/createdb', '/', '/users/v1', '/users/v1/_debug', '/users/v1/register', '/users/v1/login', '/me', '/users/v1/{username}', '/users/v1/{username}', '/users/v1/{username}/email', '/users/v1/{username}/password', '/books/v1', '/books/v1', '/books/v1/{book_title}']
Payload being sent to LLM: {
    "model": "qwen2.5-coder:7b",
    "prompt": "Analyze the following list of API endpoints and identify any that are related to user or account registration. Provide any relevant observations and reasoning.\n\nEndpoints:\n/createdb\n/\n/users/v1\n/users/v1/_debug\n/users/v1/register\n/users/v1/login\n/me\n/users/v1/{username}\n/users/v1/{username}\n/users/v1/{username}/email"
}


KeyboardInterrupt: 