In [0]:
import xml.etree.ElementTree as ET
import pandas as pd
import requests
from typing import Dict, Optional, Any
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

class ODataAtomParser:
    """Fetch records from a paginated SODA/OData JSON endpoint and type them via EDM metadata.

    Records are fetched as JSON (``response.json()``) using ``$limit``/``$offset``
    paging. Optional EDMX metadata supplies per-property ``Edm.*`` types that
    :meth:`apply_schema_types` uses to convert DataFrame columns.
    """

    # EDM primitive types converted to Python int / float by _convert_edm_type.
    _INT_TYPES = ('Edm.Byte', 'Edm.SByte', 'Edm.Int16', 'Edm.Int32', 'Edm.Int64')
    _FLOAT_TYPES = ('Edm.Single', 'Edm.Double', 'Edm.Decimal')
    # Lower-cased spellings accepted as boolean True; anything else is False.
    _TRUE_STRINGS = ('true', '1', 'yes')

    def __init__(self, metadata_xml: Optional[str] = None):
        """Create a parser, optionally loading EDM metadata.

        Args:
            metadata_xml: EDMX document text. It is parsed with
                :meth:`parse_metadata` when non-empty.
        """
        # Namespaces of OData Atom payloads (not referenced elsewhere in this class).
        self.namespaces = {
            'a': 'http://www.w3.org/2005/Atom',
            'm': 'http://docs.oasis-open.org/odata/ns/metadata',
            'd': 'http://docs.oasis-open.org/odata/ns/data'
        }

        # OData v4 EDMX/EDM namespaces used when searching the metadata document.
        self.metadata_namespaces = {
            'edmx': 'http://docs.oasis-open.org/odata/ns/edmx',
            'edm': 'http://docs.oasis-open.org/odata/ns/edm'
        }

        # entity name -> {'properties': {property name -> {'type': str, 'nullable': bool}}}
        self.schema_info: Dict[str, Dict[str, Any]] = {}
        if metadata_xml:
            self.parse_metadata(metadata_xml)

    def parse_metadata(self, metadata_xml: str) -> None:
        """Record the property types of every EntityType in an EDMX document.

        Results are merged into ``self.schema_info``, keyed by EntityType name.
        Parsing is best-effort: on any error a warning is printed and
        ``schema_info`` is reset to empty.

        NOTE(review): only the OData v4 EDM namespace is matched, so
        v2/v3 metadata would yield zero entity types. Confirm the target
        service version.
        """
        try:
            root = ET.fromstring(metadata_xml)

            for entity_type in root.iterfind('.//edm:EntityType', self.metadata_namespaces):
                properties = {}
                for prop in entity_type.iterfind('edm:Property', self.metadata_namespaces):
                    properties[prop.get('Name')] = {
                        # A missing Type attribute is treated as a string.
                        'type': prop.get('Type', 'Edm.String'),
                        # A missing Nullable attribute is treated as nullable.
                        'nullable': prop.get('Nullable', 'true').lower() == 'true',
                    }
                self.schema_info[entity_type.get('Name')] = {'properties': properties}

            print(f"Parsed metadata for {len(self.schema_info)} entity types")

        except Exception as e:  # deliberate best-effort: metadata is optional
            print(f"Warning: Could not parse metadata: {e}")
            self.schema_info = {}

    def _add_url_parameter(self, url: str, param: str, value: str) -> str:
        """Return *url* with query parameter *param* set to *value* (added or replaced).

        Existing parameters, including ones with blank values such as ``a=``,
        are preserved.
        """
        parsed = urlparse(url)
        # keep_blank_values: otherwise parse_qs silently drops "key=" pairs.
        query_params = parse_qs(parsed.query, keep_blank_values=True)
        query_params[param] = [value]

        new_query = urlencode(query_params, doseq=True)
        return urlunparse(parsed._replace(query=new_query))

    @staticmethod
    def _entity_name_from_url(url: str) -> str:
        """Return the last path segment of *url*, minus a trailing ``.json`` suffix.

        Only the path is inspected, so query strings never leak into the name.
        A real suffix strip is needed: ``str.rstrip('.json')`` removes any run
        of the characters ``.``, ``j``, ``s``, ``o``, ``n`` (``'ns.json'`` would
        become ``''``).
        """
        last_segment = urlparse(url).path.rstrip('/').split('/')[-1]
        suffix = '.json'
        if last_segment.endswith(suffix):
            last_segment = last_segment[:-len(suffix)]
        return last_segment

    def _fetch_page(self, url: str, timeout: Optional[float]) -> list:
        """GET *url* and return its JSON body as a list of records.

        Raises:
            requests.exceptions.RequestException: on network/HTTP errors.
            ValueError: if the body is neither a JSON array nor ``null``.
        """
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        page_data = response.json()
        if page_data is None:
            return []
        if not isinstance(page_data, list):
            raise ValueError(
                f"Expected a JSON array of records from {url}, "
                f"got {type(page_data).__name__}"
            )
        return page_data

    def fetch_and_parse_url(self, url: str,
                            page_size: int = 1000,
                            delay_between_requests: float = 0.1,
                            timeout: Optional[float] = 60.0,
                            order: Optional[str] = None):
        """Fetch every record from a paginated JSON endpoint and return a DataFrame.

        The entity name used for schema lookup is the last URL path segment
        with any ``.json`` suffix removed.

        Args:
            url: The base URL to fetch data from. Existing query parameters are kept.
            page_size: Number of records per page, sent as ``$limit``
                (default 1000, max 50000 for SODA 2.1). Must be positive.
            delay_between_requests: Delay in seconds between API calls to be respectful.
            timeout: Per-request timeout in seconds passed to ``requests.get``.
                ``None`` waits indefinitely.
            order: Optional ``$order`` clause. Offset paging without a stable
                ordering may skip or repeat rows if the server order varies.

        Returns:
            The DataFrame produced by :meth:`extract_data`.

        Raises:
            ValueError: if *page_size* is not positive or a page is not a JSON array.
            requests.exceptions.RequestException: if the first page fails.
                Failures on later pages stop paging and return the records so far.
        """
        if page_size <= 0:
            raise ValueError(f"page_size must be positive, got {page_size}")

        entity_name = self._entity_name_from_url(url)

        all_records = []
        offset = 0
        page_num = 1

        print(f"Starting to fetch data from: {url}")
        print(f"Using page size: {page_size}")

        while True:
            # Add pagination parameters to URL
            paginated_url = self._add_url_parameter(url, '$limit', str(page_size))
            paginated_url = self._add_url_parameter(paginated_url, '$offset', str(offset))
            if order:
                paginated_url = self._add_url_parameter(paginated_url, '$order', order)

            print(f"Fetching page {page_num} (offset {offset})...")

            try:
                page_data = self._fetch_page(paginated_url, timeout)
            except requests.exceptions.RequestException as e:
                print(f"Error fetching page {page_num}: {e}")
                if page_num == 1:
                    # If first page fails, re-raise the error
                    raise
                # If a later page fails, stop and return what we have
                print(f"Stopping pagination due to error. Returning {len(all_records)} records.")
                break

            # An empty page means we've reached the end.
            if not page_data:
                print(f"No more data found. Finished at page {page_num - 1}")
                break

            all_records.extend(page_data)
            print(f"Page {page_num}: Retrieved {len(page_data)} records")

            # A short page is the last page.
            if len(page_data) < page_size:
                print(f"Retrieved {len(page_data)} records (less than page size). Finished.")
                break

            offset += page_size
            page_num += 1

            # Be respectful to the API
            if delay_between_requests > 0:
                time.sleep(delay_between_requests)

        print(f"Total records retrieved: {len(all_records)}")

        return self.extract_data(all_records, entity_name)

    def extract_data(self, records, entity_name: Optional[str] = None):
        """Build a DataFrame from a list of record dicts and apply schema types.

        Keys absent from some records become NaN in the DataFrame;
        :meth:`_convert_edm_type` turns those into ``None`` for typed columns.
        """
        df = pd.DataFrame(records)
        return self.apply_schema_types(df, entity_name)

    def _convert_edm_type(self, value: Any, edm_type: str) -> Any:
        """Convert one raw JSON value to the Python type implied by *edm_type*.

        Args:
            value: A raw cell value (str, number, bool, NaN, or nested JSON).
            edm_type: An ``Edm.*`` primitive type name from the metadata.

        Returns:
            - ``None`` for missing values (``None``, NaN/NaT, ``''``).
            - ``None`` when the value cannot be converted to the type.
            - The value unchanged when *edm_type* is not recognised.
        """
        # NaN appears when a record omits a key; without this check
        # Edm.String turned it into the literal string 'nan'.
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return None
        if isinstance(value, str) and value == '':
            return None

        try:
            if edm_type == 'Edm.String':
                return str(value)
            if edm_type in self._INT_TYPES:
                return int(value)
            if edm_type in self._FLOAT_TYPES:
                # NOTE: Edm.Decimal is mapped to float and may lose precision.
                return float(value)
            if edm_type == 'Edm.Boolean':
                # JSON booleans arrive as real bools; strings are matched case-insensitively.
                if isinstance(value, bool):
                    return value
                return str(value).lower() in self._TRUE_STRINGS
            if edm_type in ('Edm.DateTime', 'Edm.DateTimeOffset'):
                return pd.to_datetime(value, errors='coerce')
            if edm_type == 'Edm.Date':
                return pd.to_datetime(value, errors='coerce').date()
        except (ValueError, TypeError, AttributeError, OverflowError):
            # Unconvertible value for a known type.
            return None

        # Unrecognised EDM type: keep the data rather than discarding it.
        return value

    def apply_schema_types(self, df, entity_name: Optional[str] = None):
        """Convert the columns of *df* to the types declared for *entity_name*.

        Columns are replaced in place on *df*, and the same object is
        returned. Columns without a schema entry, unknown entities and empty
        frames are left untouched.
        """
        if df.empty:
            return df

        schema = self.schema_info.get(entity_name) if entity_name else None
        if not schema:
            return df

        properties = schema['properties']
        for column in df.columns:
            prop_info = properties.get(column)
            if prop_info is None:
                continue
            df[column] = df[column].apply(self._convert_edm_type, args=(prop_info['type'],))

        print(f"Applied schema-based type conversions for entity '{entity_name}'")

        return df

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, trim
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, DateType, TimestampType
from typing import Optional

def _convert_edm_type_spark(column_name: str, edm_type: str):
    """Build a Column expression casting *column_name* to the Spark type for *edm_type*.

    For every non-string type, NULL and whitespace-only inputs become NULL.
    Edm.Boolean is True when the value matches ``true``/``1``/``yes``
    (case-insensitive) and False otherwise. Unrecognised EDM types return the
    column unchanged.
    """
    column = col(column_name)
    # The explicit isNull() matters for Edm.Boolean. trim(NULL) == "" evaluates
    # to NULL, not true, so without it NULL inputs fell through to the
    # otherwise(False) branch and became False.
    is_blank = column.isNull() | (trim(column) == "")

    if edm_type == 'Edm.String':
        return column.cast(StringType())
    if edm_type == 'Edm.Boolean':
        # rlike yields true/false for any non-NULL input.
        return when(is_blank, None).otherwise(
            column.rlike("(?i)^(true|1|yes)$")
        ).cast(BooleanType())

    if edm_type in ('Edm.Byte', 'Edm.SByte', 'Edm.Int16', 'Edm.Int32'):
        target_type = IntegerType()
    elif edm_type == 'Edm.Int64':
        target_type = LongType()
    elif edm_type == 'Edm.Single':
        target_type = FloatType()
    elif edm_type in ('Edm.Double', 'Edm.Decimal'):
        # NOTE: Edm.Decimal is mapped to double and may lose precision.
        target_type = DoubleType()
    elif edm_type in ('Edm.DateTime', 'Edm.DateTimeOffset'):
        target_type = TimestampType()
    elif edm_type == 'Edm.Date':
        target_type = DateType()
    else:
        return column  # Return original column if type not recognized

    return when(is_blank, None).otherwise(column.cast(target_type))

def apply_schema_types_spark(df: DataFrame, schema_info: dict, entity_name: Optional[str] = None):
    """Cast the columns of a Spark DataFrame according to parsed EDM schema info.

    Args:
        df: DataFrame whose column names match the entity's EDM property names.
        schema_info: entity name -> {'properties': {property name -> {'type': 'Edm.*', ...}}}.
        entity_name: Which entity's schema to apply. ``None`` or an unknown
            name returns *df* unchanged.

    Returns:
        A DataFrame with the same columns in the same order, with schema
        columns cast via ``_convert_edm_type_spark``. An empty *df* is returned
        unchanged.
    """
    # head(1) fetches at most one row; count() would scan the whole dataset.
    if not df.head(1):
        return df

    schema = schema_info.get(entity_name) if entity_name else None
    if not schema:
        return df

    properties = schema['properties']

    def _quoted(name: str) -> str:
        # Backtick-quote so names containing '.' are not resolved as nested fields.
        return "`" + name.replace("`", "``") + "`"

    projection = []
    for col_name in df.columns:
        prop_info = properties.get(col_name)
        if prop_info is None:
            projection.append(col(_quoted(col_name)))
        else:
            converted = _convert_edm_type_spark(_quoted(col_name), prop_info['type'])
            projection.append(converted.alias(col_name))

    # A single select instead of withColumn in a loop. Each withColumn adds a
    # projection, and long chains bloat the query plan.
    df = df.select(*projection)

    print(f"Applied schema-based type conversions for entity '{entity_name}'")

    return df