In [ ]:
import os
import io
import boto3
import pickle
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from pathlib import Path

# Vis Imports
import seaborn as sns

In [ ]:
# Notebook-wide pandas configuration.
# - silence the chained-assignment warning
# - render floats with two decimals
# - never truncate columns or rows when a frame is displayed
pd.set_option('mode.chained_assignment', None)
pd.set_option('display.float_format', '{:.2f}'.format)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [13]:
# Determine ecosystem and load appropriate variables
deepnote = bool(os.getenv('DEEPNOTE_RUNTIME_UUID'))

if deepnote:
    print('Running on Deepnote with Env and S3 integrations, skipping dotenv')
else:
    print('Loading dotenv file')

    # Private file contains non-public variable configurations for local development.  Not loaded to github.
    # variables.env can be populated with user specific API and Access keys and is empty by default.  Loaded to github.
    private_vars_path = Path("../private_variables.env")
    var_path = Path("../variables.env")

    # Use the private vars if exists, otherwise use the public vars file
    env_path = private_vars_path if private_vars_path.exists() else var_path

    # Load the environment variables from the env path
    load_dotenv(env_path)

# Establish the variables in BOTH environments. The S3 helper functions below
# use these names as default argument values, which are evaluated when the
# `def` statement runs; leaving them undefined on Deepnote raised a NameError.
# os.getenv returns None for anything that is not set.
# NOTE(review): presumably the Deepnote Env integration exports these same
# variable names -- confirm against the project settings.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
s3_bucket_name = os.getenv('S3_BUCKET_NAME')
neptune_project = os.getenv('NEPTUNE_PROJECT')
neptune_api_key = os.getenv('NEPTUNE_API_KEY')
mapbox_api_key = os.getenv('MAPBOX_API_KEY')

In [68]:
def list_s3_contents(file_path, access_key_id=aws_access_key_id, secret_access_key=aws_secret_access_key, bucket_name=s3_bucket_name):
    """
    List the contents of an S3 bucket path, prioritizing directories first,
    then files in alphabetical order.

    Every page of the listing is fetched, so paths holding more keys than a
    single list_objects_v2 response can carry are listed completely.

    Args:
        file_path (str): The S3 bucket path to list. An empty string lists the
            bucket root.
        access_key_id (str): AWS access key ID.
        secret_access_key (str): AWS secret access key.
        bucket_name (str): Name of the S3 bucket.

    Returns:
        list[str]: Directory prefixes (sorted) followed by file keys (sorted).
            Each entry is also printed.
    """
    # Initialize a boto3 client
    s3_client = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)

    # Add a trailing slash if not present to properly emulate directory behavior.
    # An empty path is left alone: turning it into '/' would match no keys.
    if file_path and not file_path.endswith('/'):
        file_path += '/'

    directories = []
    files = []
    try:
        # A single list_objects_v2 response is capped, so walk every page.
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=file_path, Delimiter='/'):
            # CommonPrefixes are the "sub-directories" under the prefix
            directories.extend(cp['Prefix'] for cp in page.get('CommonPrefixes', []))
            # Skip the placeholder object that represents the directory itself
            files.extend(obj['Key'] for obj in page.get('Contents', []) if obj['Key'] != file_path)
    finally:
        # Release the client even when listing fails (matches the other S3 helpers)
        s3_client.close()

    # Directories first, then files, each group in alphabetical order
    all_contents = sorted(directories) + sorted(files)

    for item in all_contents:
        print(item)

    return all_contents

In [62]:
def load_from_s3(file_path, access_key_id=aws_access_key_id, secret_access_key=aws_secret_access_key, bucket_name=s3_bucket_name):
    '''
    Download a file from the S3 bucket location

    Args:
        file_path (str): The path of the file within the S3 bucket.
        access_key_id (str, optional): The AWS access key ID. Defaults to global aws_access_key_id variable.
        secret_access_key (str, optional): The AWS secret access key. Defaults to global aws_secret_access_key variable.
        bucket_name (str, optional): The name of the S3 bucket. Defaults to global s3_bucket_name variable.

    Returns:
        io.BytesIO: A BytesIO object containing the file content.

    Raises:
        Whatever boto3 raises from get_object (for example when the key does
        not exist); the client is closed before the error propagates.
    '''

    # Initialize a boto3 s3 client with the supplied credentials
    s3_client = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)

    try:
        # Use the client to grab the data
        f_obj = s3_client.get_object(Bucket=bucket_name, Key=file_path)

        body = f_obj['Body']
        try:
            # Read the whole object into memory so the caller gets a seekable buffer
            return io.BytesIO(body.read())
        finally:
            # Release the underlying HTTP stream
            body.close()
    finally:
        # Always close the client, including when the download fails
        s3_client.close()


In [63]:
def write_to_s3(file_path, data, access_key_id=aws_access_key_id, secret_access_key=aws_secret_access_key, bucket_name=s3_bucket_name, pickle_file=False):
    '''
    Upload a file to the S3 bucket location

    Args:
        file_path (str): The path to store the data within the S3 bucket.
        data ([pd.DataFrame, Any]): The data to upload. Can be a Pandas DataFrame or any picklable object.
        access_key_id (str, optional): The AWS access key ID. Defaults to global aws_access_key_id variable.
        secret_access_key (str, optional): The AWS secret access key. Defaults to global aws_secret_access_key variable.
        bucket_name (str, optional): The name of the S3 bucket. Defaults to global s3_bucket_name variable.
        pickle_file (bool, optional): Whether to pickle the data before uploading. Defaults to False.

    Raises:
        ValueError: If pickle_file is False and data is not a Pandas DataFrame.
    '''

    # Serialize first so unsupported input is rejected before any client is opened
    if pickle_file:
        payload = pickle.dumps(data)
    elif isinstance(data, pd.DataFrame):
        # DataFrames are stored as parquet
        with io.BytesIO() as buffer:
            data.to_parquet(buffer)
            payload = buffer.getvalue()
    else:
        raise ValueError("Unsupported data type for upload")

    # Open the S3 client
    s3_client = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)

    try:
        s3_client.put_object(Bucket=bucket_name, Key=file_path, Body=payload)
    finally:
        # Close the client whether or not the upload succeeded
        s3_client.close()

In [19]:
# Download the raw Global Coral Bleaching Database CSV from S3 and parse it.
# low_memory=False makes pandas read the file in one pass instead of chunks.
gcb_df = pd.read_csv(
    load_from_s3(
        file_path="data/Global_Coral_Bleaching_DB/Global_Coral_Bleaching_DB_v3.csv"
    ),
    low_memory=False,
)

### Impute Data

#### Simple Imputation - Columns Missing a Handful of Values

In this section we fill in columns that were missing only a handful of values (1-10) and discard every row that is missing the crucial column 'SSTA'. Dropping those rows dramatically cleaned up the dataframe while discarding only about 0.4% of it. It should also be noted that rows missing 'SSTA' were also missing other crucial oceanic information.

In [ ]:
# Known replacement values for the columns that are missing only a few entries.
simple_fill_values = {
    # Fix the observation missing the French Polynesian country name
    "Country_Name": "French Polynesia",
    # Only 3 missing records for Ecoregion, all are on the northern coast of Honshu Island, Japan
    "Ecoregion_Name": "Honshu, Japan",
    # If Substrate_Name is missing, it's hard coral
    "Substrate_Name": "Hard Coral",
    # If Distance_to_Shore is missing, it's a FL Key site, 62m from shore
    "Distance_to_Shore": 62,
}

# Fill on the frame and reassign. Calling `gcb_df.<col>.fillna(..., inplace=True)`
# mutates a Series obtained from the frame (chained assignment); recent pandas
# versions warn about it and, with Copy-on-Write, it no longer updates gcb_df.
gcb_df = gcb_df.fillna(value=simple_fill_values)

# 'SSTA' is a column that seems to track with other important oceanic metrics.
# There are 259 rows that are missing 'SSTA' and the dataset is about 63k rows.
# This means that we are only discarding about 0.4% of the total dataframe while retaining
# a bulk of the information that we need for a quality analysis.
gcb_df = gcb_df.dropna(subset=['SSTA'])

#### Not So Simple Imputation - Depth_m & SSTA_Minimum

After performing the simple imputations above, we still have a couple of important columns that could be cleaned up: Depth_m and SSTA_Minimum.

In [21]:
def impute_depth(row):
    '''
    Function to impute Depth_m
    param row: Pandas dataframe row object
    return: the row's own Depth_m when present; otherwise the median Depth_m
            of rows in gcb_df with the same Reef_ID; otherwise the median
            Depth_m of all of gcb_df
    '''

    current_depth = row['Depth_m']

    # Nothing to impute when the observation already has a depth
    if pd.notna(current_depth):
        return current_depth

    # Median depth across every observation of the same reef
    same_reef_depths = gcb_df.loc[gcb_df['Reef_ID'] == row['Reef_ID'], 'Depth_m']
    reef_median = same_reef_depths.median()

    # If the reef median is also NaN, use the overall median
    if np.isnan(reef_median):
        return gcb_df['Depth_m'].median()

    return reef_median

In [ ]:
def impute_SSTAMin(row):
    '''
    Function to impute SSTA_Minimum
    param row: Pandas dataframe row object
    return: the row's own SSTA_Minimum when present; otherwise the median
            SSTA_Minimum of rows in gcb_df with the same Ocean_Name; otherwise
            the median SSTA_Minimum of all of gcb_df
    '''

    current_value = row['SSTA_Minimum']

    # Nothing to impute when the observation already has a value
    if pd.notna(current_value):
        return current_value

    # Median SSTA_Minimum across every observation in the same ocean
    same_ocean_values = gcb_df.loc[gcb_df['Ocean_Name'] == row['Ocean_Name'], 'SSTA_Minimum']
    ocean_median = same_ocean_values.median()

    # If the ocean median is also NaN, use the overall median
    if np.isnan(ocean_median):
        return gcb_df['SSTA_Minimum'].median()

    return ocean_median

In [ ]:
# Vectorized form of the rules implemented by impute_depth / impute_SSTAMin:
# keep existing values, otherwise use the group median, otherwise the overall
# median. Applying those functions row by row re-filters the whole frame for
# every missing value; groupby().transform computes each group median once.
# Every median on the right-hand side is taken before the column is overwritten,
# so they are based on the un-imputed data, exactly as in the row-wise version.

# Depth_m: median of the same reef, then overall median
reef_depth_median = gcb_df.groupby('Reef_ID')['Depth_m'].transform('median')
gcb_df['Depth_m'] = (
    gcb_df['Depth_m']
    .fillna(reef_depth_median)
    .fillna(gcb_df['Depth_m'].median())
)

# SSTA_Minimum: median of the same ocean, then overall median
ocean_ssta_min_median = gcb_df.groupby('Ocean_Name')['SSTA_Minimum'].transform('median')
gcb_df['SSTA_Minimum'] = (
    gcb_df['SSTA_Minimum']
    .fillna(ocean_ssta_min_median)
    .fillna(gcb_df['SSTA_Minimum'].median())
)

### Write Out

In [ ]:
# Write out the cleaned frame as parquet, which is used in the feature building notebook.
if deepnote:
    # These absolute paths only exist inside the Deepnote project.
    # NOTE(review): /datasets/s3 is presumably the mounted S3 integration -- confirm.
    gcb_df.to_parquet("/work/data/Global_Coral_Bleaching_DB/gcb_v3.parquet")
    gcb_df.to_parquet("/datasets/s3/data/Global_Coral_Bleaching_DB/gcb_v3.parquet")
else:
    # Outside Deepnote the paths above are missing and to_parquet would fail,
    # so upload through the S3 helper instead (DataFrames are stored as parquet).
    # NOTE(review): key chosen to mirror the mounted path above -- confirm the bucket layout.
    write_to_s3(file_path="data/Global_Coral_Bleaching_DB/gcb_v3.parquet", data=gcb_df)

### Validate the Data

In [ ]:
# How much data is missing from the columns?
# Other quick checks that can be run ad hoc:
#   gcb_df.isna().sum()                               -> total NaN per column
#   len(gcb_df[gcb_df["Percent_Bleached_Value"].isna()]) -> observations with no bleaching value

# Share of NaN per column, computed once and reused below
missing_share = gcb_df.isna().mean()

# Identify the columns missing more than 10% of data
gcb_cols = missing_share[missing_share > 0.1].index

# Display the missing share for just those columns
missing_share.loc[gcb_cols]