In [32]:
import os

In [33]:
%pwd

'/Users/rajusubba/Documents/End-to-End MLOps/customer-churn-project'

In [34]:
os.chdir("/Users/rajusubba/Documents/End-to-End MLOps/customer-churn-project")
%pwd

'/Users/rajusubba/Documents/End-to-End MLOps/customer-churn-project'

In [35]:
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
@dataclass
class DataIngestionConfig:
    """Configuration for data ingestion"""
    # Where ingestion artifacts will be stored
    root_dir: Path
    
    #Source type: "postgres" 
    source_type: str
    
    # Postgres connection details
    host: str
    port: int
    database: str
    user: str
    
    #Environment variable name for password
    password_env: Optional[str] = None
    
    #Data source inside Postgres
    schema: str = "public"
    table: Optional[str] = None
    query: Optional[str] = None
    
    #Export of raw snapshot
    export_files: Optional[Path] = None

In [41]:
from pathlib import Path

p = Path("config/config.yaml").resolve()
print("CONFIG PATH:", p)
print("EXISTS:", p.exists())
print("SIZE:", p.stat().st_size if p.exists() else None)
print("CONTENT PREVIEW:\n", repr(p.read_text()[:300]) if p.exists() else "missing")

CONFIG PATH: /Users/rajusubba/Documents/End-to-End MLOps/customer-churn-project/config/config.yaml
EXISTS: True
SIZE: 0
CONTENT PREVIEW:
 ''


In [37]:
from src.customerchurn.constants import *
from src.customerchurn.utils.common import read_yaml, create_directories

In [65]:
class ConfigurationManager:
    def __init__(self, 
                 config_file_path = CONFIG_FILE_PATH,
                 params_file_path = PARAMS_FILE_PATH,
                 schema_file_path = SCHEMA_FILE_PATH,
                 ):
        self.config = read_yaml(config_file_path)
        self.params = read_yaml(params_file_path)
        self.schema = read_yaml(schema_file_path)
        
        create_directories([self.config.artifacts_root])
        
    def get_data_ingestion_config(self) -> DataIngestionConfig:
        
        config = self.config.data_ingestion
        print("config.export:", config.get("export"))
        create_directories([config.root_dir])
        
        pg = config.postgres  # <- IMPORTANT: nested config

        export_files = None
        if config.get("export") and config.export.get("enabled", False):
            export_files = Path(config.export.output_file)

        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            source_type=config.source_type,

            host=pg.host,
            port=int(pg.port),
            database=pg.database,
            user=pg.user,
            password_env=pg.get("password_env"),

            schema=pg.get("schema", "public"),
            table=pg.get("table"),
            query=pg.get("query"),

            export_files=export_files,
        )

        return data_ingestion_config
            
            
    

In [66]:
from pathlib import Path

p = Path("config/config.yaml").resolve()
print("CONFIG PATH:", p)
print("EXISTS:", p.exists())
print("SIZE:", p.stat().st_size if p.exists() else None)
print(p.read_text()[:500] if p.exists() else "missing")

CONFIG PATH: /Users/rajusubba/Documents/End-to-End MLOps/customer-churn-project/config/config.yaml
EXISTS: True
SIZE: 0



In [67]:
import os
from src.customerchurn.logging.logger import logging

In [68]:
import os
import logging
import pandas as pd
import psycopg2

class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def ingest_data(self) -> pd.DataFrame:
        if self.config.source_type != "postgres":
            raise ValueError(f"Unsupported source type: {self.config.source_type}")

        password = os.getenv(self.config.password_env) if self.config.password_env else None

        query = self.config.query
        if not query:
            if not self.config.table:
                raise ValueError("Either 'query' or 'table' must be provided for postgres source.")
            query = f"SELECT * FROM {self.config.schema}.{self.config.table};"

        with psycopg2.connect(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user=self.config.user,
            password=password,
        ) as conn:
            df = pd.read_sql_query(query, conn)

        logging.info(f"Data ingested successfully with shape: {df.shape}")

        if self.config.export_files:
            self.config.export_files.parent.mkdir(parents=True, exist_ok=True)
            df.to_csv(self.config.export_files, index=False)
            logging.info(f"Raw snapshot exported to {self.config.export_files}")

        return df

In [69]:
try:
    config_manager = ConfigurationManager()
    data_ingestion_config = config_manager.get_data_ingestion_config()
    print("export_files =", data_ingestion_config.export_files)
    print("root_dir =", data_ingestion_config.root_dir)
    
    data_ingestion = DataIngestion(config=data_ingestion_config)
    df = data_ingestion.ingest_data()
    print("Loaded shape:", df.shape)
    print(df.head())
    
    logging.info("Data ingestion completed successfully.")
except Exception as e:
    logging.exception("Data ingestion failed.")
    raise e

[2026-02-03 13:36:05,671: INFO: common: YAML file src/customerchurn/config/config.yaml loaded successfully.]
[2026-02-03 13:36:05,672: INFO: common: YAML file params.yaml loaded successfully.]
[2026-02-03 13:36:05,674: INFO: common: YAML file schema.yaml loaded successfully.]
[2026-02-03 13:36:05,674: INFO: common: Directory created at: artifacts]
config.export: {'enabled': True, 'output_file': 'artifacts/data_ingestion/raw_snapshot.csv'}
[2026-02-03 13:36:05,675: INFO: common: Directory created at: artifacts/data_ingestion]
export_files = artifacts/data_ingestion/raw_snapshot.csv
root_dir = artifacts/data_ingestion
[2026-02-03 13:36:05,731: INFO: 4280944749: Data ingested successfully with shape: (10000, 13)]
[2026-02-03 13:36:05,760: INFO: 4280944749: Raw snapshot exported to artifacts/data_ingestion/raw_snapshot.csv]
Loaded shape: (10000, 13)
   customerid   surname  creditscore geography  gender  age  tenure  \
0    15634602  Hargrave          619    France  Female   42       2   


  df = pd.read_sql_query(query, conn)
