In [1]:
import logging
from scripts.extract import SalarySurveyExtractor
from scripts.transform import SalarySurveyTransformer
from scripts.load_to_redshift import SalarySurveyLoader


class SalarySurveyETL:
    """
    ETL pipeline for processing salary survey data from CSV to Amazon Redshift.

    The pipeline wires three project components together: a
    ``SalarySurveyExtractor`` (reads the CSV), a ``SalarySurveyTransformer``
    (cleans the DataFrame) and a ``SalarySurveyLoader`` (receives the S3
    bucket, IAM role, Redshift connection settings and target table).
    """

    # Source CSV used when the caller does not pass ``input_file_path``.
    DEFAULT_INPUT_FILE_PATH = r"C:\Users\hp\Desktop\capstone_project\dataset\data.csv"

    # Environment variable that, when set, overrides the Redshift password.
    REDSHIFT_PASSWORD_ENV_VAR = "REDSHIFT_PASSWORD"

    def __init__(self, input_file_path=None) -> None:
        """
        Initialize ETL pipeline with configuration parameters.

        Args:
            input_file_path: Path of the CSV file to extract. When omitted
                (or ``None``), ``DEFAULT_INPUT_FILE_PATH`` is used, which
                matches the previously hard-coded location.
        """
        # Function-scope import keeps this class self-contained without
        # touching the file's top-level import block.
        import os

        # File configuration
        self.input_file_path = (
            self.DEFAULT_INPUT_FILE_PATH if input_file_path is None else input_file_path
        )
        self.logger = logging.getLogger("salary-survey-etl")

        # S3 Configuration
        self.s3_bucket = 'salarysurvey-transformed-data'
        self.iam_role_arn = 'arn:aws:iam::536697233575:role/RedShiftLoadRole'
        self.file_name = 'transformed_salary_data.csv'

        # Redshift configuration
        # SECURITY: the password should come from the environment. The literal
        # fallback is kept only so existing runs keep working; it is committed
        # in source and should be rotated, then removed once the environment
        # variable is provisioned everywhere this pipeline runs.
        redshift_password = os.environ.get(
            self.REDSHIFT_PASSWORD_ENV_VAR, "ss-Admin-rs0292"
        )
        self.redshift_config = {
            "host": "salary-survey-workgroup.536697233575.eu-north-1.redshift-serverless.amazonaws.com",
            "dbname": "salarysurveydb",
            "user": "ss_admin",
            "password": redshift_password,
            "port": 5439,
        }
        self.redshift_table = 'salary_survey_data.salary_survey'

    def run(self) -> None:
        """
        Main method to execute the complete ETL pipeline.

        Steps, in order:
          1. Configure root logging (INFO level, timestamped format).
          2. Extract a DataFrame from ``self.input_file_path``; stop early if
             it is empty.
          3. Transform the DataFrame; stop early if the result is empty.
          4. Hand the transformed DataFrame and ``self.file_name`` to the
             loader's ``execute_load``.

        Returns:
            None. Early exits and failures are reported through the logger
            only; no exception propagates to the caller.
        """
        try:
            # Configure logging
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )

            self.logger.info("Starting the SalarySurveyETL pipeline...")

            # Extract
            extractor = SalarySurveyExtractor(self.input_file_path)
            df = extractor.extract_csv_data()

            if df.empty:
                self.logger.error("No data extracted. Exiting pipeline.")
                return

            # Transform
            transformer = SalarySurveyTransformer()
            df_transformed = transformer.transform(df)

            if df_transformed.empty:
                self.logger.error("Transformation failed. Exiting pipeline.")
                return

            # Load
            loader = SalarySurveyLoader(
                s3_bucket=self.s3_bucket,
                iam_role_arn=self.iam_role_arn,
                redshift_config=self.redshift_config,
                redshift_table=self.redshift_table
            )
            loader.execute_load(df_transformed, self.file_name)

            self.logger.info("✅ ETL pipeline completed successfully!")

        except Exception as e:
            # Top-level boundary: keep swallowing the error (callers rely on
            # run() not raising), but attach the traceback so the failing
            # stage can actually be diagnosed from the log.
            self.logger.critical(
                f"❌ Exception occurred in ETL pipeline: {e}", exc_info=True
            )


# Entry point: build the pipeline with its built-in configuration and execute
# it once. Failures inside run() are logged there rather than raised, so this
# guard needs no error handling of its own.
if __name__ == '__main__':
    etl = SalarySurveyETL()
    etl.run()

2025-10-16 15:24:11,184 - salary-survey-etl - INFO - Starting the SalarySurveyETL pipeline...
2025-10-16 15:24:11,192 - salary-survey-extract - INFO - Extracting data from CSV...
2025-10-16 15:24:11,422 - salary-survey-extract - INFO - File Encoding : ascii
2025-10-16 15:24:12,273 - salary-survey-extract - INFO - Successfully extracted 28154 rows from CSV
2025-10-16 15:24:12,273 - salary-survey-transform - INFO - Starting transformation process...
2025-10-16 15:24:12,552 - salary-survey-transform - INFO - No completely null rows found in dataframe
2025-10-16 15:24:12,552 - salary-survey-transform - INFO - Starting _rename_columns()
2025-10-16 15:24:12,568 - salary-survey-transform - INFO - Starting _normalize_job_titles()
2025-10-16 15:24:13,084 - salary-survey-transform - INFO - Starting _standardize_annual_salaries()
2025-10-16 15:24:13,225 - salary-survey-transform - INFO - Starting _standardize_additional_comp()
2025-10-16 15:24:13,432 - salary-survey-transform - INFO - Starting _s