/
util.py
133 lines (95 loc) · 2.87 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from hashlib import md5, sha256
from hmac import compare_digest, new as hmac
from binascii import unhexlify
from os import getpid
from socket import gethostname
from time import time
from random import randint
from threading import RLock
log = __import__('logging').getLogger(__name__)
MACHINE = int(md5(gethostname().encode()).hexdigest()[:6], 16)
class SignatureError(ValueError):
pass
class Counter:
def __init__(self):
self.value = randint(0, 2**24)
self.lock = RLock()
def __iter__(self):
return self
def __next__(self):
with self.lock:
self.value = (self.value + 1) % 0xFFFFFF
value = self.value
return value
next = __next__
counter = Counter()
class SessionIdentifier:
def __init__(self, value=None):
if value:
self.parse(value)
else:
self.generate()
def parse(self, value):
self.time = int(value[:8], 16)
self.machine = int(value[8:14], 16)
self.process = int(value[14:18], 16)
self.counter = int(value[18:24], 16)
def generate(self):
self.time = int(time())
self.machine = MACHINE
self.process = getpid() % 0xFFFF
self.counter = next(counter)
def __bytes__(self):
return str(self).encode('ascii')
def __str__(self):
return f"{self.time:08x}{self.machine:06x}{self.process:04x}{self.counter:06x}"
def __repr__(self):
return f"{self.__class__.__name__}('{self}')"
class SignedSessionIdentifier(SessionIdentifier):
__slots__ = ('__secret', '__signature', 'expires')
def __init__(self, value=None, secret=None, expires=None):
self.__secret = secret.encode('ascii') if hasattr(secret, 'encode') else secret
self.__signature = None
self.expires = expires
super().__init__(value)
def parse(self, value):
if len(value) != 88:
raise SignatureError("Invalid signed identifier length.")
super().parse(value)
self.__signature = value[24:].encode('ascii')
if not self.valid:
raise SignatureError("Invalid signed identifier.")
@property
def signed(self):
return bytes(self) + self.signature
@property
def signature(self):
if not self.__signature:
self.__signature = hmac(
self.__secret,
unhexlify(bytes(self)),
sha256
).hexdigest()
if hasattr(self.__signature, 'encode'):
self.__signature = self.__signature.encode('ascii')
return self.__signature
@property
def valid(self):
if not self.__signature:
raise SignatureError("No signature present.")
return False
if self.expires and (time() - self.time) > self.expires:
raise SignatureError("Expired signature.")
return False
challenge = hmac(
self.__secret,
unhexlify(bytes(self)),
sha256
).hexdigest()
if hasattr(challenge, 'encode'):
challenge = challenge.encode('ascii')
result = compare_digest(challenge, self.signature)
if not result:
raise SignatureError("Invalid signature:", repr(challenge), repr(self.signature))
return False
return True