Skip to content

Commit

Permalink
Updated the ThreadUtils.py to use the updated (Python 2.4+) queue mod…
Browse files Browse the repository at this point in the history
…ule. If there are any new bugs introduced, check out the line55 for the commented out self._empty() which is depreciated. Also updated the Pinger.py to used parameterized sqlite queries by changing %s to ?
  • Loading branch information
sirvaliance committed May 30, 2011
1 parent a9a6594 commit 6c58012
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 44 deletions.
6 changes: 5 additions & 1 deletion lib/mixminion/ThreadUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def clear(self):
# If the queue is empty, return.
self.not_empty.acquire()
try:
if self._empty(): return
# Temporary comment out of empty
# self._empty() is depreciated and is now self.empty()
# self.empty() currently hangs during make test
#
#if self._empty(): return
self._clear()
self.not_full.notify()
finally:
Expand Down
86 changes: 43 additions & 43 deletions lib/mixminion/server/Pinger.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def _objectExists(self, name, objType):
'view', 'table', or 'index'.
"""
self._theCursor.execute(
"SELECT * FROM SQLITE_MASTER WHERE type = %s AND name = %s",
"SELECT * FROM SQLITE_MASTER WHERE type = ? AND name = ?",
(objType,name))
rs = self._theCursor.fetchall()
return len(rs) > 0
Expand Down Expand Up @@ -232,7 +232,7 @@ def getInsertOrUpdateFn(self, table, keyCols, valCols):
table,
", ".join(keyCols),
", ".join(valCols),
", ".join(["%s"]*(len(valCols)+len(keyCols))))
", ".join(["?"]*(len(valCols)+len(keyCols))))
def fn(keyVals, valVals):
assert len(keyVals) == len(keyCols)
assert len(valVals) == len(valCols)
Expand Down Expand Up @@ -528,9 +528,9 @@ def _getServerID(self, identity):
cur = self._db.getCursor()

hexid = self._db.encodeIdentity(identity)

cur.execute("INSERT INTO server (identity) VALUES (%s)", hexid)
cur.execute("SELECT id FROM server WHERE identity = %s", hexid)
cur.execute("INSERT INTO server (identity) VALUES (?)", [hexid])
cur.execute("SELECT id FROM server WHERE identity = ?", [hexid])
#XXXX catch errors!
ident, = cur.fetchone()
self._serverIDs[identity]=ident
Expand All @@ -545,16 +545,16 @@ def _getIntervalID(self, start, end):
start = self._db.time(start)
end = self._db.time(end)
cur = self._db.getCursor()
cur.execute("SELECT id FROM statsInterval WHERE "
"startAt = %s AND endAt = %s", start, end)
cur.execute("SELECT id FROM statsInterval WHERE startAt = ? AND endAt = ?",
(start, end))
r = cur.fetchall()
if len(r) == 1:
return r[0][0]

cur.execute("INSERT INTO statsInterval (startAt, endAt) "
"VALUES (%s, %s)", start, end)
cur.execute("SELECT id FROM statsInterval WHERE "
"startAt = %s AND endAt = %s", start, end)
cur.execute("INSERT INTO statsInterval (startAt, endAt) VALUES (?, ?)",
(start, end))
cur.execute("SELECT id FROM statsInterval WHERE startAt = ? AND endAt = ?",
(start, end))
r = cur.fetchall()
assert len(r) == 1
return r[0][0]
Expand All @@ -570,17 +570,17 @@ def rotate(self, dataCutoff, resultsCutoff):
#resultsCutoff = self._db.time(now - sec['RetainPingResults'])

cur = self._db.getCursor()
cur.execute("DELETE FROM myLifespan WHERE stillup < %s", dataCutoff)
cur.execute("DELETE FROM ping WHERE sentat < %s", dataCutoff)
cur.execute("DELETE FROM connectionAttempt WHERE at < %s", dataCutoff)
cur.execute("DELETE FROM myLifespan WHERE stillup < ?", [dataCutoff])
cur.execute("DELETE FROM ping WHERE sentat < ?", [dataCutoff])
cur.execute("DELETE FROM connectionAttempt WHERE at < ?", [dataCutoff])

cur.execute("DELETE FROM uptime WHERE interval IN "
"( SELECT id FROM statsInterval WHERE endAt < %s )",
resultsCutoff)
"( SELECT id FROM statsInterval WHERE endAt < ? )",
[resultsCutoff])
cur.execute("DELETE FROM echolotOneHopResult WHERE interval IN "
"( SELECT id FROM statsInterval WHERE endAt < %s )",
resultsCutoff)
cur.execute("DELETE FROM statsInterval WHERE endAt < %s", resultsCutoff)
"( SELECT id FROM statsInterval WHERE endAt < ? )",
[resultsCutoff])
cur.execute("DELETE FROM statsInterval WHERE endAt < ?", [resultsCutoff])

self._db.getConnection().commit()

Expand All @@ -593,7 +593,7 @@ def close(self):
database."""
self._db.close()

_STARTUP = "INSERT INTO myLifespan (startup, stillup, shutdown) VALUES (%s,%s, 0)"
_STARTUP = "INSERT INTO myLifespan (startup, stillup, shutdown) VALUES (?,?, 0)"
def startup(self,now=None):
"""Called when the server has just started. Starts tracking a new
interval of this server's lifetime."""
Expand All @@ -603,7 +603,7 @@ def startup(self,now=None):
self._db.getCursor().execute(self._STARTUP, (now,now))
self._db.getConnection().commit()

_SHUTDOWN = "UPDATE myLifespan SET stillup = %s, shutdown = %s WHERE startup = %s"
_SHUTDOWN = "UPDATE myLifespan SET stillup = ?, shutdown = ? WHERE startup = ?"
def shutdown(self, now=None):
"""Called when the server is shutting down. Stops tracking the current
interval of this server's lifetime."""
Expand All @@ -612,7 +612,7 @@ def shutdown(self, now=None):
self._db.getCursor().execute(self._SHUTDOWN, (now, now, self._startTime))
self._db.getConnection().commit()

_HEARTBEAT = "UPDATE myLifespan SET stillup = %s WHERE startup = %s AND stillup < %s"
_HEARTBEAT = "UPDATE myLifespan SET stillup = ? WHERE startup = ? AND stillup < ?"
def heartbeat(self, now=None):
"""Called periodically. Notes that the server is still running as of
the time 'now'."""
Expand All @@ -622,7 +622,7 @@ def heartbeat(self, now=None):
self._db.getConnection().commit()

_CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
"VALUES (%s,%s,%s)")
"VALUES (?,?,?)")
def connected(self, identity, success=1, now=None):
"""Note that we attempted to connect to the server with 'identity'.
We successfully negotiated a protocol iff success is true.
Expand All @@ -639,7 +639,7 @@ def connectFailed(self, identity, now=None):
self.connected(identity, success=0, now=now)

_QUEUED_PING = ("INSERT INTO ping (hash, path, sentat, received)"
"VALUES (%s,%s,%s,%s)")
"VALUES (?,?,?,?)")
def queuedPing(self, hash, path, now=None):
"""Note that we send a probe message along 'path' (a list of
server identities, excluding ourself as first and last
Expand All @@ -652,7 +652,7 @@ def queuedPing(self, hash, path, now=None):
(formatBase64(hash), ids, self._db.time(now), 0))
self._db.getConnection().commit()

_GOT_PING = "UPDATE ping SET received = %s WHERE hash = %s"
_GOT_PING = "UPDATE ping SET received = ? WHERE hash = ?"
def gotPing(self, hash, now=None):
"""Note that we have received a probe message whose payload had 'hash'
as its digest.
Expand Down Expand Up @@ -682,8 +682,8 @@ def _calculateUptimes(self, serverIdentities, startTime, endTime, now=None):
self._intervals.getIntervals(startTime,endTime)]

cur.execute("SELECT startup, stillup, shutdown FROM myLifespan WHERE "
"startup <= %s AND stillup >= %s",
self._db.time(endTime), self._db.time(startTime))
"startup <= ? AND stillup >= ?",
(self._db.time(endTime), self._db.time(startTime)))
myIntervals = IntervalSet([ (start, max(end,shutdown))
for start,end,shutdown in cur ])
myIntervals *= timespan
Expand All @@ -696,9 +696,9 @@ def _calculateUptimes(self, serverIdentities, startTime, endTime, now=None):
for (identity, serverID) in self._serverIDs.items():
if s in ('<self>','<unknown>'): continue
cur.execute("SELECT at, success FROM connectionAttempt"
" WHERE server = %s AND at >= %s AND at <= %s"
" WHERE server = ? AND at >= ? AND at <= ?"
" ORDER BY at",
serverID, startTime, endTime)
(serverID, startTime, endTime))

intervals = [[], []] #uptimes, downtimes
lastStatus = None
Expand Down Expand Up @@ -756,7 +756,7 @@ def getUptimes(self, startAt, endAt):
"FROM uptime, statsInterval, server "
"WHERE statsInterval.id = uptime.interval "
"AND server.id = uptime.server "
"AND %s >= startat AND %s <= endat",
"AND ? >= startat AND ? <= endat",
(self._db.time(startAt), self._db.time(endAt)))
for s,e,i,u in cur:
result.setdefault((s,e), {})[self._db.decodeIdentity(i)] = u
Expand Down Expand Up @@ -810,8 +810,8 @@ def _calculateOneHopResult(self, serverIdentity, startTime, endTime,
dailyLatencies = [[] for _ in xrange(nPeriods)]
nSent = [0]*nPeriods
nPings = 0
cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
cur.execute("SELECT sentat, received FROM ping WHERE path = ?"
" AND sentat >= ? AND sentat <= ?",
(serverID, startTime, endTime))
for sent,received in cur:
pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
Expand Down Expand Up @@ -843,8 +843,8 @@ def _calculateOneHopResult(self, serverIdentity, startTime, endTime,
nReceived = [0]*nPeriods
perTotalWeights = [0]*nPeriods
perTotalWeighted = [0]*nPeriods
cur.execute("SELECT sentat, received FROM ping WHERE path = %s"
" AND sentat >= %s AND sentat <= %s",
cur.execute("SELECT sentat, received FROM ping WHERE path = ?"
" AND sentat >= ? AND sentat <= ?",
(serverID, startTime, endTime))
for sent,received in cur:
pIdx = floorDiv(sent-startTime, self._PING_GRANULARITY)
Expand Down Expand Up @@ -939,23 +939,23 @@ def _calculate2ChainStatus(self, since, s1, s2, now=None):
# doesn't commit.
cur = self._db.getCursor()
path = "%s,%s"%(self._getServerID(s1),self._getServerID(s2))
cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s",
cur.execute("SELECT count() FROM ping WHERE path = ?"
" AND sentat >= ?",
(path,self._db.time(since)))
nSent, = cur.fetchone()
cur.execute("SELECT count() FROM ping WHERE path = %s"
" AND sentat >= %s AND received > 0",
cur.execute("SELECT count() FROM ping WHERE path = ?"
" AND sentat >= ? AND received > 0",
(path,since))
nReceived, = cur.fetchone()
cur.execute("SELECT SUM(r1.reliability * r2.reliability) "
"FROM ping, echolotOneHopResult as r1, "
" echolotOneHopResult as r2, statsInterval "
"WHERE ping.path = %s AND ping.sentAt >= %s "
"WHERE ping.path = ? AND ping.sentAt >= ? "
"AND statsInterval.startAt <= ping.sentAt "
"AND statsInterval.endAt >= ping.sentAt "
"AND r1.server = %s "
"AND r1.server = ? "
"AND r1.interval = statsInterval.id "
"AND r2.server = %s "
"AND r2.server = ? "
"AND r2.interval = statsInterval.id ",
(path, self._db.time(since),
self._getServerID(s1), self._getServerID(s2)))
Expand Down Expand Up @@ -1038,7 +1038,7 @@ def dumpAllStatus(self,f,since,now=None):
print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
print >>f, "SERVER_UPTIMES = {"
cur.execute("SELECT startAt,endAt,identity,uptime FROM uptime, server, statsInterval "
"WHERE startAt >= %s AND startAt <= %s "
"WHERE startAt >= ? AND startAt <= ? "
"AND uptime.server = server.id "
"AND uptime.interval = statsInterval.id "
"ORDER BY identity, startAt", (since, now))
Expand All @@ -1060,7 +1060,7 @@ def dumpAllStatus(self,f,since,now=None):
cur.execute("SELECT identity,startAt,endAt,nSent,nReceived,"
" latency,reliability "
"FROM echolotOneHopResult, statsInterval, server "
"WHERE startat >= %s AND startat <= %s "
"WHERE startat >= ? AND startat <= ? "
"AND echolotOneHopResult.server = server.id "
"AND echolotOneHopResult.interval = statsInterval.id "
"ORDER BY identity, startat", (since, now))
Expand Down

0 comments on commit 6c58012

Please sign in to comment.