This repository has been archived by the owner on May 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 100
/
login.py
85 lines (66 loc) · 2.32 KB
/
login.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import time
import uuid
from flask_httpauth import HTTPBasicAuth
from flask import g
from flask_restful import Resource
auth = HTTPBasicAuth()
class TokenAccess:
def __init__(self, user: str):
self.token = str(uuid.uuid1())
self.time_created = time.time()
self.time_accessed = time.time()
self.user = user
class UsersDB:
def __init__(self):
self.users = {}
self.tokens = {}
self.token_expiry_seconds = 600 # token is valid for 10 minutes
def add_user(self, user: str, password: str):
self.users[user] = password
def del_user(self, user: str):
self.users.pop(user)
def authorize(self, user_or_token: str, password: str):
"""Authorize user using token or username/password combination"""
g.user = user_or_token
token_record = self.verify_token(user_or_token)
if token_record is not None:
token_record.time_accessed = time.time()
return True
else:
self.tokens.pop(user_or_token, '')
if user_or_token not in self.users:
return False
return self.users[user_or_token] == password
def verify_token(self, token: str):
"""Verify if the token is valid and not expired"""
token_record = self.tokens.get(token, None)
if token_record is None:
return None
t_diff = time.time() - token_record.time_accessed
assert t_diff >= 0
if t_diff > self.token_expiry_seconds:
return None
return token_record
def remove_token(self, token: str):
del self.tokens[token]
def get_token(self, user: str):
token_record = TokenAccess(user)
self.tokens[token_record.token] = token_record
return token_record.token
userDB = UsersDB()
# used by flask to process http auth requests
@auth.verify_password
def verify_password(username, password):
return userDB.authorize(username, password)
# two resources for managing the login
# exported as /login and /logout
class ChannelManagementLogin(Resource):
@auth.login_required
def get(self):
token = userDB.get_token(g.user)
return {'token': token}, 200
class ChannelManagementLogout(Resource):
@auth.login_required
def get(self):
userDB.remove_token(g.user)
return "OK", 200