In [None]:
import os

In [None]:
%pwd

In [None]:
# Move the working directory one level up so that the remaining cells
# run from the project's main directory.
# NOTE: this cell is not idempotent -- each re-run moves up one more level.
os.chdir(os.pardir)

In [None]:
# Load KEY=VALUE pairs from the local `.env` file into the process environment.
# Blank lines and `#` comment lines are skipped, and each entry is split on the
# FIRST '=' only, so values that themselves contain '=' (e.g. a database URI
# with query parameters) are kept intact.
with open('.env') as f:
    for line_no, raw_line in enumerate(f, start=1):
        entry = raw_line.strip()
        if not entry or entry.startswith('#'):
            continue
        key, sep, value = entry.partition('=')
        if not sep or not key:
            # Keep failing loudly on malformed entries, but say where.
            raise ValueError(
                f"Malformed line {line_no} in .env (expected KEY=VALUE): {entry!r}"
            )
        os.environ[key] = value

In [None]:
%pwd

### Data Ingestion Config

This code will be applied in `src/MarketplaceReviews/entity/config_entity.py`.

In [None]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataIngestionSQLConfig:
    """Immutable settings bundle for the SQL data-ingestion step.

    Instances are frozen: fields cannot be reassigned after construction.
    """

    # Directory associated with this ingestion step.
    root_dir: Path
    # Connection URI handed to the SQL engine.
    source_URI: str
    # Name of the reviews table in the database.
    reviews_table: str
    # File path the reviews data is written to.
    reviews_path: Path


### SQL Data Ingestion Config Manager

This code will be applied in `src/MarketplaceReviews/config/configurations.py`.

In [None]:
from MarketplaceReviews.constants import CONFIG_FILE_PATH, PARAMS_FILE_PATH
from MarketplaceReviews.utils.common import read_yaml, create_directories

In [None]:
class ConfigurationManager:
    """Load the project's YAML config/params and build typed config entities."""

    def __init__(self, 
                 config_filepath = CONFIG_FILE_PATH,
                 params_filepath = PARAMS_FILE_PATH):
        """Read both YAML files and prepare the artifacts root.

        Args:
            config_filepath: path of the main configuration YAML file.
            params_filepath: path of the parameters YAML file.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # presumably creates the directory if it is missing -- see
        # `create_directories` in MarketplaceReviews.utils.common.
        create_directories([self.config.artifacts_root])
    
    def get_data_ingestion_sql_config(self) -> DataIngestionSQLConfig:
        """Build the SQL data-ingestion config entity.

        Reads the `ingest_from_sql` section of the loaded config and takes the
        database URI from the `POSTGRES_URI` environment variable.

        Returns:
            config: DataIngestionSQLConfig type

        Raises:
            KeyError: if `POSTGRES_URI` is not set in the environment.
        """
        data_ingest_config = self.config.ingest_from_sql

        create_directories([data_ingest_config.root_dir])

        config = DataIngestionSQLConfig(
            # Wrap in Path so the value matches the dataclass annotation,
            # exactly as is done for `reviews_path`.
            root_dir=Path(data_ingest_config.root_dir),
            source_URI=os.environ["POSTGRES_URI"],
            reviews_table=data_ingest_config.reviews_table,
            reviews_path=Path(data_ingest_config.reviews_path),
        )

        return config

### Perform data ingestion

This code will be applied in `src/MarketplaceReviews/components/data_ingestion.py`.

In [None]:
import pandas as pd

from sqlalchemy import create_engine 
from tqdm import tqdm

from MarketplaceReviews import logger

class DataIngestionSQL:
    """Copy the reviews table from a SQL database into a CSV file."""

    def __init__(self, config: DataIngestionSQLConfig):
        """Store the ingestion settings.

        Args:
            config: source URI, table name and output path for the dump.
        """
        self.config = config

    def sql_to_csv(self) -> None:
        """Read the configured reviews table and write it to CSV.

        The table named by `config.reviews_table` is read through a connection
        to `config.source_URI` and written, without the index, to
        `config.reviews_path`.

        Raises:
            Exception: any error raised while connecting, reading or writing
                is logged and then re-raised unchanged.
        """
        engine = None
        try:
            engine = create_engine(self.config.source_URI)

            # The context manager closes the connection even when the read
            # fails, and nothing is closed if the connection was never opened.
            with engine.connect() as conn:
                logger.info("Querying reviews data from SQL Database.")
                df_reviews = pd.read_sql_table(self.config.reviews_table, conn)

            logger.info("Dump data from SQL Database to CSV.")
            df_reviews.to_csv(self.config.reviews_path, index=False)

            logger.info(f"Data dumped from SQL query into {self.config.root_dir} directory")
        except Exception as e:
            logger.error(e)
            raise
        finally:
            # Release pooled connections held by the engine.
            if engine is not None:
                engine.dispose()

### Run Ingest from SQL Database

This code will be applied in `src/MarketplaceReviews/pipeline/step_01_data_ingestion.py`.

In [None]:
# Run the SQL ingestion step end to end: build the configuration, create the
# ingestion component, and dump the reviews table to CSV.
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_sql_config()
    
    data_ingestion = DataIngestionSQL(config=data_ingestion_config)
    data_ingestion.sql_to_csv()
except Exception as e:
    # logger.exception records the traceback along with the message, and the
    # bare `raise` re-raises the original error unchanged.
    logger.exception(e)
    raise