In [1]:
import os

In [2]:
%pwd

'/home/tousside/Documents/recrutement/cowrywise-customer-plan-abandonment/research'

In [3]:
os.chdir("../")

In [4]:
%pwd

'/home/tousside/Documents/recrutement/cowrywise-customer-plan-abandonment'

In [5]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataIngestionConfig:
    root_dir: Path
    host: str
    port: str
    user: str
    password: str
    database_name: str
    sql_query: str
    local_data_file: Path

In [6]:
from mlProject.constants import *
from mlProject.utils.common import read_yaml, create_directories, read_sql_file

In [7]:
class ConfigurationManager:
    def __init__(self,
                 config_filepath: str = CONFIG_FILE_PATH,
                 params_filepath: str = PARAMS_FILE_PATH,
                 schema_filepath: str = SCHEMA_FILE_PATH,
                 sql_filepath: str = DATA_FECTH_SQL_PATH,
                 ):
                self.config = read_yaml(config_filepath)
                self.params = read_yaml(params_filepath)
                self.schema = read_yaml(schema_filepath)
                self.sql_query = read_sql_file(sql_filepath)
                create_directories([self.config.artifacts_root])
    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion
        create_directories([config.root_dir])
        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            host=config.host,
            port=config.port,
            user=config.user,
            password=config.password,
            database_name=config.database_name,
            sql_query=self.sql_query,
            local_data_file=Path(config.local_data_file)
        )
        return data_ingestion_config
                

In [8]:
import mysql.connector
import pandas as pd
from mlProject.utils.common import get_size
from mlProject import logger
import warnings
warnings.filterwarnings("ignore")


In [9]:
class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def connect_to_database(self,
        host: str = "localhost",
        port: str = "3307",
        user: str = "root",
        password: str = "mysecret",
        database: str = "adashi_staging"
    ):
        """
        Establishes and returns a MySQL database connection if successful.

        Args:
            host (str): Database host address.
            port (str): Port number.
            user (str): Database username.
            password (str): Database password.
            database (str): Database name.

        Returns:
            MySQLConnection: A valid MySQL connection object.

        Raises:
            Error: If the connection fails.
        """
        try:
            conn = mysql.connector.connect(
                host=host,
                port=port,
                user=user,
                password=password,
                database=database
            )
            if conn.is_connected():
                logger.info("Database connection established successfully.")
                return conn
            else:
                logger.error("Failed to connect to the database.")
        except Exception as e:
            logger.error(f"Database connection failed: {e}")

    def retrieve_data(self):
        connection = self.connect_to_database(
            host=self.config.host,
            port=self.config.port,
            user=self.config.user,
            password=self.config.password,
            database=self.config.database_name
        )
        if not os.path.exists(self.config.local_data_file):
            data = pd.read_sql(self.config.sql_query, connection)
            data.to_csv(self.config.local_data_file, index=False)
            logger.info(f"data fetched with following info: shape:{data.shape}")
        else:
            logger.info(f"File already exists with size: {get_size(Path(self.config.local_data_file))} bytes")
            
        connection.close()
        

In [10]:
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    data_ingestion.retrieve_data()
except Exception as e:
    raise e

[2025-05-27 16:35:00,196: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-05-27 16:35:00,198: INFO: common: yaml file: params.yaml loaded successfully]
[2025-05-27 16:35:00,198: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-05-27 16:35:00,199: INFO: common: SQL file 'sql_queries/data_fetch.sql' loaded successfully.]
[2025-05-27 16:35:00,200: INFO: common: created directory at: artifacts]
[2025-05-27 16:35:00,200: INFO: common: created directory at: artifacts/data_ingestion]
[2025-05-27 16:35:00,251: INFO: 1131048647: Database connection established successfully.]
[2025-05-27 16:35:00,252: INFO: 1131048647: File already exists with size: ~ 321 KB bytes]
