In [57]:
"""
DataReader module for reading data from various sources into pandas DataFrames.

Dependencies:
- pandas: Required for DataFrame operations.
- requests: Required for API reading.
- sqlalchemy: Required for database reading.
- pyarrow: Optional for .parquet file support.
- openpyxl: Optional for .xlsx file support.

Usage:
    reader = DataReader()
    source = DataSource(url="data.csv")
    df = reader.read_data(source)

DataReader         ->   Entry point: picks a reader through the registry and reads a source into a DataFrame.
DataSource         ->   Validates the URL or file path and stores the parsed result.
Reader             ->   Abstract base class: reads data and reports its supported formats or schemes.
ReaderRegistry     ->   Maps URL schemes and file extensions to source types and their readers.
configure_logging  ->   Configures the DataReader logger with console and optional file handlers.
"""
__all__ = ['DataReader', 'DataSource', 'Reader', 'ReaderRegistry', 'configure_logging']

import json
import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BytesIO, StringIO
from typing import Callable, Optional, Set
from urllib.parse import ParseResult, urlparse

import pandas as pd
import requests
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError


In [58]:

def configure_logging(
    level: int = logging.WARNING,
    log_file: Optional[str] = 'datareader.log',
    logging_enabled: bool = True,
    debug_enabled: bool = False
) -> logging.Logger:
    """
    Configure the DataReader logger with console and optional file handlers.

    Any handlers already attached to the 'DataReader' logger are closed and
    removed first, so the function can be called repeatedly to reconfigure
    logging without leaking open log files.

    Args:
        level: Logging level (e.g., logging.WARNING, logging.INFO). Defaults to WARNING.
        log_file: File path for file logging. Defaults to 'datareader.log'. If None
                  (or empty), logs to console only.
        logging_enabled: If False, disables all logging by setting level to CRITICAL+1
                         and attaching no handlers. Defaults to True.
        debug_enabled: If True, overrides ``level`` with DEBUG. Defaults to False.

    Returns:
        Configured logger instance.
    """
    logger = logging.getLogger('DataReader')

    # handlers.clear() would only detach the handlers; a FileHandler keeps its
    # file open until close() is called, so close each one explicitly.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    if not logging_enabled:
        logger.setLevel(logging.CRITICAL + 1)
        return logger

    effective_level = logging.DEBUG if debug_enabled else level
    logger.setLevel(effective_level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def _attach(handler: logging.Handler) -> None:
        """Apply the shared level/format to a handler and add it to the logger."""
        handler.setLevel(effective_level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Console first: it stays attached even if opening the log file fails below.
    _attach(logging.StreamHandler())
    if log_file:
        _attach(logging.FileHandler(log_file))

    return logger

# Module-level logger shared by the readers, the registry and DataReader in this file.
# NOTE: calling configure_logging() with its defaults at import time attaches a console
# handler and a FileHandler for 'datareader.log' (relative to the current directory).
logger = configure_logging()

In [59]:
class Reader(ABC):
    """Abstract interface that every concrete reader must implement."""

    @abstractmethod
    def read(self, url: str, **kwargs) -> pd.DataFrame:
        """Read data from a source into a DataFrame."""
        return None

    @abstractmethod
    def get_supported_formats(self) -> set[str]:
        """Return supported formats or schemes."""
        return None


In [60]:

class ApiReader(Reader):
    """Reader for HTTP/HTTPS endpoints that respond with JSON or CSV."""

    # Used when the caller passes no ``timeout``; without one requests.get()
    # can block indefinitely on an unresponsive server.
    DEFAULT_TIMEOUT = 30

    def read(self, url: str, **kwargs) -> pd.DataFrame:
        """
        Read data from an HTTP/HTTPS API endpoint into a DataFrame.

        Args:
            url: The API endpoint.
            **kwargs: Optional parameters.
                response_format: 'json' or 'csv'. Overrides detection from the
                    response's Content-Type header. It is consumed here and not
                    forwarded to requests.
                Every other keyword (headers, params, timeout, ...) is passed to
                requests.get(). ``timeout`` defaults to DEFAULT_TIMEOUT seconds.

        Returns:
            pandas DataFrame containing the API response.

        Raises:
            ValueError: If the URL is not HTTP/HTTPS, or the response format is
                unsupported.
            requests.HTTPError: If the server answers with an error status.
        """
        # Must be removed before calling requests.get(), which rejects unknown keywords.
        response_format = kwargs.pop("response_format", None)

        if not self.validate(url):
            logger.error("Invalid API URL: %s", url)
            raise ValueError(f"Invalid API URL: {url}")

        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)
        # Log option names only: headers and params frequently carry credentials.
        logger.debug("Requesting API URL: %s with options: %s", url, sorted(kwargs))
        response = requests.get(url, **kwargs)
        response.raise_for_status()

        if response_format is None:
            content_type = response.headers.get("Content-Type", "")
            lowered = content_type.lower()
            if "application/json" in lowered:
                response_format = "json"
            elif "text/csv" in lowered:
                response_format = "csv"
            else:
                logger.error("Unsupported Content-Type: %s", content_type)
                raise ValueError(f"Unsupported Content-Type: {content_type}")

        chosen_format = str(response_format).lower()
        if chosen_format == "json":
            df = pd.DataFrame(response.json())
        elif chosen_format == "csv":
            df = pd.read_csv(StringIO(response.text))
        else:
            logger.error("Unsupported response_format: %s", response_format)
            raise ValueError(f"Unsupported response_format: {response_format}")

        logger.info("Successfully read API data from %s", url)
        return df

    def validate(self, url: str) -> bool:
        """
        Validate the URL format to ensure it's HTTP or HTTPS.
        """
        return url.startswith(("http://", "https://"))

    def get_supported_formats(self) -> Set[str]:
        """
        Return supported URL schemes.
        """
        return {"http", "https"}


In [61]:
# Reads query results from SQL databases.
class DatabaseReader(Reader):
    """Reader that runs a SQL query through SQLAlchemy and returns the rows."""

    # Dialect names accepted in the connection string, with or without a "+driver" suffix.
    _DIALECTS = ("sqlite", "postgresql", "mysql")

    def read(self, url: str, **kwargs) -> pd.DataFrame:
        """
        Read data from a database into a DataFrame.

        Args:
            url: Database connection string (e.g., 'sqlite:///db.sqlite' or
                'postgresql+psycopg2://user@host/db').
            **kwargs: Optional parameters. Only ``query`` is used: a non-empty SQL
                string (default: 'SELECT * FROM data_table').

        Returns:
            pandas DataFrame with query results.

        Raises:
            ValueError: If the URL or query is invalid.
            sqlalchemy.exc.SQLAlchemyError: If the query fails.
        """
        query = kwargs.get("query", "SELECT * FROM data_table")
        logger.debug("Reading database: %s with query: %s", url, query)
        if "query" in kwargs and (not isinstance(query, str) or not query.strip()):
            logger.error("Invalid query: %s", query)
            raise ValueError(f"Invalid query: {query}")
        if not self.validate(url):
            logger.error("Invalid database URL: %s", url)
            raise ValueError(f"Invalid database URL: {url}")

        engine = create_engine(url)
        try:
            df = pd.read_sql(query, engine)
        finally:
            # The engine is created per call, so release its connection pool
            # whether or not the query succeeded.
            engine.dispose()
        logger.info("Successfully read database data from %s", url)
        return df

    def validate(self, url: str) -> bool:
        """
        Check if the URL is a valid database connection string.

        Accepts 'dialect://...' and 'dialect+driver://...' where the dialect is
        one of the supported schemes.
        """
        if not isinstance(url, str):
            return False
        scheme, separator, _ = url.partition("://")
        if not separator:
            return False
        dialect = scheme.split("+", 1)[0]
        return dialect in self._DIALECTS

    def get_supported_formats(self) -> set[str]:
        """Return supported schemes."""
        return set(self._DIALECTS)

In [62]:
# Reads local files (csv, xlsx, json, parquet) into DataFrames.
class FileReader(Reader):
    """Reader for local files, dispatching on the file extension."""

    @staticmethod
    def _file_readers() -> dict:
        """Map each supported extension to the pandas function that parses it."""
        return {
            ".csv": pd.read_csv,
            ".json": pd.read_json,
            ".parquet": pd.read_parquet,
            ".xlsx": pd.read_excel,
        }

    def read(self, path: str, **kwargs) -> pd.DataFrame:
        """
        Read data from a file into a DataFrame.

        The extension is matched case-insensitively, so 'DATA.CSV' is handled
        like 'data.csv'.

        Args:
            path: File path (e.g., 'data.csv').
            **kwargs: Optional parameters forwarded to the pandas reader
                (e.g., encoding as a string).

        Returns:
            pandas DataFrame with file data.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If the file format or encoding is invalid.
        """
        logger.debug("Reading file: %s with kwargs: %s", path, kwargs)
        if "encoding" in kwargs and not isinstance(kwargs["encoding"], str):
            logger.error("Invalid encoding: %s", kwargs["encoding"])
            raise ValueError(f"Invalid encoding: {kwargs['encoding']}")
        if not os.path.exists(path):
            logger.error("File not found: %s", path)
            raise FileNotFoundError(f"File not found: {path}")

        file_readers = self._file_readers()
        lowered_path = path.lower()
        for ext, reader in file_readers.items():
            if lowered_path.endswith(ext):
                logger.info("Reading %s with %s", path, reader.__name__)
                return reader(path, **kwargs)

        supported = list(file_readers)
        logger.error("Unsupported file format: %s. Supported formats: %s", path, supported)
        raise ValueError(f"Unsupported file format: {path}. Supported formats: {supported}")

    def get_supported_formats(self) -> set[str]:
        """Return the supported file extensions, each with its leading dot."""
        return set(self._file_readers())

In [None]:
class CompressedFileReader(Reader):
    """
    Load delimited text stored in a compressed file into a pandas DataFrame.

    Extension to compression mapping:
        - .gz  -> gzip
        - .zip -> zip
        - .bz2 -> bz2
        - .xz  -> xz

    Methods:
        - read(path, **kwargs): parse the compressed file with pandas.read_csv.
        - validate(path): report whether the extension is one of the above.
        - get_supported_formats(): return the set of supported extensions.

    Notes:
        - The compression type comes from the file extension alone; the file
          content is not inspected.
        - The decompressed content is handed to `pandas.read_csv()`, and any
          exception it raises is logged and re-raised.
        - Extra keyword arguments (e.g., `sep`, `encoding`, `dtype`) are forwarded
          to `pandas.read_csv()`.
    """

    def read(self, path: str, **kwargs) -> pd.DataFrame:
        """
        Parse a compressed CSV file into a DataFrame.

        Parameters:
            path (str): Path to the compressed file.
            **kwargs: Extra keyword arguments forwarded to pandas.read_csv.

        Returns:
            pd.DataFrame: The parsed data.

        Raises:
            ValueError: If the extension maps to no supported compression.
            Exception: Whatever pandas.read_csv raises, re-raised after logging.
        """
        logger.debug("Reading compressed file: %s", path)

        codec = self._infer_compression(path)
        if codec is not None:
            try:
                frame = pd.read_csv(path, compression=codec, **kwargs)
            except Exception as exc:
                logger.error("Error reading compressed file %s: %s", path, str(exc))
                raise
            return frame

        logger.error("Unsupported compression format for file: %s", path)
        raise ValueError(f"Unsupported compression format: {path}")

    def validate(self, path: str) -> bool:
        """
        Report whether the file's extension names a supported compression.

        Parameters:
            path (str): Path to the file.

        Returns:
            bool: True if supported, False otherwise.
        """
        codec = self._infer_compression(path)
        return codec is not None

    def get_supported_formats(self) -> set[str]:
        """
        List the compressed-file extensions this reader accepts.

        Returns:
            set[str]: Supported extensions.
        """
        return {".gz", ".zip", ".bz2", ".xz"}

    def _infer_compression(self, path: str) -> str | None:
        """
        Translate the file extension into a pandas compression name.

        Parameters:
            path (str): File path.

        Returns:
            str | None: Compression name, or None when the extension is unknown.
        """
        _, suffix = os.path.splitext(path)
        suffix = suffix.lower()
        known_codecs = (
            (".gz", "gzip"),
            (".zip", "zip"),
            (".bz2", "bz2"),
            (".xz", "xz"),
        )
        for extension, codec in known_codecs:
            if suffix == extension:
                return codec
        return None



In [64]:
class URLReader(Reader):
    """Reader that downloads a csv/json/excel file over HTTP(S) and parses it."""

    # Used when the caller passes no ``timeout``; without one requests.get()
    # can block indefinitely on an unresponsive server.
    DEFAULT_TIMEOUT = 30

    # File extension -> file_type understood by read().
    _EXTENSION_TYPES = {
        ".csv": "csv",
        ".json": "json",
        ".xlsx": "excel",
        ".xls": "excel",
    }

    def read(self, url: str, **kwargs) -> pd.DataFrame:
        """
        Read data from a URL into a pandas DataFrame.

        The file type is resolved before anything is downloaded, so an
        unsupported type fails without a network request.

        Args:
            url: The URL pointing to the data file.
            **kwargs: Optional parameters.
                file_type: 'csv', 'json' or 'excel'. Inferred from the URL when omitted.
                timeout: Seconds passed to requests.get() (default DEFAULT_TIMEOUT).
                Every other keyword is forwarded to the pandas reader.

        Returns:
            pandas DataFrame with data loaded from the URL.

        Raises:
            ValueError: If the URL or file type is invalid.
            requests.exceptions.RequestException: If the download fails.
            pd.errors.ParserError: If pandas fails to parse the data.
        """
        file_type = kwargs.pop("file_type", None)
        timeout = kwargs.pop("timeout", self.DEFAULT_TIMEOUT)

        if not self.validate(url):
            logger.error("Invalid URL: %s", url)
            raise ValueError(f"Invalid URL: {url}")

        if not file_type:
            file_type = self._infer_file_type(url)
            if file_type is None:
                logger.error("Could not infer file type from URL. Please specify 'file_type'")
                raise ValueError("Unsupported or unknown file type. Please specify 'file_type' (csv, json, excel).")
        elif file_type not in self.get_supported_formats():
            logger.error("Unsupported file type requested: %s", file_type)
            raise ValueError(f"Unsupported file type: {file_type}")

        logger.debug("Downloading data from URL: %s", url)
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        logger.debug("Reading data as %s", file_type)
        if file_type == "csv":
            data = pd.read_csv(StringIO(response.text), **kwargs)
        elif file_type == "json":
            data = pd.read_json(StringIO(response.text), **kwargs)
        else:  # "excel" is the only remaining supported type
            data = pd.read_excel(BytesIO(response.content), **kwargs)

        logger.info("Successfully read data from URL: %s", url)
        return data

    def _infer_file_type(self, url: str) -> Optional[str]:
        """Return the file type implied by the URL's extension, or None if unknown."""
        # Check the path component first so a query string such as
        # 'data.csv?token=abc' does not hide the extension, then the raw URL.
        for candidate in (urlparse(url).path, url):
            lowered = candidate.lower()
            for extension, file_type in self._EXTENSION_TYPES.items():
                if lowered.endswith(extension):
                    return file_type
        return None

    def validate(self, url: str) -> bool:
        """Check if the URL is a valid HTTP/HTTPS URL."""
        return url.startswith(("http://", "https://"))

    def get_supported_formats(self) -> set[str]:
        """Return supported file types for URLReader."""
        return {"csv", "json", "excel"}


In [65]:
# Holds a source URL or file path, checks that it is a usable non-empty string, and stores its parsed form.
@dataclass  # standard-library decorator: generates __init__, __repr__ and __eq__ from the fields
class DataSource:
    """A source location (URL or file path) together with its urlparse() result."""

    url: str
    # Overwritten in __post_init__ with urlparse(url); a value passed by the caller is discarded.
    _parsed: Optional[ParseResult] = None

    def __post_init__(self):
        """Validate the URL and store parsed result."""
        logger.debug("Validating DataSource URL: %s", self.url)

        # Reject empty values and anything that is not a string.
        if not (self.url and isinstance(self.url, str)):
            logger.error("Invalid URL: must be a non-empty string")
            raise ValueError("URL must be a non-empty string")

        parsed = urlparse(self.url)
        self._parsed = parsed

        # Accept anything with a scheme or a path; otherwise only an existing filesystem entry.
        has_location = parsed.scheme or parsed.path
        if has_location or os.path.exists(self.url):
            return

        logger.error("Invalid URL: %s", self.url)
        raise ValueError(f"Invalid URL: {self.url}")

In [68]:
"""Registers new mappings, looks up the source type for a scheme or extension,
looks up the reader for a source type, and checks that every source type has a reader."""
class ReaderRegistry:
    """
    Manages mappings of schemes/extensions to source types and readers.

    Every instance owns its own mapping dictionaries, pre-populated with the
    default readers, so registering on one registry never affects another.
    """

    def __init__(self):
        logger.info("Creating new ReaderRegistry")
        # key (scheme or extension without the dot) -> source type
        self._mappings: dict[str, str] = {}
        # source type -> reader instance
        self._readers: dict[str, Reader] = {}
        self._register_defaults()

    def _register_defaults(self):
        """Register default mappings and readers."""
        defaults = [
            (("http", "https"), "api", ApiReader()),
            (("sqlite", "postgresql", "mysql"), "database", DatabaseReader()),
            (("csv", "json", "parquet", "xlsx"), "file", FileReader()),
            # Every extension CompressedFileReader.get_supported_formats() reports.
            (("gz", "zip", "bz2", "xz"), "compressed_file", CompressedFileReader()),
        ]
        for keys, source_type, reader in defaults:
            self.register(keys, source_type, reader)

    def register(self, keys: tuple[str, ...], source_type: str, reader: Reader):
        """
        Register schemes/extensions, source type, and reader.

        All inputs are checked before anything is stored, so a failed call
        leaves the registry unchanged.

        Args:
            keys: Tuple of schemes/extensions (e.g., ('zip',)). A single string is
                treated as a one-element tuple.
            source_type: Source type identifier (e.g., 'zip_file').
            reader: Reader instance for the source type.

        Raises:
            ValueError: If inputs are invalid or mappings conflict.
        """
        logger.debug("Registering keys %s to source type %s with reader %s", keys, source_type, reader.__class__.__name__)
        if not isinstance(reader, Reader):
            logger.error("Invalid reader for %s: not a Reader instance", source_type)
            raise ValueError(f"Reader for {source_type} must be a Reader instance")
        if not source_type or not isinstance(source_type, str):
            logger.error("Invalid source type: %s", source_type)
            raise ValueError(f"Invalid source type: {source_type}")

        # Iterating a bare string would register each character as its own key.
        if isinstance(keys, str):
            keys = (keys,)

        # Validation pass: nothing is mutated until every check has succeeded.
        for key in keys:
            if not key or not isinstance(key, str):
                logger.error("Invalid key: %s", key)
                raise ValueError(f"Invalid key: {key}")
            if key in self._mappings:
                logger.error("Key %s already registered", key)
                raise ValueError(f"Key {key} already registered")
        if source_type in self._readers:
            logger.error("Source type %s already registered", source_type)
            raise ValueError(f"Source type {source_type} already registered")

        # Commit pass.
        for key in keys:
            self._mappings[key] = source_type
        self._readers[source_type] = reader

    def get_source_type(self, key: str) -> Optional[str]:
        """Get source type for a scheme/extension, or None if the key is unknown."""
        return self._mappings.get(key)

    def get_reader(self, source_type: str) -> Optional[Reader]:
        """Get reader for a source type, or None if the source type is unknown."""
        return self._readers.get(source_type)

    def validate(self):
        """
        Validate that all source types have readers.

        Raises:
            ValueError: If a mapped source type has no registered reader.
        """
        logger.debug("Validating ReaderRegistry mappings")
        for source_type in set(self._mappings.values()):
            if source_type not in self._readers:
                logger.error("No reader provided for source type: %s", source_type)
                raise ValueError(f"No reader provided for source type: {source_type}")


In [69]:
"""Initialises and validates the registry, selects and executes the reader for a source,
optionally concatenates the result with a target DataFrame,
infers the source type from the URL,
and handles errors raised while reading."""
class DataReader:
    """
    Read a DataSource into a pandas DataFrame using a ReaderRegistry.

    The reader is chosen from the source's file extension or URL scheme unless
    the caller supplies a custom reader.
    """

    # Registry shared by every DataReader that is created without its own registry.
    _default_registry = ReaderRegistry()

    def __init__(self, registry: Optional[ReaderRegistry] = None):
        """
        Initialize DataReader with a reader registry.

        Args:
            registry: Optional ReaderRegistry instance. If None, uses the shared default registry.

        Raises:
            ValueError: If the registry maps a source type that has no reader.
        """
        logger.info("Initializing DataReader with %s registry", "shared default" if registry is None else "custom")
        self._registry = DataReader._default_registry if registry is None else registry
        self._post_init()

    def _post_init(self):
        """Validate registry after initialization."""
        self._registry.validate()

    def register_reader(self, keys: tuple[str, ...], source_type: str, reader: Reader):
        """
        Register a new reader for the DataReader instance.

        Note: when this DataReader uses the shared default registry, the
        registration is visible to every other DataReader that shares it.

        Args:
            keys: Tuple of schemes/extensions (e.g., ('zip',)).
            source_type: Source type identifier (e.g., 'zip_file').
            reader: Reader instance for the source type.

        Raises:
            ValueError: If inputs are invalid or mappings conflict.
        """
        logger.debug("Registering reader for DataReader: %s -> %s", keys, source_type)
        self._registry.register(keys, source_type, reader)

    def _select_reader(
        self,
        source: DataSource,
        custom_reader: Optional[Callable[[str], pd.DataFrame] | Reader]
    ) -> Callable[[str], pd.DataFrame] | Reader:
        """
        Select and validate the reader for the given source.

        Args:
            source: DataSource with the URL.
            custom_reader: Optional Reader or callable to override default reading.

        Returns:
            Reader or callable to read the data.

        Raises:
            ValueError: If the custom reader is neither a Reader nor callable, or
                if no reader is found for the URL.
        """
        logger.debug("Selecting reader for source: %s", source.url)
        if custom_reader:
            if not (isinstance(custom_reader, Reader) or callable(custom_reader)):
                logger.error("Invalid custom reader: %r", custom_reader)
                raise ValueError(
                    f"custom_reader must be a Reader instance or a callable, got {type(custom_reader).__name__}"
                )
            reader = custom_reader
        else:
            reader = self._registry.get_reader(self._infer_source_type(source))
        if not reader:
            logger.error("No reader found for URL: %s", source.url)
            raise ValueError(f"Cannot infer reader for URL: {source.url}")
        if isinstance(reader, Reader):
            reader_name = reader.__class__.__name__
        else:
            # Plain functions expose __name__; other callables fall back to their type name.
            reader_name = getattr(reader, "__name__", type(reader).__name__)
        logger.info("Using reader: %s", reader_name)
        return reader

    def _execute_reader(
        self,
        reader: Callable[[str], pd.DataFrame] | Reader,
        url: str,
        **kwargs
    ) -> pd.DataFrame:
        """
        Execute the reader to read data from the source.

        Args:
            reader: Reader or callable to read the data.
            url: Source URL.
            **kwargs: Additional parameters for the reader.

        Returns:
            pandas DataFrame with the read data.
        """
        logger.debug("Executing reader for %s with kwargs: %s", url, kwargs)
        if isinstance(reader, Reader):
            return reader.read(url, **kwargs)
        return reader(url, **kwargs)

    def _concatenate_df(
        self,
        df: pd.DataFrame,
        target_df: pd.DataFrame,
        concatenate: bool
    ) -> pd.DataFrame:
        """
        Concatenate the read DataFrame with the target DataFrame if required.

        The newly read rows come first, followed by the rows of ``target_df``;
        the index of the result is renumbered.

        Args:
            df: DataFrame read from the source.
            target_df: DataFrame to concatenate with.
            concatenate: Whether concatenation is required.

        Returns:
            Concatenated pandas DataFrame, or ``df`` unchanged when ``concatenate`` is False.
        """
        if not concatenate:
            return df
        return pd.concat([df, target_df], ignore_index=True)

    def _handle_read_error(self, e: Exception, url: str, concatenate: bool, locals_dict: dict) -> None:
        """
        Handle errors during data reading, including concatenation failures.

        Always raises; the original exception is attached as ``__cause__``.

        Args:
            e: Exception raised during reading.
            url: Source URL.
            concatenate: Whether concatenation was attempted.
            locals_dict: Mapping that contains the key 'df' when the data had
                already been read before the error occurred.

        Raises:
            ValueError: With appropriate error message for reading or concatenation failure.
        """
        logger.error("Failed to read from %s: %s", url, str(e))
        if concatenate and isinstance(e, ValueError) and 'df' in locals_dict:
            raise ValueError(f"Failed to concatenate with target_df: {e}") from e
        raise ValueError(f"Failed to read from {url}: {e}") from e

    def read_data(
        self,
        source: DataSource,
        target_df: Optional[pd.DataFrame] = None,
        custom_reader: Optional[Callable[[str], pd.DataFrame] | Reader] = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Read data from a source into a DataFrame, optionally concatenating with an existing DataFrame.

        Args:
            source: DataSource with the URL.
            target_df: Optional DataFrame to concatenate with.
            custom_reader: Optional Reader or callable to override default reading.
            **kwargs: Additional parameters for the reader.

        Returns:
            pandas DataFrame with the read data.

        Raises:
            ValueError: If the reader is invalid or reading/concatenation fails.
                ValueError, pandas.errors.ParserError, requests.HTTPError and
                sqlalchemy.exc.SQLAlchemyError raised while reading are wrapped
                in a ValueError whose ``__cause__`` is the original exception.
            Exception: Any other exception type (for example FileNotFoundError)
                propagates unchanged.
        """
        logger.info("Reading data from source: %s", source.url)
        concatenate = target_df is not None
        wrapped_errors = (ValueError, pd.errors.ParserError, requests.HTTPError, SQLAlchemyError)

        # Stage 1: pick a reader and read. No 'df' exists yet if this fails.
        try:
            reader = self._select_reader(source, custom_reader)
            df = self._execute_reader(reader, source.url, **kwargs)
        except wrapped_errors as e:
            self._handle_read_error(e, source.url, concatenate, {})

        # Stage 2: concatenation. 'df' exists, so a ValueError here is reported
        # as a concatenation failure.
        if concatenate:
            try:
                df = self._concatenate_df(df, target_df, concatenate)
            except wrapped_errors as e:
                self._handle_read_error(e, source.url, concatenate, {"df": df})

        logger.info("Successfully read data from %s", source.url)
        return df

    def _infer_source_type(self, source: DataSource) -> str:
        """
        Infer the source type from a DataSource using its parsed URL.

        Lookup order:
            1. The extension of the last path segment (text after its final dot).
            2. The URL scheme, when there is no extension or it is not registered.
            3. 'file', when there is no extension and the URL is an existing local path.

        Args:
            source: DataSource with the URL and parsed result.

        Returns:
            Source type ('api', 'database', 'file', 'compressed_file', or a custom registered type).

        Raises:
            ValueError: If the source type cannot be inferred.
        """
        logger.debug("Inferring source type for URL: %s (scheme=%s, path=%s)", source.url, source._parsed.scheme, source._parsed.path)
        parsed = source._parsed
        path = parsed.path if parsed.path else source.url

        # Only the last path segment may carry the extension; a dot in a
        # directory name (e.g. '/v1.2/items') must not be mistaken for one.
        filename = path.replace("\\", "/").rsplit("/", 1)[-1]
        extension = filename.rpartition(".")[2].lower() if "." in filename else ""

        source_type = None
        for key in (extension, parsed.scheme):
            if not key:
                continue
            source_type = self._registry.get_source_type(key)
            if source_type is not None:
                break

        if source_type is None and extension == "" and os.path.exists(source.url):
            logger.warning("No extension for URL %s, defaulting to file source type", source.url)
            source_type = "file"
        if source_type is None:
            logger.error("Cannot infer source type for URL: %s", source.url)
            raise ValueError(f"Cannot infer source type from URL: {source.url}")
        logger.debug("Inferred source type: %s", source_type)
        return source_type

ModuleNotFoundError: No module named 'data_reader'