Skip to content

Commit

Permalink
Remove 'sections' (to make things more simple)
Browse files Browse the repository at this point in the history
  • Loading branch information
janhybs committed Dec 13, 2018
1 parent d173a65 commit 8e1c979
Show file tree
Hide file tree
Showing 21 changed files with 702 additions and 102 deletions.
39 changes: 17 additions & 22 deletions cfg/hello-world/local/config.yaml
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
workdir: <hello-world-workdir>
init-shell: !sh sh/init.sh
install:
- name: compilation-phase
output: stdout

stages:
- name: compile
enabled: true
git:
- url: https://github.com/janhybs/bench-stat.git
commit: <arg.commit.hello-world> # checkout to this commit
branch: <arg.branch.hello-world> # under this branch's name
- url: https://github.com/janhybs/bench-stat.git
commit: <arg.commit.hello-world> # checkout to this commit
branch: <arg.branch.hello-world> # under this branch's name

shell: !sh sh/install.sh

cache:
storage: <os.HOME>/.cache/cihpc
directories:
- bench-stat
cache: bench-stat

test:
- name: memory-cache-test
output: 'null'

repeat:
exactly: 1
no-less-thann: 8
no-less-than: 8

parallel:
cpus: 0.51
Expand All @@ -33,14 +28,14 @@ test:
- test:
- id: 1
name: memory-cache-test-l1
- id: 2
name: memory-cache-test-l2
- id: 3
name: memory-cache-test-l3
- matrix:
- test:
- id: 0
name: memory-cache-test-all
# - id: 2
# name: memory-cache-test-l2
# - id: 3
# name: memory-cache-test-l3
# - matrix:
# - test:
# - id: 0
# name: memory-cache-test-all

shell: !sh sh/test.sh

Expand Down
49 changes: 29 additions & 20 deletions cihpc/common/processing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,11 @@ class LogStatusFormat(enum.Enum):
ONELINE_GROUPED = 'oneline-grouped'


class Worker(threading.Thread):
def __init__(self, crate, target, cpus=1):
"""
Parameters
----------
crate: ProcessConfigCrate or None
target: callable or None
cpus: int
"""
super(Worker, self).__init__()
self.cpus = cpus
self.target = target
self.crate = crate

class SimpleWorker(threading.Thread):
def __init__(self):
super(SimpleWorker, self).__init__()
self.cpus = 1
self.target = None
self.semaphore = None # type: ComplexSemaphore
self.thread_event = None # type: EnterExitEvent
self.result = None # type: ProcessStepResult
Expand All @@ -67,6 +58,7 @@ def __init__(self, crate, target, cpus=1):
self.timer = Timer(self.name)
self._pretty_name = None


@property
def status(self):
return self._status
Expand All @@ -92,15 +84,30 @@ def run(self):
self.semaphore.release(value=self.cpus)
self.status = WorkerStatus.EXITING

def __repr__(self):
return '{self.name}({self.cpus}x, [{self.status}])'.format(self=self)


class Worker(SimpleWorker):
def __init__(self, crate, target, cpus=1):
"""
Parameters
----------
crate: ProcessConfigCrate or None
target: callable or None
cpus: int
"""
super(Worker, self).__init__()
self.cpus = cpus
self.target = target
self.crate = crate

@property
def pretty_name(self):
if self.crate:
return self.crate.name
return self._pretty_name or self.name

def __repr__(self):
return '{self.name}({self.cpus}x, [{self.status}])'.format(self=self)


class WorkerPool(object):
def __init__(self, processes, threads):
Expand All @@ -126,6 +133,11 @@ def update_cpu_values(self, target):
def result(self):
return pluck(self.threads, 'result')

def start(self):
if self.processes == 1:
return self.start_serial()
return self.start_parallel()

def start_serial(self):
# in serial mode, we start the thread, wait for finish and fire on_exit
for thread in self.threads:
Expand All @@ -135,9 +147,6 @@ def start_serial(self):
self.thread_event.on_exit(thread)

def start_parallel(self):
if self.processes == 1:
return self.start_serial()

# start
for thread in self.threads:
thread.start()
Expand Down
8 changes: 4 additions & 4 deletions cihpc/common/utils/parallels.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
PLENTY_OF_CPUS = 64 # turn off for now


def extract_cpus_from_worker(worker):
def extract_cpus_from_worker(process_stage):
"""
:type worker: multiproc.thread_pool.Worker
:type worker: cihpc.core.processing.stage.ProcessStage
:return:
"""
if worker and worker.crate and worker.crate.vars:
return int(worker.crate.vars.get('__cpu__', 1))
if process_stage and process_stage.variables:
return int(process_stage.variables.get('__cpu__', 1))

return 1

Expand Down
9 changes: 9 additions & 0 deletions cihpc/common/utils/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,22 @@
import random
import json
import yaml
import re

from cihpc.common.utils import dateutils as dateutils
from cihpc.common.utils import extend_yaml


extend_yaml.extend()

_non_valid_chars = re.compile('[^a-zA-Z0-9_-]+')


def secure_filename(name):
if name:
return _non_valid_chars.sub('-', str(name))
return ''


def generate_random_key(length=8):
"""
Expand Down
3 changes: 3 additions & 0 deletions cihpc/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,9 @@ def main(cmd_args=None):
project = ProcessProject(project_definition)
logger.info('processing project %s, action %s', project_name, args.action)

project.process()
exit(0)

if args.action is ArgAction.GIT_FOREACH:
if not global_configuration.project_git:
logger.error('no repository provided')
Expand Down
2 changes: 2 additions & 0 deletions cihpc/core/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(self, project_name):
self.reports = self.db.get_collection(opts.get('col_timers_name'))
self.files = self.db.get_collection(opts.get('col_files_name'))
self.history = self.db.get_collection(opts.get('col_history_name'))
self.running = self.db.get_collection(opts.get('col_running_name'))

def _get_artifacts(self, opts=None):
if opts:
Expand Down Expand Up @@ -292,4 +293,5 @@ def artifacts_default_configuration(project_name=None):
col_timers_name='timers',
col_files_name='files',
col_history_name='hist',
col_running_name='running',
)
89 changes: 78 additions & 11 deletions cihpc/core/processing/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@
import os
import shutil
import sys
import threading

from tqdm import tqdm

from cihpc.cfg.config import global_configuration
from cihpc.common.processing.pool import LogStatusFormat, PoolInt, Worker, WorkerPool
from cihpc.common.processing.pool import LogStatusFormat, PoolInt, Worker, WorkerPool, SimpleWorker
from cihpc.common.utils import strings
from cihpc.common.utils.datautils import iter_over, merge_dict
from cihpc.common.utils.parallels import extract_cpus_from_worker
from cihpc.common.utils.timer import Timer
from cihpc.core.db import CIHPCMongo
from cihpc.core.processing.stage import ProcessStage
from cihpc.core.processing.step_cache import ProcessStepCache
from cihpc.core.processing.step_collect import process_step_collect
from cihpc.core.processing.step_git import process_step_git
Expand All @@ -32,21 +35,85 @@ class ProcessProject(object):

def __init__(self, project):
self.project = project
self.mongo = CIHPCMongo.get(self.project.name)
os.makedirs(self.project.workdir, exist_ok=True)
os.chdir(self.project.workdir)

def process(self):
"""
Method will process entire project
:return:
"""
logger.debug('preparing project %s', self.project.name)
# remove old configuration
shutil.rmtree('tmp.%s' % self.project.name, True)
Method will read configuration and prepare threads, which will
perform given action
Returns
-------
# process both sections
for section in [self.project.install, self.project.test]:
self.process_section(section)
list[threading.Thread]
"""
for stage in self.project.stages:
if stage:
threads = ProcessStage.create(self.project, stage)
self._process_stage_threads(stage, threads)

def _process_stage_threads(self, stage, threads):
with Timer(stage.ord_name) as timer:
pool = WorkerPool(processes=stage.parallel.cpus, threads=threads)
pool.update_cpu_values(extract_cpus_from_worker)

if stage.parallel:
logger.info('%d job(s) will be now executed in parallel' % len(threads))
else:
logger.info('%d job(s) will be now executed in serial' % len(threads))

default_status = pool.get_statuses(LogStatusFormat.ONELINE)
cqtdm = tqdm(file=sys.stdout, total=len(threads), dynamic_ncols=True,
desc='%s: %s' % (stage.ord_name, default_status))

def update_status(worker: SimpleWorker):
cqtdm.desc = '%s: %s' % (stage.ord_name, pool.get_statuses(LogStatusFormat.ONELINE))
cqtdm.update()

pool.thread_event.on_exit.on(update_status)

# run in serial or parallel
pool.start()
cqtdm.close()

timers_total = sum([sum(x.collect_result.total) for x in threads if x.collect_result])
if timers_total:
self._save_stats(stage, threads, timers_total, timer.duration)

def _save_stats(self, stage, threads, timers_total, duration):
try:
# here we will store additional info to the DB
# such as how many result we have for each commit
# or how long the testing took.
# is this manner, we will have better control over
# execution for specific commit value, we will now how many results
# we have for each step, so we can automatically determine how many
# repetition we need in order to have minimum result available
from cihpc.artifacts.modules import CIHPCReport
from cihpc.core.db import CIHPCMongo

logger.info('%d processes finished, found %d documents' % (len(threads), timers_total))

stats = dict()
stats['git'] = CIHPCReport.global_git.copy()
stats['system'] = CIHPCReport.global_system.copy()
stats['name'] = stage.name
stats['reps'] = stage.repeat
stats['docs'] = timers_total
stats['config'] = strings.to_yaml(stage.raw_config)
stats['duration'] = duration
stats['parallel'] = bool(stage.parallel)
# logger.dump(stats, 'stats')

cihpc_mongo = CIHPCMongo.get(self.project.name)
insert_result = cihpc_mongo.history.insert_one(stats)
logger.debug('DB write acknowledged: %s' % str(insert_result.acknowledged))
if insert_result.acknowledged:
logger.info('saved stat document to build history')

except Exception as e:
logger.exception(str(e))

def process_section(self, section):
"""
Expand Down Expand Up @@ -128,7 +195,7 @@ def step_on_exit(worker: Worker):
cqtdm = tqdm(file=sys.stdout, total=len(threads), dynamic_ncols=True,
desc='section %s: %s' % (section.name, default_status))

def update_status(worker: Worker):
def update_status(worker: SimpleWorker):
cqtdm.desc = 'section %s: %s' % (section.name, pool.get_statuses(LogStatusFormat.ONELINE))
cqtdm.update()

Expand Down

0 comments on commit 8e1c979

Please sign in to comment.