Skip to content

Commit

Permalink
Added support to report experiment completion to ReBenchDB
Browse files Browse the repository at this point in the history
This change extracts the ReBenchDB functionality for network connections to a separate rebenchdb.py.
It also moves the responsibility for configuration checking to the configurator.

This way, we can use it in the persistency module and in rebench without code duplication.

Signed-off-by: Stefan Marr <git@stefan-marr.de>
  • Loading branch information
smarr committed Sep 16, 2020
1 parent 6e80d2b commit 8de87cf
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 91 deletions.
29 changes: 29 additions & 0 deletions rebench/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
from pykwalify.errors import SchemaError
import yaml

from .configuration_error import ConfigurationError
from .model.experiment import Experiment
from .model.exp_run_details import ExpRunDetails
from .model.reporting import Reporting
from .model.executor import Executor
from .rebenchdb import ReBenchDB
from .ui import UIError, escape_braces

# Disable most logging for pykwalify
Expand Down Expand Up @@ -139,6 +141,8 @@ def __init__(self, raw_config, data_store, ui, cli_options=None, cli_reporter=No
self._exp_name = exp_name or raw_config.get('default_experiment', 'all')
self._artifact_review = raw_config.get('artifact_review', False)

self._rebench_db_connector = None

# capture invocation and iteration settings and override when quick is selected
invocations = cli_options.invocations if cli_options else None
iterations = cli_options.iterations if cli_options else None
Expand Down Expand Up @@ -204,6 +208,31 @@ def use_rebench_db(self):
self._rebench_db.get('send_to_rebench_db', False)
or self._rebench_db.get('record_all', False))

def get_rebench_db_connector(self):
if not self.use_rebench_db:
return None
if self._rebench_db_connector:
return self._rebench_db_connector

if 'project_name' not in self._rebench_db:
raise ConfigurationError(
"No project_name defined in configuration file under reporting.rebenchdb.")

if not self._options.experiment_name:
raise ConfigurationError(
"The experiment was not named, which is mandatory. "
"This is needed to identify the data uniquely. "
"It should also help to remember in which context it "
"was recorded, perhaps relating to a specific CI job "
"or confirming some hypothesis."
"\n\n"
"Use the --experiment option to set the name.")

self._rebench_db_connector = ReBenchDB(
self._rebench_db['db_url'], self._rebench_db['project_name'],
self._options.experiment_name, self._ui)
return self._rebench_db_connector

def _process_cli_options(self):
if self._options is None:
return
Expand Down
97 changes: 6 additions & 91 deletions rebench/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,17 @@
import shutil
import subprocess
import sys
from datetime import datetime
from tempfile import NamedTemporaryFile
from threading import Lock
from time import time

from .configuration_error import ConfigurationError
from .environment import determine_environment, determine_source_details
from .model.data_point import DataPoint
from .model.measurement import Measurement
from .model.run_id import RunId
from .rebenchdb import get_current_time
from .ui import UIError


try:
from http.client import HTTPException
from urllib.request import urlopen, Request as PutRequest
except ImportError:
# Python 2.7
from httplib import HTTPException
from urllib2 import urlopen, Request


class PutRequest(Request):
def __init__(self, *args, **kwargs):
if 'method' in kwargs:
del kwargs['method']
Request.__init__(self, *args, **kwargs)

def get_method(self, *_args, **_kwargs): # pylint: disable=arguments-differ
return 'PUT'


_START_TIME_LINE = "# Execution Start: "


Expand Down Expand Up @@ -82,24 +61,7 @@ def get(self, filename, configurator):
self._ui.debug_output_info('ReBenchDB enabled: {e}\n', e=configurator.use_rebench_db)

if configurator.use_rebench_db:
if 'project_name' not in configurator.rebench_db:
raise ConfigurationError(
"No project_name defined in configuration file under reporting.rebenchdb.")

if not configurator.options.experiment_name:
raise ConfigurationError(
"The experiment was not named, which is mandatory. "
"This is needed to identify the data uniquely. "
"It should also help to remember in which context it "
"was recorded, perhaps relating to a specific CI job "
"or confirming some hypothesis."
"\n\n"
"Use the --experiment option to set the name.")

db = _ReBenchDB(configurator.rebench_db['db_url'],
configurator.rebench_db['project_name'],
configurator.options.experiment_name,
self, self._ui)
db = _ReBenchDB(configurator.get_rebench_db_connector(), self, self._ui)
p = _CompositePersistence(p, db)

self._files[filename] = p
Expand Down Expand Up @@ -216,7 +178,7 @@ def __init__(self, data_filename, data_store, discard_old_data, ui):
self._lock = Lock()
self._read_start_time()
if not self._start_time:
self._start_time = datetime.utcnow().isoformat() + "+00:00"
self._start_time = get_current_time()

def _discard_old_data(self):
self._truncate_file(self._data_filename)
Expand Down Expand Up @@ -372,17 +334,11 @@ def close(self):

class _ReBenchDB(_ConcretePersistence):

def __init__(self, server_url, project_name, experiment_name, data_store, ui):
def __init__(self, rebench_db, data_store, ui):
super(_ReBenchDB, self).__init__(data_store, ui)
# TODO: extract common code, possibly
if not server_url:
raise ValueError("ReBenchDB expected server address, but got: %s" % server_url)
self._rebench_db = rebench_db

self._ui.debug_output_info(
'ReBench will report all measurements to {url}\n', url=server_url)
self._server_url = server_url
self._project_name = project_name
self._experiment_name = experiment_name
self._lock = Lock()

self._cache_for_seconds = 30
Expand Down Expand Up @@ -440,54 +396,13 @@ def _send_data(self, cache):
self._ui.debug_output_info(
"ReBenchDB: Send {num_m} measures. startTime: {st}\n",
num_m=num_measurements, st=self._start_time)
return self._send_to_rebench_db({
return self._rebench_db.send_results({
'data': all_data,
'criteria': criteria_index,
'env': determine_environment(),
'startTime': self._start_time,
'projectName': self._project_name,
'experimentName': self._experiment_name,
'source': determine_source_details()}, num_measurements)

def close(self):
with self._lock:
self._send_data_and_empty_cache()

def _send_payload(self, payload):
req = PutRequest(self._server_url, payload,
{'Content-Type': 'application/json'}, method='PUT')
socket = urlopen(req)
response = socket.read()
socket.close()
return response

def _send_to_rebench_db(self, results, num_measurements):
payload = json.dumps(results, separators=(',', ':'), ensure_ascii=True)

# self._ui.output("Saving JSON Payload of size: %d\n" % len(payload))
with open("payload.json", "w") as text_file:
text_file.write(payload)

try:
data = payload.encode('utf-8')
response = self._send_payload(data)
self._ui.verbose_output_info(
"ReBenchDB: Sent {num_m} results to ReBenchDB, response was: {resp}\n",
num_m=num_measurements, resp=response)
return True
except TypeError as te:
self._ui.error("{ind}Error: Reporting to ReBenchDB failed.\n"
+ "{ind}{ind}" + str(te) + "\n")
except (IOError, HTTPException):
# sometimes Codespeed fails to accept a request because something
# is not yet properly initialized, let's try again for those cases
try:
response = self._send_payload(payload)
self._ui.verbose_output_info(
"ReBenchDB: Sent {num_m} results to ReBenchDB, response was: {resp}\n",
num_m=num_measurements, resp=response)
return True
except (IOError, HTTPException) as error:
self._ui.error("{ind}Error: Reporting to ReBenchDB failed.\n"
+ "{ind}{ind}" + str(error) + "\n")
return False
12 changes: 12 additions & 0 deletions rebench/rebench.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from .denoise import minimize_noise, restore_noise
from .environment import init_environment
from .persistence import DataStore
from .rebenchdb import get_current_time
from .reporter import CliReporter
from .configurator import Configurator, load_config
from .configuration_error import ConfigurationError
Expand Down Expand Up @@ -199,6 +200,9 @@ def shell_options(self):
'be recorded, i.e., to which the commit'
' belongs. If not provided, ReBench will try to get'
' the name from git.')
rebench_db.add_argument('--report-completion', dest='report_completion',
default=None, action='store_true',
help='Report the completion of the name experiment to ReBenchDB.')

return parser

Expand All @@ -211,6 +215,11 @@ def determine_exp_name_and_filters(filters):
f.startswith("s:"))]
return exp_name, exp_filter

def _report_completion(self):
rebench_db = self._config.get_rebench_db_connector()
success, _ = rebench_db.send_completion(get_current_time())
return success

def run(self, argv=None):
if argv is None:
argv = sys.argv
Expand All @@ -231,6 +240,9 @@ def run(self, argv=None):
except ConfigurationError as exc:
raise UIError(exc.message + "\n", exc)

if args.report_completion:
return self._report_completion()

runs = self._config.get_runs()

denoise_result = None
Expand Down
110 changes: 110 additions & 0 deletions rebench/rebenchdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import json
from datetime import datetime

from .ui import UIError

try:
from http.client import HTTPException
from urllib.request import urlopen, Request as PutRequest
except ImportError:
# Python 2.7
from httplib import HTTPException
from urllib2 import urlopen, Request


class PutRequest(Request):
def __init__(self, *args, **kwargs):
if 'method' in kwargs:
del kwargs['method']
Request.__init__(self, *args, **kwargs)

def get_method(self, *_args, **_kwargs): # pylint: disable=arguments-differ
return 'PUT'


def get_current_time():
"""Return the current time as string for use with ReBenchDB and other persistency backends."""
return datetime.utcnow().isoformat() + "+00:00"


class ReBenchDB(object):

def __init__(self, server_base_url, project_name, experiment_name, ui):
self._ui = ui

if not server_base_url:
raise UIError("ReBenchDB expected server address, but got: %s" % server_base_url, None)

# A user warning that old style configuration is detected
if server_base_url.endswith('/results'):
raise UIError(
"The URL to ReBenchDB should exclude '/results' but was '%s'" % server_base_url,
None)

ui.debug_output_info(
'ReBench will report all measurements to {url}\n', url=server_base_url)

self._server_base_url = server_base_url
self._project_name = project_name
self._experiment_name = experiment_name

def send_results(self, benchmark_data, num_measurements):
success, response = self._send_to_rebench_db(benchmark_data, '/results')

if success:
self._ui.verbose_output_info(
"ReBenchDB: Sent {num_m} results to ReBenchDB, response was: {resp}\n",
num_m=num_measurements, resp=response)

return success, response

def send_completion(self, end_time):
success, response = self._send_to_rebench_db({'endTime': end_time}, '/completion')

if success:
self._ui.verbose_output_info(
"ReBenchDB was notified of completion of {project} {exp} at {time}\n" +
"{ind} Its response was: {resp}\n",
project=self._project_name, exp=self._experiment_name, time=end_time, resp=response)
else:
self._ui.error("Reporting completion to ReBenchDB failed.\n" +
"{ind}Error: {response}", response=response)

return success, response

@staticmethod
def _send_payload(payload, url):
req = PutRequest(url, payload,
{'Content-Type': 'application/json'}, method='PUT')
socket = urlopen(req)
response = socket.read()
socket.close()
return response

def _send_to_rebench_db(self, payload_data, operation):
payload_data['projectName'] = self._project_name
payload_data['experimentName'] = self._experiment_name
url = self._server_base_url + operation

payload = json.dumps(payload_data, separators=(',', ':'), ensure_ascii=True)

# self._ui.output("Saving JSON Payload of size: %d\n" % len(payload))
with open("payload.json", "w") as text_file:
text_file.write(payload)

try:
data = payload.encode('utf-8')
response = self._send_payload(data, url)
return True, response
except TypeError as te:
self._ui.error("{ind}Error: Reporting to ReBenchDB failed.\n"
+ "{ind}{ind}" + str(te) + "\n")
except (IOError, HTTPException):
# network or server may have issues, let's try one more time
try:
response = self._send_payload(payload, url)
return True, response
except (IOError, HTTPException) as error:
self._ui.error("{ind}Error: Reporting to ReBenchDB failed.\n"
+ "{ind}{ind}" + str(error) + "\n")
return False, None

0 comments on commit 8de87cf

Please sign in to comment.