In [33]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import logging
from sqlalchemy import (
    create_engine, MetaData, Table, Column, String, Integer, Float, Boolean,
    select, insert, update, func, case, cast, desc
)

# Root logging setup: INFO and above, rendered as "timestamp - LEVEL - message"
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the pipeline class and the execution block
logger = logging.getLogger(__name__)

class CensusETLPipeline:
    """ETL pipeline over the census SQLite database and its companion CSV file.

    ``extract`` reflects the ``census`` and ``state_fact`` tables and reads the
    CSV, ``transform`` runs three aggregate queries and converts the CSV rows
    into insertable records, and ``load`` writes to the database and produces
    a markdown report plus a bar chart. ``run_pipeline`` chains the three.
    """

    def __init__(self, base_dir=None):
        """Initialize the ETL pipeline with file paths and a database connection.

        Args:
            base_dir: Directory containing the ``data`` folder. When omitted,
                the parent of the current working directory is used (notebooks
                have no ``__file__`` to anchor on).

        Raises:
            FileNotFoundError: If the SQLite database file does not exist.
        """
        if base_dir is None:
            # Notebook workaround: anchor paths one level above the working dir
            base_dir = os.pardir
        self.base_dir = os.path.abspath(base_dir)
        self.census_db_path = os.path.join(self.base_dir, 'data', 'census.sqlite')
        self.census_csv_path = os.path.join(self.base_dir, 'data', 'census.csv')

        # Tables are populated by extract() / load()
        self.census = None
        self.state_fact = None
        self.data = None

        self.engine = None
        self.connection = None
        self.metadata = MetaData()
        self._connect_to_database()

    def _connect_to_database(self):
        """Establish the database connection with error handling.

        Raises:
            FileNotFoundError: If the database file does not exist.
        """
        # SQLite creates an empty database when asked to open a missing file,
        # so the existence check must happen before connecting to be effective.
        if not os.path.exists(self.census_db_path):
            error_msg = f"Database file not found at {self.census_db_path}"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        try:
            self.engine = create_engine(f'sqlite:///{self.census_db_path}')
            self.connection = self.engine.connect()
            logger.info("Database connection established successfully")
        except Exception as e:
            logger.error(f"Failed to establish database connection: {e}")
            raise

    def _close_connection(self):
        """Close the database connection if it is still open (safe to repeat)."""
        connection = self.connection
        if connection is not None and not connection.closed:
            connection.close()
            logger.info("Database connection closed")

    def _commit(self):
        """Commit pending writes when the connection supports explicit commits."""
        # SQLAlchemy 2.0 connections do not autocommit: without this call the
        # INSERT/UPDATE issued by load() are rolled back when the connection
        # closes. Legacy 1.x connections autocommit and expose no commit().
        commit = getattr(self.connection, 'commit', None)
        if callable(commit):
            commit()

    def extract(self):
        """Extract phase: reflect the database tables and read the census CSV.

        Returns:
            pandas.DataFrame: CSV contents with columns ``state``, ``sex``,
            ``age``, ``pop2000`` and ``pop2008``.

        Raises:
            FileNotFoundError: If the database file is missing.
        """
        logger.info("Starting data extraction")

        if not os.path.exists(self.census_db_path):
            error_msg = f"Database file not found at {self.census_db_path}"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        try:
            self.census = Table('census', self.metadata, autoload_with=self.engine)
            self.state_fact = Table('state_fact', self.metadata, autoload_with=self.engine)
            logger.info("Successfully reflected census and state_fact tables")
        except Exception as e:
            logger.error(f"Error reflecting tables: {e}")
            raise

        try:
            # The CSV has no header row; column names are assigned here
            census_df = pd.read_csv(self.census_csv_path, header=None)
            census_df.columns = ['state', 'sex', 'age', 'pop2000', 'pop2008']
            logger.info(f"Successfully extracted {len(census_df)} records from CSV")
            return census_df
        except Exception as e:
            logger.error(f"Failed to read CSV file: {e}")
            raise

    @staticmethod
    def _rows_to_records(census_df):
        """Convert CSV rows to typed dicts, skipping rows that fail conversion."""
        records = []
        for row in census_df.to_dict('records'):
            try:
                records.append({
                    'state': str(row['state']),
                    'sex': str(row['sex']),
                    'age': int(row['age']),
                    'pop2000': int(row['pop2000']),
                    'pop2008': int(row['pop2008']),
                })
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid data in row: {row}, error: {e}")
        return records

    def transform(self, census_df):
        """Transform phase: run the aggregate queries and prepare insert records.

        Args:
            census_df: DataFrame returned by ``extract``.

        Returns:
            tuple: ``(transformed_data, values_list)`` where ``transformed_data``
            maps ``'avg_age'``, ``'percent_female'`` and ``'pop_change'`` to the
            fetched query rows, and ``values_list`` is the list of row dicts
            built from ``census_df``.
        """
        logger.info("Starting data transformation")
        transformed_data = {}

        try:
            # Population-weighted average age per sex. The explicit Float cast
            # keeps this a floating-point division regardless of how the
            # installed SQLAlchemy version renders "/" on integer operands.
            avg_age_stmt = select(
                self.census.c.sex,
                (func.sum(self.census.c.pop2000 * self.census.c.age) /
                 cast(func.sum(self.census.c.pop2000), Float)).label('average_age')
            ).group_by(self.census.c.sex)
            avg_age_results = self.connection.execute(avg_age_stmt).fetchall()

            # Share of the 2000 population that is female, per state
            percent_female_stmt = select(
                self.census.c.state,
                (func.sum(case(
                    (self.census.c.sex == 'F', self.census.c.pop2000),
                    else_=0)) /
                 cast(func.sum(self.census.c.pop2000), Float) * 100).label('percent_female')
            ).group_by(self.census.c.state)
            percent_female_results = self.connection.execute(percent_female_stmt).fetchall()

            # Ten states with the largest 2000 -> 2008 population increase
            pop_change_stmt = select(
                self.census.c.state,
                (func.sum(self.census.c.pop2008) - func.sum(self.census.c.pop2000)).label('pop_change')
            ).group_by(self.census.c.state).order_by(desc('pop_change')).limit(10)
            pop_change_results = self.connection.execute(pop_change_stmt).fetchall()

            transformed_data['avg_age'] = avg_age_results
            transformed_data['percent_female'] = percent_female_results
            transformed_data['pop_change'] = pop_change_results

            values_list = self._rows_to_records(census_df)

            logger.info("Data transformation completed successfully")
            return transformed_data, values_list

        except Exception as e:
            logger.error(f"Transformation failed: {e}")
            raise

    @staticmethod
    def _build_report(transformed_data):
        """Render the three query results as a markdown report string."""
        lines = ["# Census Analysis Report", "", "## Average Age by Gender"]
        lines.extend(
            f"- {sex}: {avg_age:.2f}" for sex, avg_age in transformed_data['avg_age']
        )
        lines += ["", "## Percentage Female by State"]
        lines.extend(
            f"- {state}: {percent:.2f}%"
            for state, percent in transformed_data['percent_female']
        )
        lines += ["", "## Top 10 States by Population Change"]
        lines.extend(
            f"- {state}: {change:,}" for state, change in transformed_data['pop_change']
        )
        return "\n".join(lines) + "\n"

    @staticmethod
    def _save_pop_change_plot(pop_change_rows, output_path):
        """Save a bar chart of population change per state to ``output_path``."""
        states = [row[0] for row in pop_change_rows]
        changes = [row[1] for row in pop_change_rows]
        figure = plt.figure(figsize=(10, 6))
        try:
            plt.bar(states, changes)
            plt.xticks(rotation=45)
            plt.title('Top 10 States by Population Change')
            plt.xlabel('State')
            plt.ylabel('Population Change')
            plt.tight_layout()
            plt.savefig(output_path)
        finally:
            # Release the figure even when drawing or saving fails
            plt.close(figure)

    def load(self, transformed_data, values_list):
        """Load phase: store the records and generate the report and plot.

        Creates the ``data`` table if needed, inserts ``values_list`` into
        ``census`` (every call appends the records again), tags the West-region
        rows of ``state_fact``, then writes ``pop_change_plot.png`` to the
        working directory. The connection is closed when this method returns.

        Args:
            transformed_data: Query results produced by ``transform``.
            values_list: Row dicts to insert into the ``census`` table.

        Returns:
            str: The markdown report.
        """
        logger.info("Starting data loading")

        try:
            self.data = Table('data', self.metadata,
                              Column('name', String(255), unique=True),
                              Column('count', Integer(), default=1),
                              Column('amount', Float()),
                              Column('valid', Boolean(), default=False))
            self.metadata.create_all(self.engine, checkfirst=True)

            if values_list:
                result = self.connection.execute(insert(self.census), values_list)
                logger.info(f"Inserted {result.rowcount} records into census table")

            update_stmt = (
                update(self.state_fact)
                .values(notes='The Wild West')
                .where(self.state_fact.c.census_region_name == 'West')
            )
            update_result = self.connection.execute(update_stmt)
            logger.info(f"Updated {update_result.rowcount} records in state_fact table")

            # Persist the writes before the (unrelated) report/plot work
            self._commit()

            report_content = self._build_report(transformed_data)
            self._save_pop_change_plot(transformed_data['pop_change'], 'pop_change_plot.png')

            logger.info("Data loading and report generation completed successfully")
            return report_content

        except Exception as e:
            logger.error(f"Loading failed: {e}")
            raise

        finally:
            self._close_connection()

    def run_pipeline(self):
        """Execute the complete ETL pipeline and return the markdown report.

        The database connection is closed afterwards, including when the
        extract or transform phase fails before ``load`` is reached.
        """
        logger.info("Starting ETL Pipeline")

        try:
            census_df = self.extract()
            transformed_data, values_list = self.transform(census_df)
            report_content = self.load(transformed_data, values_list)

            logger.info("ETL Pipeline completed successfully")
            return report_content

        except Exception as e:
            logger.error(f"ETL Pipeline failed: {e}")
            raise

        finally:
            self._close_connection()

# Script entry point: run the pipeline, then print and persist the report
if __name__ == "__main__":
    try:
        # Constructing the pipeline opens the database connection
        pipeline = CensusETLPipeline()
        report_content = pipeline.run_pipeline()

        # Wrap the markdown report in the artifact envelope
        artifact = f"""<xaiArtifact 
            artifact_id="314505a6-48c2-4c4e-a34a-32802d9bbf90" 
            artifact_version_id="971a7801-80dc-4db3-ad47-9fc2cbd10aa8" 
            title="Census Analysis Report" 
            contentType="text/markdown">
{report_content}
</xaiArtifact>"""

        # Echo the bare report first, then the wrapped artifact
        print("\nGenerated Report:", report_content, "\nArtifact:", artifact, sep="\n")

        # The wrapped artifact (not the bare report) is what gets saved
        with open('census_report.md', 'w') as report_file:
            report_file.write(artifact)

    except Exception as exc:
        logger.error(f"Pipeline execution failed: {exc}")
        raise


2025-04-10 18:02:51,389 - INFO - Database connection established successfully
2025-04-10 18:02:51,389 - INFO - Starting ETL Pipeline
2025-04-10 18:02:51,396 - INFO - Starting data extraction
2025-04-10 18:02:51,396 - INFO - Successfully reflected census and state_fact tables
2025-04-10 18:02:51,409 - INFO - Successfully extracted 8772 records from CSV
2025-04-10 18:02:51,409 - INFO - Starting data transformation
2025-04-10 18:02:51,451 - INFO - Data transformation completed successfully
2025-04-10 18:02:51,451 - INFO - Starting data loading
2025-04-10 18:02:51,484 - INFO - Inserted 8772 records into census table
2025-04-10 18:02:51,484 - INFO - Updated 13 records in state_fact table
2025-04-10 18:02:51,755 - INFO - Data loading and report generation completed successfully
2025-04-10 18:02:51,755 - INFO - Database connection closed
2025-04-10 18:02:51,755 - INFO - ETL Pipeline completed successfully



Generated Report:
# Census Analysis Report

## Average Age by Gender
- F: 37.02
- M: 34.51

## Percentage Female by State
- Alabama: 51.83%
- Alaska: 49.30%
- Arizona: 50.22%
- Arkansas: 51.27%
- California: 50.35%
- Colorado: 49.85%
- Connecticut: 51.67%
- Delaware: 51.61%
- District of Columbia: 53.13%
- Florida: 51.36%
- Georgia: 51.11%
- Hawaii: 51.12%
- Idaho: 49.99%
- Illinois: 51.11%
- Indiana: 50.95%
- Iowa: 50.95%
- Kansas: 50.82%
- Kentucky: 51.33%
- Louisiana: 51.75%
- Maine: 51.51%
- Maryland: 51.94%
- Massachusetts: 51.84%
- Michigan: 50.97%
- Minnesota: 50.49%
- Mississippi: 51.92%
- Missouri: 51.47%
- Montana: 50.32%
- Nebraska: 50.86%
- Nevada: 49.37%
- New Hampshire: 50.86%
- New Jersey: 51.52%
- New Mexico: 51.05%
- New York: 51.83%
- North Carolina: 51.48%
- North Dakota: 50.50%
- Ohio: 51.47%
- Oklahoma: 51.11%
- Oregon: 50.43%
- Pennsylvania: 51.74%
- Rhode Island: 52.07%
- South Carolina: 51.73%
- South Dakota: 50.53%
- Tennessee: 51.43%
- Texas: 50.52%
- Utah: 4

In [35]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import logging
from sqlalchemy import (
    create_engine, MetaData, Table, Column, String, Integer, Float, Boolean,
    select, insert, update, func, case, cast, desc
)

# Root logging setup: INFO and above, rendered as "timestamp - LEVEL - message"
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the pipeline class and the execution block
logger = logging.getLogger(__name__)

class CensusETLPipeline:
    """ETL pipeline over the census SQLite database and its companion CSV file.

    ``extract`` reflects the ``census`` and ``state_fact`` tables and reads the
    CSV, ``transform`` runs three aggregate queries and converts the CSV rows
    into insertable records, and ``load`` writes to the database and produces
    a markdown report plus a bar chart saved under ``results_dir``.
    ``run_pipeline`` chains the three.
    """

    def __init__(self, base_dir=None):
        """Initialize the ETL pipeline with file paths and a database connection.

        Args:
            base_dir: Directory containing the ``data`` and ``results``
                folders. When omitted, the parent of the current working
                directory is used (notebooks have no ``__file__`` to anchor on).

        Raises:
            FileNotFoundError: If the SQLite database file does not exist.
        """
        if base_dir is None:
            # Notebook workaround: anchor paths one level above the working dir
            base_dir = os.pardir
        self.base_dir = os.path.abspath(base_dir)
        self.census_db_path = os.path.join(self.base_dir, 'data', 'census.sqlite')
        self.census_csv_path = os.path.join(self.base_dir, 'data', 'census.csv')
        self.results_dir = os.path.join(self.base_dir, 'results')

        # Tables are populated by extract() / load()
        self.census = None
        self.state_fact = None
        self.data = None

        self.engine = None
        self.connection = None
        self.metadata = MetaData()
        self._connect_to_database()

    def _connect_to_database(self):
        """Establish the database connection with error handling.

        Raises:
            FileNotFoundError: If the database file does not exist.
        """
        # SQLite creates an empty database when asked to open a missing file,
        # so the existence check must happen before connecting to be effective.
        if not os.path.exists(self.census_db_path):
            error_msg = f"Database file not found at {self.census_db_path}"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        try:
            self.engine = create_engine(f'sqlite:///{self.census_db_path}')
            self.connection = self.engine.connect()
            logger.info("Database connection established successfully")
        except Exception as e:
            logger.error(f"Failed to establish database connection: {e}")
            raise

    def _close_connection(self):
        """Close the database connection if it is still open (safe to repeat)."""
        connection = self.connection
        if connection is not None and not connection.closed:
            connection.close()
            logger.info("Database connection closed")

    def _commit(self):
        """Commit pending writes when the connection supports explicit commits."""
        # SQLAlchemy 2.0 connections do not autocommit: without this call the
        # INSERT/UPDATE issued by load() are rolled back when the connection
        # closes. Legacy 1.x connections autocommit and expose no commit().
        commit = getattr(self.connection, 'commit', None)
        if callable(commit):
            commit()

    def extract(self):
        """Extract phase: reflect the database tables and read the census CSV.

        Returns:
            pandas.DataFrame: CSV contents with columns ``state``, ``sex``,
            ``age``, ``pop2000`` and ``pop2008``.

        Raises:
            FileNotFoundError: If the database file is missing.
        """
        logger.info("Starting data extraction")

        if not os.path.exists(self.census_db_path):
            error_msg = f"Database file not found at {self.census_db_path}"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg)

        try:
            self.census = Table('census', self.metadata, autoload_with=self.engine)
            self.state_fact = Table('state_fact', self.metadata, autoload_with=self.engine)
            logger.info("Successfully reflected census and state_fact tables")
        except Exception as e:
            logger.error(f"Error reflecting tables: {e}")
            raise

        try:
            # The CSV has no header row; column names are assigned here
            census_df = pd.read_csv(self.census_csv_path, header=None)
            census_df.columns = ['state', 'sex', 'age', 'pop2000', 'pop2008']
            logger.info(f"Successfully extracted {len(census_df)} records from CSV")
            return census_df
        except Exception as e:
            logger.error(f"Failed to read CSV file: {e}")
            raise

    @staticmethod
    def _rows_to_records(census_df):
        """Convert CSV rows to typed dicts, skipping rows that fail conversion."""
        records = []
        for row in census_df.to_dict('records'):
            try:
                records.append({
                    'state': str(row['state']),
                    'sex': str(row['sex']),
                    'age': int(row['age']),
                    'pop2000': int(row['pop2000']),
                    'pop2008': int(row['pop2008']),
                })
            except (ValueError, TypeError) as e:
                logger.warning(f"Invalid data in row: {row}, error: {e}")
        return records

    def transform(self, census_df):
        """Transform phase: run the aggregate queries and prepare insert records.

        Args:
            census_df: DataFrame returned by ``extract``.

        Returns:
            tuple: ``(transformed_data, values_list)`` where ``transformed_data``
            maps ``'avg_age'``, ``'percent_female'`` and ``'pop_change'`` to the
            fetched query rows, and ``values_list`` is the list of row dicts
            built from ``census_df``.
        """
        logger.info("Starting data transformation")
        transformed_data = {}

        try:
            # Population-weighted average age per sex. The explicit Float cast
            # keeps this a floating-point division regardless of how the
            # installed SQLAlchemy version renders "/" on integer operands.
            avg_age_stmt = select(
                self.census.c.sex,
                (func.sum(self.census.c.pop2000 * self.census.c.age) /
                 cast(func.sum(self.census.c.pop2000), Float)).label('average_age')
            ).group_by(self.census.c.sex)
            avg_age_results = self.connection.execute(avg_age_stmt).fetchall()

            # Share of the 2000 population that is female, per state
            percent_female_stmt = select(
                self.census.c.state,
                (func.sum(case(
                    (self.census.c.sex == 'F', self.census.c.pop2000),
                    else_=0)) /
                 cast(func.sum(self.census.c.pop2000), Float) * 100).label('percent_female')
            ).group_by(self.census.c.state)
            percent_female_results = self.connection.execute(percent_female_stmt).fetchall()

            # Ten states with the largest 2000 -> 2008 population increase
            pop_change_stmt = select(
                self.census.c.state,
                (func.sum(self.census.c.pop2008) - func.sum(self.census.c.pop2000)).label('pop_change')
            ).group_by(self.census.c.state).order_by(desc('pop_change')).limit(10)
            pop_change_results = self.connection.execute(pop_change_stmt).fetchall()

            transformed_data['avg_age'] = avg_age_results
            transformed_data['percent_female'] = percent_female_results
            transformed_data['pop_change'] = pop_change_results

            values_list = self._rows_to_records(census_df)

            logger.info("Data transformation completed successfully")
            return transformed_data, values_list

        except Exception as e:
            logger.error(f"Transformation failed: {e}")
            raise

    @staticmethod
    def _build_report(transformed_data):
        """Render the three query results as a markdown report string."""
        lines = ["# Census Analysis Report", "", "## Average Age by Gender"]
        lines.extend(
            f"- {sex}: {avg_age:.2f}" for sex, avg_age in transformed_data['avg_age']
        )
        lines += ["", "## Percentage Female by State"]
        lines.extend(
            f"- {state}: {percent:.2f}%"
            for state, percent in transformed_data['percent_female']
        )
        lines += ["", "## Top 10 States by Population Change"]
        lines.extend(
            f"- {state}: {change:,}" for state, change in transformed_data['pop_change']
        )
        return "\n".join(lines) + "\n"

    @staticmethod
    def _save_pop_change_plot(pop_change_rows, output_path):
        """Save a bar chart of population change per state to ``output_path``."""
        states = [row[0] for row in pop_change_rows]
        changes = [row[1] for row in pop_change_rows]
        figure = plt.figure(figsize=(10, 6))
        try:
            plt.bar(states, changes)
            plt.xticks(rotation=45)
            plt.title('Top 10 States by Population Change')
            plt.xlabel('State')
            plt.ylabel('Population Change')
            plt.tight_layout()
            plt.savefig(output_path)
        finally:
            # Release the figure even when drawing or saving fails
            plt.close(figure)

    def load(self, transformed_data, values_list):
        """Load phase: store the records and generate the report and plot.

        Creates the ``data`` table if needed, inserts ``values_list`` into
        ``census`` (every call appends the records again), tags the West-region
        rows of ``state_fact``, then writes ``pop_change_plot.png`` into
        ``results_dir``. The connection is closed when this method returns.

        Args:
            transformed_data: Query results produced by ``transform``.
            values_list: Row dicts to insert into the ``census`` table.

        Returns:
            str: The markdown report.
        """
        logger.info("Starting data loading")

        try:
            self.data = Table('data', self.metadata,
                              Column('name', String(255), unique=True),
                              Column('count', Integer(), default=1),
                              Column('amount', Float()),
                              Column('valid', Boolean(), default=False))
            self.metadata.create_all(self.engine, checkfirst=True)

            if values_list:
                result = self.connection.execute(insert(self.census), values_list)
                logger.info(f"Inserted {result.rowcount} records into census table")

            update_stmt = (
                update(self.state_fact)
                .values(notes='The Wild West')
                .where(self.state_fact.c.census_region_name == 'West')
            )
            update_result = self.connection.execute(update_stmt)
            logger.info(f"Updated {update_result.rowcount} records in state_fact table")

            # Persist the writes before the (unrelated) report/plot work
            self._commit()

            report_content = self._build_report(transformed_data)

            # savefig does not create missing directories
            os.makedirs(self.results_dir, exist_ok=True)
            plot_path = os.path.join(self.results_dir, 'pop_change_plot.png')
            self._save_pop_change_plot(transformed_data['pop_change'], plot_path)

            logger.info("Data loading and report generation completed successfully")
            return report_content

        except Exception as e:
            logger.error(f"Loading failed: {e}")
            raise

        finally:
            self._close_connection()

    def run_pipeline(self):
        """Execute the complete ETL pipeline and return the markdown report.

        The database connection is closed afterwards, including when the
        extract or transform phase fails before ``load`` is reached.
        """
        logger.info("Starting ETL Pipeline")

        try:
            census_df = self.extract()
            transformed_data, values_list = self.transform(census_df)
            report_content = self.load(transformed_data, values_list)

            logger.info("ETL Pipeline completed successfully")
            return report_content

        except Exception as e:
            logger.error(f"ETL Pipeline failed: {e}")
            raise

        finally:
            self._close_connection()

# Execution Block: run the pipeline, then print and persist the report
if __name__ == "__main__":
    try:
        # Constructing the pipeline opens the database connection
        pipeline = CensusETLPipeline()
        report_content = pipeline.run_pipeline()

        # Wrap the markdown report in the artifact envelope
        artifact = f"""<xaiArtifact 
            artifact_id="314505a6-48c2-4c4e-a34a-32802d9bbf90" 
            artifact_version_id="971a7801-80dc-4db3-ad47-9fc2cbd10aa8" 
            title="Census Analysis Report" 
            contentType="text/markdown">
{report_content}
</xaiArtifact>"""

        print("\nGenerated Report:")
        print(report_content)
        print("\nArtifact:")
        print(artifact)

        # Save next to the plot (pipeline.results_dir) rather than a
        # machine-specific absolute path, creating the folder when missing.
        os.makedirs(pipeline.results_dir, exist_ok=True)
        report_path = os.path.join(pipeline.results_dir, 'census_report.md')
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(artifact)

    except Exception as e:
        logger.error(f"Pipeline execution failed: {e}")
        raise


2025-04-10 18:16:28,356 - INFO - Database connection established successfully
2025-04-10 18:16:28,356 - INFO - Starting ETL Pipeline
2025-04-10 18:16:28,356 - INFO - Starting data extraction
2025-04-10 18:16:28,371 - INFO - Successfully reflected census and state_fact tables
2025-04-10 18:16:28,375 - INFO - Successfully extracted 8772 records from CSV
2025-04-10 18:16:28,383 - INFO - Starting data transformation
2025-04-10 18:16:28,409 - INFO - Data transformation completed successfully
2025-04-10 18:16:28,409 - INFO - Starting data loading
2025-04-10 18:16:28,449 - INFO - Inserted 8772 records into census table
2025-04-10 18:16:28,449 - INFO - Updated 13 records in state_fact table
2025-04-10 18:16:28,597 - INFO - Data loading and report generation completed successfully
2025-04-10 18:16:28,597 - INFO - Database connection closed
2025-04-10 18:16:28,597 - INFO - ETL Pipeline completed successfully



Generated Report:
# Census Analysis Report

## Average Age by Gender
- F: 37.02
- M: 34.51

## Percentage Female by State
- Alabama: 51.83%
- Alaska: 49.30%
- Arizona: 50.22%
- Arkansas: 51.27%
- California: 50.35%
- Colorado: 49.85%
- Connecticut: 51.67%
- Delaware: 51.61%
- District of Columbia: 53.13%
- Florida: 51.36%
- Georgia: 51.11%
- Hawaii: 51.12%
- Idaho: 49.99%
- Illinois: 51.11%
- Indiana: 50.95%
- Iowa: 50.95%
- Kansas: 50.82%
- Kentucky: 51.33%
- Louisiana: 51.75%
- Maine: 51.51%
- Maryland: 51.94%
- Massachusetts: 51.84%
- Michigan: 50.97%
- Minnesota: 50.49%
- Mississippi: 51.92%
- Missouri: 51.47%
- Montana: 50.32%
- Nebraska: 50.86%
- Nevada: 49.37%
- New Hampshire: 50.86%
- New Jersey: 51.52%
- New Mexico: 51.05%
- New York: 51.83%
- North Carolina: 51.48%
- North Dakota: 50.50%
- Ohio: 51.47%
- Oklahoma: 51.11%
- Oregon: 50.43%
- Pennsylvania: 51.74%
- Rhode Island: 52.07%
- South Carolina: 51.73%
- South Dakota: 50.53%
- Tennessee: 51.43%
- Texas: 50.52%
- Utah: 4