In [5]:
from zenml import pipeline, step
import os
import requests
from dotenv import load_dotenv
from youtube_transcript_api import YouTubeTranscriptApi
from bs4 import BeautifulSoup
from pymongo import MongoClient
import re
from zenml.logger import get_logger

logger = get_logger(__name__)

# Environment setup: the variables file is looked up one directory above the
# current working directory, so the result depends on where the kernel runs.
env_path = os.path.join(os.getcwd(), '../dotenv.env')
env_path = os.path.abspath(env_path)
print(f"Looking for .env at: {env_path}")

# Echo the loader's return value so a failed load is visible immediately.
loaded = load_dotenv(env_path)
print(f"Load successful: {loaded}")


Looking for .env at: /app/dotenv.env
Load successful: True


In [4]:
# Shell commands: install the playwright package into the environment, then
# run its `install` sub-command.
# NOTE(review): this cell's execution counter (4) is lower than the import
# cell above it (5), so the notebook was not run top to bottom.
# NOTE(review): no cell visible in this notebook imports playwright; confirm
# the install is still needed. The captured output also reports missing host
# dependencies (`playwright install-deps`).
!pip install playwright
!playwright install

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
╔══════════════════════════════════════════════════════╗
║ Host system is missing dependencies to run browsers. ║
║ Please install them with the following command:      ║
║                                                      ║
║     playwright install-deps                          ║
║                                                      ║
║ Alternatively, use apt:                              ║
║     apt-get install libglib2.0-0\                    ║
║         libnss3\                                     ║
║         libnspr4\                                    ║
║         libdbus-1-3\                                 ║
║         libatk1.0-0\                                 ║
║         libatk-bridge2.0-0\                          ║
║ 

In [6]:
@step
def search_youtube_videos(queries: list, max_results_per_query: int = 10) -> list:
    """Search YouTube for each query and collect the matching video IDs.

    Args:
        queries: Search strings, sent one at a time to the YouTube Data API
            v3 ``search`` endpoint.
        max_results_per_query: Value passed as ``maxResults`` for every query.

    Returns:
        Video IDs in first-seen order. An ID returned by several queries is
        listed only once so the same video is not processed twice downstream.

    Raises:
        ValueError: If ``YOUTUBE_API_KEY`` is not set in the environment.
    """
    YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
    if not YOUTUBE_API_KEY:
        raise ValueError("YOUTUBE_API_KEY not found in environment.")

    # A dict keeps insertion order, giving an order-preserving de-duplication.
    seen_ids = {}
    for query in queries:
        params = {
            'part': 'snippet',
            'type': 'video',
            'maxResults': max_results_per_query,
            'q': query,
            'key': YOUTUBE_API_KEY
        }
        try:
            resp = requests.get(
                'https://www.googleapis.com/youtube/v3/search',
                params=params,
                timeout=30,  # never let one stalled request hang the step
            )
        except requests.RequestException as e:
            # Log only the exception type: the message can embed the request
            # URL, which carries the API key as a query parameter.
            logger.warning(f"Failed YouTube search for {query}: {type(e).__name__}")
            continue

        if resp.status_code != 200:
            logger.warning(f"Failed YouTube search for {query}: {resp.text}")
            continue

        for item in resp.json().get('items', []):
            # Tolerate items that lack an id/videoId instead of raising KeyError.
            video_id = (item.get('id') or {}).get('videoId')
            if video_id:
                seen_ids.setdefault(video_id, None)

    return list(seen_ids)

@step
def extract_youtube_data(video_ids: list) -> list:
    """Fetch the transcript of each video and wrap it in a raw-data record.

    Args:
        video_ids: YouTube video IDs to fetch transcripts for.

    Returns:
        One dict per video whose transcript could be fetched, with the keys
        ``url``, ``path``, ``content``, ``source``, ``repository`` and
        ``branch``. Videos whose transcript fetch raises are reported and
        skipped.
    """
    records = []
    for vid in video_ids:
        try:
            segments = YouTubeTranscriptApi.get_transcript(vid)
            # Join the individual caption snippets into one block of text.
            joined_text = " ".join(segment["text"] for segment in segments)
        except Exception as e:
            print(f"Error fetching transcript for video {vid}: {e}")
            logger.error(f"Error fetching transcript for video {vid}: {e}")
        else:
            record = {
                'url': f"https://www.youtube.com/watch?v={vid}",
                'path': None,
                'content': joined_text,
                'source': 'youtube',
                'repository': None,
                'branch': None
            }
            records.append(record)
    return records

In [7]:
from urllib.parse import urlparse, urljoin
@step
def discover_web_pages(root_urls: list, max_pages_per_root: int = 10) -> list:
    """Collect same-site HTML page URLs linked from each root page.

    For every root URL the page is fetched, its ``<a href>`` targets are
    resolved against the root, and links on the root's host (or a subdomain
    of it) are kept if a HEAD request reports a ``text/html`` content type.

    Args:
        root_urls: Pages whose outgoing links should be inspected.
        max_pages_per_root: Maximum number of URLs kept per root page.

    Returns:
        The accepted URLs of all roots, concatenated in root order. Within a
        root, URLs are unique and appear in document order.
    """
    request_timeout = 10  # seconds; keeps one stalled server from hanging the step
    all_urls = []

    for root_url in root_urls:
        try:
            logger.info(f"Fetching root URL: {root_url}")
            response = requests.get(root_url, timeout=request_timeout)
            if response.status_code != 200:
                logger.warning(f"Failed to fetch {root_url}: {response.status_code}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            root_host = urlparse(root_url).hostname or ''

            # Resolve, normalise and de-duplicate BEFORE any network call so
            # each distinct page costs at most one HEAD request.
            candidates = {}
            for link in soup.find_all('a', href=True):
                # Drop the fragment: "page#a" and "page#b" are the same page.
                full_url = urljoin(root_url, link['href']).split('#', 1)[0]
                parsed_url = urlparse(full_url)
                if parsed_url.scheme not in ('http', 'https'):
                    continue  # mailto:, javascript:, ftp:, ...
                host = parsed_url.hostname or ''
                # Exact host or a true subdomain; a substring test would also
                # accept hosts such as "docs.nav2.org.evil.com".
                if host == root_host or host.endswith('.' + root_host):
                    candidates.setdefault(full_url, None)

            accepted = []
            for candidate in candidates:
                if len(accepted) >= max_pages_per_root:
                    break  # limit reached; no need to probe further links
                try:
                    head = requests.head(candidate, timeout=request_timeout)
                except requests.RequestException as e:
                    # One unreachable link must not discard the whole root.
                    logger.warning(f"Skipping {candidate}: HEAD request failed: {e}")
                    continue
                if 'text/html' in head.headers.get('Content-Type', ''):
                    accepted.append(candidate)

            all_urls.extend(accepted)

        except Exception as e:
            logger.error(f"Error discovering web pages from {root_url}: {e}")
            continue

    return all_urls

@step
def extract_web_data(urls: list) -> list:
    """Download each URL and reduce the page to its visible text.

    ``<script>`` and ``<style>`` elements are removed before the text is
    extracted, and every run of whitespace is collapsed to a single space.

    Args:
        urls: Page URLs to scrape.

    Returns:
        One dict per successfully fetched page, with the keys ``url``,
        ``path``, ``content``, ``source``, ``repository`` and ``branch``.
        Pages that do not answer with status 200, or that raise while being
        fetched or parsed, are logged and skipped.
    """
    request_timeout = 30  # seconds; without it a stalled server blocks forever
    data = []

    for url in urls:
        try:
            logger.info(f"Scraping URL: {url}")
            response = requests.get(url, timeout=request_timeout)
            if response.status_code != 200:
                logger.warning(f"Failed to fetch {url}: {response.status_code}")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')

            # Script and style bodies are code, not readable page content.
            for tag in soup(["script", "style"]):
                tag.decompose()

            # Extract and clean text content
            text = soup.get_text(separator=' ')
            text = re.sub(r'\s+', ' ', text).strip()

            data.append({
                'url': url,
                'path': None,
                'content': text,
                'source': 'web',
                'repository': None,
                'branch': None
            })

        except Exception as e:
            logger.error(f"Error scraping {url}: {e}")
            continue

    return data

In [8]:
@step
def extract_github_data() -> list:
    """Download documentation and Python sources from a fixed set of repos.

    For each repository the recursive git tree of the configured branch is
    listed through the GitHub API, and every blob ending in ``.md``, ``.rst``
    or ``.py`` is downloaded from ``raw.githubusercontent.com``.

    Returns:
        One dict per downloaded file, with the keys ``url``, ``path``,
        ``content``, ``source``, ``repository`` and ``branch``. Repositories
        or files that cannot be fetched are logged and skipped.
    """
    logger.info("Starting data extraction from GitHub...")

    request_timeout = 30  # seconds; prevents a stalled connection from hanging the step

    GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
    headers = {'Accept': 'application/vnd.github.v3+json'}
    if GITHUB_TOKEN:
        headers['Authorization'] = f'token {GITHUB_TOKEN}'
    else:
        # Sending "token None" would make every API call fail authentication.
        logger.warning("GITHUB_TOKEN not set; sending unauthenticated GitHub requests.")

    # List of repositories to scrape
    repositories = [
        {'owner': 'ros2', 'repo': 'ros2_documentation', 'branch': 'rolling'},
        {'owner': 'ros2', 'repo': 'examples', 'branch': 'rolling'},
        {'owner': 'ros2', 'repo': 'demos', 'branch': 'rolling'},
        {'owner': 'ros2', 'repo': 'rclpy', 'branch': 'rolling'},
        {'owner': 'ros2', 'repo': 'rclcpp', 'branch': 'rolling'},
        {'owner': 'ros-navigation', 'repo': 'docs.nav2.org', 'branch': 'master'},
        {'owner': 'ros-navigation', 'repo': 'navigation2', 'branch': 'main'},
        {'owner': 'moveit', 'repo': 'moveit2', 'branch': 'main'},
        {'owner': 'gazebosim', 'repo': 'gz-doc', 'branch': 'master'}
    ]

    data = []

    for repo in repositories:
        owner = repo['owner']
        repo_name = repo['repo']
        branch = repo['branch']
        logger.info(f"Fetching files from repository: {owner}/{repo_name}")

        tree_url = f'https://api.github.com/repos/{owner}/{repo_name}/git/trees/{branch}?recursive=1'
        try:
            response = requests.get(tree_url, headers=headers, timeout=request_timeout)
        except requests.RequestException as e:
            # A network error on one repository should not abort the others.
            logger.error(f"Failed to fetch tree for {owner}/{repo_name}: {e}")
            continue
        if response.status_code != 200:
            logger.error(f"Failed to fetch tree for {owner}/{repo_name}: {response.text}")
            continue

        tree_payload = response.json()
        if tree_payload.get('truncated'):
            # The API flags listings it had to cut short; files beyond the
            # cut-off are silently absent from 'tree'.
            logger.warning(f"Tree listing for {owner}/{repo_name} is truncated; some files will be missed.")

        file_urls = [
            {
                'url': f'https://raw.githubusercontent.com/{owner}/{repo_name}/{branch}/{item["path"]}',
                'path': item['path'],
            }
            for item in tree_payload.get('tree', [])
            if item['type'] == 'blob' and item['path'].endswith(('.md', '.rst', '.py'))
        ]

        logger.info(f"Found {len(file_urls)} files to download in {owner}/{repo_name}")

        # Fetch and store file contents
        for file_info in file_urls:
            file_url = file_info['url']
            file_path = file_info['path']
            try:
                file_response = requests.get(file_url, headers=headers, timeout=request_timeout)
                if file_response.status_code == 200:
                    data.append({
                        'url': file_url,
                        'path': file_path,
                        'content': file_response.text,
                        'source': 'github',
                        'repository': f'{owner}/{repo_name}',
                        'branch': branch
                    })
                    logger.debug(f"Fetched {file_url}")
                else:
                    logger.warning(f"Failed to fetch {file_url}: {file_response.status_code}")
            except Exception as e:
                logger.error(f"Error fetching {file_url}: {e}")

    logger.info(f"Extracted {len(data)} files from GitHub.")
    return data


In [9]:
@step
def transform_data(data: list) -> list:
    """Clean the ``content`` of every raw record and drop empty results.

    Records whose ``path`` ends in ``.py`` keep their content verbatim (only
    surrounding whitespace is trimmed) so code indentation and comments
    survive. All other content has Markdown images, Markdown links and
    heading lines removed and its whitespace collapsed to single spaces.

    Args:
        data: Dicts with the keys ``url``, ``path``, ``repository``,
            ``branch``, ``content`` and ``source``.

    Returns:
        New dicts with the same keys and the cleaned content. Records whose
        content is empty after cleaning are logged and left out.
    """
    logger.info("Starting data transformation...")

    transformed_data = []

    def clean_text(text, path=None):
        # Decide by FILE PATH: testing the content itself for a ".py" suffix
        # almost never matches, so Python sources were being stripped.
        if path and path.endswith('.py'):
            # For Python files, keep the content as is
            return text.strip()
        text = re.sub(r'!\[.*?\]\(.*?\)', '', text)  # Remove images
        text = re.sub(r'\[.*?\]\(.*?\)', '', text)    # Remove links
        # Remove heading lines only ("# Title", or a rule made purely of '#').
        # An unanchored '#.*' deletes everything after the first '#', which
        # truncates single-line web and transcript text.
        text = re.sub(
            r'^[ \t]{0,3}(?:#{1,6}[ \t].*|#+[ \t]*)$', '', text, flags=re.MULTILINE
        )
        # Collapse every whitespace run, newlines included, to one space.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    for item in data:
        # Treat missing or None content as empty instead of crashing.
        content = clean_text(item.get('content') or '', item.get('path'))
        if content:
            transformed_item = {
                'url': item['url'],
                'path': item['path'],
                'repository': item['repository'],
                'branch': item['branch'],
                'content': content,
                'source': item['source']
            }
            transformed_data.append(transformed_item)
            logger.debug(f"Transformed data from {item['url']}")
        else:
            logger.warning(f"No content after cleaning for {item['url']}")

    logger.info(f"Transformed {len(transformed_data)} items.")
    return transformed_data


In [10]:
@step
def load_data(transformed_data: list) -> None:
    """Insert the transformed records into the ``rag_db.raw_data`` collection.

    Args:
        transformed_data: Dicts to store. The list and its dicts are left
            unmodified. When the list is empty nothing is inserted and a
            warning is logged.
    """
    logger.info("Starting data loading into MongoDB...")

    if not transformed_data:
        logger.warning("No data to insert into MongoDB")
        return

    client = MongoClient('mongodb://rag_mongodb:27017/')
    try:
        collection = client['rag_db']['raw_data']

        # insert_many adds an "_id" field to each dict it receives; insert
        # shallow copies so the caller's records stay untouched.
        documents = [dict(doc) for doc in transformed_data]
        collection.insert_many(documents)
        logger.info(f"Inserted {len(documents)} documents into MongoDB")
    finally:
        # Release the connection pool even when the insert fails.
        client.close()


In [None]:
@pipeline
def data_collection_pipeline():
    """Wire the extract -> transform -> load steps for the three sources.

    GitHub, YouTube and web content are each extracted by their own step(s),
    then passed through ``transform_data`` and ``load_data`` in that order.
    """
    # --- Source 1: GitHub repositories ---
    github_raw = extract_github_data()
    github_clean = transform_data(github_raw)
    load_data(github_clean)

    # --- Source 2: YouTube transcripts ---
    # NOTE(review): the second query string starts with a space.
    search_terms = ["ROS2", " ROS2 navigation", "ROS2 MoveIt2", "ROS2 Gazebo simulation"]
    found_video_ids = search_youtube_videos(search_terms, max_results_per_query=5)
    youtube_raw = extract_youtube_data(found_video_ids)
    youtube_clean = transform_data(youtube_raw)
    load_data(youtube_clean)

    # --- Source 3: documentation websites ---
    doc_roots = [
        "https://docs.nav2.org/",
        "https://moveit.picknik.ai/main/",
        "https://gazebosim.org/docs/all/tutorials/",
        "https://docs.ros.org/en/foxy/Tutorials/"
    ]
    page_urls = discover_web_pages(doc_roots, max_pages_per_root=100)
    web_raw = extract_web_data(page_urls)
    web_clean = transform_data(web_raw)
    load_data(web_clean)

# Call the decorated pipeline function and keep whatever it returns.
# NOTE(review): whether this call executes the pipeline immediately or only
# builds an object that still needs an explicit run depends on the installed
# ZenML version; confirm against the version in use.
pipeline_instance = data_collection_pipeline()

In [None]:
from pymongo import MongoClient

# Sanity check after a pipeline run: connect to the same database and
# collection that load_data writes to, then report what is stored there.
client = MongoClient('mongodb://rag_mongodb:27017/')
db = client.rag_db
collection = db.raw_data

# An empty filter counts every document in the collection.
doc_count = collection.count_documents({})
print(f"Total documents in raw_data collection: {doc_count}")

# List each distinct source URL once, one per line.
urls = collection.distinct('url')
print("URLs ingested:")
for url in urls:
    print(url)