A simplified illustration of a data processing pipeline to use in documentation.

The pipeline here represents a hypothetical web scraping process using dummy data.

In [None]:
import time
from typing import Union
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
from petritype.plotting.simple_graphviz import SimpleGraphvizVisualization
from pydantic import BaseModel

from petritype.core.executable_graph_components import (
    ListPlaceNode, FunctionTransitionNode, ArgumentEdgeToTransition, ReturnedEdgeFromTransition,
    ExecutableGraphOperations
)
from petritype.core.rustworkx_graph import RustworkxGraph
from petritype.plotting.simple_graphviz import SimpleGraphvizVisualization
from IPython.display import display, clear_output

In [None]:
class ScrapeParameters(BaseModel):
    """Input token for the pipeline: identifies one page to scrape."""

    url: str

    def __str__(self) -> str:
        # Compact label: the URL wrapped in square brackets.
        target = self.url
        return f"[{target}]"

class ScrapedData(BaseModel):
    """Raw outcome of a scrape attempt, paired with the request that produced it."""

    parameters: ScrapeParameters
    data: str

    def __str__(self) -> str:
        # Labelled by the originating URL only; the payload is left out.
        origin = self.parameters.url
        return f"[{origin}]"

class SuccessfulScrape(BaseModel):
    """A scrape whose data was accepted as valid."""

    parameters: ScrapeParameters
    result: ScrapedData

    def __str__(self) -> str:
        # Labelled by the request URL in square brackets.
        origin = self.parameters.url
        return f"[{origin}]"


class UncertainScrapeResult(BaseModel):
    """A scrape whose data could not be accepted outright and needs a second look."""

    parameters: ScrapeParameters
    result: ScrapedData

    def __str__(self) -> str:
        # Labelled by the request URL in square brackets.
        origin = self.parameters.url
        return f"[{origin}]"


class FailedScrape(BaseModel):
    """A scrape that was rejected, together with the reason for the rejection."""

    parameters: ScrapeParameters
    result: ScrapedData
    error: str

    def __str__(self) -> str:
        # Labelled by the request URL only; the error text is left out.
        origin = self.parameters.url
        return f"[{origin}]"

In [None]:
def simulated_scrape_attempt(parameters: ScrapeParameters) -> ScrapedData:
    """Pretend to scrape a page, returning a canned payload chosen by URL prefix.

    URLs starting with ``wrong/page`` or ``difficult/page`` get payloads that
    mention "wrong" / "difficult"; every other URL gets plain valid data.
    """
    url = parameters.url
    if url.startswith("wrong/page"):
        payload = "Encountered a wrong error"
    elif url.startswith("difficult/page"):
        payload = "Maybe difficult valid data"
    else:
        payload = "Valid data"
    return ScrapedData(parameters=parameters, data=payload)


def classify_scrape_data(data: ScrapedData) -> Union[SuccessfulScrape, UncertainScrapeResult]:
    """Sort scraped data into a clear success or a result needing further handling.

    Data whose text mentions "difficult" or "wrong" (case-insensitively) is
    wrapped as an ``UncertainScrapeResult``; anything else is a
    ``SuccessfulScrape``.
    """
    text = data.data.lower()
    needs_review = any(marker in text for marker in ("difficult", "wrong"))
    if not needs_review:
        return SuccessfulScrape(parameters=data.parameters, result=data)
    return UncertainScrapeResult(parameters=data.parameters, result=data)


def handle_special_cases(data: UncertainScrapeResult) -> Union[SuccessfulScrape, FailedScrape]:
    """Resolve an uncertain result into a final success or failure.

    A result whose scraped text contains "maybe" (case-insensitively) is
    promoted to ``SuccessfulScrape``; everything else becomes a
    ``FailedScrape`` with a fixed error message.
    """
    scraped = data.result
    recoverable = "maybe" in scraped.data.lower()
    if not recoverable:
        return FailedScrape(
            parameters=data.parameters,
            result=scraped,
            error="Definite error encountered",
        )
    return SuccessfulScrape(parameters=data.parameters, result=scraped)

In [None]:
# Seed tokens for the pipeline: one request per dummy URL, covering the plain,
# "wrong" and "difficult" prefixes that the simulated scraper distinguishes.
initial_scrape_parameters = [
    ScrapeParameters(url=page_url)
    for page_url in (
        "valid/page_01",
        "wrong/page_01",
        "difficult/page_01",
        "valid/page_02",
    )
]

In [None]:
# Flat declaration of the pipeline graph: places (token holders), transitions
# (functions), and the edges connecting them. Edges refer to nodes by name.
executable_graph_nodes_and_edges = [
    ListPlaceNode( # Entry point for the scraping process.
        name="Input Parameters",
        type=ScrapeParameters,
        tokens=initial_scrape_parameters
    ),

    # "Input Parameters" feeds the `parameters` argument of the "Scrape" transition.
    ArgumentEdgeToTransition(
        place_node_name="Input Parameters",
        transition_node_name="Scrape",
        argument="parameters",
    ),
    FunctionTransitionNode(
        name="Scrape",
        function=simulated_scrape_attempt,
    ),
    ReturnedEdgeFromTransition("Scrape", "Response"),

    # Holds the ScrapedData produced by "Scrape" until it is classified.
    ListPlaceNode("Response", type=ScrapedData),

    # "Classify" has two outgoing edges, matching the two types in the return
    # annotation of classify_scrape_data.
    # NOTE(review): presumably the returned value is routed to the place whose
    # `type` matches it — confirm against petritype.
    ArgumentEdgeToTransition("Response", "Classify", "data"),
    FunctionTransitionNode("Classify", classify_scrape_data),
    ReturnedEdgeFromTransition("Classify", "Uncertain"),
    ReturnedEdgeFromTransition("Classify", "Successes"),

    ListPlaceNode("Uncertain", type=UncertainScrapeResult),

    # "Special Cases" resolves uncertain results into either "Successes" or "Failures".
    ArgumentEdgeToTransition("Uncertain", "Special Cases", "data"),
    FunctionTransitionNode("Special Cases", handle_special_cases),
    ReturnedEdgeFromTransition("Special Cases", "Successes"),
    ReturnedEdgeFromTransition("Special Cases", "Failures"),

    # Places with no outgoing edges in this list: final results collect here.
    ListPlaceNode("Failures", type=FailedScrape),

    ListPlaceNode("Successes", type=SuccessfulScrape),
]

In [None]:
# Build the executable graph from the declarations, convert it to the
# rustworkx-backed form used for plotting, and show the graph as constructed.
executable_graph = ExecutableGraphOperations.construct_graph(
    executable_graph_nodes_and_edges
)
executable_pydigraph = RustworkxGraph.from_executable_graph(executable_graph)
initial_diagram = SimpleGraphvizVisualization.graph(executable_pydigraph)
display(initial_diagram)

In [None]:
import os
os.makedirs("images/docs/illustrations/readme_example", exist_ok=True)

async for step, diagram, transitions_fired in SimpleGraphvizVisualization.animate_execution_generator(
    executable_graph=executable_graph,
    executable_pydigraph=executable_pydigraph,
):
    clear_output(wait=True)
    print(f"Step {step}")
    display(diagram)
    print(f"Transitions fired: {transitions_fired}")
    if not transitions_fired:
        print("No more transitions to fire. Execution complete.")
        break

    # Save current diagram to PNG
    out_path = f"images/docs/illustrations/readme_example/step_{step:03d}.png"
    try:
        # Check if it's a PIL Image
        if hasattr(diagram, 'save') and hasattr(diagram, 'mode'):
            # It's a PIL Image object
            diagram.save(out_path, format="PNG")
        # Check if it's a graphviz object with source attribute
        elif hasattr(diagram, 'source'):
            # It's likely a graphviz.Source or Digraph object
            png_bytes = diagram.pipe(format="png")
            with open(out_path, "wb") as f:
                f.write(png_bytes)
        elif hasattr(diagram, "pipe"):
            # Direct pipe method
            png_bytes = diagram.pipe(format="png")
            with open(out_path, "wb") as f:
                f.write(png_bytes)
        elif hasattr(diagram, "_repr_svg_"):
            # SVG representation - convert to PNG using cairosvg or save as SVG
            svg_data = diagram._repr_svg_()
            try:
                import cairosvg
                png_bytes = cairosvg.svg2png(bytestring=svg_data.encode('utf-8'))
                with open(out_path, "wb") as f:
                    f.write(png_bytes)
            except ImportError:
                # Fallback: save as SVG
                svg_path = out_path.replace('.png', '.svg')
                with open(svg_path, "w") as f:
                    f.write(svg_data)
                print(f"Saved as SVG (cairosvg not available): {svg_path}")
        else:
            print(f"Warning: diagram type {type(diagram)} not recognized at step {step}")
    except Exception as e:
        print(f"Error saving diagram at step {step}: {e}")
        import traceback
        traceback.print_exc()

    time.sleep(1.0)

In [None]:
import glob
from contextlib import ExitStack

from PIL import Image

# Frames are picked up in filename order; the zero-padded step number in
# step_NNN.png makes that the execution order.
png_files = sorted(glob.glob("images/docs/illustrations/readme_example/step_*.png"))
if not png_files:
    # Fail with a clear message instead of an IndexError on images[0].
    raise FileNotFoundError(
        "No step_*.png frames found in images/docs/illustrations/readme_example; "
        "run the animation cell first."
    )

# Save as GIF
gif_path = "images/docs/illustrations/readme_example/animation.gif"
with ExitStack() as stack:
    # Keep every frame open until the GIF is written, then close them all
    # so no file handles are left dangling.
    images = [stack.enter_context(Image.open(p)) for p in png_files]
    images[0].save(
        gif_path,
        save_all=True,
        append_images=images[1:],
        duration=1600,  # milliseconds per frame (doubled to slow playback)
        loop=0
    )
print("Saved GIF to", gif_path)