# Generate k-means clusters

- I am using the conda environment version 21.10 http://climate-cms.wikis.unsw.edu.au/Conda#21.10

### In this notebook: 
- Apply k-means clustering to the MODIS cloud-top pressure and optical depth histograms 
- Fit the clusters to the equivalent ACCESS histograms 
- Save cluster centers and assignments 

In [1]:
from dask.distributed import Client,LocalCluster
client = Client()

distributed.diskutils - INFO - Found stale lock file and directory '/local/jk72/slf563/tmp/dask-worker-space/worker-s5y8olg6', purging
distributed.diskutils - INFO - Found stale lock file and directory '/local/jk72/slf563/tmp/dask-worker-space/worker-c59dzx26', purging
distributed.diskutils - INFO - Found stale lock file and directory '/local/jk72/slf563/tmp/dask-worker-space/worker-g3fzlaud', purging
distributed.diskutils - INFO - Found stale lock file and directory '/local/jk72/slf563/tmp/dask-worker-space/worker-tfh9btui', purging


In [2]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 4
Total threads: 16,Total memory: 44.92 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:32883,Workers: 4
Dashboard: /proxy/8787/status,Total threads: 16
Started: Just now,Total memory: 44.92 GiB

0,1
Comm: tcp://127.0.0.1:43319,Total threads: 4
Dashboard: /proxy/44421/status,Memory: 11.23 GiB
Nanny: tcp://127.0.0.1:37751,
Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-tnqvbq6g,Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-tnqvbq6g

0,1
Comm: tcp://127.0.0.1:43843,Total threads: 4
Dashboard: /proxy/44327/status,Memory: 11.23 GiB
Nanny: tcp://127.0.0.1:33897,
Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-00__hmux,Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-00__hmux

0,1
Comm: tcp://127.0.0.1:37379,Total threads: 4
Dashboard: /proxy/46335/status,Memory: 11.23 GiB
Nanny: tcp://127.0.0.1:40751,
Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-5lcqioj8,Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-5lcqioj8

0,1
Comm: tcp://127.0.0.1:35809,Total threads: 4
Dashboard: /proxy/38865/status,Memory: 11.23 GiB
Nanny: tcp://127.0.0.1:38195,
Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-pgxjxhma,Local directory: /local/jk72/slf563/tmp/dask-worker-space/worker-pgxjxhma


In [3]:
import sys
import os
import numpy as np
import pandas as pd
import xarray as xr
import dask.distributed
from dask_ml.cluster import KMeans
import dask.array as da
import time

# MODIS 
### Read in data 

In [4]:
fdir = '/g/data/p66/slf563/OBS/MCD06COSP_D3_MODIS/'
modis = xr.open_mfdataset(fdir+'MCD06COSP_D3_MODIS.201*histos.nc', parallel=True,combine='by_coords').astype('float32')

tau = modis.attrs['JHisto_Bin_Boundaries'] # save these attributes for later
tau = tau[[0,2,3,4,5,6,7]] # merge first two tau bins
pressure = modis.attrs['JHisto_Bin_Boundaries_Joint_Parameter']

In [5]:
fdir = '/g/data/p66/slf563/OBS/MCD06COSP_D3_MODIS/'
cf = xr.open_mfdataset(fdir+'MCD06COSP_D3_MODIS.201*cloud.nc', parallel=True,combine='by_coords').astype('float32')
cf = cf['CF']

Organise for clustering 

In [6]:
modis_k = modis.histo[1:,:,:].copy()
modis_k[0,:,:].values = modis.histo[0,:,:].values + modis.histo[1,:,:].values # merge first two tau bins
modis_k = modis_k.fillna(0) # k-means can't handle nans - so convert to 0, and we will remove them later. 

#modis_k = (modis_k/modis_k.sum(dim=('tau','pressure')))*cf # Normalise by cloud fraction.
modis_k = (modis_k/modis_k.sum(dim=('tau','pressure'))) # Normalise by one 

modis_k = modis_k.stack(x = ('time','lon','lat'),
                    y = ('tau','pressure')).reset_index(dims_or_levels=['x','y']) #flatten into two dims 

missing_M = np.where(modis_k.sum(dim='y').values!=0)[0] # make note of where there is either missing data or clear sky for later (eg. sum of histo == 0)
modis_k = modis_k.where(modis_k.sum(dim='y') != 0, drop=True) # drop these values for k-means 

  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))


In [7]:
modis_k.load()

  ("('reshape-0668ee3a9cb48350652a5c40c24e317a', 0,  ... , None, None)))
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good


In [8]:
modis_k = modis_k.persist()

In [9]:
modis_k

# K-means:

###  Generate model from MODIS data
- 12 clusters were selected (after some trial & error and some unhelpful metrics)

In [10]:
nclus = 10

In [11]:
model = KMeans(n_clusters=nclus,random_state=0).fit(modis_k)

In [12]:
clusters = model.cluster_centers_
centers = clusters.reshape(nclus,6,7)

In [13]:
centers = xr.DataArray(dims=['k','Tau','Pressure'],
                       coords=({'k':np.arange(1,nclus+1),'Tau':np.arange(0,6),'Pressure':np.arange(0,7)}),
                       attrs={'Cloud top pressure':pressure,'Cloud optical depth':tau})
centers[:] = clusters.reshape(nclus,6,7)
centers = centers.transpose('k','Pressure','Tau')
centers = centers.reindex(Pressure=list(reversed(centers.Pressure)))

In [14]:
labels = model.labels_
CM = xr.DataArray(dims=['time','lon','lat'],
                 coords=({'time':modis.time,
                        'lat':modis.lat,
                        'lon':modis.lon}))
CM = CM.stack(x = ('time','lon','lat'))
CM[missing_M] = labels
CM = CM.unstack('x')

In [15]:
centers.to_netcdf('/g/data/jk72/slf563/ACCESS/clustering_data/modis_cluster_centres_2015-2019.nc')
CM.to_netcdf('/g/data/jk72/slf563/ACCESS/clustering_data/modis_cluster_labels_2015-2019.nc')

# Apply clustering to ACCESS

### Read ACCESS data

In [16]:
access = xr.open_mfdataset('/g/data/jk72/slf563/ACCESS/output/bx400/daily/bx400a.pdch201*',parallel=True,combine='by_coords')
access = access.rename_vars({'MODIS_ctp_tau_histogram':'histo'})
access = access.drop(('CALIPSO_BS_z_histogram','PARASOL_reflectance'))
access = access.drop_vars(('forecast_reference_time','forecast_period','height','solar_zenith_angle','backscatter'))
access = access.rename({'latitude':'lat',
                    'longitude':'lon'})
data2 = access.histo[1:,:,:]
data2[0,:,:].values = access.histo[0,:,:].values + access.histo[1,:,:].values
access = data2.fillna(0)

del data2

In [17]:
access_k = access.stack(x = ('time','lon','lat'),
                    y = ('tau','pressure')).reset_index(dims_or_levels=['x','y'])
access_k = access_k/access_k.sum(dim='y') # Normalise by one (raw values are already normalised to cloud fraction)
missing_A = np.where(access_k.sum(dim='y').values!=0)[0] # make note of where there is either missing data or clear sky for later (eg. sum of histo == 0)
access_k = access_k.where(access_k.sum(dim='y') != 0, drop=True) # drop these values for k-means 

  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))


In [18]:
modis_k.sum(dim='y').mean()

In [19]:
access_k.sum(dim='y').mean().values

array(1., dtype=float32)

### Apply clustering 

In [20]:
# have had to split this up, otherwise we run out of memory 
tmp = np.zeros(len(missing_A))
tmp[:10000000] = model.predict(access_k[:10000000,:].values)
tmp[10000000:20000000] = model.predict(access_k[10000000:20000000,:].values)
tmp[20000000:] = model.predict(access_k[20000000:,:].values)

  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))
  return func(*(_execute_task(a, cache) for a in args))


In [21]:
CA = xr.DataArray(dims=['time','lon','lat'],
                 coords=({'time':access.time,
                        'lat':access.lat,
                        'lon':access.lon}))
CA = CA.stack(x = ('time','lon','lat'))
CA[missing_A] = tmp
CA = CA.unstack('x')

In [22]:
CA.to_netcdf('/g/data/jk72/slf563/ACCESS/clustering_data/bx400_cluster_labels_2015-2019.nc')