Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Fix simple statistics of a plan execution
Browse files Browse the repository at this point in the history
The report for each plan execution will contain:
- total time - the difference between the start of the first executed
  task and the end of the last updated task
- total delta - the sum of the deltas of all executed tasks

Change-Id: Ib7ea41574d1c57739931ba942111e48de0c72849
  • Loading branch information
dshulyak authored and pigmej committed Jan 26, 2016
1 parent 2681a5f commit a179702
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 14 deletions.
12 changes: 4 additions & 8 deletions solar/cli/orch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,14 @@ def click_report(uid):
'SKIPPED': 'blue',
'NOOP': 'black'}

total = 0.0
report = graph.report_topo(uid)
for item in report:
report = graph.report_progress(uid)
for item in report['tasks']:
msg = '{} -> {}'.format(item[0], item[1])
if item[2]:
msg += ' :: {}'.format(item[2])
if item[4] and item[3]:
delta = float(item[4]) - float(item[3])
total += delta
msg += ' D: {}'.format(delta)
click.echo(click.style(msg, fg=colors[item[1]]))
click.echo('Delta SUM: {}'.format(total))
click.echo('Total Delta: {}'.format(report['total_delta']))
click.echo('Total Time: {}'.format(report['total_time']))


@orchestration.command()
Expand Down
2 changes: 2 additions & 0 deletions solar/dblayer/solar_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,8 @@ class Task(Model):
timelimit = Field(int, default=int)
retry = Field(int, default=int)
timeout = Field(int, default=int)
start_time = Field(float, default=float)
end_time = Field(float, default=float)

execution = IndexedField(basestring)
parents = ParentField(default=list)
Expand Down
55 changes: 49 additions & 6 deletions solar/orchestration/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ def save_graph(graph):
'errmsg': graph.node[n].get('errmsg', '') or '',
'timelimit': graph.node[n].get('timelimit', 0),
'retry': graph.node[n].get('retry', 0),
'timeout': graph.node[n].get('timeout', 0)})
'timeout': graph.node[n].get('timeout', 0),
'start_time': 0.0,
'end_time': 0.0})
graph.node[n]['task'] = t
for pred in graph.predecessors(n):
pred_task = graph.node[pred]['task']
Expand All @@ -60,6 +62,8 @@ def update_graph(graph, force=False):
task.errmsg = graph.node[n]['errmsg'] or ''
task.retry = graph.node[n].get('retry', 0)
task.timeout = graph.node[n].get('timeout', 0)
task.start_time = graph.node[n].get('start_time', 0.0)
task.end_time = graph.node[n].get('end_time', 0.0)
task.save(force=force)


Expand Down Expand Up @@ -87,12 +91,45 @@ def get_graph(uid):
task=t,
timelimit=t.timelimit,
retry=t.retry,
timeout=t.timeout)
timeout=t.timeout,
start_time=t.start_time,
end_time=t.end_time)
for u in t.parents.all_names():
dg.add_edge(u, t.name)
return dg


def longest_path_time(graph):
    """Return the time span covered by the timed tasks of a plan.

    We are not interested in the path itself, just the start of
    execution and the end of it: the result is the latest ``end_time``
    minus the earliest ``start_time`` among the nodes that have both
    timestamps set.

    :param graph: iterable of node names that exposes ``graph.node[name]``
        dicts holding ``start_time`` and ``end_time`` values
    :returns: latest end minus earliest start over the timed nodes, or
        ``0.0`` when no node has both timestamps set
    """
    start = None
    end = None
    for n in graph:
        node_start = graph.node[n]['start_time']
        node_end = graph.node[n]['end_time']
        # 0.0 means the timestamp was never recorded. Compare by value:
        # an identity check against a float literal is not reliable.
        if node_start == 0.0 or node_end == 0.0:
            continue

        # Test for None first so an unset bound is never ordered
        # against a float.
        if start is None or node_start < start:
            start = node_start

        if end is None or node_end > end:
            end = node_end

    # start and end are always assigned together, so one check is enough.
    if start is None:
        return 0.0
    return end - start


def total_delta(graph):
    """Sum the individual run times of all timed tasks in a plan.

    :param graph: iterable of node names that exposes ``graph.node[name]``
        dicts holding ``start_time`` and ``end_time`` values
    :returns: sum of ``end_time - start_time`` over the nodes that have
        both timestamps set; ``0.0`` when there are none
    """
    delta = 0.0
    for n in graph:
        node_start = graph.node[n]['start_time']
        node_end = graph.node[n]['end_time']
        # 0.0 means the timestamp was never recorded. Compare by value:
        # an identity check against a float literal is not reliable and
        # would let unfinished tasks add a negative delta.
        if node_start == 0.0 or node_end == 0.0:
            continue
        delta += node_end - node_start
    return delta


get_plan = get_graph


Expand Down Expand Up @@ -162,14 +199,20 @@ def reset_filtered(uid):
reset_by_uid(uid, state_list=[states.SKIPPED.name, states.NOOP.name])


def report_topo(uid):
def report_progress(uid):
    """Load the plan graph for ``uid`` and return its progress report."""
    plan_graph = get_graph(uid)
    return report_progress_graph(plan_graph)

dg = get_graph(uid)
report = []

def report_progress_graph(dg):
tasks = []
report = {
'total_time': longest_path_time(dg),
'total_delta': total_delta(dg),
'tasks': tasks}

for task in nx.topological_sort(dg):
data = dg.node[task]
report.append([
tasks.append([
task,
data['status'],
data['errmsg'],
Expand Down
1 change: 1 addition & 0 deletions solar/orchestration/workers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def _do_scheduling(self, plan, task_name):
task_id = '{}:{}'.format(plan.graph['uid'], task_name)
task_type = plan.node[task_name]['type']
plan.node[task_name]['status'] = states.INPROGRESS.name
plan.node[task_name]['start_time'] = time.time()
timelimit = plan.node[task_name].get('timelimit', 0)
timeout = plan.node[task_name].get('timeout', 0)
ctxt = {
Expand Down
26 changes: 26 additions & 0 deletions solar/test/test_graph_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import os

import networkx as nx
from pytest import fixture

from solar.orchestration import graph
Expand Down Expand Up @@ -93,3 +94,28 @@ def test_several_updates(simple):
'PENDING': 0,
'ERROR_RETRY': 0,
}


@fixture
def times():
    """Provide a five-task graph with fixed start and end times.

    The tasks form two chains: ``t1 -> t5`` and ``t2 -> t3 -> t4``.
    """
    schedule = (
        ('t1', 1.0, 12.0),
        ('t2', 1.0, 3.0),
        ('t3', 3.0, 7.0),
        ('t4', 7.0, 13.0),
        ('t5', 12.0, 14.0),
    )
    dg = nx.DiGraph()
    for name, started, finished in schedule:
        dg.add_node(name, start_time=started, end_time=finished,
                    status='', errmsg='')
    for chain in (['t1', 't5'], ['t2', 't3', 't4']):
        dg.add_path(chain)
    return dg


def test_report_progress(times):
    """Check the totals and task count reported for the ``times`` graph."""
    progress = graph.report_progress_graph(times)
    # 14.0 (latest end) - 1.0 (earliest start)
    assert progress['total_time'] == 13.0
    # 11 + 2 + 4 + 6 + 2
    assert progress['total_delta'] == 25.0
    assert len(progress['tasks']) == 5

0 comments on commit a179702

Please sign in to comment.