In [2]:
import json
import re

In [3]:
# Load the Smithy JSON model. The context manager closes the file handle
# deterministically, and JSON text is read as UTF-8 regardless of the locale.
with open("../s3.json", encoding="utf-8") as model_file:
    s3 = json.load(model_file)

In [50]:
def tabs(n):
    """Return the indentation prefix for nesting depth ``n`` (two spaces per level)."""
    indent_unit = "  "
    return indent_unit * n

def filter_operations(shapes):
    """Return a new dict holding only the entries of ``shapes`` whose "type" is "operation"."""
    operations = {}
    for shape_id, shape in shapes.items():
        if shape["type"] == "operation":
            operations[shape_id] = shape
    return operations

def input_member_for_operation(operation_id):
    """Return the input shape name for an operation: ``operation_id`` with "Request" appended."""
    request_suffix = "Request"
    return operation_id + request_suffix

# Clean a doc string for OpenAI's OpenAPI format.
# Notably, remove HTML tags, newlines and tabs, and truncate to 200 chars since that's the limit.
# https://platform.openai.com/docs/plugins/getting-started/openapi-definition
def clean_docstring(string):
    """Return ``string`` with HTML tags removed, whitespace collapsed, cut to 200 chars.

    Processing order:
      1. Every ``<...>`` span is deleted (non-greedy). ``.`` does not match a
         newline here, so a tag that spans several lines is left in place.
      2. Every run of whitespace (spaces, tabs, newlines, ...) becomes a single
         space. No newline or tab can survive this step, so no separate
         removal pass is needed afterwards.
      3. The result is truncated to its first 200 characters.

    Leading and trailing spaces are collapsed but not stripped.
    """
    # Raw strings keep "\s" from being read as an (invalid) string escape.
    without_tags = re.sub(r"<.*?>", "", string)
    collapsed = re.sub(r"\s+", " ", without_tags)
    return collapsed[:200]

# Smithy shape type -> OpenAPI schema "type". OpenAPI only allows
# array, boolean, integer, number, object and string.
_OPENAPI_TYPE_BY_SMITHY_TYPE = {
    "blob": "string",
    "timestamp": "string",
    "enum": "string",
    "byte": "integer",
    "short": "integer",
    "long": "integer",
    "bigInteger": "integer",
    "intEnum": "integer",
    "float": "number",
    "double": "number",
    "bigDecimal": "number",
    "structure": "object",
    "union": "object",
    "map": "object",
    "list": "array",
}


def convert_member_type(member_type):
    """Translate a Smithy shape type name into an OpenAPI schema type name.

    Types that have no entry in the table (for example "string", "boolean",
    "integer") are returned unchanged, as are unknown type names.
    """
    return _OPENAPI_TYPE_BY_SMITHY_TYPE.get(member_type, member_type)

def write_openapi_header(f):
    """Write the fixed OpenAPI preamble (version, info and servers) to ``f``.

    ``f`` is any object with a ``write(str)`` method. The text is emitted in a
    single write and ends with a newline.
    """
    header_lines = (
        "openapi: 3.0.1",
        "info:",
        "  version: 'v1'",
        "  title: AWS SDK for Natural Language",
        "  description: Use AWS with natural language",
        "servers:",
        "  - url: https://gpt4-plugin-test-mataslauzadis.vercel.app/",
    )
    f.write("\n".join(header_lines) + "\n")

def write_paths(f, shapes):
    """Write the OpenAPI ``paths`` section to ``f``, one path per Smithy operation.

    Args:
        f: Writable text stream receiving the YAML.
        shapes: Mapping of shape ID -> shape definition. Only entries whose
            "type" is "operation" are used; each must have ``input.target``.

    For every operation this emits a path derived from the input shape ID
    ("." and "#" become "/"), a single ``post`` method, an ``operationId``
    equal to the last path segment, an optional ``summary`` taken from the
    ``smithy.api#documentation`` trait, a JSON request body referencing the
    input shape, and a plain-text 200 response.
    """
    f.write("paths:\n")

    for operation in filter_operations(shapes).values():
        input_target = operation["input"]["target"]  # i.e com.amazonaws.s3#AbortMultipartUploadRequest

        # i.e /com/amazonaws/s3/AbortMultipartUploadRequest
        endpoint = "/" + input_target.replace(".", "/").replace("#", "/")
        f.write(tabs(1) + endpoint + ":\n")

        # Write HTTP Method (GET, POST, DELETE, etc.)
        # OpenAPI path item keys must be lowercase ("post", not "POST").
        # TODO Manually setting everything to POST
        f.write(tabs(2) + "post:\n")

        # Write Operation ID
        operation_id = endpoint.rsplit("/", 1)[-1]  # i.e AbortMultipartUploadRequest
        f.write(tabs(3) + f"operationId: {operation_id}\n")

        # Write summary. Traits are optional, so a missing "traits" key is
        # treated as "no documentation" rather than an error.
        documentation = operation.get("traits", {}).get("smithy.api#documentation")
        if documentation is not None:
            # A JSON string is a valid YAML double-quoted scalar; quoting keeps
            # ": ", " #" and similar sequences in the text from breaking the YAML.
            summary = json.dumps(clean_docstring(documentation), ensure_ascii=False)
            f.write(tabs(3) + f"summary: {summary}\n")

        # Write input
        # The reference must be a pointer into this document's components and
        # must be quoted, because an unquoted leading "#" starts a YAML comment.
        # NOTE(review): the schema keys contain "#", which is outside the
        # character set OpenAPI allows for component names; sanitizing them
        # has to be done here and in the schema writer together.
        schema_ref = json.dumps("#/components/schemas/" + input_target, ensure_ascii=False)
        f.write(tabs(3) + "requestBody:\n")
        f.write(tabs(4) + "required: true\n")
        f.write(tabs(4) + "content:\n")
        f.write(tabs(5) + "application/json:\n")
        f.write(tabs(6) + "schema:\n")
        f.write(tabs(7) + f"$ref: {schema_ref}\n")

        # Write responses (only specify HTTP 200 OK response for now)
        # "content" maps a media type to a Media Type Object, so the body is
        # declared as a plain-text string.
        f.write(tabs(3) + "responses:\n")
        f.write(tabs(4) + "\"200\":\n")
        f.write(tabs(5) + "description: \"OK\"\n")
        f.write(tabs(5) + "content:\n")
        f.write(tabs(6) + "text/plain:\n")
        f.write(tabs(7) + "schema:\n")
        f.write(tabs(8) + "type: string\n")

        # Operation done, write newline separator
        f.write("\n")

def write_schemas(f, shapes):
    """Write the ``components``/``schemas`` section to ``f``.

    The input shape of every operation in ``shapes`` is emitted through
    ``write_schema_recursive`` at indentation depth 2.
    """
    f.write("components:\n")
    f.write(tabs(1) + "schemas:\n")

    for operation in filter_operations(shapes).values():
        write_schema_recursive(f, shapes, operation["input"]["target"], 2)

def write_schema_recursive(f, shapes, shape, num_tabs = 2):
    """Write the schema for shape ID ``shape`` and, recursively, for its members.

    Args:
        f: Writable text stream receiving the YAML.
        shapes: Mapping of shape ID -> shape definition.
        shape: ID of the shape to emit; ``smithy.api#Unit`` is skipped.
        num_tabs: Indentation depth of the shape's own key.

    The shape is written as ``<shape id>:`` followed by ``type`` and
    ``properties``. Each member is then written two levels deeper: terminal
    targets go through ``write_terminal_member``, anything else recurses.
    Progress is logged with ``print``.

    NOTE(review): entries under ``properties`` are keyed by the member's
    *target shape ID*; the member name itself is only used for logging, so
    two members sharing a target produce duplicate keys.
    """
    print(f"Processing {shape}")
    if shape == "smithy.api#Unit":
        return

    shape_def = shapes[shape]
    f.write(tabs(num_tabs) + shape + ":\n")
    f.write(tabs(num_tabs + 1) + f"type: {convert_member_type(shape_def['type'])}\n")
    f.write(tabs(num_tabs + 1) + "properties:\n")

    # Shapes without a "members" key simply have nothing to iterate.
    for member, member_def in shape_def.get("members", {}).items():
        member_target = member_def["target"]
        print(f"Processing member {member}")

        # The target must be resolved before it can be classified. Targets
        # missing from `shapes` (presumably prelude shapes such as
        # smithy.api#Unit, which the guard above also expects) are skipped
        # instead of raising KeyError.
        target_def = shapes.get(member_target)
        if target_def is None:
            print(f"Skipping member {member}: target {member_target} is not defined in shapes")
            continue

        if is_member_terminal(target_def):
            print(f"Reached terminal member {member}")
            write_terminal_member(f, member_target, shapes, num_tabs + 2)
        else:
            print(f"Recursing on {member}")
            write_schema_recursive(f, shapes, member_target, num_tabs + 2)


# Shape types that are written as a single property instead of being recursed into.
TERMINAL_TYPES = [
    "string",
    "boolean",
    "integer",
    "timestamp",
    "long",
    "blob",
    "enum",
    "list",
    "map",
]


def is_member_terminal(member):
    """Return True when the shape definition ``member`` has a "type" listed in TERMINAL_TYPES."""
    shape_type = member["type"]
    return shape_type in TERMINAL_TYPES


def write_terminal_member(f, member, shapes, num_tabs=4):
    """Write one terminal (non-recursed) shape as an OpenAPI property.

    Args:
        f: Writable text stream receiving the YAML.
        member: Shape ID of the property; it is looked up in ``shapes`` and
            also used verbatim as the property key.
        shapes: Mapping of shape ID -> shape definition.
        num_tabs: Indentation depth of the property key.

    Emits the key, its converted ``type``, then type-specific details
    (``additionalProperties`` for maps, ``enum`` for enums, ``items`` for
    lists) and finally a ``description`` when the shape carries a
    ``smithy.api#documentation`` trait. Free text is written as JSON strings,
    which are valid YAML double-quoted scalars, so punctuation in the model
    cannot break the document.
    """
    shape = shapes[member]
    member_type = shape["type"]
    print(f"Writing terminal member {member} of type {member_type}")

    # Write property name
    f.write(tabs(num_tabs) + member + ":\n")

    # Write property type
    f.write(tabs(num_tabs + 1) + f"type: {convert_member_type(member_type)}\n")

    if member_type == "map":
        f.write(tabs(num_tabs + 1) + "additionalProperties:\n")
        # FIXME OpenAPI can only model map types as a string.
        # add validation to make sure the Smithy model map types are only strings.
        # The trailing newline matters: without it the next line written is
        # glued onto this one.
        f.write(tabs(num_tabs + 2) + "type: string\n")

    # Finish writing enum
    if member_type == "enum":
        # Quote each value so YAML keeps it a string (e.g. "ON", "true") and
        # so commas or colons inside a value stay harmless.
        enum_values = [
            json.dumps(enum_member["traits"]["smithy.api#enumValue"], ensure_ascii=False)
            for enum_member in shape["members"].values()
        ]
        f.write(tabs(num_tabs + 1) + "enum: [" + ", ".join(enum_values) + "]\n")

    # Finish writing list
    if member_type == "list":
        # OpenAPI requires `items` on every array schema.
        # Assumes list shapes store their element under member.target (Smithy
        # JSON AST layout) - TODO confirm against the model.
        # Only the element's top-level type is written; element structures
        # are not expanded.
        element_target = shape.get("member", {}).get("target")
        element_shape = shapes.get(element_target)
        if element_shape is None:
            # Unknown element shape: an empty schema accepts any value.
            f.write(tabs(num_tabs + 1) + "items: {}\n")
        else:
            f.write(tabs(num_tabs + 1) + "items:\n")
            f.write(tabs(num_tabs + 2) + f"type: {convert_member_type(element_shape['type'])}\n")

    # Write documentation
    documentation = (shape.get("traits") or {}).get("smithy.api#documentation")
    if documentation is not None:
        description = json.dumps(clean_docstring(documentation), ensure_ascii=False)
        f.write(tabs(num_tabs + 1) + f"description: {description}\n")

    # TODO Write "required" property

# def write_member(member, shapes, num_tabs = 4):
#     terminal_types = ["string", "boolean", "integer", "timestamp", "long", "blob", "enum", "list"]
#     member_type = member["type"]
#     if member_type in terminal_types:
#       write_terminal_member(f, member, shapes)
#     else:
#       if member_type == "structure":
#         f.write(tabs(4) + member["target"] + ":\n")
#         f.write(tabs(5) + f"type: {convert_member_type(member_type)}\n")
#         f.write(tabs(5) + "properties:\n")
#         for submember_name in shapes[member["target"]]["members"]:
#           submember = shapes[shapes[member["target"]]["members"][submember_name]["target"]]
#           print(submember)
#           if submember["type"] in terminal_types:
#              write_terminal_member(f, submember, shapes, 6)
#           else: raise Exception(member_type + "sub member is not terminal.")

# Generate the OpenAPI document: header first, then paths, then schemas.
# The encoding is explicit so that non-ASCII text from the model's
# documentation is written the same way on every platform instead of
# depending on the locale's default encoding.
with open("../openapi.yaml", "w", encoding="utf-8") as f:
    write_openapi_header(f)
    write_paths(f, s3["shapes"])
    write_schemas(f, s3["shapes"])

Processing com.amazonaws.s3#AbortMultipartUploadRequest
Processing member Bucket
Reached terminal member Bucket
Writing terminal member com.amazonaws.s3#BucketName of type string
Processing member Key
Reached terminal member Key
Writing terminal member com.amazonaws.s3#ObjectKey of type string
Processing member UploadId
Reached terminal member UploadId
Writing terminal member com.amazonaws.s3#MultipartUploadId of type string
Processing member RequestPayer
Reached terminal member RequestPayer
Writing terminal member com.amazonaws.s3#RequestPayer of type enum
Processing member ExpectedBucketOwner
Reached terminal member ExpectedBucketOwner
Writing terminal member com.amazonaws.s3#AccountId of type string
Processing com.amazonaws.s3#CompleteMultipartUploadRequest
Processing member Bucket
Reached terminal member Bucket
Writing terminal member com.amazonaws.s3#BucketName of type string
Processing member Key
Reached terminal member Key
Writing terminal member com.amazonaws.s3#ObjectKey of ty