-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
134 lines (96 loc) · 3.51 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import secrets
import uuid
import redis
from bson import ObjectId
from bson.errors import InvalidId
import config
from utils import json_response
# Redis client used as the token store; connection settings come from config.
tokens_db = redis.StrictRedis(
    host=config.REDIS_HOST,
    port=config.REDIS_PORT,
    db=config.REDIS_DB,
)

# Lifetime of token keys in Redis, in seconds.
EXPIRE_TIME = 3600
def get_token(key):
    """Look up *key* in the token store.

    Returns:
        The stored value decoded as UTF-8, or ``None`` when the store
        returns ``None`` for *key*.
    """
    raw = tokens_db.get(key)
    return None if raw is None else raw.decode("utf-8")
def create_new_token(user_id):
    """Create a fresh token for *user_id* and record it in the token store.

    Two keys are written so the pairing can be looked up in either
    direction: ``user_id -> token`` and ``token -> user_id``. Both keys
    expire after ``EXPIRE_TIME`` seconds.

    Args:
        user_id: Identifier the token is issued for; used as a Redis key
            and as a Redis value.

    Returns:
        The new token, a 64-character hexadecimal string.
    """
    token = secrets.token_hex(32)
    # Hand the TTL to SET itself so the value and its expiry are written by
    # a single command. With a separate EXPIRE call, a failure between the
    # two commands would leave a token key that never expires.
    tokens_db.set(user_id, token, ex=EXPIRE_TIME)
    tokens_db.set(token, user_id, ex=EXPIRE_TIME)
    return token
def list_all(collection, filters=None, arguments=None):
    """Return one page of documents from *collection* as a JSON response.

    Args:
        collection: Object exposing ``find(query)`` whose result supports
            ``count()``, ``skip()`` and ``limit()``.
        filters: Optional base query. It is copied, never modified.
        arguments: Optional mapping of request arguments. ``page``
            (default 0) and ``items`` (default 25) control paging; every
            other entry is added to the query as an equality condition.

    Returns:
        The result of ``json_response`` with status 200 and a body holding
        ``items`` (the page of documents, ``_id`` stringified and ``e_tag``
        removed), ``page`` and ``all_count`` (matches before paging).

    Raises:
        ValueError: If ``page`` or ``items`` is a string that is not an
            integer.
    """
    # Copy so the caller's dict is not polluted with request arguments.
    query = dict(filters) if filters else {}
    # The signature allows arguments=None; treat it as "no arguments".
    if arguments is None:
        arguments = {}

    page = int(arguments.get('page', 0))
    items = int(arguments.get('items', 25))

    # NOTE(review): argument names become query keys verbatim. If they come
    # from an untrusted request, names starting with "$" may need rejecting
    # -- confirm against the callers.
    for name in arguments:
        if name not in ('page', 'items'):
            query[name] = arguments[name]

    # NOTE(review): cursor.count() is kept to match the PyMongo API the
    # rest of this module uses; confirm the installed version before
    # switching to count_documents().
    count = collection.find(query).count()
    cursor = collection.find(query).skip(page * items).limit(items)

    result = []
    for document in cursor:
        document['_id'] = str(document['_id'])
        document.pop('e_tag', None)
        result.append(document)
    return json_response({"items": result, "page": page, "all_count": count}, 200)
def find_one(collection, object_id, filter_key='_id'):
    """Fetch a single document from *collection*.

    Args:
        collection: Object exposing ``find_one(query)``.
        object_id: Value to match. When *filter_key* is ``'_id'`` it is
            converted to an ``ObjectId`` first; otherwise it is used as is.
        filter_key: Name of the field to match against.

    Returns:
        Whatever ``collection.find_one`` returns for the query, or ``None``
        when *object_id* is not a valid ObjectId for an ``_id`` lookup.
    """
    try:
        # The ObjectId conversion is the call that raises InvalidId, so it
        # must sit inside the try for the handler below to have any effect.
        key = ObjectId(object_id) if filter_key == '_id' else object_id
        return collection.find_one({filter_key: key})
    except InvalidId:
        return None
def find_one_response(collection, object_id, filter_key='_id'):
    """Look up one document and wrap it in a JSON response.

    Returns a 404 response when ``find_one`` yields ``None``. Otherwise the
    document is returned with status 200 after these adjustments:

    * ``_id`` is converted to ``str``;
    * ``e_tag`` is removed from the body and sent as the ``ETag`` header;
    * ``bytes`` values are decoded as UTF-8;
    * the value of every field whose name contains ``hash`` is replaced
      with ``"***"``.
    """
    document = find_one(collection, object_id, filter_key)
    if document is None:
        return json_response({"message": "Not found", "status": "404"}, 404)

    response_headers = {}
    document['_id'] = str(document['_id'])
    if 'e_tag' in document:
        response_headers['ETag'] = document.pop('e_tag')

    # Iterate over a snapshot; values are rewritten in place.
    for field, value in list(document.items()):
        if isinstance(value, bytes):
            document[field] = value.decode('utf-8')
        if 'hash' in field:
            document[field] = "***"

    return json_response(document, 200, response_headers)
def object_save(collection, object_data, location=""):
    """Insert or replace *object_data* in *collection* and build the response.

    A document without ``_id`` is inserted and answered with 201 plus a
    ``Location`` header; a document with ``_id`` replaces the stored
    document with that id (inserting it if absent) and is answered with
    200. In both cases a fresh ``e_tag`` is written into *object_data*
    and returned in the ``ETag`` header.

    Args:
        collection: Object exposing ``insert_one`` and ``replace_one``.
        object_data: Document to store; modified in place (``e_tag`` is
            set, and ``_id`` is added on insert).
        location: Path segment used for the ``Location`` header. For
            ``"users"`` the document's ``login`` is used instead of the
            generated id.

    Returns:
        The result of ``json_response`` with an empty body.
    """
    headers = {}
    create = '_id' not in object_data
    object_data['e_tag'] = str(uuid.uuid4())

    # Collection.save() is deprecated since PyMongo 3.0 and removed in 4.0;
    # insert_one / replace_one(upsert=True) are its documented replacements.
    if create:
        key = str(collection.insert_one(object_data).inserted_id)
        status = 201
        if location == "users":
            key = object_data.get('login')
        headers['Location'] = f"/{location}/{key}"
    else:
        collection.replace_one({'_id': object_data['_id']}, object_data, upsert=True)
        status = 200

    headers['ETag'] = object_data.get('e_tag')
    return json_response("", status, headers)
def delete_one_response(collection, object_id, filter_key='_id'):
    """Delete one document and return the matching JSON response.

    Responds with 404 when ``find_one`` finds nothing for the given key;
    otherwise deletes the document matching ``{filter_key: value}`` and
    responds with an empty 204. For ``_id`` lookups *object_id* is
    converted to an ``ObjectId`` before it is used in the delete query.
    """
    existing = find_one(collection, object_id, filter_key)
    if existing is None:
        return json_response({"message": "Not found", "status": "404"}, 404)

    lookup_value = ObjectId(object_id) if filter_key == '_id' else object_id
    collection.delete_one({filter_key: lookup_value})
    return json_response("", 204)
def delete_many(collection, filters):
    """Delete every document matching *filters* and answer with an empty 204."""
    collection.delete_many(filters)
    no_content = json_response("", 204)
    return no_content
def generate_single_post_url_response(collection, user, location=""):
    """Return a 201 response pointing at the user's temporary document.

    If *collection* already holds a document flagged ``_temp`` for this
    user, its id and ``e_tag`` are reused; otherwise a new temporary
    document with a fresh ``e_tag`` is inserted.

    Args:
        collection: Object exposing ``find_one`` and ``insert_one``.
        user: Object exposing ``get_id()``.
        location: Path segment used to build the ``Location`` header.

    Returns:
        The result of ``json_response`` with an empty body, status 201 and
        ``Location`` / ``ETag`` headers.
    """
    user_id = user.get_id()
    existing = collection.find_one({'_temp': True, 'user_id': user_id})
    if existing is not None:
        key = str(existing['_id'])
        e_tag = existing.get('e_tag')
    else:
        e_tag = str(uuid.uuid4())
        placeholder = {'e_tag': e_tag, '_temp': True, 'user_id': user_id}
        # Collection.save() is deprecated since PyMongo 3.0 and removed in
        # 4.0; insert_one is the documented replacement for new documents.
        key = str(collection.insert_one(placeholder).inserted_id)

    headers = {'Location': f"/{location}/{key}", 'ETag': e_tag}
    return json_response("", 201, headers)