
Refactor observations workflow based on pattern in temporalio/samples…
hellais committed Apr 25, 2024
1 parent 1035bd3 commit ee36957
Showing 6 changed files with 52 additions and 40 deletions.
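The refactor turns optimize_all_tables into a proper Temporal activity and has the workflow invoke it via workflow.execute_activity, following the structure used in the temporalio/samples repository: activity inputs travel in a dataclass, the activity is registered on the worker, and the workflow never calls the function directly. A minimal sketch of that pattern (all names below are illustrative, not taken from this repo):

from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow


@dataclass
class GreetParams:
    name: str


@activity.defn
def greet(params: GreetParams) -> str:
    # Side effects (I/O, DB calls) belong in activities, outside the
    # deterministic workflow sandbox.
    return f"Hello, {params.name}!"


@workflow.defn
class GreetWorkflow:
    @workflow.run
    async def run(self, params: GreetParams) -> str:
        # The workflow schedules the activity rather than calling it directly,
        # so Temporal can retry it and record it in the event history.
        return await workflow.execute_activity(
            greet,
            params,
            start_to_close_timeout=timedelta(minutes=1),
        )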
@@ -8,6 +8,8 @@
 
 from temporalio import workflow, activity
 
+from .common import optimize_all_tables
+
 with workflow.unsafe.imports_passed_through():
     import clickhouse_driver
 
@@ -29,7 +31,6 @@
     get_prev_range,
     make_db_rows,
     maybe_delete_prev_range,
-    optimize_all_tables,
 )
 
 log = logging.getLogger("oonidata.processing")
17 changes: 17 additions & 0 deletions oonipipeline/src/oonipipeline/temporal/activities/common.py
@@ -0,0 +1,17 @@
+from dataclasses import dataclass
+from oonipipeline.db.connections import ClickhouseConnection
+from oonipipeline.db.create_tables import create_queries
+
+from temporalio import activity
+
+
+@dataclass
+class ClickhouseParams:
+    clickhouse_url: str
+
+
+@activity.defn
+def optimize_all_tables(params: ClickhouseParams):
+    with ClickhouseConnection(params.clickhouse_url) as db:
+        for _, table_name in create_queries:
+            db.execute(f"OPTIMIZE TABLE {table_name}")
@@ -141,6 +141,7 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
             ),
         )
     )
+    log.info(f"prev_ranges: {prev_ranges}")
 
     t = PerfTimer()
     total_t = PerfTimer()
@@ -175,6 +176,7 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
     if len(prev_ranges) > 0:
         with ClickhouseConnection(params.clickhouse, row_buffer_size=10_000) as db:
             for table_name, pr in prev_ranges:
+                log.info(f"deleting previous range of {pr}")
                 maybe_delete_prev_range(db=db, prev_range=pr)
 
     return {"size": total_size, "measurement_count": total_msmt_count}
7 changes: 0 additions & 7 deletions oonipipeline/src/oonipipeline/temporal/common.py
@@ -21,7 +21,6 @@
     MeasurementListProgress,
 )
 from ..db.connections import ClickhouseConnection
-from ..db.create_tables import create_queries
 
 log = logging.getLogger("oonidata.processing")
 
@@ -165,12 +164,6 @@ def get_prev_range(
     return prev_range
 
 
-def optimize_all_tables(clickhouse):
-    with ClickhouseConnection(clickhouse) as db:
-        for _, table_name in create_queries:
-            db.execute(f"OPTIMIZE TABLE {table_name}")
-
-
 def get_obs_count_by_cc(
     db: ClickhouseConnection,
     test_name: List[str],
57 changes: 30 additions & 27 deletions oonipipeline/src/oonipipeline/temporal/workflows.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import List
+from typing import List, Optional
 
 import logging
 import multiprocessing
@@ -8,6 +8,7 @@
 from datetime import datetime, timedelta, timezone
 
 from temporalio import workflow
+from temporalio.common import SearchAttributeKey
 from temporalio.worker import Worker, SharedStateManager
 from temporalio.client import (
     Client as TemporalClient,
@@ -18,6 +19,11 @@
     ScheduleState,
 )
 
+from oonipipeline.temporal.activities.common import (
+    optimize_all_tables,
+    ClickhouseParams,
+)
+
 
 # Handle temporal sandbox violations related to calls to self.processName =
 # mp.current_process().name in logger, see:
@@ -36,7 +42,7 @@
     make_analysis_in_a_day,
     make_cc_batches,
 )
-from oonipipeline.temporal.common import get_obs_count_by_cc, optimize_all_tables
+from oonipipeline.temporal.common import get_obs_count_by_cc
 from oonipipeline.temporal.activities.observations import (
     MakeObservationsParams,
     make_observation_in_day,
@@ -67,6 +73,7 @@ def make_worker(client: TemporalClient, parallelism: int) -> Worker:
             make_observation_in_day,
             make_ground_truths_in_day,
             make_analysis_in_a_day,
+            optimize_all_tables,
         ],
         activity_executor=concurrent.futures.ProcessPoolExecutor(parallelism + 2),
         max_concurrent_activities=parallelism,
@@ -76,6 +83,14 @@
     )
 
 
+def get_workflow_start_time() -> datetime:
+    workflow_start_time = workflow.info().typed_search_attributes.get(
+        SearchAttributeKey.for_datetime("TemporalScheduledStartTime")
+    )
+    assert workflow_start_time is not None, "TemporalScheduledStartTime not set"
+    return workflow_start_time
+
+
 @dataclass
 class ObservationsWorkflowParams:
     probe_cc: List[str]
@@ -84,37 +99,27 @@ class ObservationsWorkflowParams:
     data_dir: str
     fast_fail: bool
     log_level: int = logging.INFO
+    bucket_date: Optional[str] = None
 
 
 @workflow.defn
 class ObservationsWorkflow:
     @workflow.run
     async def run(self, params: ObservationsWorkflowParams) -> dict:
-        # TODO(art): wrap this a coroutine call
-        optimize_all_tables(params.clickhouse)
-
-        workflow_id = workflow.info().workflow_id
-
-        # TODO(art): this is quite sketchy. Waiting on temporal slack question:
-        # https://temporalio.slack.com/archives/CTT84RS0P/p1714040382186429
-        run_ts = datetime.strptime(
-            "-".join(workflow_id.split("-")[-3:]),
-            "%Y-%m-%dT%H:%M:%SZ",
-        )
-        bucket_date = (run_ts - timedelta(days=1)).strftime("%Y-%m-%d")
-
-        # read_time = workflow_info.start_time - timedelta(days=1)
-        # log.info(f"workflow.info().start_time={workflow.info().start_time} ")
-        # log.info(f"workflow.info().cron_schedule={workflow.info().cron_schedule} ")
-        # log.info(f"workflow_info.workflow_id={workflow_info.workflow_id} ")
-        # log.info(f"workflow_info.run_id={workflow_info.run_id} ")
-        # log.info(f"workflow.now()={workflow.now()}")
-        # print(workflow)
-        # bucket_date = f"{read_time.year}-{read_time.month:02}-{read_time.day:02}"
+        if params.bucket_date is None:
+            params.bucket_date = (
+                get_workflow_start_time() - timedelta(days=1)
+            ).strftime("%Y-%m-%d")
+
+        await workflow.execute_activity(
+            optimize_all_tables,
+            ClickhouseParams(clickhouse_url=params.clickhouse),
+            start_to_close_timeout=timedelta(minutes=5),
+        )
 
         t = PerfTimer()
         log.info(
-            f"Starting observation making with probe_cc={params.probe_cc},test_name={params.test_name} bucket_date={bucket_date}"
+            f"Starting observation making with probe_cc={params.probe_cc},test_name={params.test_name} bucket_date={params.bucket_date}"
        )
 
        res = await workflow.execute_activity(
@@ -125,19 +130,17 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
                 clickhouse=params.clickhouse,
                 data_dir=params.data_dir,
                 fast_fail=params.fast_fail,
-                bucket_date=bucket_date,
+                bucket_date=params.bucket_date,
             ),
             start_to_close_timeout=timedelta(minutes=30),
         )
 
         total_size = res["size"]
         total_measurement_count = res["measurement_count"]
 
         # This needs to be adjusted once we get the per entry concurrency working
         mb_per_sec = round(total_size / t.s / 10**6, 1)
         msmt_per_sec = round(total_measurement_count / t.s)
         log.info(
-            f"finished processing {bucket_date} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
+            f"finished processing {params.bucket_date} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
         )
 
         # with ClickhouseConnection(params.clickhouse) as db:
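get_workflow_start_time relies on the TemporalScheduledStartTime search attribute, which the server only sets on runs started by a Temporal Schedule (hence the assert, and the new bucket_date parameter as an escape hatch for manually triggered runs). A sketch of how such a schedule might be created, assuming illustrative schedule/workflow ids, task queue name, and parameter values:

from datetime import timedelta

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    ScheduleSpec,
)

from oonipipeline.temporal.workflows import (
    ObservationsWorkflow,
    ObservationsWorkflowParams,
)


async def schedule_daily_observations(client: Client) -> None:
    # Each scheduled run gets TemporalScheduledStartTime set by the server,
    # which get_workflow_start_time() reads to derive the default bucket_date.
    await client.create_schedule(
        "oonipipeline-observations",  # illustrative schedule id
        Schedule(
            action=ScheduleActionStartWorkflow(
                ObservationsWorkflow.run,
                ObservationsWorkflowParams(
                    probe_cc=[],
                    test_name=[],
                    clickhouse="clickhouse://localhost",  # placeholder URL
                    data_dir="/tmp/oonidata",  # placeholder path
                    fast_fail=False,
                ),
                id="oonipipeline-observations-workflow",  # illustrative
                task_queue="oonipipeline",  # illustrative task queue
            ),
            spec=ScheduleSpec(
                intervals=[ScheduleIntervalSpec(every=timedelta(days=1))]
            ),
        ),
    )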
6 changes: 1 addition & 5 deletions oonipipeline/tests/test_cli.py
@@ -54,11 +54,7 @@ def test_full_workflow(
     bucket_dict = dict(res)
     assert "2022-10-20" in bucket_dict, bucket_dict
     assert bucket_dict["2022-10-20"] == 200, bucket_dict
-
-    res = db.execute(
-        "SELECT COUNT() FROM obs_web WHERE bucket_date = '2022-10-20' AND probe_cc = 'BA'"
-    )
-    obs_count = res[0][0]  # type: ignore
+    obs_count = bucket_dict["2022-10-20"]
 
     result = cli_runner.invoke(
         cli,
