Skip to content

Commit

Permalink
Reuse QUERY for NIGHTLY telemetry.
Browse files Browse the repository at this point in the history
Fixes #141.
  • Loading branch information
Rémy HUBSCHER committed Nov 21, 2017
1 parent 08ad9f2 commit 606dbe0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 171 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
CHANGELOG
=========

0.6.0 (unreleased)
1.0.0 (unreleased)
------------------

- Nothing changed yet.
- Reuse the same Nightly query for Telemetry Update Parquet. (#141)


0.5.0 (2017-11-06)
Expand Down
138 changes: 58 additions & 80 deletions pollbot/tasks/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,54 @@


# Base URL of the Telemetry SQL server; API and query page URLs are built from it.
TELEMETRY_SERVER = "https://sql.telemetry.mozilla.org"
# Maps a nightly version string to the ID of the saved query that lists the
# build IDs for that version.
NIGHTLY_BUILD_IDS = {
"57.0a1": 40223, # https://sql.telemetry.mozilla.org/queries/40223/source
"58.0a1": 40790, # https://sql.telemetry.mozilla.org/queries/40790/source
}
# API key read once from the environment at import time (None when unset).
TELEMETRY_API_KEY = os.getenv("TELEMETRY_API_KEY")


def get_telemetry_auth_header():
    """Return a new headers dict carrying the Telemetry API key.

    A fresh dict is built on every call, so callers may mutate the result
    without affecting later calls.
    """
    # The dict literal is already a new object; an extra .copy() is redundant.
    return {"Authorization": "Key {}".format(TELEMETRY_API_KEY)}


async def put_query(session, query_title, version_name, query, *, query_id=None, run=True):
    """Create or update a saved Telemetry query and optionally execute it.

    When ``query_id`` is truthy the payload is posted to that query's URL;
    otherwise it is posted to the collection URL to create a new query.

    Args:
        session: HTTP client session whose ``post`` returns an async context
            manager yielding a response with ``status`` and ``json()``.
        query_title: Name stored for the query.
        version_name: Human-readable version label, used in error messages.
        query: SQL text of the query.
        query_id: ID of an existing query to update, or ``None`` to create one.
        run: When true, also request a fresh execution of the query.

    Returns:
        The ``id`` reported by the server for the created or updated query.

    Raises:
        TaskError: If the server answers either request with a non-200 status.
    """
    if query_id:
        url = "{}/api/queries/{}".format(TELEMETRY_SERVER, query_id)
    else:
        url = "{}/api/queries".format(TELEMETRY_SERVER)

    payload = {
        "name": query_title,
        "schedule": 3600,
        "schedule_until": (date.today() + timedelta(days=7)).strftime(
            '%Y-%m-%dT%H:%M:%S'),
        "is_draft": True,
        "query": query,
        "data_source_id": 1,
        "options": {"parameters": []}
    }
    async with session.post(url, json=payload) as resp:
        if resp.status != 200:
            # Report what was actually attempted: an update of an existing
            # query must not be described as a failed creation.
            if query_id:
                message = "Unable to update the query n°{} for {} (HTTP {})".format(
                    query_id, version_name, resp.status)
            else:
                message = "Unable to create the new query for {} (HTTP {})".format(
                    version_name, resp.status)
            raise TaskError(message, url=url)
        body = await resp.json()
        # Trust the ID returned by the server, for both creation and update.
        query_id = body["id"]

    if run:
        # Ask for a fresh execution; max_age=0 is sent so a new result is requested.
        url = "{}/api/query_results".format(TELEMETRY_SERVER)
        payload = {
            "data_source_id": 1,
            "query": query,
            "max_age": 0,
            "query_id": query_id
        }
        async with session.post(url, json=payload) as resp:
            if resp.status != 200:
                message = "Unable to execute the query n°{} for {} (HTTP {})"
                raise TaskError(message.format(query_id, version_name, resp.status),
                                url=url)
    return query_id


async def get_query_info_from_title(session, query_title):
query_params = urlencode({"include_drafts": "true", "q": query_title})
query_url = "{}/api/queries/search?{}".format(TELEMETRY_SERVER, query_params)
Expand All @@ -30,41 +67,6 @@ async def get_query_info_from_title(session, query_title):
return body[0] if len(body) > 0 else None


async def get_last_build_ids_for_nightly_version(session, version):
    """Return the build IDs that share the date prefix of the first result row.

    The saved query configured for ``version`` in ``NIGHTLY_BUILD_IDS`` is
    looked up, its latest result set is fetched, and every row whose
    ``build_id`` starts with the first row's eight-character prefix is kept.
    NOTE(review): presumably rows are ordered newest first - confirm.

    Raises:
        TaskError: If no query is configured for ``version``, if either HTTP
            request does not return 200, or if the query info or its rows
            are empty.
    """
    is_configured = version in NIGHTLY_BUILD_IDS
    if not is_configured:
        raise TaskError("Please configure Build IDs query for {}".format(version))
    query_id = NIGHTLY_BUILD_IDS[version]

    # Step 1: fetch the saved query to learn where its latest results live.
    query_url = "{}/api/queries/{}".format(TELEMETRY_SERVER, query_id)
    async with session.get(query_url) as query_resp:
        if query_resp.status != 200:
            raise TaskError(
                "Query {} unavailable (HTTP {})".format(query_id, query_resp.status),
                url=query_url)
        query_info = await query_resp.json()

    if not query_info:
        raise TaskError("Couldn't find any build matching.", url=query_url)

    # Step 2: fetch the latest result set of that query.
    result_id = query_info["latest_query_data_id"]
    results_url = "{}/api/query_results/{}".format(TELEMETRY_SERVER, result_id)
    async with session.get(results_url) as results_resp:
        if results_resp.status != 200:
            raise TaskError(
                "Query Result {} unavailable (HTTP {})".format(result_id, results_resp.status),
                url=results_url)
        results = await results_resp.json()

    rows = results["query_result"]["data"]["rows"]
    if not rows:
        raise TaskError("Couldn't find any build matching.", url=results_url)

    # The first eight characters of a build ID are used as its date prefix.
    day_prefix = rows[0]["build_id"][:8]
    return [row["build_id"] for row in rows
            if row["build_id"].startswith(day_prefix)]


async def update_parquet_uptake(product, version):
channel = get_version_channel(version)
if build_version_id(version) < build_version_id('57.0a1'):
Expand All @@ -73,16 +75,17 @@ async def update_parquet_uptake(product, version):
"Telemetry update-parquet metrics landed in Firefox Quantum")

with get_session(headers=get_telemetry_auth_header()) as session:
# Get the build IDs for this channel
build_ids = await get_build_ids_for_version(product, version)
version_name = "{} ({})".format(version, ", ".join(build_ids))

if channel is Channel.NIGHTLY:
# Get the build IDs of the latest days of nightly
build_ids = await get_last_build_ids_for_nightly_version(session, version)
query_title = "Uptake {} {}"
query_title = query_title.format(product.title(), channel.value)
build_ids = build_ids[:1]
else:
# Get the build IDs for this channel
build_ids = await get_build_ids_for_version(product, version)

version_name = "{} ({})".format(version, ", ".join(build_ids))
query_title = "Uptake {} {} {}"
query_title = query_title.format(product.title(), channel.value, version_name)
query_title = "Uptake {} {} {}"
query_title = query_title.format(product.title(), channel.value, version_name)

query = """
WITH updated_t AS (
Expand All @@ -92,19 +95,24 @@ async def update_parquet_uptake(product, version):
AND environment.build.build_id IN ({build_ids})
),
total_t AS (
SELECT COUNT(*) AS total, payload.target_version
SELECT COUNT(*) AS total, payload.target_version AS version
FROM telemetry_update_parquet
WHERE payload.reason = 'ready'
AND payload.target_build_id IN ({build_ids})
GROUP BY 2
)
SELECT updated * 1.0 / total as ratio, updated, total
SELECT updated * 1.0 / total as ratio, updated, total, version
FROM updated_t, total_t
""".format(build_ids=', '.join(["'{}'".format(bid) for bid in build_ids]))

query_info = await get_query_info_from_title(session, query_title)

if query_info:
if channel is Channel.NIGHTLY:
# Update the NIGHTLY query with the last build_ids
await put_query(session, query_title, version_name, query,
query_id=query_info['id'], run=False)

# In that case the query already exists
latest_query_data_id = query_info["latest_query_data_id"]

Expand Down Expand Up @@ -145,37 +153,7 @@ async def update_parquet_uptake(product, version):

return build_task_response(status, url, message)

# In that case we couldn't find the query, so we need to create it.
url = "{}/api/queries".format(TELEMETRY_SERVER)
payload = {
"name": query_title,
"schedule": 3600,
"schedule_until": (date.today() + timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S'),
"is_draft": True,
"query": query,
"data_source_id": 1,
"options": {"parameters": []}
}
async with session.post(url, json=payload) as resp:
if resp.status != 200:
message = "Unable to create the new query for {} (HTTP {})"
raise TaskError(message.format(version_name, resp.status), url=url)
body = await resp.json()
query_id = body["id"]

# Query for results
url = "{}/api/query_results".format(TELEMETRY_SERVER)
payload = {
"data_source_id": 1,
"query": query,
"max_age": 0,
"query_id": query_id
}
async with session.post(url, json=payload) as resp:
if resp.status != 200:
message = "Unable to execute the query n°{} for {} (HTTP {})"
raise TaskError(message.format(query_id, version_name, resp.status), url=url)

query_id = await put_query(session, query_title, version_name, query)
url = "{}/queries/{}".format(TELEMETRY_SERVER, query_id)
message = 'Telemetry uptake calculation for version {} is in progress'.format(version_name)
return build_task_response(Status.INCOMPLETE, url, message)
Expand Down
108 changes: 20 additions & 88 deletions tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,64 +922,32 @@ def _telemetry_mock_release_query(self, body=None):
self.mocked.get(url, status=200, body=json.dumps(body))

def _telemetry_mock_nightly_query(self, body=None):
record = {
"latest_query_data_id": 5678,
"id": 40197,
"name": "Uptake Firefox NIGHTLY"
}
if body is None:
body = [{
"latest_query_data_id": 5678,
"id": 40197,
"name": "Uptake Firefox NIGHTLY 57.0a1 20170920"
}]
body = [record]

url = ("{}/api/queries/search?q=Uptake+Firefox+NIGHTLY+57.0a1+"
"%2820170920220431%2C+20170920111019%2C+20170920100426%29&include_drafts=true")
url = ("{}/api/queries/search?q=Uptake+Firefox+NIGHTLY&include_drafts=true")
url = url.format(telemetry.TELEMETRY_SERVER)
self.mocked.get(url, status=200, body=json.dumps(body))

url = "{}/api/queries/40197".format(telemetry.TELEMETRY_SERVER)
self.mocked.post(url, status=200, body=json.dumps(record))

def _telemetry_mock_query_result(self, body):
    """Register a mocked 200 GET response for query result 5678 serving ``body`` as JSON."""
    result_url = '{}/api/query_results/5678'.format(telemetry.TELEMETRY_SERVER)
    serialized = json.dumps(body)
    self.mocked.get(result_url, status=200, body=serialized)

def _telemetry_mock_nightly_build_ids(self, body=None):
    """Mock the 57.0a1 build-IDs query and its result set.

    The query endpoint is mocked to point at result 1234; the result endpoint
    serves ``body``, or five default build IDs when ``body`` is None.
    """
    server = telemetry.TELEMETRY_SERVER

    query_url = '{}/api/queries/{}'.format(server, telemetry.NIGHTLY_BUILD_IDS["57.0a1"])
    query_info = {"latest_query_data_id": 1234}
    self.mocked.get(query_url, status=200, body=json.dumps(query_info))

    if body is None:
        default_build_ids = (
            "20170920220431",
            "20170920111019",
            "20170920100426",
            "20170919220202",
            "20170919110626",
        )
        rows = [{"build_id": build_id} for build_id in default_build_ids]
        body = {"query_result": {"data": {"rows": rows}}}

    results_url = '{}/api/query_results/1234'.format(server)
    self.mocked.get(results_url, status=200, body=json.dumps(body))

async def test_telemetry_update_uptake_tasks_returns_error_for_previous_nightly(self):
    """Version 56.0a1 is reported as MISSING with the Firefox Quantum message."""
    expected_message = "Telemetry update-parquet metrics landed in Firefox Quantum"
    result = await telemetry.update_parquet_uptake('firefox', '56.0a1')
    assert result["status"] == Status.MISSING.value
    assert result["message"] == expected_message

async def test_telemetry_update_uptake_tasks_returns_error_for_unsupported_nightly(self):
    """A nightly version without a configured build-IDs query raises TaskError."""
    expected_error = 'Please configure Build IDs query for 57.0a2'
    with pytest.raises(TaskError) as caught:
        await telemetry.update_parquet_uptake('firefox', '57.0a2')
    assert str(caught.value) == expected_error

async def test_telemetry_update_uptake_tasks_returns_error_for_unavailable_query(self):
    """A 502 from the build-IDs query endpoint surfaces as a TaskError."""
    query_id = telemetry.NIGHTLY_BUILD_IDS["57.0a1"]
    query_url = '{}/api/queries/{}'.format(telemetry.TELEMETRY_SERVER, query_id)
    self.mocked.get(query_url, status=502)

    with pytest.raises(TaskError) as caught:
        await telemetry.update_parquet_uptake('firefox', '57.0a1')
    assert str(caught.value) == 'Query 40223 unavailable (HTTP 502)'

async def test_telemetry_update_uptake_tasks_returns_incomplete_for_no_result(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query([{
"latest_query_data_id": None,
"id": 40197,
Expand All @@ -991,7 +959,7 @@ async def test_telemetry_update_uptake_tasks_returns_incomplete_for_no_result(se
assert received["message"] == ("Query still processing.")

async def test_telemetry_update_uptake_tasks_returns_error_for_empty_results(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query()
self._telemetry_mock_query_result({
"query_result": {"data": {"rows": []}}
Expand All @@ -1001,21 +969,8 @@ async def test_telemetry_update_uptake_tasks_returns_error_for_empty_results(sel
assert received["status"] == Status.ERROR.value
assert received["message"] == ("No result found for your query.")

async def test_telemetry_update_uptake_tasks_returns_error_for_unavailable_query_results(self):
    """A 502 from the query-results endpoint surfaces as a TaskError."""
    server = telemetry.TELEMETRY_SERVER

    # The query lookup succeeds and points at result 1234...
    query_url = '{}/api/queries/{}'.format(server, telemetry.NIGHTLY_BUILD_IDS["57.0a1"])
    query_info = {"latest_query_data_id": 1234}
    self.mocked.get(query_url, status=200, body=json.dumps(query_info))

    # ...but fetching that result fails.
    results_url = '{}/api/query_results/1234'.format(server)
    self.mocked.get(results_url, status=502)

    with pytest.raises(TaskError) as caught:
        await telemetry.update_parquet_uptake('firefox', '57.0a1')
    assert str(caught.value) == 'Query Result 1234 unavailable (HTTP 502)'

async def test_telemetry_update_uptake_tasks_returns_incomplete_for_low_nightly_uptake(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query()
self._telemetry_mock_query_result({
"query_result": {"data": {"rows": [
Expand All @@ -1026,11 +981,10 @@ async def test_telemetry_update_uptake_tasks_returns_incomplete_for_low_nightly_
received = await telemetry.update_parquet_uptake('firefox', '57.0a1')
assert received["status"] == Status.INCOMPLETE.value
assert received["message"] == ("Telemetry uptake for version 57.0a1 "
"(20170920220431, 20170920111019, 20170920100426) "
"is 45.32%")
"(20171009192146) is 45.32%")

async def test_telemetry_update_uptake_tasks_should_ignore_copied_queries(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query([{
"latest_query_data_id": 123456789,
"id": 40198,
Expand All @@ -1049,11 +1003,10 @@ async def test_telemetry_update_uptake_tasks_should_ignore_copied_queries(self):
received = await telemetry.update_parquet_uptake('firefox', '57.0a1')
assert received["status"] == Status.EXISTS.value
assert received["message"] == ("Telemetry uptake for version 57.0a1 "
"(20170920220431, 20170920111019, 20170920100426) "
"is 65.43%")
"(20171009192146) is 65.43%")

async def test_telemetry_update_uptake_tasks_returns_exists_for_high_nightly_uptake(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query()
url = '{}/api/query_results/5678'.format(telemetry.TELEMETRY_SERVER)
self.mocked.get(url, status=200, body=json.dumps({
Expand All @@ -1065,11 +1018,10 @@ async def test_telemetry_update_uptake_tasks_returns_exists_for_high_nightly_upt
received = await telemetry.update_parquet_uptake('firefox', '57.0a1')
assert received["status"] == Status.EXISTS.value
assert received["message"] == ("Telemetry uptake for version 57.0a1 "
"(20170920220431, 20170920111019, 20170920100426) "
"is 65.43%")
"(20171009192146) is 65.43%")

async def test_telemetry_update_uptake_tasks_returns_missing_for_no_search_query(self):
self._telemetry_mock_nightly_build_ids()
self._mock_buildhub_search()
self._telemetry_mock_nightly_query()
url = '{}/api/query_results/5678'.format(telemetry.TELEMETRY_SERVER)
self.mocked.get(url, status=404)
Expand Down Expand Up @@ -1108,7 +1060,6 @@ async def test_telemetry_update_uptake_tasks_returns_incomplete_for_high_release

async def test_telemetry_update_uptake_creates_the_query_if_not_found_for_nightly(self):
self._mock_buildhub_search()
self._telemetry_mock_nightly_build_ids()
self._telemetry_mock_nightly_query([])

url = '{}/api/queries'.format(telemetry.TELEMETRY_SERVER)
Expand All @@ -1124,28 +1075,9 @@ async def test_telemetry_update_uptake_creates_the_query_if_not_found_for_nightl
received = await telemetry.update_parquet_uptake('firefox', '57.0a1')
assert received["status"] == Status.INCOMPLETE.value
assert received["message"] == (
"Telemetry uptake calculation for version 57.0a1 "
"(20170920220431, 20170920111019, 20170920100426) is in progress"
"Telemetry uptake calculation for version 57.0a1 (20171009192146) is in progress"
)

async def test_telemetry_update_uptake_creates_the_query_if_null_body(self):
    """An empty JSON object from the build-IDs query endpoint raises TaskError."""
    query_id = telemetry.NIGHTLY_BUILD_IDS["57.0a1"]
    query_url = '{}/api/queries/{}'.format(telemetry.TELEMETRY_SERVER, query_id)
    empty_body = json.dumps({})
    self.mocked.get(query_url, status=200, body=empty_body)

    with pytest.raises(TaskError) as caught:
        await telemetry.update_parquet_uptake('firefox', '57.0a1')
    assert str(caught.value) == "Couldn't find any build matching."

async def test_telemetry_update_uptake_creates_the_query_if_no_results(self):
    """A build-IDs result set with no rows raises TaskError."""
    no_rows = {"query_result": {"data": {"rows": []}}}
    self._telemetry_mock_nightly_build_ids(no_rows)

    with pytest.raises(TaskError) as caught:
        await telemetry.update_parquet_uptake('firefox', '57.0a1')
    assert str(caught.value) == "Couldn't find any build matching."

async def test_telemetry_update_uptake_creates_the_query_if_not_found_for_release(self):
self._mock_buildhub_search()
self._telemetry_mock_release_query([])
Expand Down

0 comments on commit 606dbe0

Please sign in to comment.