Skip to content

Commit

Permalink
Merge branch 'concurrencystuff'
Browse files Browse the repository at this point in the history
  • Loading branch information
kroll-j committed May 9, 2016
2 parents 475ad8c + 4908a7b commit faaf3da
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 20 deletions.
260 changes: 241 additions & 19 deletions cgstat.fcgi
Expand Up @@ -27,12 +27,10 @@ import requests
import asyncore
import flask
import json
from flask import Flask
from flask import Flask, request
from flup.server.fcgi import WSGIServer
from gp import *

sys.stderr.write("PRE __MAIN__\n")

app= Flask(__name__)
myhostname= socket.gethostname()

Expand Down Expand Up @@ -114,7 +112,9 @@ def gengraphstats(hostmap):
stat["status"]["graph"]= graph
for v in stat["status"]["content"]: stat["status"]["errormsg"][v]= "%s" % str(ex)
yield stat
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True

pollstart= time.time()
while len(transports) and time.time()-pollstart<60:
Expand All @@ -130,7 +130,9 @@ def gengraphstats(hostmap):
if not line or not line.startswith("OK.") or not line.strip().endswith(':'):
# bad status
for v in stat["status"]["content"]: stat["status"]["errormsg"][v]= "INVALID RESPONSE '%s'" % str(line).strip()
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True
else:
# status ok, read data set
if cmd==0: # first cmd -- 'stats q'
Expand All @@ -151,15 +153,21 @@ def gengraphstats(hostmap):
minutes, seconds= divmod(remainder, 60)
if age.days:
stat["status"]["content"]["age"]= '<div class=errage>%02d:%02d:%02d</div>' % (age.days, hours, minutes)
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True
else:
stat["status"]["content"]["age"]= '%02d:%02d:%02d' % (age.days, hours, minutes)
except socket.timeout:
for v in stat["status"]["content"]: stat["status"]["errormsg"][v]= "TIMEOUT"
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True
except Exception as ex:
for v in stat["status"]["content"]: stat["status"]["errormsg"][v]= str(ex)
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True

yield stat
poll.unregister(row[0])
Expand All @@ -171,35 +179,249 @@ def gengraphstats(hostmap):
stat["status"]["graph"]= transports[t].graphname
for v in stat["status"]["content"]: stat["status"]["errormsg"][v]= "TIMEOUT"
yield stat
if not erricon_set: yield { "favicon": "/cgstat/static/erricon.png" }
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True

# ... when finished:
for t in transports:
transports[t].close()


def gengraphstats2(hostmap):
graphsbyhost= {}
for graph in hostmap:
if not hostmap[graph] in graphsbyhost:
graphsbyhost[hostmap[graph]]= [graph]
else:
graphsbyhost[hostmap[graph]].append(graph)

erricon_set= False
for host in graphsbyhost:
conn= client.Connection(client.ClientTransport(host))
conn.connect()
for graph in graphsbyhost[host]:
stat= { "status": { "graph": graph, "content": { "arccount": "ERROR", "rss": "ERROR", "virt": "ERROR", "age": "ERROR" }, "errormsg": {} } }
conn.use_graph(str(graph))
stats= conn.capture_stats("q")
for row in stats:
if row[0]=='ArcCount':
stat["status"]["content"]["arccount"]= row[1]
elif row[0]=='ProcVirt':
stat["status"]["content"]["virt"]= row[1]
elif row[0]=='ProcRSS':
stat["status"]["content"]["rss"]= row[1]
meta= conn.capture_list_meta()
for row in meta:
if row[0]=='last_full_import':
now= datetime.datetime.utcnow()
lastimport= datetime.datetime.strptime(row[1], "%Y-%m-%dT%H:%M:%S")
age= now-lastimport
hours, remainder= divmod(age.seconds, 3600)
minutes, seconds= divmod(remainder, 60)
if age.days:
stat["status"]["content"]["age"]= '<div class=errage>%02d:%02d:%02d</div>' % (age.days, hours, minutes)
if not erricon_set:
yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True
else:
stat["status"]["content"]["age"]= '%02d:%02d:%02d' % (age.days, hours, minutes)
yield stat
conn.close()


from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Queue
import threading
def gengraphstats3(hostmap):
jobq= Queue()
for graph in hostmap:
jobq.put( (graph, hostmap[graph]) )

resultq= Queue()

def processjobs():
erricon_set= False
connections= { }
while not jobq.empty():
job= jobq.get()

graph= job[0]
host= job[1]

if not host in connections:
connections[host]= client.Connection(client.ClientTransport(host))
connections[host].connect()

connections[host].use_graph(str(graph))

stat= { "status": { "graph": graph, "content": { "arccount": "ERROR", "rss": "ERROR", "virt": "ERROR", "age": "ERROR" }, "errormsg": {} } }
stats= connections[host].capture_stats("q")
for row in stats:
if row[0]=='ArcCount':
stat["status"]["content"]["arccount"]= row[1]
elif row[0]=='ProcVirt':
stat["status"]["content"]["virt"]= row[1]
elif row[0]=='ProcRSS':
stat["status"]["content"]["rss"]= row[1]
meta= connections[host].capture_list_meta()
for row in meta:
if row[0]=='last_full_import':
now= datetime.datetime.utcnow()
lastimport= datetime.datetime.strptime(row[1], "%Y-%m-%dT%H:%M:%S")
age= now-lastimport
hours, remainder= divmod(age.seconds, 3600)
minutes, seconds= divmod(remainder, 60)
if age.days:
stat["status"]["content"]["age"]= '<div class=errage>%02d:%02d:%02d</div>' % (age.days, hours, minutes)
if not erricon_set:
resultq.put( { "favicon": "/cgstat/static/erricon.png" } )
erricon_set= True
else:
stat["status"]["content"]["age"]= '%02d:%02d:%02d' % (age.days, hours, minutes)

resultq.put(stat)

threads= []
for i in range(10):
t= threading.Thread(target= processjobs)
t.start()
threads.append(t)

def workers_alive():
for t in threads:
if t.is_alive(): return True
return False

while (not resultq.empty()) or workers_alive():
yield resultq.get()

def mkgraphstats_stuffcmds(hostmap):
graphsbyhost= {}
for graph in hostmap:
if not hostmap[graph] in graphsbyhost:
graphsbyhost[hostmap[graph]]= [graph]
else:
graphsbyhost[hostmap[graph]].append(graph)

graphstats= []

erricon_set= False
for host in graphsbyhost:
sock= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, 6666))
sock.settimeout(30)

hin= sock.makefile("r")
hout= sock.makefile("w")

# returns tuple: (status (bool), statusline, dataset or None)
def readreply():
# get status line
reply= hin.readline().strip()

# error or failure occured
if not (reply.startswith("OK") or reply.startswith("VALUE")):
return (False, reply, None)

# no error, no dataset
if not reply.endswith(':'):
return (True, reply, None)

# no error, reply with data set
dataset= []
while True:
line= hin.readline().strip()
if line=='': break
dataset.append( line.split(',') )
return (True, reply, dataset)


for graph in graphsbyhost[host]:
hout.write("use-graph %s\n" % graph)
hout.write("stats q\n");
hout.write("list-meta\n");
hout.flush()

for graph in graphsbyhost[host]:
ret= { "status": { "graph": graph, "content": { "arccount": "ERROR", "rss": "ERROR", "virt": "ERROR", "age": "ERROR" }, "errormsg": {} } }
# xxx todo... could shorten this:
#status= { "graph": graph, "host": host, "arccount": "ERROR", "rss": "ERROR", "virt": "ERROR", "age": "ERROR", "errormsg": {} }

# need to read all replies first
reply_usegraph= readreply()
reply_stats= readreply()
reply_meta= readreply()

# check status
for reply in (reply_usegraph, reply_stats, reply_meta):
if not reply[0]:
ret["status"]["errormsg"]= reply[1]
graphstats.append(ret)
continue

# get stat values
for retkey,key in ( ("arccount", "ArcCount"), ("rss", "ProcRSS"), ("virt", "ProcVirt") ):
if not reply_stats[2]:
ret["status"]["errormsg"]= reply_stats[1]
graphstats.append(ret)
continue
d= dict(reply_stats[2])
if key in d:
ret["status"]["content"][retkey]= d[key]
else:
ret["status"]["errormsg"]= reply_stats[1]
graphstats.append(ret)
continue

# get age
if (not reply_meta[0]) or (not reply_meta[2]) or (not "last_full_import" in dict(reply_meta[2])):
ret["status"]["errormsg"]= reply_meta[1]
graphstats.append(ret)
continue

lastimport_str= dict(reply_meta[2])["last_full_import"]
now= datetime.datetime.utcnow()
lastimport= datetime.datetime.strptime(lastimport_str, "%Y-%m-%dT%H:%M:%S")
age= now-lastimport
hours, remainder= divmod(age.seconds, 3600)
minutes, seconds= divmod(remainder, 60)
if age.days:
ret["status"]["content"]["age"]= '<div class=errage>%02d:%02d:%02d</div>' % (age.days, hours, minutes)
if not erricon_set:
graphstats.append( { "favicon": "/cgstat/static/erricon.png" } ) # yield { "favicon": "/cgstat/static/erricon.png" }
erricon_set= True
else:
ret["status"]["content"]["age"]= '%02d:%02d:%02d' % (age.days, hours, minutes)

graphstats.append(ret)

hin.close()
hout.close()
sock.close()

with open("/tmp/fuckfuckfuck", "w") as f:
f.write(str(graphstats))
return graphstats


# glue function for 'streaming' a template which results in chunked transfer encoding
def stream_template(template_name, **context):
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
#~ rv.enable_buffering(50)
rv.enable_buffering(25)
return rv

@app.route('/cgstat')
@app.route('/')
#~ @app.route('/catgraph/cgstat')
#~ @app.route('/catgraph/cgstat/')
def cgstat():
hostmapUri= "http://%s/hostmap/graphs.json" % ('localhost' if myhostname=='C086' else 'sylvester')
hostmap= requests.get(hostmapUri).json()
app.jinja_env.trim_blocks= True
app.jinja_env.lstrip_blocks= True
response= flask.Response(stream_template('template.html', title="CatGraph Status", graphs=gengraphinfo(hostmap), updates=gengraphstats(hostmap), scripts=[]))
#~ response.headers.add("X-Foobar", "Asdf")
#~ response.headers.add("Connection", "keep-alive")
#~ response.headers.add("Cache-Control", "no-cache")
#~ response.headers.add("Cache-Control", "no-store")
#~ response.headers.add("Cache-Control", "private")
#~ response= flask.Response(stream_template('template.html', title="CatGraph Status", graphs=gengraphinfo(hostmap), updates=gengraphstats(hostmap), scripts=[]))
response= flask.Response( flask.render_template('template.html', title="CatGraph Status", graphs=gengraphinfo(hostmap), updates=mkgraphstats_stuffcmds(hostmap), scripts=[]) )
response.headers.add("Content-Encoding", "identity")
return response

Expand All @@ -208,8 +430,8 @@ if __name__ == '__main__':
import cgitb
cgitb.enable()
app.config['DEBUG']= True
#~ app.config['PROPAGATE_EXCEPTIONS']= None
app.debug= True
app.use_debugger= True
sys.stderr.write("__MAIN__\n")
WSGIServer(app).run()
#~ app.run(debug=app.debug, use_debugger=app.debug)
2 changes: 1 addition & 1 deletion templates/template.html
Expand Up @@ -114,7 +114,7 @@ <h4 class="modal-title" id="myModalLabel">CatGraph Status</h4>
</tr>
{% for graph in graphs %}
<tr>
<td>{{ graph.name }}</td><td>{{ graph.host }}</td>
<td>{{ graph.name }}</td><td id="host_{{ graph.name }}">{{ graph.host }}</td>
<td id="arccount_{{ graph.name }}"></td>
<td id="rss_{{ graph.name }}"></td>
<td id="virt_{{ graph.name }}"></td>
Expand Down

0 comments on commit faaf3da

Please sign in to comment.