In [1]:
from aiohttp import ClientSession
from bs4 import BeautifulSoup
import re
import json
from copy import copy

In [2]:
def parse_response_param(types):
    """Build an OpenAPI object schema from documented response fields.

    Parameters
    ----------
    types : iterable
        Field declaration elements. Each must expose ``.text``, ``.find`` and
        ``.select_one`` (BeautifulSoup tags in this notebook). The stripped
        text is read as ``name (type) ...``; a leading ``(`` before the name
        and a doubled ``((`` before the type are tolerated.

    Returns
    -------
    dict
        ``{"type": "object", "properties": {...}}``. A ``"required"`` list
        (sorted field names) is added when at least one described field is
        required; otherwise the key is absent.

    Raises
    ------
    ValueError
        Propagated from ``parse_response`` when a type string is unknown.
    """
    schema = {"type": "object", "properties": {}}
    required = set()

    for section in types:
        text = section.text.strip()

        # The name runs up to the first space, minus an optional leading "(".
        name_ending = text.find(" ")
        name_start = 1 if text[0] == "(" else 0
        name_stop = name_ending - 1 if text[name_ending] == "(" else name_ending
        name = text[name_start:name_stop]

        # The type is the text between the next "(" and the following ")".
        type_start = text.find("(", name_ending + 1) + 1
        type_start += text[type_start] == "("  # skip a doubled "(("
        type_end = text.find(")", type_start)
        type_str = text[type_start:type_end]

        # A field counts as required when it has no direct <em> child with
        # text (presumably the "optional" marker -- confirm against the docs).
        if not section.find("em", text=True, recursive=False):
            required.add(name)

        if inner_resp := section.select_one("div.inner-response"):
            childs = inner_resp.select("ul.api-response > li.field-declaration")

            if type_str == "array":
                schema["properties"][name] = {
                    "type": "array",
                    "items": parse_response_param(childs),
                }
            elif type_str == "object":
                schema["properties"][name] = {
                    "type": "object",
                    "properties": parse_response_param(childs)["properties"],
                }
            # Any other type with nested declarations gets no schema entry.
        else:
            schema["properties"][name] = parse_response(type_str)

    # Only list names that actually received a schema entry, in a stable
    # order, so the document is reproducible between runs.
    required_names = sorted(name for name in required if name in schema["properties"])
    if required_names:
        schema["required"] = required_names
    return schema

In [3]:
# Primitive type names used by the documentation, mapped to OpenAPI schemas.
basic_types = {
    "boolean": {"type": "boolean"},
    "integer": {"type": "integer"},
    "string": {"type": "string"},
    "float": {"type": "number"},
    "UUID": {"type": "string", "format": "uuid"},
    "markup": {"type": "string"},
    "html": {"type": "string"},
}


def parse_response(type_str):
    """Translate a documented response type string into an OpenAPI schema.

    Supported forms: a key of ``basic_types``; ``<type>|null``; ``object``;
    ``<name> object`` (optionally prefixed with ``(``); ``array`` and
    ``array[<type>]``.

    Parameters
    ----------
    type_str : str
        Type text taken from a field declaration.

    Returns
    -------
    dict
        The schema. For a basic type this is the shared ``basic_types`` entry
        itself, so callers must not mutate it.

    Raises
    ------
    ValueError
        If ``type_str`` matches none of the supported forms.
    """
    if resp := basic_types.get(type_str):
        return resp

    if type_str.endswith("|null"):
        # Shallow copy so the shared basic_types entry is never modified.
        base_type = copy(parse_response(type_str[:-5]))
        # NOTE(review): OpenAPI 3.0 ignores siblings of "$ref", so "nullable"
        # next to a reference may have no effect for consumers.
        base_type["nullable"] = True
        return base_type

    if type_str.endswith("object"):
        start = int(type_str[0] == "(")
        # Drop the trailing " object" (7 characters) to get the schema name.
        name = type_str[start:-7]
        if not name:
            # A bare "object" names no schema; emitting an empty "$ref"
            # target would leave a dangling reference in the document.
            return {"type": "object"}
        return {"$ref": "#/components/schemas/" + name}

    if type_str.startswith("array"):
        if len(type_str) == 5:
            return {"type": "array", "items": {"description": "Untyped value"}}
        item_type = type_str[type_str.find("[") + 1 : type_str.rfind("]")]
        return {"type": "array", "items": parse_response(item_type)}

    raise ValueError(type_str)

In [4]:
# Consumes a run of "min: N", "max: N" and "default: N" clauses (any order,
# optional whitespace) and exposes the numbers as named groups. The whole
# pattern is optional, so ``search`` always succeeds at position 0 and only
# clauses at the very start of the text are captured.
# Raw strings keep ``\s`` / ``\d`` from being read as (invalid) string escapes.
min_max_def_regex = re.compile(
    r"((\s*min:\s*(?P<min_value>-?\d*))|(\s*max:\s*(?P<max_value>-?\d*))|(\s*default:\s*(?P<default_value>-?\d*)))*"
)

# Captures everything after "default: " up to the end of the line.
default_enum = re.compile(r"default: (?P<default_value>.*)")


def int_or_none(num):
    """Convert *num* to ``int``, passing "no value" through as ``None``.

    ``None`` stays ``None``. The strings ``""`` and ``"-"`` (ignoring
    surrounding whitespace) are also treated as missing: the numeric capture
    ``-?\\d*`` used by ``min_max_def_regex`` can produce both, and ``int``
    would raise on them.

    Raises
    ------
    ValueError
        For any other value ``int`` cannot parse.
    """
    if num is None:
        return None
    if isinstance(num, str) and num.strip() in ("", "-"):
        return None
    return int(num)


def filter_none(obj):
    """Return a new dict holding the entries of *obj* whose value is not ``None``.

    Other falsy values (``0``, ``""``, ``False``) are kept.
    """
    return {key: value for key, value in obj.items() if value is not None}


def get_boundaries(num, is_string=False):
    """Extract default/min/max constraints from a validation text.

    Parameters
    ----------
    num : str
        Text searched with ``min_max_def_regex``.
    is_string : bool
        When true the limits are reported as ``minLength``/``maxLength`` and
        the default is kept as the captured text; otherwise they are reported
        as ``minimum``/``maximum`` and the default is converted to ``int``.

    Returns
    -------
    dict
        Only the constraints that were found (``None`` entries are removed).
    """
    found = min_max_def_regex.search(num)

    raw_default = found.group("default_value")
    if is_string:
        default = raw_default
        lower_key, upper_key = "minLength", "maxLength"
    else:
        default = int_or_none(raw_default)
        lower_key, upper_key = "minimum", "maximum"

    constraints = {
        "default": default,
        lower_key: int_or_none(found.group("min_value")),
        upper_key: int_or_none(found.group("max_value")),
    }
    return filter_none(constraints)

In [5]:
def is_enum(string):
    """Report whether *string* contains the marker text ``valid values``."""
    marker = "valid values"
    return marker in string


def parse_enum(string):
    """Split the parenthesised, comma-separated list in *string* into enum data.

    Each item is either ``value`` or ``number=name``. For the second form the
    number is converted with ``int`` and the name is collected separately.

    Returns
    -------
    tuple
        ``(values, names)``; ``names`` is empty when no item used ``=``.

    Raises
    ------
    ValueError
        When the left side of a ``number=name`` item is not an integer.
    AssertionError
        When only some of the items carry a name.
    """
    opening = string.find("(") + 1
    closing = string.find(")")

    values, names = [], []
    for item in string[opening:closing].split(","):
        parts = item.split("=")

        if len(parts) != 2:
            values.append(parts[0].strip())
            continue

        values.append(int(parts[0].strip()))
        names.append(parts[1].strip())

    assert len(names) == 0 or len(names) == len(values)
    return values, names

In [6]:
def parse_request_param(type_str, aditional_validation=None):
    """Translate a documented request parameter type into an OpenAPI schema.

    Parameters
    ----------
    type_str : str
        One of ``boolean``, ``uuid``, ``integer``, ``string`` or an
        ``array[...]`` form wrapping one of those.
    aditional_validation
        Optional element whose stripped ``.text`` carries extra constraints
        (min/max/default, an enum list, or ``format YYYY-MM-DD``). It is not
        forwarded to array items.

    Raises
    ------
    ValueError
        For any other ``type_str`` (the offending text is the message).
    """
    if type_str == "boolean":
        return {"type": "boolean"}

    if type_str == "uuid":
        return {"type": "string", "format": "uuid"}

    if type_str == "integer":
        schema = {"type": "integer"}
        if aditional_validation is not None:
            schema.update(get_boundaries(aditional_validation.text.strip()))
        return schema

    if type_str == "string":
        if not aditional_validation:
            return {"type": "string"}

        text = aditional_validation.text.strip()

        if is_enum(text):
            schema = {"type": "string", "enum": parse_enum(text)[0]}
            default = default_enum.search(text)
            if default:
                schema["default"] = default.group("default_value")
            return schema

        if text == "format YYYY-MM-DD":
            return {"type": "string", "format": "date"}

        return {"type": "string", **get_boundaries(text, True)}

    if type_str.startswith("array"):
        # Strip the leading "array[" and the trailing "]" to get the item type.
        return {"type": "array", "items": parse_request_param(type_str[6:-1])}

    raise ValueError(type_str)

In [7]:
# Translation table that deletes punctuation from endpoint descriptions.
string_strip_table = str.maketrans(dict.fromkeys('.,"()', ""))


class OperatorIdGenerator:
    """Derives snake_case operation ids from descriptions, issuing each id once."""

    def __init__(self):
        # Ids that have already been handed out.
        self.used = set()

    def generate_operation_id(self, desc):
        """Return an operation id for *desc*, or ``None`` if it was already issued.

        The description loses every ``'s``, the punctuation listed in
        ``string_strip_table`` and its upper-casing; ``fetch`` is replaced by
        ``get`` and the remaining words are joined with underscores.
        """
        cleaned = desc.replace("'s", "").translate(string_strip_table).lower()
        operation_id = "_".join(cleaned.replace("fetch", "get").split())

        if operation_id in self.used:
            return None

        self.used.add(operation_id)
        return operation_id

In [8]:
class EndpointPage:
    """Turns one parsed endpoint documentation page into an OpenAPI path item.

    Typical use: build an instance, call ``info()`` to read the page, then
    ``serialize()`` to obtain ``{url: path_item}``.
    """

    def __init__(self, resp, ref_objects, doc_url, operation_id_generator):
        """Store the collaborators and reset the extraction state.

        Parameters
        ----------
        resp
            Parsed documentation page; queried with ``select``/``select_one``.
        ref_objects : set
            Shared set that receives the ``href`` of every link found in the
            page's response section.
        doc_url : str
            Address of the documentation page, emitted as ``externalDocs``.
        operation_id_generator
            Object whose ``generate_operation_id(desc)`` returns an id or a
            falsy value when none should be emitted.
        """
        self.operation_id_generator = operation_id_generator
        self.ref_objects = ref_objects
        self.resp = resp
        self.doc_url = doc_url

        # Path item under construction: an optional path-level "parameters"
        # list plus one operation object keyed by the HTTP method.
        self.result = {}

        self.responses = {}
        self.url = None
        self.method = None
        self.in_url_param = None
        self.scopes = None

    @classmethod
    async def from_url(cls, session, url):
        """Download *url* with *session* and return the parsed document.

        Despite the name, the return value is the ``BeautifulSoup`` document,
        not an ``EndpointPage``; callers wrap it themselves.
        """
        # The context manager releases the connection even if decoding fails.
        async with session.get(url) as response:
            return BeautifulSoup(await response.text())

    # Greedy: captures the text between the first "{" and the last "}", so
    # only a single path placeholder per URL is recognised.
    in_url_param_regex = re.compile("{(?P<in_url_param>.*)}")

    def update_url(self):
        """Read the HTTP method, the URL and the path placeholder from the page heading."""
        splited_text = self.resp.select_one("div.api-endpoint > h2").text.split()
        self.method = splited_text[0].lower()
        self.url = splited_text[1]

        if params := self.in_url_param_regex.search(splited_text[1]):
            self.in_url_param = params.group("in_url_param")

    def deprecated(self):
        """Return True when the page carries an endpoint warning alert."""
        return bool(self.resp.select_one("p.endpoint-alert.endpoint-warning"))

    def info(self):
        """Read every collapsible section of the page into this instance.

        Parameters contributed by different sections are merged, so the
        outcome does not depend on the order in which the sections appear.
        """
        self.update_url()
        headings = self.resp.select("div.api-endpoint > a.collapsable-heading")

        for heading in headings:
            text = heading.text

            if text == "Authentication":
                self.handle_auth_section(heading.find_next())
            elif text == "Parameters":
                rows = (
                    heading.find_next()
                    .select_one("table.api-parameters")
                    .find_all("tr", recursive=False)
                )
                self._merge_parameters(self.handle_params_section(rows))
            elif text == "Response":
                self.handle_response_section(heading.find_next())
            elif text == "Mature Content":
                self._merge_parameters(
                    {
                        self.method: {
                            "parameters": [
                                {
                                    "name": "mature_content",
                                    "required": False,
                                    "schema": {"type": "boolean"},
                                    "in": "query",
                                }
                            ]
                        }
                    }
                )

    def _merge_parameters(self, extracted):
        """Fold a ``handle_params_section``-shaped dict into ``self.result``."""
        for key, value in extracted.items():
            if key == "parameters":
                # Path-level parameter list.
                self.result.setdefault("parameters", []).extend(value)
            else:
                # Operation-level parameters, keyed by HTTP method.
                operation = self.result.setdefault(key, {})
                operation.setdefault("parameters", []).extend(value["parameters"])

    def handle_auth_section(self, auth_section):
        """Record the text of each ``li`` in the section as an ``oauth2`` scope."""
        self.scopes = {
            "oauth2": [scope.text.strip() for scope in auth_section.select("li")]
        }

    # Text between the first "(" and the last ")" of a field type label.
    param_type_regex = re.compile(r"\((?P<type>.*)\)")

    def description(self):
        """Return the stripped text of the first non-alert paragraph of the endpoint."""
        return self.resp.select_one(
            "div.api-endpoint > p:not(.endpoint-alert)"
        ).text.strip()

    def handle_response_section(self, response_section):
        """Collect linked object pages and build the success response schema."""
        self.ref_objects.update(link["href"] for link in response_section.select("a"))

        types = response_section.select(
            "div.collapsable-content > ul.api-response > li.field-declaration"
        )

        self.responses = parse_response_param(types)

    @staticmethod
    def _error_content():
        """Return a fresh media-type map pointing at the shared error schema."""
        return {
            "application/json": {"schema": {"$ref": "#/components/schemas/error"}}
        }

    def serialize(self):
        """Finish the operation object and return ``{self.url: self.result}``.

        Adds the response table, summary, external docs link, deprecation
        flag, tag and, when available, the operation id and security scopes.
        """
        resp = {
            "responses": {
                "200": {
                    "content": {"application/json": {"schema": self.responses}},
                    "description": "Request was successful",
                },
                "4XX": {
                    "content": self._error_content(),
                    "description": "Request failed due to client error, e.g. validation failed or User not found",
                },
                "429": {
                    "description": "Rate limit reached or service overloaded",
                    "content": self._error_content(),
                },
                "500": {
                    "content": self._error_content(),
                    "description": "Our servers encountered an internal error, try again",
                },
                "504": {
                    "content": self._error_content(),
                    "description": "Our servers are currently unavailable, try again later. This is normally due to planned or emergency maintenance.",
                },
            }
        }

        desc = self.description()
        res = self.result.setdefault(self.method, {})

        res.update(resp)
        res["summary"] = desc
        res["externalDocs"] = {
            "description": "Official docs url",
            "url": self.doc_url,
        }

        res["deprecated"] = self.deprecated()
        if operation_id := self.operation_id_generator.generate_operation_id(desc):
            res["operationId"] = operation_id

        # The tag is the first path segment. Splitting (instead of slicing up
        # to the second "/") keeps single-segment URLs such as "/placebo"
        # intact rather than cutting off their last character.
        res["tags"] = [self.url.strip("/").split("/", 1)[0].title()]

        if self.scopes:
            res["security"] = [self.scopes]

        return {self.url: self.result}

    def handle_params_section(self, params_section):
        """Convert parameter table rows into OpenAPI parameter objects.

        Parameters
        ----------
        params_section : iterable
            Rows of the parameter table; each is queried with ``select_one``.

        Returns
        -------
        dict
            May contain ``"parameters"`` (a one-element list holding the path
            parameter) and ``self.method`` mapped to ``{"parameters": [...]}``
            with the query parameters. Empty when there are no rows.

        Raises
        ------
        ValueError
            When a row's necessity is neither ``optional`` nor ``required``,
            when a row holds a nested parameter table (message
            ``"Cant handle that case"``), or from ``parse_request_param``.
        """
        request_params = {}

        for param in params_section:
            field_name = param.select_one("span.field-name").text
            desc = param.select_one("div.field-description").text.strip()
            aditional_validation = param.select_one("div.field-validation-text")

            optional = param.select_one("div.field-necessity").text.strip()

            if optional == "optional":
                required = False
            elif optional == "required":
                required = True
            else:
                raise ValueError(optional)

            # Nested parameter tables are not supported; callers recognise
            # this exact message and skip the page.
            if param.select_one("td.field-about > table.api-parameters"):
                raise ValueError("Cant handle that case")

            field_type = param.select_one("span.field-type").text
            type_str = self.param_type_regex.search(field_type).group("type")
            final_type = parse_request_param(type_str, aditional_validation)

            base = {
                "name": field_name,
                "description": desc,
                "required": required,
                "schema": final_type,
            }

            if field_name == self.in_url_param:
                # Path parameters are always required and live on the path item.
                base["in"] = "path"
                base["required"] = True
                request_params["parameters"] = [base]
            else:
                base["in"] = "query"
                request_params.setdefault(self.method, {"parameters": []})[
                    "parameters"
                ].append(base)

        return request_params

In [9]:
session = ClientSession()


main_page = await session.get("https://www.deviantart.com/developers/http/v1/20210526")
html = BeautifulSoup(await main_page.text())

urls = list(map(lambda x: x["href"], html.select("td.endpoint-uri a")))
pages = []

for index, url in enumerate(urls):
    pages.append(await EndpointPage.from_url(session, url))

await session.close()

In [10]:
# Accumulators shared by every endpoint page:
#   paths      - OpenAPI path items keyed by URL
#   refs       - links to object pages seen in response sections
#   all_scopes - every OAuth scope named by an endpoint
paths = {}
refs = set()
all_scopes = set()
op_code_generator = OperatorIdGenerator()
instances = [
    EndpointPage(page, refs, url, op_code_generator)
    for page, url in zip(pages, urls)
]

for i, page in enumerate(instances):
    try:
        page.info()
        paths.update(page.serialize())

        if page.scopes:
            all_scopes.update(page.scopes["oauth2"])
    except Exception as E:
        # Pages with nested parameter tables are reported and skipped;
        # every other failure is re-raised unchanged.
        if str(E) != "Cant handle that case":
            raise
        print("Weirdly formated params in ", urls[i])

Weirdly formated params in  https://www.deviantart.com/developers/http/v1/20210526/feed_settings_update/75da1eec2ea2e6ec1a58fe8f7b0ecac2
Weirdly formated params in  https://www.deviantart.com/developers/http/v1/20210526/stash_publish/a799a5c0967dca14e854286df9746793
Weirdly formated params in  https://www.deviantart.com/developers/http/v1/20210526/user_friends_watch/d73cb9c31413d7f58777cce8f8e071e9
Weirdly formated params in  https://www.deviantart.com/developers/http/v1/20210526/user_profile_update/f0b46355aafba1058d78f60386856d23


In [11]:
client = ClientSession()

to_visit = [*refs]
visited = set()
schemas = {}

while len(to_visit) > 0:
    current_url = to_visit.pop()
    
    if current_url in visited:
        continue
        
    visited.add(current_url)

    resp = BeautifulSoup(await (await client.get(current_url)).text())
    section = resp.select_one("ul.api-response")
    to_visit.extend(
        [link["href"] for link in section.select("a") if link not in visited]
    )

    schemas[current_url[current_url.rfind("/") + 1 :]] = parse_response_param(
        section.select("li.field-declaration")
    )


await client.close()

In [12]:
# Replace the collection of scope names with a name -> description mapping,
# the shape expected under "scopes" in the OAuth flow definitions.
all_scopes = dict((scope, f"Access to {scope}") for scope in all_scopes)

In [13]:
# Error body schema, registered under the name that the
# "#/components/schemas/error" references point at.
schemas["error"] = dict(
    type="object",
    properties=dict(
        error=dict(
            type="string",
            enum=[
                "invalid_request",
                "unauthorized",
                "unverified_account",
                "server_error",
                "version_error",
            ],
        ),
        error_description=dict(type="string"),
        error_details=dict(
            description="For validation errors, this will be a key/value map containing error information for each field"
        ),
    ),
    required=["error", "error_description"],
)

In [14]:
# OAuth2 security scheme with three flows; each flow shares the same
# ``all_scopes`` mapping.
security = dict(
    oauth2=dict(
        type="oauth2",
        flows=dict(
            implicit=dict(
                authorizationUrl="https://www.deviantart.com/oauth2/authorize",
                refreshUrl="https://www.deviantart.com/oauth2/token",
                scopes=all_scopes,
            ),
            authorizationCode=dict(
                authorizationUrl="https://www.deviantart.com/oauth2/authorize",
                tokenUrl="https://www.deviantart.com/oauth2/token",
                refreshUrl="https://www.deviantart.com/oauth2/token",
                scopes=all_scopes,
            ),
            clientCredentials=dict(
                tokenUrl="https://www.deviantart.com/oauth2/token",
                refreshUrl="https://www.deviantart.com/oauth2/token",
                scopes=all_scopes,
            ),
        ),
    )
)

In [15]:
# Assemble the complete OpenAPI document from the pieces built above.
final_result = dict(
    paths=paths,
    components=dict(schemas=schemas, securitySchemes=security),
    openapi="3.0.0",
    servers=[
        dict(
            url="https://www.deviantart.com/api/v1/oauth2/",
            description="Production server",
        )
    ],
    info=dict(title="Open api schema of deviant art api", version="1.0"),
)

In [16]:
# Write the assembled document as JSON to "schema.json" (path is relative to
# the current working directory).
with open("schema.json", mode="w") as f:
    json.dump(final_result, fp=f)