In [None]:
"""
===============================================================================
SimpleCSVAnalyzerSnowflake: HDB Resale CSV Analyzer for Snowflake Notebooks
===============================================================================

This class performs a full ETL (Extract, Transform, Load) process on HDB resale
flat data stored as CSV files in a Snowflake stage. It automates data loading,
cleaning, transformation, validation, and saving to a Snowflake table.

------------------------------------------------------------------------------------
| Step | Description                                                               |
|------|---------------------------------------------------------------------------|
| 1    | Load CSV files from a Snowflake stage into pandas DataFrames              |
| 2    | Derive AGE column from REMAINING_LEASE or LEASE_COMMENCE_DATE             |
| 3    | Clean text columns (uppercase conversion and NaN handling)                |
| 4    | Filter unnecessary columns and ensure column consistency                  |
| 5    | Combine all cleaned CSV files into a single DataFrame                     |
| 6    | Split 'MONTH' column into 'YEAR' and 'MONTH_NUM'                          |
| 7    | Display a sample of the cleaned data                                      |
| 8    | Check for and remove duplicate rows                                       |
| 9    | Save the final cleaned dataset to a Snowflake table (default: HDB_SILVER) |
| 10   | Query and display sample rows from the saved Snowflake table              |
------------------------------------------------------------------------------------

Usage:
    analyzer = SimpleCSVAnalyzerSnowflake()
    analyzer.run_preprocess()

Dependencies:
    - snowflake.snowpark
    - pandas
    - datetime, re (Python standard library)

Assumptions:
    - Snowflake session is active and authenticated
    - CSV files are stored in the provided Snowflake stage path
    - Files share the HDB resale column layout; REMAINING_LEASE may be absent,
      in which case AGE is derived from LEASE_COMMENCE_DATE

"""

In [None]:
import pandas as pd
from snowflake.snowpark import Session
from datetime import datetime
from datetime import datetime
from snowflake.snowpark.context import get_active_session

class SimpleCSVAnalyzerSnowflake:
    """Complete CSV Analyzer for HDB resale flat data in Snowflake Notebook.

    Loads the resale CSV files found in a Snowflake stage into pandas, derives
    an AGE column, upper-cases text columns, combines the files, splits MONTH
    into YEAR / MONTH_NUM, drops duplicate rows and writes the result to a
    Snowflake table.  ``run_preprocess`` runs the steps in order.

    Attributes:
        session: Snowpark session returned by ``get_active_session()``.
        stage: Stage location passed to ``LIST`` to discover the files.
        csv_files: Mapping of staged file name -> pandas DataFrame.
        final_data: Combined DataFrame; ``None`` until step 5 succeeds.
    """

    def __init__(self, stage="@HDB_stage/Resale"):
        """Bind to the active Snowpark session and remember the stage location.

        Args:
            stage: Stage path (including the leading ``@``) to list files from.
        """
        self.session = get_active_session()
        self.stage = stage
        self.csv_files = {}
        self.final_data = None

    @staticmethod
    def _find_column(data, wanted):
        """Return the first column of *data* whose lower-cased name equals *wanted*, else None."""
        for col in data.columns:
            if col.lower() == wanted:
                return col
        return None

    # Step 1: Load CSVs from stage
    def load_all_csv_files(self):
        """Step 1: read every matching CSV in the stage into ``self.csv_files``.

        A file matches when its listed name ends with ``.csv`` and contains
        ``resale`` (both case-insensitive).  Column names are stripped and
        upper-cased.  A file that fails to load is reported and skipped so the
        remaining files are still processed.

        Returns:
            ``self.csv_files`` (file name -> DataFrame), or ``None`` when the
            listing contains no matching file.
        """
        print("Step 1: Loading ResaleFlat CSV files from Snowflake stage...")
        print("=" * 50)

        listing = self.session.sql(f"LIST {self.stage}").collect()
        resale_flat_files = [
            row["name"] for row in listing
            if row["name"].upper().endswith(".CSV") and "RESALE" in row["name"].upper()
        ]

        if not resale_flat_files:
            print("‚ùå No CSV files starting with 'Resale' found in stage!")
            return None

        print(f"Found {len(resale_flat_files)} Resale CSV files:")
        for i, file in enumerate(resale_flat_files, 1):
            print(f"   {i}. {file}")

        for file_path in resale_flat_files:
            try:
                # LIST returns the name without the leading '@'.
                stage_path = f"@{file_path}"
                # NOTE(review): "HEADER" is meant to use the first row as the
                # header; confirm the option name against the Snowpark reader docs.
                df = self.session.read.options({
                    "FIELD_OPTIONALLY_ENCLOSED_BY": '"',
                    "SKIP_HEADER": 0,
                    "HEADER": True
                }).csv(stage_path)

                pdf = df.to_pandas()
                pdf.columns = [col.strip().upper() for col in pdf.columns]
                print("Columns in this CSV:", list(pdf.columns))
                self.csv_files[file_path] = pdf
                print(f"‚úÖ Loaded: {file_path} | Rows: {len(pdf):,} | Cols: {len(pdf.columns)}")

            except Exception as e:
                # Deliberate best effort: one unreadable file must not stop the rest.
                print(f"‚ùå Failed to load {file_path}: {e}")

        return self.csv_files

    # Step 2: Fix AGE columns
    def fix_age_columns(self):
        """Step 2: add an AGE column (flat age in years) to every loaded file.

        * If a ``remaining_lease`` column exists (any letter case):
          ``AGE = 99 - remaining lease years``.  The years are read from text
          such as ``"61 years 04 months"`` or from a plain number.  Missing or
          unparseable values count as 0 remaining years, i.e. AGE 99.
        * Otherwise, if ``lease_commence_date`` exists it is converted to
          numeric in place and ``AGE = sale year - lease_commence_date``, with
          the sale year taken from the ``month`` column.  This keeps AGE
          comparable with the branch above, which is also relative to the sale
          date.  Rows without a parseable month, and files without a month
          column, fall back to the current calendar year.
        * Files with neither column are reported and left without AGE.

        The DataFrames in ``self.csv_files`` are modified in place.
        """
        # Imported here because the notebook's import cell does not provide `re`.
        import re

        print("\nStep 2: Fixing AGE columns...")
        print("=" * 50)

        current_year = datetime.now().year
        print(f"Using current year: {current_year}")

        # Compiled once per call instead of once per row.
        years_pattern = re.compile(r'(\d+)\s*years?')

        def parse_remaining_lease(lease_str):
            """Return the whole years of remaining lease, or 0 when unknown."""
            if pd.isna(lease_str) or lease_str == '':
                return 0
            lease_str = str(lease_str).strip()

            # Formats like "61 years 04 months"
            year_match = years_pattern.search(lease_str.lower())
            if year_match:
                return int(year_match.group(1))

            # Plain numbers such as "70" or "70.0"
            try:
                return int(float(lease_str))
            except (ValueError, OverflowError):
                return 0

        for file_name, data in self.csv_files.items():
            print(f"\nüîß Processing: {file_name}")

            remaining_col = self._find_column(data, 'remaining_lease')
            lease_col = self._find_column(data, 'lease_commence_date')

            if remaining_col is not None:
                data['AGE'] = data[remaining_col].apply(lambda x: 99 - parse_remaining_lease(x))
                print(f"   ‚úÖ Converted '{remaining_col}' to AGE = 99 - remaining_lease")
                print(f"   Sample values: {data['AGE'].head(3).tolist()}")

            elif lease_col is not None:
                lease_year = pd.to_numeric(data[lease_col], errors='coerce')
                data[lease_col] = lease_year

                month_col = self._find_column(data, 'month')
                if month_col is not None:
                    # Age at the time of sale; unparseable months use the current year.
                    sale_year = pd.to_datetime(data[month_col], errors='coerce').dt.year
                    data['AGE'] = sale_year.fillna(current_year) - lease_year
                    print(f"   ‚úÖ Created AGE column from '{lease_col}'")
                    print(f"   Formula: AGE = year of '{month_col}' - {lease_col}")
                else:
                    data['AGE'] = current_year - lease_year
                    print(f"   ‚úÖ Created AGE column from '{lease_col}'")
                    print(f"   Formula: AGE = {current_year} - {lease_col}")

            else:
                print(f"   ‚ö†Ô∏è No lease columns found in {file_name}")

    # Step 3: Clean text columns
    def clean_text_columns(self):
        """Step 3: upper-case TOWN, FLAT_TYPE, STREET_NAME and FLAT_MODEL.

        Cells that were missing before the conversion (``None`` / NaN) stay
        missing instead of becoming the text ``'NONE'``.  As before, the
        literal text ``'NAN'`` is also treated as missing.  DataFrames are
        modified in place; columns absent from a file are skipped.
        """
        print("\nStep 3: Converting text columns to uppercase...")
        print("=" * 50)
        text_columns = ['TOWN', 'FLAT_TYPE', 'STREET_NAME', 'FLAT_MODEL']
        for file_name, data in self.csv_files.items():
            for col in text_columns:
                if col not in data.columns:
                    continue
                # Record missing cells first: astype(str) would turn them into text.
                was_missing = data[col].isna()
                cleaned = data[col].astype(str).str.upper().replace('NAN', pd.NA)
                data[col] = cleaned.mask(was_missing, pd.NA)
                print(f"   ‚úÖ Converted {col} to uppercase in {file_name}")

    # Step 4: Filter dataset and check column consistency
    def prepare_before_combining(self):
        """Step 4: drop unneeded columns and verify all files share one column set.

        Each entry of ``self.csv_files`` is replaced by a copy without the
        excluded columns.  Column sets are compared ignoring order.

        Returns:
            ``True`` when every file has the same columns; ``False`` when no
            file is loaded or the column sets differ.
        """
        print("\nStep 4: Preparing dataset before combining...")
        print("=" * 50)

        if not self.csv_files:
            print("‚ùå No CSV files loaded")
            return False

        # Filter columns: remove unnecessary ones before combining
        exclude_cols = ['STREET_NAME', 'SOURCE_FILE', 'LEASE_COMMENCE_DATE','REMAINING_LEASE','REMAINING_LEASE_YEARS','BLOCK']
        for file_name, data in self.csv_files.items():
            keep_cols = [c for c in data.columns if c not in exclude_cols]
            self.csv_files[file_name] = data[keep_cols].copy()
            print(f"   ‚úÖ Columns filtered for {file_name}: {len(self.csv_files[file_name].columns)} columns kept")

        # Check column consistency
        col_sets = [sorted(df.columns) for df in self.csv_files.values()]
        first_cols = col_sets[0]
        if any(cols != first_cols for cols in col_sets):
            print("‚ùå Column mismatch detected between CSV files!")
            for i, cols in enumerate(col_sets):
                print(f"   File {i+1} columns: {cols}")
            return False
        print("‚úÖ All CSV files have consistent columns.")
        return True

    # Step 5: Combine all CSVs into one DataFrame
    def combine_all_files(self):
        """Step 5: run step 4, then concatenate all files into ``self.final_data``.

        Every row is tagged with its origin in ``SOURCE_FILE``.  RESALE_PRICE
        and FLOOR_AREA_SQM are converted to integers: values are first coerced
        with ``pd.to_numeric`` so numeric text such as ``'74.5'`` no longer
        raises; missing or non-numeric values become 0 and fractions are
        truncated.

        Returns:
            The combined DataFrame, or ``None`` when step 4 fails (in which
            case ``self.final_data`` is left untouched).
        """
        print("\nStep 5: Combining all files...")
        print("=" * 50)

        if not self.prepare_before_combining():
            print("‚ùå Cannot combine CSVs due to column mismatch")
            return None

        # prepare_before_combining() guarantees at least one file here.
        frames = []
        for file_name, data in self.csv_files.items():
            data['SOURCE_FILE'] = file_name
            frames.append(data)

        self.final_data = pd.concat(frames, ignore_index=True)
        # Convert numeric columns
        for col in ('RESALE_PRICE', 'FLOOR_AREA_SQM'):
            if col in self.final_data.columns:
                numeric = pd.to_numeric(self.final_data[col], errors='coerce')
                self.final_data[col] = numeric.fillna(0).astype(int)
        print(f"‚úÖ Combined successfully! Rows: {len(self.final_data):,}, Cols: {len(self.final_data.columns)}")
        return self.final_data

    # Step 6: Split MONTH field
    def split_month_field(self):
        """Step 6: parse MONTH as a datetime and add YEAR and MONTH_NUM columns.

        Unparseable MONTH values become NaT.  Does nothing beyond reporting
        when no combined data exists or MONTH is absent.
        """
        print("\nStep 6: Splitting MONTH field...")
        print("=" * 50)
        if self.final_data is None:
            print("‚ùå No data available")
            return
        if 'MONTH' not in self.final_data.columns:
            print("‚ùå MONTH column not found")
            return

        self.final_data['MONTH'] = pd.to_datetime(self.final_data['MONTH'], errors='coerce')
        self.final_data['YEAR'] = self.final_data['MONTH'].dt.year
        self.final_data['MONTH_NUM'] = self.final_data['MONTH'].dt.month
        print("‚úÖ Split MONTH into YEAR and MONTH_NUM")

    # Step 7: Show sample data
    def show_sample_data(self, n=3):
        """Step 7: print the first *n* combined rows and the column list.

        Args:
            n: Number of rows to print.
        """
        print("\nStep 7: Sample of filtered data...")
        print("=" * 50)
        if self.final_data is not None:
            print(self.final_data.head(n))
            print("\nColumns:", list(self.final_data.columns))

    # Step 8: Check duplicates
    def check_duplicate_data(self):
        """Step 8: drop rows of ``self.final_data`` that duplicate an earlier row.

        Rows are compared on every column present at this point, including
        MONTH and SOURCE_FILE, so identical rows from different files are kept.
        """
        print("\nStep 8: Checking for duplicate rows...")
        print("=" * 50)
        if self.final_data is None:
            print("‚ùå No data available")
            return

        num_duplicates = int(self.final_data.duplicated().sum())
        if num_duplicates > 0:
            print(f"üîç Found {num_duplicates:,} duplicate rows. Dropping duplicates...")
            self.final_data = self.final_data.drop_duplicates()
            print(f"‚úÖ Remaining rows: {len(self.final_data):,}")
        else:
            print("‚úÖ No duplicate rows found")

    # Step 9: Save final DataFrame to Snowflake table
    def save_to_snowflake_table(self, table_name="HDB_SILVER"):
        """Step 9: write the combined data, minus MONTH and SOURCE_FILE, to Snowflake.

        The table is created when missing and overwritten otherwise.  A write
        failure is reported, not raised.

        Args:
            table_name: Target table name.
        """
        print("\nStep 9: Saving final dataset to Snowflake table...")
        print("=" * 50)
        if self.final_data is None:
            print("‚ùå No data available to save")
            return

        # Exclude MONTH and SOURCE_FILE columns (drop() returns a new frame).
        columns_to_drop = [
            col for col in ('MONTH', 'SOURCE_FILE') if col in self.final_data.columns
        ]
        df_to_save = self.final_data.drop(columns=columns_to_drop)

        # Reset index to fix the pandas index warning
        df_to_save = df_to_save.reset_index(drop=True)

        print(f"Final columns to save: {list(df_to_save.columns)}")
        print(f"Rows to save: {len(df_to_save):,}")

        try:
            # Write to Snowflake (replace table if exists)
            self.session.write_pandas(
                df_to_save,
                table_name,
                auto_create_table=True,
                overwrite=True
            )
            print(f"‚úÖ Saved final dataset to Snowflake table: {table_name}")
        except Exception as e:
            print(f"‚ùå Failed to save table {table_name}: {e}")

    # Step 10: Query and show top 3 rows from Snowflake table
    def show_snowflake_table_sample(self, table_name="HDB_SILVER", n=3):
        """Step 10: print *n* rows and the column names of a Snowflake table.

        ``table_name`` and ``n`` are interpolated into the SQL text, so they
        must come from trusted code.  Query failures are reported, not raised.

        Args:
            table_name: Table to query.
            n: Row limit for the sample.
        """
        print(f"\nStep 10: Showing top {n} rows from Snowflake table {table_name}...")
        print("=" * 70)
        try:
            # Query the table from Snowflake
            result = self.session.sql(f"SELECT * FROM {table_name} LIMIT {n}").collect()

            if result:
                print(f"‚úÖ Top {n} rows from {table_name}:")
                print("-" * 100)
                for i, row in enumerate(result, 1):
                    print(f"Row {i}: {dict(row.asDict())}")
                    print("-" * 50)

                # Also show column names
                columns_result = self.session.sql(f"DESCRIBE TABLE {table_name}").collect()
                column_names = [row['name'] for row in columns_result]
                print(f"\nTable columns ({len(column_names)}): {column_names}")
            else:
                print(f"‚ö†Ô∏è No data found in table {table_name}")
        except Exception as e:
            print(f"‚ùå Failed to query table {table_name}: {e}")

    # Main execution method
    def run_preprocess(self):
        """Run steps 1-10 in order.

        Returns:
            The final combined DataFrame, or ``None`` when no CSV file was
            loaded or the files could not be combined.  In the latter case the
            pipeline stops before saving, so an existing table is neither
            overwritten nor reported as freshly processed.
        """
        print("üöÄ Starting preprocess, load and clean HDB data")
        print("=" * 70)
        self.load_all_csv_files()
        if not self.csv_files:
            return None
        self.fix_age_columns()
        self.clean_text_columns()
        if self.combine_all_files() is None:
            print("Preprocess aborted: the CSV files could not be combined")
            return None
        self.split_month_field()
        self.show_sample_data()
        self.check_duplicate_data()
        self.save_to_snowflake_table()
        self.show_snowflake_table_sample()
        print("üéâ Processing Complete!")
        return self.final_data

In [None]:
# Create analyzer
analyzer = SimpleCSVAnalyzerSnowflake()

# Run the full pipeline. The result is the final combined DataFrame, or None
# when the pipeline produced no data (e.g. no CSV files were loaded).
df_all = analyzer.run_preprocess()

# Preview. Kept as the cell's last expression so the notebook renders it, and
# guarded so a None result does not raise AttributeError.
df_all.head() if df_all is not None else None


In [None]:
**OUTPUT**

üöÄ Starting preprocess, load and clean HDB data  
======================================================================  
Step 1: Loading ResaleFlat CSV files from Snowflake stage...  
==================================================  
Found 3 Resale CSV files:  
   1. hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   2. hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   3. hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
Columns in this CSV: ['MONTH', 'TOWN', 'FLAT_TYPE', 'BLOCK', 'STREET_NAME', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'LEASE_COMMENCE_DATE', 'REMAINING_LEASE', 'RESALE_PRICE']  
‚úÖ Loaded: hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv | Rows: 37,153 | Cols: 11  
Columns in this CSV: ['MONTH', 'TOWN', 'FLAT_TYPE', 'BLOCK', 'STREET_NAME', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'LEASE_COMMENCE_DATE', 'RESALE_PRICE']  
‚úÖ Loaded: hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv | Rows: 52,203 | Cols: 10  
Columns in this CSV: ['MONTH', 'TOWN', 'FLAT_TYPE', 'BLOCK', 'STREET_NAME', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'LEASE_COMMENCE_DATE', 'REMAINING_LEASE', 'RESALE_PRICE']  
‚úÖ Loaded: hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv | Rows: 214,893 | Cols: 11  
Step 2: Fixing AGE columns...  
==================================================  
Using current year: 2025  
üîß Processing: hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   ‚úÖ Converted 'REMAINING_LEASE' to AGE = 99 - remaining_lease  
   Sample values: [29, 34, 35]  
üîß Processing: hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   ‚úÖ Created AGE column from 'LEASE_COMMENCE_DATE'  
   Formula: AGE = 2025 - LEASE_COMMENCE_DATE  
üîß Processing: hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
   ‚úÖ Converted 'REMAINING_LEASE' to AGE = 99 - remaining_lease  
   Sample values: [38, 39, 37]  
Step 3: Converting text columns to uppercase...  
==================================================  
   ‚úÖ Converted TOWN to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   ‚úÖ Converted FLAT_TYPE to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   ‚úÖ Converted STREET_NAME to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   ‚úÖ Converted FLAT_MODEL to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv  
   ‚úÖ Converted TOWN to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   ‚úÖ Converted FLAT_TYPE to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   ‚úÖ Converted STREET_NAME to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   ‚úÖ Converted FLAT_MODEL to uppercase in hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv  
   ‚úÖ Converted TOWN to uppercase in hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
   ‚úÖ Converted FLAT_TYPE to uppercase in hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
   ‚úÖ Converted STREET_NAME to uppercase in hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
   ‚úÖ Converted FLAT_MODEL to uppercase in hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv  
Step 5: Combining all files...  
==================================================  
Step 4: Preparing dataset before combining...  
==================================================  
   ‚úÖ Columns filtered for hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromJan2015toDec2016.csv: 8 columns kept  
   ‚úÖ Columns filtered for hdb_stage/ResaleFlatPricesBasedonRegistrationDateFromMar2012toDec2014.csv: 8 columns kept  
   ‚úÖ Columns filtered for hdb_stage/ResaleflatpricesbasedonregistrationdatefromJan2017onwards.csv: 8 columns kept  
‚úÖ All CSV files have consistent columns.  
‚úÖ Combined successfully! Rows: 304,249, Cols: 9  
Step 6: Splitting MONTH field...  
==================================================  
‚úÖ Split MONTH into YEAR and MONTH_NUM  
Step 7: Sample of filtered data...  
==================================================  
       MONTH        TOWN  ...  YEAR MONTH_NUM  
0 2015-01-01  ANG MO KIO  ...  2015         1  
1 2015-01-01  ANG MO KIO  ...  2015         1  
2 2015-01-01  ANG MO KIO  ...  2015         1  
[3 rows x 11 columns]  
Columns: ['MONTH', 'TOWN', 'FLAT_TYPE', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'RESALE_PRICE', 'AGE', 'SOURCE_FILE', 'YEAR', 'MONTH_NUM']  
Step 8: Checking for duplicate rows...  
==================================================  
üîç Found 2,322 duplicate rows. Dropping duplicates...  
‚úÖ Remaining rows: 301,927  
Step 9: Saving final dataset to Snowflake table...  
==================================================  
Final columns to save: ['TOWN', 'FLAT_TYPE', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'RESALE_PRICE', 'AGE', 'YEAR', 'MONTH_NUM']  
Rows to save: 301,927  
‚úÖ Saved final dataset to Snowflake table: HDB_SILVER  
Step 10: Showing top 3 rows from Snowflake table HDB_SILVER...  
======================================================================  
‚úÖ Top 3 rows from HDB_SILVER:  
----------------------------------------------------------------------------------------------------  
Row 1: {'TOWN': 'WOODLANDS', 'FLAT_TYPE': '4 ROOM', 'STOREY_RANGE': '13 TO 15', 'FLOOR_AREA_SQM': 103, 'FLAT_MODEL': 'PREMIUM APARTMENT', 'RESALE_PRICE': 626000, 'AGE': 26, 'YEAR': 2025, 'MONTH_NUM': 4}  
--------------------------------------------------  
Row 2: {'TOWN': 'WOODLANDS', 'FLAT_TYPE': '4 ROOM', 'STOREY_RANGE': '01 TO 03', 'FLOOR_AREA_SQM': 104, 'FLAT_MODEL': 'PREMIUM APARTMENT', 'RESALE_PRICE': 600000, 'AGE': 26, 'YEAR': 2025, 'MONTH_NUM': 4}  
--------------------------------------------------  
Row 3: {'TOWN': 'WOODLANDS', 'FLAT_TYPE': '4 ROOM', 'STOREY_RANGE': '13 TO 15', 'FLOOR_AREA_SQM': 102, 'FLAT_MODEL': 'PREMIUM APARTMENT', 'RESALE_PRICE': 608000, 'AGE': 26, 'YEAR': 2025, 'MONTH_NUM': 4}  
--------------------------------------------------  
Table columns (9): ['TOWN', 'FLAT_TYPE', 'STOREY_RANGE', 'FLOOR_AREA_SQM', 'FLAT_MODEL', 'RESALE_PRICE', 'AGE', 'YEAR', 'MONTH_NUM']  
üéâ Processing Complete!