diff --git a/oonipipeline/src/oonipipeline/temporal/activities/analysis.py b/oonipipeline/src/oonipipeline/temporal/activities/analysis.py
index 6b94cbe9..38ca12bf 100644
--- a/oonipipeline/src/oonipipeline/temporal/activities/analysis.py
+++ b/oonipipeline/src/oonipipeline/temporal/activities/analysis.py
@@ -8,6 +8,8 @@
 
 from temporalio import workflow, activity
 
+from .common import optimize_all_tables
+
 with workflow.unsafe.imports_passed_through():
     import clickhouse_driver
 
@@ -29,7 +31,6 @@
     get_prev_range,
     make_db_rows,
     maybe_delete_prev_range,
-    optimize_all_tables,
 )
 
 log = logging.getLogger("oonidata.processing")
diff --git a/oonipipeline/src/oonipipeline/temporal/activities/common.py b/oonipipeline/src/oonipipeline/temporal/activities/common.py
new file mode 100644
index 00000000..d5174c76
--- /dev/null
+++ b/oonipipeline/src/oonipipeline/temporal/activities/common.py
@@ -0,0 +1,17 @@
+from dataclasses import dataclass
+from oonipipeline.db.connections import ClickhouseConnection
+from oonipipeline.db.create_tables import create_queries
+
+from temporalio import activity
+
+
+@dataclass
+class ClickhouseParams:
+    clickhouse_url: str
+
+
+@activity.defn
+def optimize_all_tables(params: ClickhouseParams):
+    with ClickhouseConnection(params.clickhouse_url) as db:
+        for _, table_name in create_queries:
+            db.execute(f"OPTIMIZE TABLE {table_name}")
diff --git a/oonipipeline/src/oonipipeline/temporal/activities/observations.py b/oonipipeline/src/oonipipeline/temporal/activities/observations.py
index 8649ef96..ee541c17 100644
--- a/oonipipeline/src/oonipipeline/temporal/activities/observations.py
+++ b/oonipipeline/src/oonipipeline/temporal/activities/observations.py
@@ -141,6 +141,7 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
                 ),
             )
         )
+    log.info(f"prev_ranges: {prev_ranges}")
 
     t = PerfTimer()
     total_t = PerfTimer()
@@ -175,6 +176,7 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
     if len(prev_ranges) > 0:
         with ClickhouseConnection(params.clickhouse, row_buffer_size=10_000) as db:
             for table_name, pr in prev_ranges:
+                log.info(f"deleting previous range of {pr}")
                 maybe_delete_prev_range(db=db, prev_range=pr)
 
     return {"size": total_size, "measurement_count": total_msmt_count}
diff --git a/oonipipeline/src/oonipipeline/temporal/common.py b/oonipipeline/src/oonipipeline/temporal/common.py
index 22f0778a..379b56d3 100644
--- a/oonipipeline/src/oonipipeline/temporal/common.py
+++ b/oonipipeline/src/oonipipeline/temporal/common.py
@@ -21,7 +21,6 @@
     MeasurementListProgress,
 )
 from ..db.connections import ClickhouseConnection
-from ..db.create_tables import create_queries
 
 log = logging.getLogger("oonidata.processing")
 
@@ -165,12 +164,6 @@ def get_prev_range(
     return prev_range
 
 
-def optimize_all_tables(clickhouse):
-    with ClickhouseConnection(clickhouse) as db:
-        for _, table_name in create_queries:
-            db.execute(f"OPTIMIZE TABLE {table_name}")
-
-
 def get_obs_count_by_cc(
     db: ClickhouseConnection,
     test_name: List[str],
diff --git a/oonipipeline/src/oonipipeline/temporal/workflows.py b/oonipipeline/src/oonipipeline/temporal/workflows.py
index ffb70aa6..89061375 100644
--- a/oonipipeline/src/oonipipeline/temporal/workflows.py
+++ b/oonipipeline/src/oonipipeline/temporal/workflows.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import List
+from typing import List, Optional
 
 import logging
 import multiprocessing
@@ -8,6 +8,7 @@
 from datetime import datetime, timedelta, timezone
 
 from temporalio import workflow
+from temporalio.common import SearchAttributeKey
 from temporalio.worker import Worker, SharedStateManager
 from temporalio.client import (
     Client as TemporalClient,
@@ -18,6 +19,11 @@
     ScheduleState,
 )
 
+from oonipipeline.temporal.activities.common import (
+    optimize_all_tables,
+    ClickhouseParams,
+)
+
 
 # Handle temporal sandbox violations related to calls to self.processName =
 # mp.current_process().name in logger, see:
@@ -36,7 +42,7 @@
         make_analysis_in_a_day,
         make_cc_batches,
     )
-    from oonipipeline.temporal.common import get_obs_count_by_cc, optimize_all_tables
+    from oonipipeline.temporal.common import get_obs_count_by_cc
     from oonipipeline.temporal.activities.observations import (
         MakeObservationsParams,
         make_observation_in_day,
@@ -67,6 +73,7 @@ def make_worker(client: TemporalClient, parallelism: int) -> Worker:
             make_observation_in_day,
             make_ground_truths_in_day,
             make_analysis_in_a_day,
+            optimize_all_tables,
         ],
         activity_executor=concurrent.futures.ProcessPoolExecutor(parallelism + 2),
         max_concurrent_activities=parallelism,
@@ -76,6 +83,14 @@ def make_worker(client: TemporalClient, parallelism: int) -> Worker:
     )
 
 
+def get_workflow_start_time() -> datetime:
+    workflow_start_time = workflow.info().typed_search_attributes.get(
+        SearchAttributeKey.for_datetime("TemporalScheduledStartTime")
+    )
+    assert workflow_start_time is not None, "TemporalScheduledStartTime not set"
+    return workflow_start_time
+
+
 @dataclass
 class ObservationsWorkflowParams:
     probe_cc: List[str]
@@ -84,37 +99,27 @@ class ObservationsWorkflowParams:
     data_dir: str
     fast_fail: bool
     log_level: int = logging.INFO
+    bucket_date: Optional[str] = None
 
 
 @workflow.defn
 class ObservationsWorkflow:
     @workflow.run
     async def run(self, params: ObservationsWorkflowParams) -> dict:
-        # TODO(art): wrap this a coroutine call
-        optimize_all_tables(params.clickhouse)
-
-        workflow_id = workflow.info().workflow_id
-
-        # TODO(art): this is quite sketchy. Waiting on temporal slack question:
-        # https://temporalio.slack.com/archives/CTT84RS0P/p1714040382186429
-        run_ts = datetime.strptime(
-            "-".join(workflow_id.split("-")[-3:]),
-            "%Y-%m-%dT%H:%M:%SZ",
+        if params.bucket_date is None:
+            params.bucket_date = (
+                get_workflow_start_time() - timedelta(days=1)
+            ).strftime("%Y-%m-%d")
+
+        await workflow.execute_activity(
+            optimize_all_tables,
+            ClickhouseParams(clickhouse_url=params.clickhouse),
+            start_to_close_timeout=timedelta(minutes=5),
         )
-        bucket_date = (run_ts - timedelta(days=1)).strftime("%Y-%m-%d")
-
-        # read_time = workflow_info.start_time - timedelta(days=1)
-        # log.info(f"workflow.info().start_time={workflow.info().start_time} ")
-        # log.info(f"workflow.info().cron_schedule={workflow.info().cron_schedule} ")
-        # log.info(f"workflow_info.workflow_id={workflow_info.workflow_id} ")
-        # log.info(f"workflow_info.run_id={workflow_info.run_id} ")
-        # log.info(f"workflow.now()={workflow.now()}")
-        # print(workflow)
-        # bucket_date = f"{read_time.year}-{read_time.month:02}-{read_time.day:02}"
 
         t = PerfTimer()
         log.info(
-            f"Starting observation making with probe_cc={params.probe_cc},test_name={params.test_name} bucket_date={bucket_date}"
+            f"Starting observation making with probe_cc={params.probe_cc}, test_name={params.test_name} bucket_date={params.bucket_date}"
         )
 
         res = await workflow.execute_activity(
@@ -125,19 +130,17 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
                 clickhouse=params.clickhouse,
                 data_dir=params.data_dir,
                 fast_fail=params.fast_fail,
-                bucket_date=bucket_date,
+                bucket_date=params.bucket_date,
             ),
             start_to_close_timeout=timedelta(minutes=30),
         )
         total_size = res["size"]
         total_measurement_count = res["measurement_count"]
-
-        # This needs to be adjusted once we get the the per entry concurrency working
         mb_per_sec = round(total_size / t.s / 10**6, 1)
         msmt_per_sec = round(total_measurement_count / t.s)
         log.info(
-            f"finished processing {bucket_date} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
+            f"finished processing {params.bucket_date} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
         )
 
         # with ClickhouseConnection(params.clickhouse) as db:
diff --git a/oonipipeline/tests/test_cli.py b/oonipipeline/tests/test_cli.py
index b92a7b77..f08c42f7 100644
--- a/oonipipeline/tests/test_cli.py
+++ b/oonipipeline/tests/test_cli.py
@@ -54,11 +54,7 @@ def test_full_workflow(
     bucket_dict = dict(res)
     assert "2022-10-20" in bucket_dict, bucket_dict
     assert bucket_dict["2022-10-20"] == 200, bucket_dict
-
-    res = db.execute(
-        "SELECT COUNT() FROM obs_web WHERE bucket_date = '2022-10-20' AND probe_cc = 'BA'"
-    )
-    obs_count = res[0][0]  # type: ignore
+    obs_count = bucket_dict["2022-10-20"]
 
     result = cli_runner.invoke(
         cli,
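
A minimal sketch of how the reworked workflow might be triggered ad hoc, given the new optional bucket_date parameter (commentary on the diff above, not part of it). Moving OPTIMIZE TABLE behind workflow.execute_activity keeps non-deterministic I/O out of workflow code and gives it its own timeout, and deriving bucket_date from the TemporalScheduledStartTime search attribute replaces the earlier hack of parsing a timestamp out of the workflow ID. The server address, ClickHouse DSN, data directory, workflow ID, and task-queue name below are placeholder assumptions, not values taken from this repository:

# Sketch: start ObservationsWorkflow with an explicit bucket_date (e.g. a
# backfill). When bucket_date is omitted, the workflow falls back to the day
# before TemporalScheduledStartTime, which is only set for scheduled runs.
import asyncio

from temporalio.client import Client as TemporalClient

from oonipipeline.temporal.workflows import (
    ObservationsWorkflow,
    ObservationsWorkflowParams,
)


async def main():
    client = await TemporalClient.connect("localhost:7233")  # assumed address
    params = ObservationsWorkflowParams(
        probe_cc=["BA"],
        test_name=["web_connectivity"],
        clickhouse="clickhouse://localhost",  # assumed DSN
        data_dir="/tmp/oonidata",  # assumed path
        fast_fail=False,
        bucket_date="2022-10-20",  # explicit date skips the schedule fallback
    )
    result = await client.execute_workflow(
        ObservationsWorkflow.run,
        params,
        id="observations-2022-10-20",  # assumed workflow ID
        task_queue="oonipipeline-task-queue",  # assumed queue name
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Because optimize_all_tables is now registered in make_worker's activity list, a worker built with make_worker(client, parallelism) and listening on the same task queue must be running for the call above to make progress.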