# Data Ingestion Pipeline Development
- Author: Marcellinus Aditya Witarsah
- Date: 05 June 2024

In [1]:
# Imports
%load_ext autoreload
%autoreload 2
%matplotlib inline
import pandas as pd
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gc
import logging
import time
import pickle
import os
from pathlib import Path
from abc import ABC
from abc import abstractmethod
from scipy import stats
from typing import Tuple
from typing import Union
from dataclasses import dataclass
from src.utils.common import logger
from src.utils.common import read_yaml, create_directories
from src.constants import CONFIG_FILE_PATH, SCHEMA_FILE_PATH, PARAMS_FILE_PATH



In [2]:
# # run once only
# os.chdir("..")

# Configuration

In [3]:
# src/entities/config_entity.py
@dataclass(frozen=True)
class DataIngestionConfig:
    """
    Data class for storing data ingestion configuration.

    Attributes:
        root_dir (Path): Root directory for data ingestion.
        source_path (Path): Source path of the data.
        target_path (Path): Target path for the processed data.
    """
    root_dir: Path
    source_path: Path
    target_path: Path

# src/config/configuration_manager.py
class ConfigurationManager:
    """
    Prepare ConfigurationManager class.
    
    This class is responsible for reading configuration files and preparing
    configuration settings for the pipeline.

    Attributes:
        config (dict): Parsed configuration file content.
        params (dict): Parsed parameters file content.
        schema (dict): Parsed schema file content.
    """
    def __init__(
        self,
        config_filepath: str = CONFIG_FILE_PATH, 
        params_filepath: str = PARAMS_FILE_PATH, 
        schema_filepath: str = SCHEMA_FILE_PATH
    ):
        """
        Initialize the ConfigurationManager with file paths.

        Args:
            config_filepath (str): File path to the configuration YAML file.
            params_filepath (str): File path to the parameters YAML file.
            schema_filepath (str): File path to the schema YAML file.
        """
        self.config = read_yaml(Path(config_filepath))
        self.params = read_yaml(Path(params_filepath))
        self.schema = read_yaml(Path(schema_filepath))
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """
        Get configuration for data ingestion.
        
        Returns:
            DataIngestionConfig: Configuration for data ingestion.
        """
        config = self.config.get('data_ingestion', {})
        
        create_directories([config.root_dir])

        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            source_path=Path(config.source_path),
            target_path=Path(config.target_path),
        )
        return data_ingestion_config

# Data Load

In [4]:
# src/data/data_ingestion.py
class DataIngestionStrategy(ABC):
    """
    Abstract base class for data ingestion strategies.
    """
    @abstractmethod
    def ingest_data(self, paths: list, target_path: Path) -> None:
        """
        Ingests data from the specified paths to the target path.

        Args:
            paths (list): List of file paths to ingest data from.
            target_path (Path): Path to save the ingested data.
        """
        pass

# src/data/data_ingestion.py
class PandasDataIngestionStrategy(DataIngestionStrategy):
    """
    Data ingestion strategy using Pandas.
    """
    def ingest_data(self, paths: list, target_path: Path) -> None:
        """
        Ingests data using Pandas.

        Args:
            paths (list): List of file paths to ingest data from.
            target_path (Path): Path to save the ingested data.

        Returns:
            None
        """
        df = pd.DataFrame({})

        # Read and concatenate dataframes (if more than one):
        for path in paths:
            path = Path(path)
            start = time.perf_counter()
            if path.suffix == ".csv":
                temp_df = pd.read_csv(path)
            elif path.suffix == ".parquet":
                temp_df = pd.read_parquet(path)
            df = pd.concat([df, temp_df], axis=0)

        # Save data to target_path:
        df.to_csv(target_path, index=False)
        logger.info(f"Data saved to {target_path}")

# src/data/data_ingestion.py
class PolarsDataIngestionStrategy(DataIngestionStrategy):
    """
    Data ingestion strategy using Polars.
    """
    def ingest_data(self, paths: list, target_path: Path) -> None:
        """
        Ingests data using Polars.

        Args:
            paths (list): List of file paths to ingest data from.
            target_path (Path): Path to save the ingested data.

        Returns:
            None
        """
        df = None

        # Read and concatenate lazyframes (if more than one):
        for path in paths:
            path = Path(path)
            start = time.perf_counter()
            if path.suffix == ".csv":
                temp_df = pl.scan_csv(path)
            elif path.suffix == ".parquet":
                temp_df = pl.scan_parquet(path)
            if df is None:
                df = temp_df
            else:
                df = pl.concat([df, temp_df], how="vertical")

        # Save data to target_path:
        df.sink_csv(target_path, index=False)
        logger.info(f"Data saved to {target_path}")

# src/data/data_ingestion.py
class DataIngestion:
    """
    Class to manage data ingestion process.
    """
    def __init__(self, config: DataIngestionConfig):
        """
        Instantiate `DataIngestion` class.

        Args:
            config (DataIngestionConfig): Configuration for data ingestion.
        """
        self.config = config

    def ingest_data(self, strategy: DataIngestionStrategy) -> None:
        """
        Ingests data using the specified strategy.

        Args:
            strategy (DataIngestionStrategy): Strategy to use for data ingestion.

        Returns:
            None
        """
        try:
            # Check if the path is a string or list of paths:
            paths = self.config.source_path if isinstance(self.config.source_path, list) else [self.config.source_path]
            logger.info("Ingest data")
            strategy.ingest_data(paths, self.config.target_path)
            logger.info(f"Successfully ingest data using {strategy.__class__.__name__}")
        except Exception as e:
            logger.error(f"Error during data ingestion: {e}")

In [5]:
from src.config.configuration_manager import ConfigurationManager
from src.data.data_ingestion import DataIngestion
from src.data.data_ingestion import PandasDataIngestionStrategy

try:
    configuration_manager = ConfigurationManager()
    data_ingestion = DataIngestion( 
        config=configuration_manager.get_data_ingestion_config()
    )
    data_ingestion.ingest_data(strategy=PandasDataIngestionStrategy())
except Exception as e:
    logger.error(e)

2024-06-05 14:09:47,864 - credit-scorecard-logger - INFO - yaml file: config.yaml loaded successfully
2024-06-05 14:09:47,868 - credit-scorecard-logger - INFO - yaml file: params.yaml loaded successfully
2024-06-05 14:09:47,872 - credit-scorecard-logger - INFO - yaml file: schema.yaml loaded successfully
2024-06-05 14:09:47,874 - credit-scorecard-logger - INFO - Created directory at: artifacts
2024-06-05 14:09:47,875 - credit-scorecard-logger - INFO - Created directory at: artifacts/data_ingestion
2024-06-05 14:09:48,067 - credit-scorecard-logger - INFO - Data saved to artifacts\data_ingestion\credit_risk_dataset.csv
2024-06-05 14:09:48,068 - credit-scorecard-logger - INFO - Successfully ingested data using PandasDataIngestionStrategy


# Testing
Restart and run again

In [1]:
import os
os.chdir("..")

In [2]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
from src.utils.common import logger
from src.config.configuration_manager import ConfigurationManager
from src.data.data_ingestion import DataIngestion
from src.data.data_ingestion import PandasDataIngestionStrategy

# src/pipelines/data_ingestion.py
class DataIngestionPipeline:
    """
    Class to manage the data ingestion training pipeline.
    """

    def __init__(self):
        """
        Instantiate `DataIngestionPipeline` class.
        """
        self.configuration_manager = ConfigurationManager()

    def run(self):
        """
        Ingest data using the Pandas data ingestion strategy.
        """
        configuration_manager = ConfigurationManager()
        data_ingestion = DataIngestion(
            config=configuration_manager.get_data_ingestion_config()
        )
        data_ingestion.ingest_data(strategy=PandasDataIngestionStrategy())


STAGE_NAME = "Data Ingestion Stage"
try:
    logger.info(f">>>>>> {STAGE_NAME} Started <<<<<<")
    data_ingestion_training_pipeline = DataIngestionPipeline()
    data_ingestion_training_pipeline.run()
    logger.info(f">>>>>> {STAGE_NAME} Completed <<<<<<")
except Exception as e:
    logger.error(e)

2024-06-05 14:10:57,823 - credit-scorecard-logger - INFO - >>>>>> Data Ingestion Stage Started <<<<<<
2024-06-05 14:10:57,829 - credit-scorecard-logger - INFO - yaml file: config.yaml loaded successfully
2024-06-05 14:10:57,832 - credit-scorecard-logger - INFO - yaml file: params.yaml loaded successfully
2024-06-05 14:10:57,837 - credit-scorecard-logger - INFO - yaml file: schema.yaml loaded successfully
2024-06-05 14:10:57,839 - credit-scorecard-logger - INFO - Created directory at: artifacts
2024-06-05 14:10:57,845 - credit-scorecard-logger - INFO - yaml file: config.yaml loaded successfully
2024-06-05 14:10:57,850 - credit-scorecard-logger - INFO - yaml file: params.yaml loaded successfully
2024-06-05 14:10:57,856 - credit-scorecard-logger - INFO - yaml file: schema.yaml loaded successfully
2024-06-05 14:10:57,857 - credit-scorecard-logger - INFO - Created directory at: artifacts
2024-06-05 14:10:57,859 - credit-scorecard-logger - INFO - Created directory at: artifacts/data_ingestio