In [13]:
from dask_jobqueue import SLURMCluster

# Compose SLURM script
cluster = SLURMCluster(queue='caslake', cores=10, memory='40GB', 
                       processes=10, walltime='01:00:00', interface='ib0',
                       job_extra=['--account=macs30123']
                      )

# Request resources
cluster.scale(jobs=1)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 40537 instead


In [14]:
from dask.distributed import Client

client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.25.0.66:40537/status,

0,1
Dashboard: http://172.25.0.66:40537/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://172.25.0.66:38955,Workers: 0
Dashboard: http://172.25.0.66:40537/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [15]:
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split, IncrementalSearchCV
from sklearn.linear_model import SGDClassifier, SGDRegressor
from sklearn.metrics import classification_report, ConfusionMatrixDisplay, mean_squared_error
import matplotlib.pyplot as plt
import numpy as np

# ---------- Step 1: Load Data ----------
df = dd.read_parquet("data/hmda_filtered_2018_2023.parquet")

# ---------- Step 2: Prepare features ----------
features = [
    'loan_amount',
    'income',
    'debt_to_income_ratio',
    'applicant_age',
    'derived_race',
    'derived_ethnicity',
    'loan_type'
]

# Step 2a: Categorize categorical columns BEFORE using .cat.codes
df = df.categorize(columns=['derived_race', 'derived_ethnicity', 'loan_type'])

# Step 2b: Convert numeric columns
for col in ['loan_amount', 'income']:
    df[col] = dd.to_numeric(df[col], errors='coerce')

# Step 2c: Apply .cat.codes ONLY (no astype!)
for col in ['derived_race', 'derived_ethnicity', 'loan_type']:
    df[col] = df[col].cat.codes


In [None]:
# Variables to check for missing values
columns_to_check = features + ['action_taken']

# Compute the proportion of missing values for each variable (NaN count / total rows)
total_rows = df.shape[0].compute()

missing_stats = (
    df[columns_to_check]
    .isna()
    .sum()
    .compute()
    .sort_values(ascending=False)
    / total_rows
)

# Convert to DataFrame for easier viewing
missing_df = missing_stats.reset_index()
missing_df.columns = ['variable', 'missing_rate']
missing_df['missing_rate'] = (missing_df['missing_rate'] * 100).round(2).astype(str) + '%'

# Print the results
print(missing_df)


               variable missing_rate
0         applicant_age       90.38%
1  debt_to_income_ratio       72.04%
2                income       13.38%
3           loan_amount         0.0%
4          derived_race         0.0%
5     derived_ethnicity         0.0%
6             loan_type         0.0%
7          action_taken         0.0%


In [None]:
# ---------- Step 3: Binary Classification: Predict rejection ----------

# Ensure the target label column exists
df['label_reject'] = df['action_taken'].isin(['3', '7']).astype(int)

# Drop rows with missing values in features and label
df_model = df[features + ['label_reject']].dropna()

# Output the number of valid samples
print("Valid sample size:", df_model.shape[0].compute())

# Construct X and y arrays for classification
X_cls = df_model[features].to_dask_array(lengths=True)
y_cls = df_model['label_reject'].to_dask_array(lengths=True)


In [None]:
from sklearn.model_selection import train_test_split as sklearn_split

# Note: Convert Dask arrays to NumPy arrays
X_np = X_cls.compute()
y_np = y_cls.compute()

# Split data using scikit-learn
X_train, X_test, y_train, y_test = sklearn_split(X_np, y_np, train_size=0.7, random_state=42)

# Handle class imbalance by computing class weights
class_counts = df.groupby('label_reject').count().compute()
w = class_counts.iloc[0, 0] / class_counts.iloc[1, 0]

# Set up classifier and hyperparameter search
clf = SGDClassifier(class_weight={0: 1, 1: w})
params = {
    'alpha': np.logspace(-4, 0, num=20),
    'loss': ['log_loss', 'hinge'],
    'average': [True, False]
}
search_cls = IncrementalSearchCV(clf, params)
search_cls.fit(X_train, y_train, classes=[0, 1])

# Evaluate performance
print("Best score (train):", search_cls.best_score_)
print("Test accuracy:", search_cls.best_estimator_.score(X_test, y_test))

# Display confusion matrix
y_pred = search_cls.best_estimator_.predict(X_test)
ConfusionMatrixDisplay.from_predictions(y_test.compute(), y_pred.compute())
plt.title("Rejection Classifier")
plt.show()



    * Use InverseDecaySearchCV to use `decay_rate`
    * Specify decay_rate=None


  warn(


KilledWorker: ("('array-15a9e2799977d47a8b6f4294b3d70649', 0, 0)", <WorkerState 'tcp://172.25.2.135:42039', name: SLURMCluster-0-6, status: closed, memory: 0, processing: 1>)

In [None]:
# ---------- Step 4: Regression: Predict rate_spread ----------

# Note: 'rate_spread' has many missing values, filter them out first
df_reg = df[features + ['rate_spread']].dropna()
df_reg['rate_spread'] = dd.to_numeric(df_reg['rate_spread'], errors='coerce')

# Prepare Dask arrays for regression
X_reg = df_reg[features].to_dask_array(lengths=True)
y_reg = df_reg['rate_spread'].to_dask_array(lengths=True)

# Split into training and testing sets
X_train_r, X_test_r, y_train_r, y_test_r = train_test_split(X_reg, y_reg, train_size=0.7, random_state=42)

# Set up SGD regressor and parameter search
reg = SGDRegressor()
params_r = {
    'alpha': np.logspace(-4, 0, num=20),
    'loss': ['squared_error', 'huber'],
    'average': [True, False]
}
search_reg = IncrementalSearchCV(reg, params_r)
search_reg.fit(X_train_r, y_train_r)

# Evaluate regression performance using RMSE
y_pred_r = search_reg.best_estimator_.predict(X_test_r)
rmse = mean_squared_error(y_test_r.compute(), y_pred_r.compute(), squared=False)
print("Regression RMSE:", rmse)
