In [1]:
import os, sys
import numpy as np
import matplotlib.pyplot as plt
import warnings
import copy
import time
import fnmatch
import pickle
from dask_ml.wrappers import ParallelPostFit
warnings.filterwarnings('ignore')

NP2GDAL_CONVERSION = {
  "byte": "uint8"
}

In [24]:
import xarray as xr

from dask.distributed import Client, progress
c = Client()
c

0,1
Client  Scheduler: tcp://dask-scheduler:8786  Dashboard: http://dask-scheduler:8787/status,Cluster  Workers: 30  Cores: 90  Memory: 1.57 TB


In [4]:
infolder = '/eos/jeodpp/data/projects/CSPI/Experiments_May2019/Reduced_feature_set/Training/'

In [5]:
Train_X = np.int8(np.genfromtxt(os.path.join(infolder,'Training_reduced_features.csv'), delimiter=','))
Train_X.shape

(129014, 15694)



In [19]:
Train_X.nbytes / 1024**3

1.8856913931667805

In [6]:
Train_Y = np.int8(np.genfromtxt(os.path.join(infolder,'Training_classes_reduced_features.csv'), delimiter=','))
Train_Y.shape

(129014,)

In [None]:
xchunk = 5120
ychunk = 5120
data = xr.open_rasterio(os.path.join(infolder, 'features.vrt'), chunks={'band': 1, 'x': xchunk, 'y': ychunk})
#data = xr.open_rasterio(os.path.join(infolder, 'features_tile20.vrt'), chunks={'band': 1, 'x': xchunk, 'y': ychunk})
data = data.stack(z=['y', 'x'])
data = data.transpose('z', 'band')
data

In [7]:
from sklearn.model_selection import StratifiedKFold
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=7)
print(skf.get_n_splits(Train_X, Train_Y))
for train_index, test_index in skf.split(Train_X, Train_Y):
    print("TRAIN:", train_index, "TEST:", test_index)
    X_train, X_test = Train_X[train_index,:], Train_X[test_index,:]
    Y_train, Y_test = Train_Y[train_index], Train_Y[test_index]

3
TRAIN: [     1      3      5 ... 129008 129010 129011] TEST: [     0      2      4 ... 129009 129012 129013]
TRAIN: [     0      1      2 ... 129011 129012 129013] TEST: [     3      7      8 ... 129005 129008 129010]
TRAIN: [     0      2      3 ... 129010 129012 129013] TEST: [     1      5     11 ... 129003 129006 129011]


In [25]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.externals.joblib import parallel_backend
with parallel_backend('dask'):
    param_grid = { 
                'n_estimators': [60, 100, 150],
                'max_features': ['auto', 'sqrt', 'log2'],
                'max_depth' : [3,5,7,9, None],
                'min_samples_leaf': [3,4,8,10,12],
                'bootstrap': [0, 1],
                'min_samples_split': [3, 5, 7],
                'criterion' : ['gini', 'entropy']
                }
    clf = GridSearchCV(estimator=RandomForestClassifier(n_jobs=10), param_grid=param_grid, cv=skf, scoring='f1', n_jobs=100, verbose=1)
    clf.fit(Train_X, Train_Y)

Fitting 3 folds for each of 2700 candidates, totalling 8100 fits


[Parallel(n_jobs=100)]: Using backend DaskDistributedBackend with 90 concurrent workers.
[Parallel(n_jobs=100)]: Done  20 tasks      | elapsed: 49.9min


KilledWorker: ('_fit_and_score-batch-f56aa68d481a46069b1127a974fa8507', <Worker 'tcp://10.42.3.241:33759', memory: 0, processing: 85>)

In [None]:
pickle.dump(clf, open('RandomForest_redfeat', 'wb'))

In [11]:
#clf = pickle.load(open('/home/testproc/DEMO/TESTDATA/RFmodel_1x1_scenario2', 'rb'))

In [8]:
t = time.time()
clf_p = ParallelPostFit(estimator=clf, scoring='accuracy')
Class = clf_p.predict(data)
print('Elapsed time for classification: %.02f sec' % (time.time() - t))

Elapsed time for training: 2941.52 sec


In [15]:
import dask_ml.model_selection as dcv
from sklearn.ensemble import RandomForestClassifier
t = time.time()
param_grid = { 
                'n_estimators': [60, 100, 150],
                'max_features': ['auto', 'sqrt', 'log2'],
                'max_depth' : [3,5,7,9, None],
                'min_samples_leaf': [3,4,8,10,12],
                'bootstrap': [0, 1],
                'min_samples_split': [3, 5, 7],
                'criterion' : ['gini', 'entropy']
                }
clf = dcv.GridSearchCV(estimator=RandomForestClassifier(), param_grid=param_grid, cv=skf, scoring='f1', n_jobs=-1, scheduler=c)
clf.fit(Train_X, Train_Y)
print('Elapsed time for training: %.02f sec' % (time.time() - t))

KilledWorker: ("('randomforestclassifier-fit-score-a603c7c687c83813486d58da762cca2d', 1735, 0)", <Worker 'tcp://10.42.2.83:45553', memory: 0, processing: 2181>)

In [20]:
from dask_ml.wrappers import ParallelPostFit
t = time.time()
param_grid = { 
                'n_estimators': [60, 100, 150],
                'max_features': ['auto', 'sqrt', 'log2'],
                'max_depth' : [3,5,7,9, None],
                'min_samples_leaf': [3,4,8,10,12],
                'bootstrap': [0, 1],
                'min_samples_split': [3, 5, 7],
                'criterion' : ['gini', 'entropy']
                }
clf = ParallelPostFit(estimator=dcv.GridSearchCV(RandomForestClassifier(), param_grid, cv=skf), scoring='f1')
clf.fit(Train_X, Train_Y)
print('Elapsed time for training: %.02f sec' % (time.time() - t))

KilledWorker: ("('randomforestclassifier-fit-score-a603c7c687c83813486d58da762cca2d', 1658, 2)", <Worker 'tcp://10.42.2.83:39303', memory: 0, processing: 202>)

In [9]:
from dask_ml.wrappers import ParallelPostFit
from sklearn.model_selection import GridSearchCV
t = time.time()
param_grid = { 
            'n_estimators': [50, 75, 100, 130],
            'max_features': ['auto', 'sqrt', 'log2'],
            'max_depth': [4,5,6,8,12,None],
            'min_samples_leaf': [1,2,3,4,8,10,12],
            'bootstrap': [0, 1],
            'min_samples_split': [3, 5, 7],
            'criterion': ['gini', 'entropy']
            }
clf = ParallelPostFit(estimator=GridSearchCV(RandomForestClassifier(), param_grid, cv=3), scoring='accuracy')
clf.fit(X_Train_1x1, Y_Train_1x1)
print('Elapsed time for training: %.02f sec' % (time.time() - t))

Elapsed time for training: 42065.23 sec


In [12]:
from dask_ml.wrappers import ParallelPostFit
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn import preprocessing
t = time.time()
Pipeline = Pipeline([('Scaler', preprocessing.StandardScaler()), ('RFclf', RandomForestClassifier())])
clf = ParallelPostFit(estimator=Pipeline, scoring='accuracy')
clf.fit(X_Train_1x1, Y_Train_1x1)
print('Elapsed time for training: %.02f sec' % (time.time() - t))

Elapsed time for training: 0.48 sec


In [13]:
t = time.time()
Class = clf.predict(data)
print('Elapsed time for classification: %.02f sec' % (time.time() - t))

Elapsed time for classification: 823.46 sec
