-
Notifications
You must be signed in to change notification settings - Fork 169
/
q_serializer.py
133 lines (113 loc) · 4.15 KB
/
q_serializer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""This is a module to serializers/deserializes Django Q (query) object."""
from datetime import datetime, date
import base64
import time
from django.db.models import Q
from django.core.serializers.base import SerializationError
import simplejson as json
try:
min_ts = time.mktime(datetime.min.timetuple())
except OverflowError:
min_ts = 0
try:
max_ts = time.mktime((3000,) + (0,) * 8) # avoid OverflowError on windows
except OverflowError:
max_ts = time.mktime((2038,) + (0,) * 8) # limit 32bits
def dt2ts(obj):
return time.mktime(obj.timetuple()) if isinstance(obj, date) else obj
class QSerializer:
"""
A Q object serializer base class. Pass base64=True when initializing
to Base-64 encode/decode the returned/passed string.
By default the class provides loads/dumps methods that wrap around
json serialization, but they may be easily overwritten to serialize
into other formats (i.e XML, YAML, etc...)
"""
b64_enabled = False
def __init__(self, base64=False):
if base64:
self.b64_enabled = True
@staticmethod
def _is_range(qtuple):
return qtuple[0].endswith("__range") and len(qtuple[1]) == 2
def prepare_value(self, qtuple):
if self._is_range(qtuple):
qtuple[1][0] = qtuple[1][0] or min_ts
qtuple[1][1] = qtuple[1][1] or max_ts
qtuple[1] = (datetime.fromtimestamp(qtuple[1][0]),
datetime.fromtimestamp(qtuple[1][1]))
return qtuple
def serialize(self, q):
"""
Serialize a Q object into a (possibly nested) dict.
"""
children = []
for child in q.children:
if isinstance(child, Q):
children.append(self.serialize(child))
else:
children.append(child)
serialized = q.__dict__
serialized['children'] = children
return serialized
def deserialize(self, d):
"""
De-serialize a Q object from a (possibly nested) dict.
"""
children = []
for child in d.pop('children'):
if isinstance(child, dict):
children.append(self.deserialize(child))
else:
children.append(self.prepare_value(child))
query = Q()
query.children = children
query.connector = d['connector']
query.negated = d['negated']
if 'subtree_parents' in d:
query.subtree_parents = d['subtree_parents']
return query
def get_field_values_list(self, d):
"""
Iterate over a (possibly nested) dict, and return a list
of all children queries, as a dict of the following structure:
{
'field': 'some_field__iexact',
'value': 'some_value',
'value_from': 'optional_range_val1',
'value_to': 'optional_range_val2',
'negate': True,
}
OR relations are expressed as an extra "line" between queries.
"""
fields = []
children = d.get('children', [])
for child in children:
if isinstance(child, dict):
fields.extend(self.get_field_values_list(child))
else:
f = {'field': child[0], 'value': child[1]}
if self._is_range(child):
f['value_from'] = child[1][0]
f['value_to'] = child[1][1]
f['negate'] = d.get('negated', False)
fields.append(f)
# add _OR line
if d['connector'] == 'OR' and children[-1] != child:
fields.append({'field': '_OR', 'value': 'null'})
return fields
def dumps(self, obj):
if not isinstance(obj, Q):
raise SerializationError
string = json.dumps(self.serialize(obj), default=dt2ts)
if self.b64_enabled:
return base64.b64encode(string.encode("latin-1")).decode("utf-8")
return string
def loads(self, string, raw=False):
if self.b64_enabled:
d = json.loads(base64.b64decode(string))
else:
d = json.loads(string)
if raw:
return d
return self.deserialize(d)