# Adding Durability with Temporal

In this section, we'll solve this problem by making your application durable. You'll learn how to build GenAI applications that survive failures, recover automatically, and never lose progress.

In this section:
- Introduce Temporal
- Primary Temporal Abstractions: Activities and Workflows
- Demonstrate the benefits of durable execution
- Event-driven Architectures

### Notebook Setup

Run the following code blocks to install the packages and tools needed to run this notebook.

**Be sure to add your .env file again. It doesn't persist across notebooks or sessions.**

### Make Sure Your `.env` File Exists

In [None]:
# Check if .env file exists at the root of `exercises` directory, if not create one:
import os   
env_path = "../.env" 

if not os.path.exists(env_path):   
  with open(env_path, "w") as fh:
    fh.write("LLM_API_KEY = YOUR_API_KEY\nLLM_MODEL = openai/gpt-4o")

# If you just created a new .env file, open the .env file at the root of `exercises` and replace YOUR_API_KEY with your API key.

In [None]:
# Load environment variables and configure LLM settings

import os
from dotenv import load_dotenv

load_dotenv(override=True)

# Get LLM_API_KEY environment variable and print it to make sure that your .env file is properly loaded.
LLM_MODEL = os.getenv("LLM_MODEL", "openai/gpt-4o")
LLM_API_KEY = os.getenv("LLM_API_KEY", None)
print("LLM API Key", LLM_API_KEY)
# If your LLM_API_KEY is empty, go to the .env file at the root of your directory
# and replace YOUR_API_KEY with your API key.

### Package Our Inputs & Outputs for Ease of Management

For ease of use, evolution of parameters, and type checking, Temporal recommends passing and returning a single object from functions. `dataclass` is the recommended structure here, but anything serializable will work.

_Read more about inputs and outputs in [this chapter](https://temporal.talentlms.com/unit/view/id:2822) of our free Temporal 102 course._
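
To see why a single dataclass argument makes parameters easier to evolve, here is a minimal sketch. `ReportRequest` and its `language` field are hypothetical names used only for illustration, and the JSON round trip merely stands in for serialization in general; it is not how Temporal converts data internally.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class ReportRequest:  # hypothetical example type, not part of the exercise
    prompt: str
    # A field added later; the default keeps older callers working.
    language: str = "en"

# A caller written before `language` existed still works unchanged.
old_style = ReportRequest(prompt="Facts about tardigrades")

# Dataclasses convert to plain dicts, which serialize to JSON and back.
payload = json.dumps(asdict(old_style))
restored = ReportRequest(**json.loads(payload))

print(restored)
```

Had the function taken `prompt` as a bare positional argument, adding a second parameter would have meant updating every call site at once.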

In [None]:
# TODO: Run this code block to load it into the program
from dataclasses import dataclass

@dataclass
class LLMCallInput:
  prompt: str

@dataclass
class PDFGenerationInput:
  content: str
  filename: str = "research_pdf.pdf"

### Let's Create Activities
- An Activity is a function that is failure-prone and/or non-deterministic
- Temporal requires all non-deterministic code to be run in an Activity
- Activities give you automatic retries, timeout handling, detailed visibility, and automatic checkpoints
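
As a rough illustration of what "automatic retries" saves you from writing, the sketch below hand-rolls a retry loop around a failure-prone function. Everything here (`run_with_retries`, `flaky_service`, the delay values) is hypothetical and uses only the standard library. With Temporal you do not write this loop yourself; the platform retries Activities for you.

```python
import time

def run_with_retries(func, max_attempts=5, initial_delay=0.01, backoff=2.0):
    """Call func until it succeeds or attempts run out (toy illustration)."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func(), attempt
        except Exception as err:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            print(f"Attempt {attempt} failed: {err}; retrying in {delay:.2f}s")
            time.sleep(delay)
            delay *= backoff  # wait longer before each subsequent retry

calls = {"count": 0}

def flaky_service():
    """Hypothetical failure-prone call: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("service temporarily unavailable")
    return "ok"

result, attempts = run_with_retries(flaky_service)
print(result, attempts)
```

Note that this loop lives in a single process: if that process dies, the retry state dies with it. That gap is what the rest of this section addresses.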

In [None]:
# Step 1: Add your import statement at the top: `from temporalio import activity`
# Step 2: To turn a function into an Activity, add the `@activity.defn` decorator
# on top of the `llm_call` and `create_pdf` functions. Run the solution cell below to see how.
# Step 3: Now run the code to load it into the program

# TODO Add your import statement at the top: `from temporalio import activity`
from reportlab.lib.pagesizes import letter
from litellm import completion, ModelResponse
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle

# TODO Add the Activity decorator to make this function a Temporal Activity
def llm_call(input: LLMCallInput) -> ModelResponse:
    response = completion(
      model=LLM_MODEL,
      api_key=LLM_API_KEY,
      messages=[{ "content": input.prompt,"role": "user"}]
    )
    return response

# TODO Add the Activity decorator to make this function a Temporal Activity
def create_pdf(input: PDFGenerationInput) -> str:
    print("Creating PDF document...")

    doc = SimpleDocTemplate(input.filename, pagesize=letter)
    styles = getSampleStyleSheet()
    title_style = ParagraphStyle(
        'CustomTitle',
        parent=styles['Heading1'],
        fontSize=24,
        spaceAfter=30,
        alignment=1
    )

    story = []
    title = Paragraph("Research Report", title_style)
    story.append(title)
    story.append(Spacer(1, 20))
    paragraphs = input.content.split('\n\n')
    for para in paragraphs:
        if para.strip():
          p = Paragraph(para.strip(), styles['Normal'])
          story.append(p)
          story.append(Spacer(1, 12))

    doc.build(story)

    print(f"SUCCESS! PDF created: {input.filename}")
    return input.filename

In [None]:
# Optional: Run this cell to display the solution, which you can copy and paste over the code block above.
from pathlib import Path
from IPython.display import display, Markdown
import os

notebook_dir = Path(os.getcwd())
solution_file = notebook_dir / "Solutions_02_Adding_Durability_with_Temporal" / "create_activity_solution.py"

code = solution_file.read_text()

print("Solution loaded:")
display(Markdown(f"```python\n{code}\n```"))

## Let's Orchestrate a Workflow

- Activities are orchestrated within a Temporal Workflow
- Workflows contain the decision-making flow but Activities perform the actual work
- Each Activity is recorded in the History with inputs and outputs
- Workflows can wait for Activity completion, handle failures, make decisions based on results
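
To build intuition for why recording each Activity's result matters, here is a toy model, assuming a plain Python list as the "history". The names `history`, `run_step`, and `report_flow` are hypothetical, and Temporal's real Event History and replay mechanism are considerably more involved. The sketch shows only the core idea: a step that already completed returns its recorded result instead of running again.

```python
history = []     # toy stand-in for a Workflow's recorded history
executions = []  # names of steps that actually ran

def run_step(index, name, func):
    """Return the recorded result if this step already completed; otherwise run it."""
    if index < len(history):
        return history[index]["result"]
    result = func()
    executions.append(name)
    history.append({"step": name, "result": result})
    return result

def report_flow(crash_before_pdf):
    research = run_step(0, "llm_call", lambda: "research text")
    if crash_before_pdf:
        raise RuntimeError("simulated crash before the PDF step")
    return run_step(1, "create_pdf", lambda: f"pdf containing: {research}")

# First run: the research step completes, then the process "crashes".
try:
    report_flow(crash_before_pdf=True)
except RuntimeError:
    pass

# Second run: the research result comes from history; only the PDF step runs.
final = report_flow(crash_before_pdf=False)
print(final, executions)
```

Across both runs, the `llm_call` step executed exactly once.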

### More Input/Output Packaging

Just like with Activities, Temporal recommends passing a single object to the Workflow for input and returning a single object.

In [None]:
# TODO: Run this code block to load it into the program
from dataclasses import dataclass

@dataclass
class GenerateReportInput:
    prompt: str

@dataclass
class GenerateReportOutput:
    result: str

In [None]:
# Step 1: Add your `@workflow.run` decorator
# Step 2: Notice how the Workflow calls the `llm_call` Activity.
# Step 3: Follow the same pattern to call the `create_pdf` Activity.
# Step 4: Pass in your pdf_generation_input
# Step 5: A Start-to-Close timeout is the maximum amount of time a single Activity Execution can take. We recommend always setting this timeout.
# Set a Start-to-Close timeout of 10 seconds for the `create_pdf` Activity.
# Step 6: Run this code block to load it into the program
from datetime import timedelta
from temporalio import workflow

# sandboxed=False is a notebook-only requirement; you normally wouldn't set it
@workflow.defn(sandboxed=False)
class GenerateReportWorkflow:

    # TODO: Add your `@workflow.run` decorator here
    async def run(self, input: GenerateReportInput) -> GenerateReportOutput:

        llm_call_input = LLMCallInput(prompt=input.prompt)

        research_facts = await workflow.execute_activity(
            llm_call,
            llm_call_input,
            start_to_close_timeout=timedelta(seconds=30), # maximum amount of time a single Activity Execution can take.
        )

        workflow.logger.info("Research complete!")

        pdf_generation_input = PDFGenerationInput(content=research_facts["choices"][0]["message"]["content"])

        pdf_filename = await workflow.execute_activity(
            # TODO: Call the create_pdf here
            # TODO: Pass in your pdf_generation_input
            # TODO: Set the Start-to-Close timeout of 10 seconds here
        )

        return GenerateReportOutput(result=f"Successfully created research report PDF: {pdf_filename}")

In [None]:
# Optional: Run this cell to display the solution, which you can copy and paste over the code block above.
from pathlib import Path
from IPython.display import display, Markdown
import os

notebook_dir = Path(os.getcwd())
solution_file = notebook_dir / "Solutions_02_Adding_Durability_with_Temporal" / "generatereportworkflow_solution.py"

code = solution_file.read_text()

print("Solution loaded:")
display(Markdown(f"```python\n{code}\n```"))

### Run the Temporal Server Now in this Exercise Environment:
1. To start the Temporal Server, run `temporal server start-dev` in your terminal.
2. Then in your `Ports` tab on the bottom of this screen, find `8233` and click on the Globe icon to open the Temporal Web UI.

### Temporal Workers
- Temporal Workflows and Activities are run in Workers
- Workers wait for tasks to do and execute them
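
The "wait for tasks, then execute them" loop can be sketched with the standard library alone. This is a toy, assuming an in-memory `asyncio.Queue` in place of a Temporal Task Queue; `toy_worker`, `handlers`, and the `"shout"` task are hypothetical names, and a real Worker polls the Temporal Service over the network instead.

```python
import asyncio

async def toy_worker(task_queue, handlers, results):
    """Wait for tasks on the queue and run the matching registered handler."""
    while True:
        name, arg = await task_queue.get()  # suspends until a task arrives
        if name is None:                    # sentinel value: stop the worker
            break
        results.append(handlers[name](arg))

async def main():
    task_queue = asyncio.Queue()
    results = []
    handlers = {"shout": str.upper}  # functions "registered" with the worker
    worker_task = asyncio.create_task(toy_worker(task_queue, handlers, results))

    await task_queue.put(("shout", "hello"))
    await task_queue.put((None, None))
    await worker_task
    return results

# In a notebook, use `results = await main()` instead,
# because an event loop is already running there.
results = asyncio.run(main())
print(results)
```

As in the toy, a task whose handler is not registered cannot be executed, which is why each Activity and Workflow has to be registered with the Worker.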

In [None]:
# Step 1: Pass our `create_pdf` Activity into the `activities` list to register it with the Worker
# Step 2: Set the task queue that the Worker is polling to be "research"
# Step 3: Run this code block to load it into the program
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker() -> None:
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233", namespace="default")

    # Run the Worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="", # TODO Set the task queue that the Worker is polling to be "research"
            workflows=[GenerateReportWorkflow], # register the Workflow
            activities=[llm_call, __], # TODO Pass in your create_pdf to register it
            activity_executor=activity_executor
        )

        print("Starting the worker....")
        await worker.run()

In [None]:
# Optional: Run this cell to display the solution, which you can copy and paste over the code block above.
from pathlib import Path
from IPython.display import display, Markdown
import os

notebook_dir = Path(os.getcwd())
solution_file = notebook_dir / "Solutions_02_Adding_Durability_with_Temporal" / "worker_solution.py"

code = solution_file.read_text()

print("Solution loaded:")
display(Markdown(f"```python\n{code}\n```"))

### Starting the Worker

A Workflow can't execute if a Worker isn't running.

In [None]:
# Due to the limitations of Jupyter Notebooks and Google Colab, this is how
# you must start the worker in a Notebook environment
import asyncio

worker = asyncio.create_task(run_worker())

# If you are running this code in a typical Python environment, you can start
# the Worker by just calling `asyncio.run`
# if __name__ == "__main__":
#    asyncio.run(run_worker())

### Executing the Workflow
- Temporal Workflows are executed indirectly
- Request execution from the Temporal Service
- You do this with the Temporal Client

In [None]:
# Step 1: Set the Task Queue to be the Task Queue that your Worker is polling
# Step 2: Pass in the Workflow you are running 
# Step 3: Run this code block to load it into the program
from temporalio.client import Client
import uuid

# Create client connected to server at the given address
client = await Client.connect("localhost:7233", namespace="default")

print("Welcome to the Research Report Generator!")
prompt = input("Enter your research topic or question: ").strip()

if not prompt:
    prompt = "Give me 5 fun and fascinating facts about tardigrades. Make them interesting and educational!"
    print(f"No prompt entered. Using default: {prompt}")

# Asynchronous start of a Workflow
handle = await client.start_workflow(
    # TODO Pass in the Workflow you are running
    GenerateReportInput(prompt=prompt),
    id=f"generate-research-report-workflow-{uuid.uuid4()}", # user-defined Workflow identifier, which typically has some business meaning
    task_queue="", # TODO: Set the Task Queue to be the Task Queue that your Worker is polling
)

print(f"Started workflow. Workflow ID: {handle.id}, RunID {handle.result_run_id}")

In [None]:
# Optional: Run this cell to display the solution, which you can copy and paste over the code block above.
from pathlib import Path
from IPython.display import display, Markdown
import os

notebook_dir = Path(os.getcwd())
solution_file = notebook_dir / "Solutions_02_Adding_Durability_with_Temporal" / "client_solution.py"

code = solution_file.read_text()

print("Solution loaded:")
display(Markdown(f"```python\n{code}\n```"))

### Getting the Result

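The example above starts the Workflow asynchronously: `start_workflow` returns a handle once the Workflow has started, without waiting for it to finish. To wait for and retrieve the result, `await` the handle's `result()` method.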

In [None]:
# Get the result
result = await handle.result()
print(f"Result: {result}")

# To download the report: right-click `research_pdf.pdf` in your file explorer, then click `Download`.

### Instructor-Led Demo (Expand for instructor notes or to run on your own)
<!--
Durable Execution Demo:
1. The instructions will also be in the README at the `demos` level. Follow the `Setup` step first before running.
2. From the `demos/module_one_02_adding_durability` directory, run the Worker with `uv run worker.py`.
3. Run the Workflow with `uv run starter.py`.
4. When prompted in the command line, enter the prompt you want to send to OpenAI.
5. Before the process generates a PDF, kill the Worker.
6. Rerun the Worker and show that you continue right where you left off.
7. Emphasize that you lost no progress or data. The Workflow will continue by generating the PDF (available in the same directory) and completing the process successfully.
8. Show the Workflow Execution completion in the Web UI.
-->

### Simulating Failure and Recovery

Let's experience failure and recovery firsthand. We'll add a new feature to our Workflow: generating an executive summary before creating the PDF.

This will demonstrate:
* How Activities automatically retry on failure
* How Temporal preserves state across Worker restarts
* How you can fix bugs without losing progress

### Step 1: Create a New Activity with an Intentional Error

We'll create a `generate_summary` Activity that:
1. Takes the research content and generates a concise summary
2. Contains an intentional error to simulate a real-world failure
3. Will automatically retry when it fails

Run the code below to add this Activity:

In [None]:
# Run this code to create the Activity with an intentional error
from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
def generate_summary(input: LLMCallInput) -> ModelResponse:
    """Generate a concise summary of the research content"""
    
    # This simulates a temporary failure - maybe a database is down, 
    # or an API is temporarily unavailable
    raise ApplicationError(
        "Simulated failure: Summary service temporarily unavailable"
    )
    
    # This code would run if we remove the error above
    response = completion(
        model=LLM_MODEL,
        api_key=LLM_API_KEY,
        messages=[{"content": input.prompt, "role": "user"}]
    )
    return response

print("Activity created with intentional error!")

### Step 2: Update the Workflow to Call the Summary Activity

Now we'll modify our Workflow to:
1. Generate research content (existing)
2. **Generate a summary of that research (new!)**
3. Create the PDF with the summary (existing)

Run the code below:

In [None]:
# Updated Workflow with summary generation step
from datetime import timedelta
from temporalio import workflow

@workflow.defn(sandboxed=False)
class GenerateReportWorkflow:

    @workflow.run
    async def run(self, input: GenerateReportInput) -> GenerateReportOutput:

        # Step 1: Generate research content
        llm_call_input = LLMCallInput(prompt=input.prompt)

        research_facts = await workflow.execute_activity(
            llm_call,
            llm_call_input,
            start_to_close_timeout=timedelta(seconds=30),
        )

        workflow.logger.info("Research complete!")

        # Step 2: Generate a summary (NEW - this will fail initially!)
        summary_prompt = f"Provide a 2-3 sentence executive summary of this research: {research_facts['choices'][0]['message']['content']}"
        summary_input = LLMCallInput(prompt=summary_prompt)

        summary_result = await workflow.execute_activity(
            generate_summary,
            summary_input,
            start_to_close_timeout=timedelta(seconds=30),
        )

        workflow.logger.info("Summary generated!")
        
        # Step 3: Create PDF with summary prepended
        full_content = f"EXECUTIVE SUMMARY:\n{summary_result['choices'][0]['message']['content']}\n\n{research_facts['choices'][0]['message']['content']}"
        pdf_generation_input = PDFGenerationInput(content=full_content)

        pdf_filename = await workflow.execute_activity(
            create_pdf,
            pdf_generation_input,
            start_to_close_timeout=timedelta(seconds=10),
        )

        return GenerateReportOutput(result=f"Successfully created research report PDF with summary: {pdf_filename}")

print("Workflow updated!")

### Step 3: Register the New Activity with the Worker

We need to tell the Worker about our new `generate_summary` Activity:

In [None]:
# Updated Worker with the new Activity registered
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker() -> None:
    client = await Client.connect("localhost:7233", namespace="default")

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="research",
            workflows=[GenerateReportWorkflow],
            activities=[llm_call, create_pdf, generate_summary],  # Added generate_summary
            activity_executor=activity_executor
        )

        print("Starting the worker with summary Activity registered....")
        await worker.run()

print("Worker function updated!")

In [None]:
# Kill the old worker and start the new one
import asyncio

x = worker.cancel()

worker = asyncio.create_task(run_worker())
print("New worker started!")

### Step 4: Start a New Workflow Execution

Let's start a new Workflow that will call our failing Activity:

In [None]:
# Start a new Workflow execution
from temporalio.client import Client
import uuid

client = await Client.connect("localhost:7233", namespace="default")

prompt = "Give me 3 interesting facts about dolphins"
print(f"Starting workflow with prompt: {prompt}")

handle = await client.start_workflow(
    GenerateReportWorkflow,
    GenerateReportInput(prompt=prompt),
    id=f"generate-report-with-summary-{uuid.uuid4()}",
    task_queue="research",
)

print(f"Started workflow. Workflow ID: {handle.id}")
print("The workflow is now running and will retry the failing Activity automatically!")

### Step 5: Observe Automatic Retries in the Web UI

**Go to your Temporal Web UI now!**

You should see:
1. Your Workflow is **Running** (not Failed!)
2. The `llm_call` Activity completed successfully ✓
3. The `generate_summary` Activity shows a **Pending Activity** with retry attempts

**Click on the Pending Activity to see:**
- The error message. What does it say?
- What is the current retry attempt number?
- What is the countdown until the next retry?

**Key insight:** Notice that the expensive `llm_call` Activity isn't being re-executed! Temporal saved its result and won't waste money calling the LLM again. Only the failing Activity retries.
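
The countdown you see grows between attempts because retries back off exponentially. The sketch below computes such a schedule, assuming the default Retry Policy values described in Temporal's documentation (a 1-second initial interval, a backoff coefficient of 2.0, and a maximum interval of 100 times the initial interval). `retry_delays` is a hypothetical helper for illustration, not part of the Temporal SDK.

```python
def retry_delays(attempts, initial=1.0, coefficient=2.0, maximum=100.0):
    """Seconds to wait before each retry under capped exponential backoff."""
    return [min(initial * coefficient ** n, maximum) for n in range(attempts)]

print(retry_delays(9))
# -> [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0, 100.0]
```

Under these assumptions, the wait doubles after each failure until it reaches the cap, so a long outage does not flood the failing service with requests.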

### Step 6: Fix the Error

Now let's "fix" our simulated failure by removing the error. In a real scenario, this could be:
- A database coming back online
- An API endpoint being fixed
- A network issue being resolved

Fix the code by removing or commenting out the error.

In [None]:
# Step 1: Fix the code by removing or commenting out the `raise ApplicationError(...)` statement below.
# Step 2: Run the code block
from temporalio import activity
from litellm import completion

@activity.defn
def generate_summary(input: LLMCallInput) -> ModelResponse:
    """Generate a concise summary of the research content"""
    
    # This simulates a temporary failure - maybe a database is down, 
    # or an API is temporarily unavailable
    raise ApplicationError(
        "Simulated failure: Summary service temporarily unavailable"
    )

    # Error is now removed - the Activity will work!
    response = completion(
        model=LLM_MODEL,
        api_key=LLM_API_KEY,
        messages=[{"content": input.prompt, "role": "user"}]
    )
    return response

print("Activity fixed! Error removed.")

### Step 7: Restart the Worker with Fixed Code

Now restart the Worker so it picks up the fixed Activity code:

In [None]:
# Kill the old worker and start the new one
import asyncio

x = worker.cancel()
worker = asyncio.create_task(run_worker())
print("New worker started!")

### Step 8: Observe Successful Completion

**Refresh your Web UI and observe:**

1. The `generate_summary` Activity now completes successfully!
2. The `create_pdf` Activity executes and creates the PDF
3. The entire Workflow shows **Completed** status

**What just happened?**
- Your Workflow **preserved all state** through the failure
- The expensive `llm_call` was **never re-executed** (saving you money!)
- When you fixed the bug, Temporal **automatically continued** from where it left off
- No manual intervention needed - just fix the code and restart the Worker

In [None]:
# Kill any worker to prepare for the exercise.
x = worker.cancel()

if x:
  print("Worker killed")
else:
  print("Worker was not running. Nothing to kill")
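
The cells above rely on the return value of `worker.cancel()`. Because `worker` is an `asyncio.Task`, `cancel()` returns `True` when a cancellation request was made and `False` when the task had already finished. Here is a minimal standard-library sketch of that behavior, where `forever` is a hypothetical stand-in for the long-running `run_worker()` coroutine:

```python
import asyncio

async def main():
    async def forever():
        await asyncio.sleep(3600)  # stands in for a long-running worker

    task = asyncio.create_task(forever())
    await asyncio.sleep(0)         # give the task a chance to start

    first = task.cancel()          # True: cancellation was requested
    try:
        await task                 # the task ends by raising CancelledError
    except asyncio.CancelledError:
        pass

    second = task.cancel()         # False: the task is already done
    return first, second, task.cancelled()

# In a notebook, use `outcome = await main()` instead,
# because an event loop is already running there.
outcome = asyncio.run(main())
print(outcome)
```

Note that `cancel()` only requests cancellation; the task actually stops the next time the event loop gives it a chance to run.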

---

## Exercise 2 - Adding Durability

* In these exercises you will:
  * Transform your LLM calls and your execution of tools to Activities
  * Use a Temporal Workflow to orchestrate your Activities
  * Observe how Temporal handles your errors
  * Debug your error and observe your Workflow Execution successfully complete
* Go to the **Exercise** Directory and open the **02_Adding_Durability_with_Temporal** Directory
* Open _Practice_ and follow the instructions
* If you get stuck, raise your hand and someone will come by and help. You can also check the `Solution` directory for the answers

### What's Next?

This chapter introduced you to the **concept** of Durable Execution with Temporal. Further your learning with these resources:

### Resources

- Our free [Temporal 102 Course](https://learn.temporal.io/courses/temporal_102/python/) which covers these concepts (Workflows, Activities, Replay, and more) in more detail
- A Temporal [tutorial in the Python SDK](https://learn.temporal.io/getting_started/python/hello_world_in_python/) that showcases how to get started with Temporal
- Our [docs page](https://docs.temporal.io/encyclopedia/event-history/event-history-python#How-History-Replay-Provides-Durable-Execution) describing how Temporal uses Replay to provide durable execution in more detail