Skip to content

Commit

Permalink
Move object_hook outside loads method so class can be extend and reused
Browse files Browse the repository at this point in the history
  • Loading branch information
s-block committed Feb 23, 2017
1 parent dc5f48f commit 8a8a608
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 44 deletions.
147 changes: 103 additions & 44 deletions flask/sessions.py
Expand Up @@ -8,7 +8,6 @@
:copyright: (c) 2015 by Armin Ronacher.
:license: BSD, see LICENSE for more details.
"""

import uuid
import hashlib
from base64 import b64encode, b64decode
Expand Down Expand Up @@ -49,22 +48,82 @@ def _set_permanent(self, value):
modified = True


def _tag(value):
if isinstance(value, tuple):
return {' t': [_tag(x) for x in value]}
elif isinstance(value, uuid.UUID):
return {' u': value.hex}
elif isinstance(value, bytes):
return {' b': b64encode(value).decode('ascii')}
elif callable(getattr(value, '__html__', None)):
return {' m': text_type(value.__html__())}
elif isinstance(value, list):
return [_tag(x) for x in value]
elif isinstance(value, datetime):
return {' d': http_date(value)}
elif isinstance(value, dict):
return dict((k, _tag(v)) for k, v in iteritems(value))
elif isinstance(value, str):
class TaggedJSONSerializer(object):
"""A customized JSON serializer that supports a few extra types that
we take for granted when serializing (tuples, markup objects, datetime).
"""

def __init__(self):
self.conversions = [
{
'check': lambda value: self._is_dict_with_used_key(value),
'tag': lambda value: self._tag_dict_used_with_key(value),
'untag': lambda value: self._untag_dict_used_with_key(value),
'key': ' di',
},
{
'check': lambda value: isinstance(value, tuple),
'tag': lambda value: [self._tag(x) for x in value],
'untag': lambda value: tuple(value),
'key': ' t',
},
{
'check': lambda value: isinstance(value, uuid.UUID),
'tag': lambda value: value.hex,
'untag': lambda value: uuid.UUID(value),
'key': ' u',
},
{
'check': lambda value: isinstance(value, bytes),
'tag': lambda value: b64encode(value).decode('ascii'),
'untag': lambda value: b64decode(value),
'key': ' b',
},
{
'check': lambda value: callable(getattr(value, '__html__',
None)),
'tag': lambda value: text_type(value.__html__()),
'untag': lambda value: Markup(value),
'key': ' m',
},
{
'check': lambda value: isinstance(value, list),
'tag': lambda value: [self._tag(x) for x in value],
},
{
'check': lambda value: isinstance(value, datetime),
'tag': lambda value: http_date(value),
'untag': lambda value: parse_date(value),
'key': ' d',
},
{
'check': lambda value: isinstance(value, dict),
'tag': lambda value: dict((k, self._tag(v)) for k, v in
iteritems(value)),
},
{
'check': lambda value: isinstance(value, str),
'tag': lambda value: self._tag_string(value),
}
]

@property
def keys(self):
return [c['key'] for c in self.conversions if c.get('key')]

def _get_conversion_untag(self, key):
return next(
(c['untag'] for c in self.conversions if c.get('key') == key),
lambda v: None
)

def _is_dict_with_used_key(self, v):
return isinstance(v, dict) and len(v) == 1 and list(v)[0] in self.keys

def _was_dict_with_used_key(self, k):
return k.endswith('__') and k[:-2] in self.keys

def _tag_string(self, value):
try:
return text_type(value)
except UnicodeError:
Expand All @@ -73,38 +132,38 @@ def _tag(value):
u'non-ASCII data was passed to the session system '
u'which can only store unicode strings. Consider '
u'base64 encoding your string (String was %r)' % value)
return value


class TaggedJSONSerializer(object):
"""A customized JSON serializer that supports a few extra types that
we take for granted when serializing (tuples, markup objects, datetime).
"""
def _tag_dict_used_with_key(self, value):
k, v = next(iteritems(value))
return {'%s__' % k: v}

def _tag(self, value):
for tag_ops in self.conversions:
if tag_ops['check'](value):
tag = tag_ops.get('key')
if tag:
return {tag: tag_ops['tag'](value)}
return tag_ops['tag'](value)
return value

def _untag_dict_used_with_key(self, the_value):
k, v = next(iteritems(the_value))
if self._was_dict_with_used_key(k):
return {k[:-2]: self._untag(v)}

def _untag(self, obj):
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
untag = self._get_conversion_untag(the_key)
new_value = untag(the_value)
return new_value if new_value else obj

def dumps(self, value):
return json.dumps(_tag(value), separators=(',', ':'))

LOADS_MAP = {
' t': tuple,
' u': uuid.UUID,
' b': b64decode,
' m': Markup,
' d': parse_date,
}
return json.dumps(self._tag(value), separators=(',', ':'))

def loads(self, value):
def object_hook(obj):
if len(obj) != 1:
return obj
the_key, the_value = next(iteritems(obj))
# Check the key for a corresponding function
return_function = self.LOADS_MAP.get(the_key)
if return_function:
# Pass the value to the function
return return_function(the_value)
# Didn't find a function for this object
return obj
return json.loads(value, object_hook=object_hook)
return json.loads(value, object_hook=self._untag)


session_json_serializer = TaggedJSONSerializer()
Expand Down
6 changes: 6 additions & 0 deletions tests/test_basic.py
Expand Up @@ -383,6 +383,9 @@ def modify_session(response):
flask.session['dt'] = now
flask.session['b'] = b'\xff'
flask.session['t'] = (1, 2, 3)
flask.session['spacefirst'] = {' t': 'not-a-tuple'}
flask.session['withunderscores'] = {' t__': 'not-a-tuple'}
flask.session['notadict'] = {' di': 'not-a-dict'}
return response

@app.route('/')
Expand All @@ -399,6 +402,9 @@ def dump_session_contents():
assert rv['b'] == b'\xff'
assert type(rv['b']) == bytes
assert rv['t'] == (1, 2, 3)
assert rv['spacefirst'] == {' t': 'not-a-tuple'}
assert rv['withunderscores'] == {' t__': 'not-a-tuple'}
assert rv['notadict'] == {' di': 'not-a-dict'}


def test_session_cookie_setting():
Expand Down

0 comments on commit 8a8a608

Please sign in to comment.