In [4]:
import dask.dataframe as dd
import dask.array as da
import dask_ml.model_selection as dcv
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
# from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import xgboost as xgb
import pandas as pd
import matplotlib.pyplot as plt
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

# Set up Dask client to use multiple GPUs. Note to set temp space
local_directory='/explore/nobackup/people/spotter5/temp_dir',
client = Client(
                n_workers=1, threads_per_worker=1, processes=True, memory_limit='28GB')


# Set up a Dask cluster that assigns each worker to a separate GPU
# cluster = LocalCUDACluster(
#     n_workers=2,  # Number of GPUs you have
#     threads_per_worker=1,  # One thread per worker
#     memory_limit='28GB',  # Set memory limit for each worker
#     local_directory='/explore/nobackup/people/spotter5/temp_dir'  # Temporary directory
# )

# Connect to the Dask client
# client = Client(cluster)
# Create an output directory if it doesn't exist
out_path = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/xgboost'
os.makedirs(out_path, exist_ok=True)

# Load the data as a Dask DataFrame
# df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet', 
#                      columns=['dNBR', 'dNDVI', 'dNDII', 'y'])

# # Shuffle the dataframe (keeps it in Dask format)
# df = df.shuffle(on='dNBR')

# # Sample 100,000 rows from the Dask DataFrame
# df = df.sample(frac=100000 / len(df), random_state=42)

# sampled_out_path = os.path.join(out_path, 'sampled_100k.parquet')
# df.to_parquet(sampled_out_path, write_index = False)
# print(f"Sampled data saved to {sampled_out_path}")

df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_stratified_sampled_ndsi.parquet')
# df2 = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac_stratified_sampled.parquet')

# df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_sampled_ndsi.parquet')
# df2 = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac_sampled.parquet')
# df1 = df1.repartition(npartitions=10) 
# df2 = df2.repartition(npartitions=10) 

# df = dd.concat([df1, df2])
# df = df.repartition(npartitions=10)

# Specify predictors and target variables, converting directly from Dask DataFrame to Dask Array
X = df[['dNBR', 'dNDVI', 'dNDII']].to_dask_array(lengths=True)
y = df['y'].to_dask_array(lengths=True)

# X = df[['dNBR', 'dNDVI', 'dNDII']].values
# y = df['y'].values

# Split data into training and testing sets
X_train, X_test, y_train, y_test = dcv.train_test_split(X, y, test_size=0.2, random_state=42, shuffle = True)
# X_train, X_test, y_train, y_test = dcv.train_test_split(X, y, test_size=0.2, random_state=42)

# Convert to DaskDMatrix (XGBoost specific data structure for distributed training)
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# Set up XGBoost parameters for GPU training
params = {
    'objective': 'binary:logistic',  # Binary classification
    'learning_rate': 0.01,
    'max_depth': 8,
    'n_estimators': 1000,
    'tree_method': 'hist',  # Use histogram-based method
    'eval_metric': 'logloss',  # Metric for binary classification
    'device': 'cuda',  # Use CUDA for GPU support
}

# params = {
#     'objective': 'binary:logistic',  # Binary classification
#     'learning_rate': 0.1,
#     'max_depth': 8,
#     'n_estimators': 1000,
#     'tree_method': 'gpu_hist',  # Use GPU-accelerated histogram algorithm
#     'eval_metric': 'logloss',
#     'predictor': 'gpu_predictor'  # Use GPU predictor
# }


# Train the model with early stopping
model = xgb.dask.train(
    client, 
    params, 
    dtrain, 
    num_boost_round=100,
    evals=[(dtest, 'test')],
    early_stopping_rounds=10
)

# Make predictions (the output will be probabilities for binary classification)
y_pred_proba = xgb.dask.predict(client, model, X_test)

# Convert predicted probabilities to binary predictions
y_pred = (y_pred_proba > 0.5).astype(int)

# Convert Dask arrays to NumPy arrays for sklearn metrics
y_pred_np = y_pred.compute()
y_test_np = y_test.compute()

# Calculate classification metrics using sklearn
accuracy = accuracy_score(y_test_np, y_pred_np)
precision = precision_score(y_test_np, y_pred_np, average='binary')
recall = recall_score(y_test_np, y_pred_np, average='binary')
f1 = f1_score(y_test_np, y_pred_np, average='binary')
# Calculate IoU using confusion matrix
cm = confusion_matrix(y_test_np, y_pred_np)
TP = cm[1, 1]  # True Positives
FP = cm[0, 1]  # False Positives
FN = cm[1, 0]  # False Negatives
IoU = TP / (TP + FP + FN)

# Print the results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"IoU: {IoU}")

# Save the classification metrics to a CSV file
results = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'IoU'],
    'Value': [accuracy, precision, recall, f1, IoU]
})

results.to_csv(os.path.join(out_path, 'xgboost_ea_stratified_sampled_results_ndsi.csv'), index=False)
print(f"Classification metrics saved to {os.path.join(out_path, 'xgboost_ea_stratified_sampled_results_ndsi.csv')}")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45419 instead
[14:49:05] Task [xgboost.dask-0]:tcp://127.0.0.1:40407 got rank 0
Parameters: { "n_estimators" } are not used.

[14:49:14] [0]	test-logloss:0.68826
[14:49:14] [1]	test-logloss:0.68346
[14:49:14] [2]	test-logloss:0.67876
[14:49:14] [3]	test-logloss:0.67416
[14:49:14] [4]	test-logloss:0.66964
[14:49:14] [5]	test-logloss:0.66521
[14:49:14] [6]	test-logloss:0.66086
[14:49:14] [7]	test-logloss:0.65660
[14:49:15] [8]	test-logloss:0.65242
[14:49:15] [9]	test-logloss:0.64831
[14:49:15] [10]	test-logloss:0.64428
[14:49:15] [11]	test-logloss:0.64033
[14:49:15] [12]	test-logloss:0.63645
[14:49:15] [13]	test-logloss:0.63265
[14:49:15] [14]	test-logloss:0.62891
[14:49:15] [15]	test-logloss:0.62524
[14:49:15] [16]	test-logloss:0.62164
[14:49:16] [17]	test-logloss:0.61810
[14:49:16] [18]	test-logloss:0.61462
[14:49:16] [19]	test-logloss:0.61121
[14:49:16] [20]	test-logloss:0.60786
[14:49:16] [21]	test-logloss:0.

Accuracy: 0.82661075
Precision: 0.8786492619480638
Recall: 0.7578885714840009
F1 Score: 0.8138134302422567
IoU: 0.6860754041486603
Classification metrics saved to /explore/nobackup/people/spotter5/cnn_mapping/nbac_training/xgboost/xgboost_ea_stratified_sampled_results_ndsi.csv


In [3]:
results

Unnamed: 0,Metric,Value
0,Accuracy,0.917146
1,Precision,0.869897
2,Recall,0.50031
3,F1 Score,0.635259
4,IoU,0.46548


Above trains a small file, do it in batches

In [None]:
import dask.dataframe as dd
from dask.distributed import Client
import xgboost as xgb
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import pandas as pd
import os

# Set up Dask client for distributed training
client = Client(n_workers=4, threads_per_worker=1, processes=True, memory_limit='28GB')

# Output directory
out_path = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/xgboost'
os.makedirs(out_path, exist_ok=True)

# Read the Parquet directory with specific columns to control memory usage
df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet', 
                     columns=['dNBR', 'dNDVI', 'dNDII', 'y'])

# XGBoost parameters
params = {
    'objective': 'binary:logistic',  # Binary classification
    'learning_rate': 0.1,
    'max_depth': 8,
    'eval_metric': 'logloss',  # Metric for binary classification
    'tree_method': 'hist',  # Use histogram-based method for efficient training
    'device': 'cuda',  # Use GPU for training
}

# Get the number of partitions
npartitions = df.npartitions  # Number of partitions in the dataset

# Initialize booster
booster = None

# Process data in chunks (by partition)
for i in range(npartitions):
    print(f"Processing partition {i + 1} of {npartitions}")
    
    # Compute a single partition at a time
    df_partition = df.get_partition(i).compute()

    # Split the partition into features (X) and target (y)
    X_batch = df_partition[['dNBR', 'dNDVI', 'dNDII']].values
    y_batch = df_partition['y'].values
    
    # Convert to DMatrix for XGBoost
    dtrain = xgb.DMatrix(X_batch, label=y_batch)
    
    # If this is the first partition, train the initial model
    if booster is None:
        booster = xgb.train(params, dtrain, num_boost_round=10)  # Initial training on the first batch
    else:
        # Update the model with the next partition
        booster.update(dtrain, iteration=i)

# After training on all partitions, make predictions on the test data
# Split the dataset into train and test sets
X_train, X_test, y_train, y_test = dcv.train_test_split(df[['dNBR', 'dNDVI', 'dNDII']].compute(), df['y'].compute(), test_size=0.2, random_state=42)

# Convert test data to DMatrix
dtest = xgb.DMatrix(X_test)

# Make predictions on the test set
y_pred_proba = booster.predict(dtest)

# Convert predicted probabilities to binary predictions
y_pred = (y_pred_proba > 0.5).astype(int)

# Calculate classification metrics using sklearn
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred, average='binary')
recall = recall_score(y_test, y_pred, average='binary')
f1 = f1_score(y_test, y_pred, average='binary')

# Calculate IoU using confusion matrix
cm = confusion_matrix(y_test, y_pred)
TP = cm[1, 1]  # True Positives
FP = cm[0, 1]  # False Positives
FN = cm[1, 0]  # False Negatives

# IoU = True Positives / (True Positives + False Positives + False Negatives)
IoU = TP / (TP + FP + FN)

# Print the results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"IoU: {IoU}")

# Save the classification metrics to a CSV file
results = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'IoU'],
    'Value': [accuracy, precision, recall, f1, IoU]
})

results.to_csv(os.path.join(out_path, 'xgboost_classification_results_batch.csv'), index=False)
print(f"Classification metrics saved to {os.path.join(out_path, 'xgboost_classification_results_batch.csv')}")


Processing partition 1 of 11642
Processing partition 2 of 11642
Processing partition 3 of 11642


In [1]:
import dask.dataframe as dd
from dask.distributed import Client
import xgboost as xgb
from dask_ml.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import pandas as pd
import os

# Set up Dask client for distributed training
client = Client(local_directory='/explore/nobackup/people/spotter5/temp_dir', n_workers=4, threads_per_worker=1, processes=True, memory_limit='28GB')

# Output directory
out_path = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/xgboost'
os.makedirs(out_path, exist_ok=True)

# Read the Parquet directory lazily (without loading it all into memory)
df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet', 
                     columns=['dNBR', 'dNDVI', 'dNDII', 'y'])

# Split data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(df[['dNBR', 'dNDVI', 'dNDII']], df['y'], test_size=0.2, random_state=42)

# Convert to DaskDMatrix for XGBoost
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# XGBoost parameters
params = {
    'objective': 'binary:logistic',  # Binary classification
    'learning_rate': 0.1,
    'max_depth': 8,
    'eval_metric': 'logloss',  # Metric for binary classification
    'tree_method': 'hist',  # Use histogram-based method
    'device': 'cuda',  # Use GPU for training
}

# Dictionary to store evaluation results
evals_result = {}

# Train the model with Dask, capturing evaluation results
output = xgb.dask.train(client, params, dtrain, num_boost_round=100, evals=[(dtest, 'test')], evals_result=evals_result, verbose_eval=10)

# Get the trained booster model
booster = output['booster']

# Print loss for each iteration
for i, logloss in enumerate(evals_result['test']['logloss']):
    print(f"Iteration {i + 1}: Log Loss = {logloss}")

# Make predictions on the test set
y_pred_proba = xgb.dask.predict(client, booster, X_test)

# Convert predicted probabilities to binary predictions
y_pred = (y_pred_proba > 0.5).astype(int)

# Convert Dask arrays to NumPy arrays for sklearn metrics
y_pred_np = y_pred.compute()
y_test_np = y_test.compute()

# Calculate classification metrics using sklearn
accuracy = accuracy_score(y_test_np, y_pred_np)
precision = precision_score(y_test_np, y_pred_np, average='binary')
recall = recall_score(y_test_np, y_pred_np, average='binary')
f1 = f1_score(y_test_np, y_pred_np, average='binary')

# Calculate IoU using confusion matrix
cm = confusion_matrix(y_test_np, y_pred_np)
TP = cm[1, 1]  # True Positives
FP = cm[0, 1]  # False Positives
FN = cm[1, 0]  # False Negatives

# IoU = True Positives / (True Positives + False Positives + False Negatives)
IoU = TP / (TP + FP + FN)

# Print the results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"IoU: {IoU}")

# Save the classification metrics to a CSV file
results = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'IoU'],
    'Value': [accuracy, precision, recall, f1, IoU]
})

results.to_csv(os.path.join(out_path, 'xgboost_classification_results_batch.csv'), index=False)
print(f"Classification metrics saved to {os.path.join(out_path, 'xgboost_classification_results_batch.csv')}")


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


TypeError: train() got an unexpected keyword argument 'evals_result'

In [2]:
import dask.dataframe as dd
# Read the data in chunks using Dask DataFrame (partitioning the data to avoid loading the full dataset into memory)
df = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet', 
                     columns=['dNBR', 'dNDVI', 'dNDII', 'y'])
df.npartitions

11642

In [19]:
't'

't'

In [3]:
import xgboost as xgb
print(xgb.__version__)
from xgboost import DMatrix

# Check if GPU support is available
params = {'tree_method': 'gpu_hist'}
dtrain = DMatrix([[1, 2], [3, 4]], label=[1, 0])
bst = xgb.train(params, dtrain, num_boost_round=2)

print("GPU support is enabled.")


2.1.1



    E.g. tree_method = "hist", device = "cuda"



GPU support is enabled.


In [1]:
import pandas as pd
df = pd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_ndsi.parquet',   columns=['dNBR', 'dNDVI', 'dNDII', 'y'])

# df.shape

df = df.sample(n = 100000000)

df.to_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_sampled.parquet', index = False)

df.shape

(100000000, 4)

In [2]:
import pandas as pd
df = pd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_ndsi.parquet',   columns=['dNBR', 'dNDVI', 'dNDII', 'y'])


# Sample 100,000,000 rows from each class (y=0 and y=1)
df_0 = df[df['y'] == 0].sample(n=int(100000000/2), random_state=42)
df_1 = df[df['y'] == 1].sample(n= int(100000000/2), random_state=42)

# Concatenate the stratified samples
df_stratified = pd.concat([df_0, df_1])

# Shuffle the DataFrame (optional)
df_stratified = df_stratified.sample(frac=1, random_state=42)

# Save the stratified sample to a new parquet file
df_stratified.to_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_anna_stratified_sampled_ndsi.parquet', 
                         index=False)

In [6]:
results

NameError: name 'results' is not defined