In [5]:
# ETL Pipeline - Sample Data Checker (Jupyter Notebook Version)
# ================================================================
# This notebook reads and validates daily spending sample data from parquet files
# using chunking techniques for memory-efficient processing.

# Cell 1: Imports and Setup
import os
import sys
from pathlib import Path
from typing import Dict, Any, Optional
from decimal import Decimal
import logging

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
from IPython.display import display, HTML

# Configure logging for Jupyter
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

print("✓ Imports completed successfully")

# Cell 2: ParquetDataChecker Class Definition
class ParquetDataChecker:
    """
    A class to check and validate parquet files using chunking techniques.
    
    Attributes:
        file_path (Path): Path to the parquet file
        chunk_size (int): Number of rows to process per chunk
    """
    
    def __init__(self, file_path: str, chunk_size: int = 10000):
        """
        Initialize the ParquetDataChecker.
        
        Args:
            file_path (str): Path to the parquet file
            chunk_size (int): Number of rows to process per chunk (default: 10000)
        """
        self.file_path = Path(file_path)
        self.chunk_size = chunk_size
        self.parquet_file = None
        
        if not self.file_path.exists():
            raise FileNotFoundError(f"Parquet file not found: {self.file_path}")
        
        logger.info(f"Initialized ParquetDataChecker for: {self.file_path}")
    
    def get_file_metadata(self) -> Dict[str, Any]:
        """
        Get metadata information about the parquet file.
        
        Returns:
            Dict containing file metadata
        """
        logger.info("Reading parquet file metadata...")
        
        parquet_file = pq.ParquetFile(self.file_path)
        
        metadata = {
            'file_path': str(self.file_path),
            'file_size_mb': self.file_path.stat().st_size / (1024 * 1024),
            'num_row_groups': parquet_file.num_row_groups,
            'total_rows': parquet_file.metadata.num_rows,
            'num_columns': parquet_file.metadata.num_columns,
            'schema': parquet_file.schema,
            'column_names': parquet_file.schema.names,
        }
        
        logger.info(f"File contains {metadata['total_rows']:,} rows and {metadata['num_columns']} columns")
        logger.info(f"File size: {metadata['file_size_mb']:.2f} MB")
        
        return metadata
    
    def read_in_chunks(self, columns: Optional[list] = None):
        """
        Generator to read parquet file in chunks using pyarrow.
        
        Args:
            columns (list, optional): List of columns to read. If None, reads all columns.
            
        Yields:
            pd.DataFrame: Chunk of data
        """
        logger.info(f"Reading parquet file in chunks of {self.chunk_size:,} rows...")
        
        parquet_file = pq.ParquetFile(self.file_path)
        
        for batch in parquet_file.iter_batches(batch_size=self.chunk_size, columns=columns):
            df_chunk = batch.to_pandas()
            yield df_chunk
    
    def validate_data_quality(self) -> Dict[str, Any]:
        """
        Perform data quality checks on the parquet file using chunking.
        
        Returns:
            Dict containing data quality metrics
        """
        logger.info("Starting data quality validation...")
        
        # Initialize counters
        total_rows = 0
        null_counts = {}
        column_types = {}
        numeric_stats = {}
        
        # Process data in chunks
        for chunk_num, df_chunk in enumerate(self.read_in_chunks(), start=1):
            total_rows += len(df_chunk)
            
            # Track null values
            for column in df_chunk.columns:
                if column not in null_counts:
                    null_counts[column] = 0
                    column_types[column] = str(df_chunk[column].dtype)
                
                null_counts[column] += df_chunk[column].isnull().sum()
                
                # Calculate numeric statistics
                if pd.api.types.is_numeric_dtype(df_chunk[column]):
                    if column not in numeric_stats:
                        numeric_stats[column] = {
                            'min': float('inf'),
                            'max': float('-inf'),
                            'sum': 0,
                            'count': 0
                        }
                    
                    valid_values = df_chunk[column].dropna()
                    if len(valid_values) > 0:
                        numeric_stats[column]['min'] = min(
                            numeric_stats[column]['min'], 
                            valid_values.min()
                        )
                        numeric_stats[column]['max'] = max(
                            numeric_stats[column]['max'], 
                            valid_values.max()
                        )
                        numeric_stats[column]['sum'] += valid_values.sum()
                        numeric_stats[column]['count'] += len(valid_values)
            
            if chunk_num % 10 == 0:
                logger.info(f"Processed {chunk_num} chunks ({total_rows:,} rows)...")
        
        # Calculate final statistics
        null_percentages = {
            col: (count / total_rows * 100) if total_rows > 0 else 0
            for col, count in null_counts.items()
        }
        
        for column in numeric_stats:
            if numeric_stats[column]['count'] > 0:
                numeric_stats[column]['mean'] = (
                    numeric_stats[column]['sum'] / numeric_stats[column]['count']
                )
        
        quality_report = {
            'total_rows': total_rows,
            'total_columns': len(column_types),
            'column_types': column_types,
            'null_counts': null_counts,
            'null_percentages': null_percentages,
            'numeric_statistics': numeric_stats
        }
        
        logger.info(f"Data quality validation complete. Total rows processed: {total_rows:,}")
        
        return quality_report
    
    def display_sample_data(self, n_rows: int = 10):
        """
        Display sample rows from the parquet file.
        
        Args:
            n_rows (int): Number of rows to display (default: 10)
        """
        logger.info(f"Reading first {n_rows} rows...")
        
        parquet_file = pq.ParquetFile(self.file_path)
        
        # Read only the first batch
        first_batch = next(parquet_file.iter_batches(batch_size=n_rows))
        df_sample = first_batch.to_pandas()
        
        print(f"\nSAMPLE DATA (First {len(df_sample)} rows)")
        print("="*80)
        display(df_sample)
        
        return df_sample
    
    def print_quality_report(self, quality_report: Dict[str, Any]):
        """
        Print a formatted data quality report.
        
        Args:
            quality_report (Dict): Quality report dictionary
        """
        print("\n" + "="*80)
        print("DATA QUALITY REPORT")
        print("="*80)
        print(f"Total Rows: {quality_report['total_rows']:,}")
        print(f"Total Columns: {quality_report['total_columns']}")
        print("\n" + "-"*80)
        print("COLUMN INFORMATION")
        print("-"*80)
        
        for col, dtype in quality_report['column_types'].items():
            null_count = quality_report['null_counts'][col]
            null_pct = quality_report['null_percentages'][col]
            print(f"  {col:30s} | Type: {dtype:15s} | Nulls: {null_count:8,} ({null_pct:5.2f}%)")
        
        if quality_report['numeric_statistics']:
            print("\n" + "-"*80)
            print("NUMERIC COLUMN STATISTICS")
            print("-"*80)
            
            for col, stats in quality_report['numeric_statistics'].items():
                print(f"\n  {col}:")
                print(f"    Min:   {stats['min']:,.2f}")
                print(f"    Max:   {stats['max']:,.2f}")
                print(f"    Mean:  {stats['mean']:,.2f}")
                print(f"    Count: {stats['count']:,}")
        
        print("\n" + "="*80 + "\n")
    
    def check_data_integrity(self) -> Dict[str, Any]:
        """
        Perform specific data integrity checks for financial data.
        
        Returns:
            Dict containing integrity check results
        """
        logger.info("Performing data integrity checks...")
        
        integrity_issues = {
            'negative_amounts': 0,
            'duplicate_rows': 0,
            'invalid_dates': 0,
            'outliers': []
        }
        
        seen_rows = set()
        
        for df_chunk in self.read_in_chunks():
            # Check for negative amounts (if amount columns exist)
            amount_columns = [col for col in df_chunk.columns if 'amount' in col.lower()]
            for col in amount_columns:
                if pd.api.types.is_numeric_dtype(df_chunk[col]):
                    negative_count = (df_chunk[col] < 0).sum()
                    integrity_issues['negative_amounts'] += negative_count
            
            # Check for duplicates (using hash of row values)
            for idx, row in df_chunk.iterrows():
                row_hash = hash(tuple(row))
                if row_hash in seen_rows:
                    integrity_issues['duplicate_rows'] += 1
                else:
                    seen_rows.add(row_hash)
            
            # Check for invalid dates
            date_columns = [col for col in df_chunk.columns if 'date' in col.lower()]
            for col in date_columns:
                if pd.api.types.is_datetime64_any_dtype(df_chunk[col]):
                    invalid_dates = df_chunk[col].isnull().sum()
                    integrity_issues['invalid_dates'] += invalid_dates
        
        logger.info("Data integrity checks complete.")
        
        return integrity_issues

print("✓ ParquetDataChecker class defined successfully")

# Cell 3: Define File Path (UPDATE THIS PATH!)
# UPDATE THIS PATH to point to your parquet file
parquet_file = "../data/daily_spending_sample.parquet"

# Alternative ways to specify the path:
# parquet_file = Path.cwd() / "data" / "daily_spending_sample.parquet"
# parquet_file = "/full/path/to/your/file.parquet"

print(f"Using parquet file: {parquet_file}")

# Cell 4: Initialize Checker
try:
    checker = ParquetDataChecker(str(parquet_file), chunk_size=10000)
    print("✓ Checker initialized successfully")
except FileNotFoundError as e:
    print(f"❌ Error: {e}")
    print("Please update the parquet_file path in Cell 3")

# Cell 5: Display File Metadata
print("="*80)
print("FILE METADATA")
print("="*80)
metadata = checker.get_file_metadata()
print(f"File Path: {metadata['file_path']}")
print(f"File Size: {metadata['file_size_mb']:.2f} MB")
print(f"Total Rows: {metadata['total_rows']:,}")
print(f"Total Columns: {metadata['num_columns']}")
print(f"Row Groups: {metadata['num_row_groups']}")
print("\nColumn Names:")
for i, col in enumerate(metadata['column_names'], start=1):
    print(f"  {i}. {col}")
print("\nSchema:")
print(metadata['schema'])
print("="*80)

# Cell 6: Display Sample Data
sample_df = checker.display_sample_data(n_rows=500)

# Cell 7: Perform Data Quality Validation
quality_report = checker.validate_data_quality()
checker.print_quality_report(quality_report)

# Cell 8: Perform Data Integrity Checks
integrity_issues = checker.check_data_integrity()
print("\n" + "="*80)
print("DATA INTEGRITY CHECKS")
print("="*80)
print(f"Negative amounts found: {integrity_issues['negative_amounts']:,}")
print(f"Duplicate rows found: {integrity_issues['duplicate_rows']:,}")
print(f"Invalid dates found: {integrity_issues['invalid_dates']:,}")
print("="*80 + "\n")
print("✓ Data check completed successfully!")

# Cell 9: Optional - Create Summary DataFrame
# Create a nice summary dataframe of column information
summary_data = []
for col in quality_report['column_types'].keys():
    summary_data.append({
        'Column': col,
        'Type': quality_report['column_types'][col],
        'Null Count': quality_report['null_counts'][col],
        'Null %': f"{quality_report['null_percentages'][col]:.2f}%"
    })

summary_df = pd.DataFrame(summary_data)
print("\nColumn Summary:")
display(summary_df)

# Cell 10: Optional - Visualize Numeric Statistics (if you have matplotlib)
# Uncomment if you want to create visualizations
# import matplotlib.pyplot as plt
# 
# if quality_report['numeric_statistics']:
#     fig, axes = plt.subplots(1, len(quality_report['numeric_statistics']), 
#                              figsize=(15, 4))
#     if len(quality_report['numeric_statistics']) == 1:
#         axes = [axes]
#     
#     for ax, (col, stats) in zip(axes, quality_report['numeric_statistics'].items()):
#         ax.bar(['Min', 'Mean', 'Max'], 
#                [stats['min'], stats['mean'], stats['max']])
#         ax.set_title(col)
#         ax.set_ylabel('Value')
#         plt.xticks(rotation=45)
#     
#     plt.tight_layout()
#     plt.show()

2025-10-18 12:23:34,986 - __main__ - INFO - Initialized ParquetDataChecker for: ../data/daily_spending_sample.parquet
2025-10-18 12:23:34,989 - __main__ - INFO - Reading parquet file metadata...
2025-10-18 12:23:34,992 - __main__ - INFO - File contains 6,000 rows and 7 columns
2025-10-18 12:23:34,993 - __main__ - INFO - File size: 0.09 MB
2025-10-18 12:23:34,996 - __main__ - INFO - Reading first 500 rows...


✓ Imports completed successfully
✓ ParquetDataChecker class defined successfully
Using parquet file: ../data/daily_spending_sample.parquet
✓ Checker initialized successfully
FILE METADATA
File Path: ../data/daily_spending_sample.parquet
File Size: 0.09 MB
Total Rows: 6,000
Total Columns: 7
Row Groups: 1

Column Names:
  1. person_name
  2. spending_date
  3. category
  4. amount
  5. location
  6. description
  7. payment_method

Schema:
<pyarrow._parquet.ParquetSchema object at 0x1063a3840>
required group field_id=-1 schema {
  optional binary field_id=-1 person_name (String);
  optional binary field_id=-1 spending_date (String);
  optional binary field_id=-1 category (String);
  optional binary field_id=-1 amount (String);
  optional binary field_id=-1 location (String);
  optional binary field_id=-1 description (String);
  optional binary field_id=-1 payment_method (String);
}


SAMPLE DATA (First 500 rows)


Unnamed: 0,person_name,spending_date,category,amount,location,description,payment_method
0,David Wong,01-Apr-2022,Groceries,155.66,Shopee Mart,Household items,Debit Card
1,David Wong,01-Apr-2023,Transport,$40.10,MRT Station,Shopping trip,Mobile Payment
2,David Wong,01-Apr-2023,Shopping,333.95 SGD,Zalora,Online shopping,Apple Pay
3,David Wong,01-Apr-2023,Food,SGD 17.51,Restaurant ABC,Dinner,Credit Card
4,David Wong,01-Aug-2023,Transport,SGD 26.22,Bus Interchange,Taxi to office,EZ-Link
...,...,...,...,...,...,...,...
495,David Wong,11/01/2024,Education,$105.43,Library Fine,Training,Bank Transfer
496,David Wong,11/01/23,Groceries,176.34,Sheng Siong,Household items,Mobile Payment
497,David Wong,11/02/2022,Groceries,SGD 126.59,NTUC FairPrice,Weekly Groceries,PayNow
498,David Wong,11/02/2023,Entertainment,54.86,Spotify,Movie Tickets,Google Pay


2025-10-18 12:23:35,021 - __main__ - INFO - Starting data quality validation...
2025-10-18 12:23:35,022 - __main__ - INFO - Reading parquet file in chunks of 10,000 rows...
2025-10-18 12:23:35,036 - __main__ - INFO - Data quality validation complete. Total rows processed: 6,000
2025-10-18 12:23:35,037 - __main__ - INFO - Performing data integrity checks...
2025-10-18 12:23:35,039 - __main__ - INFO - Reading parquet file in chunks of 10,000 rows...



DATA QUALITY REPORT
Total Rows: 6,000
Total Columns: 7

--------------------------------------------------------------------------------
COLUMN INFORMATION
--------------------------------------------------------------------------------
  person_name                    | Type: object          | Nulls:        0 ( 0.00%)
  spending_date                  | Type: object          | Nulls:        0 ( 0.00%)
  category                       | Type: object          | Nulls:        0 ( 0.00%)
  amount                         | Type: object          | Nulls:        0 ( 0.00%)
  location                       | Type: object          | Nulls:        0 ( 0.00%)
  description                    | Type: object          | Nulls:        0 ( 0.00%)
  payment_method                 | Type: object          | Nulls:        0 ( 0.00%)




2025-10-18 12:23:35,282 - __main__ - INFO - Data integrity checks complete.



DATA INTEGRITY CHECKS
Negative amounts found: 0
Duplicate rows found: 0
Invalid dates found: 0

✓ Data check completed successfully!

Column Summary:


Unnamed: 0,Column,Type,Null Count,Null %
0,person_name,object,0,0.00%
1,spending_date,object,0,0.00%
2,category,object,0,0.00%
3,amount,object,0,0.00%
4,location,object,0,0.00%
5,description,object,0,0.00%
6,payment_method,object,0,0.00%
