diff --git a/docs.json b/docs.json index ecbe42a..5ec0ec1 100644 --- a/docs.json +++ b/docs.json @@ -11,6 +11,11 @@ "interaction": { "drilldown": false }, + "integrations": { + "posthog": { + "apiKey": "phc_neTVVD8S3p76S25BXYXYf7TadgWK6ri4pg8sLqb4TVhJ" + } + }, "navigation": { "tabs": [ { diff --git a/guides/workflows/multi-language.mdx b/guides/workflows/multi-language.mdx index f5c2e5e..5d5cc79 100644 --- a/guides/workflows/multi-language.mdx +++ b/guides/workflows/multi-language.mdx @@ -39,7 +39,12 @@ class ScheduleImageCapture(Task): def execute(self, context: ExecutionContext) -> None: # Here you can implement your task logic, submit subtasks, etc. - print(f"Image captured for {self.location} with {self.resolution_m}m resolution and bands {self.spectral_bands}") + context.logger.info( + "Image captured", + location=self.location, + resolution_m=self.resolution_m, + spectral_bands=self.spectral_bands, + ) @staticmethod def identifier() -> tuple[str, str]: @@ -197,10 +202,10 @@ Submit a job to the Go server. curl http://localhost:8080/submit?lat=40.75&lon=-73.98&resolution=30&bands[]=489.0,560.6,666.5 ``` -Check the Python runner output, it should print the following line: +Check the Python runner logs for the following entry: -```plaintext Output -Image captured for [40.75, -73.98] with 30m resolution and bands [489, 560.6, 666.5] +```plaintext Logs +Image captured location=[40.75, -73.98] resolution_m=30 spectral_bands=[489, 560.6, 666.5] ``` ## Next Steps diff --git a/workflows/caches.mdx b/workflows/caches.mdx index 883e670..787515a 100644 --- a/workflows/caches.mdx +++ b/workflows/caches.mdx @@ -162,13 +162,13 @@ The following snippet illustrates storing and retrieving data from the cache. 
class ConsumerTask(Task): def execute(self, context: ExecutionContext) -> None: data = context.job_cache["data"] - print(f"Read {data} from cache") + context.logger.info("Read data from cache", data=data.decode()) ``` In this example, data stored under the key `"data"` can be any size that fits the cache backend constraints. Ensure the key remains unique within the job's scope to avoid conflicts. -To test the workflow, you can start a local task runner using the `InMemoryCache` backend. Then, submit a job to execute the `ProducerTask` and observe the output of the `ConsumerTask`. +To test the workflow, you can start a local task runner using the `InMemoryCache` backend. Then, submit a job to execute the `ProducerTask` and inspect the logs emitted by the `ConsumerTask`. ```python Python @@ -185,8 +185,8 @@ To test the workflow, you can start a local task runner using the `InMemoryCache ``` -```plaintext Output -Read b'my_binary_data_to_store' from cache +```plaintext Logs +Read data from cache data=my_binary_data_to_store ``` ## Groups and Hierarchical Keys @@ -244,7 +244,7 @@ class PrintSum(Task): number = group[key] # read data from cache numbers.append(int(number.decode())) # convert bytes back to int - print(f"Sum of all numbers: {sum(numbers)}") + context.logger.info("Computed sum of all numbers", total=sum(numbers)) ``` @@ -265,6 +265,6 @@ Submitting a job of the `CacheGroupDemo` and running it with a task runner can b ``` -```plaintext Output -Sum of all numbers: 284 +```plaintext Logs +Computed sum of all numbers total=284 ``` diff --git a/workflows/concepts/jobs.mdx b/workflows/concepts/jobs.mdx index 4b337f3..a7df186 100644 --- a/workflows/concepts/jobs.mdx +++ b/workflows/concepts/jobs.mdx @@ -414,10 +414,16 @@ class PrintMovieStats(Task): def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" 
+ urlencode(params) - response = httpx.get(url).json() + with context.tracer.span("fetch-movie-stats") as span: + span.set_attribute("movie.title", self.title) + response = httpx.get(url).json() # set the display name of the task to the title of the movie: context.current_task.display = response["Title"] - print(f"{response['Title']} was released on {response['Released']}") + context.logger.info( + "Movie release date fetched", + title=response["Title"], + released=response["Released"], + ) ``` ```go Go package movie @@ -521,11 +527,11 @@ job, err := client.Jobs.Submit(ctx, "movies-stats", One of the `PrintMovieStats` tasks fails with a `KeyError`. This error occurs when a movie title is not found by the [OMDb API](http://www.omdbapi.com/), leading to a response without the `Title` and `Released` fields. -Console output from the task runners confirms this: +Task logs from the runners confirm this: -```plaintext Output -The Matrix was released on 31 Mar 1999 -Shrek 2 was released on 19 May 2004 +```plaintext Logs +Movie release date fetched title="The Matrix" released="31 Mar 1999" +Movie release date fetched title="Shrek 2" released="19 May 2004" ERROR: Task PrintMovieStats failed with exception: KeyError('Title') ``` @@ -539,13 +545,19 @@ class PrintMovieStats(Task): def execute(self, context: ExecutionContext) -> None: params = {"t": self.title, "apikey": ""} url = "http://www.omdbapi.com/?" 
+ urlencode(params) - response = httpx.get(url).json() + with context.tracer.span("fetch-movie-stats") as span: + span.set_attribute("movie.title", self.title) + response = httpx.get(url).json() if "Title" in response and "Released" in response: context.current_task.display = response["Title"] - print(f"{response['Title']} was released on {response['Released']}") + context.logger.info( + "Movie release date fetched", + title=response["Title"], + released=response["Released"], + ) else: context.current_task.display = f"NotFound: {self.title}" - print(f"Could not find the release date for {self.title}") + context.logger.info("Movie release date not found", title=self.title) ``` ```go Go type PrintMovieStats struct { @@ -607,15 +619,15 @@ _, err := client.Jobs.Retry(ctx, job.ID) Job retried successfully -Now the console output shows: +Now the task logs show: -```plaintext Output -Could not find the release date for Tilebox - The Movie -The Avengers was released on 04 May 2012 +```plaintext Logs +Movie release date not found title="Tilebox - The Movie" +Movie release date fetched title="The Avengers" released="04 May 2012" ``` -The output confirms that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks. +The logs confirm that only two tasks were executed, resuming from the point of failure instead of re-executing all tasks. The job was retried and succeeded. The two tasks that completed before the failure were not re-executed. diff --git a/workflows/concepts/tasks.mdx b/workflows/concepts/tasks.mdx index 456a6a2..7c4eb6c 100644 --- a/workflows/concepts/tasks.mdx +++ b/workflows/concepts/tasks.mdx @@ -42,7 +42,7 @@ For python, the key components of this task are: The `execute` method is the entry point for executing the task. This is where the task's logic is defined. It's invoked by a [task runner](/workflows/concepts/task-runners) when the task runs and performs the task's operation. 
- The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache). + The `context` argument is an `ExecutionContext` instance that provides access to an [API for submitting new tasks](/api-reference/python/tilebox.workflows/ExecutionContext.submit_subtask) as part of the same job, [task logging](/api-reference/python/tilebox.workflows/ExecutionContext.logger), [custom tracing](/api-reference/python/tilebox.workflows/ExecutionContext.tracer), and features like [shared caching](/api-reference/python/tilebox.workflows/ExecutionContext.job_cache). @@ -129,7 +129,7 @@ class ChildTask(Task): index: int def execute(self, context: ExecutionContext) -> None: - print(f"Executing ChildTask {self.index}") + context.logger.info("Executing child task", index=self.index) # after submitting this task, a task runner may pick it up and execute it # which will result in 5 ChildTasks being submitted and executed as well @@ -425,7 +425,7 @@ class Sum(Task): # The reduce step for key in squares: result += int(squares[key].decode()) - print("Sum of squares is:", result) + context.logger.info("Computed sum of squares", result=result) ``` @@ -449,8 +449,8 @@ jobs.display(job) ``` -```plaintext Output -Sum of squares is: 336448 +```plaintext Logs +Computed sum of squares result=336448 ``` @@ -479,7 +479,7 @@ For example, the `RecursiveTask` below is a valid task that submits smaller inst num: int def execute(self, context: ExecutionContext) -> None: - print(f"Executing RecursiveTask with num={self.num}") + context.logger.info("Executing recursive task", num=self.num) # if num < 2, we reached the base case and stop recursion if self.num >= 2: context.submit_subtask(RecursiveTask(self.num // 2)) @@ -613,7 +613,7 @@ class RootTask(Task): class 
FlakyTask(Task): def execute(self, context: ExecutionContext) -> None: - print(f"Executing FlakyTask") + context.logger.info("Executing flaky task") if random.random() < 0.1: raise Exception("FlakyTask failed randomly") @@ -683,7 +683,7 @@ class PrintTask(Task): message: str def execute(self, context: ExecutionContext) -> None: - print(self.message) + context.logger.info("Print task executed", message=self.message) ``` ```go Go type RootTask struct{} @@ -761,23 +761,35 @@ A practical example is a workflow that fetches news articles from an API and pro def execute(self, context: ExecutionContext) -> None: url = f"https://newsapi.org/v2/top-headlines?category={self.category}&pageSize={self.max_articles}&country=us&apiKey=API_KEY" - news = httpx.get(url).json() + with context.tracer.span("fetch-news") as span: + span.set_attribute("category", self.category) + span.set_attribute("max_articles", self.max_articles) + news = httpx.get(url).json() # check out our documentation page on caches to learn # about a better way of passing data between tasks Path("news.json").write_text(json.dumps(news)) + context.logger.info( + "Fetched news articles", + category=self.category, + article_count=len(news["articles"]), + ) class PrintHeadlines(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) for article in news["articles"]: - print(f"{article['publishedAt'][:10]}: {article['title']}") + context.logger.info( + "News headline", + published_at=article["publishedAt"][:10], + title=article["title"], + ) class MostFrequentAuthors(Task): def execute(self, context: ExecutionContext) -> None: news = json.loads(Path("news.json").read_text()) authors = [article["author"] for article in news["articles"]] for author, count in Counter(authors).most_common(): - print(f"Author {author} has written {count} articles") + context.logger.info("Author article count", author=author, count=count) # now submit a job, and then visualize it job = 
job_client.submit("process-news", @@ -935,17 +947,17 @@ job, err := client.Jobs.Submit(ctx, "process-news", ``` -```plaintext Output -2024-02-15: NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews -2024-02-15: SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel -2024-02-14: Saturn's largest moon most likely uninhabitable - Phys.org -2024-02-14: AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News -2024-02-14: Anthropologists' research unveils early stone plaza in the Andes - Phys.org -Author Jeff Foust has written 1 articles -Author Richard Tribou has written 1 articles -Author Jeff Renaud has written 1 articles -Author Neuroscience News has written 1 articles -Author Science X has written 1 articles +```plaintext Logs +News headline published_at=2024-02-15 title="NASA selects ultraviolet astronomy mission but delays its launch two years - SpaceNews" +News headline published_at=2024-02-15 title="SpaceX launches Space Force mission from Cape Canaveral - Orlando Sentinel" +News headline published_at=2024-02-14 title="Saturn's largest moon most likely uninhabitable - Phys.org" +News headline published_at=2024-02-14 title="AI Unveils Mysteries of Unknown Proteins' Functions - Neuroscience News" +News headline published_at=2024-02-14 title="Anthropologists' research unveils early stone plaza in the Andes - Phys.org" +Author article count author="Jeff Foust" count=1 +Author article count author="Richard Tribou" count=1 +Author article count author="Jeff Renaud" count=1 +Author article count author="Neuroscience News" count=1 +Author article count author="Science X" count=1 ``` @@ -963,8 +975,8 @@ This workflow consists of four tasks: | ------------------- | ------------ | -------------------------------------------------------------------------------------------------------------------------- | | NewsWorkflow | - | The root task of the workflow. 
It spawns the other tasks and sets up the dependencies between them. | | FetchNews | - | A task that fetches news articles from the API and writes the results to a file, which is then read by dependent tasks. | -| PrintHeadlines | FetchNews | A task that prints the headlines of the news articles to the console. | -| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and prints the result to the console. | +| PrintHeadlines | FetchNews | A task that logs the headlines of the news articles. | +| MostFrequentAuthors | FetchNews | A task that counts the number of articles each author has written and logs the result. | An important aspect is that there is no dependency between the `PrintHeadlines` and `MostFrequentAuthors` tasks. This means they can execute in parallel, which the Tilebox Workflow Orchestrator will do, provided multiple task runners are available. @@ -1017,17 +1029,17 @@ class RootTask(Task): class RequiredTask(Task): def execute(self, context: ExecutionContext) -> None: - # this task may fail, but the job will continue regardless - print("This task is required to succeed, otherwise the job would stop") + # this task is required to succeed, otherwise the job would stop + context.logger.info("Required task completed") class FlakyTask(Task): def execute(self, context: ExecutionContext) -> None: # this task may fail, but the job will continue regardless - print("Attempting flaky operation...") + context.logger.info("Attempting flaky operation") class FinalTask(Task): def execute(self, context: ExecutionContext) -> None: # this task runs even if FlakyTask failed - print("Running final step") + context.logger.info("Running final step") ``` ```go Go type RootTask struct{} @@ -1111,7 +1123,7 @@ class Step1(Task): class Step1A(Task): def execute(self, context: ExecutionContext) -> None: - print("Step1A executed successfully") + context.logger.info("Step1A executed successfully") class Step1B(Task): def execute(self, context: ExecutionContext) -> None: @@ -1119,15 +1131,15 @@ class 
Step1B(Task): class Step1C(Task): def execute(self, context: ExecutionContext) -> None: - print("This will be skipped because Step1B failed") + context.logger.info("This will be skipped because Step1B failed") class Step2(Task): def execute(self, context: ExecutionContext) -> None: - print("This will be skipped because Step1B failed") + context.logger.info("This will be skipped because Step1B failed") class AlwaysRuns(Task): def execute(self, context: ExecutionContext) -> None: - print("This runs regardless") + context.logger.info("This runs regardless") ``` ```go Go type Pipeline struct{} diff --git a/workflows/near-real-time/cron.mdx b/workflows/near-real-time/cron.mdx index 6bf9922..cc5c8fc 100644 --- a/workflows/near-real-time/cron.mdx +++ b/workflows/near-real-time/cron.mdx @@ -21,11 +21,14 @@ class MyCronTask(CronTask): message: str def execute(self, context: ExecutionContext) -> None: - print(f"Hello {self.message} from a Cron Task!") # self.trigger is an attribute of the CronTask class, # which contains information about the trigger event # that caused this task to be submitted as part of a job - print(f"This task was triggered at {self.trigger.time}") + context.logger.info( + "Cron task triggered", + message=self.message, + trigger_time=self.trigger.time, + ) ``` ## Registering a Cron trigger @@ -67,19 +70,14 @@ runner = client.runner(tasks=[MyCronTask]) runner.run_all() ``` -If this task runner runs continuously, its output may resemble the following: - -```plaintext Output -Hello World from a Cron Task! -This task was triggered at 2023-09-25 16:12:00 -Hello World from a Cron Task! -This task was triggered at 2023-09-25 17:12:00 -Hello World from a Cron Task! -This task was triggered at 2023-09-25 18:12:00 -Hello World from a Cron Task! -This task was triggered at 2023-09-25 18:45:00 -Hello World from a Cron Task! 
-This task was triggered at 2023-09-25 19:12:00 +If this task runner runs continuously, its logs may resemble the following: + +```plaintext Logs +Cron task triggered message=World trigger_time="2023-09-25 16:12:00" +Cron task triggered message=World trigger_time="2023-09-25 17:12:00" +Cron task triggered message=World trigger_time="2023-09-25 18:12:00" +Cron task triggered message=World trigger_time="2023-09-25 18:45:00" +Cron task triggered message=World trigger_time="2023-09-25 19:12:00" ``` ## Inspecting in the Console diff --git a/workflows/near-real-time/storage-events.mdx b/workflows/near-real-time/storage-events.mdx index ebce79b..dbc182f 100644 --- a/workflows/near-real-time/storage-events.mdx +++ b/workflows/near-real-time/storage-events.mdx @@ -16,9 +16,6 @@ To create a Storage Event task, use `tilebox.workflows.automations.StorageEventT ```python Python from tilebox.workflows import ExecutionContext from tilebox.workflows.automations import StorageEventTask, StorageEventType -from tilebox.workflows.observability.logging import get_logger - -logger = get_logger() class LogObjectCreation(StorageEventTask): head_bytes: int @@ -28,14 +25,14 @@ class LogObjectCreation(StorageEventTask): # which contains information about the storage event that triggered this task if self.trigger.type == StorageEventType.CREATED: path = self.trigger.location - logger.info(f"A new object was created: {path}") + context.logger.info("A new object was created", path=path) # trigger.storage is a storage client for interacting with the storage # location that triggered the event # using it, we can read the object to get its content as a bytes object data = self.trigger.storage.read(path) - logger.info(f"The object's file size is {len(data)} bytes") - logger.info(f"The object's first {self.head_bytes} bytes are: {data[:self.head_bytes]}") + context.logger.info("Read object data", path=path, size_bytes=len(data)) + context.logger.info("Object preview", path=path, 
preview=data[:self.head_bytes].hex()) ``` ## Storage Locations