-
Notifications
You must be signed in to change notification settings - Fork 7
/
utils.py
124 lines (110 loc) · 3.62 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from urllib.parse import urlparse
import datetime
import os
import re
from functools import wraps
from flask import request, make_response, jsonify, session
import jwt
from dotenv import load_dotenv
import psycopg2
from psycopg2.extras import RealDictCursor
from config import BaseConfig
def db_config(database_uri=None):
    """ Build the database connection settings from a postgres URL.

    The ``DATABASE_URL`` environment variable (read after loading the
    ``.env`` file) wins over the ``database_uri`` argument whenever it is
    set to a non-empty value. When ``APP_SETTINGS`` equals ``'TESTING'``
    the database name is replaced by ``BaseConfig.TEST_DB``.

    :param database_uri: fallback postgres URL used when ``DATABASE_URL``
        is not set
    :return: dict with the keys ``database``, ``user``, ``password`` and
        ``host``
    """
    load_dotenv()

    env_uri = os.environ.get('DATABASE_URL')
    parsed = urlparse(env_uri if env_uri else database_uri)
    testing = os.environ.get('APP_SETTINGS') == 'TESTING'

    return {
        # The URL path carries a leading "/" that is not part of the name.
        'database': BaseConfig.TEST_DB if testing else parsed.path[1:],
        'user': parsed.username,
        'password': parsed.password,
        'host': parsed.hostname
    }
def is_blacklisted(token):
    """ Look the token up in the ``token_blacklist`` table.

    :param token: raw JWT string to check
    :return: list of matching rows (empty when the token is not
        blacklisted), or ``None`` when the lookup itself failed
    """
    con = psycopg2.connect(**db_config())
    queryset_list = None
    try:
        with con.cursor(cursor_factory=RealDictCursor) as cur:
            # The token comes straight from the request, so it is bound as a
            # query parameter instead of being formatted into the SQL text.
            cur.execute(
                "SELECT * FROM token_blacklist WHERE token = %s",
                (token,)
            )
            queryset_list = cur.fetchall()
    except psycopg2.Error as e:
        # Best effort: a failed lookup is reported and yields None.
        # NOTE(review): callers that only test truthiness will treat a failed
        # lookup like "not blacklisted" - confirm this is acceptable.
        print(e)
    finally:
        # Always release the connection, including on failure.
        con.close()
    return queryset_list
def jwt_required(f):
    """ Ensure a jwt token is provided and valid before calling the view.

    The token is taken from the last space-separated part of the
    ``Authorization`` header and handed to ``decode_auth_token``. A result
    that converts to an integer is treated as a user id and the wrapped
    function is called; anything else is sent back as the message of a
    401 response.

    :param f: function to be decorated
    :return: decorated function
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if auth_header is None:
            return make_response(jsonify({"message": 'Unauthorized. Please login'})), 401

        # Accepts both "Bearer <token>" and a bare "<token>".
        auth_token = auth_header.split(' ')[-1]
        result = decode_auth_token(auth_token)
        try:
            # decode_auth_token returns the user id on success and an error
            # string otherwise; only the former survives int().
            int(result)
        except (TypeError, ValueError):
            message = result or 'Unauthorized. Please login'
            return make_response(jsonify({"message": message})), 401
        return f(*args, **kwargs)
    return decorated_function
def encode_auth_token(user_id, username=None):
    """
    Encodes a payload to generate JWT Token

    The token is signed with HS256 and expires 31 days and 130 seconds
    after it is issued.

    :param user_id: Logged in user Id, stored in the ``sub`` claim
    :param username: string: Logged in username
    :return: JWT token
    :TODO add secret key to app configuration
    """
    # Take one timezone-aware timestamp so 'iat' and 'exp' are derived from
    # the same instant.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'exp': issued_at + datetime.timedelta(days=31, seconds=130),
        'iat': issued_at,
        'sub': user_id,
        'username': username
    }
    # NOTE(review): the signing key is a hard-coded literal; it must stay in
    # sync with the key used for decoding until it is moved to configuration.
    return jwt.encode(
        payload,
        'SECRET_KEY',
        algorithm='HS256'
    )
def decode_auth_token(auth_token):
    """ Validates the auth token

    On success the token's ``sub`` claim is also stored, as a string, in
    the Flask session under ``user_id``.

    :param auth_token: encoded JWT string
    :return: integer|string - the ``sub`` claim of a valid token, otherwise
        a message describing why the token was rejected
    """
    try:
        if is_blacklisted(auth_token):
            return 'Token has been blacklisted. Please log in again'
        # 'algorithms' (plural, a list) is the keyword PyJWT uses to restrict
        # which signing algorithms are accepted when verifying.
        payload = jwt.decode(auth_token, 'SECRET_KEY', algorithms=['HS256'])
        session['user_id'] = str(payload.get('sub'))
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Token Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
# Lenient address shape: a non-empty local part, "@", then a dotted host that
# ends in an alphabetic TLD or in a 1-3 digit group, optionally wrapped in
# square brackets (e.g. user@[192.168.0.1]).
_EMAIL_PATTERN = re.compile(
    r'.+@(\[?)[a-zA-Z0-9\-.]+\.([a-zA-Z]{2,63}|[0-9]{1,3})(\]?)'
)


def valid_email(email):
    """ Validate email

    An address is rejected when it is not a string, contains more than one
    ``@``, repeats the ``com`` label in its domain (``a@b.com.com``), or
    does not match the lenient ``local@host.tld`` pattern.

    :param email: value to validate
    :return: a truthy ``re.Match`` for an acceptable address, otherwise a
        falsy value (``False`` or ``None``)
    """
    if not isinstance(email, str):
        return False
    if email.count('@') > 1:
        return False

    domain = email.rpartition('@')[2]
    # Compare whole labels so "my.company.com" is not mistaken for a repeated
    # ".com"; the first label is skipped because it has no leading dot.
    if domain.split('.')[1:].count('com') > 1:
        return False

    # fullmatch keeps a trailing newline from slipping through.
    return _EMAIL_PATTERN.fullmatch(email)