## Data Processing for NYC Trees and Air Quality Analysis

In [2]:
import pandas as pd
import numpy as np
from datetime import datetime
import logging
import os
from typing import Tuple, Dict, Any
import pickle
from sklearn.preprocessing import LabelEncoder

In [None]:
# Make sure the notebook's data folders are present; with exist_ok=True an
# already existing folder is left as it is instead of raising.
os.makedirs('./../data', exist_ok=True)
basedir = './../data/'  # loop-invariant prefix, set once before the loop
for dir_name in ('raw_data', 'processed_data', 'log'):
    os.makedirs(f'{basedir}{dir_name}', exist_ok=True)

In [8]:
class DataProcessor:
    """
    Class to handle data processing for NYC Trees and Air Quality datasets

    The methods are meant to be called in order: ``load_data`` first, then
    ``clean_tree_data`` / ``clean_air_quality_data``, and finally
    ``validate_data`` and/or ``save_processed_data``. Progress and error
    messages are written through the global name ``logger``.
    """

    def __init__(self):
        # Tree census DataFrame; stays None until load_data() has run.
        self.tree_data = None
        # Air quality DataFrame; stays None until load_data() has run.
        self.air_quality_data = None
        # Fitted LabelEncoder per encoded tree column, keyed by column name.
        self.label_encoders = {}

    def load_data(self, tree_path: str, air_quality_path: str) -> None:
        """
        Load raw data from CSV files

        Parameters:
        -----------
        tree_path: str
            Path to tree census CSV file
        air_quality_path: str
            Path to air quality CSV file

        Raises:
        -------
        Exception
            Any error raised while reading either file is logged and
            re-raised unchanged.
        """
        try:
            logger.info("Loading tree census data...")
            self.tree_data = pd.read_csv(tree_path)
            logger.info(f"Loaded {len(self.tree_data)} tree records")

            logger.info("Loading air quality data...")
            self.air_quality_data = pd.read_csv(air_quality_path)
            logger.info(f"Loaded {len(self.air_quality_data)} air quality records")

        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise

    def clean_tree_data(self) -> None:
        """
        Clean and preprocess tree census data

        Steps, in order: drop rows with a repeated ``tree_id``, fill missing
        ``tree_dbh`` with the median ``tree_dbh`` of the same ``spc_common``
        group, keep only rows whose ``status`` is ``'Alive'``, coerce
        ``latitude`` / ``longitude`` to numeric, label-encode the categorical
        columns that are present (the fitted encoders are kept in
        ``self.label_encoders``) and derive ``health_score``.

        Requires ``self.tree_data`` to have been set (see ``load_data``).

        Raises:
        -------
        Exception
            Any error raised during cleaning is logged and re-raised
            unchanged.
        """
        try:
            logger.info("Cleaning tree census data...")

            # Remove duplicates
            initial_len = len(self.tree_data)
            self.tree_data.drop_duplicates(subset=['tree_id'], inplace=True)
            logger.info(f"Removed {initial_len - len(self.tree_data)} duplicate trees")

            # Handle missing values: use the per-species median diameter
            species_median = self.tree_data.groupby('spc_common')['tree_dbh'].transform('median')
            self.tree_data['tree_dbh'] = self.tree_data['tree_dbh'].fillna(species_median)

            # Filter for living trees only. The explicit copy makes the
            # filtered frame independent of the unfiltered one, so the column
            # assignments below do not trigger SettingWithCopyWarning.
            is_alive = self.tree_data['status'] == 'Alive'
            self.tree_data = self.tree_data[is_alive].copy()

            # Convert coordinates to numeric (unparseable values become NaN)
            for col in ['latitude', 'longitude']:
                self.tree_data[col] = pd.to_numeric(self.tree_data[col], errors='coerce')

            # Encode categorical variables; missing values get their own
            # 'Unknown' category so the encoder never sees NaN
            categorical_columns = ['health', 'spc_common', 'steward', 'guards', 'sidewalk']
            for col in categorical_columns:
                if col in self.tree_data.columns:
                    self.label_encoders[col] = LabelEncoder()
                    self.tree_data[f'{col}_encoded'] = self.label_encoders[col].fit_transform(
                        self.tree_data[col].fillna('Unknown')
                    )

            # Create health score (1-3); values outside the mapping become NaN
            health_mapping = {'Poor': 1, 'Fair': 2, 'Good': 3}
            self.tree_data['health_score'] = self.tree_data['health'].map(health_mapping)

            logger.info("Tree data cleaning completed")

        except Exception as e:
            logger.error(f"Error cleaning tree data: {str(e)}")
            raise

    def clean_air_quality_data(self) -> None:
        """
        Clean and preprocess air quality data

        Drops fully duplicated rows, parses ``start_date`` (when the column
        exists) and derives ``season`` from it as the calendar quarter, and
        coerces ``data_value`` to numeric (unparseable values become NaN).

        Requires ``self.air_quality_data`` to have been set (see
        ``load_data``).

        Raises:
        -------
        Exception
            Any error raised during cleaning is logged and re-raised
            unchanged.
        """
        try:
            logger.info("Cleaning air quality data...")

            # Remove duplicates
            initial_len = len(self.air_quality_data)
            self.air_quality_data.drop_duplicates(inplace=True)
            logger.info(f"Removed {initial_len - len(self.air_quality_data)} duplicate records")

            # Convert dates and create the seasonal indicator (quarter 1-4)
            if 'start_date' in self.air_quality_data.columns:
                start_dates = pd.to_datetime(self.air_quality_data['start_date'])
                self.air_quality_data['start_date'] = start_dates
                self.air_quality_data['season'] = start_dates.dt.quarter

            # Handle missing values in data_value
            self.air_quality_data['data_value'] = pd.to_numeric(
                self.air_quality_data['data_value'],
                errors='coerce'
            )

            logger.info("Air quality data cleaning completed")

        except Exception as e:
            logger.error(f"Error cleaning air quality data: {str(e)}")
            raise

    @staticmethod
    def _value_range(frame, column: str) -> Dict[str, Any]:
        """Return the ``min`` and ``max`` of ``frame[column]`` as a dict."""
        values = frame[column]
        return {'min': values.min(), 'max': values.max()}

    @staticmethod
    def _summarize(frame, range_columns) -> Dict[str, Any]:
        """Build the per-dataset validation summary for ``frame``."""
        return {
            'total_records': len(frame),
            'missing_values': frame.isnull().sum().to_dict(),
            'value_ranges': {
                column: DataProcessor._value_range(frame, column)
                for column in range_columns
            },
        }

    def validate_data(self) -> Dict[str, Any]:
        """
        Validate processed data and return validation results

        Returns:
        --------
        Dict containing validation results. It has the keys ``'trees'`` and
        ``'air_quality'``; each maps to a dict with ``'total_records'``,
        ``'missing_values'`` (null count per column) and ``'value_ranges'``
        (``{'min': ..., 'max': ...}`` per checked column).
        """
        validation_results = {
            'trees': self._summarize(
                self.tree_data, ['tree_dbh', 'latitude', 'longitude']
            ),
            'air_quality': self._summarize(
                self.air_quality_data, ['data_value']
            ),
        }

        logger.info("Data validation completed")
        return validation_results

    def save_processed_data(self, output_dir: str = 'processed_data') -> None:
        """
        Save processed data and metadata to pickle files

        Four files are written, all sharing one timestamp suffix:
        ``tree_data``, ``air_quality_data``, ``label_encoders`` and
        ``validation_results``.

        Parameters:
        -----------
        output_dir: str
            Directory to save processed data files. It is created (including
            parents) when it does not exist yet.

        Raises:
        -------
        Exception
            Any error raised while validating or writing is logged and
            re-raised unchanged.
        """
        try:
            # Create timestamp for versioning
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

            # Validate before touching the disk so that a validation failure
            # does not leave a partial set of files behind
            validation_results = self.validate_data()

            # The target directory may not exist relative to the current
            # working directory; opening a file inside it would then fail
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            artifacts = {
                'tree_data': self.tree_data,
                'air_quality_data': self.air_quality_data,
                'label_encoders': self.label_encoders,
                'validation_results': validation_results,
            }
            for name, payload in artifacts.items():
                file_path = os.path.join(output_dir, f'{name}_{timestamp}.pkl')
                with open(file_path, 'wb') as f:
                    pickle.dump(payload, f)

            logger.info(f"All data saved to {output_dir} with timestamp {timestamp}")

        except Exception as e:
            logger.error(f"Error saving processed data: {str(e)}")
            raise