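# OAS -> requests

Load an OpenAPI Specification (OAS) document — here the Swagger Petstore — and call any of its operations by `operationId` using `requests`.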

In [86]:
import requests
from openapi_schema_pydantic import OpenAPI
import json
import yaml

In [87]:
!wget https://petstore3.swagger.io/api/v3/openapi.json

--2024-11-05 20:41:53--  https://petstore3.swagger.io/api/v3/openapi.json
Resolving petstore3.swagger.io (petstore3.swagger.io)... 52.86.77.166, 54.226.28.221
Connecting to petstore3.swagger.io (petstore3.swagger.io)|52.86.77.166|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/json]
Saving to: ‘openapi.json.5’

openapi.json.5          [ <=>                ]  15.71K  --.-KB/s    in 0s      

2024-11-05 20:41:54 (196 MB/s) - ‘openapi.json.5’ saved [16090]



In [88]:
BASE_URL = "https://petstore3.swagger.io/api/v3"

# Location of the downloaded spec. wget writes "openapi.json" on a clean run and
# only falls back to numbered names (".1", ".2", ...) when that name is already
# taken, so a numbered copy is a leftover of earlier runs and is missing after a
# fresh download.
SPEC_PATH = "openapi.json"

# Load and parse the OAS file
with open(SPEC_PATH, encoding="utf-8") as f:
    spec_dict = json.load(f)
oas = OpenAPI.parse_obj(spec_dict)

In [89]:
# Upper bound on nesting while expanding schemas; stops self-referencing schemas
# from recursing forever.
_MAX_SCHEMA_DEPTH = 8

# Fallback values for primitive schema types that carry no example/default/enum.
_PLACEHOLDERS = {"string": "string", "integer": 0, "number": 0.0, "boolean": False}


def _schema_field(schema, name):
    """Read `name` from a schema given either as a plain dict or as an object with attributes."""
    if isinstance(schema, dict):
        return schema.get(name)
    return getattr(schema, name, None)


def _example_from_schema(schema, schemas, depth=0):
    """Build a placeholder value for `schema`, resolving `$ref`s against `schemas`."""
    if schema is None or depth > _MAX_SCHEMA_DEPTH:
        return None

    # A reference is "$ref" in a raw spec dict and "ref" on the parsed model.
    ref = _schema_field(schema, "$ref") or _schema_field(schema, "ref")
    if ref:
        return _example_from_schema(schemas.get(ref.split("/")[-1]), schemas, depth + 1)

    # Prefer values the spec author supplied over invented placeholders.
    for key in ("example", "default"):
        value = _schema_field(schema, key)
        if value is not None:
            return value
    enum = _schema_field(schema, "enum")
    if enum:
        return enum[0]

    schema_type = _schema_field(schema, "type")
    properties = _schema_field(schema, "properties")
    if schema_type == "object" or properties:
        return {
            name: _example_from_schema(sub_schema, schemas, depth + 1)
            for name, sub_schema in (properties or {}).items()
        }
    if schema_type == "array":
        items = _schema_field(schema, "items")
        return [] if items is None else [_example_from_schema(items, schemas, depth + 1)]
    return _PLACEHOLDERS.get(schema_type)


def get_request_body_from_schema(operation, oas):
    """Generates a default request body from the schema if `data` is not provided.

    Args:
        operation: Operation as a dict (e.g. one value of a dumped path item).
        oas: Parsed spec; `oas.components.schemas` is used to resolve the reference.

    Returns:
        A placeholder JSON body derived from the schema referenced by the operation's
        `application/json` request body, or `None` when the operation has no such
        body, the schema is not a `$ref`, or the referenced schema does not exist.
    """
    # `or {}` (rather than a .get default) also covers keys that are present but None.
    request_body = operation.get("requestBody") or {}
    content = request_body.get("content") or {}
    media_type = content.get("application/json") or {}

    # Raw spec dicts use "schema"/"$ref". A model dump made without aliases appears
    # to use the Python field names "media_type_schema"/"ref" instead.
    # NOTE(review): confirm the field names against the installed openapi_schema_pydantic.
    schema = media_type.get("schema") or media_type.get("media_type_schema") or {}
    schema_ref = schema.get("$ref") or schema.get("ref")
    if not schema_ref:
        return None

    components = getattr(oas, "components", None)
    schemas = getattr(components, "schemas", None) or {}
    target = schemas.get(schema_ref.split("/")[-1])
    if target is None:
        return None
    return _example_from_schema(target, schemas)

In [90]:
def generate_request_by_operation_id(oas, operation_id, parameters=None, data=None, timeout=30):
    """Find the operation with the given `operationId` and send the matching HTTP request.

    Args:
        oas: Parsed spec; `oas.paths` maps path templates to path items exposing `.dict()`.
        operation_id: The `operationId` to look up.
        parameters: Optional request parameters. When given as a dict, entries whose
            name appears as a `{name}` placeholder in the path template are substituted
            into the URL; all remaining entries are sent as the query string.
        data: Optional JSON body. When `None`, a body is generated with
            `get_request_body_from_schema`.
        timeout: Seconds passed to `requests` as the request timeout.

    Returns:
        The decoded JSON response for status 200/201, the integer status code for any
        other status, or `None` when no operation has the requested id.
    """
    from urllib.parse import quote  # local import: only needed for path placeholders

    http_methods = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}

    # Search for the operation by `operationId`
    for path, path_item in (oas.paths or {}).items():
        for method, operation in path_item.dict().items():
            # A dumped path item also carries non-operation entries (e.g. a summary
            # string or a path-level parameters list); calling .get on those would raise.
            if method.lower() not in http_methods or not isinstance(operation, dict):
                continue
            if operation.get("operationId") != operation_id:
                continue

            # Fill `{name}` placeholders from `parameters`; the rest stays in the query.
            resolved_path = path
            query = parameters
            if isinstance(parameters, dict):
                query = {}
                for name, value in parameters.items():
                    placeholder = "{" + str(name) + "}"
                    if placeholder in resolved_path:
                        resolved_path = resolved_path.replace(placeholder, quote(str(value), safe=""))
                    else:
                        query[name] = value

            url = f"{BASE_URL}{resolved_path}"
            headers = {"Content-Type": "application/json"}

            # Determine request body: use `data` if provided; otherwise, generate from schema.
            # `is not None` keeps an explicitly passed empty body from being replaced.
            request_body = data if data is not None else get_request_body_from_schema(operation, oas)

            # Make the HTTP request dynamically
            response = requests.request(
                method.upper(),
                url,
                params=query,
                json=request_body,
                headers=headers,
                timeout=timeout,
            )
            print(url, request_body)
            return response.json() if response.status_code in (200, 201) else response.status_code
    print(f"OperationId '{operation_id}' not found.")
    return None

In [91]:
ans = generate_request_by_operation_id(oas, "findPetsByStatus", parameters={"status":"sold"})
# A failed call returns an int status code (or None), which cannot be sliced,
# so preview the first three pets only when a list actually came back.
ans[:3] if isinstance(ans, list) else ans

https://petstore3.swagger.io/api/v3/pet/findByStatus None


[{'id': 4144792027,
  'category': {'id': 1, 'name': 'Iron_Man'},
  'name': 'Captain_Marvel',
  'photoUrls': [],
  'tags': [],
  'status': 'sold'},
 {'id': 2651827784,
  'category': {'id': 1, 'name': 'Iron_Man'},
  'name': 'Captain_Marvel',
  'photoUrls': [],
  'tags': [],
  'status': 'sold'},
 {'id': 8844051868,
  'category': {'id': 1, 'name': 'Iron_Man'},
  'name': 'Captain_Marvel',
  'photoUrls': [],
  'tags': [],
  'status': 'sold'}]