In [69]:

import boto3
import pandas as pd
import awswrangler as wr
from typing import List

def list_parquet_files(bucket: str, prefix: str, s3_client=None) -> List[str]:
    """
    List all parquet files under a given S3 prefix, at any folder depth.

    The prefix is listed once, recursively (no delimiter), and the keys are
    filtered by extension. This finds files inside date-partitioned folders
    as well as files that sit at shallower levels of the prefix, without
    issuing one listing call per partition folder.

    Args:
        bucket (str): S3 bucket name; an ``s3://`` marker is removed if present
        prefix (str): S3 prefix path to search under
        s3_client: Optional S3 client exposing ``get_paginator``. When omitted,
            a client is created with ``boto3.client('s3')``.

    Returns:
        List[str]: List of complete S3 URIs for parquet files, in the order
        the paginator yields them (empty when nothing matches)
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # Remove s3:// if present in bucket name
    bucket = bucket.replace('s3://', '')

    paginator = s3_client.get_paginator('list_objects_v2')

    parquet_files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty listing carry no 'Contents' entry at all.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.parquet'):
                parquet_files.append(f"s3://{bucket}/{key}")

    return parquet_files

def read_all_parquets(s3_path: str) -> pd.DataFrame:
    """
    Read and concatenate all parquet files from a given S3 path with date partitions

    Args:
        s3_path (str): Base S3 URI of the form ``s3://bucket/prefix``; trailing
            slashes are ignored and the prefix may be empty (bucket root)

    Returns:
        pd.DataFrame: Combined DataFrame from all parquet files

    Raises:
        ValueError: If ``s3_path`` is not an ``s3://`` URI with a bucket name,
            or if no parquet files are found under it
    """
    scheme = 's3://'
    # Validate up front: positional splitting of a scheme-less path would
    # silently pick the wrong path segment as the bucket.
    if not s3_path.startswith(scheme):
        raise ValueError(f"Expected an S3 URI starting with 's3://', got {s3_path!r}")

    # Split S3 path into bucket and prefix
    s3_path = s3_path.rstrip('/')
    bucket, _, key_prefix = s3_path[len(scheme):].partition('/')
    if not bucket:
        raise ValueError(f"No bucket name found in S3 URI {s3_path!r}")
    # A bucket-root URI must map to an empty prefix, not '/'.
    prefix = f"{key_prefix}/" if key_prefix else ''

    # Get list of all parquet files
    print("Listing parquet files...")
    parquet_files = list_parquet_files(bucket, prefix)

    if not parquet_files:
        raise ValueError(f"No parquet files found in {s3_path}")

    # Read all parquet files
    print(f"Found {len(parquet_files)} parquet files")
    print("Reading parquet files...")
    df = wr.s3.read_parquet(path=parquet_files)

    return df

# Example usage: load the simulation feature group into `data`.
try:
    s3_path = "s3://mlo-team4/features/simulation/637423203755/sagemaker/us-east-2/offline-store/housing-feature-group-simulation-1733973023/data/"
    data = read_all_parquets(s3_path)
    print(f"Combined DataFrame shape: {data.shape}")
except Exception as e:
    # Report the failure, then stop the cell: continuing would run the code
    # below against an undefined (or stale, from an earlier run) `data`.
    print(f"Error: {str(e)}")
    raise


data.info()

Listing parquet files...
Found 14 parquet files
Reading parquet files...
Combined DataFrame shape: (10000, 17)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 17 columns):
 #   Column               Non-Null Count  Dtype              
---  ------               --------------  -----              
 0   number               10000 non-null  string             
 1   event_time           10000 non-null  string             
 2   date                 10000 non-null  float64            
 3   price                10000 non-null  float64            
 4   bedrooms             10000 non-null  float64            
 5   bathrooms            10000 non-null  float64            
 6   sqft_living          10000 non-null  float64            
 7   sqft_lot             10000 non-null  float64            
 8   floors               10000 non-null  float64            
 9   condition            10000 non-null  float64            
 10  grade                10000 non-n

In [37]:
def list_parquet_files(bucket: str, prefix: str, s3_client=None) -> List[str]:
    """
    List all parquet files under a given S3 prefix, at any folder depth.

    The prefix is listed once, recursively (no delimiter), and the keys are
    filtered by extension. This finds files inside date-partitioned folders
    as well as files that sit at shallower levels of the prefix, without
    issuing one listing call per partition folder.

    Args:
        bucket (str): S3 bucket name; an ``s3://`` marker is removed if present
        prefix (str): S3 prefix path to search under
        s3_client: Optional S3 client exposing ``get_paginator``. When omitted,
            a client is created with ``boto3.client('s3')``.

    Returns:
        List[str]: List of complete S3 URIs for parquet files, in the order
        the paginator yields them (empty when nothing matches)
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # Remove s3:// if present in bucket name
    bucket = bucket.replace('s3://', '')

    paginator = s3_client.get_paginator('list_objects_v2')

    parquet_files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty listing carry no 'Contents' entry at all.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.parquet'):
                parquet_files.append(f"s3://{bucket}/{key}")

    return parquet_files

def read_all_parquets(s3_path: str) -> pd.DataFrame:
    """
    Read and concatenate all parquet files from a given S3 path with date partitions

    Args:
        s3_path (str): Base S3 URI of the form ``s3://bucket/prefix``; trailing
            slashes are ignored and the prefix may be empty (bucket root)

    Returns:
        pd.DataFrame: Combined DataFrame from all parquet files

    Raises:
        ValueError: If ``s3_path`` is not an ``s3://`` URI with a bucket name,
            or if no parquet files are found under it
    """
    scheme = 's3://'
    # Validate up front: positional splitting of a scheme-less path would
    # silently pick the wrong path segment as the bucket.
    if not s3_path.startswith(scheme):
        raise ValueError(f"Expected an S3 URI starting with 's3://', got {s3_path!r}")

    # Split S3 path into bucket and prefix
    s3_path = s3_path.rstrip('/')
    bucket, _, key_prefix = s3_path[len(scheme):].partition('/')
    if not bucket:
        raise ValueError(f"No bucket name found in S3 URI {s3_path!r}")
    # A bucket-root URI must map to an empty prefix, not '/'.
    prefix = f"{key_prefix}/" if key_prefix else ''

    # Get list of all parquet files
    print("Listing parquet files...")
    parquet_files = list_parquet_files(bucket, prefix)

    if not parquet_files:
        raise ValueError(f"No parquet files found in {s3_path}")

    # Read all parquet files
    print(f"Found {len(parquet_files)} parquet files")
    print("Reading parquet files...")
    df = wr.s3.read_parquet(path=parquet_files)

    return df

# Example usage: load the test feature group into `test_data`.
try:
    s3_path = "s3://mlo-team4/features/test/637423203755/sagemaker/us-east-2/offline-store/housing-feature-group-test-1733972554/data/"
    test_data = read_all_parquets(s3_path)
    print(f"Combined DataFrame shape: {test_data.shape}")
except Exception as e:
    # Report the failure, then stop the cell: continuing would display an
    # undefined (or stale, from an earlier run) `test_data`.
    print(f"Error: {str(e)}")
    raise

test_data

Listing parquet files...
Found 14 parquet files
Reading parquet files...
Combined DataFrame shape: (3000, 17)


Unnamed: 0,number,event_time,date,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,condition,grade,sqft_above,sqft_basement,yr_built,write_time,api_invocation_time,is_deleted
0,5189,2024-12-12T02:54:13Z,2015.0,190000.0,2.0,1.0,670.0,3101.0,1.0,4.0,6.0,670.0,0.0,1948.0,2024-12-12 03:10:41.978000+00:00,2024-12-12 03:05:51+00:00,False
1,7843,2024-12-12T02:54:13Z,2014.0,550000.0,2.0,1.0,950.0,4080.0,1.0,4.0,7.0,950.0,0.0,1924.0,2024-12-12 03:10:41.978000+00:00,2024-12-12 03:05:51+00:00,False
2,340,2024-12-12T02:54:13Z,2015.0,485000.0,4.0,1.0,2560.0,43995.0,2.0,4.0,7.0,2560.0,0.0,1962.0,2024-12-12 03:10:41.978000+00:00,2024-12-12 03:05:51+00:00,False
3,14114,2024-12-12T02:54:13Z,2015.0,275000.0,2.0,2.0,1340.0,5995.0,2.0,3.0,7.0,1340.0,0.0,1989.0,2024-12-12 03:10:41.978000+00:00,2024-12-12 03:05:51+00:00,False
4,15374,2024-12-12T02:54:13Z,2015.0,340000.0,3.0,1.0,2650.0,7378.0,1.0,3.0,7.0,1460.0,1190.0,1952.0,2024-12-12 03:10:41.978000+00:00,2024-12-12 03:05:51+00:00,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2995,15463,2024-12-12T02:54:13Z,2015.0,235000.0,3.0,2.0,1090.0,8400.0,1.0,4.0,6.0,1090.0,0.0,1961.0,2024-12-12 03:10:42.385000+00:00,2024-12-12 03:06:04+00:00,False
2996,10123,2024-12-12T02:54:13Z,2015.0,465000.0,3.0,2.0,1890.0,4808.0,2.0,3.0,8.0,1890.0,0.0,2000.0,2024-12-12 03:10:42.385000+00:00,2024-12-12 03:06:04+00:00,False
2997,7979,2024-12-12T02:54:13Z,2015.0,590000.0,4.0,2.0,2940.0,12600.0,1.0,4.0,8.0,1850.0,1090.0,1974.0,2024-12-12 03:10:42.385000+00:00,2024-12-12 03:06:04+00:00,False
2998,2475,2024-12-12T02:54:13Z,2014.0,345000.0,2.0,1.0,970.0,10423.0,1.0,3.0,7.0,970.0,0.0,1947.0,2024-12-12 03:10:42.385000+00:00,2024-12-12 03:06:04+00:00,False


In [38]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the loaded simulation data for validation; the fixed seed
# keeps the split reproducible across runs.
train_data, validation_data = train_test_split(data, random_state=42, test_size=0.2)

# Row count of each split, keyed by a display label.
splits_summary = {
    label: frame.shape[0]
    for label, frame in (
        ("Training Data", train_data),
        ("Validation Data", validation_data),
        ("Test Data", test_data),
    )
}

print(splits_summary)

{'Training Data': 8000, 'Validation Data': 2000, 'Test Data': 3000}


In [41]:


# Columns removed from every split before modelling; errors='ignore' lets the
# drop succeed for a split that lacks some of them.
columns_to_drop = [
    'Unnamed: 0',
    'number',
    'event_time',
    'write_time',
    'api_invocation_time',
    'is_deleted',
]

train_data, validation_data, test_data = (
    frame.drop(columns=columns_to_drop, errors='ignore')
    for frame in (train_data, validation_data, test_data)
)

In [42]:
#!pip install h2o
#!pip install xgboost
import h2o
from h2o.automl import H2OAutoML
import xgboost

In [43]:

# The H2O cluster connection must exist before any frame is uploaded.
h2o.init()

# Upload each pandas split to the cluster as an H2OFrame.
train_h2o, validation_h2o, test_h2o = (
    h2o.H2OFrame(split) for split in (train_data, validation_data, test_data)
)

# Every column other than the target is used as a predictor.
target = 'price'
features = [name for name in train_data.columns if name != target]

# Train up to 20 models with a fixed seed. The run log for this cell reports
# that, with cross-validation enabled, the validation frame only provides
# informative metrics.
aml = H2OAutoML(max_models=20, seed=42, exclude_algos=None)
aml.train(
    x=features,
    y=target,
    training_frame=train_h2o,
    validation_frame=validation_h2o,
)



Checking whether there is an H2O instance running at http://localhost:54321. connected.


0,1
H2O_cluster_uptime:,1 hour 10 mins
H2O_cluster_timezone:,UTC
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.46.0.6
H2O_cluster_version_age:,1 month and 10 days
H2O_cluster_name:,H2O_from_python_ec2_user_7vek2j
H2O_cluster_total_nodes:,1
H2O_cluster_free_memory:,7.664 Gb
H2O_cluster_total_cores:,8
H2O_cluster_allowed_cores:,8


Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
AutoML progress: |
05:05:02.103: User specified a validation frame with cross-validation still enabled. Please note that the models will still be validated using cross-validation only, the validation frame will be used to provide purely informative validation metrics on the trained models.

███████████████████████████████████████████████████████████████| (done) 100%


key,value
Stacking strategy,cross_validation
Number of base models (used / total),10/20
# GBM base models (used / total),4/7
# XGBoost base models (used / total),3/6
# DeepLearning base models (used / total),3/4
# DRF base models (used / total),0/2
# GLM base models (used / total),0/1
Metalearner algorithm,GLM
Metalearner fold assignment scheme,Random
Metalearner nfolds,5

Unnamed: 0,mean,sd,cv_1_valid,cv_2_valid,cv_3_valid,cv_4_valid,cv_5_valid
aic,43775.387,1837.6389,45198.465,44678.137,44199.02,44232.086,40569.22
loglikelihood,0.0,0.0,0.0,0.0,0.0,0.0,0.0
mae,132143.75,1874.5265,133982.98,132818.19,130859.664,133505.45,129552.484
mean_residual_deviance,43976827000.0,2896264190.0,44734210000.0,47090815000.0,40018055000.0,45970805000.0,42070249000.0
mse,43976827000.0,2896264190.0,44734210000.0,47090815000.0,40018055000.0,45970805000.0,42070249000.0
null_deviance,238032137000000.0,32143312900000.0,232529378000000.0,281071401000000.0,221897288000000.0,257002521000000.0,197660099000000.0
r2,0.7021232,0.0191558,0.6823707,0.7268159,0.707394,0.7109457,0.6830898
residual_deviance,70421517000000.0,6316494400000.0,73856182000000.0,76710934000000.0,64869269000000.0,74196877000000.0,62474322000000.0
rmse,209614.47,6946.548,211504.64,217004.19,200045.14,214408.03,205110.34
rmsle,0.3123729,0.0044544,0.3141195,0.3100301,0.3147326,0.3058474,0.3171351


In [44]:
# Show the AutoML leaderboard (one row per trained model with its metrics).
leaderboard = aml.leaderboard
print(leaderboard)

# Score the held-out test frame with the leader model.
predictions = aml.leader.predict(test_h2o)

# Convert the H2O prediction frame to pandas; later cells read its 'predict' column.
pred_df = predictions.as_data_frame()
print(pred_df.head())

model_id                                                  rmse          mse     mae       rmsle    mean_residual_deviance
StackedEnsemble_AllModels_1_AutoML_2_20241212_50502     209910  4.40622e+10  132270    0.3123                 4.40622e+10
StackedEnsemble_BestOfFamily_1_AutoML_2_20241212_50502  210767  4.44226e+10  132733    0.314223               4.44226e+10
DeepLearning_grid_2_AutoML_2_20241212_50502_model_1     215650  4.65051e+10  138419    0.327862               4.65051e+10
GBM_2_AutoML_2_20241212_50502                           216362  4.68127e+10  133623    0.316034               4.68127e+10
DeepLearning_grid_3_AutoML_2_20241212_50502_model_1     216530  4.68853e+10  137927    0.324232               4.68853e+10
GBM_3_AutoML_2_20241212_50502                           216969  4.70754e+10  133527    0.316078               4.70754e+10
GBM_4_AutoML_2_20241212_50502                           217150  4.7154e+10   133895    0.316387               4.7154e+10
GBM_grid_1_AutoML_2_20241

In [45]:
# Work on a pandas copy of the leaderboard.
leaderboard_df = leaderboard.as_data_frame()

# Select the leaderboard row with the smallest RMSE and pull out its id and score.
best_model_row = leaderboard_df.loc[leaderboard_df['rmse'].idxmin()]
best_model_id, best_rmse = best_model_row['model_id'], best_model_row['rmse']

# One-row frame describing the best model.
best_model_df = pd.DataFrame({'model_id': [best_model_id], 'rmse': [best_rmse]})



Unnamed: 0,model_id,rmse
0,StackedEnsemble_AllModels_1_AutoML_2_20241212_...,209910.08112


In [46]:
#!pip install evidently
import pandas as pd
import boto3
import os
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset, TargetDriftPreset, RegressionPreset

def prepare_monitoring_data(train_data, test_data, predictions, target_column='price', model=None):
    """
    Prepare reference and current datasets for Evidently monitoring.

    The reference set is a copy of ``train_data`` scored with ``model``; the
    current set is a copy of ``test_data`` with the supplied ``predictions``
    attached. In both, the target column is renamed to ``target`` and the
    predictions are stored in a ``prediction`` column.

    Args:
        train_data (pd.DataFrame): Training data, including the target column
        test_data (pd.DataFrame): Test data, including the target column
        predictions: Model predictions for ``test_data``, one per row and in
            row order (list, array or pandas Series)
        target_column (str): Name of the target column in both frames
        model: Optional model exposing ``predict(H2OFrame)``. Defaults to the
            notebook-level ``aml.leader``.

    Returns:
        tuple: ``(reference_data, current_data)`` with identical column order

    Raises:
        ValueError: If ``predictions`` does not have one value per test row
        KeyError: If ``test_data`` lacks a column present in the reference set
    """
    # Fail fast, before the (slow) H2O scoring of the reference data.
    if len(predictions) != len(test_data):
        raise ValueError(
            f"predictions has {len(predictions)} values but test_data has {len(test_data)} rows"
        )

    if model is None:
        model = aml.leader

    reference_data = train_data.copy()
    current_data = test_data.copy()

    # Get model predictions for the reference set
    reference_predictions = model.predict(h2o.H2OFrame(reference_data)).as_data_frame()
    reference_data['prediction'] = reference_predictions['predict'].values

    # Assign positionally: a pandas Series would otherwise be aligned on its
    # index, which can silently yield NaN or misordered predictions.
    if isinstance(predictions, (pd.Series, pd.Index)):
        predictions = predictions.to_numpy()
    current_data['prediction'] = predictions

    # Rename target column for Evidently
    reference_data = reference_data.rename(columns={target_column: 'target'})
    current_data = current_data.rename(columns={target_column: 'target'})

    # Restrict and order the current set's columns to match the reference set
    current_data = current_data[list(reference_data.columns)]

    return reference_data, current_data

def create_monitoring_report(reference_data, current_data, s3_bucket='mlo4-res', s3_prefix='evidently'):
    """
    Generate comprehensive monitoring report using Evidently and save to S3

    The report combines the data-quality, data-drift, target-drift and
    regression presets. It is rendered to a timestamped HTML file in the
    working directory, uploaded to ``s3://{s3_bucket}/{s3_prefix}/``, and the
    local file is removed afterwards, including when rendering or the upload
    fails.

    Args:
        reference_data (pd.DataFrame): Baseline dataset
        current_data (pd.DataFrame): Dataset compared against the baseline
        s3_bucket (str): Destination bucket
        s3_prefix (str): Key prefix under which the HTML report is stored

    Returns:
        Report: The Evidently report that was run

    Raises:
        Exception: Any error from running, saving or uploading the report is
            propagated; run failures print the input columns and shapes first
    """
    # Initialize S3 client
    s3 = boto3.client('s3')

    # Create report
    report = Report(metrics=[
        DataQualityPreset(),
        DataDriftPreset(),
        TargetDriftPreset(),
        RegressionPreset()
    ])

    # Run report
    try:
        report.run(reference_data=reference_data, current_data=current_data)
    except Exception:
        print("Debug info:")
        print(f"Reference data columns: {reference_data.columns.tolist()}")
        print(f"Current data columns: {current_data.columns.tolist()}")
        print(f"Reference data shape: {reference_data.shape}")
        print(f"Current data shape: {current_data.shape}")
        # Bare raise keeps the original traceback intact.
        raise

    # Save report locally first
    timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
    report_path = f'monitoring_report_{timestamp}.html'
    report_key = f'{s3_prefix}/monitoring_report_{timestamp}.html'

    try:
        report.save_html(report_path)

        # Upload to S3
        s3.upload_file(
            report_path,
            s3_bucket,
            report_key
        )
    finally:
        # Clean up local file even when saving or uploading failed; the file
        # may not exist if save_html itself raised.
        if os.path.exists(report_path):
            os.remove(report_path)

    print(f"Report saved to s3://{s3_bucket}/{report_key}")

    return report

In [47]:
# Build the Evidently inputs from the H2O frames and the test-set predictions.
# The frames are converted back to pandas, and the predictions are passed as a
# plain array taken from pred_df's 'predict' column.
reference_data, current_data = prepare_monitoring_data(
    train_data=train_h2o.as_data_frame(),
    test_data=test_h2o.as_data_frame(), 
    predictions=pred_df['predict'].values,
    target_column='price'
)

# Print debug info before creating report
print("Before creating report:")
print(f"Reference data columns: {reference_data.columns.tolist()}")
print(f"Current data columns: {current_data.columns.tolist()}")

# Run the report and upload it using the function's default bucket and prefix.
report = create_monitoring_report(
    reference_data=reference_data,
    current_data=current_data
)

Parse progress: |████████████████████████████████████████████████████████████████| (done) 100%
stackedensemble prediction progress: |███████████████████████████████████████████| (done) 100%
Before creating report:
Reference data columns: ['date', 'target', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'prediction']
Current data columns: ['date', 'target', 'bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'condition', 'grade', 'sqft_above', 'sqft_basement', 'yr_built', 'prediction']
Report saved to s3://mlo4-res/evidently/monitoring_report_20241212_050955.html


In [49]:
s3 = boto3.client('s3')
bucket_name = 'mlo4-res'
s3_key = 'housing_automl/best_model_info.csv'

# Baseline used when no best-model record exists yet: an infinite RMSE means
# any trained model counts as an improvement.
existing_best_model_df = pd.DataFrame({'model_id': [''], 'rmse': [float('inf')], 'training_time_ms': [float('inf')]})

try:
    s3.download_file(bucket_name, s3_key, 'existing_best_model_info.csv')
    downloaded_best_model_df = pd.read_csv('existing_best_model_info.csv')

    if not downloaded_best_model_df.empty:
        existing_best_model_df = downloaded_best_model_df
except pd.errors.EmptyDataError:
    # A zero-byte record file is treated like a missing one.
    pass
except s3.exceptions.ClientError as err:
    # Only a missing object means "no baseline yet". Any other failure
    # (permissions, throttling, ...) must not be mistaken for that, or the
    # stored best model could be overwritten by a worse one.
    if err.response.get('Error', {}).get('Code') not in ('404', 'NoSuchKey'):
        raise

leaderboard_df = leaderboard.as_data_frame()

current_best_model_row = leaderboard_df.loc[leaderboard_df['rmse'].idxmin()]
current_best_model_id = current_best_model_row['model_id']
current_best_rmse = current_best_model_row['rmse']

# Get training time for the current best model
current_best_model = h2o.get_model(current_best_model_id)
current_best_training_time = current_best_model._model_json['output']['run_time']

existing_best_rmse = existing_best_model_df['rmse'].iloc[0]

print(f"Current model RMSE: {current_best_rmse}")
print(f"Existing best model RMSE: {existing_best_rmse}")

if current_best_rmse < existing_best_rmse or existing_best_rmse == 0:

    # Save the model whose id and RMSE are recorded below, so the artifact
    # and its metadata always describe the same model.
    best_model = current_best_model
    model_path = h2o.save_model(model=best_model, path="h2o_models/", force=True)
    print(f"Model saved to: {model_path}")

    # NOTE(review): the key ends in .zip but the file is uploaded exactly as
    # h2o.save_model wrote it - confirm consumers expect that format.
    s3_model_key = "housing_automl/h2o_best_model.zip"
    s3.upload_file(Filename=model_path, Bucket=bucket_name, Key=s3_model_key)
    print(f"Model uploaded to s3://{bucket_name}/{s3_model_key}")

    # Publish the metadata only after the artifact upload succeeded, so the
    # record never points at a model that is not in S3.
    new_best_model_df = pd.DataFrame({
        'model_id': [current_best_model_id],
        'rmse': [current_best_rmse],
        'training_time_ms': [current_best_training_time]
    })

    new_best_model_df.to_csv('best_model_info.csv', index=False)

    s3.upload_file('best_model_info.csv', bucket_name, s3_key)
else:
    print("Current model is not better than the existing best model.")


Current model RMSE: 209910.08111986367
Existing best model RMSE: 209910.08111986367
Current model is not better than the existing best model.
