In [None]:
# Import necessary libraries
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import numpy as np
import dask.dataframe as dd
from sklearn.decomposition import TruncatedSVD

In [None]:
# Locations of the four per-product merged Parquet files on the mounted Drive
datasets = {
    "transparency": "/content/drive/MyDrive/L3 DATA/transparency_merged_ocean_data.parquet",
    "reflectance": "/content/drive/MyDrive/L3 DATA/reflectance_merged_ocean_data.parquet",
    "plankton": "/content/drive/MyDrive/L3 DATA/plankton_merged_ocean_data.parquet",
    "optics": "/content/drive/MyDrive/L3 DATA/optics_merged_ocean_data.parquet"
}

# Read every product into its own in-memory pandas DataFrame (same order as before)
df_transparency, df_reflectance, df_plankton, df_optics = (
    pd.read_parquet(datasets[product])
    for product in ("transparency", "reflectance", "plankton", "optics")
)

# Columns shared by all four products; every merge joins on exactly these
join_keys = ['time', 'latitude', 'longitude']

# Outer-join the products one after another so that no (time, latitude, longitude) row is lost.
# NOTE: pandas applies suffixes only to column names that exist on both sides of a merge.
# In the displayed result one flag column is therefore still called plain `flags`
# (presumably the plankton one, given the merge order), next to flags_transparency,
# flags_reflectance and flags_optics.
df_merged = (
    df_transparency
    .merge(df_reflectance, on=join_keys, how='outer', suffixes=('_transparency', '_reflectance'))
    .merge(df_plankton, on=join_keys, how='outer', suffixes=('', '_plankton'))
    .merge(df_optics, on=join_keys, how='outer', suffixes=('', '_optics'))
)

# Persist the combined table next to the notebook (path is relative to the working directory)
merged_file = "merged_ocean_data.parquet"
df_merged.to_parquet(merged_file)

print(f"Merged dataset saved as {merged_file}")

In [None]:
# Show the merged frame as this cell's output; for a frame this large pandas
# renders only the first and last rows with an elided middle.
df_merged

Unnamed: 0,time,latitude,longitude,KD490,ZSD,flags_transparency,RRS490,RRS443,flags_reflectance,CHL,flags,MICRO,BBP,CDM,flags_optics
0,2023-01-01,49.140621,-10.755208,,,0.0,,,0.0,,0.0,,,,0.0
1,2023-01-01,49.140621,-10.744791,,,0.0,,,0.0,,0.0,,,,0.0
2,2023-01-01,49.140621,-10.734375,,,0.0,,,0.0,,0.0,,,,0.0
3,2023-01-01,49.140621,-10.723958,,,0.0,,,0.0,,0.0,,,,0.0
4,2023-01-01,49.140621,-10.713541,,,0.0,,,0.0,,0.0,,,,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
187545401,2024-12-31,52.234371,-2.192708,,,1.0,,,1.0,,1.0,,,,1.0
187545402,2024-12-31,52.234371,-2.182291,,,1.0,,,1.0,,1.0,,,,1.0
187545403,2024-12-31,52.234371,-2.171874,,,1.0,,,1.0,,1.0,,,,1.0
187545404,2024-12-31,52.234371,-2.161458,,,1.0,,,1.0,,1.0,,,,1.0


In [None]:
# Load the merged Parquet file into a pandas DataFrame (presumably the file saved by
# the merge cell above, which assumes the working directory is /content — TODO confirm).
# NOTE(review): despite its name, `df_path` holds a DataFrame rather than a path, and the
# eof_interpolation call further down passes the file name string instead of this
# variable — confirm whether this eager load is still needed.
df_path = pd.read_parquet("/content/merged_ocean_data.parquet")

In [None]:
# Function for EOF Interpolation
def eof_interpolation(df_path, output_file, n_components=5, max_iter=10, random_state=0):
    """Fill missing numeric values of a Parquet dataset by iterative truncated SVD.

    Missing entries of the numeric columns are first set to the mean of the
    observed values of their column. Then, ``max_iter`` times, a truncated SVD
    with ``n_components`` components is fitted and only the originally missing
    entries are overwritten with the low-rank reconstruction. Non-numeric
    columns (for example datetimes) are passed through untouched.

    NOTE: Dask is used only for the initial read and the null count; the whole
    table is then materialized in memory for the SVD.
    NOTE(review): the decomposition runs over the table's numeric columns as
    features (which includes coordinate columns if they are numeric), not over
    a space x time matrix — confirm this is the intended EOF formulation.

    Parameters
    ----------
    df_path : str
        Path of the Parquet dataset, forwarded to ``dd.read_parquet``.
    output_file : str
        Path the filled table is written to in Parquet format.
    n_components : int, default 5
        Number of SVD components kept for the reconstruction.
    max_iter : int, default 10
        Number of reconstruct-and-refill passes.
    random_state : int or None, default 0
        Seed forwarded to ``TruncatedSVD`` so repeated runs give the same fill.

    Returns
    -------
    pandas.DataFrame
        The filled table with the same columns, in the same order, as the
        input and a fresh 0..n-1 index. Numeric columns come back as float64.
        If the dataset contains no missing values at all, the lazily loaded
        Dask DataFrame is returned unchanged and nothing is written.

    Raises
    ------
    ValueError
        If values are missing but the dataset has no numeric column to
        decompose.
    """
    print("\nProcessing: EOF Interpolation on Merged Dataset")

    # Lazy load: lets the null count below run partition by partition
    ddf = dd.read_parquet(df_path)

    # Identify missing values (all columns, numeric or not)
    missing_values = ddf.isnull().sum().compute().sum()
    print(f"Initial missing values: {missing_values}")

    if missing_values == 0:
        print("No missing values in dataset. Skipping EOF Interpolation.")
        return ddf

    # Materialize once as pandas; everything below works on in-memory data
    frame = ddf.compute()
    # Partitions may carry repeating index values; use a plain positional index
    frame.index = pd.RangeIndex(len(frame))

    # Only numeric columns can enter the SVD; datetimes/strings are left alone
    numeric_cols = list(frame.select_dtypes(include=np.number).columns)
    if not numeric_cols:
        raise ValueError("EOF interpolation needs at least one numeric column")

    # copy=True guarantees a writable array that does not alias `frame`
    X = frame[numeric_cols].to_numpy(dtype=float, copy=True, na_value=np.nan)

    # The mask is fixed by the input data, so compute it once, before any fill
    missing_mask = np.isnan(X)

    # Initial guess: global column mean of the observed values. A column with
    # no observation at all has no mean, so it starts from 0.0 instead of NaN
    # (NaN would make the SVD fail).
    for j in range(X.shape[1]):
        gaps = missing_mask[:, j]
        if not gaps.any():
            continue
        X[gaps, j] = 0.0 if gaps.all() else X[~gaps, j].mean()

    # Perform EOF interpolation iteratively
    for _ in range(max_iter):
        # Apply SVD (EOF decomposition); fit_transform returns U * Sigma
        svd = TruncatedSVD(n_components=n_components, random_state=random_state)
        scores = svd.fit_transform(X)

        # Low-rank reconstruction of the full matrix
        X_reconstructed = scores @ svd.components_

        # Replace missing values only; observed values are never modified
        X[missing_mask] = X_reconstructed[missing_mask]

    # Write the filled numeric columns back, keeping column order and the
    # untouched non-numeric columns
    for j, col in enumerate(numeric_cols):
        frame[col] = X[:, j]

    # Save cleaned dataset
    frame.to_parquet(output_file)
    print(f"Cleaned dataset saved as {output_file}")

    return frame

# Run EOF interpolation on the merged dataset file and write the filled table to
# "cleaned_eof_ocean_data.parquet", using the function's default n_components and
# max_iter. As the last expression of the cell, the return value becomes the cell output.
eof_interpolation("merged_ocean_data.parquet", "cleaned_eof_ocean_data.parquet")


Processing: EOF Interpolation on Merged Dataset
Initial missing values: 1103585663
