batch_daily example #113

Open · wants to merge 3 commits into main
36 changes: 36 additions & 0 deletions batch_daily/README.md
@@ -0,0 +1,36 @@
# Batch sample

This is an example workflow that solves the following use case.

You have a series of records that are divided into daily batches (think a day's
worth of telemetry coming from an application).
Every day you would like to run a batch to process that day's records, but
you would also like the ability to backfill records from a previous
window of time.

Backfilling might run on a schedule, or it might run as a directly triggered
workflow.
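
For the scheduled path, a backfill can also be requested through the schedule itself. Below is a minimal sketch, assuming the `daily-batch-wf-schedule` created by `create_schedule.py`; the window shown is arbitrary:

```python
from datetime import datetime

from temporalio.client import Client, ScheduleBackfill, ScheduleOverlapPolicy


async def backfill_first_week_of_january() -> None:
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle("daily-batch-wf-schedule")
    # Run the schedule's action for each interval it would have fired
    # within the given window.
    await handle.backfill(
        ScheduleBackfill(
            start_at=datetime(2024, 1, 1),
            end_at=datetime(2024, 1, 7),
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
        )
    )
```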

Please make sure your Python is 3.11 or above (the workflows use `asyncio.TaskGroup`, added in Python 3.11). For this sample, run:

```bash
poetry install --with batch_daily
```

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

```bash
poetry run python run_worker.py
```

This will start the worker. Then, in another terminal, run the following to start the workflow:

```bash
poetry run python starter.py
```

Optionally, you can schedule the workflow with:

```bash
poetry run python create_schedule.py
```
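
When started from the schedule, `RecordBatchProcessor` receives no explicit `day`; it derives one from the `TemporalScheduledStartTime` search attribute instead (see `workflows.py`).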
Empty file added batch_daily/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions batch_daily/activities.py
@@ -0,0 +1,43 @@
import asyncio
import random
import time
from dataclasses import dataclass
from typing import Any, Dict, List

from temporalio import activity


@dataclass
class ListRecordActivityInput:
    record_filter: str
    day: str


@dataclass
class ProcessRecordActivityInput:
    uri: str


async def random_sleep() -> None:
    """Simulate a long-running operation with a random sleep."""
    # Sleep anywhere from 10 ms to 1 s.
    sleep_s = 1 / random.randint(1, 100)
    await asyncio.sleep(sleep_s)


@activity.defn
async def list_records(activity_input: ListRecordActivityInput) -> List[str]:
    activity.logger.info(
        "filtering records on %s based on filter: %s",
        activity_input.day,
        activity_input.record_filter,
    )
    await random_sleep()
    return [f"uri://record-id{idx}" for idx in range(10)]


@activity.defn
async def process_record(activity_input: ProcessRecordActivityInput) -> Dict[str, Any]:
    t0 = time.monotonic()
    activity.logger.info("this record is yummy: %s", activity_input.uri)
    await random_sleep()
    return {"runtime": time.monotonic() - t0}
46 changes: 46 additions & 0 deletions batch_daily/create_schedule.py
@@ -0,0 +1,46 @@
import asyncio
from datetime import timedelta

from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleAlreadyRunningError,
    ScheduleIntervalSpec,
    ScheduleSpec,
)

from batch_daily.workflows import (
    TASK_QUEUE_NAME,
    RecordBatchProcessor,
    RecordBatchProcessorWorkflowInput,
)


async def main() -> None:
    """Create a schedule that periodically starts the batch workflow."""
    client = await Client.connect("localhost:7233")

    wf_input = RecordBatchProcessorWorkflowInput(record_filter="taste=yummy")
    try:
        await client.create_schedule(
            "daily-batch-wf-schedule",
            Schedule(
                action=ScheduleActionStartWorkflow(
                    RecordBatchProcessor.run,
                    wf_input,
                    id=f"record-filter-{wf_input.record_filter}",
                    task_queue=TASK_QUEUE_NAME,
                ),
                spec=ScheduleSpec(
                    # Hourly keeps the demo responsive; a true daily batch
                    # would use timedelta(days=1).
                    intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
                ),
            ),
        )
    except ScheduleAlreadyRunningError:
        print("Schedule already exists, skipping creation.")


if __name__ == "__main__":
    asyncio.run(main())
29 changes: 29 additions & 0 deletions batch_daily/run_worker.py
@@ -0,0 +1,29 @@
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from batch_daily.activities import list_records, process_record
from batch_daily.workflows import TASK_QUEUE_NAME, DailyBatch, RecordBatchProcessor


async def main() -> None:
    """Run a worker for the batch workflows and activities."""
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue=TASK_QUEUE_NAME,
        workflows=[DailyBatch, RecordBatchProcessor],
        # Both activities are async def, so no activity_executor is needed.
        activities=[list_records, process_record],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
28 changes: 28 additions & 0 deletions batch_daily/starter.py
@@ -0,0 +1,28 @@
import asyncio

from temporalio.client import Client

from batch_daily.workflows import TASK_QUEUE_NAME, DailyBatch, DailyBatchWorkflowInput


async def main():
    client = await Client.connect("localhost:7233")

    result = await client.execute_workflow(
        DailyBatch.run,
        DailyBatchWorkflowInput(
            start_day="2024-01-01",
            end_day="2024-03-01",
            record_filter="taste=yummy",
        ),
        id="daily_batch-workflow-id",
        task_queue=TASK_QUEUE_NAME,
    )
    print(f"Workflow result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
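
Note that the starter exercises the directly triggered backfill path from the README: `DailyBatch` fans out one `RecordBatchProcessor` child workflow per day between `start_day` and `end_day`.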
106 changes: 106 additions & 0 deletions batch_daily/workflows.py
@@ -0,0 +1,106 @@
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from temporalio import workflow
from temporalio.common import SearchAttributeKey

with workflow.unsafe.imports_passed_through():
    from batch_daily.activities import (
        ListRecordActivityInput,
        ProcessRecordActivityInput,
        list_records,
        process_record,
    )

TASK_QUEUE_NAME = "MY_TASK_QUEUE"


@dataclass
class RecordBatchProcessorWorkflowInput:
    record_filter: str
    day: Optional[str] = None


@workflow.defn
class RecordBatchProcessor:
    @workflow.run
    async def run(
        self, workflow_input: RecordBatchProcessorWorkflowInput
    ) -> Dict[str, Any]:
        if workflow_input.day is None:
            # When started from a schedule, derive the day to process from
            # the TemporalScheduledStartTime search attribute.
            schedule_time = workflow.info().typed_search_attributes.get(
                SearchAttributeKey.for_datetime("TemporalScheduledStartTime")
            )
            assert schedule_time is not None, "when not scheduled, day must be provided"
            workflow_input.day = schedule_time.strftime("%Y-%m-%d")

        workflow.logger.info("starting RecordProcessor with %s", workflow_input)

        list_records_input = ListRecordActivityInput(
            record_filter=workflow_input.record_filter, day=workflow_input.day
        )

        record_uri_list = await workflow.execute_activity(
            list_records,
            list_records_input,
            start_to_close_timeout=timedelta(minutes=5),
        )

        # Fan out one process_record activity per record and await them all.
        task_list = []
        async with asyncio.TaskGroup() as tg:
            for key in record_uri_list:

> **Review comment (author):** This is no good as it runs the risk of spamming the event history log. It should probably work similarly to the Java example, where it uses `continue_as_new` to reset the event history. (A hedged sketch of this appears after the diff.)

                process_record_input = ProcessRecordActivityInput(uri=key)
                task_list.append(
                    tg.create_task(
                        workflow.execute_activity(
                            process_record,
                            process_record_input,
                            start_to_close_timeout=timedelta(minutes=1),
                        )
                    )
                )
        total_runtime = sum(task.result()["runtime"] for task in task_list)
        return {"runtime": total_runtime}


@dataclass
class DailyBatchWorkflowInput:
    start_day: str
    end_day: str
    record_filter: str


@workflow.defn
class DailyBatch:
    """Fans out one RecordBatchProcessor child workflow per day in the range."""

    @workflow.run
    async def run(self, workflow_input: DailyBatchWorkflowInput) -> Dict[str, Any]:
        workflow.logger.info("starting DailyBatch with %s", workflow_input)

        start = datetime.strptime(workflow_input.start_day, "%Y-%m-%d")
        end = datetime.strptime(workflow_input.end_day, "%Y-%m-%d")
        task_list = []
        async with asyncio.TaskGroup() as tg:
            # One child workflow per day in [start_day, end_day).
            for day in [start + timedelta(days=x) for x in range((end - start).days)]:
                task_list.append(
                    tg.create_task(
                        workflow.execute_child_workflow(
                            RecordBatchProcessor.run,
                            RecordBatchProcessorWorkflowInput(
                                day=day.strftime("%Y-%m-%d"),
                                record_filter=workflow_input.record_filter,
                            ),
                        )
                    )
                )
        total_runtime = sum(task.result()["runtime"] for task in task_list)
        return {"runtime": total_runtime}
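
Following up on the review comment above: here is a hedged sketch of how the record fan-out might instead page through records with `continue_as_new`, keeping the event history bounded. The `offset`/`page_size` fields and the `"list_records_page"` activity are hypothetical, not part of this PR:

```python
# Sketch only: a paged RecordBatchProcessor variant that resets its event
# history via continue-as-new, as the review comment suggests. The offset and
# page_size fields (and the "list_records_page" activity) are hypothetical.
from dataclasses import dataclass
from datetime import timedelta

from temporalio import workflow

from batch_daily.activities import ProcessRecordActivityInput


@dataclass
class PagedBatchInput:
    record_filter: str
    day: str
    offset: int = 0
    page_size: int = 500


@workflow.defn
class PagedRecordBatchProcessor:
    @workflow.run
    async def run(self, wf_input: PagedBatchInput) -> None:
        # Hypothetical activity returning at most page_size record URIs,
        # starting at wf_input.offset.
        uris = await workflow.execute_activity(
            "list_records_page",
            wf_input,
            start_to_close_timeout=timedelta(minutes=5),
        )
        for uri in uris:
            await workflow.execute_activity(
                "process_record",
                ProcessRecordActivityInput(uri=uri),
                start_to_close_timeout=timedelta(minutes=1),
            )
        if len(uris) == wf_input.page_size:
            # More records may remain: restart this workflow with a fresh
            # event history, carrying the next offset forward.
            wf_input.offset += wf_input.page_size
            workflow.continue_as_new(wf_input)
```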