In [1]:
import dask
import numpy as np
import xarray as xr
import glob
import matplotlib.pyplot as plt
import time
import datetime
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress
import warnings

In [2]:
warnings.filterwarnings('ignore')
now = datetime.datetime.now()
now_string = str(now.strftime("%Y-%m-%d_%A_%H%M%S"))
now_string

'2019-07-06_Saturday_211534'

In [3]:
cluster = SLURMCluster(cores=16, memory='20GB', project='pi_jianwu', queue='high_mem', job_extra=['--qos=medium+','--exclusive'])

In [4]:
cluster.scheduler

<Scheduler: "tcp://10.2.1.9:41381" processes: 0 cores: 0>

In [5]:
cluster.scale(5)

In [7]:
!squeue -u savio1

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           1075504  high_mem dask-wor   savio1  R       0:09      1 cnode007
           1075505  high_mem dask-wor   savio1  R       0:09      1 cnode025
           1075506  high_mem dask-wor   savio1  R       0:09      1 cnode026
           1075507  high_mem dask-wor   savio1  R       0:09      1 cnode027
           1075503  high_mem dask-wor   savio1  R       0:12      1 cnode006
           1075472  high_mem   tunnel   savio1  R    4:39:40     16 cnode[009-024]


In [8]:
client = Client()

In [9]:
cluster.dashboard_link

'http://10.2.1.9:35512/status'

In [10]:
cluster.running_jobs

OrderedDict([('1075503',
              {'dask-worker--1075503--': <Worker 'tcp://10.2.1.6:39152', memory: 0, processing: 0>}),
             ('1075504',
              {'dask-worker--1075504--': <Worker 'tcp://10.2.1.7:36370', memory: 0, processing: 0>}),
             ('1075505',
              {'dask-worker--1075505--': <Worker 'tcp://10.2.1.25:37678', memory: 0, processing: 0>}),
             ('1075507',
              {'dask-worker--1075507--': <Worker 'tcp://10.2.1.27:40251', memory: 0, processing: 0>}),
             ('1075506',
              {'dask-worker--1075506--': <Worker 'tcp://10.2.1.26:34315', memory: 0, processing: 0>})])

In [11]:
cluster.pending_jobs

OrderedDict()

In [12]:
print(client)

<Client: scheduler='tcp://127.0.0.1:34039' processes=36 cores=36>


In [13]:
t0 = time.time()
total_pix = np.zeros((180, 360))
cloud_pix = np.zeros((180, 360))

In [14]:
def ingest_data(M03_dir, M06_dir):
    M03_files = sorted(glob.glob(M03_dir + "MYD03.A2008*.hdf"))
    M06_files = sorted(glob.glob(M06_dir + "MYD06_L2.A2008*.hdf"))
    for M03, M06 in zip (M03_files, M06_files):
        d06 = xr.open_mfdataset(M06[:], parallel=True)['Cloud_Mask_1km'][:,:,:].values
        d06CM = d06[::3,::3,0]
        ds06_decoded = (np.array(d06CM, dtype = "byte") & 0b00000110) >> 1
        d03_lat = xr.open_mfdataset(M03[:], drop_variables = "Scan Type", parallel=True)['Latitude'][:,:].values
        d03_lon = xr.open_mfdataset(M03[:], drop_variables = "Scan Type", parallel=True)['Longitude'][:,:].values

        lat = d03_lat[::3,::3]
        lon = d03_lon[::3,::3]

        l_index = (lat + 89.5).astype(int).reshape(lat.shape[0]*lat.shape[1])
        lat_index = np.where(l_index > -1, l_index, 0)
        ll_index = (lon + 179.5).astype(int).reshape(lon.shape[0]*lon.shape[1])
        lon_index = np.where(ll_index > -1, ll_index, 0)
        for i, j in zip(lat_index, lon_index):
            total_pix[i,j] += 1

        indicies = np.nonzero(ds06_decoded <= 0)
        row_i = indicies[0]
        column_i = indicies[1]
        cloud_lon = [lon_index.reshape(ds06_decoded.shape[0],ds06_decoded.shape[1])[i,j] for i, j in zip(row_i, column_i)]
        cloud_lat = [lat_index.reshape(ds06_decoded.shape[0],ds06_decoded.shape[1])[i,j] for i, j in zip(row_i, column_i)]

        for x, y in zip(cloud_lat, cloud_lon):
            cloud_pix[int(x),int(y)] += 1
            
    return cloud_pix, total_pix

In [15]:
t0 = time.time()
import dask.multiprocessing
dask.config.set(num_workers=5)

<dask.config.set at 0x2aabf9ae16d8>

In [16]:
M03_dir = '/umbc/xfs1/jianwu/common/MODIS_Aggregation/MODIS_one_day_data/'
M06_dir = '/umbc/xfs1/jianwu/common/MODIS_Aggregation/MODIS_one_day_data/'

In [17]:
cluster.running_jobs

OrderedDict([('1075503',
              {'dask-worker--1075503--': <Worker 'tcp://10.2.1.6:39152', memory: 0, processing: 0>}),
             ('1075504',
              {'dask-worker--1075504--': <Worker 'tcp://10.2.1.7:36370', memory: 0, processing: 0>}),
             ('1075505',
              {'dask-worker--1075505--': <Worker 'tcp://10.2.1.25:37678', memory: 0, processing: 0>}),
             ('1075507',
              {'dask-worker--1075507--': <Worker 'tcp://10.2.1.27:40251', memory: 0, processing: 0>}),
             ('1075506',
              {'dask-worker--1075506--': <Worker 'tcp://10.2.1.26:34315', memory: 0, processing: 0>})])

In [19]:
t0 = time.time()
import dask.multiprocessing
dask.config.set(num_workers=5)
M03_dir = "/umbc/xfs1/jianwu/common/MODIS_Aggregation/MODIS_one_day_data/"
M06_dir = "/umbc/xfs1/jianwu/common/MODIS_Aggregation/MODIS_one_day_data/"
from dask.distributed import Client, progress
future1 = client.submit(ingest_data,M03_dir,M06_dir)
progress(future1)

VBox()

In [20]:
future1.result()

RuntimeError: can't start new thread

In [25]:
cluster.close()

BlockingIOError: [Errno 11] Resource temporarily unavailable

In [26]:
client.close()

In [None]:
cf1 = future1.result()[0]/future1.result()[1]
progress(cf1)

In [None]:
plt.figure(figsize=(14,7))
plt.contourf(range(-180,180), range(-90,90), cf1, 100, cmap = "jet")
plt.xlabel("Longitude", fontsize = 14)
plt.ylabel("Latitude", fontsize = 14)
plt.title("Level 3 Cloud Fraction Aggregation For One Month %s" %now_string, fontsize = 16)
plt.colorbar()
plt.savefig("/umbc/xfs1/jianwu/common/MODIS_Aggregation/savioexe/test/8/%s.png" %now_string)

In [None]:
cf2 = xr.DataArray(cf1)
cf2.to_netcdf("/umbc/xfs1/jianwu/common/MODIS_Aggregation/savioexe/test/8/%s.hdf" %now_string)

In [None]:
t1 = time.time()
total = t1-t0
print(total,"seconds")

In [None]:
print(total/60,"minutes")

In [None]:
cluster.pending_jobs

In [None]:
cluster.close()

In [None]:
client.close()

In [None]:
!squeue -u savio1