Use Temporal for orchestration #58

Closed · wants to merge 3 commits
12 changes: 12 additions & 0 deletions debug-temporal.sh
@@ -0,0 +1,12 @@
# >>> json.dumps(asdict(ObservationsWorkflowParams(probe_cc=["IT"], start_day="2024-01-01", end_day="2024-01-02", clickhouse="clickhouse://localhost/", data_dir="/Users/art/repos/ooni/data/tests/data/", parallelism=10, fast_fail=False, test_name=["signal"])))
#
#
INPUT_JSON="{\"probe_cc\": [\"IT\"], \"test_name\": [\"signal\"], \"start_day\": \"2024-01-01\", \"end_day\": \"2024-01-20\", \"clickhouse\": \"clickhouse://localhost/\", \"data_dir\": \"$(pwd)/tests/data/datadir/\", \"parallelism\": 10, \"fast_fail\": false, \"log_level\": 20}"

echo "$INPUT_JSON"
temporal workflow start \
--task-queue oonidatapipeline-task-queue \
--type ObservationsWorkflow \
--namespace default \
--input "$INPUT_JSON"
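
For reference, the same workflow can also be started from Python with the temporalio client instead of the temporal CLI. The sketch below is illustrative only: the payload mirrors INPUT_JSON above, and the workflow id is invented for this example.

import asyncio

from temporalio.client import Client


async def main():
    # Connect to the local Temporal server, as run_worker.py does.
    client = await Client.connect("localhost:7233")
    # Start the workflow by name on the same task queue the worker listens on.
    handle = await client.start_workflow(
        "ObservationsWorkflow",
        {
            "probe_cc": ["IT"],
            "test_name": ["signal"],
            "start_day": "2024-01-01",
            "end_day": "2024-01-20",
            "clickhouse": "clickhouse://localhost/",
            "data_dir": "./tests/data/datadir/",
            "parallelism": 10,
            "fast_fail": False,
            "log_level": 20,
        },
        id="observations-debug-run",  # hypothetical id, any unique string works
        task_queue="oonidatapipeline-task-queue",
    )
    print(f"started workflow {handle.id}")


if __name__ == "__main__":
    asyncio.run(main())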

295 changes: 0 additions & 295 deletions oonidata/cli/command.py
@@ -16,13 +16,6 @@
from oonidata.db.connections import ClickhouseConnection
from oonidata.db.create_tables import create_queries, list_all_table_diffs
from oonidata.netinfo import NetinfoDB
from oonidata.workers import (
start_fingerprint_hunter,
start_observation_maker,
start_ground_truth_builder,
start_response_archiver,
)
from oonidata.workers.analysis import start_analysis


log = logging.getLogger("oonidata")
@@ -125,294 +118,6 @@ def sync(
)


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str)
@click.option(
"--data-dir",
type=Path,
required=True,
help="data directory to store fingerprint and geoip databases",
)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count() + 2,
help="number of processes to use. Only works when writing to a database",
)
@click.option(
"--fast-fail",
is_flag=True,
help="should we fail immediately when we encounter an error?",
)
@click.option(
"--create-tables",
is_flag=True,
help="should we attempt to create the required clickhouse tables",
)
@click.option(
"--drop-tables",
is_flag=True,
help="should we drop tables before creating them",
)
def mkobs(
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
clickhouse: Optional[str],
data_dir: Path,
parallelism: int,
fast_fail: bool,
create_tables: bool,
drop_tables: bool,
):
"""
Make observations for OONI measurements and write them into clickhouse or a CSV file
"""
if create_tables:
if not clickhouse:
click.echo("--clickhouse needs to be specified when creating tables")
return 1
if drop_tables:
click.confirm(
"Are you sure you want to drop the tables before creation?", abort=True
)

with ClickhouseConnection(clickhouse) as db:
for query, table_name in create_queries:
if drop_tables:
db.execute(f"DROP TABLE IF EXISTS {table_name};")
db.execute(query)

NetinfoDB(datadir=data_dir, download=True)

start_observation_maker(
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
clickhouse=clickhouse,
data_dir=data_dir,
parallelism=parallelism,
fast_fail=fast_fail,
)


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
"--data-dir",
type=Path,
required=True,
help="data directory to store fingerprint and geoip databases",
)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count(),
help="number of processes to use. Only works when writing to a database",
)
@click.option(
"--fast-fail",
is_flag=True,
help="should we fail immediately when we encounter an error?",
)
@click.option(
"--create-tables",
is_flag=True,
help="should we attempt to create the required clickhouse tables",
)
@click.option(
"--rebuild-ground-truths",
is_flag=True,
help="should we force the rebuilding of ground truths",
)
def mker(
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
clickhouse: str,
data_dir: Path,
parallelism: int,
fast_fail: bool,
create_tables: bool,
rebuild_ground_truths: bool,
):
if create_tables:
with ClickhouseConnection(clickhouse) as db:
for query, table_name in create_queries:
click.echo(f"Running create query for {table_name}")
db.execute(query)
raise Exception("Run this via the analysis command")


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
"--data-dir",
type=Path,
required=True,
help="data directory to store fingerprint and geoip databases",
)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count() + 2,
help="number of processes to use. Only works when writing to a database",
)
@click.option(
"--fast-fail",
is_flag=True,
help="should we fail immediately when we encounter an error?",
)
@click.option(
"--create-tables",
is_flag=True,
help="should we attempt to create the required clickhouse tables",
)
@click.option(
"--rebuild-ground-truths",
is_flag=True,
help="should we force the rebuilding of ground truths",
)
def mkanalysis(
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
clickhouse: str,
data_dir: Path,
parallelism: int,
fast_fail: bool,
create_tables: bool,
rebuild_ground_truths: bool,
):
if create_tables:
with ClickhouseConnection(clickhouse) as db:
for query, table_name in create_queries:
click.echo(f"Running create query for {table_name}")
db.execute(query)

start_analysis(
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
clickhouse=clickhouse,
data_dir=data_dir,
parallelism=parallelism,
fast_fail=fast_fail,
rebuild_ground_truths=rebuild_ground_truths,
)


@cli.command()
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
"--data-dir",
type=Path,
required=True,
help="data directory to store fingerprint and geoip databases",
)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count() + 2,
help="number of processes to use. Only works when writing to a database",
)
def mkgt(
start_day: date,
end_day: date,
clickhouse: str,
data_dir: Path,
parallelism: int,
):
start_ground_truth_builder(
start_day=start_day,
end_day=end_day,
clickhouse=clickhouse,
data_dir=data_dir,
parallelism=parallelism,
)


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str)
@click.option("--data-dir", type=Path, required=True)
@click.option("--archives-dir", type=Path, required=True)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count() + 2,
help="number of processes to use. Only works when writing to a database",
)
def mkbodies(
probe_cc: List[str],
test_name: List[str],
start_day: date,
end_day: date,
clickhouse: str,
data_dir: Path,
archives_dir: Path,
parallelism: int,
):
"""
Make response body archives
"""
start_response_archiver(
probe_cc=probe_cc,
test_name=test_name,
start_day=start_day,
end_day=end_day,
data_dir=data_dir,
archives_dir=archives_dir,
clickhouse=clickhouse,
parallelism=parallelism,
)


@cli.command()
@click.option(
"--data-dir",
type=Path,
required=True,
help="data directory to store fingerprint and geoip databases",
)
@click.option("--archives-dir", type=Path, required=True)
@click.option(
"--parallelism",
type=int,
default=multiprocessing.cpu_count() + 2,
help="number of processes to use",
)
def fphunt(data_dir: Path, archives_dir: Path, parallelism: int):
click.echo("🏹 starting the hunt for blockpage fingerprints!")
start_fingerprint_hunter(
archives_dir=archives_dir,
data_dir=data_dir,
parallelism=parallelism,
)


@cli.command()
@click.option("--clickhouse", type=str)
@click.option(
Empty file.
31 changes: 31 additions & 0 deletions oonidata/datapipeline/run_worker.py
@@ -0,0 +1,31 @@
import asyncio

import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

from .workflows.observations import ObservationsWorkflow
from .workflows.observations import make_observation_in_day


async def async_main():
client = await Client.connect("localhost:7233")
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue="oonidatapipeline-task-queue",
workflows=[ObservationsWorkflow],
activities=[make_observation_in_day],
activity_executor=activity_executor,
)

await worker.run()


def main():
asyncio.run(async_main())


if __name__ == "__main__":
main()
Empty file.
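
run_worker.py imports ObservationsWorkflow and make_observation_in_day from workflows/observations.py, which is not included in this diff view. The ThreadPoolExecutor passed as activity_executor is there because make_observation_in_day is registered as a synchronous activity, and temporalio runs synchronous activities on that executor. The following is a rough sketch, not the actual implementation, of the shape those definitions take: the ObservationsWorkflowParams fields are taken from the JSON payload in debug-temporal.sh, and the activity and workflow bodies are placeholders.

from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import activity, workflow


@dataclass
class ObservationsWorkflowParams:
    # Field names mirror the JSON payload in debug-temporal.sh; the real
    # dataclass lives in workflows/observations.py.
    probe_cc: List[str]
    test_name: List[str]
    start_day: str
    end_day: str
    clickhouse: str
    data_dir: str
    parallelism: int
    fast_fail: bool
    log_level: int = 20


@activity.defn
def make_observation_in_day(params: ObservationsWorkflowParams, day: str) -> int:
    # Placeholder body: the real activity builds observations for a single
    # day and writes them to ClickHouse.
    return 0


@workflow.defn
class ObservationsWorkflow:
    @workflow.run
    async def run(self, params: ObservationsWorkflowParams) -> None:
        # Placeholder: run the activity for the first day only; the real
        # workflow covers every day between start_day and end_day.
        await workflow.execute_activity(
            make_observation_in_day,
            args=[params, params.start_day],
            start_to_close_timeout=timedelta(hours=1),
        )

Assuming the package is installed, the worker would be started with something like "python -m oonidata.datapipeline.run_worker" before submitting workflows with debug-temporal.sh.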