In [2]:
import os
print("Current working directory:", os.getcwd())

Current working directory: /workspace


In [None]:
import os
import sys
from pathlib import Path

# Change to the CryptoCorpusBuilder directory
os.chdir('/workspace/CryptoCorpusBuilder')
print(f"Changed directory to: {os.getcwd()}")

# Add the current directory to the Python path
sys.path.append(os.getcwd())

# Create test directories
base_dir = Path("/workspace/data/test_corpus")
for subdir in ["arxiv", "github", "quantopian", "fred", "bitmex"]:
    os.makedirs(base_dir / subdir, exist_ok=True)

# List what's in the sources/specific_collectors directory to see exact file names
collectors_dir = os.path.join(os.getcwd(), 'sources', 'specific_collectors')
print(f"Files in {collectors_dir}:")
if os.path.exists(collectors_dir):
    for file in os.listdir(collectors_dir):
        print(f"  {file}")
else:
    print(f"Directory {collectors_dir} does not exist")

# Now try to import
try:
    from sources.specific_collectors.arvix_collector import ArxivCollector
    print("Successfully imported ArxivCollector")
except ModuleNotFoundError as e:
    print(f"Failed to import ArxivCollector: {e}")

In [None]:
testing arvix

In [None]:
import os
import sys
from pathlib import Path
import logging

# Configure logging to see more details
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Change to the CryptoCorpusBuilder directory
os.chdir('/workspace/CryptoCorpusBuilder')
print(f"Working directory: {os.getcwd()}")

# Add the current directory to the Python path
sys.path.append(os.getcwd())

# Create test directory
output_dir = "/workspace/data/test_corpus/arxiv"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Import the collector
from sources.specific_collectors.arvix_collector import ArxivCollector

# Initialize collector with more verbose debug logging
collector = ArxivCollector(output_dir)

# Modify the logger to show DEBUG level messages
collector.logger.setLevel(logging.DEBUG)

# Run a test with 6 papers
try:
    print("\nStarting ArxivCollector test with 6 papers...")
    
    # Try 3 papers for each search term to get 6 total
    search_terms = ["cryptocurrency trading", "bitcoin price prediction"]
    max_results_per_term = 3
    
    # Override the collect method to add more logging
    original_collect = collector.collect
    
    def collect_with_logging(*args, **kwargs):
        print(f"\nCollecting with args: {args}, kwargs: {kwargs}")
        results = original_collect(*args, **kwargs)
        print(f"Collect returned {len(results)} papers")
        return results
    
    collector.collect = collect_with_logging
    
    # Run the collection
    results = collector.collect(
        search_terms=search_terms,
        max_results=max_results_per_term
    )
    
    # Detailed results analysis
    print("\n===== ArxivCollector Results =====")
    print(f"Downloaded {len(results)} papers")
    
    if results:
        valid_papers = 0
        for i, paper in enumerate(results):
            print(f"\nPaper #{i+1}:")
            print(f"  Title: {paper.get('title')}")
            print(f"  Authors: {', '.join(paper.get('authors', []))}")
            print(f"  ArXiv ID: {paper.get('arxiv_id')}")
            print(f"  Category: {paper.get('primary_category')}")
            
            # Check PDF link before download
            pdf_link = paper.get('pdf_link')
            print(f"  PDF Link: {pdf_link}")
            
            # Check if file exists
            filepath = paper.get('filepath')
            if filepath and os.path.exists(filepath):
                size_kb = os.path.getsize(filepath) / 1024
                print(f"  File: {filepath}")
                print(f"  File size: {size_kb:.2f} KB")
                print(f"  File exists: Yes")
                valid_papers += 1
            else:
                print(f"  File: {filepath}")
                print(f"  File exists: No")
        
        print(f"\nSummary: {valid_papers} out of {len(results)} papers have valid files")
    else:
        print("No papers were downloaded.")
    
    print("\n===== Test Directory Contents =====")
    # List files in the output directory
    for root, dirs, files in os.walk(output_dir):
        level = root.replace(output_dir, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        sub_indent = ' ' * 4 * (level + 1)
        for file in files:
            filepath = os.path.join(root, file)
            size_kb = os.path.getsize(filepath) / 1024
            print(f"{sub_indent}{file} ({size_kb:.2f} KB)")
    
except Exception as e:
    print(f"Error running ArxivCollector: {e}")
    import traceback
    traceback.print_exc()

In [None]:
testing github

In [None]:
import os
import sys
import requests
import zipfile
import io
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Change to the CryptoCorpusBuilder directory
os.chdir('/workspace/CryptoCorpusBuilder')
print(f"Working directory: {os.getcwd()}")

# Add the current directory to the Python path
sys.path.append(os.getcwd())

# Create test directory
output_dir = "/workspace/data/test_corpus/github"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Create a modified function to download GitHub repo as ZIP
def download_github_repo(repo_name, output_path):
    """Download a GitHub repository as a ZIP file and extract it"""
    try:
        # Construct GitHub ZIP URL
        zip_url = f"https://github.com/{repo_name}/archive/refs/heads/main.zip"
        print(f"Downloading repo ZIP from: {zip_url}")
        
        # Make request
        response = requests.get(zip_url, stream=True)
        
        # Check if main branch doesn't exist, try master
        if response.status_code == 404:
            zip_url = f"https://github.com/{repo_name}/archive/refs/heads/master.zip"
            print(f"Main branch not found, trying master: {zip_url}")
            response = requests.get(zip_url, stream=True)
        
        response.raise_for_status()
        
        # Extract directly from the in-memory ZIP file
        z = zipfile.ZipFile(io.BytesIO(response.content))
        z.extractall(output_path)
        
        # Get the extracted directory name (should be the only directory)
        extracted_dirs = [d for d in os.listdir(output_path) if os.path.isdir(os.path.join(output_path, d))]
        if extracted_dirs:
            extracted_dir = os.path.join(output_path, extracted_dirs[0])
            print(f"Extracted to: {extracted_dir}")
            return extracted_dir
        else:
            print("No directory was extracted")
            return None
            
    except Exception as e:
        print(f"Error downloading repository: {e}")
        return None

# Test with a single repository
try:
    print("\nTesting GitHub repository download...")
    
    # Choose a popular crypto trading bot repo
    repo_name = "freqtrade/freqtrade"
    
    # Download the repository
    local_path = download_github_repo(repo_name, output_dir)
    
    if local_path:
        print(f"\n===== GitHub Repository Download Results =====")
        print(f"Downloaded repository: {repo_name}")
        print(f"Local path: {local_path}")
        
        # Check directory contents
        if os.path.exists(local_path):
            # Count files in the repository
            file_count = sum(1 for _ in Path(local_path).glob('**/*') if _.is_file())
            print(f"File count: {file_count}")
            
            # Look for README
            readme_files = list(Path(local_path).glob('README*'))
            if readme_files:
                print(f"README: {readme_files[0].name}")
                
                # Show README contents
                with open(readme_files[0], 'r', encoding='utf-8', errors='ignore') as f:
                    readme_content = f.read(500)  # Read first 500 chars
                    print("\nREADME excerpt:")
                    print(readme_content + "...")
            
            # Check for Python files
            py_files = list(Path(local_path).glob('**/*.py'))
            print(f"Python files: {len(py_files)}")
            if py_files and len(py_files) > 0:
                print(f"Example Python file: {py_files[0].name}")
    else:
        print("Failed to download repository")
    
except Exception as e:
    print(f"Error in test: {e}")
    import traceback
    traceback.print_exc()

In [None]:
Quantopian collector

In [6]:
import os
import sys
from pathlib import Path
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Change to the CryptoCorpusBuilder directory
os.chdir('/workspace/CryptoCorpusBuilder')
print(f"Working directory: {os.getcwd()}")

# Add the current directory to the Python path
sys.path.append(os.getcwd())

# Create test directory
output_dir = "/workspace/data/test_corpus/quantopian"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Download function
def download_github_repo(repo_name, output_path):
    """Download a GitHub repository as a ZIP file and extract it"""
    try:
        import requests
        import zipfile
        import io
        
        # Construct GitHub ZIP URL
        zip_url = f"https://github.com/{repo_name}/archive/refs/heads/main.zip"
        print(f"Downloading repo ZIP from: {zip_url}")
        
        # Make request
        response = requests.get(zip_url, stream=True)
        
        # Check if main branch doesn't exist, try master
        if response.status_code == 404:
            zip_url = f"https://github.com/{repo_name}/archive/refs/heads/master.zip"
            print(f"Main branch not found, trying master: {zip_url}")
            response = requests.get(zip_url, stream=True)
        
        response.raise_for_status()
        
        # Extract directly from the in-memory ZIP file
        z = zipfile.ZipFile(io.BytesIO(response.content))
        z.extractall(output_path)
        
        # Get the extracted directory name (should be the only directory)
        extracted_dirs = [d for d in os.listdir(output_path) if os.path.isdir(os.path.join(output_path, d))]
        if extracted_dirs:
            extracted_dir = os.path.join(output_path, extracted_dirs[0])
            print(f"Extracted to: {extracted_dir}")
            # Return as Path object
            return Path(extracted_dir)
        else:
            print("No directory was extracted")
            return None
            
    except Exception as e:
        print(f"Error downloading repository: {e}")
        return None

# Download the repository
repo_name = "quantopian/research_public"
repo_dir = download_github_repo(repo_name, output_dir)

# Check the repository contents
if repo_dir and os.path.exists(repo_dir):
    # Find all notebook files
    notebook_files = list(repo_dir.glob("**/*.ipynb"))
    print(f"\nFound {len(notebook_files)} notebook files")
    
    # Print first 5 notebook paths
    for i, nb_path in enumerate(notebook_files[:5]):
        rel_path = nb_path.relative_to(repo_dir)
        print(f"  {i+1}. {rel_path}")
        
    if len(notebook_files) > 5:
        print(f"  ... and {len(notebook_files) - 5} more notebooks")
else:
    print("Failed to download repository")

Working directory: /workspace/CryptoCorpusBuilder
Cleared and recreated directory: /workspace/data/test_corpus/quantopian
Downloading repo ZIP from: https://github.com/quantopian/research_public/archive/refs/heads/main.zip
Main branch not found, trying master: https://github.com/quantopian/research_public/archive/refs/heads/master.zip
Extracted to: /workspace/data/test_corpus/quantopian/research_public-master

Found 204 notebook files
  1. advanced_sample_analyses/Employee-to-Earnings-Efficiency.ipynb
  2. advanced_sample_analyses/Fed-Sentiment-Volatility.ipynb
  3. advanced_sample_analyses/Macro-ETFs-and-Fed-Sentiment.ipynb
  4. advanced_sample_analyses/Stoploss-Moving-Window.ipynb
  5. advanced_sample_analyses/Tesla-and-Oil-(Short).ipynb
  ... and 199 more notebooks


In [5]:
import os
import sys
import json
import re
from pathlib import Path

# We're already in the right directory, so we'll just continue from there
print(f"Working directory: {os.getcwd()}")

# Path to the downloaded repository
repo_dir = Path("/workspace/data/test_corpus/quantopian/research_public-master")
print(f"Repository directory: {repo_dir}")

# Find all notebook files
notebook_files = list(repo_dir.glob("**/*.ipynb"))
print(f"Found {len(notebook_files)} notebook files")

# Process a few notebooks
processed_notebooks = []

for i, notebook_path in enumerate(notebook_files[:5]):  # Process first 5 notebooks
    try:
        print(f"\nProcessing notebook {i+1}: {notebook_path.relative_to(repo_dir)}")
        
        # Extract notebook info
        with open(notebook_path, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        
        # Get metadata
        metadata = notebook.get('metadata', {})
        
        # Get title from first heading cell or filename
        title = notebook_path.stem
        
        # Look for title in markdown cells
        for cell in notebook.get('cells', []):
            if cell.get('cell_type') == 'markdown':
                source = ''.join(cell.get('source', []))
                # Look for heading
                if source.startswith('# '):
                    title = source.strip().lstrip('#').strip()
                    print(f"Found title: {title}")
                    break
        
        # Extract cell statistics
        code_cells = []
        markdown_cells = []
        
        for cell in notebook.get('cells', []):
            cell_type = cell.get('cell_type')
            source = ''.join(cell.get('source', []))
            
            if cell_type == 'code':
                code_cells.append({
                    'source': source,
                    'outputs': len(cell.get('outputs', []))
                })
            elif cell_type == 'markdown':
                markdown_cells.append({
                    'source': source
                })
        
        # Extract import statements
        imports = []
        for cell in code_cells:
            source = cell['source']
            # Match import statements
            for match in re.finditer(r'^(?:from\s+(\S+)\s+import|import\s+([^as]+)(?:\s+as\s+\S+)?)', source, re.MULTILINE):
                module = match.group(1) or match.group(2)
                if module:
                    module = module.strip().split('.')[0]  # Get base module
                    if module and module not in imports:
                        imports.append(module)
        
        notebook_info = {
            'title': title,
            'path': str(notebook_path),
            'imports': imports,
            'code_cell_count': len(code_cells),
            'markdown_cell_count': len(markdown_cells),
            'total_cell_count': len(code_cells) + len(markdown_cells)
        }
        
        processed_notebooks.append(notebook_info)
        print(f"Processed successfully")
        
    except Exception as e:
        print(f"Error processing notebook {notebook_path}: {e}")

# Print results
print(f"\n===== Notebook Processing Results =====")
print(f"Processed {len(processed_notebooks)} notebooks")

for i, notebook in enumerate(processed_notebooks):
    print(f"\nNotebook #{i+1}:")
    print(f"  Title: {notebook.get('title')}")
    rel_path = Path(notebook.get('path')).relative_to(repo_dir)
    print(f"  Path: {rel_path}")
    print(f"  Imports: {', '.join(notebook.get('imports', []))}")
    print(f"  Code cells: {notebook.get('code_cell_count')}")
    print(f"  Markdown cells: {notebook.get('markdown_cell_count')}")
    print(f"  Total cells: {notebook.get('total_cell_count')}")
    
    # Check file size
    filepath = notebook.get('path')
    if filepath and os.path.exists(filepath):
        size_kb = os.path.getsize(filepath) / 1024
        print(f"  File size: {size_kb:.2f} KB")

Working directory: /workspace
Repository directory: /workspace/data/test_corpus/quantopian/research_public-master
Found 204 notebook files

Processing notebook 1: advanced_sample_analyses/Employee-to-Earnings-Efficiency.ipynb
Processed successfully

Processing notebook 2: advanced_sample_analyses/Fed-Sentiment-Volatility.ipynb
Processed successfully

Processing notebook 3: advanced_sample_analyses/Macro-ETFs-and-Fed-Sentiment.ipynb
Processed successfully

Processing notebook 4: advanced_sample_analyses/Stoploss-Moving-Window.ipynb
Processed successfully

Processing notebook 5: advanced_sample_analyses/Tesla-and-Oil-(Short).ipynb
Processed successfully

===== Notebook Processing Results =====
Processed 5 notebooks

Notebook #1:
  Title: Employee-to-Earnings-Efficiency
  Path: advanced_sample_analyses/Employee-to-Earnings-Efficiency.ipynb
  Imports: d, math, m, scipy, numpy, zipline

# initi
  Code cells: 13
  Markdown cells: 12
  Total cells: 25
  File size: 627.05 KB

Notebook #2:
  Ti

In [None]:
Bitmex collector

In [None]:
below patches all collectors to absolute path

In [None]:
import os
import re
from pathlib import Path

# Set the working directory
os.chdir('/workspace/CryptoCorpusBuilder')
print(f"Working directory: {os.getcwd()}")

# Get all collector files in specific_collectors directory
collectors_dir = os.path.join(os.getcwd(), 'sources', 'specific_collectors')
collector_files = list(Path(collectors_dir).glob('*.py'))

print(f"Found {len(collector_files)} collector files to patch")

# Regular expression to match relative imports
relative_import_pattern = re.compile(r'from\s+\.\.([\w\.]+)\s+import')

# Process each file
for file_path in collector_files:
    print(f"\nProcessing: {file_path.name}")
    
    # Read the file
    with open(file_path, 'r') as f:
        content = f.read()
    
    # Find all relative imports
    matches = relative_import_pattern.findall(content)
    if not matches:
        print("  No relative imports found")
        continue
    
    print(f"  Found relative imports: {matches}")
    
    # Replace relative imports with absolute ones
    updated_content = content
    for module in matches:
        relative_import = f"from ..{module} import"
        absolute_import = f"from sources.{module} import"
        
        # Check if this import exists in the content
        if relative_import in updated_content:
            print(f"  Replacing: {relative_import} -> {absolute_import}")
            updated_content = updated_content.replace(relative_import, absolute_import)
    
    # Write the updated content back to the file
    with open(file_path, 'w') as f:
        f.write(updated_content)
    
    print(f"  Updated file: {file_path.name}")

print("\nAll collector files have been patched")

In [None]:
import sys
import os

# Add the CryptoCorpusBuilder directory to Python path
sys.path.append('/workspace/CryptoCorpusBuilder')

# Try a clean import of the WebCollector
try:
    from sources.web_collector import WebCollector
    print("Successfully imported WebCollector")
    
    # Now try importing the BitMEXResearchCollector
    from sources.specific_collectors.bitmex_collector import BitMEXResearchCollector
    print("Successfully imported BitMEXResearchCollector")
except ImportError as e:
    print(f"Import error: {e}")

# If imports work, set up a simple test
if 'BitMEXResearchCollector' in locals():
    # Create test directory
    output_dir = "/workspace/data/test_corpus/bitmex"
    os.makedirs(output_dir, exist_ok=True)
    
    # Initialize collector
    collector = BitMEXResearchCollector(output_dir)
    print(f"Successfully initialized BitMEXResearchCollector with output_dir: {output_dir}")

In [8]:
import os
import sys
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Add CryptoCorpusBuilder to Python path
sys.path.append('/workspace/CryptoCorpusBuilder')

# Import the collector classes
from sources.web_collector import WebCollector
from sources.specific_collectors.bitmex_collector import BitMEXResearchCollector

# Create a subclass that overrides the robots.txt check
class TestWebCollector(WebCollector):
    def _can_fetch(self, url):
        """Override the robots.txt check for testing purposes only"""
        self.logger.warning("⚠️ Bypassing robots.txt check for TESTING PURPOSES ONLY ⚠️")
        self.logger.warning("⚠️ In production, always respect robots.txt rules ⚠️")
        return True  # Always return True, ignoring robots.txt

# Create a subclass of BitMEXResearchCollector that uses our test collector
class TestBitMEXCollector(BitMEXResearchCollector):
    def __init__(self, output_dir):
        super().__init__(output_dir)
        # Override the _can_fetch method with our test version
        self._can_fetch = lambda url: True
        # Set a flag to remind us this is a test version
        self.respect_robots_txt = False
        self.logger.warning("⚠️ Created test collector that ignores robots.txt ⚠️")
        self.logger.warning("⚠️ FOR TESTING PURPOSES ONLY - DO NOT USE IN PRODUCTION ⚠️")

# Create test directory
output_dir = "/workspace/data/test_corpus/bitmex"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Initialize collector
collector = TestBitMEXCollector(output_dir)
collector.logger.setLevel(logging.INFO)

# Test with a small sample
try:
    print("\n⚠️ TESTING ONLY: Running BitMEX collector with robots.txt check bypassed ⚠️")
    # Limit to 1 page to keep the test quick and minimize impact
    results = collector.collect(max_pages=1)

    # Print results
    print(f"\n===== BitMEXResearchCollector Results =====")
    print(f"Collected {len(results)} research posts")
    
    if results:
        # Show first 3 posts (or all if fewer)
        show_count = min(3, len(results))
        for i, post in enumerate(results[:show_count]):
            print(f"\nPost #{i+1}:")
            print(f"  Title: {post.get('title')}")
            print(f"  Date: {post.get('date')}")
            print(f"  URL: {post.get('url')}")
            
            # Check for PDFs
            pdfs = post.get('pdfs', [])
            print(f"  PDF Count: {len(pdfs)}")
            
            for j, pdf in enumerate(pdfs[:2]):  # Show first 2 PDFs
                print(f"    PDF #{j+1}: {pdf.get('filename')}")
                filepath = pdf.get('filepath', '')
                if filepath and os.path.exists(filepath):
                    size_kb = os.path.getsize(filepath) / 1024
                    print(f"    Size: {size_kb:.2f} KB")
                    print(f"    Exists: Yes")
                else:
                    print(f"    Exists: No")
            
            # Check for saved HTML
            html_path = post.get('saved_html_path', '')
            if html_path and os.path.exists(html_path):
                size_kb = os.path.getsize(html_path) / 1024
                print(f"  HTML saved: Yes ({size_kb:.2f} KB)")
            else:
                print(f"  HTML saved: No")
    
    # Print directory contents
    print("\n===== Test Directory Contents =====")
    for root, dirs, files in os.walk(output_dir):
        level = root.replace(output_dir, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        sub_indent = ' ' * 4 * (level + 1)
        for f in files[:5]:  # Show first 5 files
            filepath = os.path.join(root, f)
            size_kb = os.path.getsize(filepath) / 1024
            print(f"{sub_indent}{f} ({size_kb:.2f} KB)")
        if len(files) > 5:
            print(f"{sub_indent}... and {len(files)-5} more files")
    
    print("\n⚠️ IMPORTANT: In production, always respect robots.txt rules ⚠️")
    
except Exception as e:
    print(f"Error running BitMEXCollector: {e}")
    import traceback
    traceback.print_exc()

2025-05-09 22:05:46,926 - BitMEXResearchCollector - INFO - Collecting BitMEX Research blog posts (max 1 pages)
2025-05-09 22:05:46,926 - BitMEXResearchCollector - INFO - Fetching page: https://blog.bitmex.com/research/


Cleared and recreated directory: /workspace/data/test_corpus/bitmex

⚠️ TESTING ONLY: Running BitMEX collector with robots.txt check bypassed ⚠️


2025-05-09 22:05:47,447 - BitMEXResearchCollector - INFO - Found 1 potential post containers
2025-05-09 22:05:47,450 - BitMEXResearchCollector - INFO - Found post: Removing Bitcoin’s Guardrails
2025-05-09 22:05:47,456 - BitMEXResearchCollector - INFO - Saved metadata for 1 posts to /workspace/data/test_corpus/bitmex/bitmex_research_posts.json
2025-05-09 22:05:47,457 - BitMEXResearchCollector - INFO - Fetching post: https://blog.bitmex.com/removing-bitcoins-guardrails/
2025-05-09 22:05:47,606 - BitMEXResearchCollector - INFO - Saved HTML to /workspace/data/test_corpus/bitmex/removing-bitcoins-guardrails.html
2025-05-09 22:05:47,616 - BitMEXResearchCollector - INFO - Saved metadata for 1 posts to /workspace/data/test_corpus/bitmex/bitmex_research_posts.json



===== BitMEXResearchCollector Results =====
Collected 1 research posts

Post #1:
  Title: Removing Bitcoin’s Guardrails
  Date: 7 May 2025
  URL: https://blog.bitmex.com/removing-bitcoins-guardrails/
  PDF Count: 0
  HTML saved: Yes (2.20 KB)

===== Test Directory Contents =====
bitmex/
    bitmex_research.html (323.90 KB)
    bitmex_research_posts.json (2.51 KB)
    removing-bitcoins-guardrails.html (2.20 KB)

⚠️ IMPORTANT: In production, always respect robots.txt rules ⚠️


In [8]:
import os
import sys
import logging
from pathlib import Path
import requests
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,  # Set to DEBUG for more detailed output
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Add CryptoCorpusBuilder to Python path
sys.path.append('/workspace/CryptoCorpusBuilder')

# Create a direct debugging script without relying on the collector classes
print(f"Working directory: {os.getcwd()}")

# Set up test directory
output_dir = "/workspace/data/test_corpus/bitmex_debug"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Direct debugging function
def debug_bitmex_blog():
    """Debug the BitMEX blog directly"""
    print("\n===== Debugging BitMEX Blog =====")
    
    url = "https://blog.bitmex.com/research/"
    
    print(f"1. Fetching {url}")
    
    # Set up session with browser-like headers
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    })
    
    try:
        response = session.get(url, timeout=30)
        print(f"Status code: {response.status_code}")
        
        # Save the raw HTML response for debugging
        html_file = os.path.join(output_dir, "bitmex_blog.html")
        with open(html_file, 'w', encoding='utf-8') as f:
            f.write(response.text)
        print(f"Saved raw HTML to {html_file}")
        
        # Parse with BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Save the parsed HTML for debugging
        parsed_file = os.path.join(output_dir, "bitmex_blog_parsed.html")
        with open(parsed_file, 'w', encoding='utf-8') as f:
            f.write(str(soup.prettify()))
        print(f"Saved parsed HTML to {parsed_file}")
        
        # Check for articles
        print("\n2. Checking for article elements")
        articles = soup.find_all("article")
        print(f"Found {len(articles)} article elements")
        
        if articles:
            # Examine the first article
            first_article = articles[0]
            print("\n3. First article details:")
            print(f"Article class: {first_article.get('class')}")
            
            # Check for title
            title_elem = first_article.find(['h1', 'h2'], class_='entry-title')
            if title_elem:
                print(f"Title element found: {title_elem.text.strip()}")
                
                # Check for link
                link_elem = title_elem.find('a')
                if link_elem:
                    print(f"Link found: {link_elem['href']}")
                else:
                    print("No link found in title element")
            else:
                print("No title element found with class 'entry-title'")
                
                # Look for any headings
                headings = first_article.find_all(['h1', 'h2', 'h3'])
                if headings:
                    print(f"Found {len(headings)} heading elements:")
                    for i, h in enumerate(headings[:3]):
                        print(f"  Heading {i+1}: {h.text.strip()}")
                else:
                    print("No heading elements found")
            
            # Check for date
            date_elem = first_article.find(class_='entry-date')
            if date_elem:
                print(f"Date element found: {date_elem.text.strip()}")
            else:
                print("No date element found with class 'entry-date'")
            
            # Check for any links
            links = first_article.find_all('a')
            print(f"Found {len(links)} links in first article")
            for i, link in enumerate(links[:3]):
                print(f"  Link {i+1}: {link.text.strip()} -> {link.get('href')}")
                
            # Save the first article HTML
            article_file = os.path.join(output_dir, "first_article.html")
            with open(article_file, 'w', encoding='utf-8') as f:
                f.write(str(first_article.prettify()))
            print(f"Saved first article HTML to {article_file}")
        else:
            print("No article elements found, checking for alternative structures")
            
            # Look for any headings
            headings = soup.find_all(['h1', 'h2', 'h3'])
            print(f"Found {len(headings)} heading elements on page")
            for i, h in enumerate(headings[:5]):
                print(f"  Heading {i+1}: {h.text.strip()}")
            
            # Look for any blog post containers
            post_containers = soup.find_all(class_=lambda c: c and ('post' in c.lower() or 'entry' in c.lower()))
            print(f"Found {len(post_containers)} potential post containers")
            
            # Look for links that might be blog posts
            blog_links = soup.find_all('a', href=lambda h: h and '/blog/' in h)
            print(f"Found {len(blog_links)} links containing '/blog/'")
            for i, link in enumerate(blog_links[:5]):
                print(f"  Link {i+1}: {link.text.strip()} -> {link.get('href')}")
        
        return {
            "status": "success",
            "articles": len(articles),
            "html_file": html_file
        }
        
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        return {
            "status": "error",
            "error": str(e)
        }

# Run the debugging function
debug_result = debug_bitmex_blog()

# Summarize results
print("\n===== Debugging Summary =====")
print(f"Status: {debug_result.get('status')}")
if debug_result.get('status') == 'success':
    print(f"Articles found: {debug_result.get('articles')}")
    print(f"HTML saved to: {debug_result.get('html_file')}")
else:
    print(f"Error: {debug_result.get('error')}")

print("\n===== Directory Contents =====")
for file in os.listdir(output_dir):
    filepath = os.path.join(output_dir, file)
    size_kb = os.path.getsize(filepath) / 1024
    print(f"  {file} ({size_kb:.2f} KB)")

Working directory: /workspace
Cleared and recreated directory: /workspace/data/test_corpus/bitmex_debug

===== Debugging BitMEX Blog =====
1. Fetching https://blog.bitmex.com/research/
Status code: 200
Saved raw HTML to /workspace/data/test_corpus/bitmex_debug/bitmex_blog.html
Saved parsed HTML to /workspace/data/test_corpus/bitmex_debug/bitmex_blog_parsed.html

2. Checking for article elements
Found 0 article elements
No article elements found, checking for alternative structures
Found 263 heading elements on page
  Heading 1: Coming Soon: SXTUSDT Perpetual Swap Listing With Up to 50x Leverage
  Heading 2: Removing Bitcoin’s Guardrails
  Heading 3: Heatbiting The Office
  Heading 4: The Ripple story
  Heading 5: Antminer S19 Pro vs Whatsminer M30S+
Found 808 potential post containers
Found 0 links containing '/blog/'

===== Debugging Summary =====
Status: success
Articles found: 0
HTML saved to: /workspace/data/test_corpus/bitmex_debug/bitmex_blog.html

===== Directory Contents =====


In [7]:
import os
import sys
import logging
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import re
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Create test directory
output_dir = "/workspace/data/test_corpus/bitmex_updated"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Create a new version of the BitMEX collector
class UpdatedBitMEXCollector:
    """Updated collector for BitMEX Research blog posts"""
    
    def __init__(self, output_dir):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.base_url = 'https://blog.bitmex.com/research/'
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5"
        })
        self.logger = logging.getLogger("UpdatedBitMEXCollector")
    
    def collect(self, max_pages=1):
        """Collect research blog posts from BitMEX with updated HTML parsing"""
        self.logger.info(f"Collecting BitMEX Research blog posts (max {max_pages} pages)")
        
        all_posts = []
        
        # Fetch the main research page
        self.logger.info(f"Fetching page: {self.base_url}")
        response = self.session.get(self.base_url, timeout=30)
        
        if response.status_code != 200:
            self.logger.error(f"Failed to fetch page: {response.status_code}")
            return all_posts
            
        # Save the raw HTML for inspection if needed
        raw_html_path = self.output_dir / "bitmex_research.html"
        with open(raw_html_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Based on the debug output, look for post containers
        # Let's try several approaches
        
        # Approach 1: Look for headings within containers
        posts_found = 0
        
        # Find all div containers that might be posts
        post_containers = soup.find_all("div", class_=lambda c: c and ('post' in c.lower() or 'entry' in c.lower() or 'article' in c.lower()))
        
        self.logger.info(f"Found {len(post_containers)} potential post containers")
        
        # Process each container
        for container in post_containers[:10]:  # Limit to first 10 for testing
            # Look for a heading that might be the title
            heading = container.find(['h1', 'h2', 'h3', 'h4'])
            
            if not heading:
                continue
                
            # Found a potential post
            posts_found += 1
            
            # Extract title
            title = heading.text.strip()
            
            # Look for a link to the full post
            link = None
            if heading.find('a'):
                link = heading.find('a')['href']
                # Make sure it's an absolute URL
                if link and not link.startswith(('http://', 'https://')):
                    link = urljoin(self.base_url, link)
            
            # Look for a date
            date = None
            date_elem = container.find(class_=lambda c: c and ('date' in c.lower() or 'time' in c.lower()))
            if date_elem:
                date = date_elem.text.strip()
            
            # Look for content preview
            content_preview = None
            content_elem = container.find(['p', 'div'], class_=lambda c: c and ('content' in c.lower() or 'excerpt' in c.lower() or 'summary' in c.lower()))
            if content_elem:
                content_preview = content_elem.text.strip()
            
            # Create post object
            post = {
                'title': title,
                'url': link,
                'date': date,
                'excerpt': content_preview,
                'container_classes': container.get('class', [])
            }
            
            all_posts.append(post)
            
            self.logger.info(f"Found post: {title}")
        
        # If no posts found with the above approach, try a fallback method
        if not all_posts:
            self.logger.info("No posts found with container approach, trying fallback method")
            
            # Fallback: Just look for all headings and assume they're posts
            headings = soup.find_all(['h1', 'h2', 'h3'])
            
            for heading in headings[:10]:  # Limit to first 10
                # Skip navigation or sidebar headings
                parent_classes = ' '.join(parent.get('class', []) for parent in heading.parents if parent.get('class'))
                if any(x in parent_classes.lower() for x in ['nav', 'sidebar', 'menu', 'footer', 'header']):
                    continue
                
                title = heading.text.strip()
                
                # Look for a link
                link = None
                if heading.find('a'):
                    link = heading.find('a')['href']
                    # Make sure it's an absolute URL
                    if link and not link.startswith(('http://', 'https://')):
                        link = urljoin(self.base_url, link)
                elif heading.parent and heading.parent.name == 'a':
                    link = heading.parent['href']
                    if link and not link.startswith(('http://', 'https://')):
                        link = urljoin(self.base_url, link)
                
                # Create a simplified post object
                post = {
                    'title': title,
                    'url': link,
                    'source': 'fallback_method'
                }
                
                all_posts.append(post)
                self.logger.info(f"Found post (fallback): {title}")
        
        # Save metadata before attempting to download
        self._save_metadata(all_posts)
        
        # Process each post to download PDFs and save HTML
        processed_posts = self._process_posts(all_posts)
        
        return processed_posts
    
    def _process_posts(self, posts):
        """Process posts to download PDFs and save HTML"""
        processed_posts = []
        
        for post in posts:
            url = post.get('url')
            if not url:
                continue
                
            # Fetch the post page
            try:
                self.logger.info(f"Fetching post: {url}")
                response = self.session.get(url, timeout=30)
                
                if response.status_code != 200:
                    self.logger.warning(f"Failed to fetch post: {response.status_code}")
                    continue
                    
                # Parse the post content
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # Extract the main content
                content_elem = soup.find(['div', 'article'], class_=lambda c: c and ('content' in c.lower() or 'entry' in c.lower() or 'article' in c.lower()))
                
                if content_elem:
                    # Get text content
                    post['content_text'] = content_elem.get_text('\n', strip=True)
                    
                    # Get HTML content
                    post['content_html'] = str(content_elem)
                    
                    # Extract any PDF links
                    pdf_links = []
                    for link in content_elem.find_all('a', href=True):
                        href = link['href']
                        if href.lower().endswith('.pdf'):
                            # Make sure URL is absolute
                            full_url = urljoin(url, href)
                            pdf_links.append(full_url)
                    
                    # Download PDFs
                    downloaded_pdfs = []
                    for i, pdf_url in enumerate(pdf_links):
                        # Create filename
                        title = post.get('title', 'unknown')
                        safe_title = re.sub(r'[^\w\s-]', '', title).strip().lower()
                        safe_title = re.sub(r'[-\s]+', '-', safe_title)
                        filename = f"{safe_title}-{i+1}.pdf"
                        
                        # Download the PDF
                        filepath = self._download_file(pdf_url, filename)
                        
                        if filepath:
                            downloaded_pdfs.append({
                                'url': pdf_url,
                                'filepath': str(filepath),
                                'filename': filename
                            })
                    
                    post['pdfs'] = downloaded_pdfs
                    
                    # Save post HTML
                    html_path = self._save_post_html(post)
                    if html_path:
                        post['saved_html_path'] = str(html_path)
                
                processed_posts.append(post)
                
            except Exception as e:
                self.logger.error(f"Error processing post {url}: {e}")
                continue
        
        # Save updated metadata
        self._save_metadata(processed_posts)
        
        return processed_posts
    
    def _download_file(self, url, filename):
        """Download a file with proper naming"""
        filepath = self.output_dir / filename
        
        try:
            self.logger.info(f"Downloading {url} to {filepath}")
            
            # Download with stream to handle large files
            response = self.session.get(url, stream=True, timeout=30)
            response.raise_for_status()
            
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            
            self.logger.info(f"Successfully downloaded to {filepath}")
            return filepath
        except Exception as e:
            self.logger.error(f"Error downloading {url}: {e}")
            return None
    
    def _save_post_html(self, post):
        """Save post content as HTML file"""
        if 'content_html' not in post or not post['content_html']:
            return None
            
        # Create filename from title
        title = post.get('title', 'unknown')
        safe_title = re.sub(r'[^\w\s-]', '', title).strip().lower()
        safe_title = re.sub(r'[-\s]+', '-', safe_title)
        
        html_path = self.output_dir / f"{safe_title}.html"
        
        # Create a complete HTML document
        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>{post.get('title', 'BitMEX Research Post')}</title>
            <meta charset="utf-8">
            <meta name="date" content="{post.get('date', '')}">
        </head>
        <body>
            <h1>{post.get('title', '')}</h1>
            <p class="date">{post.get('date', '')}</p>
            <div class="content">
                {post.get('content_html', '')}
            </div>
            <div class="metadata">
                <p>Source: <a href="{post.get('url', '')}">{post.get('url', '')}</a></p>
            </div>
        </body>
        </html>
        """
        
        try:
            with open(html_path, 'w', encoding='utf-8') as f:
                f.write(html_content)
                
            self.logger.info(f"Saved HTML to {html_path}")
            return html_path
        except Exception as e:
            self.logger.error(f"Error saving HTML: {e}")
            return None
    
    def _save_metadata(self, posts):
        """Save metadata about collected posts"""
        metadata_path = self.output_dir / "bitmex_research_posts.json"
        
        try:
            with open(metadata_path, 'w') as f:
                json.dump(posts, f, indent=2)
                
            self.logger.info(f"Saved metadata for {len(posts)} posts to {metadata_path}")
        except Exception as e:
            self.logger.error(f"Error saving metadata: {e}")

# Test the updated collector
try:
    print("\nStarting updated BitMEX collector test...")
    
    # Initialize and run the collector
    collector = UpdatedBitMEXCollector(output_dir)
    results = collector.collect(max_pages=1)
    
    # Print results
    print(f"\n===== Updated BitMEXCollector Results =====")
    print(f"Collected {len(results)} research posts")
    
    if results:
        # Show first 3 posts (or all if fewer)
        show_count = min(3, len(results))
        for i, post in enumerate(results[:show_count]):
            print(f"\nPost #{i+1}:")
            print(f"  Title: {post.get('title')}")
            print(f"  URL: {post.get('url')}")
            
            # Check for PDFs
            pdfs = post.get('pdfs', [])
            print(f"  PDF Count: {len(pdfs)}")
            
            for j, pdf in enumerate(pdfs[:2]):  # Show first 2 PDFs
                print(f"    PDF #{j+1}: {pdf.get('filename')}")
                filepath = pdf.get('filepath', '')
                if filepath and os.path.exists(filepath):
                    size_kb = os.path.getsize(filepath) / 1024
                    print(f"    Size: {size_kb:.2f} KB")
                    print(f"    Exists: Yes")
                else:
                    print(f"    Exists: No")
            
            # Check for saved HTML
            html_path = post.get('saved_html_path', '')
            if html_path and os.path.exists(html_path):
                size_kb = os.path.getsize(html_path) / 1024
                print(f"  HTML saved: Yes ({size_kb:.2f} KB)")
            else:
                print(f"  HTML saved: No")
    
    # Print directory contents
    print("\n===== Test Directory Contents =====")
    for root, dirs, files in os.walk(output_dir):
        level = root.replace(output_dir, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        sub_indent = ' ' * 4 * (level + 1)
        for f in files[:5]:  # Show first 5 files
            filepath = os.path.join(root, f)
            size_kb = os.path.getsize(filepath) / 1024
            print(f"{sub_indent}{f} ({size_kb:.2f} KB)")
        if len(files) > 5:
            print(f"{sub_indent}... and {len(files)-5} more files")
            
except Exception as e:
    print(f"Error running updated BitMEX collector: {e}")
    import traceback
    traceback.print_exc()

2025-05-09 22:05:10,538 - UpdatedBitMEXCollector - INFO - Collecting BitMEX Research blog posts (max 1 pages)
2025-05-09 22:05:10,538 - UpdatedBitMEXCollector - INFO - Fetching page: https://blog.bitmex.com/research/


Cleared and recreated directory: /workspace/data/test_corpus/bitmex_updated

Starting updated BitMEX collector test...


2025-05-09 22:05:13,782 - UpdatedBitMEXCollector - INFO - Found 1 potential post containers
2025-05-09 22:05:13,784 - UpdatedBitMEXCollector - INFO - Found post: Removing Bitcoin’s Guardrails
2025-05-09 22:05:13,790 - UpdatedBitMEXCollector - INFO - Saved metadata for 1 posts to /workspace/data/test_corpus/bitmex_updated/bitmex_research_posts.json
2025-05-09 22:05:13,790 - UpdatedBitMEXCollector - INFO - Fetching post: https://blog.bitmex.com/removing-bitcoins-guardrails/
2025-05-09 22:05:13,943 - UpdatedBitMEXCollector - INFO - Saved HTML to /workspace/data/test_corpus/bitmex_updated/removing-bitcoins-guardrails.html
2025-05-09 22:05:13,948 - UpdatedBitMEXCollector - INFO - Saved metadata for 1 posts to /workspace/data/test_corpus/bitmex_updated/bitmex_research_posts.json



===== Updated BitMEXCollector Results =====
Collected 1 research posts

Post #1:
  Title: Removing Bitcoin’s Guardrails
  URL: https://blog.bitmex.com/removing-bitcoins-guardrails/
  PDF Count: 0
  HTML saved: Yes (2.20 KB)

===== Test Directory Contents =====
bitmex_updated/
    bitmex_research.html (323.90 KB)
    bitmex_research_posts.json (2.52 KB)
    removing-bitcoins-guardrails.html (2.20 KB)


In [None]:
FREDCollector

In [4]:
import os
os.environ["FRED_API_KEY"] = "05796b72da56e97a6f7ea908ecf57b59"

In [5]:
import os
import sys
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Add CryptoCorpusBuilder to Python path
sys.path.append('/workspace/CryptoCorpusBuilder')

# Import the collector
from sources.specific_collectors.fred_collector import FREDCollector

# Create test directory
output_dir = "/workspace/data/test_corpus/fred"
os.makedirs(output_dir, exist_ok=True)

# Clear existing files to start fresh
import shutil
shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
print(f"Cleared and recreated directory: {output_dir}")

# Check for FRED API key environment variable
fred_api_key = os.getenv("FRED_API_KEY")
print(f"FRED API Key available: {'Yes' if fred_api_key else 'No'}")

# If we don't have an API key, let's make a mock one for testing
# (This won't work for real API calls, but will help test the code structure)
if not fred_api_key:
    fred_api_key = "MOCK_API_KEY_FOR_TESTING"
    print("Using mock API key for testing")

# Initialize collector
collector = FREDCollector(output_dir, api_key=fred_api_key)
collector.logger.setLevel(logging.INFO)

# Test with just the integration without making actual API calls
try:
    print("\nTesting FREDCollector integration...")
    
    # Test the structure without making API calls
    # We'll just verify that the collector methods are accessible
    
    print("FREDCollector methods:")
    methods = [method for method in dir(collector) if not method.startswith('_') and callable(getattr(collector, method))]
    for method in methods:
        print(f"  {method}")
    
    # For a more complete test, we'd need a valid FRED API key
    if fred_api_key != "MOCK_API_KEY_FOR_TESTING":
        print("\nTesting actual data collection (with real API key)...")
        
        # Test with a small sample
        results = collector.collect(
            series_ids=["VIXCLS", "DTWEXBGS"],  # VIX index and Dollar index
            search_terms=["volatility"],
            max_results=2  # Limit to 2 results for testing
        )
        
        # Print results
        print(f"\n===== FREDCollector Results =====")
        print(f"Downloaded {len(results)} data files")
        
        for file_path in results:
            print(f"File: {file_path}")
            if os.path.exists(file_path):
                size_kb = os.path.getsize(file_path) / 1024
                print(f"  Size: {size_kb:.2f} KB")
                print(f"  Exists: Yes")
            else:
                print(f"  Exists: No")
    else:
        print("\nSkipping actual API calls due to mock API key")
        print("To fully test, set the FRED_API_KEY environment variable")
    
    # Print directory contents
    print("\n===== Test Directory Contents =====")
    for root, dirs, files in os.walk(output_dir):
        level = root.replace(output_dir, '').count(os.sep)
        indent = ' ' * 4 * level
        print(f"{indent}{os.path.basename(root)}/")
        sub_indent = ' ' * 4 * (level + 1)
        for f in files[:5]:  # Show first 5 files
            filepath = os.path.join(root, f)
            size_kb = os.path.getsize(filepath) / 1024
            print(f"{sub_indent}{f} ({size_kb:.2f} KB)")
        if len(files) > 5:
            print(f"{sub_indent}... and {len(files)-5} more files")
    
except Exception as e:
    print(f"Error testing FREDCollector: {e}")
    import traceback
    traceback.print_exc()

Cleared and recreated directory: /workspace/data/test_corpus/fred
FRED API Key available: Yes

Testing FREDCollector integration...
FREDCollector methods:
  api_request
  collect
  download_file

Testing actual data collection (with real API key)...

===== FREDCollector Results =====
Downloaded 8 data files
File: /workspace/data/test_corpus/fred/VIXCLS_CBOE Volatility Index_ VIX.json
  Size: 1242.10 KB
  Exists: Yes
File: /workspace/data/test_corpus/fred/VIXCLS_CBOE Volatility Index_ VIX.csv
  Size: 349.83 KB
  Exists: Yes
File: /workspace/data/test_corpus/fred/DTWEXBGS_Nominal Broad U_S_ Dollar Index.json
  Size: 691.31 KB
  Exists: Yes
File: /workspace/data/test_corpus/fred/DTWEXBGS_Nominal Broad U_S_ Dollar Index.csv
  Size: 203.04 KB
  Exists: Yes
File: /workspace/data/test_corpus/fred/VIXCLS_CBOE Volatility Index_ VIX.json
  Size: 1242.10 KB
  Exists: Yes
File: /workspace/data/test_corpus/fred/VIXCLS_CBOE Volatility Index_ VIX.csv
  Size: 349.83 KB
  Exists: Yes
File: /workspace/d

In [None]:
ISDA COllector test

2025-05-08 20:09:16,481 - ISDACollector - INFO - Collecting ISDA documentation (max 2 pages)
2025-05-08 20:09:16,481 - ISDACollector - INFO - Collecting ISDA documentation (max 2 pages)
2025-05-08 20:09:16,482 - ISDACollector - INFO - Fetching page 1: https://www.isda.org/category/documentation/
2025-05-08 20:09:16,482 - ISDACollector - INFO - Fetching page 1: https://www.isda.org/category/documentation/
2025-05-08 20:09:25,914 - ISDACollector - INFO - No more documents found on page 1
2025-05-08 20:09:25,914 - ISDACollector - INFO - No more documents found on page 1
2025-05-08 20:09:25,925 - ISDACollector - INFO - Saved metadata for 0 documents to /workspace/data/test_isda/isda_documents.json
2025-05-08 20:09:25,925 - ISDACollector - INFO - Saved metadata for 0 documents to /workspace/data/test_isda/isda_documents.json


Successfully collected 0 documents from ISDA


In [12]:
import os
import sys
import logging
import requests
from pathlib import Path
from bs4 import BeautifulSoup

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ISDADebugger")

# Set up the test directory
test_dir = Path("/workspace/data/test_isda")
os.makedirs(test_dir, exist_ok=True)

# Fetch the ISDA documentation page directly
url = "https://www.isda.org/category/documentation/"
logger.info(f"Directly fetching page: {url}")

try:
    # Use a modern browser user agent
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    
    # Save the HTML content for inspection
    html_path = test_dir / "isda_page.html"
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(response.text)
    
    logger.info(f"Saved HTML to {html_path}")
    
    # Parse with BeautifulSoup
    soup = BeautifulSoup(response.text, 'html.parser')
    
    # Debug HTML structure
    logger.info("Analyzing HTML structure:")
    
    # Check for articles
    articles = soup.find_all('article')
    logger.info(f"Found {len(articles)} article elements")
    
    # If no articles, look for alternative elements that might contain documents
    if not articles:
        # Try different container elements
        containers = soup.find_all(['div', 'section'], class_=lambda c: c and any(word in c for word in ['post', 'content', 'article', 'entry']))
        logger.info(f"Found {len(containers)} potential container elements")
        
        # Check for links that might be document links
        pdf_links = soup.find_all('a', href=lambda h: h and h.lower().endswith('.pdf'))
        logger.info(f"Found {len(pdf_links)} direct PDF links")
        
        # Look for any title-like elements
        titles = soup.find_all(['h1', 'h2', 'h3', 'h4'])
        logger.info(f"Found {len(titles)} heading elements")
        
        # Print the first few titles to see what we're working with
        for i, title in enumerate(titles[:5]):
            logger.info(f"Title {i+1}: {title.text.strip()}")
    
    # If articles were found, check if they have the expected structure
    else:
        for i, article in enumerate(articles[:3]):
            logger.info(f"Article {i+1}:")
            
            # Check for title elements
            title_elems = article.find_all(['h1', 'h2', 'h3'])
            logger.info(f"  Found {len(title_elems)} title elements")
            
            # Check for links
            links = article.find_all('a', href=True)
            logger.info(f"  Found {len(links)} links")
            
            # Check for class names to help debug selectors
            if 'class' in article.attrs:
                logger.info(f"  Article classes: {article['class']}")
    
except Exception as e:
    logger.error(f"Error debugging ISDA collector: {e}")

2025-05-08 20:10:07,562 - ISDADebugger - INFO - Directly fetching page: https://www.isda.org/category/documentation/
2025-05-08 20:10:09,500 - ISDADebugger - INFO - Saved HTML to /workspace/data/test_isda/isda_page.html
2025-05-08 20:10:09,512 - ISDADebugger - INFO - Analyzing HTML structure:
2025-05-08 20:10:09,513 - ISDADebugger - INFO - Found 6 article elements
2025-05-08 20:10:09,514 - ISDADebugger - INFO - Article 1:
2025-05-08 20:10:09,514 - ISDADebugger - INFO -   Found 1 title elements
2025-05-08 20:10:09,515 - ISDADebugger - INFO -   Found 2 links
2025-05-08 20:10:09,515 - ISDADebugger - INFO -   Article classes: ['news-small', 'post-1139259', 'post', 'type-post', 'status-publish', 'format-standard', 'hentry', 'category-press-releases', 'tag-credit-default-swaps', 'tag-credit-derivatives', 'tag-determinations-committee', 'tag-legal']
2025-05-08 20:10:09,515 - ISDADebugger - INFO - Article 2:
2025-05-08 20:10:09,516 - ISDADebugger - INFO -   Found 1 title elements
2025-05-08 20

In [15]:
import logging
import requests
from bs4 import BeautifulSoup
from pathlib import Path

def debug_isda_website():
    """Directly debug the ISDA website structure"""
    # Set up logging
    logger = logging.getLogger("ISDADebugger")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)
    
    # Set up output directory
    output_dir = Path("/workspace/data/test_isda")
    output_dir.mkdir(exist_ok=True, parents=True)
    
    # Directly fetch the page
    url = "https://www.isda.org/category/documentation/"
    logger.info(f"Directly fetching page: {url}")
    
    try:
        response = requests.get(url, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        response.raise_for_status()
        
        # Save HTML to file for inspection
        html_path = output_dir / "isda_page.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(response.text)
        logger.info(f"Saved HTML to {html_path}")
        
        # Parse HTML
        soup = BeautifulSoup(response.text, "html.parser")
        
        # Analyze HTML structure
        logger.info("Analyzing HTML structure:")
        
        # Look for articles
        articles = soup.find_all("article")
        logger.info(f"Found {len(articles)} article elements")
        
        # Analyze first few articles
        for i, article in enumerate(articles[:6]):  # Examine up to 6 articles
            logger.info(f"Article {i+1}:")
            
            # Look for title elements
            titles = article.find_all(["h1", "h2", "h3", "h4", "h5"])
            logger.info(f"  Found {len(titles)} title elements")
            
            # Look for links
            links = article.find_all("a", href=True)
            logger.info(f"  Found {len(links)} links")
            
            # Get article classes
            classes = article.get("class", [])
            logger.info(f"  Article classes: {classes}")
            
            # Get category info from classes
            categories = [c for c in classes if c.startswith("category-")]
            if categories:
                logger.info(f"  Categories: {categories}")
            
            # Get tag info from classes
            tags = [c for c in classes if c.startswith("tag-")]
            if tags:
                logger.info(f"  Tags: {tags}")
        
        # Look for pagination
        pagination = soup.find_all(class_=lambda c: c and "pagination" in c)
        logger.info(f"Found {len(pagination)} pagination elements")
        
        # Extract main document patterns
        pdf_links = soup.find_all("a", href=lambda h: h and h.lower().endswith(".pdf"))
        logger.info(f"Found {len(pdf_links)} direct PDF links")
        
        # Check for documentation-specific sections
        doc_sections = soup.find_all(class_=lambda c: c and "document" in c.lower())
        logger.info(f"Found {len(doc_sections)} document-related sections")
        
        return True
        
    except Exception as e:
        logger.error(f"Error debugging ISDA website: {e}")
        return False

# Now run the debug function
debug_isda_website()

2025-05-08 20:13:30,262 - ISDADebugger - INFO - Directly fetching page: https://www.isda.org/category/documentation/
2025-05-08 20:13:30,262 - ISDADebugger - INFO - Directly fetching page: https://www.isda.org/category/documentation/
2025-05-08 20:13:32,259 - ISDADebugger - INFO - Saved HTML to /workspace/data/test_isda/isda_page.html
2025-05-08 20:13:32,259 - ISDADebugger - INFO - Saved HTML to /workspace/data/test_isda/isda_page.html
2025-05-08 20:13:32,272 - ISDADebugger - INFO - Analyzing HTML structure:
2025-05-08 20:13:32,272 - ISDADebugger - INFO - Analyzing HTML structure:
2025-05-08 20:13:32,273 - ISDADebugger - INFO - Found 6 article elements
2025-05-08 20:13:32,273 - ISDADebugger - INFO - Found 6 article elements
2025-05-08 20:13:32,274 - ISDADebugger - INFO - Article 1:
2025-05-08 20:13:32,274 - ISDADebugger - INFO - Article 1:
2025-05-08 20:13:32,276 - ISDADebugger - INFO -   Found 2 title elements
2025-05-08 20:13:32,276 - ISDADebugger - INFO -   Found 2 title elements
20

True

In [17]:
import os
import re
import json
import time
import requests
from pathlib import Path
from bs4 import BeautifulSoup
from urllib.parse import urljoin

# Set up logging
import logging
logger = logging.getLogger("ISDADocFinder")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

# Create output directory
output_dir = Path("/workspace/data/test_isda")
os.makedirs(output_dir, exist_ok=True)

# First, let's explore ISDA's main documentation library pages
def find_isda_documentation():
    """Find where actual ISDA documentation is stored"""
    # List of potential documentation URLs to check
    potential_urls = [
        "https://www.isda.org/books/",
        "https://www.isda.org/documentation/",
        "https://www.isda.org/documents/",
        "https://www.isda.org/protocols/",
        "https://www.isda.org/guides/",
        "https://www.isda.org/bookstore/",
        "https://www.isda.org/books-and-protocols/",
        "https://www.isda.org/book-and-protocol-list/",
        "https://www.isda.org/legal-documentation/"
    ]
    
    # Check navigation on the main site first
    logger.info("Checking ISDA main site navigation for documentation links")
    try:
        main_response = requests.get("https://www.isda.org", headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        main_response.raise_for_status()
        main_soup = BeautifulSoup(main_response.text, "html.parser")
        
        # Look for navigation menu items that might contain "documentation"
        nav_items = main_soup.find_all("a", href=True)
        for item in nav_items:
            text = item.text.strip().lower()
            href = item["href"]
            
            if any(keyword in text for keyword in ["document", "legal", "protocol", "book", "publication"]):
                # Add to potential URLs if it looks like a documentation link
                full_url = urljoin("https://www.isda.org", href)
                if full_url not in potential_urls:
                    potential_urls.append(full_url)
                    logger.info(f"Found potential documentation link in nav: {text} -> {full_url}")
    except Exception as e:
        logger.error(f"Error checking main site: {e}")
    
    # Test each potential URL
    documentation_pages = []
    
    for url in potential_urls:
        logger.info(f"Checking potential documentation URL: {url}")
        try:
            response = requests.get(url, headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            })
            
            # Skip if not found
            if response.status_code == 404:
                logger.info(f"URL not found: {url}")
                continue
                
            # Save a copy for inspection
            url_filename = url.split("/")[-2] if url.endswith("/") else url.split("/")[-1]
            if not url_filename:
                url_filename = "index"
            html_path = output_dir / f"isda_{url_filename}.html"
            with open(html_path, "w", encoding="utf-8") as f:
                f.write(response.text)
            
            # Check for PDF links
            soup = BeautifulSoup(response.text, "html.parser")
            pdf_links = soup.find_all("a", href=lambda h: h and h.lower().endswith(".pdf"))
            
            if pdf_links:
                logger.info(f"Found {len(pdf_links)} PDF links on {url}")
                documentation_pages.append({
                    "url": url,
                    "pdf_links_count": len(pdf_links),
                    "title": soup.title.text if soup.title else url
                })
            else:
                # Also check for document sections
                doc_sections = soup.find_all(class_=lambda c: c and "document" in str(c).lower())
                if doc_sections:
                    logger.info(f"Found {len(doc_sections)} document sections on {url}")
                    documentation_pages.append({
                        "url": url,
                        "document_sections_count": len(doc_sections),
                        "title": soup.title.text if soup.title else url
                    })
        
        except Exception as e:
            logger.error(f"Error checking {url}: {e}")
    
    # Check search page for specific documentation
    try:
        search_url = "https://www.isda.org/?s=documentation+protocol"
        logger.info(f"Checking search page: {search_url}")
        
        search_response = requests.get(search_url, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        
        search_soup = BeautifulSoup(search_response.text, "html.parser")
        
        # Save search results for inspection
        search_path = output_dir / "isda_search_results.html"
        with open(search_path, "w", encoding="utf-8") as f:
            f.write(search_response.text)
        
        # Look for search results with PDFs or documentation links
        search_results = search_soup.find_all("article")
        for result in search_results[:10]:  # Check first 10 results
            links = result.find_all("a", href=True)
            for link in links:
                href = link["href"]
                if "/document/" in href or "/protocol/" in href or "/bookstore/" in href:
                    documentation_pages.append({
                        "url": href if href.startswith("http") else urljoin("https://www.isda.org", href),
                        "source": "search",
                        "title": link.text.strip()
                    })
                    logger.info(f"Found potential documentation link in search: {link.text.strip()} -> {href}")
    
    except Exception as e:
        logger.error(f"Error checking search page: {e}")
    
    # Save results
    results_path = output_dir / "isda_documentation_sources.json"
    with open(results_path, "w") as f:
        json.dump(documentation_pages, f, indent=2)
    
    logger.info(f"Found {len(documentation_pages)} potential documentation pages, saved to {results_path}")
    return documentation_pages

# Run the documentation finder
documentation_pages = find_isda_documentation()

# Now create a specific collector for the documentation pages we found
class ISDADocumentationCollector:
    """Collector for ISDA documentation from identified sources"""
    
    def __init__(self, output_dir, sources=None):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True, parents=True)
        self.sources = sources or []
        self.logger = logging.getLogger("ISDADocCollector")
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
            self.logger.addHandler(handler)
    
    def collect(self, max_sources=5):
        """Collect documentation from identified sources"""
        self.logger.info(f"Collecting ISDA documentation from {len(self.sources)} sources")
        
        documents = []
        session = requests.Session()
        session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        })
        
        # Process each source
        for i, source in enumerate(self.sources[:max_sources]):
            url = source.get("url")
            if not url:
                continue
                
            self.logger.info(f"Processing source {i+1}/{min(max_sources, len(self.sources))}: {url}")
            
            try:
                response = session.get(url)
                response.raise_for_status()
                
                soup = BeautifulSoup(response.text, "html.parser")
                
                # Find all PDF links
                pdf_links = []
                for a in soup.find_all("a", href=True):
                    href = a["href"]
                    if href.lower().endswith(".pdf"):
                        full_url = urljoin(url, href)
                        text = a.text.strip()
                        
                        # Skip if no text
                        if not text:
                            # Try to get text from parent or surrounding elements
                            if a.parent:
                                text = a.parent.text.strip()
                        
                        # Default text if still empty
                        if not text:
                            text = os.path.basename(href)
                        
                        pdf_links.append({
                            "url": full_url,
                            "text": text
                        })
                
                self.logger.info(f"Found {len(pdf_links)} PDF links on {url}")
                
                # Download each PDF
                for pdf in pdf_links:
                    pdf_url = pdf["url"]
                    text = pdf["text"]
                    
                    # Generate filename
                    safe_text = re.sub(r'[^\w\s-]', '', text).strip().lower()
                    filename = re.sub(r'[-\s]+', '-', safe_text)
                    
                    # Ensure filename has .pdf extension and isn't too long
                    if len(filename) > 50:
                        filename = filename[:50]
                    if not filename.endswith(".pdf"):
                        filename += ".pdf"
                    
                    output_path = self.output_dir / filename
                    
                    # Download the file
                    self.logger.info(f"Downloading PDF: {pdf_url} -> {output_path}")
                    try:
                        pdf_response = session.get(pdf_url, stream=True)
                        pdf_response.raise_for_status()
                        
                        with open(output_path, "wb") as f:
                            for chunk in pdf_response.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)
                        
                        self.logger.info(f"Successfully downloaded: {output_path}")
                        
                        # Add to documents list
                        documents.append({
                            "title": text,
                            "url": pdf_url,
                            "source_url": url,
                            "local_path": str(output_path),
                            "file_size": os.path.getsize(output_path)
                        })
                    except Exception as e:
                        self.logger.error(f"Error downloading PDF {pdf_url}: {e}")
                
                # Wait between sources
                if i < len(self.sources) - 1:
                    time.sleep(3)
            
            except Exception as e:
                self.logger.error(f"Error processing source {url}: {e}")
        
        # Save metadata
        metadata_path = self.output_dir / "isda_documents_metadata.json"
        with open(metadata_path, "w") as f:
            json.dump(documents, f, indent=2)
            
        self.logger.info(f"Collected {len(documents)} documents, metadata saved to {metadata_path}")
        return documents

# Run the documentation collector with our found sources
collector = ISDADocumentationCollector(output_dir, documentation_pages)
documents = collector.collect(max_sources=5)
print(f"Downloaded {len(documents)} ISDA documentation files")

2025-05-08 20:17:07,713 - ISDADocFinder - INFO - Checking ISDA main site navigation for documentation links
2025-05-08 20:17:07,713 - ISDADocFinder - INFO - Checking ISDA main site navigation for documentation links
2025-05-08 20:17:08,824 - ISDADocFinder - INFO - Found potential documentation link in nav: legal -> https://www.isda.org/category/legal/
2025-05-08 20:17:08,824 - ISDADocFinder - INFO - Found potential documentation link in nav: legal -> https://www.isda.org/category/legal/
2025-05-08 20:17:08,826 - ISDADocFinder - INFO - Found potential documentation link in nav: view all books -> https://www.isda.org/books
2025-05-08 20:17:08,826 - ISDADocFinder - INFO - Found potential documentation link in nav: view all books -> https://www.isda.org/books
2025-05-08 20:17:08,827 - ISDADocFinder - INFO - Checking potential documentation URL: https://www.isda.org/books/
2025-05-08 20:17:08,827 - ISDADocFinder - INFO - Checking potential documentation URL: https://www.isda.org/books/
2025

Downloaded 5 ISDA documentation files


In [None]:
text extraction and domain classification

In [18]:
from processors.text_extractor import TextExtractor
from processors.domain_classifier import DomainClassifier
from pathlib import Path
import json

# Set up paths
test_dir = Path("/workspace/data/test_isda")
pdf_files = list(test_dir.glob("*.pdf"))

# Load domain config
from config.domain_config import DOMAINS

# Initialize processors
extractor = TextExtractor()
classifier = DomainClassifier(DOMAINS)

# Process each PDF file
results = []
for pdf_file in pdf_files:
    print(f"Processing: {pdf_file.name}")
    try:
        # Extract text
        extraction_result = extractor.extract(pdf_file)
        if not extraction_result or 'text' not in extraction_result:
            print(f"⚠️ Failed to extract text from {pdf_file.name}")
            continue
            
        # Get text sample for classification
        text_sample = extraction_result['text'][:5000]  # First ~5000 chars for classification
        
        # Classify the document
        classification = classifier.classify(text_sample)
        
        # Print results
        print(f"🔍 Classification: {classification['domain']}")
        print(f"📊 Confidence: {classification['confidence']:.2f}")
        print(f"📄 Text preview: {text_sample[:150]}...")
        print("-" * 80)
        
        # Save results
        results.append({
            "filename": pdf_file.name,
            "domain": classification["domain"],
            "confidence": classification["confidence"],
            "text_length": len(extraction_result['text'])
        })
    except Exception as e:
        print(f"❌ Error processing {pdf_file.name}: {e}")

# Save classification results
with open(test_dir / "classification_results.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Classified {len(results)} out of {len(pdf_files)} documents")

2025-05-08 20:20:56,152 - DomainClassifier - INFO - Initialized classifier with 8 domains
2025-05-08 20:20:56,152 - DomainClassifier - INFO - Initialized classifier with 8 domains


Processing: isda-2018-benchmarks-supplement-protocol-agreement.pdf
🔍 Classification: crypto_derivatives
📊 Confidence: 0.00
📄 Text preview:  
 
1 
Copyright © 2018 by International Swaps and Derivatives Association, Inc.  
  
 
International Swaps and Derivatives Association,  
Inc. 
ISDA ...
--------------------------------------------------------------------------------
Processing: isda-2018-benchmarks-supplement-protocol-questionn.pdf
🔍 Classification: crypto_derivatives
📊 Confidence: 0.47
📄 Text preview:  
1 
 
Copyright © International Swaps and Derivatives Association, Inc. 
  
 
 
 
 
International Swaps and Derivatives Association,  
Inc. 
ISDA 201...
--------------------------------------------------------------------------------
Processing: isda-2021-sbs-top-up-protocol-annotated.pdf
🔍 Classification: crypto_derivatives
📊 Confidence: 0.54
📄 Text preview: I
nternational Swaps and Derivatives Association, Inc.  
ISDA 2021 SBS TOP- UP PROTOCOL  
(ANNOTATED PROTOCOL AND RELATED AT

In [None]:
Integration with main corpus builder

In [20]:
import os
import glob

# Search for command_interface.py in the workspace
command_interface_paths = glob.glob("/workspace/**/command_interface.py", recursive=True)
print("Found command_interface.py at:", command_interface_paths)

Found command_interface.py at: ['/workspace/data scraper scripts/command_interface.py']


In [None]:
import subprocess
import os
from pathlib import Path
import glob

# First, let's locate the command_interface.py file
def find_file(filename, search_path="/workspace"):
    """Find a file in the given search path"""
    result = []
    for root, dirs, files in os.walk(search_path):
        if filename in files:
            result.append(os.path.join(root, filename))
    return result

# Find command_interface.py
cmd_interface_paths = find_file("command_interface.py")
print(f"Found command_interface.py at: {cmd_interface_paths}")

# Use the first match if found
cmd_interface_path = cmd_interface_paths[0] if cmd_interface_paths else None

if not cmd_interface_path:
    print("Could not find command_interface.py. Testing with direct file path.")
    cmd_interface_path = "/workspace/data scraper scripts/command_interface.py"

# Check if the file exists
if os.path.exists(cmd_interface_path):
    print(f"Using command_interface.py at: {cmd_interface_path}")
else:
    print(f"Error: File does not exist at {cmd_interface_path}")

# Directory where command_interface.py is located
script_dir = os.path.dirname(cmd_interface_path)
print(f"Script directory: {script_dir}")

# Create directories if needed
os.makedirs("/workspace/data/corpus_1/crypto_derivatives", exist_ok=True)
os.makedirs("/workspace/data/corpus_1/risk_management", exist_ok=True)

# Change to the script directory and run from there
os.chdir(script_dir)
print(f"Changed working directory to: {os.getcwd()}")

# Run corpus builder for derivatives domain
print("\nRunning corpus builder for the derivatives domain...")
try:
    subprocess.run(
        ["python", os.path.basename(cmd_interface_path), "run_corpus_builder", 
         "--domains", "crypto_derivatives", 
         "--downloads", "10"],
        check=True
    )
except subprocess.CalledProcessError as e:
    print(f"Error running corpus builder: {e}")

# Use the curated list for more focused download
print("\nRunning corpus builder with curated list...")
try:
    # Check if curated list exists
    crypto_finance_list = os.path.join(script_dir, "crypto_finance_list.json")
    if os.path.exists(crypto_finance_list):
        print(f"Using curated list at: {crypto_finance_list}")
        subprocess.run(
            ["python", os.path.basename(cmd_interface_path), "run_corpus_builder", 
             "--curated-list", crypto_finance_list],
            check=True
        )
    else:
        print(f"Curated list not found at: {crypto_finance_list}")
        # Let's find it
        curated_lists = glob.glob("/workspace/**/crypto_finance_list.json", recursive=True)
        if curated_lists:
            print(f"Found curated list at: {curated_lists[0]}")
            subprocess.run(
                ["python", os.path.basename(cmd_interface_path), "run_corpus_builder", 
                 "--curated-list", curated_lists[0]],
                check=True
            )
        else:
            print("Could not find crypto_finance_list.json")
except subprocess.CalledProcessError as e:
    print(f"Error running corpus builder with curated list: {e}")

In [None]:
test anna's archive

In [9]:
import subprocess
import os
from pathlib import Path
import sys

# First, find test_client.py
test_client_paths = find_file("test_client.py")
print(f"Found test_client.py at: {test_client_paths}")

# Use the first match if found
test_client_path = test_client_paths[0] if test_client_paths else None

if not test_client_path:
    print("Could not find test_client.py. Testing with direct file path.")
    test_client_path = "/workspace/test_client.py"

# Check if the file exists
if os.path.exists(test_client_path):
    print(f"Using test_client.py at: {test_client_path}")
    
    # Directory where test_client.py is located
    client_dir = os.path.dirname(test_client_path)
    print(f"Client directory: {client_dir}")
    
    # Change to the client directory and run from there
    os.chdir(client_dir)
    print(f"Changed working directory to: {os.getcwd()}")
    
    # Run test_client.py
    print("\nTesting Anna's Archive client...")
    try:
        subprocess.run(["python", os.path.basename(test_client_path)], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running test_client.py: {e}")
else:
    print(f"Error: File does not exist at {test_client_path}")

NameError: name 'find_file' is not defined

In [None]:
domain coverage

In [26]:
import os
from pathlib import Path
import json

# Set up paths
corpus_dir = Path("/workspace/data/corpus_1")
corpus_stats = {
    "domains": {},
    "total_files": 0,
    "total_size_mb": 0
}

# Evaluate each domain
for domain in os.listdir(corpus_dir):
    # Skip non-directory items
    domain_dir = corpus_dir / domain
    if not domain_dir.is_dir() or domain.endswith("_extracted"):
        continue
    
    # Count files
    pdf_files = list(domain_dir.glob("*.pdf"))
    meta_files = list(domain_dir.glob("*.pdf.meta"))
    
    # Calculate size
    domain_size_bytes = sum(os.path.getsize(f) for f in pdf_files if os.path.exists(f))
    domain_size_mb = domain_size_bytes / (1024 * 1024)
    
    # Add to stats
    corpus_stats["domains"][domain] = {
        "pdf_files": len(pdf_files),
        "meta_files": len(meta_files),
        "size_mb": round(domain_size_mb, 2)
    }
    
    corpus_stats["total_files"] += len(pdf_files)
    corpus_stats["total_size_mb"] += domain_size_mb

# Round total size
corpus_stats["total_size_mb"] = round(corpus_stats["total_size_mb"], 2)

# Print stats
print("Corpus Statistics:")
print(f"Total Files: {corpus_stats['total_files']}")
print(f"Total Size: {corpus_stats['total_size_mb']} MB")
print("\nDomain Coverage:")
for domain, stats in corpus_stats["domains"].items():
    print(f"  {domain}: {stats['pdf_files']} files, {stats['size_mb']} MB")

# Save stats
with open(corpus_dir / "corpus_stats.json", "w") as f:
    json.dump(corpus_stats, f, indent=2)

print(f"\nSaved statistics to {corpus_dir / 'corpus_stats.json'}")

Corpus Statistics:
Total Files: 6
Total Size: 50.8 MB

Domain Coverage:
  downloads: 0 files, 0.0 MB
  config: 0 files, 0.0 MB
  logs: 0 files, 0.0 MB
  batch_test: 0 files, 0.0 MB
  crypto_derivatives: 5 files, 42.37 MB
  high_frequency_trading: 1 files, 8.43 MB
  market_microstructure: 0 files, 0.0 MB
  risk_management: 0 files, 0.0 MB
  decentralized_finance: 0 files, 0.0 MB
  portfolio_construction: 0 files, 0.0 MB
  valuation_models: 0 files, 0.0 MB
  regulation_compliance: 0 files, 0.0 MB
  scidb_papers: 0 files, 0.0 MB
  temp: 0 files, 0.0 MB
  derivatives: 0 files, 0.0 MB
  test_batch: 0 files, 0.0 MB

Saved statistics to /workspace/data/corpus_1/corpus_stats.json


In [None]:
test extraction entire corpus

In [27]:
from processors.text_extractor import TextExtractor
from pathlib import Path
import os
import sys

# Add the project root to path if necessary
sys.path.append("/workspace")

# Set up paths
corpus_dir = Path("/workspace/data/corpus_1")
extractor = TextExtractor()

# Process each domain directory
for domain_dir in corpus_dir.iterdir():
    # Skip non-directories and extracted directories
    if not domain_dir.is_dir() or domain_dir.name.endswith("_extracted"):
        continue
    
    # Create extracted directory if it doesn't exist
    extracted_dir = corpus_dir / f"{domain_dir.name}_extracted"
    os.makedirs(extracted_dir, exist_ok=True)
    
    print(f"Processing domain: {domain_dir.name}")
    
    # Extract text from each PDF
    pdf_files = list(domain_dir.glob("*.pdf"))
    for pdf_file in pdf_files:
        print(f"  Extracting text from: {pdf_file.name}")
        
        # Output file path
        output_file = extracted_dir / f"{pdf_file.stem}.txt"
        
        # Skip if already processed
        if output_file.exists():
            print(f"    Already processed, skipping")
            continue
        
        try:
            # Extract text
            result = extractor.extract(pdf_file)
            
            if result and 'text' in result:
                # Write text to file
                with open(output_file, "w", encoding="utf-8") as f:
                    f.write(result["text"])
                
                print(f"    Saved extracted text to: {output_file}")
            else:
                print(f"    No text extracted")
        except Exception as e:
            print(f"    Error extracting text: {e}")

print("\nText extraction complete")

Processing domain: downloads
Processing domain: config
Processing domain: logs
Processing domain: batch_test
Processing domain: crypto_derivatives
  Extracting text from: English _en__ _pdf_ __lgli_lgrs_nexusstc_scihub_upload_zlib_ 13_2MB_ _ Book _non_598f73ea78c0aa8be43f264ba61e9e37.pdf
    Saved extracted text to: /workspace/data/corpus_1/crypto_derivatives_extracted/English _en__ _pdf_ __lgli_lgrs_nexusstc_scihub_upload_zlib_ 13_2MB_ _ Book _non_598f73ea78c0aa8be43f264ba61e9e37.txt
  Extracting text from: _pdf_ __upload_ 0_5MB_ _ Book _unknown__ upload_elsevier_elsevier-2023-2024_10_1_9217d59088dfda99ac5eb5332e35847f.pdf
    Saved extracted text to: /workspace/data/corpus_1/crypto_derivatives_extracted/_pdf_ __upload_ 0_5MB_ _ Book _unknown__ upload_elsevier_elsevier-2023-2024_10_1_9217d59088dfda99ac5eb5332e35847f.txt
  Extracting text from: English _en__ _pdf_ __lgli_lgrs_nexusstc_zlib_ 1_3MB_ _ Book _non-fiction__ nexu_eb35af40b3ea9c2d9558803514846ae3.pdf
    Saved extracted text 

In [None]:
domain balance analysis and plan 

In [28]:
from config.domain_config import DOMAINS
import json
from pathlib import Path

# Load corpus stats
corpus_dir = Path("/workspace/data/corpus_1")
stats_file = corpus_dir / "corpus_stats.json"

if stats_file.exists():
    with open(stats_file, "r") as f:
        corpus_stats = json.load(f)
else:
    # Create empty stats if file doesn't exist
    corpus_stats = {"domains": {}, "total_files": 0, "total_size_mb": 0}

# Check domain allocations
print("Domain Balance Analysis:")
print(f"{'Domain':<30} {'Current':<10} {'Target':<10} {'Status':<15}")
print("-" * 65)

# Calculate current total files
total_files = corpus_stats.get("total_files", 0)
target_total = 1200  # From the original allocation

domain_status = []
for domain, config in DOMAINS.items():
    # Get allocation percentage
    allocation = config.get("allocation", 0)
    
    # Calculate target files
    target_files = int(allocation * target_total)
    
    # Get current files
    current_files = corpus_stats.get("domains", {}).get(domain, {}).get("pdf_files", 0)
    
    # Determine status
    if current_files == 0:
        status = "Not started"
    elif current_files < target_files * 0.25:
        status = "Needs work"
    elif current_files < target_files * 0.75:
        status = "In progress"
    elif current_files < target_files:
        status = "Nearly done"
    else:
        status = "Complete"
    
    # Print status
    print(f"{domain:<30} {current_files:<10} {target_files:<10} {status:<15}")
    
    # Add to status list
    domain_status.append({
        "domain": domain,
        "current_files": current_files,
        "target_files": target_files,
        "status": status,
        "percentage": round(current_files / target_files * 100 if target_files > 0 else 0, 2)
    })

# Sort domains by percentage complete
domain_status.sort(key=lambda x: x["percentage"])

print("\nRecommended Domain Focus (in order of priority):")
for domain in domain_status[:3]:
    print(f"1. {domain['domain']} - {domain['percentage']}% complete, need {domain['target_files'] - domain['current_files']} more files")
    # List search terms
    search_terms = DOMAINS.get(domain["domain"], {}).get("search_terms", [])
    if search_terms:
        print("   Suggested search terms:")
        for term in search_terms[:3]:
            print(f"   - {term}")

Domain Balance Analysis:
Domain                         Current    Target     Status         
-----------------------------------------------------------------
crypto_derivatives             5          240        Needs work     
high_frequency_trading         1          180        Needs work     
market_microstructure          0          180        Not started    
risk_management                0          180        Not started    
decentralized_finance          0          144        Not started    
portfolio_construction         0          120        Not started    
valuation_models               0          96         Not started    
regulation_compliance          0          60         Not started    

Recommended Domain Focus (in order of priority):
1. market_microstructure - 0.0% complete, need 180 more files
   Suggested search terms:
   - crypto market microstructure
   - order book dynamics
   - liquidity provision blockchain
1. risk_management - 0.0% complete, need 180 more file

In [3]:
import os
print("Current working directory:", os.getcwd())
print("Exists:", os.path.exists("./data/corpus_1/crypto_derivatives"))


Current working directory: /workspace
Exists: True


In [4]:
import sys
import os
from pathlib import Path

# Add project directories to path
current_dir = os.getcwd()
sys.path.append(current_dir)

# Locate CookieAuthClient.py
client_paths = []
for root, dirs, files in os.walk(current_dir):
    if 'CookieAuthClient.py' in files:
        client_paths.append(os.path.join(root, 'CookieAuthClient.py'))
        sys.path.append(root)

if client_paths:
    print(f"Found CookieAuthClient.py at: {client_paths[0]}")
else:
    print("Could not find CookieAuthClient.py. Please check your project structure.")
    exit(1)

from CookieAuthClient import CookieAuthClient
from dotenv import load_dotenv

# Load environment variables for Anna's Archive cookie
load_dotenv('/workspace/notebooks/.env')  # Adjust path if needed
account_cookie = os.getenv("AA_ACCOUNT_COOKIE")

if not account_cookie:
    print("Error: AA_ACCOUNT_COOKIE not found in .env file")
    exit(1)

client = CookieAuthClient(download_dir="./data/corpus_1/crypto_derivatives", account_cookie=account_cookie)
print("Client initialized")

# Canonical file names
canonical_files = [
    {
        "query": "Mastering Bitcoin Unlocking Digital Cryptocurrencies Antonopoulos",
        "pdf_name": "crypto_derivatives_Mastering_Bitcoin_-_Unlocking_Digital_Cryptocurrencies_by_Andreas_Antonopoulos_2017.pdf",
        "txt_name": "crypto_derivatives_Mastering_Bitcoin_-_Unlocking_Digital_Cryptocurrencies_by_Andreas_Antonopoulos_2017.txt"
    },
    {
        "query": "Mastering Bitcoin Antonopoulos",
        "pdf_name": "crypto_derivatives_Mastering_Bitcoin_antonopoulos.pdf",
        "txt_name": "crypto_derivatives_Mastering_Bitcoin_antonopoulos.txt"
    }
]

for book in canonical_files:
    print(f"\nSearching for: {book['query']}")
    results = client.search(book['query'])
    print(f"Found {len(results)} results")
    for i, result in enumerate(results[:5]):
        print(f"{i+1}. {result.get('title', 'Unknown Title')} - {result.get('file_size', 'Unknown Size')}")

    if results:
        best_result = client.select_best_result(results)
        if best_result:
            print(f"Selected for download: {best_result.get('title')} ({best_result.get('file_size')})")
            # Download to a temp file, then move/rename to canonical name
            temp_pdf = client.download_file(best_result.get('md5'))
            if temp_pdf:
                canonical_pdf_path = Path("./data/corpus_1/crypto_derivatives") / book['pdf_name']
                os.replace(temp_pdf, canonical_pdf_path)
                print(f"PDF saved to: {canonical_pdf_path}")

                # Extract text
                try:
                    from processors.text_extractor import TextExtractor
                    extractor = TextExtractor()
                    extracted = extractor.extract(str(canonical_pdf_path))
                    if extracted and 'text' in extracted:
                        extracted_dir = Path("./data/corpus_1/crypto_derivatives_extracted")
                        extracted_dir.mkdir(parents=True, exist_ok=True)
                        extracted_path = extracted_dir / book['txt_name']
                        with open(extracted_path, 'w', encoding='utf-8') as f:
                            f.write(extracted['text'])
                        print(f"Extracted text saved to: {extracted_path}")
                        print(f"Tokens: {len(extracted['text'].split())}")
                    else:
                        print("Text extraction failed")
                except Exception as e:
                    print(f"Error in extraction: {e}")
            else:
                print("Download failed")
        else:
            print("Could not select best result")
    else:
        print("No results found for this book")

print("\nReminder: Update your metadata to reference the new PDF and .txt files, and restore any priority tags if needed.")

Found CookieAuthClient.py at: /workspace/data scraper scripts/CookieAuthClient.py
Set aa_account_id2 authentication cookie
✅ Successfully authenticated with account cookie
Client initialized

Searching for: Mastering Bitcoin Unlocking Digital Cryptocurrencies Antonopoulos
Searching for: 'Mastering Bitcoin Unlocking Digital Cryptocurrencies Antonopoulos'
Searching for PDFs...
Found 1 PDF results in initial search
Found 1 results
1. English [en], .pdf, 🚀/lgli/lgrs/nexusstc/zlib, 11.1MB, 📘 Book (non-fiction), nexusstc/Mastering bitcoin: unlocking digital cryptocurrencies/0315a333ab8df0d9b71dfe56f50dd431.pdf
Mastering Bitcoin : unlocking digital crypto-currencies
O'Reilly Media, Incorporated, 1, 2015
Andreas M. Antonopoulos
base score: 11065.0, final score: 1.6761823 - 11.1MB
Selecting best result from 1 options...

Top results by quality score:
Result #1 (Score: 35):
  Title: English [en], .pdf, 🚀/lgli/lgrs/nexusstc/zlib, 11.1MB, 📘 Book (non-fiction), nexusstc/Mastering bitcoin: unlocking