/
utils.py
125 lines (89 loc) · 2.88 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import collections
import contextlib
import math
import re
from twisted.internet import reactor
from twisted.internet.defer import (
Deferred,
DeferredSemaphore,
gatherResults,
inlineCallbacks,
returnValue,
succeed,
)
from twisted.web.iweb import IBodyProducer
from zope.interface import implements
# Upper bound on how many `fn` calls parallel_map() lets run at the same time;
# it is the token count handed to the DeferredSemaphore created there.
MAX_PARALLELISM = 50
@contextlib.contextmanager
def swallow_exceptions(title, log):
    """Context manager that logs, instead of propagating, errors in its body.

    Meant to be used in a ``with`` statement. Any ``Exception`` raised inside
    the managed block is reported through ``log.warning`` (prefixed with
    ``title``) and then suppressed, so execution continues after the block.
    Exceptions that do not derive from ``Exception`` (for example
    ``KeyboardInterrupt``) are not caught and still propagate.
    """
    try:
        yield
    except Exception as error:
        log.warning("%s: %s", title, error)
def sleep(seconds):
    """Return a Deferred that will fire its callback `seconds` later.

    The Deferred's result is the `seconds` value itself. Cancelling the
    returned Deferred also cancels the pending reactor timer, so an abandoned
    sleep does not leave a delayed call scheduled in the reactor.
    """
    def cancel_timer(_deferred):
        # A canceller only runs while the Deferred has not fired yet; the
        # active() check additionally guards against cancelling a delayed
        # call that is no longer pending.
        if delayed_call.active():
            delayed_call.cancel()

    deferred = Deferred(cancel_timer)
    delayed_call = reactor.callLater(seconds, deferred.callback, seconds)
    return deferred
# Matches strings made up solely of five or more lowercase ASCII letters
# and/or colons. As with any `$` anchor, one trailing newline is tolerated.
valid_push_word = re.compile(r"^[a-z:]{5,}$")
@inlineCallbacks
def parallel_map(iterable, fn, *args, **kwargs):
    """Apply `fn` to every item of `iterable` with bounded concurrency.

    Each call is made as ``fn(item, *args, **kwargs)`` through a
    DeferredSemaphore holding MAX_PARALLELISM tokens, so at most that many
    calls are in progress at any time.

    Returns a Deferred that fires with the list of results gathered by
    ``gatherResults`` once every call has completed; if a call fails, the
    returned Deferred fails instead.
    """
    limiter = DeferredSemaphore(MAX_PARALLELISM)
    pending = [limiter.run(fn, item, *args, **kwargs) for item in iterable]
    gathered = yield gatherResults(pending)
    returnValue(gathered)
def _distribute_into(master, additions):
assert len(master) >= len(additions)
spread = int(math.ceil(float(len(master)) / len(additions)))
for i, item in enumerate(additions):
master.insert(i * spread, item)
def interleaved(items, key):
    """Reorder `items` so that entries sharing a key are spread apart.

    Items are bucketed by ``key(item)``. The largest bucket becomes the
    starting result, and every smaller bucket (largest first) is then
    distributed into it with ``_distribute_into``. The intent is that no
    pool has a bunch of its servers taken down at the same time because of
    an unlucky host ordering.

    Returns a new list; a falsy `items` yields an empty list.
    """
    if not items:
        return []

    buckets = collections.defaultdict(list)
    for entry in items:
        buckets[key(entry)].append(entry)

    largest_first = sorted(buckets.values(), key=len, reverse=True)
    merged = largest_first[0]
    smaller_groups = largest_first[1:]
    for group in smaller_groups:
        _distribute_into(merged, group)
    return merged
# https://stackoverflow.com/a/1181922
def b36encode(number, alphabet='0123456789abcdefghijklmnopqrstuvwxyz'):
    """Convert an integer to a base36 string.

    The base is ``len(alphabet)``, so the default alphabet yields base 36 and
    a different alphabet yields a different base. Negative numbers are
    encoded as their absolute value prefixed with ``'-'``.

    Raises:
        TypeError: if `number` is not an integer.
        ValueError: if `alphabet` has fewer than two symbols (no positional
            encoding exists, and the digit loop could never terminate).
    """
    # Function-scope import keeps this block self-contained. numbers.Integral
    # covers int (and Python 2's long) without referencing the name `long`,
    # which does not exist on Python 3.
    import numbers

    if not isinstance(number, numbers.Integral):
        raise TypeError('number must be an integer')

    base = len(alphabet)
    if base < 2:
        raise ValueError('alphabet must contain at least two symbols')

    sign = ''
    if number < 0:
        sign = '-'
        number = -number

    # Single-digit values (including zero) need no division loop.
    if number < base:
        return sign + alphabet[number]

    # Digits come out least-significant first; reverse before joining.
    digits = []
    while number:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])
    digits.reverse()
    return sign + ''.join(digits)
class JSONBodyProducer(object):
    """IBodyProducer that hands a fully in-memory body to its consumer.

    The whole body is written in a single call when production starts, so
    pausing and stopping have nothing to do.
    """

    implements(IBodyProducer)

    def __init__(self, data):
        """Store `data` as the body and record its length."""
        self.body = data
        # NOTE(review): length is len(data); presumably `data` is a byte
        # string so this equals the number of bytes written - confirm.
        self.length = len(data)

    def getBody(self):
        """Return the body exactly as it was passed to the constructor."""
        return self.body

    def startProducing(self, consumer):
        """Write the entire body to `consumer` and report completion.

        Returns an already-fired Deferred (``succeed(None)``), since all the
        data has been written by the time this method returns.
        """
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        """Do nothing; the body was already written in one piece."""

    def stopProducing(self):
        """Do nothing; there is no ongoing production to stop."""