In [1]:
# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()

# Import the XenoCanto service
import sys
sys.path.append('..')
from services.xeno_canto_service import XenoCantoService

# Create the Xeno-canto client (per the original note, it reads its API key
# from the environment -- that happens inside XenoCantoService, not shown here).
service = XenoCantoService()

# Smoke test: request the first page (50 entries) of quality-"A" American Robin
# recordings and display the first recording of that page.
print("Testing Xeno-canto API connection...")
result = service.search_recordings(
    species_common_name="American Robin",
    quality="A",
    per_page=50,
    page=1,
)

if not result:
    # A falsy result is treated as a failed request.
    print("\n‚ùå API request failed")
else:
    print("\n‚úÖ API connection successful!")
    print("Total recordings found: {}".format(result.get('numRecordings', 0)))
    print("Total species: {}".format(result.get('numSpecies', 0)))

    if not result.get('recordings'):
        print("\n‚ö†Ô∏è No recordings found in response")
    else:
        recording = result['recordings'][0]
        print("\nüìä Sample Recording:")
        # (label, value) pairs, printed in this order.
        sample_fields = (
            ("ID", recording.get('id')),
            ("Species", f"{recording.get('gen')} {recording.get('sp')}"),
            ("Common Name", recording.get('en')),
            ("Quality", recording.get('q')),
            ("Location", recording.get('loc', 'N/A')),
            ("Country", recording.get('cnt', 'N/A')),
            ("Duration", recording.get('length', 'N/A')),
            ("Audio URL", recording.get('file', 'N/A')),
        )
        for label, value in sample_fields:
            print(f"  {label}: {value}")


Testing Xeno-canto API connection...

‚úÖ API connection successful!
Total recordings found: 314
Total species: 1

üìä Sample Recording:
  ID: 1068477
  Species: Turdus migratorius
  Common Name: American Robin
  Quality: A
  Location: Swan Lake Park, Regional Municipality of York, Ontario
  Country: Canada
  Duration: 0:56
  Audio URL: https://xeno-canto.org/1068477/download


In [2]:
# Import additional services and libraries
import json
import pandas as pd
import requests
from pathlib import Path
import time
from typing import Dict, List, Optional
import tempfile
import os

from services.s3_manager import S3Manager

# Load species configuration.
# The encoding is given explicitly: JSON text is UTF-8, and without it open()
# falls back to the platform's locale encoding, which differs between machines.
with open('../config/species_config.json', 'r', encoding='utf-8') as f:
    species_config = json.load(f)

# 'georgia_birds' is iterated below as a sequence of mappings that each carry
# 'common_name' and 'scientific_name'.
georgia_birds = species_config['georgia_birds']
print(f"Loaded {len(georgia_birds)} Georgia bird species:")
for bird in georgia_birds:
    print(f"  - {bird['common_name']} ({bird['scientific_name']})")

# Initialize S3 Manager
# NOTE(review): the flag name suggests the bucket is created when missing --
# confirm against S3Manager before running with production credentials.
s3_manager = S3Manager(create_bucket_if_not_exists=True)
print(f"\n‚úÖ S3 Manager initialized for bucket: {s3_manager.bucket_name}")



Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


Loaded 8 Georgia bird species:
  - Northern Cardinal (Cardinalis cardinalis)
  - Carolina Wren (Thryothorus ludovicianus)
  - Blue Jay (Cyanocitta cristata)
  - American Robin (Turdus migratorius)
  - Mourning Dove (Zenaida macroura)
  - Tufted Titmouse (Baeolophus bicolor)
  - Carolina Chickadee (Poecile carolinensis)
  - Eastern Bluebird (Sialia sialis)

‚úÖ S3 Manager initialized for bucket: bird-classification-data


In [3]:
# Helper Functions

def _normalize_audio_url(url: str) -> str:
    """Return ``url`` with exactly one ``http://`` or ``https://`` scheme prefix."""
    url = url.strip()

    # Doubled scheme ("https:https://host/...") -> drop the first copy.
    if url.startswith('https:https://'):
        url = url[len('https:'):]

    # Scheme present but followed by the wrong number of slashes
    # ("https:host", "https:/host", "https:////host"). Well-formed URLs pass
    # through this branch unchanged.
    for scheme in ('https:', 'http:'):
        if url.startswith(scheme):
            return f"{scheme}//{url[len(scheme):].lstrip('/')}"

    # Protocol-relative ("//host/...") or bare ("host/...") -> assume HTTPS.
    return 'https://' + url.lstrip('/')


def download_audio_file(url: str, local_path: str, max_retries: int = 3, timeout: int = 30) -> bool:
    """
    Download an audio file with retry logic.

    The URL is first repaired (missing, doubled, or malformed scheme). The
    body is streamed into ``<local_path>.part`` and only renamed to
    ``local_path`` once the transfer completed, so a failed or interrupted
    download never leaves a truncated file at ``local_path``.

    Args:
        url: URL of the audio file to download (may lack or garble its scheme)
        local_path: Local file path to save the download
        max_retries: Maximum number of attempts; waits 1s, 2s, 4s, ... between them
        timeout: Request timeout in seconds

    Returns:
        True if download successful, False otherwise (including when
        ``max_retries`` is 0). Errors are reported via print, not raised.
    """
    url = _normalize_audio_url(url)

    # Create directory if it doesn't exist
    target = Path(local_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    part_path = target.with_name(target.name + '.part')

    for attempt in range(max_retries):
        try:
            # The context manager releases the streamed connection even when
            # the status check or the copy loop raises.
            with requests.get(url, timeout=timeout, stream=True) as response:
                response.raise_for_status()

                with open(part_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # skip empty keep-alive chunks
                            f.write(chunk)

            part_path.replace(target)
            return True
        except Exception as e:
            # Best-effort download: any failure is retried, never propagated.
            part_path.unlink(missing_ok=True)
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"  Retry {attempt + 1}/{max_retries} after {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"  Failed to download after {max_retries} attempts: {e}")
                return False

    return False

def normalize_metadata(recording_data: Dict, species_common_name: str, species_scientific_name: str, s3_uri: Optional[str] = None) -> Dict:
    """
    Normalize recording metadata to standardized format.

    Args:
        recording_data: Raw recording data from Xeno-canto API. The keys read
            here are 'id', 'length', 'q', 'loc', 'cnt', 'date', 'gen', 'sp'
            and 'file'; a missing key becomes an empty string
            ('length' defaults to "0:00").
        species_common_name: Common name of the species
        species_scientific_name: Scientific name of the species
        s3_uri: S3 URI of the uploaded audio file (if available)

    Returns:
        Normalized metadata dictionary. 'duration_seconds' is 0 whenever the
        'length' value cannot be parsed; 'duration_formatted' always carries
        the raw 'length' value unchanged.
    """
    # Parse duration: "M:SS" / "MM:SS", and also "H:MM:SS" in case long
    # recordings are reported with an hour field.
    duration_str = recording_data.get('length', '0:00')
    duration_seconds = 0
    try:
        parts = [int(part) for part in duration_str.split(':')]
    except (AttributeError, ValueError):
        # Non-string value (e.g. None) or non-numeric component: keep 0.
        parts = []
    if len(parts) in (2, 3):
        for part in parts:
            duration_seconds = duration_seconds * 60 + part

    # Build Xeno-canto URL
    recording_id = recording_data.get('id', '')
    xeno_canto_url = f"https://xeno-canto.org/{recording_id}" if recording_id else ""

    return {
        'recording_id': recording_id,
        'species_common_name': species_common_name,
        'species_scientific_name': species_scientific_name,
        'audio_s3_uri': s3_uri or '',
        'recording_quality': recording_data.get('q', ''),
        'duration_seconds': duration_seconds,
        'duration_formatted': duration_str,
        'location': recording_data.get('loc', ''),
        'country': recording_data.get('cnt', ''),
        'date': recording_data.get('date', ''),
        'xeno_canto_url': xeno_canto_url,
        'genus': recording_data.get('gen', ''),
        'species': recording_data.get('sp', ''),
        'file_url': recording_data.get('file', '')
    }

# Cell-completion marker: this line only prints once the helper definitions in this cell have executed.
print("‚úÖ Helper functions defined")



‚úÖ Helper functions defined


In [4]:
# Main Acquisition Pipeline

def process_species(
    species_common_name: str,
    species_scientific_name: str,
    max_recordings: int = 100,
    quality_filter: str = "A",
    temp_dir: str = "/tmp/bird_recordings"
) -> List[Dict]:
    """
    Process a single species: query API, download recordings, upload to S3.

    Each recording is downloaded into a per-species folder under ``temp_dir``,
    uploaded through the notebook-level ``s3_manager``, and its local copy is
    removed again whether or not the upload worked. Only recordings whose
    upload returned a truthy S3 URI contribute a metadata entry.

    Args:
        species_common_name: Common name of the species
        species_scientific_name: Scientific name of the species
        max_recordings: Maximum number of recordings to process
        quality_filter: Quality filter (e.g., "A" for high quality)
        temp_dir: Temporary directory for downloads

    Returns:
        List of normalized metadata dictionaries (empty when the API returned
        no recordings)
    """
    print(f"\n{'='*60}")
    print(f"Processing: {species_common_name} ({species_scientific_name})")
    print(f"{'='*60}")

    # Create temp directory for this species
    species_temp_dir = Path(temp_dir) / species_common_name.replace(' ', '_')
    species_temp_dir.mkdir(parents=True, exist_ok=True)

    # Query API for recordings
    print("Querying Xeno-canto API...")
    recordings = service.get_all_recordings_for_species(
        species_common_name=species_common_name,
        quality=quality_filter,
        max_recordings=max_recordings,
        per_page=100
    )

    if not recordings:
        print(f"‚ö†Ô∏è No recordings found for {species_common_name}")
        return []

    total = len(recordings)
    print(f"Found {total} recordings")

    # Process each recording
    metadata_list = []
    successful_downloads = 0
    failed_downloads = 0

    for idx, recording in enumerate(recordings, 1):
        recording_id = recording.get('id', 'unknown')
        file_url = recording.get('file', '')

        if not file_url:
            print(f"  [{idx}/{total}] Skipping {recording_id}: No file URL")
            failed_downloads += 1
            continue

        # Download audio file.
        # The raw URL is handed over as-is: download_audio_file repairs
        # missing or malformed schemes itself. Prefixing "https://" here first
        # would turn values such as "https:host/..." into
        # "https://https:host/..." and hide them from that repair.
        local_file = species_temp_dir / f"{recording_id}.mp3"
        print(f"  [{idx}/{total}] Downloading {recording_id}...", end=' ')

        if not download_audio_file(file_url, str(local_file)):
            failed_downloads += 1
            print("‚ùå Download failed")
            continue

        try:
            # Upload to S3
            s3_uri = s3_manager.upload_audio_file(
                str(local_file),
                species_common_name,
                recording_id
            )
        finally:
            # Clean up local file even if the upload raised; tolerate the
            # file already being gone.
            local_file.unlink(missing_ok=True)

        if s3_uri:
            # Normalize metadata
            metadata_list.append(
                normalize_metadata(
                    recording,
                    species_common_name,
                    species_scientific_name,
                    s3_uri
                )
            )
            successful_downloads += 1
            print("‚úÖ Uploaded to S3")
        else:
            print("‚ùå S3 upload failed")
            failed_downloads += 1

    print(f"\nSummary for {species_common_name}:")
    print(f"  ‚úÖ Successful: {successful_downloads}")
    print(f"  ‚ùå Failed: {failed_downloads}")
    print(f"  üìä Total metadata entries: {len(metadata_list)}")

    return metadata_list

# Cell-completion marker: this line only prints once the process_species definition in this cell has executed.
print("‚úÖ Process function defined")



‚úÖ Process function defined


In [6]:
# Drive the acquisition pipeline over every configured species.

# Tunable settings
MAX_RECORDINGS_PER_SPECIES = 100
QUALITY_FILTER = "A"  # value passed to process_species as quality_filter

# Accumulators filled by the loop below
all_metadata = []   # metadata dicts from all species, in processing order
errors = []         # one message per species whose processing raised

print("Starting acquisition pipeline...")
print("Target: {} recordings per species".format(MAX_RECORDINGS_PER_SPECIES))
print("Quality filter: {}".format(QUALITY_FILTER))
print("Total species: {}\n".format(len(georgia_birds)))

for bird in georgia_birds:
    # A failure in one species is recorded and the loop moves on to the next.
    try:
        all_metadata.extend(
            process_species(
                species_common_name=bird['common_name'],
                species_scientific_name=bird['scientific_name'],
                max_recordings=MAX_RECORDINGS_PER_SPECIES,
                quality_filter=QUALITY_FILTER,
            )
        )
    except Exception as exc:
        failure = f"Error processing {bird['common_name']}: {exc}"
        print(f"‚ùå {failure}")
        errors.append(failure)

banner = '=' * 60
print(f"\n{banner}")
print("Pipeline Complete!")
print(banner)
print("Total recordings processed: {}".format(len(all_metadata)))
print("Total errors: {}".format(len(errors)))

if errors:
    print("\nErrors encountered:")
    print("\n".join(f"  - {failure}" for failure in errors))



Starting acquisition pipeline...
Target: 100 recordings per species
Quality filter: A
Total species: 8


Processing: Northern Cardinal (Cardinalis cardinalis)
Querying Xeno-canto API...
Found 100 recordings
  [1/100] Downloading 1070756... ‚úÖ Uploaded to S3
  [2/100] Downloading 1070366... ‚úÖ Uploaded to S3
  [3/100] Downloading 1027842... ‚úÖ Uploaded to S3
  [4/100] Downloading 1012918... ‚úÖ Uploaded to S3
  [5/100] Downloading 1012917... ‚úÖ Uploaded to S3
  [6/100] Downloading 1012916... ‚úÖ Uploaded to S3
  [7/100] Downloading 1012495... ‚úÖ Uploaded to S3
  [8/100] Downloading 1012494... ‚úÖ Uploaded to S3
  [9/100] Downloading 1012493... ‚úÖ Uploaded to S3
  [10/100] Downloading 1012492... ‚úÖ Uploaded to S3
  [11/100] Downloading 1012491... ‚úÖ Uploaded to S3
  [12/100] Downloading 1012142... ‚úÖ Uploaded to S3
  [13/100] Downloading 1010649... ‚úÖ Uploaded to S3
  [14/100] Downloading 1010609... ‚úÖ Uploaded to S3
  [15/100] Downloading 1006772... ‚úÖ Uploaded to S3
  [16/1

In [7]:
# Generate and upload metadata files.
# Writes the collected metadata as CSV and JSON to a local scratch directory,
# uploads both through s3_manager, then prints summary statistics.

if all_metadata:
    # Create DataFrame (one row per successfully uploaded recording)
    df = pd.DataFrame(all_metadata)

    # Save to temporary files
    temp_dir = Path("/tmp/bird_metadata")
    temp_dir.mkdir(parents=True, exist_ok=True)

    csv_path = temp_dir / "recordings_metadata.csv"
    json_path = temp_dir / "recordings_metadata.json"

    # Save CSV
    df.to_csv(csv_path, index=False)
    print(f"‚úÖ Created CSV metadata: {csv_path}")

    # Save JSON
    df.to_json(json_path, orient='records', indent=2)
    print(f"‚úÖ Created JSON metadata: {json_path}")

    # Upload to S3
    csv_s3_uri = s3_manager.upload_metadata_file(str(csv_path), "recordings_metadata")
    json_s3_uri = s3_manager.upload_metadata_file(str(json_path), "recordings_metadata")

    # A falsy return value is reported as a failure instead of being skipped
    # silently, mirroring how process_species treats a falsy upload result.
    if csv_s3_uri:
        print(f"‚úÖ Uploaded CSV to S3: {csv_s3_uri}")
    else:
        print("‚ùå CSV metadata upload to S3 failed")
    if json_s3_uri:
        print(f"‚úÖ Uploaded JSON to S3: {json_s3_uri}")
    else:
        print("‚ùå JSON metadata upload to S3 failed")

    # Display summary statistics
    print(f"\n{'='*60}")
    print("Dataset Summary")
    print(f"{'='*60}")
    print(f"Total recordings: {len(df)}")
    print("\nRecordings per species:")
    species_counts = df['species_common_name'].value_counts()
    for species, count in species_counts.items():
        print(f"  {species}: {count}")

    print("\nQuality distribution:")
    quality_counts = df['recording_quality'].value_counts()
    for quality, count in quality_counts.items():
        print(f"  Quality {quality}: {count}")

    print(f"\nTotal duration: {df['duration_seconds'].sum() / 60:.1f} minutes")
    print(f"Average duration: {df['duration_seconds'].mean():.1f} seconds")

    # Display first few rows
    print("\nSample metadata:")
    print(df.head().to_string())
else:
    print("‚ö†Ô∏è No metadata to process")



‚úÖ Created CSV metadata: /tmp/bird_metadata/recordings_metadata.csv
‚úÖ Created JSON metadata: /tmp/bird_metadata/recordings_metadata.json
‚úÖ Uploaded CSV to S3: s3://bird-classification-data/metadata/recordings_metadata.csv
‚úÖ Uploaded JSON to S3: s3://bird-classification-data/metadata/recordings_metadata.json

Dataset Summary
Total recordings: 692

Recordings per species:
  Northern Cardinal: 100
  Blue Jay: 100
  American Robin: 100
  Tufted Titmouse: 100
  Carolina Wren: 99
  Carolina Chickadee: 78
  Eastern Bluebird: 61
  Mourning Dove: 54

Quality distribution:
  Quality A: 692

Total duration: 711.0 minutes
Average duration: 61.6 seconds

Sample metadata:
  recording_id species_common_name species_scientific_name                                                           audio_s3_uri recording_quality  duration_seconds duration_formatted                                                          location        country        date                  xeno_canto_url       genus     