-
Notifications
You must be signed in to change notification settings - Fork 6
/
dbConnector.pyx
246 lines (217 loc) · 6.64 KB
/
dbConnector.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import queue
import MySQLdb
import MySQLdb.cursors
import time
from objects import glob
from common.log import logUtils as log
class worker:
"""
A single MySQL worker
"""
def __init__(self, connection, temporary=False):
"""
Initialize a MySQL worker
:param connection: database connection object
:param temporary: if True, this worker will be flagged as temporary
"""
self.connection = connection
self.temporary = temporary
log.debug("Created MySQL worker. Temporary: {}".format(self.temporary))
def ping(self):
"""
Ping MySQL server using this worker.
:return: True if connected, False if error occured.
"""
c = self.connection.cursor(MySQLdb.cursors.DictCursor)
try:
c.execute("SELECT 1+1")
return True
except MySQLdb.Error:
return False
finally:
c.close()
def __del__(self):
"""
Close connection to the server
:return:
"""
self.connection.close()
class connectionsPool:
"""
A MySQL workers pool
"""
def __init__(self, host, username, password, database, size=128):
"""
Initialize a MySQL connections pool
:param host: MySQL host
:param username: MySQL username
:param password: MySQL password
:param database: MySQL database name
:param size: pool max size
"""
self.config = (host, username, password, database)
self.maxSize = size
self.pool = queue.Queue(self.maxSize)
self.consecutiveEmptyPool = 0
self.fillPool()
def newWorker(self, temporary=False):
"""
Create a new worker.
:param temporary: if True, flag the worker as temporary
:return: instance of worker class
"""
db = MySQLdb.connect(
*self.config,
autocommit=True,
charset="utf8",
use_unicode=True
)
conn = worker(db, temporary)
return conn
def fillPool(self, newConnections=0):
"""
Fill the queue with workers
:param newConnections: number of new connections. If 0, the pool will be filled entirely.
:return:
"""
# If newConnections = 0, fill the whole pool
if newConnections == 0:
newConnections = self.maxSize
# Fill the pool
for _ in range(0, newConnections):
if not self.pool.full():
self.pool.put_nowait(self.newWorker())
def getWorker(self, level=0):
"""
Get a MySQL connection worker from the pool.
If the pool is empty, a new temporary worker is created.
:param level: number of failed connection attempts. If > 50, return None
:return: instance of worker class
"""
# Make sure we below 50 retries
#log.info("Pool size: {}".format(self.pool.qsize()))
glob.dog.increment(glob.DATADOG_PREFIX+".mysql_pool.queries")
glob.dog.gauge(glob.DATADOG_PREFIX+".mysql_pool.size", self.pool.qsize())
if level >= 50:
log.warning("Too many failed connection attempts. No MySQL connection available.")
return None
try:
if self.pool.empty():
# The pool is empty. Spawn a new temporary worker
log.warning("MySQL connections pool is empty. Using temporary worker.")
worker = self.newWorker(True)
# Increment saturation
self.consecutiveEmptyPool += 1
# If the pool is usually empty, expand it
if self.consecutiveEmptyPool >= 10:
log.warning("MySQL connections pool is empty. Filling connections pool.")
self.fillPool()
else:
# The pool is not empty. Get worker from the pool
# and reset saturation counter
worker = self.pool.get()
self.consecutiveEmptyPool = 0
except MySQLdb.OperationalError:
# Connection to server lost
# Wait 1 second and try again
log.warning("Can't connect to MySQL database. Retrying in 1 second...")
glob.dog.increment(glob.DATADOG_PREFIX+".mysql_pool.failed_connections")
time.sleep(1)
return self.getWorker(level=level+1)
# Return the connection
return worker
def putWorker(self, worker):
"""
Put the worker back in the pool.
If the worker is temporary, close the connection
and destroy the object
:param worker: worker object
:return:
"""
if worker.temporary or self.pool.full():
# Kill the worker if it's temporary or the queue
# is full and we can't put anything in it
del worker
else:
# Put the connection in the queue if there's space
self.pool.put_nowait(worker)
class db:
"""
A MySQL helper with multiple workers
"""
def __init__(self, host, username, password, database, initialSize):
"""
Initialize a new MySQL database helper with multiple workers.
This class is thread safe.
:param host: MySQL host
:param username: MySQL username
:param password: MySQL password
:param database: MySQL database name
:param initialSize: initial pool size
"""
self.pool = connectionsPool(host, username, password, database, initialSize)
def execute(self, query, params = ()):
"""
Executes a query
:param query: query to execute. You can bind parameters with %s
:param params: parameters list. First element replaces first %s and so on
"""
cursor = None
worker = self.pool.getWorker()
if worker is None:
return None
try:
# Create cursor, execute query and commit
cursor = worker.connection.cursor(MySQLdb.cursors.DictCursor)
cursor.execute(query, params)
log.debug(query)
return cursor.lastrowid
except MySQLdb.OperationalError:
del worker
worker = None
return self.execute(query, params)
finally:
# Close the cursor and release worker's lock
if cursor is not None:
cursor.close()
if worker is not None:
self.pool.putWorker(worker)
def fetch(self, query, params = (), _all = False):
"""
Fetch a single value from db that matches given query
:param query: query to execute. You can bind parameters with %s
:param params: parameters list. First element replaces first %s and so on
:param _all: fetch one or all values. Used internally. Use fetchAll if you want to fetch all values
"""
cursor = None
worker = self.pool.getWorker()
if worker is None:
return None
try:
# Create cursor, execute the query and fetch one/all result(s)
cursor = worker.connection.cursor(MySQLdb.cursors.DictCursor)
cursor.execute(query, params)
log.debug(query)
if _all:
return cursor.fetchall()
else:
return cursor.fetchone()
except MySQLdb.OperationalError:
log.warning("MySQL connection lost! Using next worker...")
del worker
worker = None
return self.fetch(query, params, _all)
finally:
# Close the cursor and release worker's lock
if cursor is not None:
cursor.close()
if worker is not None:
self.pool.putWorker(worker)
def fetchAll(self, query, params = ()):
"""
Fetch all values from db that matche given query.
Calls self.fetch with all = True.
:param query: query to execute. You can bind parameters with %s
:param params: parameters list. First element replaces first %s and so on
"""
return self.fetch(query, params, True)