-
Notifications
You must be signed in to change notification settings - Fork 149
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bug 1246675: fully functional balrog agent (#103). r=rail,aki
- Loading branch information
Showing
27 changed files
with
707 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
[run] | ||
branch = True | ||
include = balrogagent/* | ||
|
||
[report] | ||
show_missing = True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
FROM python:3.5-slim | ||
|
||
MAINTAINER bhearsum@mozilla.com | ||
|
||
WORKDIR /app | ||
|
||
COPY requirements.txt /app/ | ||
RUN pip install -r requirements.txt | ||
|
||
COPY balrogagent/ /app/balrogagent/ | ||
COPY scripts/ /app/scripts/ | ||
COPY run.sh MANIFEST.in setup.py version.json /app/ | ||
# test-only stuff | ||
COPY .coveragerc requirements-test.txt run-tests.sh tox.ini version.txt /app/ | ||
|
||
ENTRYPOINT ["/app/run.sh"] | ||
CMD ["agent"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
include requirements.txt | ||
include version.txt | ||
|
||
recursive-exclude * *.py[co] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import logging | ||
|
||
|
||
async def get_telemetry_uptake(*args): | ||
# TODO: implement this when https://bugzilla.mozilla.org/show_bug.cgi?id=1240522 is fixed | ||
return -1 | ||
|
||
|
||
def telemetry_is_ready(change, current_uptake): | ||
logging.debug("Comparing uptake for change %s (current: %s, required: %s)", change["sc_id"], current_uptake, change["telemetry_uptake"]) | ||
if current_uptake >= change["telemetry_uptake"]: | ||
return True | ||
else: | ||
return False | ||
|
||
|
||
def time_is_ready(change, now): | ||
# "when" is to-the-millisecond timestamp that gets stored as an int. | ||
# It needs to be converted back to a float before it can be compared | ||
# against other timestamps. | ||
scheduled_time = change["when"] / 1000 | ||
logging.debug("Comparing time for change %s (now: %s, scheduled time: %s)", change["sc_id"], now, scheduled_time) | ||
if now >= scheduled_time: | ||
return True | ||
else: | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import aiohttp | ||
import json | ||
import logging | ||
|
||
|
||
default_headers = { | ||
"Accept-Encoding": "application/json", | ||
"Accept": "application/json", | ||
"Content-Type": "application/json", | ||
} | ||
|
||
|
||
def get_url(api_root, path): | ||
return api_root.rstrip("/") + path | ||
|
||
|
||
async def request(api_root, path, method="GET", data={}, headers=default_headers, auth=None, loop=None): | ||
headers = headers.copy() | ||
url = get_url(api_root, path) | ||
csrf_url = get_url(api_root, "/csrf_token") | ||
data = data.copy() | ||
|
||
# CSRF tokens are only required for POST/PUT/DELETE. | ||
if method not in ("HEAD", "GET"): | ||
logging.debug("Sending %s request to %s", "HEAD", csrf_url) | ||
resp = await aiohttp.request("HEAD", csrf_url, auth=auth, loop=loop) | ||
resp.raise_for_status() | ||
data["csrf_token"] = resp.headers["X-CSRF-Token"] | ||
resp.close() | ||
|
||
logging.debug("Sending %s request to %s", method, url) | ||
resp = await aiohttp.request(method, url, data=json.dumps(data), headers=headers, auth=auth, loop=loop) | ||
# Raises on 400 code or higher, we can assume things are good if we make it past this. | ||
resp.raise_for_status() | ||
return resp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import aiohttp | ||
import asyncio | ||
import logging | ||
import time | ||
|
||
from . import client | ||
from .changes import get_telemetry_uptake, telemetry_is_ready, time_is_ready | ||
from .log import configure_logging | ||
|
||
|
||
async def run_agent(loop, balrog_api_root, balrog_username, balrog_password, telemetry_api_root, sleeptime=30, | ||
once=False, raise_exceptions=False): | ||
auth = aiohttp.BasicAuth(balrog_username, balrog_password) | ||
|
||
while True: | ||
try: | ||
logging.debug("Looking for active scheduled changes...") | ||
resp = await client.request(balrog_api_root, "/scheduled_changes/rules", auth=auth, loop=loop) | ||
sc = (await resp.json())["scheduled_changes"] | ||
resp.close() | ||
logging.debug("Found %s", len(sc)) | ||
for change in sc: | ||
logging.debug("Processing change %s", change["sc_id"]) | ||
ready = False | ||
|
||
# Figure out if the change is ready, which is type-specific. | ||
if change["telemetry_uptake"]: | ||
# TODO: maybe replace this with a simple client.request()... | ||
current_uptake = await get_telemetry_uptake(change["telemetry_product"], change["telemetry_channel"], loop=loop) | ||
ready = telemetry_is_ready(change, current_uptake) | ||
elif change["when"]: | ||
# "when" is to-the-millisecond timestamp that gets stored as an int. | ||
# It needs to be converted back to a float before it can be compared | ||
# against other timestamps. | ||
ready = time_is_ready(change, time.time()) | ||
else: | ||
logging.debug("Unknown change type!") | ||
|
||
# If it *is* ready, enact it! | ||
if ready: | ||
logging.debug("Change %s is ready, enacting", change["sc_id"]) | ||
endpoint = "/scheduled_changes/rules/{}/enact".format(change["sc_id"]) | ||
resp = await client.request(balrog_api_root, endpoint, method="POST", auth=auth, loop=loop) | ||
resp.close() | ||
else: | ||
logging.debug("Change %s is not ready", change["sc_id"]) | ||
|
||
except: | ||
logging.error("Encountered exception:", exc_info=True) | ||
if raise_exceptions: | ||
raise | ||
finally: | ||
if not once: | ||
await asyncio.sleep(sleeptime) | ||
|
||
if once: | ||
return | ||
|
||
|
||
def main(): | ||
import os | ||
|
||
logging_kwargs = { | ||
"level": os.environ.get("LOG_LEVEL", logging.INFO) | ||
} | ||
if os.environ.get("LOG_FORMAT") == "plain": | ||
logging_kwargs["formatter"] = logging.Formatter | ||
configure_logging(**logging_kwargs) | ||
|
||
loop = asyncio.get_event_loop() | ||
loop.run_until_complete( | ||
run_agent( | ||
loop, | ||
os.environ["BALROG_API_ROOT"], os.environ["BALROG_USERNAME"], os.environ["BALROG_PASSWORD"], | ||
os.environ["TELEMETRY_API_ROOT"] | ||
) | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
import json | ||
import logging | ||
import socket | ||
import sys | ||
import traceback | ||
|
||
log_format = "%(asctime)s - %(levelname)s - %(name)s.%(funcName)s#%(lineno)s: %(message)s" | ||
|
||
|
||
class JsonLogFormatter(logging.Formatter): | ||
"""Log formatter that outputs machine-readable json. | ||
This log formatter outputs JSON format messages that are compatible with | ||
Mozilla's standard heka-based log aggregation infrastructure. | ||
See also: | ||
https://mana.mozilla.org/wiki/display/CLOUDSERVICES/Logging+Standard | ||
https://mana.mozilla.org/wiki/pages/viewpage.action?pageId=42895640 | ||
Adapted from: | ||
https://github.com/mozilla-services/mozservices/blob/master/mozsvc/util.py#L106 | ||
""" | ||
LOGGING_FORMAT_VERSION = "2.0" | ||
|
||
# Map from Python logging to Syslog severity levels | ||
SYSLOG_LEVEL_MAP = { | ||
50: 2, # CRITICAL | ||
40: 3, # ERROR | ||
30: 4, # WARNING | ||
20: 6, # INFO | ||
10: 7, # DEBUG | ||
} | ||
|
||
# Syslog level to use when/if python level isn't found in map | ||
DEFAULT_SYSLOG_LEVEL = 7 | ||
|
||
EXCLUDED_LOGRECORD_ATTRS = set(( | ||
'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', | ||
'funcName', 'levelname', 'levelno', 'lineno', 'module', 'msecs', | ||
'message', 'msg', 'name', 'pathname', 'process', 'processName', | ||
'relativeCreated', 'stack_info', 'thread', 'threadName' | ||
)) | ||
|
||
def __init__(self, fmt=None, datefmt=None, logger_name='Balrog'): | ||
self.logger_name = logger_name | ||
self.hostname = socket.gethostname() | ||
logging.Formatter.__init__(self, fmt, datefmt) | ||
|
||
def format(self, record): | ||
""" | ||
Map from Python LogRecord attributes to JSON log format fields | ||
* from - https://docs.python.org/3/library/logging.html#logrecord-attributes | ||
* to - https://mana.mozilla.org/wiki/pages/viewpage.action?pageId=42895640 | ||
""" | ||
out = dict( | ||
Timestamp=int(record.created * 1e9), | ||
Type=record.name, | ||
Logger=self.logger_name, | ||
Hostname=self.hostname, | ||
EnvVersion=self.LOGGING_FORMAT_VERSION, | ||
Severity=self.SYSLOG_LEVEL_MAP.get(record.levelno, | ||
self.DEFAULT_SYSLOG_LEVEL), | ||
Pid=record.process, | ||
) | ||
|
||
# Include any custom attributes set on the record. | ||
# These would usually be collected metrics data. | ||
fields = dict() | ||
for key, value in record.__dict__.items(): | ||
if key not in self.EXCLUDED_LOGRECORD_ATTRS: | ||
fields[key] = value | ||
|
||
# Only include the 'message' key if it has useful content | ||
# and is not already a JSON blob. | ||
message = record.getMessage() | ||
if message: | ||
if not message.startswith("{") and not message.endswith("}"): | ||
fields["message"] = message | ||
|
||
# If there is an error, format it for nice output. | ||
if record.exc_info is not None: | ||
fields["error"] = repr(record.exc_info[1]) | ||
fields["traceback"] = safer_format_traceback(*record.exc_info) | ||
|
||
out['Fields'] = fields | ||
|
||
return json.dumps(out) | ||
|
||
|
||
def safer_format_traceback(exc_typ, exc_val, exc_tb): | ||
"""Format an exception traceback into safer string. | ||
We don't want to let users write arbitrary data into our logfiles, | ||
which could happen if they e.g. managed to trigger a ValueError with | ||
a carefully-crafted payload. This function formats the traceback | ||
using "%r" for the actual exception data, which passes it through repr() | ||
so that any special chars are safely escaped. | ||
""" | ||
lines = ["Uncaught exception:\n"] | ||
lines.extend(traceback.format_tb(exc_tb)) | ||
lines.append("%r\n" % (exc_typ,)) | ||
lines.append("%r\n" % (exc_val,)) | ||
return "".join(lines) | ||
|
||
|
||
def configure_logging(stream=sys.stdout, formatter=JsonLogFormatter, format_=log_format, level=logging.DEBUG): | ||
handler = logging.StreamHandler(stream) | ||
formatter = formatter(fmt=format_) | ||
handler.setFormatter(formatter) | ||
logging.root.addHandler(handler) | ||
logging.root.setLevel(level) |
Empty file.
Oops, something went wrong.
4a6d3e3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TaskCluster-GitHub: https://tools.taskcluster.net/task-graph-inspector/#MVOo-xQ0SRq5yOYn2y1ong