/
openapi.py
722 lines (614 loc) · 26.1 KB
/
openapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
# -*- coding: utf-8 -*-
"""Utilities for generating OpenAPI Specification (fka Swagger) entities from
marshmallow :class:`Schemas <marshmallow.Schema>` and :class:`Fields <marshmallow.fields.Field>`.
.. warning::
This module is treated as private API.
Users should not need to use this module directly.
"""
from __future__ import absolute_import, unicode_literals
import operator
import functools
import warnings
from collections import OrderedDict
import marshmallow
from marshmallow.utils import is_collection
from marshmallow.orderedset import OrderedSet
from apispec.compat import RegexType, iteritems
from apispec.utils import OpenAPIVersion, build_reference
from .common import (
get_fields,
make_schema_key,
resolve_schema_instance,
get_unique_schema_name,
)
from apispec.exceptions import APISpecError
# Numeric components of the installed marshmallow version, e.g. (3, 0).
# Dot-separated parts that are not purely digits (such as "0rc8") are skipped.
MARSHMALLOW_VERSION_INFO = tuple(
    int(component)
    for component in marshmallow.__version__.split(".")
    if component.isdigit()
)
# marshmallow field => (JSON Schema type, format)
# A format of ``None`` means that no "format" key is emitted for the field.
# Lookups walk the field class' MRO, so subclasses of the types listed here
# inherit the mapping of their closest listed ancestor.
DEFAULT_FIELD_MAPPING = {
    marshmallow.fields.Integer: ("integer", "int32"),
    marshmallow.fields.Number: ("number", None),
    marshmallow.fields.Float: ("number", "float"),
    marshmallow.fields.Decimal: ("number", None),
    marshmallow.fields.String: ("string", None),
    marshmallow.fields.Boolean: ("boolean", None),
    marshmallow.fields.UUID: ("string", "uuid"),
    marshmallow.fields.DateTime: ("string", "date-time"),
    marshmallow.fields.Date: ("string", "date"),
    marshmallow.fields.Time: ("string", None),
    marshmallow.fields.Email: ("string", "email"),
    marshmallow.fields.URL: ("string", "url"),
    marshmallow.fields.Dict: ("object", None),
    # Assume base Field and Raw are strings
    marshmallow.fields.Field: ("string", None),
    marshmallow.fields.Raw: ("string", None),
    marshmallow.fields.List: ("array", None),
}
# Translates the location names accepted by this module (``default_in``
# arguments and a field's ``location`` metadata) into the value written to a
# parameter's "in" key. A ``default_in`` that is not listed here is used
# unchanged; a field ``location`` that is not listed falls back to the
# translated ``default_in``.
__location_map__ = {
    "query": "query",
    "querystring": "query",
    "json": "body",
    "headers": "header",
    "cookies": "cookie",
    "form": "formData",
    "files": "formData",
}
# Properties that may be defined in a field's metadata that will be added to the output
# of field2property
# https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#schemaObject
_VALID_PROPERTIES = {
    "format",
    "title",
    "description",
    "default",
    "multipleOf",
    "maximum",
    "exclusiveMaximum",
    "minimum",
    "exclusiveMinimum",
    "maxLength",
    "minLength",
    "pattern",
    "maxItems",
    "minItems",
    "uniqueItems",
    "maxProperties",
    "minProperties",
    "required",
    "enum",
    "type",
    "items",
    "allOf",
    "properties",
    "additionalProperties",
    "readOnly",
    "xml",
    "externalDocs",
    "example",
}
# Metadata keys starting with this prefix (specification extensions) are kept
# by metadata2properties in addition to the names in _VALID_PROPERTIES.
_VALID_PREFIX = "x-"
class OpenAPIConverter(object):
    """Converter generating OpenAPI specification from Marshmallow schemas and fields

    :param str|OpenAPIVersion openapi_version: The OpenAPI version to use.
        Should be in the form '2.x' or '3.x.x' to comply with the OpenAPI standard.
    :param callable schema_name_resolver: Callable that receives a schema and
        returns the name under which to register it in the spec. A falsy
        return value makes the converter inline the schema's JSON Schema
        instead of emitting a reference.
    :param spec: Object whose ``components`` attribute is used to register
        schemas (``spec.components.schema(name, schema=...)``).
    """
def __init__(self, openapi_version, schema_name_resolver, spec):
    """Store the conversion settings and initialise per-converter state.

    :param str|OpenAPIVersion openapi_version: The OpenAPI version to use;
        it is wrapped in :class:`OpenAPIVersion`.
    :param callable schema_name_resolver: Callable that receives a schema and
        returns the name to register it under (or a falsy value to inline it).
    :param spec: Object whose ``components`` attribute is used to register
        nested schemas.
    """
    self.openapi_version = OpenAPIVersion(openapi_version)
    self.schema_name_resolver = schema_name_resolver
    self.spec = spec
    # Schema references: schema key => name used when building the reference.
    self.refs = {}
    # Field mappings. Work on a copy: ``map_to_openapi_type`` writes into
    # this dict, and aliasing the module-level default would leak custom
    # mappings into every other converter instance.
    self.field_mapping = dict(DEFAULT_FIELD_MAPPING)
@staticmethod
def _observed_name(field, name):
    """Return the name a field is exposed under in serialized data.

    With marshmallow 3 or later this is the field's ``data_key``. With
    marshmallow 2 it is ``dump_to`` or, failing that, ``load_from``. Falsy
    values always fall through to ``name``.

    :param Field field: A marshmallow field.
    :param str name: Field name
    :rtype: str
    """
    if MARSHMALLOW_VERSION_INFO[0] >= 3:
        return field.data_key or name
    # getattr with a default copes with marshmallow releases whose fields
    # lack these attributes.
    candidates = (
        getattr(field, "dump_to", None),
        getattr(field, "load_from", None),
    )
    for candidate in candidates:
        if candidate:
            return candidate
    return name
def map_to_openapi_type(self, *args):
    """Return a class decorator that registers a mapping for a custom field.

    ``*args`` can be:

    - a pair of the form ``(type, format)``
    - a field type that is already present in ``self.field_mapping`` (in
      which case that type's mapping is reused)

    :raise TypeError: if ``args`` matches neither form.
    """
    if len(args) == 2:
        mapped = args
    elif len(args) == 1 and args[0] in self.field_mapping:
        mapped = self.field_mapping[args[0]]
    else:
        raise TypeError("Pass core marshmallow field type or (type, fmt) pair.")

    def register(field_type):
        # Record the mapping and hand the class back so this works as a decorator.
        self.field_mapping[field_type] = mapped
        return field_type

    return register
def field2type_and_format(self, field):
    """Return the OpenAPI ``type`` (and ``format``, if any) for a field.

    The field's class hierarchy is searched in MRO order and the first class
    found in ``self.field_mapping`` decides the result. When no class
    matches, a ``UserWarning`` is issued and the type defaults to "string".

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    mapping = self.field_mapping
    matched_class = next(
        (klass for klass in type(field).__mro__ if klass in mapping), None
    )
    if matched_class is None:
        warnings.warn(
            "Field of type {} does not inherit from marshmallow.Field.".format(
                type(field)
            ),
            UserWarning,
        )
        openapi_type, openapi_format = "string", None
    else:
        openapi_type, openapi_format = mapping[matched_class]

    result = {"type": openapi_type}
    if openapi_format:
        result["format"] = openapi_format
    return result
def field2default(self, field):
    """Return a dictionary holding the field's default value, if any.

    A ``doc_default`` key in the field's metadata takes precedence.
    Otherwise the field's ``missing`` value is used, unless it is
    ``marshmallow.missing`` or a callable, in which case no default is
    reported.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    if "doc_default" in field.metadata:
        return {"default": field.metadata["doc_default"]}
    fallback = field.missing
    if fallback is marshmallow.missing or callable(fallback):
        return {}
    return {"default": fallback}
def field2choices(self, field, **kwargs):
    """Return the OpenAPI ``enum`` attribute derived from a field's validators.

    Validators exposing ``comparable`` win: their values are listed as-is.
    Otherwise the ``choices`` of every validator that has them are
    intersected, preserving order.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    equal_values = [
        validator.comparable
        for validator in field.validators
        if hasattr(validator, "comparable")
    ]
    if equal_values:
        return {"enum": equal_values}

    choice_sets = [
        OrderedSet(validator.choices)
        for validator in field.validators
        if hasattr(validator, "choices")
    ]
    if not choice_sets:
        return {}
    # Only values accepted by every choice validator are valid overall.
    return {"enum": list(functools.reduce(operator.and_, choice_sets))}
def field2read_only(self, field, **kwargs):
    """Return ``{"readOnly": True}`` for a dump_only field, else an empty dict.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    return {"readOnly": True} if field.dump_only else {}
def field2write_only(self, field, **kwargs):
    """Return ``{"writeOnly": True}`` for a load_only field.

    The attribute is only emitted when the configured OpenAPI major version
    is 3 or higher; otherwise an empty dict is returned.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    if field.load_only and self.openapi_version.major >= 3:
        return {"writeOnly": True}
    return {}
def field2nullable(self, field, **kwargs):
    """Return the nullable attribute for a field that allows ``None``.

    The key is ``x-nullable`` when the OpenAPI major version is below 3 and
    ``nullable`` otherwise. Fields that do not allow ``None`` yield an empty
    dict.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    if not field.allow_none:
        return {}
    key = "nullable" if self.openapi_version.major >= 3 else "x-nullable"
    return {key: True}
def field2range(self, field, **kwargs):
    """Return the dictionary of OpenAPI field attributes for a set of
    :class:`Range <marshmallow.validators.Range>` validators.

    A validator is treated as a range validator when it has ``min`` and
    ``max`` attributes but no ``equal`` attribute. When several such
    validators are present, the tightest bounds win: the largest ``min``
    becomes ``minimum`` and the smallest ``max`` becomes ``maximum``.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    validators = [
        validator
        for validator in field.validators
        if (
            hasattr(validator, "min")
            and hasattr(validator, "max")
            and not hasattr(validator, "equal")
        )
    ]

    attributes = {}
    for validator in validators:
        if validator.min is not None:
            # Membership test, not hasattr(): ``attributes`` is a dict, so
            # hasattr() would never see the key and the merge would be skipped.
            if "minimum" in attributes:
                attributes["minimum"] = max(attributes["minimum"], validator.min)
            else:
                attributes["minimum"] = validator.min
        if validator.max is not None:
            if "maximum" in attributes:
                attributes["maximum"] = min(attributes["maximum"], validator.max)
            else:
                attributes["maximum"] = validator.max
    return attributes
def field2length(self, field, **kwargs):
    """Return the dictionary of OpenAPI field attributes for a set of
    :class:`Length <marshmallow.validators.Length>` validators.

    A validator is treated as a length validator when it has ``min``,
    ``max`` and ``equal`` attributes. ``Nested`` and ``List`` fields report
    ``minItems``/``maxItems``; every other field reports
    ``minLength``/``maxLength``. With several validators the tightest bounds
    win, and any non-``None`` ``equal`` then overrides both bounds.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    attributes = {}

    validators = [
        validator
        for validator in field.validators
        if (
            hasattr(validator, "min")
            and hasattr(validator, "max")
            and hasattr(validator, "equal")
        )
    ]

    is_array = isinstance(
        field, (marshmallow.fields.Nested, marshmallow.fields.List)
    )
    min_attr = "minItems" if is_array else "minLength"
    max_attr = "maxItems" if is_array else "maxLength"

    for validator in validators:
        if validator.min is not None:
            # Membership test, not hasattr(): ``attributes`` is a dict, so
            # hasattr() would never see the key and the merge would be skipped.
            if min_attr in attributes:
                attributes[min_attr] = max(attributes[min_attr], validator.min)
            else:
                attributes[min_attr] = validator.min
        if validator.max is not None:
            if max_attr in attributes:
                attributes[max_attr] = min(attributes[max_attr], validator.max)
            else:
                attributes[max_attr] = validator.max

    # An exact length requirement pins both bounds to the same value.
    for validator in validators:
        if validator.equal is not None:
            attributes[min_attr] = validator.equal
            attributes[max_attr] = validator.equal
    return attributes
def field2pattern(self, field, **kwargs):
    """Return the dictionary of OpenAPI field attributes for a set of
    :class:`Regexp <marshmallow.validators.Regexp>` validators.

    Only validators whose ``regex`` attribute is a compiled pattern are
    considered. The first one supplies ``pattern``; if there is a second
    one, a ``UserWarning`` is issued and the extra validators are ignored.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    # Lazy on purpose: at most the first two matching validators are inspected.
    matching = (
        validator
        for validator in field.validators
        if isinstance(getattr(validator, "regex", None), RegexType)
    )
    first = next(matching, None)
    if first is None:
        return {}

    attributes = {"pattern": first.regex.pattern}
    if next(matching, None) is not None:
        warnings.warn(
            "More than one regex validator defined on {} field. Only the "
            "first one will be used in the output spec.".format(type(field)),
            UserWarning,
        )
    return attributes
def metadata2properties(self, field):
    """Return a dictionary of properties extracted from field metadata.

    Keeps the metadata entries that are valid properties of `OpenAPI schema
    objects
    <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#schemaObject>`_
    (e.g. "description", "enum", "example").

    In addition, `specification extensions
    <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#specification-extensions>`_
    are supported. Prefix `x_` to the desired extension when passing the
    keyword argument to the field constructor. Every underscore in such a
    key is replaced with a dash so that the result starts with `x-`.

    :param Field field: A marshmallow field.
    :rtype: dict
    """
    properties = {}
    for key, value in iteritems(field.metadata):
        # Dasherize metadata that starts with x_
        if key.startswith("x_"):
            key = key.replace("_", "-")
        # Avoid validation error with "Additional properties not allowed"
        if key in _VALID_PROPERTIES or key.startswith(_VALID_PREFIX):
            properties[key] = value
    return properties
def field2property(self, field):
    """Return the JSON Schema property definition given a marshmallow
    :class:`Field <marshmallow.fields.Field>`.

    The result merges, in order, the output of the ``field2*`` attribute
    helpers and ``metadata2properties``; later helpers override earlier
    ones on key clashes. ``Nested`` fields then replace ``type`` with the
    nested schema (wrapped in ``allOf`` when it is a ``$ref`` and other
    attributes are present), ``List`` fields gain ``items`` and, on
    marshmallow 3 or later, ``Dict`` fields with a value field gain
    ``additionalProperties``.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#schemaObject

    :param Field field: A marshmallow field.
    :rtype: dict, a Property Object
    """
    attribute_builders = (
        self.field2type_and_format,
        self.field2default,
        self.field2choices,
        self.field2read_only,
        self.field2write_only,
        self.field2nullable,
        self.field2range,
        self.field2length,
        self.field2pattern,
        self.metadata2properties,
    )
    prop = {}
    for build_attributes in attribute_builders:
        prop.update(build_attributes(field))

    if isinstance(field, marshmallow.fields.Nested):
        del prop["type"]
        nested_schema = self.resolve_nested_schema(field.schema)
        if prop and "$ref" in nested_schema:
            # Keep the sibling attributes by wrapping the reference in allOf.
            prop["allOf"] = [nested_schema]
        else:
            prop.update(nested_schema)
    elif isinstance(field, marshmallow.fields.List):
        # field.container was renamed to field.inner in marshmallow 3.0.0rc8
        if hasattr(field, "inner"):
            element_field = field.inner
        else:
            element_field = field.container
        prop["items"] = self.field2property(element_field)
    elif (
        isinstance(field, marshmallow.fields.Dict)
        and MARSHMALLOW_VERSION_INFO[0] >= 3
    ):
        # field.value_container was renamed to field.value_field in marshmallow 3.0.0rc8
        if hasattr(field, "value_field"):
            value_field = field.value_field
        else:
            value_field = field.value_container
        if value_field:
            prop["additionalProperties"] = self.field2property(value_field)

    return prop
def resolve_nested_schema(self, schema):
    """Return the Open API representation of a marshmallow Schema.

    Adds the schema to the spec if it isn't already present.

    Typically will return a dictionary with the reference to the schema's
    path in the spec unless the `schema_name_resolver` returns a falsy
    value, in which case the returned dictionary will contain a JSON Schema
    Object representation of the schema (wrapped in an array definition
    when the schema has ``many`` set).

    :param schema: schema to add to the spec
    :raise APISpecError: if an unnamed schema cannot be inlined because
        building its JSON Schema raised ``RuntimeError``.
    """
    schema_instance = resolve_schema_instance(schema)
    schema_key = make_schema_key(schema_instance)

    # Already known: just point at it.
    if schema_key in self.refs:
        return self.get_ref_dict(schema_instance)

    name = self.schema_name_resolver(schema)
    if name:
        unique_name = get_unique_schema_name(self.spec.components, name)
        self.spec.components.schema(unique_name, schema=schema)
        return self.get_ref_dict(schema_instance)

    # No name available: inline the schema definition instead.
    try:
        inline_schema = self.schema2jsonschema(schema)
    except RuntimeError:
        raise APISpecError(
            "Name resolver returned None for schema {schema} which is "
            "part of a chain of circular referencing schemas. Please"
            " ensure that the schema_name_resolver passed to"
            " MarshmallowPlugin returns a string for all circular"
            " referencing schemas.".format(schema=schema)
        )
    if getattr(schema, "many", False):
        return {"type": "array", "items": inline_schema}
    return inline_schema
def schema2parameters(
    self, schema, default_in="body", name="body", required=False, description=None
):
    """Return a list of OpenAPI parameters for a marshmallow
    :class:`Schema <marshmallow.Schema>`.

    When the OpenAPI major version is below 3 and `default_in` maps to
    "body", the list holds a single body parameter built from `name`,
    `required` and `description`. In every other case one parameter is
    produced per schema field (dump-only fields excluded) and those three
    arguments are not used.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#parameterObject
    """
    openapi_default_in = __location_map__.get(default_in, default_in)

    if self.openapi_version.major < 3 and openapi_default_in == "body":
        body_param = {
            "in": openapi_default_in,
            "required": required,
            "name": name,
            "schema": self.resolve_schema_dict(schema),
        }
        if description:
            body_param["description"] = description
        return [body_param]

    is_many = getattr(schema, "many", False)
    assert (
        not is_many
    ), "Schemas with many=True are only supported for 'json' location (aka 'in: body')"

    return self.fields2parameters(
        get_fields(schema, exclude_dump_only=True), default_in=default_in
    )
def fields2parameters(self, fields, default_in="body"):
    """Return a list of OpenAPI parameters given a mapping between field
    names and :class:`Field <marshmallow.Field>` objects.

    Dump-only fields are skipped. Normally each remaining field becomes its
    own parameter. The exception is OpenAPI below version 3: all fields
    located in the body are merged into the first body parameter, whose
    schema accumulates their properties and required names.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#parameterObject
    """
    parameters = []
    merged_body = None
    for field_name, field_obj in iteritems(fields):
        if field_obj.dump_only:
            continue
        param = self.field2parameter(
            field_obj,
            name=self._observed_name(field_obj, field_name),
            default_in=default_in,
        )
        is_v2_body = self.openapi_version.major < 3 and param["in"] == "body"

        if is_v2_body and merged_body is not None:
            # Fold this field into the body parameter that was emitted earlier.
            target_schema = merged_body["schema"]
            target_schema["properties"].update(param["schema"]["properties"])
            newly_required = param["schema"].get("required", [])
            if newly_required:
                target_schema.setdefault("required", []).extend(newly_required)
            continue

        if is_v2_body:
            # First body field: later body fields get merged into it.
            merged_body = param
        parameters.append(param)
    return parameters
def field2parameter(self, field, name="body", default_in="body"):
    """Return an OpenAPI parameter as a `dict`, given a marshmallow
    :class:`Field <marshmallow.Field>`.

    The parameter location is taken from the field's ``location`` metadata
    when present; ``List`` fields are flagged as repeated parameters.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#parameterObject
    """
    metadata_location = field.metadata.get("location", None)
    schema_property = self.field2property(field)
    is_repeated = isinstance(field, marshmallow.fields.List)
    return self.property2parameter(
        schema_property,
        name=name,
        required=field.required,
        multiple=is_repeated,
        location=metadata_location,
        default_in=default_in,
    )
def property2parameter(
    self,
    prop,
    name="body",
    required=False,
    multiple=False,
    location=None,
    default_in="body",
):
    """Return the Parameter Object definition for a JSON Schema property.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#parameterObject

    For the OpenAPI 3 non-body case ``prop`` is modified in place: a truthy
    ``description`` is moved from it onto the parameter, and ``prop`` itself
    becomes the parameter's ``schema``.

    :param dict prop: JSON Schema property
    :param str name: Field name
    :param bool required: Parameter is required
    :param bool multiple: Parameter is repeated
    :param str location: Location to look for ``name``
    :param str default_in: Default location to look for ``name``
    :rtype: dict, a Parameter Object
    """
    fallback_location = __location_map__.get(default_in, default_in)
    resolved_location = __location_map__.get(location, fallback_location)

    if resolved_location == "body":
        # Body parameters wrap the property in an object schema keyed by name.
        body_schema = {
            "type": "object",
            "properties": {name: prop} if name else {},
        }
        if name and required:
            body_schema["required"] = [name]
        return {
            "in": resolved_location,
            "name": "body",
            "required": False,
            "schema": body_schema,
        }

    parameter = {"in": resolved_location, "name": name, "required": required}

    if self.openapi_version.major < 3:
        if multiple:
            parameter["collectionFormat"] = "multi"
        # OpenAPI 2 puts the property attributes directly on the parameter.
        parameter.update(prop)
        return parameter

    if multiple:
        parameter["explode"] = True
        parameter["style"] = "form"
    if prop.get("description", None):
        parameter["description"] = prop.pop("description")
    parameter["schema"] = prop
    return parameter
def schema2jsonschema(self, schema):
    """Return the JSON Schema Object for a given marshmallow
    :class:`Schema <marshmallow.Schema>` instance. Schema may optionally
    provide the ``title`` and ``description`` class Meta options.

    https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#schemaObject

    Example: ::

        class UserSchema(Schema):
            _id = fields.Int()
            email = fields.Email(description='email address of the user')
            name = fields.Str()

            class Meta:
                title = 'User'
                description = 'A registered user'

        oaic = OpenAPIConverter(openapi_version='3.0.2', schema_name_resolver=resolver, spec=spec)
        pprint(oaic.schema2jsonschema(UserSchema))
        # {'description': 'A registered user',
        #  'properties': {'_id': {'format': 'int32', 'type': 'integer'},
        #                 'email': {'description': 'email address of the user',
        #                           'format': 'email',
        #                           'type': 'string'},
        #                 'name': {'type': 'string'}},
        #  'title': 'User',
        #  'type': 'object'}

    :param Schema schema: A marshmallow Schema instance
    :rtype: dict, a JSON Schema Object
    """
    jsonschema = self.fields2jsonschema(
        get_fields(schema),
        partial=getattr(schema, "partial", None),
        ordered=getattr(schema, "ordered", False),
    )

    # Copy over the optional Meta options that map directly onto the schema.
    meta = getattr(schema, "Meta", None)
    for option in ("title", "description"):
        if hasattr(meta, option):
            jsonschema[option] = getattr(meta, option)
    return jsonschema
def fields2jsonschema(self, fields, ordered=False, partial=None):
    """Return the JSON Schema Object given a mapping between field names and
    :class:`Field <marshmallow.Field>` objects.

    :param dict fields: A dictionary of field name field object pairs
    :param bool ordered: Whether to preserve the order in which fields were declared
    :param bool|tuple partial: Whether to override a field's required flag.
        If `True` no fields will be set as required. If an iterable fields
        in the iterable will not be marked as required.
    :rtype: dict, a JSON Schema Object
    """
    properties = OrderedDict() if ordered else {}
    required_names = []

    for field_name, field_obj in iteritems(fields):
        observed_field_name = self._observed_name(field_obj, field_name)
        properties[observed_field_name] = self.field2property(field_obj)

        if not field_obj.required:
            continue
        # ``partial`` lifts the requirement entirely, or just for listed fields.
        if not partial or (is_collection(partial) and field_name not in partial):
            required_names.append(observed_field_name)

    jsonschema = {"type": "object", "properties": properties}
    if required_names:
        jsonschema["required"] = sorted(required_names)
    return jsonschema
def get_ref_dict(self, schema):
    """Return a dictionary containing a JSON reference to the schema in the
    spec.

    The reference is built from the name stored in ``self.refs`` under the
    schema's key. It is wrapped in an array definition when the schema has
    ``many`` set.
    """
    registered_name = self.refs[make_schema_key(schema)]
    reference = build_reference(
        "schema", self.openapi_version.major, registered_name
    )
    if not getattr(schema, "many", False):
        return reference
    return {"type": "array", "items": reference}
def resolve_schema_dict(self, schema):
    """Resolve marshmallow schemas found in a schema definition.

    A non-dict ``schema`` is handed to ``resolve_nested_schema``. A dict is
    modified in place and returned: the ``items`` of an "array" definition
    and each value in the ``properties`` of an "object" definition are
    resolved recursively.

    :param schema: a dict definition or a schema accepted by
        ``resolve_nested_schema``
    :rtype: dict
    """
    if not isinstance(schema, dict):
        return self.resolve_nested_schema(schema)

    schema_type = schema.get("type")
    if schema_type == "array" and "items" in schema:
        schema["items"] = self.resolve_schema_dict(schema["items"])
    if schema_type == "object" and "properties" in schema:
        resolved_properties = {}
        for prop_name, prop_schema in schema["properties"].items():
            resolved_properties[prop_name] = self.resolve_schema_dict(prop_schema)
        schema["properties"] = resolved_properties
    return schema