In [9]:
# Library-Specific RAG Data Collection System
# Comprehensive data collection from Documentation, GitHub, and StackOverflow

import fnmatch
import html
import json
import re
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import urldefrag, urljoin, urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup


In [8]:
# Dependency installation for the notebook kernel.
# The imports cell of this notebook also needs requests and bs4; their install
# lines are left commented out below, so only pandas is installed when this
# cell runs. Uncomment them when setting up a fresh environment.
# %pip install requests
# %pip install beautifulsoup4
# %pip install bs4
%pip install pandas

Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting numpy>=1.26.0 (from pandas)
  Downloading numpy-2.3.5-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.3.3-cp312-cp312-win_amd64.whl (11.0 MB)
   ---------------------------------------- 0.0/11.0 MB ? eta -:--:--
   ---------------------------------------- 0.0/11.0 MB ? eta -:--:--
    --------------------------------------- 0.3/11.0 MB ? eta -:--:--
   -- ------------------------------------- 0.8/11.0 MB 3.3 MB/s eta 0:00:04
   -- ------------------------------------- 0.8/11.0 MB 3.3 MB/s eta 0:00:04
   ---- ----------------------------------- 1.3/11.0 MB 1.6 MB/s eta 0:00:07
   ----- ---------------------------------- 1.6/11.0 MB 1.7 MB/s eta 0:00:06
   --------- ---

In [10]:

# ============================================================================
# CELL 1: Base Configuration and Utilities
# ============================================================================

class DataCollectionConfig:
    """Directory layout for one library's collected data.

    Creating an instance makes ``rag_data/<library_name>`` (relative to the
    current working directory) plus one sub-directory per data source, so the
    paths handed out by ``get_output_path`` always have an existing parent.
    """

    # Short source key accepted by get_output_path -> attribute that holds the
    # matching directory.
    _SOURCE_ATTRS = {
        'docs': 'docs_dir',
        'github': 'github_dir',
        'stackoverflow': 'stackoverflow_dir',
        'combined': 'combined_dir',
    }

    def __init__(self, library_name: str):
        self.library_name = library_name

        root = Path(f"rag_data/{library_name}")
        self.base_dir = root
        root.mkdir(parents=True, exist_ok=True)

        # One output directory per source, plus one for the merged dataset.
        self.docs_dir = root / "documentation"
        self.github_dir = root / "github"
        self.stackoverflow_dir = root / "stackoverflow"
        self.combined_dir = root / "combined"

        for attr_name in self._SOURCE_ATTRS.values():
            getattr(self, attr_name).mkdir(exist_ok=True)

    def get_output_path(self, source: str, filename: str) -> Path:
        """Return ``filename`` placed inside the directory for ``source``.

        Args:
            source: One of 'docs', 'github', 'stackoverflow', 'combined'.
            filename: File name to append to that directory.

        Raises:
            KeyError: If ``source`` is not one of the known keys.
        """
        directory = getattr(self, self._SOURCE_ATTRS[source])
        return directory / filename



In [11]:

# ============================================================================
# CELL 2: Documentation Scraper
# ============================================================================

class DocumentationScraper:
    """Crawl a documentation site and collect one record per page.

    Pages are fetched through a shared ``requests`` session and parsed with
    BeautifulSoup. Same-domain links found inside a page's main content area
    are followed recursively up to ``max_depth``. Records accumulate in
    ``collected_data`` and are written to disk by ``save_data``.
    """

    def __init__(self, config: "DataCollectionConfig"):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.visited_urls = set()
        self.collected_data = []

    @staticmethod
    def _normalize_url(url: str) -> str:
        """Return ``url`` without its ``#fragment``.

        Anchors such as ``page.html#install`` and ``page.html#usage`` point at
        the same document, so they must share one entry in ``visited_urls``.
        """
        return urldefrag(url)[0]

    def _extract_code_blocks(self, content_section) -> List[str]:
        """Collect the text of ``<pre>``/``<code>`` elements longer than 20 chars."""
        code_blocks = []
        for code_tag in content_section.find_all(['code', 'pre']):
            # <pre><code>...</code></pre>: the enclosing <pre> already yields
            # this text, so skipping the inner <code> avoids a duplicate entry.
            if code_tag.name == 'code' and code_tag.find_parent('pre') is not None:
                continue
            # get_text(strip=True) would strip every text node separately and
            # drop whitespace-only nodes, gluing highlighted tokens together;
            # strip only the outer whitespace instead.
            code_text = code_tag.get_text().strip()
            if len(code_text) > 20:  # Filter out inline code
                code_blocks.append(code_text)
        return code_blocks

    def _follow_links(self, content_section, page_url: str, max_depth: int, current_depth: int) -> None:
        """Recurse into unvisited links that stay on ``page_url``'s domain."""
        page_netloc = urlparse(page_url).netloc
        for link in content_section.find_all('a', href=True):
            next_url = self._normalize_url(urljoin(page_url, link['href']))
            if urlparse(next_url).netloc != page_netloc:
                continue
            if next_url in self.visited_urls:
                continue
            time.sleep(0.5)  # Rate limiting
            self.scrape_page(next_url, max_depth, current_depth + 1)

    def scrape_page(self, url: str, max_depth: int = 3, current_depth: int = 0) -> List[Dict]:
        """Scrape ``url`` and, recursively, the same-domain pages it links to.

        Args:
            url: Page to fetch. Any ``#fragment`` is ignored.
            max_depth: Deepest link level to follow (the start page is level 0).
            current_depth: Level of ``url``; callers normally leave this at 0.

        Returns:
            ``[]`` when the page is skipped (too deep or already visited),
            otherwise the scraper's full ``collected_data`` list. Fetch and
            parse errors are printed and do not propagate.
        """
        url = self._normalize_url(url)
        if current_depth > max_depth or url in self.visited_urls:
            return []

        self.visited_urls.add(url)
        print(f"Scraping: {url} (depth: {current_depth})")

        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract main content
            content_section = soup.find('div', {'role': 'main'}) or soup.find('article') or soup.find('main')

            if content_section:
                title_tag = soup.find('title')
                page_data = {
                    'id': f"doc_{len(self.collected_data)}",
                    'type': 'documentation',
                    'library': self.config.library_name,
                    'source': 'documentation',
                    'source_url': url,
                    'title': title_tag.get_text(strip=True) if title_tag else '',
                    # Headers give the page's outline
                    'headers': [
                        h.get_text(strip=True)
                        for h in content_section.find_all(['h1', 'h2', 'h3', 'h4'])
                    ],
                    # Limit content length
                    'content': content_section.get_text(separator='\n', strip=True)[:5000],
                    'code_blocks': self._extract_code_blocks(content_section),
                    'collected_at': datetime.now().isoformat()
                }
                self.collected_data.append(page_data)

                # Find and follow internal links
                if current_depth < max_depth:
                    self._follow_links(content_section, url, max_depth, current_depth)

        except Exception as e:
            print(f"Error scraping {url}: {e}")

        return self.collected_data

    def save_data(self):
        """Write ``collected_data`` as JSON and return the output path."""
        output_file = self.config.get_output_path('docs', f'{self.config.library_name}_docs.json')
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.collected_data, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(self.collected_data)} documentation pages to {output_file}")
        return output_file



In [12]:

# ============================================================================
# CELL 3: GitHub Repository Scraper
# ============================================================================

class GitHubScraper:
    """Collect source and documentation files from a GitHub repository.

    The repository tree is walked through the GitHub "contents" REST endpoint.
    Files whose path matches one of the include patterns are downloaded and
    stored as records in ``collected_data``.
    """

    # Seconds to wait on any single HTTP request, so a stalled connection
    # cannot hang the notebook.
    REQUEST_TIMEOUT = 10

    def __init__(self, config: "DataCollectionConfig", github_token: str = None):
        self.config = config
        self.github_token = github_token
        self.session = requests.Session()

        headers = {'User-Agent': 'RAG-Data-Collector'}
        if github_token:
            headers['Authorization'] = f'token {github_token}'
        self.session.headers.update(headers)

        self.collected_data = []

    def get_repo_contents(self, owner: str, repo: str, path: str = '') -> List[Dict]:
        """Return the content entries at ``path`` in ``owner/repo``.

        A single-object response is wrapped in a list. On any error the
        problem is printed and ``[]`` is returned.
        """
        url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"

        try:
            response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            contents = response.json()

            if not isinstance(contents, list):
                contents = [contents]

            return contents
        except Exception as e:
            print(f"Error fetching repo contents: {e}")
            return []

    def scrape_repository(self, repo_url: str, include_patterns: List[str] = None):
        """Walk a repository and collect every file matching ``include_patterns``.

        Args:
            repo_url: Repository URL such as ``https://github.com/owner/repo``;
                a trailing slash or ``.git`` suffix is tolerated.
            include_patterns: Patterns understood by ``_match_pattern``.
                Defaults to ``['*.py', '*.md', 'examples/*', 'docs/*']``.
        """
        # Parse repo URL
        parts = repo_url.rstrip('/').split('/')
        owner, repo = parts[-2], parts[-1]
        if repo.endswith('.git'):
            # Clone URLs carry a ".git" suffix that is not part of the repo name.
            repo = repo[:-len('.git')]

        print(f"Scraping GitHub repo: {owner}/{repo}")

        if include_patterns is None:
            include_patterns = ['*.py', '*.md', 'examples/*', 'docs/*']

        # Start processing from root
        root_contents = self.get_repo_contents(owner, repo)
        self._walk(owner, repo, root_contents, '', include_patterns)

    def _walk(self, owner: str, repo: str, contents: List[Dict], current_path: str,
              include_patterns: List[str]) -> None:
        """Process one directory listing, descending into sub-directories."""
        for item in contents:
            if item['type'] == 'file':
                # Match on the repository-relative path so that directory
                # patterns such as 'examples/*' are honoured.
                if any(self._match_pattern(item['path'], pattern) for pattern in include_patterns):
                    self._process_file(owner, repo, item, current_path)
            elif item['type'] == 'dir':
                subcontents = self.get_repo_contents(owner, repo, item['path'])
                time.sleep(0.3)  # Rate limiting
                self._walk(owner, repo, subcontents, item['path'], include_patterns)

    def _match_pattern(self, filename: str, pattern: str) -> bool:
        """Return True when ``filename`` matches ``pattern``.

        ``filename`` may be a bare name or a '/'-separated repository path.
        A pattern containing '/' is compared with the whole value; any other
        pattern is compared with the last path component only. Patterns with
        '*' use ``fnmatch``; patterns without it are a substring test.
        """
        target = filename if '/' in pattern else filename.rsplit('/', 1)[-1]
        if '*' in pattern:
            return fnmatch.fnmatch(target, pattern)
        return pattern in target

    def _process_file(self, owner: str, repo: str, file_item: Dict, current_path: str):
        """Download one file and append its record to ``collected_data``.

        Errors are printed and the file is skipped.
        """
        try:
            # Download file content
            response = self.session.get(file_item['download_url'], timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            content = response.text

            file_data = {
                'id': f"github_{len(self.collected_data)}",
                'type': 'github_code',
                'library': self.config.library_name,
                'source': 'github',
                'source_url': file_item['html_url'],
                'repo': f"{owner}/{repo}",
                'file_path': file_item['path'],
                'file_name': file_item['name'],
                'content': content[:10000],  # Limit size
                'language': self._detect_language(file_item['name']),
                'collected_at': datetime.now().isoformat()
            }

            self.collected_data.append(file_data)
            print(f"  Collected: {file_item['path']}")

        except Exception as e:
            print(f"  Error processing {file_item['path']}: {e}")

    def _detect_language(self, filename: str) -> str:
        """Map a file extension (case-insensitive) to a language name, or 'unknown'."""
        ext_map = {
            '.py': 'python',
            '.js': 'javascript',
            '.java': 'java',
            '.cpp': 'cpp',
            '.c': 'c',
            '.md': 'markdown',
            '.rst': 'restructuredtext'
        }
        ext = Path(filename).suffix.lower()
        return ext_map.get(ext, 'unknown')

    def save_data(self):
        """Write ``collected_data`` as JSON and return the output path."""
        output_file = self.config.get_output_path('github', f'{self.config.library_name}_github.json')
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.collected_data, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(self.collected_data)} GitHub files to {output_file}")
        return output_file



In [13]:

# ============================================================================
# CELL 4: StackOverflow Scraper
# ============================================================================

class StackOverflowScraper:
    """Collect questions and answer code for a tag via the Stack Exchange API.

    Questions are fetched by tag, highest votes first. For each question the
    answers are fetched separately, and code blocks are extracted from the
    question body and from accepted or positively scored answers.
    """

    # Seconds to wait on any single HTTP request.
    REQUEST_TIMEOUT = 10
    # Largest page size this scraper requests from the API.
    MAX_PAGE_SIZE = 100

    def __init__(self, config: "DataCollectionConfig", api_key: str = None):
        self.config = config
        self.api_key = api_key
        self.base_url = "https://api.stackexchange.com/2.3"
        # One shared session, as in the other scrapers, so connections are reused.
        self.session = requests.Session()
        self.collected_data = []

    def _request_params(self, **extra) -> Dict[str, Any]:
        """Build the query parameters shared by every API call."""
        params = {
            'order': 'desc',
            'sort': 'votes',
            'site': 'stackoverflow',
            'filter': 'withbody'  # Include post bodies
        }
        params.update(extra)
        if self.api_key:
            params['key'] = self.api_key
        return params

    def _fetch_questions(self, tag: str, max_results: int) -> List[Dict]:
        """Page through ``/questions`` until ``max_results`` items are gathered."""
        questions = []
        page = 1
        page_size = min(max_results, self.MAX_PAGE_SIZE)

        while len(questions) < max_results:
            response = self.session.get(
                f"{self.base_url}/questions",
                params=self._request_params(tagged=tag, page=page, pagesize=page_size),
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()

            items = data.get('items', [])
            questions.extend(items)
            if not items or not data.get('has_more'):
                break

            # The API may ask clients to pause via a 'backoff' field (seconds).
            backoff = data.get('backoff')
            if backoff:
                time.sleep(backoff)
            page += 1

        return questions[:max_results]

    def search_questions(self, tag: str, max_results: int = 100):
        """Collect up to ``max_results`` questions tagged ``tag``.

        Values above 100 are served by requesting additional pages. Errors
        while fetching the question list are printed and do not propagate.
        """
        print(f"Searching StackOverflow for tag: {tag}")

        try:
            questions = self._fetch_questions(tag, max_results)
            print(f"Found {len(questions)} questions")

            for question in questions:
                self._process_question(question)
                time.sleep(0.5)  # Rate limiting

        except Exception as e:
            print(f"Error searching StackOverflow: {e}")

    def _process_question(self, question: Dict):
        """Fetch a question's answers and store a record if any code was found.

        Errors are printed and the question is skipped.
        """
        question_id = question['question_id']

        try:
            response = self.session.get(
                f"{self.base_url}/questions/{question_id}/answers",
                params=self._request_params(),
                timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            answers = response.json().get('items', [])

            # Extract code from question
            question_code = self._extract_code(question.get('body', ''))

            # Extract code from accepted or positively scored answers
            answer_codes = []
            for answer in answers:
                if answer.get('is_accepted') or answer.get('score', 0) > 0:
                    answer_codes.extend(self._extract_code(answer.get('body', '')))

            if question_code or answer_codes:
                # Titles arrive with HTML entities (e.g. &quot;); decode them
                # so the stored text is plain.
                title = html.unescape(question['title'])

                item_data = {
                    'id': f"so_{question_id}",
                    'type': 'stackoverflow',
                    'library': self.config.library_name,
                    'source': 'stackoverflow',
                    'source_url': question['link'],
                    'question_id': question_id,
                    'title': title,
                    'score': question.get('score', 0),
                    'tags': question.get('tags', []),
                    'question_code': question_code,
                    'answer_codes': answer_codes,
                    'view_count': question.get('view_count', 0),
                    'collected_at': datetime.now().isoformat()
                }

                self.collected_data.append(item_data)
                print(f"  Collected Q{question_id}: {title[:50]}...")

        except Exception as e:
            print(f"  Error processing question {question_id}: {e}")

    def _extract_code(self, html_content: str) -> List[str]:
        """Return the text of every ``<code>`` element longer than 20 characters."""
        if not html_content:
            return []

        soup = BeautifulSoup(html_content, 'html.parser')
        code_blocks = []

        for code_tag in soup.find_all('code'):
            # Strip only the outer whitespace; inner spacing is part of the code.
            code_text = code_tag.get_text().strip()
            if len(code_text) > 20:  # Filter out inline code
                code_blocks.append(code_text)

        return code_blocks

    def save_data(self):
        """Write ``collected_data`` as JSON and return the output path."""
        output_file = self.config.get_output_path('stackoverflow', f'{self.config.library_name}_stackoverflow.json')
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.collected_data, f, indent=2, ensure_ascii=False)
        print(f"Saved {len(self.collected_data)} StackOverflow items to {output_file}")
        return output_file



In [14]:

# ============================================================================
# CELL 5: Data Combiner and Processor
# ============================================================================

class DataCombiner:
    """Merge the per-source JSON files into one combined dataset."""

    # Source keys whose '<library>_<key>.json' files are merged, in load order.
    _SOURCES = ('docs', 'github', 'stackoverflow')

    def __init__(self, config: "DataCollectionConfig"):
        self.config = config

    def _load_source(self, source: str) -> List[Dict]:
        """Return the records saved for ``source``, or ``[]`` if its file is absent."""
        source_file = self.config.get_output_path(source, f'{self.config.library_name}_{source}.json')
        if not source_file.exists():
            return []
        with open(source_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def combine_all_sources(self) -> pd.DataFrame:
        """Combine every available source file into one dataset.

        Writes ``<library>_combined.json`` and ``<library>_combined.csv`` to
        the combined directory, prints a short summary, and returns the
        records as a DataFrame. Missing source files are skipped; when no
        records exist at all an empty DataFrame is returned.
        """
        all_data = []
        for source in self._SOURCES:
            all_data.extend(self._load_source(source))

        # Create DataFrame
        df = pd.DataFrame(all_data)

        # Save combined data
        combined_json = self.config.get_output_path('combined', f'{self.config.library_name}_combined.json')
        with open(combined_json, 'w', encoding='utf-8') as f:
            json.dump(all_data, f, indent=2, ensure_ascii=False)

        combined_csv = self.config.get_output_path('combined', f'{self.config.library_name}_combined.csv')
        df.to_csv(combined_csv, index=False, encoding='utf-8')

        # With zero records the frame has no 'source' column, so indexing it
        # would raise KeyError; report an empty breakdown instead.
        by_source = df['source'].value_counts().to_dict() if 'source' in df.columns else {}

        print("\nCombined Data Summary:")
        print(f"Total items: {len(df)}")
        print(f"By source: {by_source}")
        print(f"Saved to: {combined_json}")

        return df









In [24]:
# ============================================================================
# CELL 6: PyWinAuto Specific Collection
# ============================================================================

def collect_pywinauto_data(github_token: str = None, stackoverflow_key: str = None,
                           sources=('stackoverflow',)):
    """
    Collect data for the pywinauto library and combine it into one dataset.

    Args:
        github_token: GitHub API token (optional but recommended)
        stackoverflow_key: StackOverflow API key (optional)
        sources: Which collection steps to run; any of 'docs', 'github',
            'stackoverflow' (a single name may be passed as a string). The
            default runs only the StackOverflow step. Data saved by earlier
            runs of the other steps is still picked up when combining.

    Returns:
        The combined DataFrame produced by DataCombiner.combine_all_sources().

    Raises:
        ValueError: If ``sources`` contains an unknown name.
    """
    requested = {sources} if isinstance(sources, str) else set(sources)
    unknown = requested - {'docs', 'github', 'stackoverflow'}
    if unknown:
        # Checked before any directory is created or request is sent.
        raise ValueError(f"Unknown sources: {sorted(unknown)}")

    print("=" * 80)
    print("COLLECTING DATA FOR: pywinauto")
    print("=" * 80)

    # Initialize configuration
    config = DataCollectionConfig('pywinauto')

    # 1. Scrape Documentation
    if 'docs' in requested:
        print("\n[1/4] Scraping Documentation...")
        doc_scraper = DocumentationScraper(config)
        doc_scraper.scrape_page('https://pywinauto.readthedocs.io/en/latest/contents.html', max_depth=2)
        doc_scraper.save_data()

    # 2. Scrape GitHub Repository
    if 'github' in requested:
        print("\n[2/4] Scraping GitHub Repository...")
        github_scraper = GitHubScraper(config, github_token)
        github_scraper.scrape_repository(
            'https://github.com/pywinauto/pywinauto',
            include_patterns=['*.py', '*.md', 'examples/*.py', 'docs/*.rst']
        )
        github_scraper.save_data()

    # 3. Scrape StackOverflow
    if 'stackoverflow' in requested:
        print("\n[3/4] Scraping StackOverflow...")
        so_scraper = StackOverflowScraper(config, stackoverflow_key)
        so_scraper.search_questions('pywinauto', max_results=100)
        so_scraper.save_data()

    # 4. Combine all data
    print("\n[4/4] Combining all sources...")
    combiner = DataCombiner(config)
    combined_df = combiner.combine_all_sources()

    print("\n" + "=" * 80)
    print("DATA COLLECTION COMPLETE!")
    print("=" * 80)

    return combined_df


In [25]:
# ============================================================================
# CELL 7: Generic Library Collection Function
# ============================================================================

def collect_library_data(
    library_name: str,
    docs_url: str,
    github_repo_url: str,
    stackoverflow_tag: str,
    github_token: str = None,
    stackoverflow_key: str = None,
    max_so_results: int = 100,
    sources=('stackoverflow',)
):
    """
    Generic function to collect data for any library

    Args:
        library_name: Name of the library
        docs_url: URL to documentation
        github_repo_url: GitHub repository URL
        stackoverflow_tag: StackOverflow tag to search
        github_token: GitHub API token (optional)
        stackoverflow_key: StackOverflow API key (optional)
        max_so_results: Maximum StackOverflow results to collect
        sources: Which collection steps to run; any of 'docs', 'github',
            'stackoverflow' (a single name may be passed as a string). The
            default runs only the StackOverflow step. Data saved by earlier
            runs of the other steps is still picked up when combining.

    Returns:
        The combined DataFrame produced by DataCombiner.combine_all_sources().

    Raises:
        ValueError: If ``sources`` contains an unknown name.
    """
    requested = {sources} if isinstance(sources, str) else set(sources)
    unknown = requested - {'docs', 'github', 'stackoverflow'}
    if unknown:
        # Checked before any directory is created or request is sent.
        raise ValueError(f"Unknown sources: {sorted(unknown)}")

    print("=" * 80)
    print(f"COLLECTING DATA FOR: {library_name}")
    print("=" * 80)

    config = DataCollectionConfig(library_name)

    # Documentation
    if 'docs' in requested:
        print("\n[1/4] Scraping Documentation...")
        doc_scraper = DocumentationScraper(config)
        doc_scraper.scrape_page(docs_url, max_depth=2)
        doc_scraper.save_data()

    # GitHub
    if 'github' in requested:
        print("\n[2/4] Scraping GitHub Repository...")
        github_scraper = GitHubScraper(config, github_token)
        github_scraper.scrape_repository(github_repo_url)
        github_scraper.save_data()

    # StackOverflow
    if 'stackoverflow' in requested:
        print("\n[3/4] Scraping StackOverflow...")
        so_scraper = StackOverflowScraper(config, stackoverflow_key)
        so_scraper.search_questions(stackoverflow_tag, max_results=max_so_results)
        so_scraper.save_data()

    # Combine
    print("\n[4/4] Combining all sources...")
    combiner = DataCombiner(config)
    combined_df = combiner.combine_all_sources()

    print("\n" + "=" * 80)
    print("DATA COLLECTION COMPLETE!")
    print("=" * 80)

    return combined_df


In [None]:

# ============================================================================
# CELL 8: Usage Examples
# ============================================================================

# Example 1: Collect PyWinAuto data
import os


if __name__ == "__main__":
    # PyWinAuto
    # Credentials are read from the environment so no secret is stored in the
    # notebook. "STACKOVWER_TOKEN" is the misspelled variable name this cell
    # used to read; it stays as a fallback so existing setups keep working.
    pywinauto_df = collect_pywinauto_data(
        github_token=os.environ.get("GITHUB_TOKEN"),  # Optional but recommended
        stackoverflow_key=(
            os.environ.get("STACKOVERFLOW_TOKEN")
            or os.environ.get("STACKOVWER_TOKEN")
        )  # Optional
    )
    
    # Example 2: Collect data for another library (e.g., selenium)
    # selenium_df = collect_library_data(
    #     library_name='selenium',
    #     docs_url='https://selenium-python.readthedocs.io/',
    #     github_repo_url='https://github.com/SeleniumHQ/selenium',
    #     stackoverflow_tag='selenium',
    #     github_token='your_token',
    #     stackoverflow_key='your_key'
    # )
    

COLLECTING DATA FOR: pywinauto

[3/4] Scraping StackOverflow...
Searching StackOverflow for tag: pywinauto
Found 100 questions
  Collected Q57523762: pytest - Windows fatal exception: code 0x8001010d...
  Collected Q55547940: How to get a list of the name of every open window...
  Collected Q65459632: Cannot import pywinauto on Windows 10...
  Collected Q39794729: Pywinauto: unable to bring window to foreground...
  Collected Q39021888: Using PyWinAuto to control a currently running app...
  Collected Q32846550: Python - Control window with pywinauto while the w...
  Collected Q24606219: How to perform Click action on Button or Text fiel...
  Collected Q40265705: Press key with pywinauto...
  Collected Q31367425: pywinauto wait for window to appear and send key-p...
  Collected Q58976354: Pywinauto - There are 2 elements that match criter...
  Collected Q8008490: How can i find available dialogs ,controls of an a...
  Collected Q51284268: WindowsContext: OleInitialize() failed: &quot;C