#  MODIS Water Cluster Identification for VIIRS

Date modified: 02.13.2024

Modified by: Amanda Burke

In [1]:
from sklearn.model_selection import train_test_split 
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
from pathlib import Path
import seaborn as sns
import pyarrow as pa
import pandas as pd
import numpy as np
import datetime
import warnings
import joblib
import pickle
import glob
import csv
import math 
import os


warnings.filterwarnings('ignore')
%matplotlib inline


In [2]:
# Switch read by the configuration cell that follows to choose DATA_TYPE.
GPU = False

In [3]:
# Run-wide configuration constants.
MODEL = 'rf'
TEST_RATIO = 0.2
RANDOM_STATE = 42
LABEL_NAME = 'water'
# The GPU branch used to read `cp.float32`, but `cp` (CuPy) is never imported
# in this notebook, so enabling GPU raised NameError. CuPy re-exports NumPy's
# scalar dtypes, so np.float32 names the same dtype without needing the import.
DATA_TYPE = np.int16 if GPU is False else np.float32
FRAC_LAND = 0.5
num_datapoints = 10000000

In [4]:
# Training-data selection. Exactly one VERSION section is active at a time;
# the others stay commented out so the notebook can be pointed at another
# dataset by swapping which section is uncommented.

# #############################
# # VERSION 4.2.1 (targeted 500k points)
# TILE_IN = 'Golden'#v4.2.1
# DATA_VERSION='v4.2.1'
# offsets_indexes = ['x_offset', 'y_offset', 'year', 'julian_day','tileID']
# #############################

##############################
#VERSION 2.0.1 (5 million points)
TILE_IN = 'GLOBAL'#v2.0.1
DATA_VERSION='v2.0.1'
offsets_indexes = ['x_offset', 'y_offset', 'year', 'julian_day']
##############################

# #############################
# #VERSION 0.0.0 (2billion data points)
# TILE_IN = 'cleaned'#v0.0.0
# DATA_VERSION='AGU'
# offsets_indexes = []#'x_offset', 'y_offset', 'year', 'julian_day']
# ##############################

training_data_basepath = f'/explore/nobackup/projects/ilab/data/MODIS/MODIS_WATER_ML/training_data/{DATA_VERSION}'
glob_string = os.path.join(training_data_basepath, f'MOD*{TILE_IN}*.parquet.gzip')
data_paths = sorted(glob.glob(glob_string))

print(data_paths)
if not data_paths:
    # Fail with the pattern that was searched instead of a bare IndexError.
    raise FileNotFoundError(f'No training data files match {glob_string}')

# When several files match, the first one in sorted order is used.
# Original note: only the 4.2.0 file is wanted because the other file does not work.
data_path = data_paths[0]
print(data_path)

['/explore/nobackup/projects/ilab/data/MODIS/MODIS_WATER_ML/training_data/v2.0.1/MOD09_GLOBAL_5469777_2_0_1.parquet.gzip']
/explore/nobackup/projects/ilab/data/MODIS/MODIS_WATER_ML/training_data/v2.0.1/MOD09_GLOBAL_5469777_2_0_1.parquet.gzip


In [5]:
def _drop_non_finite_rows(frame):
    """Return ``frame`` without rows that contain NaN or +/-inf."""
    has_bad_value = frame.isin([np.nan, np.inf, -np.inf]).any(axis=1)
    return frame[~has_bad_value].dropna(axis=0)


def load_cpu_data(fpath, colsToDrop, yCol='water', testSize=0.2, randomState=42, 
            dataType=np.int16, cpu=True, splitXY=False, trainTestSplit=False,
            applyLog=False, imbalance=False, frac=0.1, land=False, multi=False, 
            multisample=1000000):
    """
    Load a training table, clean it, and optionally split it for modelling.

    :param fpath: Path to the data to ingest. Read as Parquet when the path
        contains '.parquet', otherwise as CSV. When ``multi`` is True this is
        an iterable of CSV paths instead.
    :param colsToDrop: Columns to remove before cleaning.
    :param yCol: Name of the label column.
    :param testSize: Fraction of rows placed in the test split.
    :param randomState: Seed used for every random step in this function
        (multi-file sampling, imbalance sampling/shuffling, train/test split).
    :param dataType: Dtype the cleaned data is cast to.
    :param cpu: Unused by this function; kept for interface compatibility.
    :param splitXY: When False the cleaned DataFrame is returned as-is;
        when True features and label are returned separately.
    :param trainTestSplit: With ``splitXY``, return the four-way result of
        ``train_test_split`` instead of ``(X, y)``.
    :param applyLog: Apply ``np.log1p`` to every feature column.
    :param imbalance: Down-sample one class by ``frac`` and keep the other
        class whole.
    :param frac: Fraction of the down-sampled class to keep.
    :param land: With ``imbalance``, down-sample the second label group
        (water) instead of the first (land).
    :param multi: Read and concatenate several CSV files, then sample
        ``multisample`` rows.
    :param multisample: Number of rows sampled when ``multi`` is True.
    :return: A DataFrame, ``(X, y)``, or ``(X_train, X_test, y_train,
        y_test)`` depending on ``splitXY`` and ``trainTestSplit``.
    :raises ValueError: If ``imbalance`` is requested but the label column
        holds fewer than two classes.
    """
    if multi:
        all_dfs = [pd.read_csv(path_) for path_ in fpath]
        df = pd.concat(all_dfs).sample(n=multisample, random_state=randomState)
        print('DF length: {}'.format(len(df.index)))
    else:
        # str() so pathlib.Path inputs work with the substring test.
        df = pd.read_parquet(fpath) if '.parquet' in str(fpath) else pd.read_csv(fpath)

    # Remove rows where band 2 plus band 1, 7 or 6 sums to zero
    # (presumably the denominators of the normalized indices -- confirm).
    for band in ('sur_refl_b01_1', 'sur_refl_b07_1', 'sur_refl_b06_1'):
        df = df[df[band] + df['sur_refl_b02_1'] != 0]

    df = df.drop(columns=colsToDrop)
    cleanedDF = _drop_non_finite_rows(df).astype(dataType)
    if applyLog:
        for col in cleanedDF.drop([yCol], axis=1).columns:
            print('Applying log1p func to {}'.format(col))
            cleanedDF[col] = np.log1p(cleanedDF[col])
        cleanedDF = _drop_non_finite_rows(cleanedDF)
        # NOTE(review): with splitXY the features are cast back to dataType
        # below, which truncates log values for integer dtypes -- confirm intent.
    df = None

    if imbalance:
        if land:
            print('Imbalancing data, sampling {} from water'.format(frac))
        else:
            print(f'Imbalancing data, sampling {frac} from land, {1-frac} from water')
        # groupby yields groups in sorted label order: first is treated as
        # land, second as water.
        class_frames = [group for _, group in cleanedDF.groupby(yCol)]
        if len(class_frames) < 2:
            raise ValueError(
                f'imbalance=True needs two classes in {yCol!r}, found {len(class_frames)}')
        land_df, water_df = class_frames[0], class_frames[1]
        if land:
            sampled_df, kept_df = water_df.sample(frac=frac, random_state=randomState), land_df
        else:
            sampled_df, kept_df = land_df.sample(frac=frac, random_state=randomState), water_df
        # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
        cleanedDF = (
            pd.concat([sampled_df, kept_df])
            .sample(frac=1, random_state=randomState)
            .reset_index(drop=True)
        )

    if not splitXY:
        return cleanedDF
    X = cleanedDF.drop([yCol], axis=1).astype(dataType)
    y = cleanedDF[yCol].astype(dataType)
    if trainTestSplit:
        # Honour the function's own parameters so repeated loads of the same
        # file produce the same split.
        return train_test_split(X, y, test_size=testSize, random_state=randomState)
    return X, y

In [6]:
# Bands and indices that are removed from the table on load.
colsToDrop = [
    'sur_refl_b03_1',
    'sur_refl_b04_1',
    'sur_refl_b05_1',
    'sur_refl_b06_1',
    'ndwi1',
    'ndwi2',
]

# The training load also drops the offset/date index columns.
colsToDropTraining = [*colsToDrop, *offsets_indexes]

# Every spectral band and index column name.
v_names = [
    'sur_refl_b01_1',
    'sur_refl_b02_1',
    'sur_refl_b03_1',
    'sur_refl_b04_1',
    'sur_refl_b05_1',
    'sur_refl_b06_1',
    'sur_refl_b07_1',
    'ndvi',
    'ndwi1',
    'ndwi2',
]

### Input data

In [7]:
# Display the base drop list (without the offset/date columns added for training).
colsToDrop

['sur_refl_b03_1',
 'sur_refl_b04_1',
 'sur_refl_b05_1',
 'sur_refl_b06_1',
 'ndwi1',
 'ndwi2']

In [8]:
%%time
# Load the training table as separate features/labels with a train/test split,
# dropping the unused bands and the offset/date columns.
load_data_params = dict(
    fpath=data_path,
    colsToDrop=colsToDropTraining,
    splitXY=True,
    dataType=DATA_TYPE,
    imbalance=False,
    trainTestSplit=True,
)

X, X_test, y, y_test = load_cpu_data(**load_data_params)

print(f'data shape: {X.shape}, {y.shape}')

data shape: (4375821, 4), (4375821,)
CPU times: user 3.63 s, sys: 952 ms, total: 4.58 s
Wall time: 4.05 s


# Clustering

In [9]:
# Row positions of the water (label > 0.5) and land (label < 0.5) samples.
y_water_ind = np.where(y.to_numpy() > 0.5)[0]
y_land_ind = np.where(y.to_numpy() < 0.5)[0]

# Per-class features and labels; each class is clustered separately and the
# selections are combined again afterwards.
X_water, y_water = X.iloc[y_water_ind, :], y.iloc[y_water_ind]
X_land, y_land = X.iloc[y_land_ind, :], y.iloc[y_land_ind]
print(f'data shape: {X_water.shape}, {X_land.shape}')

data shape: (1976352, 4), (2399469, 4)


In [10]:
# Print each feature column name on its own line (a plain loop, rather than a
# list comprehension built only for its side effects).
for column in X.columns:
    print(column)

sur_refl_b01_1
sur_refl_b02_1
sur_refl_b07_1
ndvi


## Clustering Data for Input to Random Forest

Based on the cluster analysis above (run on 5.03.23), 15 clusters appear to retain the most data while excluding outliers, so that number is used for the selection.

In [11]:
# Number of KMeans clusters fitted per class.
CLUSTER_NUM = 15

# Keyword arguments shared by both KMeans fits.
common_params = dict(n_init="auto", random_state=42, init="random")

In [12]:
%%time
# Reuse a previously fitted land model when one is stored next to the
# notebook; otherwise fit a new one on the land samples.
# NOTE(security): pickle.load can execute arbitrary code -- only load a
# model file that this project produced.
land_model_path = Path('kmeans_land_fit.pkl')
if land_model_path.is_file():
    with land_model_path.open('rb') as f:
        kme_land_random = pickle.load(f)
else:
    kme_land_random = KMeans(n_clusters=CLUSTER_NUM, **common_params).fit(X_land)
# Cluster id for every land sample, in X_land row order.
kmeans_output_land_random = kme_land_random.predict(X_land)

CPU times: user 206 ms, sys: 127 ms, total: 333 ms
Wall time: 238 ms


In [13]:
%%time
# Reuse a previously fitted water model when one is stored next to the
# notebook; otherwise fit a new one on the water samples.
# NOTE(security): pickle.load can execute arbitrary code -- only load a
# model file that this project produced.
water_model_path = Path('kmeans_water_fit.pkl')
if water_model_path.is_file():
    with water_model_path.open('rb') as f:
        kme_water_random = pickle.load(f)
    print(kme_water_random)
else:
    kme_water_random = KMeans(n_clusters=CLUSTER_NUM, **common_params).fit(X_water)
# Cluster id for every water sample, in X_water row order.
kmeans_output_water_random = kme_water_random.predict(X_water)

KMeans(init='random', n_clusters=15, n_init='auto', random_state=42)
CPU times: user 294 ms, sys: 66.5 ms, total: 360 ms
Wall time: 90.9 ms


### Evenly balanced, randomly drawn data points

In [14]:
# Find the smallest cluster population in each class. The cluster ids are
# taken from the water predictions and the same ids are looked up in the land
# predictions.
COUNT_EVEN_BALANCE_LAND = np.inf
COUNT_EVEN_BALANCE_WATER = np.inf
for cluster in np.unique(kmeans_output_water_random):
    land_num = int((kmeans_output_land_random == cluster).sum())
    water_num = int((kmeans_output_water_random == cluster).sum())
    COUNT_EVEN_BALANCE_LAND = min(COUNT_EVEN_BALANCE_LAND, land_num)
    COUNT_EVEN_BALANCE_WATER = min(COUNT_EVEN_BALANCE_WATER, water_num)

print(COUNT_EVEN_BALANCE_LAND, COUNT_EVEN_BALANCE_WATER)
# COUNT is the number of points drawn per cluster: the smaller of the two minima.
COUNT = (
    COUNT_EVEN_BALANCE_LAND
    if COUNT_EVEN_BALANCE_LAND < COUNT_EVEN_BALANCE_WATER
    else COUNT_EVEN_BALANCE_WATER
)
print(COUNT,COUNT_EVEN_BALANCE_LAND,COUNT_EVEN_BALANCE_WATER)

916 8828
916 916 8828


In [19]:
# Draw COUNT random samples from every cluster of each class.
# NOTE: the draws are unseeded (the np.random.seed(42) call was disabled), so
# each run selects different points.
water_chunks = []
land_chunks = []

for cluster in np.unique(kmeans_output_water_random):
    print(f'cluster {cluster}')
    # Positions are relative to X_water / X_land, not to the full table.
    cluster_ind_water = np.where(kmeans_output_water_random == cluster)[0]
    random_pts_water = np.random.choice(cluster_ind_water,COUNT,replace=False)
    max_X_random_water = np.nanmax(X_water['sur_refl_b01_1'].iloc[random_pts_water])
    # A drawn band-1 value of 10000 or more marks the cluster as an outlier
    # cluster; it is skipped for both classes so the counts stay equal.
    if not max_X_random_water < 10000:
        print(f'Cluster {cluster} contains outliers')
        continue
    cluster_ind_land = np.where(kmeans_output_land_random == cluster)[0]
    random_pts_land = np.random.choice(cluster_ind_land,COUNT,replace=False)
    water_chunks.append(random_pts_water)
    land_chunks.append(random_pts_land)

# Concatenate once at the end. This keeps the indices integer throughout and
# still yields arrays when every cluster was skipped (the previous list
# accumulator had no .astype in that case).
random_ind_water = (
    np.concatenate(water_chunks).astype('int') if water_chunks else np.array([], dtype='int')
)
random_ind_land = (
    np.concatenate(land_chunks).astype('int') if land_chunks else np.array([], dtype='int')
)

print(np.shape(random_ind_water),np.shape(random_ind_land))

cluster 0
cluster 1
cluster 2
cluster 3
cluster 4
cluster 5
cluster 6
Cluster 6 contains outliers
cluster 7
cluster 8
cluster 9
cluster 10
cluster 11
cluster 12
cluster 13
cluster 14
(12824,) (12824,)


## Getting metadata of clusters

In [20]:
%%time
# Reload the same file without dropping any columns, so the offset and date
# columns (x_offset, y_offset, year, julian_day) are kept next to the bands.
# Caleb needs all spectral values so that he can search for the tile id.
# dataType is left at the load_cpu_data default.
load_data_params = {'fpath':data_path,'colsToDrop':[],'splitXY':True,
                    'imbalance':False,'trainTestSplit':True}

# A stray `ds` token in this unpacking target made the cell a syntax error.
X_meta, X_meta_test, y_meta, y_meta_test = load_cpu_data(**load_data_params)

print(f'data shape: {X_meta.shape}, {y_meta.shape}')

data shape: (4375821, 14), (4375821,)
CPU times: user 9.13 s, sys: 2.7 s, total: 11.8 s
Wall time: 11.1 s


In [21]:
# Class-relative positions, kept under the original name for any later use.
total_cluster_inds = np.concatenate([random_ind_water,random_ind_land])

# random_ind_water / random_ind_land are positions inside X_water / X_land
# (they come from np.where over the per-class KMeans predictions). Using them
# with X_meta.iloc would pick unrelated rows of the full table, so translate
# them to index labels first and select by label.
total_cluster_labels = X_water.index[random_ind_water].append(
    X_land.index[random_ind_land]
)
# The metadata load makes its own train/test split, so look in both halves.
# Assumes the source index labels are unique -- TODO confirm. A label missing
# from the metadata raises KeyError rather than returning the wrong row.
X_meta_all = pd.concat([X_meta, X_meta_test])
cluster_meta_data = X_meta_all.loc[total_cluster_labels]
# cluster_meta_data = X_meta.iloc[total_cluster_inds,4:]
# viirs_final_year_meta = cluster_meta_data.copy()
# before_viirs_launch_inds = cluster_meta_data.loc[cluster_meta_data['year'] <= 2012.0].index
# for i in before_viirs_launch_inds:
#     viirs_final_year_meta['year'][i] = np.random.choice(np.arange(2013.0,2024.0,1.0))
# viirs_final_year_meta.loc[viirs_final_year_meta['year'] >= 2012.0]

Write the VIIRS metadata to a Parquet file

In [22]:
# table = pa.Table.from_pandas(viirs_final_year_meta)
# Convert the selected rows to an Arrow table for writing with pyarrow.parquet.
table = pa.Table.from_pandas(cluster_meta_data)

In [24]:
# pq.write_table(table, 'location_date_MW_to_VIIRS_subset.parquet')
# Write the table to a Parquet file in the current working directory.
pq.write_table(table, 'MW_to_VIIRS_info_all_vars.parquet')

In [25]:
# Read the file back and display it as a DataFrame to check what was written.
table2 = pq.read_table('MW_to_VIIRS_info_all_vars.parquet')
table2.to_pandas()

Unnamed: 0,sur_refl_b01_1,sur_refl_b02_1,sur_refl_b03_1,sur_refl_b04_1,sur_refl_b05_1,sur_refl_b06_1,sur_refl_b07_1,ndvi,ndwi1,ndwi2,x_offset,y_offset,year,julian_day
4585632,700,250,336,547,393,243,114,-4736,141,3736,1121,2560,2006,175
5145931,48,-2,304,162,39,45,23,-10869,-10930,-11904,4760,1729,2001,214
4442551,-8,-21,114,69,38,104,52,4482,-15060,-23548,1709,17,2006,240
4551103,683,3204,265,630,3496,2062,930,6485,2168,5500,3002,2756,2006,180
2673895,385,2482,176,458,2413,1350,563,7314,2954,6302,934,3890,2020,167
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5386092,341,4359,554,862,4188,2336,891,8548,3021,6605,717,3162,2001,309
2765989,-20,-5,58,-18,-11,29,7,-6000,-14166,5536,4743,3123,2006,202
3650157,32,-4,257,156,-7,46,3,-12857,-11904,4464,1607,2676,2001,201
2069344,-100,-97,-100,-100,218,970,822,-152,-12222,-12675,1401,1834,2006,76
