In [3]:
import numpy as np
import netCDF4 as nc
import xarray as xr
import matplotlib.pyplot as plt
import dask
import os, glob
import pandas as pd
#from scipy.stats import nanmean

In [4]:
import dask 
from dask_jobqueue import SLURMCluster 
from distributed import Client, progress, wait 
from tempfile import NamedTemporaryFile, TemporaryDirectory 

In [5]:
account_name = 'sylvia'
partition = 'high_priority'
job_name = 'geos-pe' 
memory = '200GiB' 
cores = 16 
walltime = '01:00:00' 

In [6]:
scratch_dir = '/xdisk/sylvia/temakgoale/DYAMOND/logs/' 
dask_scratch_dir = TemporaryDirectory(dir=scratch_dir, prefix=job_name)
cluster = SLURMCluster(memory=memory,
                       cores=cores,
                       account=account_name,
                       walltime=walltime,
                       queue=partition,
                       name=job_name,
                       processes=8,
                       scheduler_options={'dashboard_address': ':12435'},
                       local_directory=dask_scratch_dir.name,
                       job_extra_directives=[f'-J {job_name}', 
                                  f'-D {dask_scratch_dir.name}',
                                  f'--begin=now',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o',
                                  f'--output={dask_scratch_dir.name}/LOG_cluster.%j.o'
                                 ],
                       interface='eth0')

In [7]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p high_priority
#SBATCH -A sylvia
#SBATCH -n 1
#SBATCH --cpus-per-task=16
#SBATCH --mem=200G
#SBATCH -t 01:00:00
#SBATCH -J geos-pe
#SBATCH -D /xdisk/sylvia/temakgoale/DYAMOND/logs/geos-pen873pb_2
#SBATCH --begin=now
#SBATCH --output=/xdisk/sylvia/temakgoale/DYAMOND/logs/geos-pen873pb_2/LOG_cluster.%j.o
#SBATCH --output=/xdisk/sylvia/temakgoale/DYAMOND/logs/geos-pen873pb_2/LOG_cluster.%j.o

/opt/ohpc/pub/apps/python/3.8.12/bin/python3.8 -m distributed.cli.dask_worker tcp://10.140.86.93:38834 --nthreads 2 --nworkers 8 --memory-limit 25.00GiB --name dummy-name --nanny --death-timeout 60 --local-directory /xdisk/sylvia/temakgoale/DYAMOND/logs/geos-pen873pb_2 --interface eth0



In [8]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35418,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:45252,Total threads: 4
Dashboard: http://127.0.0.1:40057/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:37785,
Local directory: /tmp/dask-scratch-space/worker-cg6u9nc7,Local directory: /tmp/dask-scratch-space/worker-cg6u9nc7

0,1
Comm: tcp://127.0.0.1:35790,Total threads: 4
Dashboard: http://127.0.0.1:41738/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:36277,
Local directory: /tmp/dask-scratch-space/worker-pgj61peo,Local directory: /tmp/dask-scratch-space/worker-pgj61peo

0,1
Comm: tcp://127.0.0.1:43230,Total threads: 4
Dashboard: http://127.0.0.1:37051/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:34687,
Local directory: /tmp/dask-scratch-space/worker-ozqs0me0,Local directory: /tmp/dask-scratch-space/worker-ozqs0me0

0,1
Comm: tcp://127.0.0.1:32863,Total threads: 4
Dashboard: http://127.0.0.1:35233/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:45101,
Local directory: /tmp/dask-scratch-space/worker-5z7hsm9l,Local directory: /tmp/dask-scratch-space/worker-5z7hsm9l


In [9]:
dask_client = Client(cluster)
dask_client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35418,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:45252,Total threads: 4
Dashboard: http://127.0.0.1:40057/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:37785,
Local directory: /tmp/dask-scratch-space/worker-cg6u9nc7,Local directory: /tmp/dask-scratch-space/worker-cg6u9nc7

0,1
Comm: tcp://127.0.0.1:35790,Total threads: 4
Dashboard: http://127.0.0.1:41738/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:36277,
Local directory: /tmp/dask-scratch-space/worker-pgj61peo,Local directory: /tmp/dask-scratch-space/worker-pgj61peo

0,1
Comm: tcp://127.0.0.1:43230,Total threads: 4
Dashboard: http://127.0.0.1:37051/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:34687,
Local directory: /tmp/dask-scratch-space/worker-ozqs0me0,Local directory: /tmp/dask-scratch-space/worker-ozqs0me0

0,1
Comm: tcp://127.0.0.1:32863,Total threads: 4
Dashboard: http://127.0.0.1:35233/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:45101,
Local directory: /tmp/dask-scratch-space/worker-5z7hsm9l,Local directory: /tmp/dask-scratch-space/worker-5z7hsm9l


# PRECEFF TIME PERIOD AVERAGE INPUT

In [15]:
basedir = 'data/input/directory'
basedir2 = 'data/output/directory'
data_file_path1 = basedir + 'CWP_input_file.nc'
data_file1 = xr.open_dataset( data_file_path1)
data_file_path2 = basedir + 'Pr_input_file.nc'
data_file2 = xr.open_dataset( data_file_path2)
cwp_data = data_file1['CWP_var']
precip_data = data_file2['PR_var']
#
precip_threshold_0001 = 2.777777778e-8
precip_threshold_1 = 2.777777778e-4
cwp_threshold = 100
cwp_data_filtered = xr.where( (cwp_data > 0) & (cwp_data < cwp_threshold), cwp_data, np.nan )
#precip_data_filtered =xr.where( precip_data > precip_threshold_1, precip_data, np.nan )
precip_data_filtered =xr.where( (precip_data > precip_threshold_0001) & (precip_data < precip_threshold_1), precip_data, np.nan )
#
mask = ~np.isnan(precip_data_filtered)
cwp_data_filtered = cwp_data_filtered.where(mask)
#
precip_timeavg = precip_data_filtered.mean(skipna=True, dim="time")
cwp_timeavg = cwp_data_filtered.mean(skipna=True, dim="time")
precip_efficiency = precip_timeavg / cwp_timeavg
precip_efficiency = xr.where(precip_efficiency < 50, precip_efficiency, np.nan)
#
#precip_efficiency.to_netcdf( basedir2 + 'Model_name_PRECEFF_1mm_hr_20160809-20160909_Asia_timeavg.nc')
precip_efficiency.to_netcdf( basedir2 + 'Model_name_PRECEFF_less1mm_hr_20160809-20160909_Asia_timeavg.nc')
print("Precipitation efficiency with threshold calculated and saved to the output file.")

Precipitation efficiency with threshold calculated and saved to the output file.


# PRECEFF DAILY AVERAGE INPUT

In [13]:
basedir = 'data/input/directory'
basedir2 = 'data/output/directory'
data_file_path1 = basedir + 'CWP_input_file.nc'
data_file1 = xr.open_dataset( data_file_path1)
data_file_path2 = basedir + 'Pr_input_file.nc'
data_file2 = xr.open_dataset( data_file_path2)
cwp_data = data_file1['CWP_var']
precip_data = data_file2['PR_var']
#
precip_threshold_0001 = 2.777777778e-8
precip_threshold_1 = 2.777777778e-4
cwp_threshold = 100
cwp_data_filtered = xr.where( (cwp_data > 0) & (cwp_data < cwp_threshold), cwp_data, np.nan) 
#precip_data_filtered =xr.where( precip_data > precip_threshold_1, precip_data, np.nan )
precip_data_filtered =xr.where( (precip_data > precip_threshold_0001) & (precip_data < precip_threshold_1), precip_data, np.nan )
#
# Create a mask where PE is not NaN
mask = ~np.isnan(precip_data_filtered)
# Apply the mask to CWP
cwp_data_filtered_mean = cwp_data_filtered.where(mask)
#
#Convert time values to integers representing 15-minute intervals
time_int = (precip_data_filtered.time - precip_data_filtered.time[0]) / pd.Timedelta(minutes=15)
# Group the data into intervals of 8 timesteps and calculate the mean for each interval
precip_data_filtered_mean = precip_data_filtered.groupby((time_int // 96).astype(int)).mean(skipna=True, dim='time')
cwp_data_filtered_mean = cwp_data_filtered.groupby((time_int // 96).astype(int)).mean(skipna=True, dim='time')                           
#
precip_efficiency = precip_data_filtered_mean / cwp_data_filtered_mean
precip_efficiency = xr.where(precip_efficiency < 50, precip_efficiency, np.nan)
precip_efficiency_spatialavg = precip_efficiency.mean(skipna=True, dim="time")
#
#precip_efficiency.to_netcdf( basedir2 + 'Model_name_PRECEFF_1mm_hr_20160809-20160909_Asia_timeavg.nc')
precip_efficiency.to_netcdf( basedir2 + 'Model_name_PRECEFF_less1mm_daily_20160809-20160909_Asia.nc' )
print("Precipitation efficiency with threshold calculated and saved to the output file.")

Precipitation efficiency with threshold calculated and saved to the output file.
