# Data Processing
Designed by Tayven Stover.

In [1]:
import pandas as pd
import numpy as np
import os
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import json

TRAINING_DATA_FOLDER = '../data/'

In [2]:
def select_useful_columns(df, columns_to_keep):
    columns = [col for col in df.columns if col not in columns_to_keep]

    df = df.drop(columns=columns)

    return df

def get_mean_location(df):
    mean_location = df[['latitude', 'longitude']].mean()

    # Convert to list
    mean_location = mean_location.to_list()

    return mean_location

def run_length_encoding(df, column_name):
    # Apply RLE: Get run lengths and values
    n = len(df)
    y = np.array(df[f'{column_name}_is_nan'])
    starts = np.r_[0, np.flatnonzero(y[1:] != y[:-1]) + 1]
    lengths = np.diff(np.r_[starts, n])
    values = y[starts]
    return starts, lengths, values

def mark_large_gaps(df, column_name, gap_threshold=5):
    # Ensure the column exists in the DataFrame
    if column_name not in df.columns:
        raise ValueError(f"The column '{column_name}' does not exist in the DataFrame.")

     # Flag rows that are NaN
    df[f'{column_name}_is_nan'] = df[column_name].isna().astype(int)
    
    # Apply RLE on the column
    starts, lengths, values = run_length_encoding(df, column_name)
    
    # Initialize the validity column with ones
    df[f'{column_name}_valid_sequence'] = 1
    
    # Identify the start positions of large NaN gaps and their lengths
    large_gaps = (values == 1) & (lengths > gap_threshold)

    # Set the validity of sequences following large gaps to 0
    for start, length in zip(starts[large_gaps], lengths[large_gaps]):
        df.loc[start:start+length-1, f'{column_name}_valid_sequence'] = 0
    
    df.drop([f'{column_name}_is_nan'], axis=1, inplace=True)
    
    return df

def interpolate_missing_values(df, columns_to_interpolate):
    # Specify validation columns
    validation_columns = [f'{column}_valid_sequence' for column in columns_to_interpolate]
    
    # Determine rows eligible for interpolation
    # Only interpolate rows where all associated validation columns are 1
    df['interpolate_flag'] = df[validation_columns].all(axis=1)

    # Iterate over each column that needs interpolation
    for column in columns_to_interpolate:
        if column in df.columns:
            # Use 'mask' to isolate parts of the column that should be interpolated
            # This will replace values where interpolate_flag is 0 with NaN, which are then not interpolated
            mask = df['interpolate_flag'] == 1
            # Temporarily store the original data
            original_data = df[column].copy()
            # Replace data not to be interpolated with NaN
            df.loc[~mask, column] = np.nan
            # Interpolate missing (NaN) values only where the mask is True
            df[column] = df[column].interpolate(method='linear', limit_direction='both')
            # Replace the NaN values back with the original data to avoid affecting non-interpolated parts
            df.loc[~mask, column] = original_data[~mask]

    return df

def encode_categories(category_dataframe):
    encoder = OneHotEncoder(sparse_output=False)

    # Fit the encoder
    encoder.fit(category_dataframe)

    # Transform the data
    encoded_data = encoder.transform(category_dataframe)

    return encoded_data

def process_category(category_data):
    # Print unique values in the category data
    category_keys = np.unique(category_data)

    category_df = pd.DataFrame(category_data)

    data_array = encode_categories(category_df)

    feature_len = np.shape(data_array)[-1]
    reshaped_data = data_array.reshape(-1, feature_len)

    category_values = np.unique(reshaped_data, axis=0)
    
    # Convert from np.int32 to int
    if type(category_keys[0]) == np.int32:
        category_dict = {int(category_keys[i]): category_values[i].tolist() for i in range(len(category_keys))}
        
    else:
        # Compile a dict
        category_dict = {category_keys[i]: category_values[i].tolist() for i in range(len(category_keys))}

    return reshaped_data, category_dict

def sequence_categorical_data(category_data):
    for i in range((int(len(category_data) / 24))):
        start_idx = i * 24
        end_idx = start_idx + 24
        block = category_data[start_idx:end_idx]
        
        # Find the first non-empty string in the block
        first_non_empty_string = next((x[0] for x in block if x[0] != ''), None)
        
        if first_non_empty_string:
            # Replace all values in the block with the first non-empty string
            category_data[start_idx:end_idx] = first_non_empty_string

    # Remove 23 values from every 24 values this just saves memory
    category_data = category_data[::24]

    return category_data

# Takes an np array of dates (y, m, d) and returns two arrays of months and days (x, 1)
# TODO update to also return start hour of each sequence!!!
def process_date_data(date_data):
    # Convert list to df with date col
    full_date_df = pd.DataFrame(date_data, columns=['Date'])
    full_date_df['Date'] = pd.to_datetime(full_date_df['Date'], format="%Y-%m-%d")

    full_date_df['Month'] = full_date_df['Date'].dt.strftime("%b")
    full_date_df['Day'] = full_date_df['Date'].dt.day

    # Extract the Month and Day columns as NumPy arrays
    month_array = full_date_df['Month'].to_numpy().reshape(-1, 1)
    day_array = full_date_df['Day'].to_numpy().reshape(-1, 1)
    
    return month_array, day_array

def cleanse_df_blocks(df, block_size = 24):
    num_rows = len(df)
    num_full_blocks = num_rows // block_size
    new_num_rows = num_full_blocks * block_size

    # Slice the DataFrame to keep only the rows up to 'new_num_rows'
    df = df.iloc[:new_num_rows]

    # Step 2: Reset the index to ensure it starts from 0 and is sequential
    df = df.reset_index(drop=True)

    # Step 3: Assign block numbers to each row
    df['block'] = df.index // block_size

    # Step 4: Remove blocks that contain any NaN values
    # This function returns True if the block has no NaN values, so it will be kept
    df_cleaned = df.groupby('block').filter(lambda x: not x.isnull().values.any())

    # Step 5: Drop the 'block' column if it's no longer needed
    df_cleaned = df_cleaned.drop(columns=['block'])

    return df_cleaned

In [3]:
def create_sequences(df, x_columns, y_columns, time_steps=24):
    x_sequence = []
    y_sequence = []
    date_sequence = []
    used_indices = []  # List to hold indices used in sequences

    # Iterate through the DataFrame to form sequences
    for i in range(len(df) - time_steps + 1):

        temp_df = df.iloc[i:i + time_steps]

        # Check if all entries in the sequence have interpolate_flag set to TRUE and timestamps are consecutive
        if temp_df['interpolate_flag'].all() and \
           (temp_df['date'].iloc[-1] - temp_df['date'].iloc[0]).total_seconds() == (time_steps - 1) * 3600:
            # Append the sequences to the lists as NumPy arrays
            x_sequence.append(temp_df[x_columns].to_numpy())
            y_sequence.append(temp_df[y_columns].to_numpy())
            date_sequence.append(temp_df['date'].to_numpy())
            used_indices.extend(range(i, i + time_steps))  # Add all indices in this sequence to the list

    # Convert lists to numpy arrays
    x_sequence = np.array(x_sequence)
    y_sequence = np.array(y_sequence)
    date_sequence = np.array(date_sequence)
    # Find indices that are not part of any sequence
    unused_indices = set(range(len(df))) - set(used_indices)
    unused_rows = df.iloc[list(unused_indices)].sort_index()

    return x_sequence, y_sequence, date_sequence, unused_rows


def split_data(X, y, location, month, day, test_size=0.2, validation_size=0.15, random_state=42):
    # Ensure both arrays have the same length
    assert len(X) == len(y), "The length of X and y must be the same."

    np.random.seed(random_state)
    
    # Create an array of indices and shuffle them
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    
    # Shuffle the data arrays using the shuffled indices
    X = np.array(X)[indices]
    y = np.array(y)[indices]
    location = np.array(location)[indices]
    month = np.array(month)[indices]
    day = np.array(day)[indices]

    # Calculate the split index for test data
    test_split_index = int(len(X) * (1 - test_size))
    
    # Split the arrays into train and validation and test sets
    X_train_val, X_test = X[:test_split_index], X[test_split_index:]
    y_train_val, y_test = y[:test_split_index], y[test_split_index:]
    location_train_val, location_test = location[:test_split_index], location[test_split_index:]
    month_train_val, month_test = month[:test_split_index], month[test_split_index:]
    day_train_val, day_test = day[:test_split_index], day[test_split_index:]

    # Calculate the split index for validation data within the training set
    validation_split_index = int(len(X_train_val) * (1 - validation_size))

    # Split the train and validation arrays into train and validation sets
    X_train, X_validate = X_train_val[:validation_split_index], X_train_val[validation_split_index:]
    y_train, y_validate = y_train_val[:validation_split_index], y_train_val[validation_split_index:]
    location_train, location_validate = location_train_val[:validation_split_index], location_train_val[validation_split_index:]
    month_train, month_validate = month_train_val[:validation_split_index], month_train_val[validation_split_index:]
    day_train, day_validate = day_train_val[:validation_split_index], day_train_val[validation_split_index:]

    return X_train, X_validate, X_test, y_train, y_validate, y_test, \
        location_train, location_validate, location_test, \
        month_train, month_validate, month_test, \
        day_train, day_validate, day_test

def scale_data(train, test, validate):
    # Initialize the StandardScaler
    scaler = StandardScaler()

    # Fit on the training data
    scaler.fit(np.concatenate(train))

    # Scale the training, test, and validation data
    train_scaled = [scaler.transform(df) for df in train]
    test_scaled = [scaler.transform(df) for df in test]
    validate_scaled = [scaler.transform(df) for df in validate]

    return train_scaled, test_scaled, validate_scaled

def save_data(directory, data, names):
    types = ['train', 'test', 'validate']

    i = 0

    for name in names:
        for data_type in types:
            file_path = os.path.join(directory, f'{name}_{data_type}.npy')

            np.save(file_path, data[i])

            i += 1
    
def initialize_processing(file, COLUMNS_TO_KEEP):    
    sensor_readings_df = pd.read_csv(file) # Load in file.

    # Step 0: Drop unnecessary columns and columns with a date not falling on the hour
    sensor_readings_df = select_useful_columns(sensor_readings_df, COLUMNS_TO_KEEP)
    sensor_readings_df = sensor_readings_df[sensor_readings_df['date'].str.endswith(':00:00')]

    # Step 0.1: Ensure location remains the same
    mean_location = get_mean_location(sensor_readings_df)
    sensor_readings_df['latitude'] = mean_location[0]
    sensor_readings_df['longitude'] = mean_location[1]

    # Step 1: Add is_original column and set to 1
    sensor_readings_df['is_original'] = 1
 
    # Step 2: Convert date column to date type
    sensor_readings_df['date'] = pd.to_datetime(sensor_readings_df['date'], format='%Y-%m-%d %H:%M:%S')
    sensor_readings_df = sensor_readings_df.sort_values(by='date').reset_index(drop=True)
 
    # Step 3: Create a complete date range
    full_range = pd.date_range(start=sensor_readings_df['date'].min(), end=sensor_readings_df['date'].max(), freq='H')
    full_df = pd.DataFrame(full_range, columns=['date'])
 
    # Step 4: Merge with the original dataframe
    merge_df = pd.merge(full_df, sensor_readings_df, on='date', how='left')
    merge_df['is_original'].fillna(0, inplace=True)  # Set is_original to 0 for new rows
 
    # Step 5: Fill other columns for new rows with NaN
    for col in set(merge_df.columns) - {'date', 'is_original'}:
        if col not in sensor_readings_df:
            continue
        merge_df[col] = merge_df[col].where(merge_df['is_original'] == 1)
    
    # Step 7: Mark large gaps in columns - latitude, longitude, sea_water_temperature, and date
    # Step 7.1: latitude
    lat_mark_df = mark_large_gaps(merge_df, 'latitude', gap_threshold=5)

    # Step 7.2: longitude
    long_mark_df = mark_large_gaps(lat_mark_df, 'longitude', gap_threshold=5)

    # Step 7.3: Sample measurement
    sea_temp_mark_df = mark_large_gaps(long_mark_df, 'sea_water_temperature', gap_threshold=5)

    # Step 7.4: Date
    date_mark_df = mark_large_gaps(sea_temp_mark_df, 'date', gap_threshold=5)

    # Step 8: Interpolate small gaps in the columns using linear interpolation
    interpolate_df = interpolate_missing_values(date_mark_df, COLUMNS_TO_KEEP)

    # Step 9: remove all 24 row blocks with NaN values
    # IK not the best but were on a time crunch
    # Hello tech debt!
    interpolate_df = cleanse_df_blocks(interpolate_df)

    # Step 10: add a column for future temperature data
    interpolate_df['future_temp'] = interpolate_df['sea_water_temperature'].shift(-24)
    interpolate_df.drop(interpolate_df.tail(24).index, inplace=True)
    
    return interpolate_df

In [9]:
# Prepare data
RAW_DATA_FOLDER = f'{TRAINING_DATA_FOLDER}/raw/'
MAIN_FOLDER = os.listdir(RAW_DATA_FOLDER) # Get all raw data files

COVARIATE_COLUMNS = ['latitude', 'longitude', 'date', 'sea_water_temperature']
TARGET_COLUMN = 'future_temp'

time_steps = 24
feature_list = []
target_list = []
date_list = []

file_count = 1#len(MAIN_FOLDER)

# Create an empty dataframe
df = pd.DataFrame()

# Loop through the files and add to df
for i in tqdm(range(file_count), desc='Gathering and sequencing data', unit='files'):
    file = f'{RAW_DATA_FOLDER}{MAIN_FOLDER[i]}'

    if file.endswith('.csv'):
        df = initialize_processing(file, COVARIATE_COLUMNS)

        x_sequence, y_sequence, date_sequence, unused_rows = create_sequences(df, COVARIATE_COLUMNS, TARGET_COLUMN, time_steps)

        # Append to overall array
        feature_list.extend(x_sequence)
        target_list.extend(y_sequence)
        date_list.extend(date_sequence)

Gathering and sequencing data:   0%|          | 0/1 [00:00<?, ?files/s]

In [None]:
print('Extracting and encoding categories')
# Process all categorical data
# Date first
date_data = np.array(date_list).reshape(-1, 1)
date_data = sequence_categorical_data(date_data)

month_data, day_data = process_date_data(date_data)

# Location now
# Slice and reshape the categorical data (location_code) for encoding
latitude_data = np.array(feature_list)[:, :, 0].reshape(-1, 1)
longitude_data = np.array(feature_list)[:, :, 1].reshape(-1, 1)

latitude_data = sequence_categorical_data(latitude_data)
longitude_data = sequence_categorical_data(longitude_data)

# Encode all categories
encoded_latitude_data, latitude_dict = process_category(latitude_data)
encoded_longitude_data, longitude_dict = process_category(longitude_data)
encoded_month_data, month_dict = process_category(month_data)
encoded_day_data, day_dict = process_category(day_data)

file_names = {'latitude_data': latitude_data, 'longitude_data': longitude_data, 'month_data': month_dict, 'day_data': day_dict}

Extracting and encoding categories


In [15]:
# Save the category dictionaries as json
for key, value in file_names.items():
    print(f'Saving {key} dictionary')
    with open(f"{TRAINING_DATA_FOLDER}/processed/{key}.json", 'w') as f:
        json.dump(value, f, indent=4)

# Print the shapes
print(np.shape(feature_list))
print(np.shape(target_list))
print(np.shape(encoded_location_data))
print(np.shape(encoded_month_data))
print(np.shape(encoded_day_data))

Saving month_data dictionary
Saving day_data dictionary
(33862, 24, 2)
(33862, 24)
(33862, 12)
(33862, 31)


In [6]:
print('Data processing complete')
print('Splitting data')
# Remove the 5th column (county) from the feature list
feature_list = np.delete(feature_list, 3, axis=2)
# Split the data
X_train, X_test, X_validate, \
    y_train, y_test, y_validate, \
    location_train, location_test, location_validate, \
    month_train, month_test, month_validate, \
    day_train, day_test, day_validate = \
    split_data(feature_list, target_list, encoded_location_data, encoded_day_data, encoded_month_data)
    
print('Scaling data')
# Scale the data
X_train_scaled, X_test_scaled, X_validate_scaled = scale_data(X_train, X_test, X_validate)

33862

In [58]:
# Save the data
print('Saving data')

final_data = [X_train_scaled, X_test_scaled, X_validate_scaled,
        y_train, y_test, y_validate,
        location_train, location_test, location_validate,
        month_train, month_test, month_validate,
        day_train, day_test, day_validate]

data_names = ['X', 'y', 'location', 'month', 'day']

# Save the data
save_data(f"{TRAIN_DATA_REGION}/processed/{'filtered' if UNPROCESSED_VERSION == 'Filtered' else 'non-filtered'}", final_data, data_names)
np.save(os.path.join(f"{TRAIN_DATA_REGION}/processed/{'filtered' if UNPROCESSED_VERSION == 'Filtered' else 'non-filtered'}", 'X_train_unscaled.npy'), X_train) # To reverse scaling for predictions
print('Data processing complete')