-
Notifications
You must be signed in to change notification settings - Fork 1
/
utils.py
219 lines (181 loc) · 6.18 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# This file is part of Stones Server Side.
# Stones Server Side is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Stones Server Side is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Stones Server Side. If not, see <http://www.gnu.org/licenses/>.
# Copyright 2013, Carlos León <carlos.eduardo.leon.franco@gmail.com>
import datetime
import logging
import base64
import random
import string
try:
import simplejson as json
except ImportError:
import json
import webapp2
import webapp2_extras.json
from google.appengine.ext import ndb
from google.appengine.ext import blobstore
from google.appengine.api import files
from google.appengine.api import users as google_users
from google.appengine.api import images
from google.appengine.api import datastore_types
import model
import unittest
from google.appengine.ext import testbed
from google.appengine.datastore import datastore_stub_util
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Public names exported by a star-import of this module.
__all__ = ['JSONEncoder',
'clear_id',
'BaseTestCase',
'get_constant_display',
'get_constants_choices',
'resize_image',
'fix_base64_padding',
'get_random_string',
'decode_json',
'encode_json',
'if_attr']
def get_constant_display(constant, constants_group):
    """Return the display value paired with ``constant``.

    ``constants_group`` is walked in order; every entry is indexed as
    ``entry[0]`` (the constant) and ``entry[1]`` (its display value). The
    first entry whose constant equals ``constant`` is the one returned.

    :raises IndexError: when no entry matches ``constant``.
    """
    not_found = object()
    display = next(
        (entry[1] for entry in constants_group if constant == entry[0]),
        not_found,
    )
    if display is not_found:
        raise IndexError
    return display
def get_constants_choices(constants_group):
    """Collect the first element of every entry in ``constants_group``.

    :returns: a new list holding ``entry[0]`` for each entry, in order.
    """
    choices = []
    for entry in constants_group:
        choices.append(entry[0])
    return choices
def clear_id(id):
    """
    Normalize ``id`` so it can be used as an id at model creation.

    The value is stripped of surrounding whitespace, lower-cased, has every
    space replaced by ``-`` and has the accented characters listed in the
    table below replaced by their unaccented counterparts. Any other
    character is left untouched.

    :param id: text to normalize.
    :returns: the normalized text.
    """
    replacements = {
        u'ñ': 'n',
        u'á': 'a',
        u'é': 'e',
        u'í': u'i',
        u'ó': 'o',
        u'ú': 'u',
    }
    cleaned = id.strip().lower().replace(' ', '-')
    # ``items()`` exists on both Python 2 and 3 (``iteritems()`` does not).
    # The replacements are independent, so iteration order is irrelevant.
    for accented, plain in replacements.items():
        cleaned = cleaned.replace(accented, plain)
    return cleaned
class JSONEncoder(json.JSONEncoder):
    """
    Encoder for models dumps.

    Extends the stock encoder with support for dates and times, ndb
    queries and models, keys, App Engine users, blob keys, geo points and
    (when ``babel`` is installed) lazy translation proxies.
    """

    def __init__(self, *args, **kwargs):
        # ``ensure_ascii`` is forced on, overriding whatever the caller passed.
        kwargs['ensure_ascii'] = True
        super(JSONEncoder, self).__init__(*args, **kwargs)

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Anything not recognized here is delegated to the base encoder,
        which raises ``TypeError``.
        """
        # babel is optional: without it the LazyProxy branch is skipped.
        try:
            from babel.support import LazyProxy
        except ImportError:
            LazyProxy = None

        # ``datetime.datetime`` is a subclass of ``datetime.date``, so it has
        # to be tested first; plain dates then reach their own branch and
        # are rendered with DATE_FORMAT instead of DATETIME_FORMAT.
        if isinstance(obj, datetime.datetime):
            return obj.strftime(model.DATETIME_FORMAT)
        elif isinstance(obj, datetime.date):
            return obj.strftime(model.DATE_FORMAT)
        elif isinstance(obj, datetime.time):
            return obj.strftime(model.TIME_FORMAT)
        elif isinstance(obj, ndb.Query):
            # Run the query; the fetched results are encoded in turn.
            return obj.fetch()
        elif LazyProxy and isinstance(obj, LazyProxy):
            return unicode(obj)
        elif isinstance(obj, ndb.Model):
            our_dict = obj.to_dict()
            # Add key information unless the model already has properties
            # with these names.
            if '__key__' not in our_dict and obj.key:
                our_dict['__key__'] = obj.key.urlsafe()
            if '__id__' not in our_dict and obj.key:
                our_dict['__id__'] = obj.key.id()
            return our_dict
        elif isinstance(obj, model.Key):
            # NOTE(review): model.Key is presumably the ndb key type
            # re-exported by the project's model module -- confirm.
            return obj.urlsafe()
        elif isinstance(obj, google_users.User):
            return {
                'email': obj.email(),
                'user_id': obj.user_id(),
                'nickname': obj.nickname(),
            }
        elif isinstance(obj, blobstore.BlobKey):
            return str(obj)
        elif isinstance(obj, datastore_types.GeoPt):
            return {
                'lat': obj.lat,
                'lon': obj.lon
            }
        else:
            return json.JSONEncoder.default(self, obj)
def resize_image(b64encoded_image, width=0, height=0):
    """Resize a base64-encoded image.

    :param b64encoded_image:
        Base64 image representation, either bare or carrying a
        ``data:<mimetype>;base64,`` prefix.
    :param width:
        Forwarded to ``images.resize``.
    :param height:
        Forwarded to ``images.resize``.
    :returns:
        The resized image as a ``data:<mimetype>;base64,...`` string. The
        mimetype is taken from the input prefix when there is one and is
        ``image/png`` otherwise.
    """
    if 'data:' in b64encoded_image:
        # Prefixed input: the mimetype sits between ':' and ';' in the
        # header, the payload is the piece right after the first comma.
        pieces = b64encoded_image.split(',')
        media = pieces[0].split(';')[0]
        mimetype = media.split(':')[1]
        content = pieces[1]
    else:
        mimetype = 'image/png'
        content = b64encoded_image

    raw = base64.decodestring(content)
    resized = images.resize(image_data=raw, height=height, width=width)
    encoded = base64.encodestring(resized)
    return 'data:' + mimetype + ';base64,' + encoded.encode('utf-8')
class BaseTestCase(unittest.TestCase):
    """Base class for tests that need the App Engine testbed stubs."""

    def __init__(self, *args, **kwargs):
        super(BaseTestCase, self).__init__(*args, **kwargs)
        # ``self.app`` is whatever ``main.app`` resolves to, or None when
        # it cannot be imported.
        try:
            self.app = webapp2.import_string('main.app')
        except webapp2.ImportStringError:
            self.app = None

    def setUp(self):
        """Activate the testbed and its datastore, memcache and user stubs."""
        self.testbed = bed = testbed.Testbed()
        bed.activate()
        # Consistency policy to HRD.
        self.policy = policy = (
            datastore_stub_util.PseudoRandomHRConsistencyPolicy(probability=0))
        bed.init_datastore_v3_stub(consistency_policy=policy)
        bed.init_memcache_stub()
        bed.init_user_stub()

    def tearDown(self):
        """Deactivate the testbed activated in ``setUp``."""
        self.testbed.deactivate()
def get_random_string(length=12):
    """Build a random string of ``length`` ASCII letters and digits.

    Every character is drawn independently, so characters may repeat and
    ``length`` may exceed the size of the alphabet (62).

    The ``random`` module is used, so the result is not suitable as a
    security token.

    :param length: number of characters to generate.
    :returns: the generated string (empty when ``length`` is 0).
    """
    # ``ascii_letters`` is locale independent and exists on Python 2 and 3.
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
def fix_base64_padding(s):
    """Append the ``=`` padding a base64 string is missing.

    Trailing whitespace is stripped first. Input whose length is already
    a multiple of four is returned without any padding added.

    :param s: base64 text, possibly lacking its trailing ``=`` characters.
    :returns: the text padded to a multiple of four characters.
    """
    padded = s.rstrip()
    # ``-n % 4`` is 0 for aligned input, whereas ``4 - n % 4`` would be 4
    # and tack a spurious '====' onto already valid data.
    missing_padding = -len(padded) % 4
    if missing_padding:
        padded += '=' * missing_padding
    return padded
def decode_json(json_string):
    """Decode ``json_string`` with ``webapp2_extras.json.decode``.

    :param json_string: text holding a JSON document.
    :returns: whatever ``webapp2_extras.json.decode`` returns for it.
    """
    decoded = webapp2_extras.json.decode(json_string)
    return decoded
def encode_json(jsonable, **kwargs):
    """Encode ``jsonable`` as JSON using this module's ``JSONEncoder``.

    :param jsonable: value to serialize.
    :param kwargs: extra keyword arguments handed to
        ``webapp2_extras.json.encode``.
    :returns: the result of ``webapp2_extras.json.encode``.
    """
    # ``ensure_ascii=False`` is requested here, but JSONEncoder's
    # constructor sets ``ensure_ascii`` back to True.
    encoded = webapp2_extras.json.encode(
        jsonable, cls=JSONEncoder, ensure_ascii=False, **kwargs)
    return encoded
def if_attr(obj, attr):
    """Tell whether every step of a dotted attribute path is truthy.

    :param obj: object the path starts from.
    :param attr: string naming the attribute path, e.g. ``'a.b.c'``.
    :returns: ``True`` when each attribute along the path exists and has a
        truthy value; ``False`` as soon as one is missing or falsy
        (``None``, ``0``, ``''``, empty containers, ...).
    """
    current = obj
    for name in attr.split('.'):
        # Fetch each attribute exactly once so properties and
        # ``__getattr__`` hooks are not evaluated twice per step.
        current = getattr(current, name, None)
        if not current:
            return False
    return True