Skip to content

Commit

Permalink
docs: Add type annotations (to assist authors of alternative implementations)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Feb 4, 2023
1 parent c3586b0 commit 1522f56
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 45 deletions.
11 changes: 7 additions & 4 deletions ocdsmerge/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from typing import Tuple


class OCDSMergeError(Exception):
"""Base class for exceptions from within this package"""


class MissingDateKeyError(OCDSMergeError, KeyError):
"""Raised when a release is missing a 'date' key"""

def __init__(self, key, message):
def __init__(self, key: str, message: str):
self.key = key
self.message = message

def __str__(self):
def __str__(self) -> str:
return str(self.message)


Expand All @@ -36,10 +39,10 @@ class OCDSMergeWarning(UserWarning):
class DuplicateIdValueWarning(OCDSMergeWarning):
"""Used when at least two objects in the same array have the same value for the 'id' field"""

def __init__(self, path, id, message):
def __init__(self, path: Tuple[str, ...], id, message: str):
self.path = path
self.id = id
self.message = message

def __str__(self):
def __str__(self) -> str:
return str(self.message)
44 changes: 32 additions & 12 deletions ocdsmerge/flatten.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import uuid
import warnings
from enum import Enum, auto, unique
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Union

from ocdsmerge.exceptions import DuplicateIdValueWarning, InconsistentTypeError
from ocdsmerge.rules import MergeRules

VERSIONED_VALUE_KEYS = frozenset(['releaseID', 'releaseDate', 'releaseTag', 'value'])

Expand All @@ -15,21 +17,25 @@ class MergeStrategy(Enum):

globals().update(MergeStrategy.__members__)

Identifier = Union[int, str]
Flattened = Dict[Tuple[Identifier, ...], Any]
RuleOverrides = Dict[Tuple[str, ...], MergeStrategy]


class IdValue(str):
"""
A string with ``identifier`` and ``original_value`` properties.
"""
def __init__(self, identifier):
def __init__(self, identifier: Identifier):
self.identifier = identifier
str.__init__(identifier)

@property
def original_value(self):
def original_value(self) -> Optional[Identifier]:
return self._original_value

@original_value.setter
def original_value(self, original_value):
def original_value(self, original_value: Optional[Identifier]) -> None:
self._original_value = original_value


Expand All @@ -38,22 +44,30 @@ class IdDict(dict):
A dictionary with an ``identifier`` property.
"""
@property
def identifier(self):
def identifier(self) -> Identifier:
return self._identifier

@identifier.setter
def identifier(self, identifier):
def identifier(self, identifier: Identifier) -> None:
self._identifier = identifier


def is_versioned_value(value):
def is_versioned_value(value: Dict[str, Any]) -> bool:
"""
Returns whether the value is a versioned value.
"""
return len(value) == 4 and VERSIONED_VALUE_KEYS.issuperset(value)


def flatten(obj, merge_rules, rule_overrides, flattened, path=(), rule_path=(), versioned=False):
def flatten(
obj: Union[List[Dict[str, Any]], Dict[str, Any]],
merge_rules: MergeRules,
rule_overrides: RuleOverrides,
flattened: Flattened,
path: Tuple[Identifier, ...] = (),
rule_path: Tuple[str, ...] = (),
versioned: Optional[bool] = False
) -> Flattened:
"""
Flattens a JSON object into key-value pairs, in which the key is the JSON path as a tuple. For example:
Expand Down Expand Up @@ -119,7 +133,9 @@ def flatten(obj, merge_rules, rule_overrides, flattened, path=(), rule_path=(),
return flattened


def _enumerate(obj, path, rule_path, rule):
def _enumerate(
obj: List[Dict[str, Any]], path: Tuple[Identifier, ...], rule_path: Tuple[str, ...], rule: Optional[MergeStrategy]
) -> Generator[Tuple[IdValue, Any], None, None]:
# This tracks the identifiers of objects in an array, to warn about collisions.
identifiers = {}

Expand All @@ -137,7 +153,7 @@ def _enumerate(obj, path, rule_path, rule):
yield new_key, value


def _id_value(key, value, rule):
def _id_value(key: int, value: Dict[str, Any], rule: Optional[MergeStrategy]) -> Tuple[IdValue, IdValue]:
# If it is an array of objects, get the `id` value to apply the identifier merge strategy.
# https://standard.open-contracting.org/latest/en/schema/merging/#identifier-merge
if 'id' in value:
Expand Down Expand Up @@ -167,13 +183,13 @@ def _id_value(key, value, rule):
return new_key, default_key


def unflatten(flattened):
def unflatten(flattened: Flattened) -> Dict[str, Any]:
"""
Unflattens a flattened object into a JSON object.
"""
unflattened = {}
unflattened: Dict[str, Any] = {}

identifiers = {}
identifiers: Dict[Tuple[Identifier, ...], IdDict] = {}

for key in flattened:
current_node = unflattened
Expand Down Expand Up @@ -210,6 +226,10 @@ def unflatten(flattened):
message = 'An earlier release had the literal {!r} for /{}, but the current release has an object with a {!r} key' # noqa: E501
raise InconsistentTypeError(message.format(current_node, '/'.join(key[:end - 1]), part))

# When running mypy, uncomment these lines:
# if TYPE_CHECKING:
# assert type(part) is str

# Otherwise, this is a path to a property of an object. If this is a path to a node we visited before,
# change into it. If it's an `id` field, it's already been set to its original value.
if part in current_node:
Expand Down
61 changes: 40 additions & 21 deletions ocdsmerge/merge.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from ocdsmerge.flatten import flatten, unflatten
from ocdsmerge.rules import get_merge_rules
from typing import Any, Dict, List, Optional, Type

from ocdsmerge.flatten import Flattened, RuleOverrides, flatten, unflatten
from ocdsmerge.rules import MergeRules, Schema, get_merge_rules
from ocdsmerge.util import sorted_releases


class Merger:
def __init__(self, schema=None, merge_rules=None, rule_overrides=None):
def __init__(
self,
schema: Schema = None,
merge_rules: Optional[MergeRules] = None,
rule_overrides: Optional[RuleOverrides] = None,
):
"""
Initializes a reusable ``Merger`` instance for creating merged releases.
:param schema: the release schema (if not provided, will default to the latest version of OCDS)
:param dict merge_rules: the merge rules (if not provided, will determine the rules from the ``schema``)
:param dict rule_overrides: any rule overrides, in which keys are field paths as tuples, and values are either
:param merge_rules: the merge rules (if not provided, will determine the rules from the ``schema``)
:param rule_overrides: any rule overrides, in which keys are field paths as tuples, and values are either
``ocdsmerge.APPEND`` or ``ocdsmerge.MERGE_BY_POSITION``
:type schema: dict or str
"""
Expand All @@ -22,19 +29,19 @@ def __init__(self, schema=None, merge_rules=None, rule_overrides=None):
self.merge_rules = merge_rules
self.rule_overrides = rule_overrides

def create_compiled_release(self, releases):
def create_compiled_release(self, releases: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Merges a list of releases into a compiled release.
"""
return self._create_merged_release(CompiledRelease, releases)

def create_versioned_release(self, releases):
def create_versioned_release(self, releases: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Merges a list of releases into a versioned release.
"""
return self._create_merged_release(VersionedRelease, releases)

def _create_merged_release(self, cls, releases):
def _create_merged_release(self, cls: Type[MergedRelease], releases: List[Dict[str, Any]]) -> Dict[str, Any]:
merged_release = cls(merge_rules=self.merge_rules, rule_overrides=self.rule_overrides)
merged_release.extend(releases)
return merged_release.asdict()
Expand All @@ -44,16 +51,22 @@ class MergedRelease:
"""
Whether the class is for merging versioned releases.
"""
versioned = None

def __init__(self, data=None, schema=None, merge_rules=None, rule_overrides=None):
versioned: Optional[bool] = None

def __init__(
self,
data: Optional[Dict[str, Any]] = None,
schema: Schema = None,
merge_rules: Optional[MergeRules] = None,
rule_overrides: Optional[RuleOverrides] = None,
):
"""
Initializes a merged release.
:param dict data: the latest copy of the merged release, if any
:param data: the latest copy of the merged release, if any
:param schema: the release schema (if not provided, will default to the latest version of OCDS)
:param dict merge_rules: the merge rules (if not provided, will determine the rules from the ``schema``)
:param dict rule_overrides: any rule overrides, in which keys are field paths as tuples, and values are either
:param merge_rules: the merge rules (if not provided, will determine the rules from the ``schema``)
:param rule_overrides: any rule overrides, in which keys are field paths as tuples, and values are either
``ocdsmerge.APPEND`` or ``ocdsmerge.MERGE_BY_POSITION``
:type schema: dict or str
"""
Expand All @@ -70,20 +83,20 @@ def __init__(self, data=None, schema=None, merge_rules=None, rule_overrides=None
else:
self.data = flatten(data, self.merge_rules, self.rule_overrides, flattened={}, versioned=self.versioned)

def asdict(self):
def asdict(self) -> Dict[str, Any]:
"""
Returns the merged release as a dictionary.
"""
return unflatten(self.data)

def extend(self, releases):
def extend(self, releases: List[Dict[str, Any]]) -> None:
"""
Sorts and merges many releases into the merged release.
"""
for release in sorted_releases(releases):
self.append(release)

def append(self, release):
def append(self, release: Dict[str, Any]) -> None:
"""
Merges one release into the merged release.
"""
Expand All @@ -99,18 +112,22 @@ def append(self, release):
flat = flatten(release, self.merge_rules, self.rule_overrides, flattened={})
self.flat_append(flat, ocid, release_id, date, tag)

def flat_append(self, flat, ocid, release_id, date, tag):
def flat_append(
self, flat: Flattened, ocid: Optional[str], release_id: Optional[str], date: Optional[str], tag: Optional[str]
) -> None:
raise NotImplementedError('subclasses must implement flat_append()')


class CompiledRelease(MergedRelease):
versioned = False

def __init__(self, data=None, **kwargs):
def __init__(self, data: Optional[Dict[str, Any]] = None, **kwargs):
super().__init__(data, **kwargs)
self.data[('tag',)] = ['compiled']

def flat_append(self, flat, ocid, release_id, date, tag):
def flat_append(
self, flat: Flattened, ocid: Optional[str], release_id: Optional[str], date: Optional[str], tag: Optional[str]
) -> None:
# Add an `id` and `date`.
self.data[('id',)] = f'{ocid}-{date}'
self.data[('date',)] = date
Expand All @@ -123,7 +140,9 @@ def flat_append(self, flat, ocid, release_id, date, tag):
class VersionedRelease(MergedRelease):
versioned = True

def flat_append(self, flat, ocid, release_id, date, tag):
def flat_append(
self, flat: Flattened, ocid: Optional[str], release_id: Optional[str], date: Optional[str], tag: Optional[str]
) -> None:
# Don't version the OCID.
flat.pop(('ocid',), None)
self.data[('ocid',)] = ocid
Expand Down
16 changes: 11 additions & 5 deletions ocdsmerge/rules.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from functools import lru_cache
from typing import Any, Dict, Generator, List, Optional, Set, Tuple, Union

import jsonref

from ocdsmerge.util import get_release_schema_url, get_tags

MergeRules = Dict[Tuple[str, ...], Set[str]]
Schema = Optional[Union[str, Dict[str, Any]]]

def get_merge_rules(schema=None):

def get_merge_rules(schema: Schema = None) -> MergeRules:
"""
Returns merge rules as key-value pairs, in which the key is a JSON path as a tuple, and the value is a list of
merge properties whose values are `true`.
Expand All @@ -19,7 +23,7 @@ def get_merge_rules(schema=None):


@lru_cache()
def _get_merge_rules_from_url_or_path(schema):
def _get_merge_rules_from_url_or_path(schema: str) -> MergeRules:
if schema.startswith('http'):
deref_schema = jsonref.load_uri(schema)
else:
Expand All @@ -28,11 +32,13 @@ def _get_merge_rules_from_url_or_path(schema):
return _get_merge_rules_from_dereferenced_schema(deref_schema)


def _get_merge_rules_from_dereferenced_schema(deref_schema):
def _get_merge_rules_from_dereferenced_schema(deref_schema: Dict[str, Any]) -> MergeRules:
return dict(_get_merge_rules(deref_schema['properties']))


def _get_merge_rules(properties, path=None):
def _get_merge_rules(
properties: Dict[str, Any], path: Optional[Tuple[str, ...]] = None
) -> Generator[Tuple[Tuple[str, ...], Set[str]], None, None]:
"""
Yields merge rules as key-value pairs, in which the first element is a JSON path as a tuple, and the second element
is a list of merge properties whose values are `true`.
Expand Down Expand Up @@ -67,7 +73,7 @@ def _get_merge_rules(properties, path=None):
yield from _get_merge_rules(value['items']['properties'], path=new_path)


def _get_types(prop):
def _get_types(prop: Dict[str, Any]) -> List[str]:
"""
Returns a property's `type` as a list.
"""
Expand Down
7 changes: 4 additions & 3 deletions ocdsmerge/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from functools import lru_cache
from typing import Any, Dict, List

import requests

Expand All @@ -8,22 +9,22 @@


@lru_cache()
def get_tags():
def get_tags() -> List[str]:
"""
Returns the tags of all versions of OCDS in alphabetical order.
"""
return re.findall(r'"(\d+__\d+__\d+)/', requests.get('https://standard.open-contracting.org/schema/').text)


def get_release_schema_url(tag):
def get_release_schema_url(tag: str) -> str:
"""
Returns the URL of the release schema in the given version of OCDS.
"""
return f'https://standard.open-contracting.org/schema/{tag}/release-schema.json'


# If we need a method to get dates from releases, see https://github.com/open-contracting/ocds-merge/issues/25
def sorted_releases(releases):
def sorted_releases(releases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Sorts a list of releases by date.
"""
Expand Down

0 comments on commit 1522f56

Please sign in to comment.