In [None]:
import re
import os
import json
import requests
from datetime import datetime
from typing import List, Dict, Tuple, Optional

In [None]:
class GitHubDataFetcher:
    """Fetch and filter date-named JSON data files from a GitHub repository.

    The directory listing is read from the GitHub REST "contents" endpoint.
    Only regular files named ``YYYYMMDD.json`` whose stem is a real calendar
    date are kept. Every step reports its progress with ``print``.
    """

    # Seconds to wait for each GitHub API request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, repo_owner: str = "owlmaps", repo_name: str = "map-data", data_path: str = "data"):
        """
        Initialize the fetcher.

        Args:
            repo_owner: GitHub repository owner
            repo_name: GitHub repository name
            data_path: Path to data directory in the repository
        """
        self.repo_owner = repo_owner
        self.repo_name = repo_name
        self.data_path = data_path
        self.api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{data_path}"
        # NOTE(review): this URL pins the "master" branch, while api_url is
        # requested without a ref — confirm both point at the same branch.
        self.raw_base_url = f"https://raw.githubusercontent.com/{repo_owner}/{repo_name}/master/{data_path}/"
        # Matches file names made of exactly eight digits plus ".json".
        self.date_pattern = re.compile(r'^(\d{8})\.json$')

    @staticmethod
    def _banner(title: str) -> None:
        """Print *title* framed by 80-character rules, preceded by a blank line."""
        rule = "=" * 80
        print("\n" + rule)
        print(title)
        print(rule)

    def check_accessibility(self) -> bool:
        """
        Check if the GitHub API is accessible.

        Returns:
            bool: True if the endpoint answered with HTTP 200, False for any
            other status code or for any request error.
        """
        self._banner("STEP 1: Checking GitHub API accessibility...")

        try:
            response = requests.get(self.api_url, timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.ConnectionError as e:
            print("Connection error: Unable to reach GitHub API")
            print(f"  Error: {str(e)[:200]}")
            return False
        except requests.exceptions.Timeout:
            print(f"Request timed out after {self.REQUEST_TIMEOUT} seconds")
            return False
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            return False

        if response.status_code == 200:
            print(f"GitHub API is accessible (Status: {response.status_code})")
            print(f"  URL: {self.api_url}")
            return True

        print(f"GitHub API returned status code: {response.status_code}")
        print(f"  Response: {response.text[:200]}")
        return False

    def fetch_directory_contents(self) -> Optional[List[Dict]]:
        """
        Fetch directory contents from GitHub API.

        Returns:
            List of file information dictionaries, or None if the request
            failed, the body was not valid JSON, or the payload was not a list.
        """
        self._banner("STEP 2: Fetching directory contents...")

        # NOTE(review): GitHub documents an upper limit on the number of
        # entries this endpoint returns for one directory — confirm the data
        # directory stays below it, otherwise recent files may be missing.
        try:
            response = requests.get(self.api_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Failed to fetch directory: {e}")
            return None

        # Parsed separately so a malformed body is reported as a parse error.
        # Every JSON decode error raised by response.json() is a ValueError.
        try:
            files_data = response.json()
        except ValueError as e:
            print(f"Failed to parse JSON response: {e}")
            return None

        # A listing must be a JSON array; anything else (for example a single
        # object) cannot be filtered by the later steps.
        if not isinstance(files_data, list):
            print(f"Unexpected response: expected a directory listing, got {type(files_data).__name__}")
            return None

        print("Successfully fetched directory contents")
        print(f"  Total items found: {len(files_data)}")
        return files_data

    def filter_and_parse_files(self, files_data: List[Dict]) -> List[Dict]:
        """
        Filter JSON files with date pattern and parse their dates.

        Entries that are not dictionaries, are not of type ``'file'``, or whose
        name does not match ``YYYYMMDD.json`` are ignored. Names that match the
        pattern but are not valid calendar dates are reported as skipped.

        Args:
            files_data: List of file information from GitHub API

        Returns:
            List of dictionaries with keys ``filename``, ``date`` (datetime),
            ``date_str``, ``download_url``, ``size`` and ``sha``.
        """
        self._banner("STEP 3: Filtering and parsing JSON files...")

        json_files = []
        skipped_files = []

        for item in files_data:
            if not isinstance(item, dict) or item.get('type') != 'file':
                continue

            filename = item.get('name', '')
            if not isinstance(filename, str):
                continue

            match = self.date_pattern.match(filename)
            if match is None:
                continue

            date_str = match.group(1)
            try:
                # Parse date to ensure it's valid
                file_date = datetime.strptime(date_str, '%Y%m%d')
            except ValueError:
                skipped_files.append(filename)
                continue

            json_files.append({
                'filename': filename,
                'date': file_date,
                'date_str': date_str,
                'download_url': item.get('download_url'),
                'size': item.get('size', 0),
                'sha': item.get('sha', ''),
            })

        print(f"Found {len(json_files)} JSON files with valid date names")
        if skipped_files:
            print(f"Skipped {len(skipped_files)} files with invalid dates:")
            # Only the first few names are listed to keep the output short.
            for fname in skipped_files[:5]:
                print(f"    - {fname}")

        return json_files

    def sort_by_date(self, json_files: List[Dict], descending: bool = True) -> List[Dict]:
        """
        Sort files by date.

        Args:
            json_files: List of file information with dates
            descending: If True, sort newest first; if False, oldest first

        Returns:
            New sorted list of files (the input list is not modified)
        """
        self._banner("STEP 4: Sorting files by date...")

        sorted_files = sorted(json_files, key=lambda entry: entry['date'], reverse=descending)

        order = "newest to oldest" if descending else "oldest to newest"
        print(f"Sorted {len(sorted_files)} files ({order})")

        return sorted_files

    def get_recent_files(self, sorted_files: List[Dict], n: int = 5) -> List[Dict]:
        """
        Get the N most recent files.

        Args:
            sorted_files: List of file information, sorted newest first
            n: Number of recent files to retrieve; values below zero are
               treated as zero

        Returns:
            List with at most N leading entries of ``sorted_files``
        """
        self._banner(f"STEP 5: Selecting {n} most recent files...")

        # A negative slice bound would drop entries from the end instead of
        # selecting none, so clamp it.
        recent_files = sorted_files[:max(n, 0)]

        print(f"✓ Selected {len(recent_files)} files:")
        print(f"\n{'#':<4} {'Date':<12} {'Filename':<20} {'Size (KB)':<12}")
        print("-" * 80)

        for idx, file_info in enumerate(recent_files, 1):
            size_kb = file_info['size'] / 1024
            print(f"{idx:<4} {file_info['date'].strftime('%Y-%m-%d'):<12} {file_info['filename']:<20} {size_kb:>10.1f}")

        return recent_files

    def display_summary(self, all_files: List[Dict]):
        """
        Display summary statistics (count, date range, total and mean size).

        Args:
            all_files: List of file information with ``date`` and ``size``;
                       may be in any order
        """
        self._banner("STEP 6: Summary Statistics")

        if not all_files:
            print("No files to summarize")
            return

        # Derive the range from the dates themselves rather than from list
        # positions, so the summary is right for any input order.
        oldest = min(all_files, key=lambda entry: entry['date'])
        newest = max(all_files, key=lambda entry: entry['date'])
        total_size_mb = sum(f['size'] for f in all_files) / (1024 * 1024)
        avg_size_mb = total_size_mb / len(all_files)

        print(f"Total JSON files: {len(all_files)}")
        print(f"Date range: {oldest['date'].strftime('%Y-%m-%d')} to {newest['date'].strftime('%Y-%m-%d')}")
        print(f"Days covered: {(newest['date'] - oldest['date']).days}")
        print(f"Total size: {total_size_mb:.2f} MB")
        print(f"Average file size: {avg_size_mb:.2f} MB")

    def fetch_and_filter(self, num_recent: int = 5) -> Optional[List[Dict]]:
        """
        Main method to fetch and filter recent files.

        Runs the accessibility check, the directory fetch, the filter, the
        sort, the selection and the summary, in that order.

        Args:
            num_recent: Number of most recent files to return

        Returns:
            List of most recent files, or None if the API was unreachable, the
            listing could not be fetched, or no valid date-named file exists
        """
        # The banner names the configured repository, not a fixed one.
        self._banner(f"GITHUB DATA FETCHER - {self.repo_owner}/{self.repo_name}")
        print(f"Target: {self.repo_owner}/{self.repo_name}/{self.data_path}")
        print(f"Fetching {num_recent} most recent files...")

        # Step 1: Check accessibility
        if not self.check_accessibility():
            return None

        # Step 2: Fetch directory contents
        files_data = self.fetch_directory_contents()
        if files_data is None:
            return None

        # Step 3: Filter and parse files
        json_files = self.filter_and_parse_files(files_data)
        if not json_files:
            print("\nNo valid JSON files found!")
            return None

        # Step 4: Sort by date
        sorted_files = self.sort_by_date(json_files, descending=True)

        # Step 5: Get recent files
        recent_files = self.get_recent_files(sorted_files, num_recent)

        # Step 6: Display summary (over every valid file, not only the recent ones)
        self.display_summary(sorted_files)

        self._banner("FETCH COMPLETE - Files ready for processing")

        return recent_files

In [None]:
class DataValidator:
    """Validates JSON data structure and business rules.

    Structural problems are returned as error strings and make a file invalid.
    Coordinate-range problems are returned as warning dictionaries and do not.
    Neither check raises on malformed input, so every file gets a verdict.
    """

    # Top-level keys every data file must contain.
    REQUIRED_FIELDS = ('areas', 'areas_ua', 'frontline', 'geos', 'unit_count', 'units')
    # Keys expected under 'geos', 'unit_count' and 'units'.
    SIDES = ('ru', 'ua')

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for ints and floats, excluding booleans."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def validate_structure(self, data: dict) -> List[str]:
        """
        Validate the basic structure of the data.

        Args:
            data: Parsed JSON document

        Returns:
            List of human-readable error messages; empty when the structure
            is acceptable.
        """
        # Anything but an object cannot hold the required fields at all.
        if not isinstance(data, dict):
            return ["Top-level JSON value must be an object"]

        # Check required top-level fields
        errors = [
            f"Missing required field: {field}"
            for field in self.REQUIRED_FIELDS
            if field not in data
        ]

        # Validate areas
        if 'areas' in data:
            areas = data['areas']
            if not isinstance(areas, list):
                errors.append("'areas' must be an array")
            else:
                for idx, area in enumerate(areas):
                    if not isinstance(area, list):
                        errors.append(f"areas[{idx}] must be an array of coordinates")
                    elif len(area) < 3:
                        errors.append(f"areas[{idx}] must have at least 3 coordinate pairs")

        # Validate geos
        if 'geos' in data:
            geos = data['geos']
            if not isinstance(geos, dict):
                errors.append("'geos' must be an object")
            else:
                for side in self.SIDES:
                    if side not in geos:
                        errors.append(f"'geos' must have '{side}' field")

                for side in self.SIDES:
                    if side not in geos:
                        continue
                    events = geos[side]
                    # Same array requirement as units.<side>; without it a
                    # non-list value would pass as valid.
                    if not isinstance(events, list):
                        errors.append(f"geos.{side} must be an array")
                        continue
                    for idx, event in enumerate(events):
                        if not isinstance(event, dict):
                            errors.append(f"geos.{side}[{idx}] must be an object")
                            continue
                        if 'c' not in event:
                            errors.append(f"geos.{side}[{idx}] missing 'c' (coordinates)")
                        if 'd' not in event:
                            errors.append(f"geos.{side}[{idx}] missing 'd' (description)")

        # Validate unit_count
        if 'unit_count' in data:
            unit_count = data['unit_count']
            if not isinstance(unit_count, dict):
                errors.append("'unit_count' must be an object")
            else:
                for side in self.SIDES:
                    if side not in unit_count:
                        errors.append(f"unit_count.{side} is required")
                        continue
                    value = unit_count[side]
                    # bool is a subclass of int in Python; JSON true/false is
                    # not a count.
                    if isinstance(value, bool) or not isinstance(value, int):
                        errors.append(f"unit_count.{side} must be an integer")

        # Validate units
        if 'units' in data:
            units = data['units']
            if not isinstance(units, dict):
                errors.append("'units' must be an object")
            else:
                for side in self.SIDES:
                    if side not in units:
                        errors.append(f"units.{side} is required")
                    elif not isinstance(units[side], list):
                        errors.append(f"units.{side} must be an array")

        return errors

    def validate_business_rules(self, data: dict) -> List[Dict]:
        """
        Validate additional business rules (coordinate ranges).

        Coordinates are read as ``[longitude, latitude, ...]`` from the ``'c'``
        list of each geos event and from the second element of each unit
        entry. Values with an unexpected shape are skipped here; reporting
        them is the job of ``validate_structure``.

        Args:
            data: Parsed JSON document

        Returns:
            List of warning dictionaries with keys ``type`` and ``message``.
        """
        warnings: List[Dict] = []

        if not isinstance(data, dict):
            return warnings

        # Check coordinate ranges
        def check_coordinates(coords, context):
            if len(coords) < 2:
                return
            lon, lat = coords[0], coords[1]
            # Comparing a non-number with the bounds would raise TypeError.
            if not (self._is_number(lon) and self._is_number(lat)):
                warnings.append({
                    'type': 'INVALID_COORDINATE',
                    'message': f"Non-numeric coordinate pair [{lon!r}, {lat!r}] in {context}"
                })
                return
            if not (-180 <= lon <= 180):
                warnings.append({
                    'type': 'INVALID_COORDINATE',
                    'message': f"Invalid longitude {lon} in {context}"
                })
            if not (-90 <= lat <= 90):
                warnings.append({
                    'type': 'INVALID_COORDINATE',
                    'message': f"Invalid latitude {lat} in {context}"
                })

        # Validate geos coordinates
        geos = data.get('geos')
        if isinstance(geos, dict):
            for side in self.SIDES:
                events = geos.get(side)
                if not isinstance(events, list):
                    continue
                for idx, event in enumerate(events):
                    if isinstance(event, dict) and isinstance(event.get('c'), list):
                        check_coordinates(event['c'], f"geos.{side}[{idx}]")

        # Validate unit coordinates
        units = data.get('units')
        if isinstance(units, dict):
            for side in self.SIDES:
                entries = units.get(side)
                if not isinstance(entries, list):
                    continue
                for idx, unit in enumerate(entries):
                    if isinstance(unit, list) and len(unit) >= 2 and isinstance(unit[1], list):
                        check_coordinates(unit[1], f"units.{side}[{idx}]")

        return warnings

    def validate_file(self, data: dict, filename: str) -> Tuple[bool, List[str], List[Dict]]:
        """
        Validate a single file.

        Args:
            data: Parsed JSON document
            filename: Name of the file the data came from (not used by the
                      checks themselves)

        Returns:
            Tuple of (is_valid, errors, warnings)
        """
        errors = self.validate_structure(data)
        warnings = self.validate_business_rules(data)

        # Consider file invalid only if there are structural errors
        is_valid = not errors

        return is_valid, errors, warnings

In [None]:
class DataPipeline:
    """Main pipeline for fetching, validating, and processing data.

    Each file is downloaded, checked with ``DataValidator`` and either kept in
    memory (valid) or written to ``failed_dir`` with an error report (invalid).
    """

    def __init__(self, failed_dir: str = "../data/failed/"):
        """
        Initialize the pipeline.

        Args:
            failed_dir: Directory to save failed validation files
        """
        self.failed_dir = failed_dir
        self.validator = DataValidator()

        # Create failed directory if it doesn't exist
        os.makedirs(self.failed_dir, exist_ok=True)

    def download_file_content(self, file_info: Dict) -> Optional[dict]:
        """
        Download and parse JSON file content.

        Args:
            file_info: File information with ``download_url`` and ``filename``

        Returns:
            Parsed JSON data, or None if the download failed or the body was
            not valid JSON
        """
        try:
            response = requests.get(file_info['download_url'], timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Download error for {file_info['filename']}: {e}")
            return None

        # Parsed in its own step so a malformed body is reported as a parse
        # error. Every JSON decode error raised by response.json() is a
        # ValueError.
        try:
            return response.json()
        except ValueError as e:
            print(f"JSON parse error for {file_info['filename']}: {e}")
            return None

    def save_failed_file(self, file_info: Dict, data: dict, errors: List[str], warnings: List[Dict]):
        """
        Save failed validation file with error report.

        Writes ``<filename>`` (the data, pretty-printed) and
        ``<date_str>_errors.txt`` (the report) into ``failed_dir``.

        Args:
            file_info: File information (``filename``, ``date``, ``date_str``,
                       ``size``, ``download_url``)
            data: JSON data that failed validation
            errors: List of validation errors
            warnings: List of validation warnings (``type`` / ``message``)
        """
        # The directory is created again here so saving does not depend on it
        # still existing since construction.
        os.makedirs(self.failed_dir, exist_ok=True)

        # Save the data file
        data_path = os.path.join(self.failed_dir, file_info['filename'])
        with open(data_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

        # Build the error report
        rule = "=" * 80 + "\n"
        thin_rule = "-" * 80 + "\n"
        report = [
            "VALIDATION FAILURE REPORT\n",
            rule,
            f"Filename: {file_info['filename']}\n",
            f"Date: {file_info['date'].strftime('%Y-%m-%d')}\n",
            f"Size: {file_info['size']} bytes\n",
            f"Download URL: {file_info['download_url']}\n",
            f"Validation Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            "\n",
            f"ERRORS ({len(errors)}):\n",
            thin_rule,
        ]
        report.extend(f"{idx}. {error}\n" for idx, error in enumerate(errors, 1))

        if warnings:
            report.append(f"\nWARNINGS ({len(warnings)}):\n")
            report.append(thin_rule)
            report.extend(
                f"{idx}. [{warning['type']}] {warning['message']}\n"
                for idx, warning in enumerate(warnings, 1)
            )

        # Save error report
        report_name = f"{file_info['date_str']}_errors.txt"
        report_path = os.path.join(self.failed_dir, report_name)
        with open(report_path, 'w', encoding='utf-8') as f:
            f.writelines(report)

        print(f"  → Saved to failed directory: {file_info['filename']}")
        print(f"  → Error report: {report_name}")

    def process_files(self, recent_files: List[Dict]) -> Tuple[List[dict], int, int]:
        """
        Process recent files: download, validate, and categorize.

        Args:
            recent_files: List of file information from GitHubDataFetcher

        Returns:
            Tuple of (raw_data list, passed_count, failed_count).
            ``failed_count`` includes files that could not be downloaded or
            parsed as well as files that failed validation.
        """
        print("\n" + "=" * 80)
        print("DATA VALIDATION AND PROCESSING PIPELINE")
        print("=" * 80)

        raw_data = []
        passed_count = 0
        failed_count = 0
        total_files = len(recent_files)

        print(f"\nProcessing {total_files} files...")
        print("-" * 80)

        for idx, file_info in enumerate(recent_files, 1):
            filename = file_info['filename']
            print(f"\n[{idx}/{total_files}] Processing: {filename}")

            # Step 1: Download file content
            print("  → Downloading...")
            data = self.download_file_content(file_info)

            if data is None:
                failed_count += 1
                print("  FAILED: Could not download or parse file")
                continue

            # Step 2: Validate file
            print("  → Validating...")
            is_valid, errors, warnings = self.validator.validate_file(data, filename)

            # Step 3: Categorize based on validation
            if not is_valid:
                self.save_failed_file(file_info, data, errors, warnings)
                failed_count += 1
                print(f"  FAILED: {len(errors)} errors found")
                continue

            raw_data.append(data)
            passed_count += 1
            status = "PASSED"
            if warnings:
                status += f" (with {len(warnings)} warnings)"
            print(f"  {status}")

        return raw_data, passed_count, failed_count

    def display_summary(self, raw_data: List[dict], passed_count: int, failed_count: int):
        """
        Display processing summary.

        Args:
            raw_data: Validated datasets returned by ``process_files``
            passed_count: Number of files that passed validation
            failed_count: Number of files that failed
        """
        print("\n" + "=" * 80)
        print("PIPELINE SUMMARY")
        print("=" * 80)

        total = passed_count + failed_count

        def share(count: int) -> float:
            # An empty run has no meaningful ratio; report 0% rather than
            # dividing by zero.
            return count / total * 100 if total else 0.0

        print(f"\nTotal files processed: {total}")
        print(f"  Passed validation: {passed_count} ({share(passed_count):.1f}%)")
        print(f"  Failed validation: {failed_count} ({share(failed_count):.1f}%)")

        if failed_count > 0:
            print(f"\nFailed files saved to: {self.failed_dir}/")

        if raw_data:
            print(f"\nraw_data variable contains {len(raw_data)} validated datasets")
            print(f"  Type: list of {len(raw_data)} dictionaries")
            print("  Each contains the complete JSON data structure")

            # Quick stats from first dataset
            sample = raw_data[0]
            geos = sample.get('geos', {})
            units = sample.get('units', {})
            print("\nSample data structure (first file):")
            print(f"  - areas: {len(sample.get('areas', []))} polygons")
            print(f"  - frontline: {len(sample.get('frontline', []))} segments")
            print(f"  - geos.ru: {len(geos.get('ru', []))} incidents")
            print(f"  - geos.ua: {len(geos.get('ua', []))} incidents")
            print(f"  - units.ru: {len(units.get('ru', []))} units")
            print(f"  - units.ua: {len(units.get('ua', []))} units")

        print("\n" + "=" * 80)
        if failed_count == 0:
            print("ALL FILES VALIDATED SUCCESSFULLY")
        else:
            print("VALIDATION COMPLETE WITH FAILURES")
        print("=" * 80)

In [None]:
def main():
    """Run the complete workflow: list recent files on GitHub, then validate them.

    Returns:
        ``(raw_data, recent_files)`` where ``raw_data`` holds the datasets that
        passed validation, or ``(None, None)`` when the fetch step produced no
        files.
    """
    rule = "=" * 80

    print(rule)
    print("COMPLETE DATA PIPELINE: FETCH → VALIDATE → PROCESS")
    print(rule)

    # Phase 1: ask GitHub for the five newest date-named files.
    print(f"\n{rule}")
    print("STEP 1: FETCHING RECENT FILES FROM GITHUB")
    print(rule)

    recent_files = GitHubDataFetcher().fetch_and_filter(num_recent=5)
    if not recent_files:
        print("\nFailed to fetch files from GitHub")
        return None, None

    # Phase 2: download each file and run it through the validator.
    print(f"\n{rule}")
    print("STEP 2: VALIDATING AND PROCESSING FILES")
    print(rule)

    pipeline = DataPipeline(failed_dir="../data/failed/")
    outcome = pipeline.process_files(recent_files)
    raw_data, passed_count, failed_count = outcome

    # Phase 3: report how the run went.
    pipeline.display_summary(raw_data, passed_count, failed_count)

    # Hand back the validated datasets together with their file metadata.
    return raw_data, recent_files

In [None]:
if __name__ == "__main__":
    # Run the pipeline; both names stay available for later cells or code.
    raw_data, recent_files = main()

    # Only advertise follow-up work when at least one dataset was validated.
    if raw_data:
        print("\n".join((
            "\n" + "=" * 80,
            "READY FOR FURTHER PROCESSING",
            "=" * 80,
            f"\nThe 'raw_data' variable contains {len(raw_data)} validated datasets.",
            "You can now process this data further:",
            "  - Analyze trends across dates",
            "  - Extract specific information",
            "  - Generate visualizations",
            "  - Export to other formats",
        )))