This repository has been archived by the owner on Jan 5, 2019. It is now read-only.
/
decorators.py
101 lines (73 loc) · 2.68 KB
/
decorators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# coding: utf-8
"""
Secure JavaScript Login
~~~~~~~~~~~~~~~~~~~~~~~
:copyleft: 2015 by the secure-js-login team, see AUTHORS for more details.
:created: by JensDiemer.de
:license: GNU GPL v3 or above, see LICENSE for more details
"""
from __future__ import unicode_literals
import collections
import functools
import logging
import traceback
import sys
import time
# Logger named "secure_js_login"; log_view() below writes to it.
log = logging.getLogger("secure_js_login")


def log_view(func):
    """
    Decorator that logs each call of a view, the response it returns and
    any exception it raises.

    Helpful while debugging Selenium unittests,
    e.g. when the server responds with an error to an AJAX request.

    The exception traceback is additionally printed to stderr and the
    exception is re-raised unchanged, so callers see the original error.
    """
    @functools.wraps(func)
    def view_logger(*view_args, **view_kwargs):
        log.debug("call view %r", func.__name__)
        try:
            result = func(*view_args, **view_kwargs)
        except Exception as exc:
            log.error("view exception: %s", exc)
            traceback.print_exc(file=sys.stderr)
            raise
        else:
            # Only reached when the view returned normally.
            log.debug("Response: %s", result)
            return result

    return view_logger
class TimingAttackPreventer(object):
    """
    Callable decorator object that delays "failed" responses so that they
    take roughly as long as "successful" ones.

    A response counts as successful when it has a truthy ``add_duration``
    attribute; every other response counts as failed. The duration of each
    call is recorded in a bounded deque per outcome. A failed call then
    sleeps for the difference between the average successful duration and
    the average failed duration (never a negative amount).

    Usage::

        @TimingAttackPreventer()
        def view(request):
            ...
    """

    def __init__(self, deque_maxlen=50):
        """
        :param deque_maxlen: how many of the most recent durations are kept
            for each outcome (successful / failed).
        """
        self.deque_maxlen = deque_maxlen
        self.reset()

    def reset(self):
        """Discard all collected durations and start with empty deques."""
        self.successful_timings = collections.deque(maxlen=self.deque_maxlen)
        self.failed_timings = collections.deque(maxlen=self.deque_maxlen)

    def avg(self, deque):
        """Return the arithmetic mean of *deque*, or 0 if it is empty."""
        if not deque:
            return 0
        return sum(deque) / len(deque)

    def __call__(self, func):
        """
        Wrap *func* so that its duration is recorded and failed responses
        are delayed.

        :param func: the callable (view) to wrap
        :return: wrapper with the same signature that returns the response
            of *func* unchanged
        """
        # functools.wraps keeps the __name__/__doc__ of the wrapped view.
        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            start_time = time.time()
            response = func(*args, **kwargs)

            # The averages are taken before the current duration is
            # recorded, so they only reflect earlier calls.
            successful_length = self.avg(self.successful_timings)
            failed_length = self.avg(self.failed_timings)

            if getattr(response, "add_duration", False):
                # successful request -> collect duration value, no delay
                timing_deque = self.successful_timings
                sleep_length = 0
            else:
                # failed request -> 'fill' time with collected durations;
                # never sleep a negative amount
                timing_deque = self.failed_timings
                sleep_length = max(successful_length - failed_length, 0)

            # Record the duration before sleeping, so the compensation
            # sleep itself is not part of the collected value.
            timing_deque.append(time.time() - start_time)
            time.sleep(sleep_length)
            return response

        return wrapped_func