Backfill distribution table (#60)
dianaclarke committed May 14, 2021
1 parent a2e4f7d commit 9e33f49
Showing 4 changed files with 332 additions and 3 deletions.
2 changes: 1 addition & 1 deletion conbench/entities/machine.py
@@ -54,7 +54,7 @@ def hash(cls):
cls.cpu_thread_count,
"-",
cls.memory_bytes,
-        )
+        ).label("hash")


s.Index(
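For context, hash is the machine-identity expression on the model; adding .label("hash") makes the concatenation select under a stable column name instead of an auto-generated one, which the backfill migration below depends on when it pops values["hash"]. A minimal sketch of the pattern, assuming a hybrid property (the model details here are illustrative, not copied from conbench):

from sqlalchemy import Column, Integer, String, func
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Machine(Base):
    __tablename__ = "machine"
    id = Column(String, primary_key=True)
    name = Column(String)
    cpu_core_count = Column(Integer)
    cpu_thread_count = Column(Integer)
    memory_bytes = Column(Integer)

    @hybrid_property
    def hash(self):
        # Python side: a plain string for a loaded instance.
        return (
            f"{self.name}-{self.cpu_core_count}-"
            f"{self.cpu_thread_count}-{self.memory_bytes}"
        )

    @hash.expression
    def hash(cls):
        # SQL side: the same concatenation, now selecting AS "hash".
        return func.concat(
            cls.name, "-", cls.cpu_core_count, "-",
            cls.cpu_thread_count, "-", cls.memory_bytes,
        ).label("hash")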
4 changes: 2 additions & 2 deletions conbench/tests/entities/test_distribution.py
@@ -54,7 +54,7 @@
(SELECT commit.id AS id, commit.sha AS sha, commit.timestamp AS timestamp
FROM commit
WHERE commit.repository = :repository_1 ORDER BY commit.timestamp DESC)
-SELECT text(:text_1) AS repository, text(:text_2) AS sha, summary.case_id, summary.context_id, concat(machine.name, :concat_2, machine.cpu_core_count, :concat_3, machine.cpu_thread_count, :concat_4, machine.memory_bytes) AS concat_1, max(summary.unit) AS unit, avg(summary.mean) AS mean_mean, stddev(summary.mean) AS mean_sd, avg(summary.min) AS min_mean, stddev(summary.min) AS min_sd, avg(summary.max) AS max_mean, stddev(summary.max) AS max_sd, avg(summary.median) AS median_mean, stddev(summary.median) AS median_sd, min(commits_up.timestamp) AS first_timestamp, max(commits_up.timestamp) AS last_timestamp, count(summary.mean) AS observations
+SELECT text(:text_1) AS repository, text(:text_2) AS sha, summary.case_id, summary.context_id, concat(machine.name, :concat_1, machine.cpu_core_count, :concat_2, machine.cpu_thread_count, :concat_3, machine.memory_bytes) AS hash, max(summary.unit) AS unit, avg(summary.mean) AS mean_mean, stddev(summary.mean) AS mean_sd, avg(summary.min) AS min_mean, stddev(summary.min) AS min_sd, avg(summary.max) AS max_mean, stddev(summary.max) AS max_sd, avg(summary.median) AS median_mean, stddev(summary.median) AS median_sd, min(commits_up.timestamp) AS first_timestamp, max(commits_up.timestamp) AS last_timestamp, count(summary.mean) AS observations
FROM summary JOIN run ON run.id = summary.run_id JOIN machine ON machine.id = summary.machine_id JOIN (SELECT commit_index.id AS id, commit_index.sha AS sha, commit_index.timestamp AS timestamp, commit_index.row_number AS row_number
FROM (SELECT ordered_commits.id AS id, ordered_commits.sha AS sha, ordered_commits.timestamp AS timestamp, row_number() OVER () AS row_number
FROM ordered_commits) AS commit_index
@@ -63,7 +63,7 @@
FROM ordered_commits) AS commit_index
WHERE commit_index.sha = :sha_1)
LIMIT :param_1) AS commits_up ON commits_up.id = run.commit_id
-WHERE run.name LIKE :name_1 AND summary.case_id = :case_id_1 AND summary.context_id = :context_id_1 AND concat(machine.name, :concat_5, machine.cpu_core_count, :concat_6, machine.cpu_thread_count, :concat_7, machine.memory_bytes) = :concat_8 GROUP BY summary.case_id, summary.context_id, summary.machine_id, machine.name, machine.cpu_core_count, machine.cpu_thread_count, machine.memory_bytes""" # noqa
+WHERE run.name LIKE :name_1 AND summary.case_id = :case_id_1 AND summary.context_id = :context_id_1 AND concat(machine.name, :concat_4, machine.cpu_core_count, :concat_5, machine.cpu_thread_count, :concat_6, machine.memory_bytes) = :param_2 GROUP BY summary.case_id, summary.context_id, summary.machine_id, machine.name, machine.cpu_core_count, machine.cpu_thread_count, machine.memory_bytes""" # noqa


def create_benchmark_summary(conbench, results, benchmark_name=None):
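The only change in this compiled query is fallout from the .label("hash") call above: the concat expression now renders AS hash rather than AS concat_1, and SQLAlchemy's anonymous names renumber around it (:concat_2 becomes :concat_1, :concat_8 becomes :param_2). A quick sketch of the behavior, with throwaway column names:

from sqlalchemy import column, func, select

expr = func.concat(column("name"), "-", column("core_count"))

# Unlabeled, the compiler invents both the bind-parameter names and the
# column name in the AS clause (concat_1, concat_2, or similar).
print(select(expr))

# Labeled, it compiles under the explicit name instead: "... AS hash".
print(select(expr.label("hash")))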
139 changes: 139 additions & 0 deletions conbench/tests/migrations/test_0d4e564b1876_backfill_distribution.py
@@ -0,0 +1,139 @@
import decimal
import os
import uuid

from alembic import command
from alembic.config import Config

from ...entities.distribution import Distribution
from ...entities.summary import Summary


this_dir = os.path.abspath(os.path.dirname(__file__))
config_path = os.path.join(this_dir, "../../../alembic.ini")


VALID_PAYLOAD = {
"context": {
"arrow_compiler_flags": "-fPIC -arch x86_64 -arch x86_64 -std=c++11 -Qunused-arguments -fcolor-diagnostics -O3 -DNDEBUG",
"arrow_compiler_id": "AppleClang",
"arrow_compiler_version": "11.0.0.11000033",
"arrow_version": "2.0.0",
"benchmark_language_version": "Python 3.8.5",
"benchmark_language": "Python",
},
"github": {
"commit": "02addad336ba19a654f9c857ede546331be7b631",
"repository": "https://github.com/apache/arrow",
},
"machine_info": {
"architecture_name": "x86_64",
"cpu_l1d_cache_bytes": "32768",
"cpu_l1i_cache_bytes": "32768",
"cpu_l2_cache_bytes": "262144",
"cpu_l3_cache_bytes": "4194304",
"cpu_core_count": "2",
"cpu_frequency_max_hz": "3500000000",
"cpu_model_name": "Intel(R) Core(TM) i7-7567U CPU @ 3.50GHz",
"cpu_thread_count": "4",
"kernel_name": "19.6.0",
"memory_bytes": "17179869184",
"name": "diana",
"os_name": "macOS",
"os_version": "10.15.7",
},
"stats": {
"batch_id": "7b2fdd9f929d47b9960152090d47f8e6",
"run_id": "2a5709d179f349cba69ed242be3e6321",
"run_name": "commit: 02addad336ba19a654f9c857ede546331be7b631",
"data": [
"0.099094",
"0.037129",
"0.036381",
"0.148896",
"0.008104",
"0.005496",
"0.009871",
"0.006008",
"0.007978",
"0.004733",
],
"times": [
"0.099094",
"0.037129",
"0.036381",
"0.148896",
"0.008104",
"0.005496",
"0.009871",
"0.006008",
"0.007978",
"0.004733",
],
"unit": "s",
"time_unit": "s",
"iqr": "0.030442",
"iterations": 10,
"max": "0.148896",
"mean": "0.036369",
"median": "0.008988",
"min": "0.004733",
"q1": "0.006500",
"q3": "0.036942",
"stdev": "0.049194",
"timestamp": "2020-11-25T21:02:42.706806+00:00",
},
"tags": {
"compression": "snappy",
"cpu_count": 2,
"dataset": "nyctaxi_sample",
"file_type": "parquet",
"input_type": "arrow",
"name": "file-write",
},
}


def test_upgrade():
VALID_PAYLOAD["tags"]["name"] = uuid.uuid4().hex
summary = Summary.create(VALID_PAYLOAD)

# assert before migration
distributions = Distribution.search(
filters=[Distribution.case_id == summary.case_id]
)
assert len(distributions) == 1
assert distributions[0].unit == "s"
assert distributions[0].observations == 1
assert distributions[0].mean_mean == decimal.Decimal("0.03636900000000000000")
assert distributions[0].mean_sd is None
assert distributions[0].min_mean == decimal.Decimal("0.00473300000000000000")
assert distributions[0].min_sd is None
assert distributions[0].max_mean == decimal.Decimal("0.14889600000000000000")
assert distributions[0].max_sd is None
assert distributions[0].median_mean == decimal.Decimal("0.00898800000000000000")
assert distributions[0].median_sd is None

Distribution.delete_all()
assert Distribution.count() == 0

# do migration
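    # stamp() records the schema as already being at the pre-backfill
    # revision without running anything; upgrade() then applies only the
    # 0d4e564b1876 backfill, so the test exercises this migration alone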
alembic_config = Config(config_path)
command.stamp(alembic_config, "0d44e2332557")
command.upgrade(alembic_config, "0d4e564b1876")

# assert after migration
distributions = Distribution.search(
filters=[Distribution.case_id == summary.case_id]
)
assert len(distributions) == 1
assert distributions[0].unit == "s"
assert distributions[0].observations == 1
assert distributions[0].mean_mean == decimal.Decimal("0.03636900000000000000")
assert distributions[0].mean_sd is None
assert distributions[0].min_mean == decimal.Decimal("0.00473300000000000000")
assert distributions[0].min_sd is None
assert distributions[0].max_mean == decimal.Decimal("0.14889600000000000000")
assert distributions[0].max_sd is None
assert distributions[0].median_mean == decimal.Decimal("0.00898800000000000000")
assert distributions[0].median_sd is None
190 changes: 190 additions & 0 deletions migrations/versions/0d4e564b1876_backfill_distribution.py
@@ -0,0 +1,190 @@
import uuid

from alembic import op
from sqlalchemy import func, MetaData
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql import select


revision = "0d4e564b1876"
down_revision = "0d44e2332557"
branch_labels = None
depends_on = None


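# Number every commit in the repository, newest first, so that a commit's
# row_number is its distance from the most recent commit.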
def get_commit_index(commit_table, repository):
ordered = (
select(commit_table.c.id, commit_table.c.sha, commit_table.c.timestamp)
.filter(commit_table.c.repository == repository)
.order_by(commit_table.c.timestamp.desc())
).cte("ordered_commits")
return select(ordered, func.row_number().over().label("row_number"))


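# Select the window of at most `limit` commits starting at `sha` and walking
# back through history (row_number >= the row_number of `sha`).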
def get_commits_up(commit_table, repository, sha, limit):
index = get_commit_index(commit_table, repository).subquery().alias("commit_index")
n = select(index.c.row_number).filter(index.c.sha == sha).scalar_subquery()
return index.select().filter(index.c.row_number >= n).limit(limit)


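# Reproduce the application's distribution query: aggregate mean/min/max/median
# statistics for one case/context/machine over the window of commits at and
# before `sha`.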
def get_distribution(
summary_table,
run_table,
machine_table,
commit_table,
repository,
sha,
case_id,
context_id,
machine_hash,
limit,
):
commits_up = (
get_commits_up(commit_table, repository, sha, limit)
.subquery()
.alias("commits_up")
)
return (
select(
func.text(repository).label("repository"),
func.text(sha).label("sha"),
summary_table.c.case_id,
summary_table.c.context_id,
func.concat(
machine_table.c.name,
"-",
machine_table.c.cpu_core_count,
"-",
machine_table.c.cpu_thread_count,
"-",
machine_table.c.memory_bytes,
).label("hash"),
func.max(summary_table.c.unit).label("unit"),
func.avg(summary_table.c.mean).label("mean_mean"),
func.stddev(summary_table.c.mean).label("mean_sd"),
func.avg(summary_table.c.min).label("min_mean"),
func.stddev(summary_table.c.min).label("min_sd"),
func.avg(summary_table.c.max).label("max_mean"),
func.stddev(summary_table.c.max).label("max_sd"),
func.avg(summary_table.c.median).label("median_mean"),
func.stddev(summary_table.c.median).label("median_sd"),
func.min(commits_up.c.timestamp).label("first_timestamp"),
func.max(commits_up.c.timestamp).label("last_timestamp"),
func.count(summary_table.c.mean).label("observations"),
)
.group_by(
summary_table.c.case_id,
summary_table.c.context_id,
summary_table.c.machine_id,
machine_table.c.name,
machine_table.c.cpu_core_count,
machine_table.c.cpu_thread_count,
machine_table.c.memory_bytes,
)
.join(run_table, run_table.c.id == summary_table.c.run_id)
.join(machine_table, machine_table.c.id == summary_table.c.machine_id)
.join(commits_up, commits_up.c.id == run_table.c.commit_id)
.filter(
run_table.c.name.like("commit: %"),
summary_table.c.case_id == case_id,
summary_table.c.context_id == context_id,
func.concat(
machine_table.c.name,
"-",
machine_table.c.cpu_core_count,
"-",
machine_table.c.cpu_thread_count,
"-",
machine_table.c.memory_bytes,
)
== machine_hash,
)
)


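# Walk every summary from a "commit: ..." run, compute its distribution over
# the previous 1000 commits, and upsert a row unless one already exists for
# that sha/case/context/machine combination.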
def upgrade():
connection = op.get_bind()
meta = MetaData()
meta.reflect(bind=connection)

commit_table = meta.tables["commit"]
distribution_table = meta.tables["distribution"]
machine_table = meta.tables["machine"]
run_table = meta.tables["run"]
summary_table = meta.tables["summary"]

runs = connection.execute(run_table.select())
commits = connection.execute(commit_table.select())
distributions = connection.execute(distribution_table.select())
machines = connection.execute(machine_table.select())
runs_by_id = {r["id"]: r for r in runs}
commits_by_id = {c["id"]: c for c in commits}
machines_by_id = {m["id"]: m for m in machines}

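    # distribution rows that already exist, keyed the same way as hash_ below,
    # so re-running the backfill is a no-op for them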
seen = {
f'{d["sha"]}{d["case_id"]}{d["context_id"]}{d["machine_id"]}'
for d in distributions
}

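    # only summaries from "commit: ..." runs feed the distribution table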
summaries = connection.execute(
summary_table.select()
.join(run_table, run_table.c.id == summary_table.c.run_id)
.filter(run_table.c.name.like("commit: %"))
)
for summary in summaries:
run = runs_by_id.get(summary["run_id"])
if not run:
continue

commit = commits_by_id.get(run["commit_id"])
if not commit:
continue

hash_ = f'{commit["sha"]}{summary["case_id"]}{summary["context_id"]}{summary["machine_id"]}'
if hash_ in seen:
continue

m = machines_by_id[summary["machine_id"]]
machine_hash = (
f"{m.name}-{m.cpu_core_count}-{m.cpu_thread_count}-{m.memory_bytes}"
)

distributions = list(
connection.execute(
get_distribution(
summary_table,
run_table,
machine_table,
commit_table,
commit["repository"],
commit["sha"],
summary["case_id"],
summary["context_id"],
machine_hash,
1000,
)
)
)

if not distributions:
continue

distribution = distributions[0]
values = dict(distribution)
machine_hash = values.pop("hash")
values["id"] = uuid.uuid4().hex
values["machine_hash"] = machine_hash

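        # upsert: if a matching row appeared after the `seen` snapshot was
        # taken, overwrite it with the freshly computed aggregates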
connection.execute(
insert(distribution_table)
.values(values)
.on_conflict_do_update(
index_elements=["sha", "case_id", "context_id", "machine_hash"],
set_=values,
)
)
connection.commit()


def downgrade():
pass
