# Agentic Pipeline Testing

Now that we have the individual pieces working, let's put it all together into a full Airflow pipeline.

In [1]:
import sys 
import subprocess

# get root of current repo and add to our path
root_dir = subprocess.check_output(["git", "rev-parse", "--show-toplevel"], stderr=subprocess.DEVNULL).decode("utf-8").strip()

sys.path.append(root_dir)

## Extraction 

Content is scraped from publicly available public media RSS feeds. The job is designed to run every night at 5 PM (provided the server and scheduler are running) and writes its output to the `agentic-de/bronze` data directory.

The code blocks defined here will be consolidated into a single Airflow task in our agentic pipeline.

In [2]:
from airflow.dags.utils.helpers import generate_npr_feed_urls

# get RSS feeds from public media sources
npr_rss_feeds = generate_npr_feed_urls()
pbs_rss_feeds = [
    "https://www.pbs.org/newshour/feeds/rss/headlines",
    "https://www.pbs.org/newshour/feeds/rss/politics",
    "https://www.pbs.org/newshour/feeds/rss/brooks-and-capehart"
]

# combine 
rss_feeds_to_crawl = npr_rss_feeds + pbs_rss_feeds

# status update
print(f"Preparing to request {len(rss_feeds_to_crawl)} RSS feeds")

Preparing to request 232 RSS feeds


In [3]:
from airflow.dags.utils.helpers import request_rss_feed
import tqdm

raw_feed_data = []
# only request the first feed while testing; drop the [:1] slice to crawl every feed
for url in tqdm.tqdm(rss_feeds_to_crawl[:1], desc="Requesting RSS feeds", unit="feed"):
    try:
        feed_data = request_rss_feed(url)
        if feed_data:
            raw_feed_data.append(feed_data)
    except Exception as e:
        print(f"Error requesting {url}: {e}")

Requesting RSS feeds: 100%|██████████| 1/1 [00:01<00:00,  1.24s/feed]


In [5]:
raw_feed_data[0]

{'rss_url': 'https://www.npr.org/rss/rss.php?id=1126',
 'xml_doc': '<?xml version="1.0" encoding="UTF-8"?>\n<rss xmlns:npr="https://www.npr.org/rss/" xmlns:nprml="https://api.npr.org/nprml" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:media="http://search.yahoo.com/mrss/" version="2.0">\n  <channel>\n    <title>NPR Topics: Africa</title>\n    <link>https://www.npr.org/templates/story/story.php?storyId=1126</link>\n    <description>Africa</description>\n    <language>en</language>\n    <copyright>Copyright 2024 NPR - For Personal Use Only</copyright>\n    <generator>Story API Shim 1.2.24</generator>\n    <lastBuildDate>Thu, 12 Jun 2025 09:51:38 -0400</lastBuildDate>\n    <image>\n      <url>https://media.npr.org/images/podcasts/primary/npr_generic_image_300.jpg?s=200</url>\n      <title>NPR Topics: Africa</title>\n      <link>https://www.npr.org/sections/africa/</link>
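
To sanity-check a record like the one above, the `xml_doc` string can be parsed with the standard library. This is only a sketch: `summarize_feed` is a hypothetical helper, and the sample document is a shortened stand-in (with a made-up item) rather than the real feed.

```python
import xml.etree.ElementTree as ET

# Shortened stand-in for a raw feed record; the <item> is made up for illustration.
sample_feed = {
    "rss_url": "https://www.npr.org/rss/rss.php?id=1126",
    "xml_doc": (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<rss version="2.0">\n'
        "  <channel>\n"
        "    <title>NPR Topics: Africa</title>\n"
        "    <item>\n"
        "      <title>Hypothetical story</title>\n"
        "      <link>https://example.org/story</link>\n"
        "    </item>\n"
        "  </channel>\n"
        "</rss>"
    ),
}


def summarize_feed(feed: dict) -> dict:
    """Return the channel title and item count for one raw feed record."""
    # Parse from bytes so the XML encoding declaration is honored.
    root = ET.fromstring(feed["xml_doc"].encode("utf-8"))
    channel = root.find("channel")
    if channel is None:
        raise ValueError(f"No <channel> element in {feed['rss_url']}")
    return {
        "rss_url": feed["rss_url"],
        "channel_title": channel.findtext("title"),
        "item_count": len(channel.findall("item")),
    }


summary = summarize_feed(sample_feed)
print(summary)
```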

In [None]:
from airflow.dags.utils.aws import S3 
import os 

# sample key for testing 
sample_key = "bronze/05222025/https%3A%2F%2Fwww.npr.org%2Frss%2Frss.php%3Fid%3D10_05222025.xml"

# S3.upload_raw_rss_data(raw_feed_data[0], role_arn=os.getenv("DIGI_INNO_ROLE_ARN"))
metadata = S3.get_file_metadata(key=sample_key, chars_to_sample=500, role_arn=os.getenv("DIGI_INNO_ROLE_ARN"))
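
The sample key appears to follow the pattern `bronze/<MMDDYYYY>/<url-encoded feed URL>_<MMDDYYYY>.xml`. That layout is inferred from the key above, not taken from the `S3` helper itself, and `build_bronze_key` is a hypothetical name. Under that assumption, a key could be built like this:

```python
from datetime import date
from urllib.parse import quote


def build_bronze_key(rss_url: str, run_date: date) -> str:
    """Build an S3 key for a raw feed, assuming the layout seen in the sample key."""
    stamp = run_date.strftime("%m%d%Y")
    # safe="" percent-encodes "/" and ":" too, so the URL becomes a single path segment
    encoded_url = quote(rss_url, safe="")
    return f"bronze/{stamp}/{encoded_url}_{stamp}.xml"


key = build_bronze_key("https://www.npr.org/rss/rss.php?id=10", date(2025, 5, 22))
print(key)
```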

## Transformation 

Here's where we'll embed our Agent! Given a file's metadata (including a sample of its contents), it decides which transformation DAG the file should be sent to.
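
When testing the agent, it can help to have a deterministic baseline to compare its choice against. The sketch below is a plain rule-based lookup, not the agent itself. `expected_dag_for_key` is a hypothetical helper, `transform_pbs_data` is an assumed DAG name, and the key layout is inferred from the sample key used earlier.

```python
from typing import Optional
from urllib.parse import unquote, urlparse

# Hostname suffix -> DAG id. Only transform_npr_data appears in this notebook;
# transform_pbs_data is an assumed name for illustration.
DAG_BY_HOST_SUFFIX = {
    "npr.org": "transform_npr_data",
    "pbs.org": "transform_pbs_data",
}


def expected_dag_for_key(key: str) -> Optional[str]:
    """Return the DAG a rule-based router would pick for a bronze key, if any."""
    filename = key.rsplit("/", 1)[-1]  # the URL-encoded feed URL plus a date suffix
    host = urlparse(unquote(filename)).hostname or ""
    for suffix, dag_id in DAG_BY_HOST_SUFFIX.items():
        if host == suffix or host.endswith("." + suffix):
            return dag_id
    return None


sample = "bronze/05222025/https%3A%2F%2Fwww.npr.org%2Frss%2Frss.php%3Fid%3D10_05222025.xml"
print(expected_dag_for_key(sample))
```

A test could then assert that the DAG chosen by the agent matches this baseline for keys where the source is unambiguous.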

In [14]:
from airflow.dags.utils.agents.supervisor_agent import SupervisorAgent

SupervisorAgent.trigger(metadata=metadata)



> Entering new AgentExecutor chain...
[2025-06-12T10:04:23.803-0400] {_client.py:1026} INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
The file content is an RSS feed from NPR, as indicated by the URL and the content preview. Therefore, the appropriate DAG to trigger would be the one related to NPR data.

Action: TriggerTransformationDAG
Action Input: transform_npr_data
Observation: {'status_code': 200, 'response': {'dag_run_id': 'manual__2025-06-12T14:04:23.805999+00:00', 'dag_id': 'transform_npr_data', 'logical_date': '2025-06-12T14:04:23.805999Z', 'queued_at': '2025-06-12T14:04:24.015518Z', 'start_date': None, 'end_date': None, 'data_interval_start': '2025-06-12T14:04:23.805999Z', 'data_interval_end': '2025-06-12T14:04:23.805999Z', 'run_after': '2025-06-12T14:04:23.805999Z', 'last_scheduling_decision': None, 'run_type': 'manual', 'state': 'queued', 'triggered_by': 'rest_api', 'co
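
Since the observation is a plain dict, a test can check that the trigger actually queued the expected DAG. The helper below is a hypothetical sketch; it relies only on the `status_code`, `dag_id`, and `state` fields visible in the output above, and the sample dict is a trimmed copy of that output.

```python
def dag_run_was_queued(observation: dict, expected_dag_id: str) -> bool:
    """Check that a trigger observation reports a queued run of the expected DAG."""
    response = observation.get("response") or {}
    return (
        observation.get("status_code") == 200
        and response.get("dag_id") == expected_dag_id
        and response.get("state") == "queued"
    )


# Trimmed copy of the observation printed by the agent run above.
sample_observation = {
    "status_code": 200,
    "response": {
        "dag_run_id": "manual__2025-06-12T14:04:23.805999+00:00",
        "dag_id": "transform_npr_data",
        "state": "queued",
    },
}

print(dag_run_was_queued(sample_observation, "transform_npr_data"))
```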

## Loading 

This is the final step and is rather trivial. We could dump the transformed data into a database or write it right back to S3! Making a note to come back and finish this out if time allows.
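
As a placeholder for the database option, here is a minimal sketch using SQLite from the standard library. The `articles` table, its columns, and the record shape are all assumptions for illustration; the real transformed output may look quite different.

```python
import sqlite3


def load_records(conn: sqlite3.Connection, records: list[dict]) -> int:
    """Upsert transformed records and return the total row count."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS articles (
            link    TEXT PRIMARY KEY,
            rss_url TEXT NOT NULL,
            title   TEXT
        )
        """
    )
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR REPLACE INTO articles (link, rss_url, title) "
            "VALUES (:link, :rss_url, :title)",
            records,
        )
    return conn.execute("SELECT COUNT(*) FROM articles").fetchone()[0]


# Hypothetical transformed records, for demonstration only.
demo_records = [
    {"link": "https://example.org/a", "rss_url": "https://example.org/feed", "title": "Story A"},
    {"link": "https://example.org/b", "rss_url": "https://example.org/feed", "title": "Story B"},
]

conn = sqlite3.connect(":memory:")
row_count = load_records(conn, demo_records)
print(f"{row_count} row(s) loaded")
```

Keying on `link` makes the load idempotent, so re-running the task for the same night would not create duplicate rows.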