Skip to content

Commit

Permalink
Fix parent and move serialize_document_url to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Leitsius committed Aug 2, 2017
1 parent 8c25407 commit 9ea7879
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 34 deletions.
39 changes: 7 additions & 32 deletions openregistry/api/models/ocds.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
from urlparse import urlparse, parse_qs
from uuid import uuid4

from schematics.types import (StringType, FloatType, URLType, IntType,
Expand All @@ -12,12 +11,15 @@
DEFAULT_ITEM_CLASSIFICATION, ITEM_CLASSIFICATIONS, TZ, DOCUMENT_TYPES,
IDENTIFIER_CODES
)
from openregistry.api.utils import get_now
from openregistry.api.utils import get_now, serialize_document_url

from .schematics_extender import Model, IsoDateTimeType, HashType
from .roles import document_roles, organization_roles


# OCDS Building Blocks.
# More info: http://standard.open-contracting.org/latest/en/getting_started/building_blocks

class Value(Model):
amount = FloatType(required=True, min_value=0) # Amount as a number.
currency = StringType(required=True, default=DEFAULT_CURRENCY, max_length=3, min_length=3) # The currency in 3-letter ISO 4217 format.
Expand Down Expand Up @@ -87,7 +89,7 @@ class Location(Model):


class Document(Model):
class Options():
class Options:
roles = document_roles

id = MD5Type(required=True, default=lambda: uuid4().hex)
Expand All @@ -109,34 +111,7 @@ class Options():

@serializable(serialized_name="url")
def download_url(self):
url = self.url
if not url or '?download=' not in url:
return url
doc_id = parse_qs(urlparse(url).query)['download'][-1]
root = self.__parent__
parents = []
while root.__parent__ is not None:
parents[0:0] = [root]
root = root.__parent__
request = root.request
if not request.registry.docservice_url:
return url
if 'status' in parents[0] and parents[0].status in type(parents[0])._options.roles:
role = parents[0].status
for index, obj in enumerate(parents):
if obj.id != url.split('/')[(index - len(parents)) * 2 - 1]:
break
field = url.split('/')[(index - len(parents)) * 2]
if "_" in field:
field = field[0] + field.title().replace("_", "")[1:]
roles = type(obj)._options.roles
if roles[role if role in roles else 'default'](field, []):
return url
from openregistry.api.utils import generate_docservice_url
if not self.hash:
path = [i for i in urlparse(url).path.split('/') if len(i) == 32 and not set(i).difference(hexdigits)]
return generate_docservice_url(request, doc_id, False, '{}/{}'.format(path[0], path[-1]))
return generate_docservice_url(request, doc_id, False)
return serialize_document_url(self)


class Identifier(Model):
Expand Down Expand Up @@ -179,7 +154,7 @@ def validate_email(self, data, value):

class Organization(Model):
"""An organization."""
class Options():
class Options:
roles = organization_roles

name = StringType(required=True)
Expand Down
3 changes: 1 addition & 2 deletions openregistry/api/models/schematics_extender.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def to_primitive(self, value, context=None):
return value.isoformat()



class HashType(StringType):

MESSAGES = {
Expand Down Expand Up @@ -78,7 +77,7 @@ class Options(object):

def __init__(self,*args, **kwargs):
super(Model, self).__init__(*args, **kwargs)
for i, j in self._data.items():
for i, j in self._data.converted.items():
if isinstance(j, list):
for x in j:
set_parent(x, self)
Expand Down
57 changes: 57 additions & 0 deletions openregistry/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from string import hexdigits
from json import dumps
from uuid import uuid4
from functools import partial
Expand All @@ -10,6 +11,11 @@
from cornice.resource import view
from webob.multidict import NestedMultiDict
from pkg_resources import iter_entry_points
from urlparse import urlparse, parse_qs, urlunsplit
from time import time as ttime
from urllib import quote, urlencode
from base64 import b64encode

from jsonpatch import make_patch, apply_patch as _apply_patch

from openregistry.api.events import ErrorDesctiptorEvent
Expand Down Expand Up @@ -165,6 +171,24 @@ def fix_url(item, app_url):
]


def generate_docservice_url(request, doc_id, temporary=True, prefix=None):
docservice_key = getattr(request.registry, 'docservice_key', None)
parsed_url = urlparse(request.registry.docservice_url)
query = {}
if temporary:
expires = int(ttime()) + 300 # EXPIRES
mess = "{}\0{}".format(doc_id, expires)
query['Expires'] = expires
else:
mess = doc_id
if prefix:
mess = '{}/{}'.format(prefix, mess)
query['Prefix'] = prefix
query['Signature'] = quote(b64encode(docservice_key.signature(mess.encode("utf-8"))))
query['KeyID'] = docservice_key.hex_vk()[:8]
return urlunsplit((parsed_url.scheme, parsed_url.netloc, '/get/{}'.format(doc_id), urlencode(query), ''))


def forbidden(request):
request.errors.add('url', 'permission', 'Forbidden')
request.errors.status = 403
Expand Down Expand Up @@ -321,3 +345,36 @@ def set_modetest_titles(item):
item.title_en = u'[TESTING] {}'.format(item.title_en or u'')
if not item.title_ru or u'[ТЕСТИРОВАНИЕ]' not in item.title_ru:
item.title_ru = u'[ТЕСТИРОВАНИЕ] {}'.format(item.title_ru or u'')


# Schematics Serialize Functions


def serialize_document_url(document):
url = document.url
if not url or '?download=' not in url:
return url
doc_id = parse_qs(urlparse(url).query)['download'][-1]
root = document.__parent__
parents = []
while root.__parent__ is not None:
parents[0:0] = [root]
root = root.__parent__
request = root.request
if not request.registry.docservice_url:
return url
if 'status' in parents[0] and parents[0].status in type(parents[0])._options.roles:
role = parents[0].status
for index, obj in enumerate(parents):
if obj.id != url.split('/')[(index - len(parents)) * 2 - 1]:
break
field = url.split('/')[(index - len(parents)) * 2]
if "_" in field:
field = field[0] + field.title().replace("_", "")[1:]
roles = type(obj)._options.roles
if roles[role if role in roles else 'default'](field, []):
return url
if not document.hash:
path = [i for i in urlparse(url).path.split('/') if len(i) == 32 and not set(i).difference(hexdigits)]
return generate_docservice_url(request, doc_id, False, '{}/{}'.format(path[0], path[-1]))
return generate_docservice_url(request, doc_id, False)

0 comments on commit 9ea7879

Please sign in to comment.