In [1]:
import sys
import os
import re
import json
from jinja2 import Environment, FileSystemLoader, select_autoescape

if os.getcwd() not in sys.path:
    sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

env = Environment(loader=FileSystemLoader('./templates'), autoescape=select_autoescape())

In [2]:
with open('./../openapi.json', 'r', encoding='utf-8') as fp:
    twitter_api =  json.load(fp)

In [207]:
def validate(self, spec):
    keys = set()
    for key in spec:
        if key != 'x-twitter-streaming':
            format_ = re.sub('(?<!^)(?=[A-Z])', '_', key).lower()
            if format_ == 'in':
                format_ = 'in_'
        else:
            format_ = 'x_twitter_streaming'
        assert format_ in self.__dict__.keys(), (key, spec)
        keys.add(format_)
    return keys


class Discriminator:
    def __init__(self, spec):
        self.property_name = spec['propertyName']
        self.mapping = {}
        for k, v in spec['mapping'].items():
            assert v.startswith('#'), spec
            self.mapping[k] = v  # Schema(, name=None) 

    def __eq__(self, other):
        if isinstance(other, Discriminator):
            return self.property_name == other.property_name and self.mapping == other.mapping
        return False

class Schema:
    schemas = []

    def __init__(self, spec, name):
        _ref = spec.get('$ref')
        if _ref is not None:
            assert len(spec) == 1, spec
            spec = twitter_api
            for item in _ref.split('/')[1:]:
                name = item
                spec = spec[item]
        self.name = name
        # 
        self.default = spec.get('default')
        assert self.default is None or isinstance(self.default, (str, int))
        self.description = spec.get('description')
        assert self.description is None or isinstance(self.description, str)
        self.discriminator = None
        if 'discriminator' in spec:
            self.discriminator = Discriminator(spec.get('discriminator'))
        self.enum = spec.get('enum')
        assert self.enum is None or isinstance(self.enum, list) and all(isinstance(i, str) for i in self.enum)
        self.example = spec.get('example')
        assert self.example is None \
               or isinstance(self.example, list) and all(isinstance(i, (str, float)) for i in self.example) \
               or isinstance(self.example, (str, int)) or isinstance(self.example, dict), self.example
        self.format = spec.get('format')
        assert self.format is None or isinstance(self.format, str)
        self.items = Schema(spec['items'], name=None) if 'items' in spec else None
        self.max_items = spec.get('maxItems')
        assert self.max_items is None or isinstance(self.max_items, int)
        self.max_length = spec.get('maxLength')
        assert self.max_length is None or isinstance(self.max_length, int)
        self.maximum = spec.get('maximum')
        assert self.maximum is None or isinstance(self.maximum, int)
        self.min_items = spec.get('minItems')
        assert self.min_items is None or isinstance(self.min_items, int)
        self.min_length = spec.get('minLength')
        assert self.min_length is None or isinstance(self.min_length, int)
        self.minimum = spec.get('minimum')
        assert self.minimum is None or isinstance(self.minimum, int)
        self.all_of = None
        if 'allOf' in spec:
            self.all_of = [Schema(item, name=None) for item in spec['allOf']]
            assert 'type' not in spec or spec['type'] == 'allOf', spec
            spec['type'] = 'allOf'
        self.one_of = None
        if 'oneOf' in spec:
            self.one_of = [Schema(item, name=None) for item in spec['oneOf']]
            assert 'type' not in spec or spec['type'] == 'oneOf', spec
            spec['type'] = 'oneOf'
        self.pattern = spec.get('pattern')  # regex
        assert self.pattern is None or isinstance(self.pattern, str)
        self.properties = None
        if 'properties' in spec:
            self.properties = {key: Schema(value, name=key) for key, value in spec['properties'].items()}
        if self.discriminator is not None:
            assert spec['type'] == 'object', spec
        self.required = spec.get('required')
        assert self.required is None or isinstance(self.required, list) and all(
            isinstance(i, str) for i in self.required)
        self.type = spec.get('type')
        if self.type == 'array':
            assert self.items is not None
            self.type = 'list'
        assert self.type in {'allOf', 'oneOf', 'list', 'boolean', 'integer', 'number', 'object', 'string'}, spec
        self.unique_items = spec.get('uniqueItems')
        assert self.unique_items is None or isinstance(self.unique_items, bool)
        self.additional_properties = spec.get('additionalProperties')
        assert self.additional_properties is None or isinstance(self.additional_properties, bool)
        # validate and store params
        self._params = validate(self, spec)
        self._params.update({'name', '_params'})
        self.schemas.append(self)

    @property
    def complete_type(self):
        if self.type == 'list':
            return 'list of {}'.format(self.items.complete_type)
        return self.type

    def __eq__(self, other):
        if isinstance(other, Schema):
            _params = set()
            _params.update(self._params)
            _params.update(other._params)
            return all(map(lambda p: getattr(self, p) == getattr(other, p), _params))
        return False


    def to_dict(self):
        data = {}
        for p in self._params:
            value = getattr(self, p) 
            if isinstance(value, Schema):
                value = value.to_dict()
            elif isinstance(value, dict):
                value = {key: val.to_dict() if isinstance(val, Schema) else val for key, val in value.items()}
            data[p] = value
        return data
    
class Parameter:
    def __init__(self, spec):
        _ref = spec.get('$ref')
        if _ref is not None:
            assert len(spec) == 1, 'error'
            spec = twitter_api
            for item in _ref.split('/')[1:]:
                spec = spec[item]
        # actual params
        self.description = spec.get('description')
        assert isinstance(self.description, str)
        self.example = spec.get('example')
        assert self.example is None or isinstance(self.example, str)
        self.explode = spec.get('explode')
        assert self.explode is None or isinstance(self.explode, bool)
        self.in_ = spec.get('in')
        assert self.in_ or isinstance(self.in_, str)
        self.name = spec.get('name')
        assert self.name or isinstance(self.name, str)
        self.required = spec.get('required', False)
        assert isinstance(self.required, bool)
        self.schema = Schema(spec.get('schema'), name=self.name)
        self.style = spec.get('style')
        assert isinstance(self.style, str)
        validate(self, spec)


class Response:
    def __init__(self, status_code, spec):
        self.status_code = status_code
        self.description = spec.get('description')
        assert isinstance(self.description, str)
        self.content = dict()
        for key, value in spec.get('content').items():
            assert len(value) == 0 or len(value) == 1 and 'schema' in value
            self.content[key] = Schema(value['schema'], name=key)
        validate(self, spec)


class RequestBody:
    def __init__(self, spec):
        self.required = spec.get('required', None)
        assert self.required is None or isinstance(self.required, bool)
        self.content = dict()
        for key, value in spec.get('content', {}).items():
            assert len(value) == 0 or len(value) == 1 and 'schema' in value
            self.content[key] = Schema(value['schema'], name=key)
        validate(self, spec)


class Token:
    def __init__(self, type_, auth):
        self.type_ = type_
        assert self.type_ in {'BearerToken', 'OAuth2UserToken', 'UserToken'}
        self.auth = auth
        assert self.auth is None or isinstance(self.auth, list) and all(isinstance(i, str) for i in self.auth)


def make_tokens(spec):
    return [Token(k, v) for k, v in spec.items()]


class RequestDef:
    def __init__(self, method, spec):
        self.method = method
        assert isinstance(self.method, str)
        self.description = spec.get('description')
        assert isinstance(self.description, str)
        self.external_docs = spec.get('externalDocs', {}).get('url')
        assert self.external_docs is None or isinstance(self.external_docs, str)
        self.operation_id = re.sub('(?<!^)(?=[A-Z])', '_', spec['operationId']).lower()
        assert isinstance(self.operation_id, str)
        self.parameters = sorted(list(map(Parameter, spec.get('parameters', []))), key=lambda x: not x.required)
        self.request_body = RequestBody(spec.get('requestBody', {}))
        self.responses = [Response(status_code, v) for status_code, v in spec.get('responses', {}).items()]
        self.security = list(map(make_tokens, spec.get('security', [])))
        self.summary = spec.get('summary')
        assert isinstance(self.summary, str)
        # A list of tags for API documentation control. 
        # Tags can be used for logical grouping of operations 
        # by resources or any other qualifier.
        self.tags = spec.get('tags')
        assert self.tags is None or isinstance(self.tags, list) and all(isinstance(i, str) for i in self.tags)
        self.x_twitter_streaming = spec.get('x-twitter-streaming', None)
        assert self.x_twitter_streaming is None or isinstance(self.x_twitter_streaming, bool)
        validate(self, spec)


class PathDef:
    def __init__(self, path, spec):
        self.path = path
        self.requests = [RequestDef(k, v) for k, v in spec.items()]


paths = [PathDef(k, v) for k, v in twitter_api['paths'].items()]

In [208]:
object_types = dict()

for schema in Schema.schemas:
    if schema.type == 'object':
        if schema.name in object_types:
            if object_types[schema.name] == schema:
                continue
            if not isinstance(object_types[schema.name], list):
                object_types[schema.name] = [object_types[schema.name]]
            object_types[schema.name].append(schema)
        else:
            object_types[schema.name] = schema

In [217]:
method_template = env.get_template('request_method.jinja2')
methods_template = env.get_template('request_methods.jinja2')

def file_filter(x): 
    return x.endswith('.py') and x[:-3] not in ['__init__', 'request']

output_path = '../tweetkit/twitter/requests'

for file in filter(file_filter, os.listdir(output_path)):
    output_file_path = os.path.join(output_path, file)
    os.remove(output_file_path)

tags = set()

for path_def in paths:
    for request in path_def.requests:
        tags.update(request.tags)

for tag in tags:
    methods = []
    for path_def in paths:
        for request in path_def.requests:
            request_body_required = request.request_body.required
            if request.request_body.content:
                if request_body_required is None:
                    request_body_required = False
                assert len(request.request_body.content) == 1
                request_body_content_type = 'application/json'
                request_body_content = request.request_body.content[request_body_content_type]
            if tag == request.tags[0]:
                method = method_template.render(path=path_def.path, request=request)
                methods.append(method)
    request_methods = methods_template.render(tag=next(filter(lambda t: t['name'] == tag, twitter_api['tags'])), methods=methods)
    output_file_path = os.path.join(output_path, '{}.py'.format(tag.lower()))
    with open(output_file_path, 'w', encoding='utf-8') as fp:
        fp.write(request_methods)