# ATLAS v2 Demo - Data Generation

## Environment Setup

In [78]:
# imports
import ee, os, math, joblib, pickle, json
import numpy as np, pandas as pd, tensorflow as tf
from datetime import datetime, timezone
from tensorflow import keras
from keras import layers
from ast import literal_eval
from sklearn.linear_model import LogisticRegression
from featuretoolkit import src as ftk

# authenticating & initializing earth engine
from dotenv import load_dotenv
load_dotenv()
ee.Authenticate()
ee.Initialize(project=os.getenv('projectkey'))

# reading config & setting seed + guaranteeing determinism
import yaml
with open('config.yaml','r') as f:config_full=yaml.safe_load(f)
SEED=config_full.get('seed')
RNG=np.random.default_rng(SEED) # initializing global rng
np.random.seed(SEED)
tf.random.set_seed(SEED)
tf.keras.utils.set_random_seed(SEED)
tf.config.experimental.enable_op_determinism()
os.environ['TF_CPP_MIN_LOG_LEVEL']='2' # suppress tensorflow warnings

config=config_full.get('model') # specialize config for only model section - no more feature config required
config_path=config.get('path') # path config section

## Dataset Processing

In [2]:
dF_pca=pd.read_csv(config_path.get('pca')) # set of all points w/ pca features (add a distinct index column too)

ratios=config.get('split_ratio') # train, val, test ratio
assert sum(ratios)==1 and len(ratios)==3 # asserting valid split ratios
folds=np.sort(dF_pca['fold_id'].unique()) # fold ids
splits={}

# iterating through each fold to populate fold-wise splits with arrays of dF_pca indices
for fold in folds:
    idx=dF_pca.index[dF_pca['fold_id']==fold].to_numpy()
    # generating unique permutation each iteration while still maintaining reproducibility due to global seed & rng, then splitting indices
    perm=RNG.permutation(idx)
    n_train=int(len(perm)*ratios[0])
    n_val=int(len(perm)*ratios[1])
    splits[fold]={
        'train_idx':perm[:n_train],
        'val_idx':perm[n_train:n_train+n_val],
        'test_idx':perm[n_train+n_val:]}

splits # split dict organized fold-wise

# collecting basic fold stats, merging into superfolds
fold_stats=[]
for fid,s in splits.items(): # looping through folds
    pos=int((dF_pca.loc[s['train_idx'],'key']==1).sum()+(dF_pca.loc[s['val_idx'],'key']==1).sum()+(dF_pca.loc[s['test_idx'],'key']==1).sum())
    bg=int((dF_pca.loc[s['train_idx'],'key']==0).sum()+(dF_pca.loc[s['val_idx'],'key']==0).sum()+(dF_pca.loc[s['test_idx'],'key']==0).sum())
    fold_stats.append((fid,pos,bg,pos+bg))
superfolds={i:{'folds':[],'pos':0,'bg':0,'n':0} for i in range(config.get('folds'))}
for fid,pos,bg,n in sorted(fold_stats,key=lambda x:x[3],reverse=True): # largest folds first
    target=min(superfolds,key=lambda k:superfolds[k]['n'])
    superfolds[target]['folds'].append(fid)
    superfolds[target]['pos']+=pos
    superfolds[target]['bg']+=bg
    superfolds[target]['n']+=n

# assembling merged splits together
merged_splits={}
for sf_id,info in superfolds.items():
    merged_splits[sf_id]={'train_idx':[],'val_idx':[],'test_idx':[]}
    for fold in info['folds']: # looping through folds in superfold
        s=splits[fold]
        merged_splits[sf_id]['train_idx'].extend(s['train_idx'])
        merged_splits[sf_id]['val_idx'].extend(s['val_idx'])
        merged_splits[sf_id]['test_idx'].extend(s['test_idx'])
    for k in ('train_idx','val_idx','test_idx'): # converting to numpy arrays
        merged_splits[sf_id][k]=np.array(merged_splits[sf_id][k],dtype=int)

# summary printout
totals,pos_list,neg_list,ratios=[],[],[],[]
for sf_id,s in merged_splits.items(): # looping through superfolds
    pos=int((dF_pca.loc[s['train_idx'],'key']==1).sum()+(dF_pca.loc[s['val_idx'],'key']==1).sum()+(dF_pca.loc[s['test_idx'],'key']==1).sum())
    bg=int((dF_pca.loc[s['train_idx'],'key']==0).sum()+(dF_pca.loc[s['val_idx'],'key']==0).sum()+(dF_pca.loc[s['test_idx'],'key']==0).sum())
    total=pos+bg
    ratio=pos/bg
    totals.append(total)
    pos_list.append(pos)
    neg_list.append(bg)
    ratios.append(ratio)
    print(f'SF_{sf_id}: {len(info['folds'])} subfolds, total={total}, pos={pos}, bg={bg}, ratio={ratio:.3f}')

# computing total weights for positives & background
sum_pos=dF_pca.loc[dF_pca['key']==1,'weight'].sum()
sum_bg=dF_pca.loc[dF_pca['key']==0,'weight'].sum()

# computing scaling factor (so both totals are comparable) (this is a necessary step because training just doesnt work otherwise)
s=sum_bg/max(sum_pos,1e-12)
log_s=np.log(s) # natural log of s (saved for later)

# create scaled column
dF_pca['weight_scaled']=dF_pca['weight'].copy()
dF_pca.loc[dF_pca['key']==0,'weight_scaled']/=s

# printout
print(f'background rescaled by 1/s = {1/s:.3e} (log(s) = {log_s:.3f})')
print(f'\noriginal weights\n∑w (positives): {dF_pca.loc[dF_pca['key']==1,'weight'].sum():.3e}\n∑w (background): {dF_pca.loc[dF_pca['key']==0,'weight'].sum():.3e}')
print(f'\nrescaled weights\n∑w (positives): {dF_pca.loc[dF_pca['key']==1,'weight_scaled'].sum():.3e}\n∑w (background): {dF_pca.loc[dF_pca['key']==0,'weight_scaled'].sum():.3e}')

SF_0: 36 subfolds, total=542477, pos=173315, bg=369162, ratio=0.469
SF_1: 36 subfolds, total=542469, pos=152885, bg=389584, ratio=0.392
SF_2: 36 subfolds, total=542478, pos=138661, bg=403817, ratio=0.343
SF_3: 36 subfolds, total=542472, pos=162061, bg=380411, ratio=0.426
SF_4: 36 subfolds, total=542443, pos=119822, bg=422621, ratio=0.284
SF_5: 36 subfolds, total=542418, pos=130614, bg=411804, ratio=0.317
SF_6: 36 subfolds, total=542533, pos=118381, bg=424152, ratio=0.279
SF_7: 36 subfolds, total=542481, pos=77580, bg=464901, ratio=0.167
background rescaled by 1/s = 2.315e-08 (log(s) = 17.581)

original weights
∑w (positives): 7.356e+03
∑w (background): 3.178e+11

rescaled weights
∑w (positives): 7.356e+03
∑w (background): 7.356e+03


## Model Construction

In [74]:
# initialize hyperparameters
H=config.get('hyperparameters')
# casting hyperparameters to correct types (strings to float or tuple)
for k,v in H.items():
    if type(v)==str and 'e' in v:H[k]=float(v)
    elif type(v)==str and '(' in v:H[k]=literal_eval(v)

# rff+linear head
class RandomFourierFeatures(layers.Layer):
    # initialization
    def __init__(self,input_dim,output_dim,gamma):
        super().__init__() # parent init (everything will wrap into a bigger model)
        W_init=RNG.normal(scale=np.sqrt(2*gamma),size=(input_dim,output_dim))
        b_init=RNG.uniform(0,2*np.pi,size=(output_dim,))
        self.W=self.add_weight( # fixed weights
            name='W',shape=(input_dim,output_dim),
            initializer=tf.constant_initializer(W_init),trainable=False)
        self.b=self.add_weight( # fixed biases
            name='b',shape=(output_dim,),
            initializer=tf.constant_initializer(b_init),trainable=False)
    # forward pass
    def call(self,x):
        z=tf.matmul(x,self.W)+self.b
        return tf.sqrt(2/tf.cast(tf.shape(self.W)[1],tf.float32))*tf.cos(z)

# rff+linear lgcp model
class RFF_LGCP(keras.Model):
    # initialization
    def __init__(self,input_dim,rff_dim,gamma,l2,mu_clip):
        super().__init__() # parent init
        self.rff=RandomFourierFeatures(input_dim,rff_dim,gamma) # rff layer
        self.mu_clip=mu_clip # clipping range for mu
        self.linear=layers.Dense(1,use_bias=True,kernel_regularizer=keras.regularizers.l2(l2),bias_initializer=tf.constant_initializer(-5)) # linear layer w/ l2 regularization against overfit
    # forward pass
    def call(self,X):
        phi=self.rff(X) # this phi has nothing to do with the phi in features.ipynb
        mu=self.linear(phi)
        return tf.clip_by_value(mu,self.mu_clip[0],self.mu_clip[1])

# cox trainer
class CoxTrainer(keras.Model):
    # intialization
    def __init__(self,lgcp_model):
        super().__init__() # parent init
        self.lgcp_model=lgcp_model
    # reduce terms helper (just the math)
    def _reduce_terms(self,y,w,mu,add_reg_losses):
        pos=tf.cast(tf.equal(y,1),tf.float32)
        L_pos=-tf.reduce_sum(w*pos*mu)
        L_neg=tf.reduce_sum(w*(1-pos)*tf.exp(mu)) # times lambda
        reg=tf.add_n(self.lgcp_model.losses) if (add_reg_losses and self.lgcp_model.losses) else 0
        nll_pos_per_event=L_pos/tf.reduce_sum(w*pos)+1e-12
        return L_pos+L_neg+reg,nll_pos_per_event # L_pos+L_neg+reg is total loss
    
    # training step
    def train_step(self,data):
        # unpacking then reshaping y & w to column vectors
        X,y,w=data # unpacking data
        y=tf.cast(tf.reshape(y,(-1,1)),tf.float32)
        w=tf.cast(tf.reshape(w,(-1,1)),tf.float32)
        # gradient tape for automatic differentiation then compute & apply gradients
        with tf.GradientTape() as tape: total,nll_pos_evt=self._reduce_terms(y,w,self.lgcp_model(X,training=True),add_reg_losses=True) # gradient tape for automatic differentiation
        grads=tape.gradient(total,self.trainable_variables)
        if H.get('grad_clip'):grads=[tf.clip_by_norm(g,H['grad_clip']) if g is not None else None for g in grads] # gradient clipping (important)
        self.optimizer.apply_gradients(zip(grads,self.trainable_variables))
        return {'loss': total,'nll_pos_per_event': nll_pos_evt} # return

    # evaluation step
    def test_step(self,data):
        # unpacking then reshaping y & w to column vectors
        X,y,w=data
        y=tf.cast(tf.reshape(y,(-1,1)),tf.float32)
        w=tf.cast(tf.reshape(w,(-1,1)),tf.float32)
        mu=self.lgcp_model(X,training=False) # forward pass
        _,nll_pos_evt=self._reduce_terms(y,w,mu,add_reg_losses=False) # compute nll
        return {'loss': nll_pos_evt,'nll_pos_per_event': nll_pos_evt} # return

# helper for assembling datasets for training loop
# basically just a tf.data.Dataset.from_tensor_slices wrapper
def make_dataset(df,idx,features,ycol,wcol,batch,shuffle=False):
    X=df.loc[idx,features].to_numpy(np.float32)
    y=df.loc[idx,ycol].to_numpy(np.float32)
    w=df.loc[idx,wcol].to_numpy(np.float32)
    ds=tf.data.Dataset.from_tensor_slices((X,y,w))
    if shuffle:ds=ds.shuffle(buffer_size=min(200_000,len(idx)),seed=SEED)
    return ds.batch(batch).prefetch(tf.data.AUTOTUNE)

# training function
def train_rff_lgcp(df,split,H):
    features=[c for c in df.columns if c.startswith('pca_')] # detecting feature columns
    ds_tr=make_dataset(df,split['train_idx'],features,'key','weight_scaled',H['batch_size'],shuffle=True) # training dataset
    ds_va=make_dataset(df,split['val_idx'],features,'key','weight_scaled',H['batch_size']) # validation dataset
    model=RFF_LGCP(len(features),H['rff_dim'],H['rff_gamma'],H['l2'],H['mu_clip'],SEED) # initializing model
    trainer=CoxTrainer(model) # wrapping model in trainer
    # using AdamW optimizer rather than Adam (weight decay is important) then define callbacks & fit model then return
    opt=keras.optimizers.AdamW(learning_rate=H['learning_rate'],weight_decay=H['weight_decay'])
    trainer.compile(optimizer=opt)
    callbacks=[
        keras.callbacks.ReduceLROnPlateau(monitor='val_nll_pos_per_event',mode='min',factor=0.5,patience=H['plateau_patience'],min_delta=H['min_delta'],min_lr=H['min_lr'],verbose=1),# reduce lr on plateau
        keras.callbacks.EarlyStopping(monitor='val_nll_pos_per_event',mode='min',patience=H['patience'],min_delta=H['min_delta'],restore_best_weights=True,verbose=1)] # early stopping based on patience
    trainer.fit(ds_tr,validation_data=ds_va,epochs=H['epochs'],callbacks=callbacks,verbose=1)
    return trainer,model,features

# finally defining the ATLAS class for loading trained models and performing inference
class ATLAS:
    def __init__(self,dF_pca,merged_splits,H,save_dir=config_path.get('models'),sf_id=None,verbose=True):
        # initialization; loads & builds from best superfold model by default unless specified otherwise (from models directory as per config)
        self.df=dF_pca
        self.splits=merged_splits
        self.H=H
        self.save_dir=save_dir
        # feature columns (ordered pca_1..pca_K)
        self.feats=sorted([c for c in self.df.columns if c.startswith('pca_')],key=lambda c:int(c.split('_')[1]))
        self.D=len(self.feats)
        # choose best superfold unless specified otherwise
        if sf_id is None:self.sf_id,self.sf_val_score=self._pick_best_superfold(verbose=verbose)
        else:self.sf_id,self.sf_val=sf_id,None

        # build & load chosen model for inference
        self.model=RFF_LGCP(self.D,H['rff_dim'],H['rff_gamma'],H['l2'],H['mu_clip'])
        _=self.model(tf.zeros((1,self.D),dtype=tf.float32))
        wpath=os.path.join(self.save_dir,f'rff_lgcp_sf{self.sf_id}.weights.h5')
        self.model.load_weights(wpath)
        if verbose:print(f'ATLAS: loaded superfold {self.sf_id} @ {wpath}')

        # default reference window for window-aware scaling (can be overridden at call time though)
        self.ref_area_km2=np.pi*17.0**2 # hardcoding this here however because the dataset was trained on an avg radius of 17 but change at your own risk
        self.ref_dt_sec=3600.0 # really wouldn't recommend changing this beyond an hour since the data is only relevant for hour buckets

    # pick best sf by val nll/pos
    def _pick_best_superfold(self,verbose=True):
        scores={}
        for sf in sorted(self.splits.keys()): # looping through existing models directory and loading them up
            wpath=os.path.join(self.save_dir,f'rff_lgcp_sf{sf}.weights.h5')
            if not os.path.exists(wpath):
                if verbose:print(f'[skip] no weights for sf {sf} @ {wpath}')
                continue
            m=RFF_LGCP(self.D,self.H['rff_dim'],self.H['rff_gamma'],self.H['l2'],self.H['mu_clip']) # building models
            _=m(tf.zeros((1,self.D),dtype=tf.float32))
            m.load_weights(wpath)
            val_idx=np.asarray(self.splits[sf]['val_idx'])
            total_L_pos=0;total_L_neg=0;total_w_pos=0
            bs=self.H.get('batch_size',8192) # establishing batch size
            # for each validation batch essentially compile total weights, loss, etc. to eventually find lowest in directory
            for start in range(0,len(val_idx),bs):
                sl=val_idx[start:start+bs]
                Xv=self.df.loc[sl,self.feats].to_numpy(np.float32)
                yv=(self.df.loc[sl,'key']==1).to_numpy(np.float32).reshape(-1,1)
                wv=self.df.loc[sl,'weight_scaled'].to_numpy(np.float32).reshape(-1,1)
                mu=m(Xv,training=False).numpy()
                total_L_pos+=(wv*yv*(-mu)).sum() # positive loss
                total_L_neg+=(wv*(1-yv)*np.exp(mu)).sum() # neg loss
                total_w_pos+=(wv*yv).sum() # positive weight
            if total_w_pos<=0:
                if verbose:print(f'ATLAS: sf {sf} has zero positive weight in val; skipping') # probably means wrong directory
                continue
            score=(total_L_pos+total_L_neg+tf.add_n(m.losses).numpy())/total_w_pos # the tf.add... is the regularization
            scores[sf]=score
            if verbose:print(f'[sf {sf}] val_nll_per_pos={score:.4f}')
        if not scores:raise RuntimeError('no scored models found in directory')
        best_sf=min(scores,key=scores.get) # found best superfold, return it + scores
        return best_sf,scores[best_sf]

    # fit logistic calibrator: μ to R_ref in [0,1] for given reference window
    # this is just a simple logistic regression that takes the output mu value and converts it to an appropriate and easier-to-interpret risk value
    def fit_logit_calibrator(self,save_path=None,verbose=True):
        vidx=np.asarray(self.splits[self.sf_id]['val_idx']) # loading validation ids (all) before constructing batches
        Xv=self.df.loc[vidx,self.feats].to_numpy(np.float32)
        yv=self.df.loc[vidx,'key'].to_numpy(np.int32)
        wv=self.df.loc[vidx,'weight_scaled'].to_numpy(np.float64)
        mu_v=self.model.predict(Xv,batch_size=self.H.get('batch_size',8192),verbose=0).ravel()
        lr=LogisticRegression(solver='lbfgs',max_iter=500)
        lr.fit(mu_v.reshape(-1,1),yv,sample_weight=wv)
        self._logit=lr
        if verbose:
            a=float(lr.coef_[0,0]);b=float(lr.intercept_[0])
            print(f'[logit] fitted: R=σ({a:.4f}·μ+{b:.4f})')
        if save_path:joblib.dump(lr,save_path)
        return self._logit
    
    # load in logit calibrator
    def load_logit_calibrator(self,load_path,verbose=True):
        self._logit=joblib.load(load_path) # load from path -- self explanatory
        if verbose:
            a=float(self._logit.coef_[0,0]);b=float(self._logit.intercept_[0])
            print(f'[logit] loaded: R=σ({a:.4f}·μ+{b:.4f})')
        return self._logit

    # μ→risk (reference window). im gonna just old name (prob) as alias for safety and because the code wont work otherwise but this is referring to the risk score
    def risk_from_mu_logit(self,mu):
        if not hasattr(self,'_logit'):raise RuntimeError('no logistic calibrator; call fit_logit_calibrator() or load_logit_calibrator()') # hint hint
        mu=np.asarray(mu,dtype=np.float64).reshape(-1,1)
        return self._logit.predict_proba(mu)[:,1]
    prob_from_mu_logit=risk_from_mu_logit # alias (back-compat)

    # main inferential function to predict >=1 landslide event probability in a window (given uniform subsamples over window W=A·Δt)
    # note that window area is in km^2 and dt is in seconds
    # this produces a single inference and you can plug in the output from the phi function directly into this pretty much
    def risk_score(self,df_pca_subsamples,window_area,dt,batch_size=None):
        # columnwise alignment and then guarantee matching dimensionality
        cols=sorted([c for c in df_pca_subsamples.columns if c.startswith('pca_')],key=lambda c:int(c.split('_')[1]))
        assert len(cols)==self.D,f'PCA dimensionality mismatch: model expects D={self.D},got {len(cols)}'
        X=df_pca_subsamples[cols].to_numpy(np.float32)
        bs=int(batch_size or self.H.get('batch_size',8192))

        mu=self.model.predict(X,batch_size=bs,verbose=0).ravel().astype(np.float64)
        r_i=self.risk_from_mu_logit(mu) # per-row risk for reference window
        r_i=np.clip(r_i,0.0,1.0)

        # simple, stable aggregator for ref-window risk over K rows
        R_ref=float(np.clip(np.mean(r_i),0.0,1.0))

        # ref→target window scaling via Poisson mapping
        A_ref=getattr(self,'ref_area_km2',np.pi*17.0**2)
        T_ref=getattr(self,'ref_dt_sec',3600.0)
        Lambda_ref=-np.log(max(1.0-R_ref,1e-12)) # just ensuring the R_ref isnt 0 which it shouldnt be unless you change it
        scale=(window_area*dt)/(A_ref*T_ref)
        Lambda=float(np.clip(Lambda_ref*scale,0.0,1e12))
        R=float(np.clip(1.0-np.exp(-Lambda),0.0,1.0))
        return Lambda,R

    # raw μ for a batch of rows
    def mu(self,df_pca,batch_size=None):
        # columnwise alignment and then guarantee matching dimensionality
        cols=sorted([c for c in df_pca.columns if c.startswith('pca_')],key=lambda c:int(c.split('_')[1]))
        assert len(cols)==self.D,f'ATLAS: PCA dimensionality mismatch: model expects D={self.D},got {len(cols)}'
        X=df_pca[cols].to_numpy(np.float32)
        bs=int(batch_size or self.H.get('batch_size',8192))
        return self.model.predict(X,batch_size=bs,verbose=0).ravel()

    # best μ threshold by weighted F1
    # just loop through some values we know mu will be around pretty much and find the best one
    # change n_grid as needed for more or less precision
    def best_mu_threshold(self,mu,y,w,n_grid=200):
        mu=np.asarray(mu,dtype=np.float64)
        y=np.asarray(y,dtype=np.int32)
        w=np.asarray(w,dtype=np.float64)
        grid=np.quantile(mu,np.linspace(0,1,n_grid))
        best=(-1.0,float(np.median(mu)))
        for t in grid: # essentially looping through a linear space of mu threshold values to approximate the optimal point
            yp=(mu>=t).astype(np.int32)
            TP=float(w[(yp==1)&(y==1)].sum())
            FP=float(w[(yp==1)&(y==0)].sum())
            FN=float(w[(yp==0)&(y==1)].sum())
            prec=TP/(TP+FP+1e-12);rec=TP/(TP+FN+1e-12) # calculating precision in order to get f1 score, then just scoring
            f1=2*prec*rec/(prec+rec+1e-12)
            if f1>best[0]:best=(f1,float(t))
        return best

## Feature Extraction

In [4]:
# get feature-related & sampling-related config dicts
config_features=config_full.get('features').get('features')
config_sampling=config_full.get('features').get('sampling')
unmask_val=config_sampling.get('unmask_val')
SCALE_METRIC=ee.Number(config_sampling.get('scale'))

# import all necessary images/imagecollections used in the samploing process
nasa_dem=ee.Image('USGS/SRTMGL1_003').resample('bilinear')
openlandmap_sand=ee.Image('OpenLandMap/SOL/SOL_SAND-WFRACTION_USDA-3A1A1A_M/v02').resample('bilinear')
openlandmap_clay=ee.Image('OpenLandMap/SOL/SOL_CLAY-WFRACTION_USDA-3A1A1A_M/v02').resample('bilinear')
esa_worldcover=ee.ImageCollection('ESA/WorldCover/v200').first().rename('land_class')
esa_worldcover=esa_worldcover.reproject(crs=esa_worldcover.projection(),scale=10) # reprojecting worldcover to get nearest-pixel resampling
nasa_gpm=ee.ImageCollection('NASA/GPM_L3/IMERG_V07').select('precipitation')
copernicus_era5=ee.ImageCollection('ECMWF/ERA5_LAND/HOURLY').select([*[f'volumetric_soil_water_layer_{level}' for level in config_features.get('vsw_levels')],'potential_evaporation_hourly','surface_runoff','sub_surface_runoff','snow_cover','snow_depth_water_equivalent']) # selecting only relevant bands
nasa_modis_terraveg=ee.ImageCollection('MODIS/061/MOD13Q1')
upa=ee.Image('MERIT/Hydro/v1_0_1').resample('bilinear').select('upa') # upstream drainage area to calculate specific catchment area for terrain wetness
vwc_0033kPa=ee.Image('ISRIC/SoilGrids250m/v2_0/wv0033').resample('bilinear').unmask(-99) # volumetric water content at 33kPa suction level
vwc_1500kPa=ee.Image('ISRIC/SoilGrids250m/v2_0/wv1500').resample('bilinear').unmask(-99) # volumetric water content at 1.5mPa suction level

# slightly adjusted phi function for ease of passing predictions into ATLAS (for demonstration), although this is a mostly identical copy of the original phi function and is not intended for direct use outside of ATLAS or for real deployment/sampling purposes
# the automatic assignment of a K value based on spacetime volume is removed here, giving the user freedom to specify K (subsampling accuracy) directly.
# the adjustments take into account some of the post-processing steps performed in the features.ipynb notebook after running phi originally like constructing a snow mask and running principal component analysis on the features
# this function will not work properly with an altered config.yaml file unless the same post-processing steps are also applied to the original feature dataframe from features.ipynb which is a process that takes forever; use at your own risk
def phi(pointer:ee.geometry.Geometry,timestamp:ee.Number,radius:ee.Number,K:ee.Number=128,ignore:set={'precip_hits_72h'},seed:ee.Number=ee.Number(SEED)):

    # helper function to convert pointer & radius (km) to a roughly circular ee geometry object for subsampling (returns pointer if radius is 0)
    def uncertainty_region(pointer:ee.geometry.Geometry=pointer,radius:ee.Number=radius): 
        return ee.Algorithms.If(radius.eq(0),pointer,pointer.buffer(radius.multiply(1000)))
    region=uncertainty_region()

    # helper function to randomly & uniformly samples the spacetime area specified by parameters K times, outputs only specific timestamps & coordinate locations
    def generate_samples(region:ee.geometry.Geometry=region,timestamp:ee.Number=timestamp,K:ee.Number=K):
        start=ee.Date(timestamp)
        pts=ee.FeatureCollection.randomPoints(region=region,points=K,seed=seed).randomColumn('u',seed=seed.add(1))
        def _add_time(feature):
            ts=start.millis().add(ee.Number(3600000).multiply(feature.get('u')))
            return feature.set({'ts':ts,'date':ee.Date(ts)})
        return pts.map(_add_time)
    # initialize subsamples, calculate which buckets are occupied by aggreating u values into a list & converting into hour intervals then finding unique values
    subsamples=generate_samples().map(lambda f:f.set('bucket',ee.Number(f.get('u')).multiply(24).floor().int()))
    occupied_buckets=subsamples.aggregate_array('bucket').distinct().sort()
    subsamples=subsamples.map(lambda f:f.set('bkt_idx',occupied_buckets.indexOf(ee.Number(f.get('bucket'))).int()))
    timestamps=occupied_buckets.map(lambda b:ee.Number(timestamp).add(ee.Number(b).multiply(3600000)).add(900000)) # calculate exact quarter-way timestamp for each occupied bucket as ee.List of millis (serves as a kind of buffer)

    # helper to flatten & convert list of images to singular multibanded image
    def cat_list(imgs:ee.List):
        imgs=ee.List(imgs).flatten()
        def _cat(img,acc):return ee.Image(acc).addBands(ee.Image(img))
        return ee.Image(imgs.slice(1).iterate(_cat,ee.Image(imgs.get(0))))

    # array collapse mechanism (dynamic events are temporally flattened into 3d images that, when point-sampled, return an array representing different values over varying timesteps)
    # this function 'collapses' array images into scalar images depending on which time step is relevant for any given sample, ignoring all others
    def array_collapse(fc:ee.FeatureCollection,band_names:ee.List):
        def _collapse(f):
            bkt=ee.Number(f.get('bkt_idx'))
            def _set_scalar(pname,acc):
                acc=ee.Feature(acc)
                pname=ee.String(pname)
                val=ee.Algorithms.If(acc.propertyNames().contains(pname),ee.Number(ee.Array(acc.get(pname)).get([bkt])),ee.Number(unmask_val))
                return acc.set(pname.replace('_arr$',''),val)
            return ee.Feature(band_names.iterate(_set_scalar,f))
        return fc.map(_collapse)
    
    # helper function to fill coastal holes for era5 land images (some coastal areas return null arrays due to masking errors around coastal regions)
    def fill_coastal_ic(ic:ee.ImageCollection,radius_px:int=3):
        def _fill_coastal(img,radius_px:int=3):
            img=ee.Image(img)
            mean=img.focal_mean(radius=radius_px,units='pixels')
            # fill masked w/ neighborhood mean, then any remaining w/ unmasking value
            return img.unmask(mean).unmask(unmask_val)
        return ee.ImageCollection(ic).map(lambda i:_fill_coastal(i,radius_px))
    
    # child phi function that specializes in dynamic imagecollections (in this case those with a <1d temporal cadence)
    # dynamic images are structured as array stacks which are collapsed later on; all specified windows are defaults -- these numbers are customizable through the config.yaml file
    def phi_dynamic():

        # helper function to convert ic hourly cadence to daily aggregate
        def hourly_to_daily_mean(ic:ee.ImageCollection):
            ic=fill_coastal_ic(ic,3)
            # get start & end dates of ic
            start_date=ee.Date(ic.first().get('system:time_start'))
            end_date=ee.Date(ic.sort('system:time_start',False).first().get('system:time_start'))
            # helper function to get mean for a single day
            def _day_mean(offset):
                current_day=start_date.advance(offset,'day')
                next_day=current_day.advance(1,'day')
                # filter collection for current day, take mean for all images in filtered ic, set time start property for new resultant img & return
                mean_img=ic.filterDate(current_day,next_day).mean()
                return mean_img.set('system:time_start',current_day.millis())
            # derive number of days, convert list of images into ic after mapping over generated list of daily timestamps
            num_days=end_date.difference(start_date,'day').round()
            return ee.ImageCollection.fromImages(ee.List.sequence(0,num_days.subtract(1)).map(_day_mean))
        
        # array stacking helper function that stacks an imagecollection into one (technically 3-dimensional) image with pixels that return array values 
        def array_stack(ic:ee.ImageCollection,hrs:ee.Number,reducer:ee.Reducer,name:ee.String):
            ic=fill_coastal_ic(ic,3).map(lambda i:ee.Image(i).resample('bilinear'))
            return ee.ImageCollection.fromImages(
                timestamps.map(lambda ts:ic.filterDate(ee.Number(ts).subtract(ee.Number(hrs).multiply(3600000)),ee.Number(ts)).reduce(reducer))
            ).toBands().resample('bilinear').toArray().rename([name.cat('_arr')])
        
        # derive hourly precipitation sums over past 1, 6, 24, 72, 240, 720 hrs
        precip_sum_hrs=ee.List(config_features.get('dynamic')['precip_sum_h'])
        precip_sum_img=cat_list(precip_sum_hrs.map(
            lambda h:array_stack(
                nasa_gpm.select('precipitation'),h,ee.Reducer.sum(),
                ee.String('precip_mm_').cat(ee.Number(h).format()).cat('h_sum'))))
                
        # derive hourly precipitation maxes over past 6, 12, 24 hrs
        precip_max_hrs=ee.List(config_features.get('dynamic')['precip_max_h'])
        precip_max_img=cat_list(precip_max_hrs.map(
            lambda h:array_stack(
                nasa_gpm.select('precipitation'),h,ee.Reducer.max(),
                ee.String('precip_mm_').cat(ee.Number(h).format()).cat('h_max'))))

        # derive the number of half hours for 72 hours before timestamp where precipitation exceeded 5mm (hits)
        precip_hits_hrs=ee.List(config_features.get('dynamic')['precip_hits_h'])
        def precip_hits_stack(hrs:int):
            hrs=ee.Number(hrs)
            return ee.ImageCollection.fromImages(
                timestamps.map(lambda ts:nasa_gpm.filterDate(ee.Number(ts).subtract(hrs.multiply(3600000)),ee.Number(ts)).map(lambda img:img.multiply(0.5).gt(10).rename('hit').int().updateMask(img.mask())).sum())
            ).toBands().toArray().rename([ee.String('precip_hits_').cat(hrs.format()).cat('h_arr')])
        precip_hits_img=cat_list(precip_hits_hrs.map(precip_hits_stack))

        # derive soil runoff (surface & subsurface) over past 1, 6, 24, 72 hrs 
        runoff_hrs=ee.List(config_features.get('dynamic')['runoff_h'])
        surface_runoff_sum_img=cat_list(runoff_hrs.map(
            lambda h:array_stack(
                copernicus_era5.select('surface_runoff'),h,ee.Reducer.sum(),
                ee.String('surface_runoff_').cat(ee.Number(h).format()).cat('h_sum'))))
        subsurface_runoff_sum_img=cat_list(runoff_hrs.map(
            lambda h:array_stack(
                copernicus_era5.select('sub_surface_runoff'),h,ee.Reducer.sum(),
                ee.String('subsurface_runoff_').cat(ee.Number(h).format()).cat('h_sum'))))
        
        # derive mean hourly volumetric soil water content (soil moisture) over past 24, 72 hours @ depth levels 1 (0-7cm), 4 (100-289cm); (4 bands total)
        sm_hrs=ee.List(config_features.get('dynamic')['soil_moist_mean_h'])
        sm_levels=ee.List(config_features.get('vsw_levels'))
        sm_labels=ee.List(config_features.get('vsw_labels'))

        # stack images by depth
        def _vsw_mean_depthwise(level:str,label:str):
            level=ee.Number(level);label=ee.String(label)
            depth_ic=copernicus_era5.select(ee.String('volumetric_soil_water_layer_').cat(level.format()))
            return ee.Image.cat(sm_hrs.map(lambda h:array_stack(
                depth_ic,h,ee.Reducer.mean(),
                ee.String('soil_moist_').cat(label).cat('_').cat(ee.Number(h).format()).cat('h_mean'))))
        sm_depths_img=cat_list(sm_levels.zip(sm_labels).map(lambda pair:_vsw_mean_depthwise(ee.List(pair).get(0),ee.List(pair).get(1))))
        
        # derive soil moisture anomaly (z-score) and trend (slope of linear fit) over past 7, 90 days @ depth levels 1 (0-7cm), 4 (100-289cm)
        sm_levels=ee.List(config_features.get('vsw_levels'))
        sm_labels=ee.List(config_features.get('vsw_labels'))
        sm_anom_days=ee.List(config_features.get('dynamic')['soil_moist_anom_d'])
        sm_trend_days=ee.List(config_features.get('dynamic')['soil_moist_trend_d'])

        def _vsw_daily_for_level(level):
            level=ee.Number(level)
            band=ee.String('volumetric_soil_water_layer_').cat(level.format())
            hourly=copernicus_era5.select(band).filterDate(ee.Date(ee.Number(timestamps.reduce(ee.Reducer.min()))).advance(ee.Number(sm_anom_days.reduce(ee.Reducer.max())).max(ee.Number(sm_trend_days.reduce(ee.Reducer.max()))).multiply(-1),'day'),ee.Date(ee.Number(timestamps.reduce(ee.Reducer.max()))))
            # daily mean with band name 'sm'
            return hourly_to_daily_mean(hourly).select([0],['sm'])

        # soil anomaly (z score)
        def _vsw_anom_depthwise(level,label):
            level=ee.Number(level)
            label=ee.String(label)
            vsw_daily=_vsw_daily_for_level(level)
            def _stack_for_days(days):
                days=ee.Number(days)
                def _z_for_ts(ts):
                    ts=ee.Number(ts)
                    stats=vsw_daily.filterDate(ee.Date(ts.subtract(days.multiply(86400000))),ee.Date(ts)).reduce(ee.Reducer.mean().combine(ee.Reducer.stdDev(),'_',True))
                    today=vsw_daily.filterDate(ee.Date(ts).advance(-1,'day'),ee.Date(ts)).first()
                    # handle potential empty today window defensively
                    today=ee.Image(ee.Algorithms.If(today,today,ee.Image.constant(0).rename('sm')))
                    return ee.Image(today).subtract(stats.select('sm_mean')).divide(stats.select('sm__stdDev').max(1e-6))
                stack=ee.ImageCollection.fromImages(timestamps.map(_z_for_ts)).toBands().toArray()
                return stack.rename([ee.String('soil_moist_anom_').cat(days.format()).cat('d_').cat(label).cat('_arr')])
            return ee.Image.cat(sm_anom_days.map(_stack_for_days))
        sm_anom_img=cat_list(sm_levels.zip(sm_labels).map(lambda pair:_vsw_anom_depthwise(ee.List(pair).get(0),ee.List(pair).get(1))))

        # soil trend (linear slope)
        def _vsw_trend_depthwise(level,label):
            level=ee.Number(level)
            label=ee.String(label)
            def _stack_for_days(days):
                days=ee.Number(days)
                def _slope_for_ts(ts):
                    ts=ee.Number(ts)
                    start=ts.subtract(days.multiply(86400000))
                    def _add_t(img):
                        img=ee.Image(img).rename('sm')
                        return img.addBands(ee.Image.constant(ee.Date(img.get('system:time_start')).difference(ee.Date(start),'day')).rename('t').toFloat()).select(['t','sm'])
                    seq=_vsw_daily_for_level(level).filterDate(ee.Date(start),ee.Date(ts)).map(_add_t)
                    return ee.ImageCollection(seq).reduce(ee.Reducer.linearFit()).select('scale')
                stack=ee.ImageCollection.fromImages(timestamps.map(_slope_for_ts)).toBands().toArray()
                return stack.rename([ee.String('soil_moist_trend_').cat(days.format()).cat('d_').cat(label).cat('_arr')])
            return ee.Image.cat(sm_trend_days.map(_stack_for_days))
        sm_trend_img=cat_list(sm_levels.zip(sm_labels).map(lambda pair:_vsw_trend_depthwise(ee.List(pair).get(0),ee.List(pair).get(1))))

        # derive sum of potential evaporation/evapotranspiration over antecedent 7d window
        pev_days=ee.Number(config_features.get('dynamic')['pev_sum_d'])
        pev_hrs=pev_days.multiply(24)
        pev_sum_arr=array_stack(
            copernicus_era5.select('potential_evaporation_hourly'),pev_hrs,ee.Reducer.sum(),
            ee.String('pev_sum_').cat(pev_days.format()).cat('d'))
        
        # derive moisture deficit calculated by the ReLU of potential evaporation minus precipitation over antecedent 7d window
        precip_sum_pev=array_stack( # calculating 7 day precipitation sum (only for moisture deficit calculation; not returned)
            nasa_gpm.select('precipitation'),pev_hrs,ee.Reducer.sum(),
            ee.String('precip_mm_').cat(pev_hrs.format()).cat('h_sum_for_pev'))
        moisture_deficit_img=pev_sum_arr.subtract(precip_sum_pev).rename([ee.String('moisture_deficit_').cat(pev_days.format()).cat('d_arr')])
        
        # derive percentage snow cover of the given pixel (of the era5 image) and snow depth in terms of water equivalent
        snow_hrs=ee.List(config_features.get('dynamic')['snow_h'])
        snow_cover_img=cat_list(snow_hrs.map(
            lambda h:array_stack(
                copernicus_era5.select('snow_cover'),h,ee.Reducer.sum(),
                ee.String('snow_cover_').cat(ee.Number(h).format()).cat('h_sum'))))
        snow_depth_img=cat_list(snow_hrs.map(
            lambda h:array_stack(
                copernicus_era5.select('snow_depth_water_equivalent'),h,ee.Reducer.sum(),
                ee.String('snow_depth_').cat(ee.Number(h).format()).cat('h_sum'))))

        # day of year trig encoding before passing on to static so that model is somewhat temporally aware
        tau=ee.Number(2*math.pi)
        def _angle_for_ts(ts):
            d=ee.Date(ts)
            doy=d.difference(d.getRange('year').start(),'day').add(1)
            return doy.divide(365.25).multiply(tau)
        doy_sin_arr=ee.ImageCollection.fromImages(
            timestamps.map(lambda ts:ee.Image.constant(_angle_for_ts(ts).sin()))
        ).toBands().toArray().rename(['doy_sin_arr'])
        doy_cos_arr=ee.ImageCollection.fromImages(
            timestamps.map(lambda ts:ee.Image.constant(_angle_for_ts(ts).cos()))
        ).toBands().toArray().rename(['doy_cos_arr'])
        
        # concatenate & return dynamic img stack
        dynamic_array_img=ee.Image.cat([
            precip_sum_img,
            precip_max_img,
            precip_hits_img,
            surface_runoff_sum_img,
            subsurface_runoff_sum_img,
            sm_depths_img,
            sm_anom_img,
            sm_trend_img,
            pev_sum_arr,
            moisture_deficit_img,
            snow_cover_img,
            snow_depth_img,
            doy_sin_arr,
            doy_cos_arr])
        return dynamic_array_img
    
    def phi_static(timestamp:ee.Number=timestamp):

        # terrain slope in degrees
        terrain_slope=ee.Terrain.slope(nasa_dem).rename('slope_deg')

        # terrain aspect sin & cos
        aspect_deg=ee.Terrain.aspect(nasa_dem)

        aspect_sin=aspect_deg.multiply(math.pi/180).sin().rename('aspect_sin')
        aspect_cos=aspect_deg.multiply(math.pi/180).cos().rename('aspect_cos')

        # terrain ruggedness
        kernel=ee.Kernel.square(radius=1,units='pixels',normalize=False) # kernel for ruggedness
        tri=nasa_dem.neighborhoodToBands(kernel).subtract(nasa_dem).pow(2).reduce(ee.Reducer.sum()).sqrt().rename('tri') # tri (terrain ruggedness index)

        # topographic wedness index
        proj=upa.projection()
        sca=upa.multiply(1e6).divide(ee.Image.pixelArea().reproject(proj).sqrt()).max(1) # specific catchment area
        tan_slope=ee.Terrain.slope(nasa_dem.reproject(proj)).multiply(math.pi/180).tan().max(1e-6) # tangent of slope
        twi=sca.divide(tan_slope).log().rename('twi') # twi (topographic wetness index) derived from sca (specific catchment area) & tan of slope in rad 

        # laplacian curvature (signed second derivative of elevation) derived via built-in laplacian kernel
        dem_window=nasa_dem.clip(ee.Geometry(region).buffer(SCALE_METRIC.multiply(8)))
        base=ee.Image(1).rename('base').clip(ee.Geometry(region).buffer(SCALE_METRIC.multiply(8)))
        curvature=dem_window.convolve(ee.Kernel.laplacian4()).divide(ee.Image.pixelArea()).rename('curvature_laplace').updateMask(base.mask())

        # sand and clay content (4 bands each for each depth)
        sc_depths=ee.List(config_features.get('static')['sc_depths'])
        sand_imgs=sc_depths.map(lambda d:openlandmap_sand.select(ee.String(d)).rename([ee.String('sand_content_').cat(ee.String(d))]))
        sand_pc=cat_list(sand_imgs)
        clay_imgs=sc_depths.map(lambda d:openlandmap_clay.select(ee.String(d)).rename([ee.String('clay_content_').cat(ee.String(d))]))
        clay_pc=cat_list(clay_imgs)
        
        # static volumetric soil water content at 0-5cm and 5-15cm depth at 10kPa, 33kPa, & 1500kPa suctions (6 bands)
        depths=ee.List(config_features.get('static')['vwc_depths'])
        quant=ee.String(config_features.get('static')['vwc_quantile'])
        def _vwc_for(img:ee.Image,suction:ee.String):return ee.Image(img).select(depths.map(lambda d:ee.String('val_').cat(ee.String(d)).cat('_').cat(quant)),depths.map(lambda d: ee.String('vwc_kPa').cat(suction).cat('_').cat(ee.String(d))))
        vwc=ee.Image.cat([
            _vwc_for(vwc_0033kPa,ee.String('0033')),
            _vwc_for(vwc_1500kPa,ee.String('1500'))])
        
        # derive z score of ndvi over antecedent 96d window - technically variable over time but can still be treated as static because of >1d cadence
        ndvi_days=config_features.get('static')['ndvi_anom_d']
        def _ndvi_anom(timestamp:ee.Number,days:int):
            days=ee.Number(days)
            # get 16-day interval sequence of ndvi data from modis
            sequence=nasa_modis_terraveg.filterDate(timestamp.subtract(days.multiply(86400000)),timestamp).select('NDVI')
            # calculate and return z score based on mean & stdev of ndvi
            return sequence.sort('system:time_start',False).first().subtract(sequence.mean()).divide(sequence.reduce(ee.Reducer.stdDev()).max(1e-6)).resample('bilinear').rename([ee.String('ndvi_anom_').cat(days.format()).cat('d')])
        ndvi_anom=_ndvi_anom(timestamp,ndvi_days)

        # longitude-latitude trig encoding
        def coord_encoding():
            # return an encoding for coordinates so the model can be somewhat spatially conscious
            lonlat=ee.Image.pixelLonLat()
            lat_rad=lonlat.select('latitude').multiply(math.pi/180)
            lon_rad=lonlat.select('longitude').multiply(math.pi/180)
            return ee.Image.cat(lat_rad.sin().rename('lat_sin'),lat_rad.cos().rename('lat_cos'),lon_rad.sin().rename('lon_sin'),lon_rad.cos().rename('lon_cos'))
        lonlat_trig=coord_encoding() # sin and cos of longitude and latitute each for spatial encoding (4 bands)
        
        # stack all static bands and sample (also added esa_worldcover which doesn't require any processing), return
        static_img=ee.Image.cat([
            terrain_slope,
            aspect_sin,
            aspect_cos,
            tri,
            twi,
            curvature,
            sand_pc,
            clay_pc,
            esa_worldcover,
            vwc,
            ndvi_anom,
            lonlat_trig])
        return static_img
    
    # concatenate dynamic & static images
    dynamic_arr_img=phi_dynamic()
    static_img=phi_static()
    img=ee.Image.cat([dynamic_arr_img,static_img])
    
    # figure out which channels to keep (like the array-based ones in favor of collapsed ones as well as timestamps, u values, and other helper properties)
    arr_names=dynamic_arr_img.bandNames().filter(ee.Filter.stringEndsWith('item','_arr'))
    scalar_names=arr_names.map(lambda n:ee.String(n).replace('_arr$',''))
    dynamic_non_array_names=dynamic_arr_img.bandNames().filter(ee.Filter.stringEndsWith('item','_arr').Not())
    to_keep=ee.List(scalar_names.cat(dynamic_non_array_names).cat(static_img.bandNames()))

    # sample final image and collapse array properties
    samples=img.sampleRegions(collection=subsamples,scale=SCALE_METRIC,tileScale=config_features.get('tileScale'),geometries=False)
    collapsed=array_collapse(samples,arr_names)
    # compute featurs as a dataframe, return
    dF=ee.data.computeFeatures({'expression':collapsed.select(to_keep),'fileFormat':'PANDAS_DATAFRAME'}).drop('geo',errors='ignore',axis=1)
    
    # new snow flag feature construction, identical to the one in features.ipynb
    snow_covers=[col for col in dF.columns if 'snow_cover' in col]
    snow_depths=[col for col in dF.columns if 'snow_depth' in col]
    snow_flag=(dF[snow_covers].sum(axis=1)>0)|(dF[snow_depths].sum(axis=1)>0)
    dF=dF.drop(columns=snow_covers+snow_depths) # drop original snow channels
    dF['snow_flag']=snow_flag.astype(int)

    # this is also new; moving snow flag to the end of the dataframe to match the PCA ordering done originally, also using ftk before that to perform data normalizations
    # also mostly copied from features.ipynb 
    dF2M=ftk.transform_full(dF,[],ignore,config_full.get('features')['path']['spec'])[0] # transform using ftk
    pca_model=joblib.load(config_path.get('pca_persist')) # load in pca model from earlier in this notebook
    return pd.DataFrame(pca_model.transform(dF2M.astype('float32',copy=False))).add_prefix('pca_'),subsamples # transform normalized features, return as dataframe (also returning, in this version, the subsamples featurecollection for visualization)

## Inference

In [138]:
# initialize ATLAS
atlasv2=ATLAS(dF_pca,merged_splits,H,sf_id=1)

# fit once and save calibrator then plot evaluation chart:
atlasv2.fit_logit_calibrator(save_path=os.path.join(config_path.get('models'), f'logit_sf{atlasv2.sf_id}.joblib'))

ATLAS: loaded superfold 1 @ database/models/rff_lgcp_sf1.weights.h5
[logit] fitted: R=σ(0.9800·μ+0.9339)


2025-10-30 17:08:34.972268: E tensorflow/core/framework/node_def_util.cc:680] NodeDef mentions attribute use_unbounded_threadpool which is not in the op definition: Op<name=MapDataset; signature=input_dataset:variant, other_arguments: -> handle:variant; attr=f:func; attr=Targuments:list(type),min=0; attr=output_types:list(type),min=1; attr=output_shapes:list(shape),min=1; attr=use_inter_op_parallelism:bool,default=true; attr=preserve_cardinality:bool,default=false; attr=force_synchronous:bool,default=false; attr=metadata:string,default=""> This may be expected if your graph generating binary is newer  than this binary. Unknown attributes will be ignored. NodeDef: {{node ParallelMapDatasetV2/_14}}


0,1,2
,penalty,'l2'
,dual,False
,tol,0.0001
,C,1.0
,fit_intercept,True
,intercept_scaling,1
,class_weight,
,random_state,
,solver,'lbfgs'
,max_iter,500


### Example Usage
see: 
https://en.wikipedia.org/wiki/2020_Pettimudi_landslide & 
https://www.thehindu.com/specials/text-and-context/landslides-in-pettimudi-social-inequalities-in-disasters/article65939327.ece

In [145]:
point=(10.01776704920028,76.97837079702704) # pettimudi, idukki, india
date='6 August 2020' # the model should find a 47% chance of landslide within the next hour which would have exceeded the action threshold and triggered a warning beforehand

In [144]:
output0=phi(pointer=ee.Geometry.Point(point[1],point[0]),timestamp=ee.Number(int(datetime.strptime(date,'%d %B %Y %H:%M' if ':' in date else '%d %B %Y').replace(tzinfo=timezone.utc).timestamp()*1000)),radius=ee.Number(17),K=64)
testdF=output0[0] # only getting risk score
atlasv2.risk_score(testdF,17**2*math.pi,3600)[1] # feature dF (from phi), window area (17km radius), window interval (1h, in seconds), select only 1st element (risk score, since 0th element returns lambda absolute intensity)

2025-10-30 18:11:28.632743: E tensorflow/core/framework/node_def_util.cc:680] NodeDef mentions attribute use_unbounded_threadpool which is not in the op definition: Op<name=MapDataset; signature=input_dataset:variant, other_arguments: -> handle:variant; attr=f:func; attr=Targuments:list(type),min=0; attr=output_types:list(type),min=1; attr=output_shapes:list(shape),min=1; attr=use_inter_op_parallelism:bool,default=true; attr=preserve_cardinality:bool,default=false; attr=force_synchronous:bool,default=false; attr=metadata:string,default=""> This may be expected if your graph generating binary is newer  than this binary. Unknown attributes will be ignored. NodeDef: {{node ParallelMapDatasetV2/_14}}


0.47112528681848864

In [141]:
# saving example for later for display & vis
savename='pettimudi_6-9-20'
dir_path=os.path.join('database/outputs/examples',savename)
os.makedirs(dir_path,exist_ok=True)

# saving both risk assessment score and points featurecollection from inference step
dFoutput,fc=output0 # phi output
risk=atlasv2.risk_score(dFoutput,17**2*math.pi,3600)[1]
with open(os.path.join(dir_path,'tuple.pkl'),'wb') as f:pickle.dump(risk,f,pickle.HIGHEST_PROTOCOL)
with open(os.path.join(dir_path,'featurecollection.json'),'w') as f:json.dump(fc.getInfo(),f)

print(f'saved to: {dir_path}')

saved to: database/outputs/examples/pettimudi_6-9-20
