In [34]:
import os

In [35]:
%pwd

'C:\\Users\\RICH-FILES\\Desktop\\ml\\AI-powered-Bank-Product-Recommender-Chatbot'

In [36]:
os.chdir("../.")

In [37]:
%pwd

'C:\\Users\\RICH-FILES\\Desktop\\ml'

In [38]:
project_dir = "C:/Users/RICH-FILES/Desktop/ml/AI-powered-Bank-Product-Recommender-Chatbot"
os.chdir(project_dir)

In [39]:
from dataclasses import dataclass
from pathlib import Path


    
@dataclass(frozen=True)
class DataIngestionConfig:
    root_dir: Path
    local_data_file: Path
    export_csv_path: Path
    output_path: Path
    table_name: str
    
    

    

In [40]:
from BankProducts.constants import *
from BankProducts.utils.common import read_yaml, create_directories

In [41]:
class ConfigurationManager:
    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,    
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath: str = SCHEMA_FILE_PATH,
        ):
       
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)        
    
        create_directories([self.config.artifacts_root])
    
    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion
    
    
        create_directories([self.config.artifacts_root])
        
        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            local_data_file=Path(config.local_data_file),
            export_csv_path=Path(config.export_csv_path),
            output_path=Path(config.output_path),
            table_name=config.table_name
            
            
        )
        
        return data_ingestion_config

In [42]:
# Import necessary libraries
import pandas as pd
from pathlib import Path
from sqlalchemy import create_engine
from BankProducts import logger
from dotenv import load_dotenv
import os
from BankProducts.utils.common import get_size
from sqlalchemy import text 

In [43]:
class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config
        
        self.engine = self._create_engine_from_env()

    def _create_engine_from_env(self):
        """Load DB credentials and return SQLAlchemy engine."""
        load_dotenv()
        db_user = os.getenv("DB_USER")
        db_password = os.getenv("DB_PASSWORD")
        db_host = os.getenv("DB_HOST")
        db_port = os.getenv("DB_PORT")
        db_name = os.getenv("DB_NAME")

        return create_engine(f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")

    def initiate_data_ingestion(self):
        """Load local CSV into PostgreSQL database."""
        logger.info("Starting data ingestion process...")

        # Check if file exists
        csv_path = self.config.local_data_file
        if not os.path.exists(csv_path):
            logger.error("CSV file not found. Please check the file path.")
            return

        try:
            df = pd.read_csv(csv_path)
            df.to_sql("bank_transactions", self.engine, if_exists="replace", index=False)
            logger.info("Dataset successfully loaded into PostgreSQL.")
        except Exception as e:
            logger.error(f"Failed to load data into PostgreSQL: {e}")

    def extract_and_save_data(self):
        """Fetch data from DB and save to CSV."""
        logger.info("Extracting and saving data...")

        query = "SELECT * FROM bank_transactions;"

        try:
            with self.engine.connect() as connection:
                result = connection.execute(text("SELECT to_regclass('bank_transactions')"))
                table_exists = result.scalar() is not None

            if not table_exists:
                logger.error("Table 'bank_transactions' does not exist in the database.")
                return

            logger.info("Fetching data from the 'bank_transactions' table...")
            df = pd.read_sql(query, self.engine)
            output_path = self.config.output_path
            if not output_path.parent.exists():
                output_path.parent.mkdir(parents=True, exist_ok=True)   
            df.to_csv(self.config.output_path, index=False)
            logger.info(f"Data successfully exported to '{output_path}'.")

        except Exception as e:
            logger.error(f"Error while extracting data from database: {e}")


In [44]:
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    #data_ingestion.initiate_data_ingestion()
    data_ingestion.extract_and_save_data()
    
except Exception as e:
    logger.exception(e)
    raise e


[2025-06-07 23:25:14,090: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-06-07 23:25:14,095: INFO: common: yaml file: params.yaml loaded successfully]
[2025-06-07 23:25:14,101: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-06-07 23:25:14,103: INFO: common: created directory at: artifacts]
[2025-06-07 23:25:14,105: INFO: common: created directory at: artifacts]
[2025-06-07 23:25:14,110: INFO: 4038426609: Extracting and saving data...]
[2025-06-07 23:25:14,176: INFO: 4038426609: Fetching data from the 'bank_transactions' table...]
[2025-06-07 23:25:14,877: INFO: 4038426609: Data successfully exported to 'artifacts\data_ingestion\customer_data.csv'.]
