Skip to content

Commit

Permalink
Add parallel support
Browse files Browse the repository at this point in the history
  • Loading branch information
janhybs committed Nov 13, 2018
1 parent 7b21989 commit f296ade
Show file tree
Hide file tree
Showing 23 changed files with 917 additions and 118 deletions.
2 changes: 1 addition & 1 deletion ci-hpc/artifacts/collect/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ def save_to_db(self, collect_results):
:type collect_results: list[CollectResult]
"""

logger.info('saving %d profile.json files to database', len(collect_results))
logger.debug('saving %d report files to database', len(collect_results))
cihpc_mongo = CIHPCMongo.get(self.project_name)

results = list()
Expand Down
4 changes: 3 additions & 1 deletion ci-hpc/artifacts/collect/modules/generic_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


from artifacts.collect.modules import CollectResult, CIHPCReport, AbstractCollectModule
from utils.logging import logger


class CollectModule(AbstractCollectModule):
Expand Down Expand Up @@ -50,10 +51,11 @@ class CollectModule(AbstractCollectModule):
}
"""

def process(self, object, from_file=None):
report = CIHPCReport()
report.merge(object)

print(report)
logger.dump(report, 'report', level=logger.LEVEL_DEBUG)

return CollectResult([report])
43 changes: 41 additions & 2 deletions ci-hpc/artifacts/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_database(self, opts=None):
'database features.',
self.project_name,
)
raise Exception('No database configuration found')
raise Exception('No database configuration found for %s' % self.project_name)
return opts

def __repr__(self):
Expand Down Expand Up @@ -106,12 +106,15 @@ def __init__(self, project_name):
self.db = self.client.get_database(opts.get('db_name'))
self.reports = self.db.get_collection(opts.get('col_timers_name'))
self.files = self.db.get_collection(opts.get('col_files_name'))
self.history = self.db.get_collection(opts.get('col_history_name'))

def _get_artifacts(self, opts=None):
if opts:
return opts

base_opts = artifacts_default_configuration(self.project_name)
opts = cfg.get('%s.artifacts' % self.project_name)

if not opts:
if self._warn_first_time(self.SECTION_ARTIFACTS):
logger.warning(
Expand All @@ -126,7 +129,8 @@ def _get_artifacts(self, opts=None):
)

opts = artifacts_default_configuration(self.project_name)
return opts
base_opts.update(opts)
return base_opts

def find_all(self, filter=None, projection=None, *args, flatten=True, **kwargs):
if not filter:
Expand Down Expand Up @@ -211,6 +215,41 @@ def aggregate(self, match=None, unwind=None, project=None, flatten=True, *args):

return items

def commit_history(self):
return list(self.history.find())

def timers_stats(self):
pipeline = [
{
'$group': {
'_id': {
'hash': '$git.commit',
'date': {
'$dateToString':{
'date': '$git.datetime',
'format': '%Y-%m-%d %H:%M:%S'
}
}
},
'dur_sum': {'$sum': '$result.duration'},
'dur_avg': {'$avg': '$result.duration'},
'dur_max': {'$max': '$result.duration'},
'dur_min': {'$min': '$result.duration'},
'items': {'$sum': 1},
}
},
{
'$sort': {
'_id.date': -1
}
}
]
logger.debug('db.getCollection("%s").aggregate(\n%s\n)',
str(self.reports.name),
strings.pad_lines(strings.to_json(pipeline))
)
return list(self.reports.aggregate(pipeline))

@classmethod
def get(cls, project_name):
"""
Expand Down
1 change: 1 addition & 0 deletions ci-hpc/defaults/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ def artifacts_default_configuration(project_name=None):
db_name=project_name,
col_timers_name='timers',
col_files_name='files',
col_history_name='hist',
)
23 changes: 15 additions & 8 deletions ci-hpc/files/temp_file.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import os
import sys


def divider(name=None, start='# ', end=' #', char='-', len=60):
fmt = '%s{:%s^%ds}%s' % (start, char, len, end)
if name:
return fmt.format(' %s ' % name)
return fmt.format('')


class TempFile(object):
Expand All @@ -9,12 +15,13 @@ def __init__(self, path, verbose=False):
self.lines = []
self.verbose = verbose

def write_shebang(self, interpreter='/bin/bash'):
self.write('#!/bin/bash')
self.write('### AUTOGENERATED DO NOT EDIT ###')
def write_shebang(self, interpreter='#!/bin/bash'):
self.write('%s\n' % interpreter)
self.write(divider(char='-'))
self.write(divider('AUTOGENERATED DO NOT EDIT'))
if self.verbose:
self.write('set -x')
self.write('# ----------------------------- #')
self.write(divider(char='-'))
self.write('')

def read(self):
Expand All @@ -27,9 +34,9 @@ def write(self, s='', new_line=True):
self.lines.append('\n')

def write_section(self, section_name, s='', new_line=True):
self.write('# {:=^60s} #'.format(' %s ' % section_name), new_line=False)
self.write(s, new_line=False)
self.write('# {:-^60s} #'.format(' %s ' % section_name), new_line=new_line)
self.write(divider(section_name, char='='))
self.write(s)
self.write(divider(section_name, char='-'), new_line=new_line)

def __enter__(self):
self.lines = []
Expand Down
Empty file added ci-hpc/multiproc/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions ci-hpc/multiproc/complex_semaphore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/bin/python3
# author: Jan Hybs
import threading
from time import monotonic as _time


class ComplexSemaphore(object):
"""This class implements semaphore objects.
Semaphores manage a counter representing the number of release() calls minus
the number of acquire() calls, plus an initial value. The acquire() method
blocks if necessary until it can return without making the counter
negative. If not given, value defaults to 1.
"""

# After Tim Peters' semaphore class, but not quite the same (no maximum)

def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = threading.Condition(threading.Lock())
self._value = value
self._limit = value

def acquire(self, blocking=True, timeout=None, value=1):
"""Acquire a semaphore, decrementing the internal counter by one.
When invoked without arguments: if the internal counter is larger than
zero on entry, decrement it by one and return immediately. If it is zero
on entry, block, waiting until some other thread has called release() to
make it larger than zero. This is done with proper interlocking so that
if multiple acquire() calls are blocked, release() will wake exactly one
of them up. The implementation may pick one at random, so the order in
which blocked threads are awakened should not be relied on. There is no
return value in this case.
When invoked with blocking set to true, do the same thing as when called
without arguments, and return true.
When invoked with blocking set to false, do not block. If a call without
an argument would block, return false immediately; otherwise, do the
same thing as when called without arguments, and return true.
When invoked with a timeout other than None, it will block for at
most timeout seconds. If acquire does not complete successfully in
that interval, return false. Return true otherwise.
The value parameter specifies how many units to take from the semaphore
default value is 1.
"""
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")

if value > self._limit:
raise ValueError("can't aquire the lock because specified value is greater then max value")

rc = False
endtime = None
with self._cond:
while self._value < value:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= value
rc = True
return rc

__enter__ = acquire

def release(self, value=1):
"""Release a semaphore, incrementing the internal counter by one.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
"""
with self._cond:
self._value += value
self._cond.notify()

def __exit__(self, t, v, tb):
self.release()

0 comments on commit f296ade

Please sign in to comment.