In [1]:
from sklearn.datasets import fetch_openml
import numpy as np
import time
import torch
from scipy.optimize import linprog
from qpsolvers import solve_qp
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from torch.autograd import Function
import torch.nn as nn
from sklearn.model_selection import train_test_split
import sys
import StochasticGhost
import importlib
from torch.nn.utils import clip_grad_norm_
import pandas as pd
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

2023-12-27 17:52:44.898341: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-12-27 17:53:05.430937: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()
  register_backend(TensorflowBackend())


In [None]:
# Load the COMPAS two-year scores table; the path is relative to the
# notebook's working directory.
compas_csv = "compas-scores-two-years.csv"
raw_data = pd.read_csv(compas_csv)

In [None]:
# Columns kept for the analysis.
selected_columns = ['age', 'c_charge_degree', 'race', 'age_cat', 'score_text', 'sex', 'priors_count',
                    'days_b_screening_arrest', 'decile_score', 'is_recid', 'two_year_recid', 'c_jail_in', 'c_jail_out']
df = raw_data[selected_columns]

# Row filters:
#   * screening within +/-30 days of arrest
#   * a known recidivism flag (is_recid != -1)
#   * charge degree other than "O"
#   * a usable score_text
# pandas.read_csv parses the literal string "N/A" as NaN by default, so the
# string comparison alone never removes anything (NaN != 'N/A' is True); the
# extra notna() makes the "no score" filter actually effective.
keep_rows = (
    (df['days_b_screening_arrest'] <= 30)
    & (df['days_b_screening_arrest'] >= -30)
    & (df['is_recid'] != -1)
    & (df['c_charge_degree'] != "O")
    & (df['score_text'] != 'N/A')
    & df['score_text'].notna()
)
# .copy() detaches the result from raw_data so that later column assignments
# on `df` do not raise SettingWithCopyWarning / write into a temporary.
df = df[keep_rows].copy()

In [None]:
# Jail stay in hours (c_jail_out - c_jail_in).  Built with assign() so a new
# frame is produced in one step instead of writing twice into `df`, which is
# itself a filtered selection of raw_data.
jail_stay = pd.to_datetime(df['c_jail_out']) - pd.to_datetime(df['c_jail_in'])
df = df.assign(length_of_stay=jail_stay.dt.total_seconds() / 3600)

In [None]:
# Number of missing values in each column of the filtered frame.
null_counts = df.isna().sum()
print(null_counts)

In [None]:
# Correlation (Series.corr default method) between the jail stay column and
# the COMPAS decile score.
stay_hours = df['length_of_stay']
decile = df['decile_score']
correlation = stay_hours.corr(decile)
print(f"Correlation between length_of_stay and decile_score: {correlation}")

In [None]:
# Share of each race in the filtered data, expressed in percent.
race_share = df['race'].value_counts(normalize=True)
race_distribution = race_share * 100
print(race_distribution)

# Summary of score_text
score_text_summary = df['score_text'].describe()
print(score_text_summary)

# Cross-tabulation of sex and race
sex_race_cross_tab = pd.crosstab(df['sex'], df['race'])
print(sex_race_cross_tab)

# Summary of sex
sex_summary = df['sex'].describe()
print(sex_summary)

# Percentage of two_year_recid == 1
recid_rows = df['two_year_recid'] == 1
recid_percentage = recid_rows.sum() / len(df) * 100
print(f"Percentage of two_year_recid == 1: {recid_percentage:.2f}%")

In [None]:
# Side-by-side bar charts of decile-score counts for the two race groups.
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))
left_ax, right_ax = axes

# Left panel: African-American defendants
black_deciles = df.loc[df['race'] == "African-American", 'decile_score']
black_counts = black_deciles.value_counts().sort_index()
pblack = black_counts.plot(kind='bar', ax=left_ax)
pblack.set_title("Black Defendant's Decile Scores")
pblack.set_xlabel('Decile Score')
pblack.set_ylabel('Count')

# Right panel: Caucasian defendants
white_deciles = df.loc[df['race'] == "Caucasian", 'decile_score']
white_counts = white_deciles.value_counts().sort_index()
pwhite = white_counts.plot(kind='bar', ax=right_ax)
pwhite.set_title("White Defendant's Decile Scores")
pwhite.set_xlabel('Decile Score')
pwhite.set_ylabel('Count')

# Avoid overlapping labels between the two panels, then render.
plt.tight_layout()
plt.show()

In [None]:
# Keep only Caucasian and African-American defendants.  .copy() gives an
# independent frame: new columns are assigned to df_needed afterwards, and
# doing that on a plain boolean-mask selection raises SettingWithCopyWarning.
race_mask = (df['race'] == 'Caucasian') | (df['race'] == 'African-American')
df_needed = df[race_mask].copy()

In [None]:
# Categorizing: integer-encode each categorical column with pd.Categorical.
# Codes follow the sorted category order, so with only the two race values
# left in df_needed, 'African-American' -> 0 and 'Caucasian' -> 1.
# assign() returns a new frame instead of writing column-by-column into a
# frame that was produced by filtering `df`.
charge_codes = pd.Categorical(df_needed['c_charge_degree']).codes
df_needed = df_needed.assign(
    crime_code=charge_codes,
    age_code=pd.Categorical(df_needed['age_cat']).codes,
    race_code=pd.Categorical(df_needed['race']).codes,
    gender_code=pd.Categorical(df_needed['sex']).codes,
    score_code=pd.Categorical(df_needed['score_text']).codes,
    # NOTE(review): charge_degree_code is derived from the same column as
    # crime_code, so the two features are identical -- confirm this
    # duplication is intended before feeding both to the model.
    charge_degree_code=charge_codes,
)

# Releveling factors
# df['age_factor'] = df['age_factor'].cat.reorder_categories(['Greater than 45', '25 - 45', 'Less than 25'], ordered=True)
# df['race_factor'] = df['race_factor'].cat.reorder_categories(['African-American', 'Asian', 'Caucasian', 'Hispanic', 'Native American', 'Other'], ordered=True)
# df['gender_factor'] = df['gender_factor'].cat.reorder_categories(['Female', 'Male'], ordered=True)

In [None]:
# Model inputs, in the column order the network will see them
# (race_code is therefore at position 4, crime_code at position 5).
feature_columns = ['priors_count', 'score_code', 'age_code', 'gender_code',
                   'race_code', 'crime_code', 'charge_degree_code']
target_columns = ['two_year_recid']

in_df = df_needed[feature_columns]
out_df = df_needed[target_columns]

In [None]:
# 80/20 train/validation split of the raw (unscaled) features and labels,
# with a fixed random_state so the partition is repeatable.
features_tensor = torch.tensor(in_df.values)
labels_tensor = torch.tensor(out_df.values)
X_train, X_val, Y_train, Y_val = train_test_split(
    features_tensor,
    labels_tensor,
    test_size=0.2,
    random_state=42,
)

In [None]:
class SimpleClassifier(nn.Module):
    """Fully connected network with ReLU hidden layers and a sigmoid output.

    Args:
        layer_sizes: Sequence of layer widths ``[input, hidden..., output]``.
            Each consecutive pair defines one ``nn.Linear`` layer, so at
            least two entries are required.

    Raises:
        ValueError: If ``layer_sizes`` has fewer than two entries (there
            would be no output layer for ``forward`` to apply).
    """

    def __init__(self, layer_sizes):
        super().__init__()
        if len(layer_sizes) < 2:
            raise ValueError(
                "layer_sizes must contain at least an input and an output size, "
                f"got {list(layer_sizes)!r}"
            )

        # One linear layer per consecutive (in, out) pair of sizes.
        self.layers = nn.ModuleList(
            [nn.Linear(n_in, n_out)
             for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:])]
        )
        # Not used by forward(); kept (empty) so the attribute still exists
        # for any code that references it.
        self.layer_norms = nn.ModuleList()

    def forward(self, x):
        """Apply ReLU after every layer but the last, and sigmoid after the last."""
        *hidden_layers, output_layer = self.layers
        for layer in hidden_layers:
            x = torch.relu(layer(x))
        return torch.sigmoid(output_layer(x))

In [None]:
# Standardise the features: statistics are fitted on the training split only
# and then applied to the validation split.  The inputs are converted to
# ndarrays explicitly rather than relying on scikit-learn to coerce tensors.
# X_val itself is left unscaled; the evaluation cell slices it to recover
# the raw feature codes.
scaler = StandardScaler()
X_train = scaler.fit_transform(np.asarray(X_train))
X_test = scaler.transform(np.asarray(X_val))

# Convert to float32 tensors.  torch.as_tensor accepts both ndarrays and
# existing tensors; torch.tensor() on an existing tensor (Y_train / Y_val
# come out of the split as tensors) emits a copy-construct UserWarning.
X_train_tensor = torch.as_tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.as_tensor(Y_train, dtype=torch.float32)
X_test_tensor = torch.as_tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.as_tensor(Y_val, dtype=torch.float32)

# Create DataLoader for training and testing sets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Only the training loader shuffles; the test loader keeps row order so that
# its batches stay aligned with X_val.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# Seed torch's global RNG before any weights are created so that parameter
# initialisation (and later draws from the same RNG) are repeatable across
# Restart & Run All.  Same value as the split's random_state.
torch.manual_seed(42)

# Instantiate the model: input width follows the feature matrix, then two
# hidden layers and a single sigmoid output unit.
input_size = X_train.shape[1]
hidden_size1 = 12
hidden_size2 = 8
op_size = 1
layer_sizes = [input_size, hidden_size1, hidden_size2, op_size]
model = SimpleClassifier(layer_sizes)

# Define loss function and optimizer
# BCELoss expects probabilities, which the model's sigmoid output provides.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Number of passes over the training set
num_epochs = 300

In [None]:
# Make sure the model is in training mode (a previous run of the evaluation
# cell leaves it in eval mode).
model.train()
for epoch in range(num_epochs):
    # Accumulate a sample-weighted loss so the logged value is the mean over
    # the whole epoch rather than whatever the last mini-batch happened to be.
    running_loss = 0.0
    samples_seen = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.view(-1, 1))
        loss.backward()
        optimizer.step()

        batch_count = labels.size(0)
        running_loss += loss.item() * batch_count
        samples_seen += batch_count

    # max(..., 1) keeps the cell from failing when the loader is empty.
    epoch_loss = running_loss / max(samples_seen, 1)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}')

In [None]:
# Evaluate on the held-out split and collect, per batch: hard predictions,
# labels, raw model outputs, and the matching rows of the unscaled X_val.
model.eval()

correct, total = 0, 0
pred, lab, x_val, out = [], [], [], []
row_start = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        # Threshold the sigmoid output at 0.5 to get a 0/1 prediction.
        predictions = (outputs >= 0.5).float()

        # test_loader does not shuffle, so consecutive batches correspond to
        # consecutive row ranges of X_val.
        row_stop = row_start + len(labels)
        x_val.append(X_val[row_start:row_stop, :])
        row_start = row_stop

        pred.append(predictions.flatten())
        lab.append(labels.flatten())
        out.append(outputs.flatten().float())

        matches = predictions == labels.view(-1, 1)
        correct += matches.sum().item()
        total += labels.size(0)

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

In [None]:
# Flatten the per-batch results into single NumPy arrays.
# NOTE: this cell rebinds pred/lab/out from lists of tensors to ndarrays, so
# it can only be run once after the evaluation cell.
pred = np.array(torch.cat(pred, dim=0))
lab = np.array(torch.cat(lab, dim=0))
out = np.array(torch.cat(out, dim=0))

# Stack the unscaled feature rows of every batch into one (rows, features)
# array in a single vectorised call instead of a per-row Python loop.
tensor_list = np.concatenate([np.asarray(batch_rows) for batch_rows in x_val], axis=0)

In [None]:
# True positives among white defendants.
# The race code must be read from the 'race_code' feature column; the
# previous hard-coded index 5 is 'crime_code' in in_df's column order.
# race_code == 1 corresponds to 'Caucasian' (sorted category order).
race_col = in_df.columns.get_loc('race_code')
tp_white_condition = (pred == 1) & (lab == 1) & (tensor_list[:, race_col] == 1)
tp_white = np.count_nonzero(tp_white_condition)
print(tp_white)

In [None]:
# True positives among black defendants.
# The race code must be read from the 'race_code' feature column; the
# previous hard-coded index 5 is 'crime_code' in in_df's column order.
# race_code == 0 corresponds to 'African-American' (sorted category order).
race_col = in_df.columns.get_loc('race_code')
tp_black_condition = (pred == 1) & (lab == 1) & (tensor_list[:, race_col] == 0)
tp_black = np.count_nonzero(tp_black_condition)
print(tp_black)

In [None]:
# False-positive statistics for white defendants (race_code == 1).
# The group mask is taken from the 'race_code' feature column; the previous
# hard-coded index 5 is 'crime_code' in in_df's column order.
race_col = in_df.columns.get_loc('race_code')
is_white = tensor_list[:, race_col] == 1

fp_white_condition = (pred == 1) & (lab == 0) & is_white
fp_white = np.count_nonzero(fp_white_condition)
print("False positives in white:", fp_white)

p_white_condition = (lab == 1) & is_white
p_white = np.count_nonzero(p_white_condition)
print("Total positives in white:", p_white)

# False positive rate = FP / (FP + TN): the denominator is the number of
# true negatives-by-label in the group, not the number of positives.
n_white = np.count_nonzero((lab == 0) & is_white)
print("Total negatives in white:", n_white)
print("False positive rate in white:", fp_white/n_white)

In [None]:
# False-positive statistics for black defendants (race_code == 0).
# The group mask is taken from the 'race_code' feature column; the previous
# hard-coded index 5 is 'crime_code' in in_df's column order.
race_col = in_df.columns.get_loc('race_code')
is_black = tensor_list[:, race_col] == 0

fp_black_condition = (pred == 1) & (lab == 0) & is_black
fp_black = np.count_nonzero(fp_black_condition)
print("False positives in black:", fp_black)

# Previously this mask selected group code 1 (the white group) by mistake.
p_black_condition = (lab == 1) & is_black
p_black = np.count_nonzero(p_black_condition)
print("Total positives in black:", p_black)

# False positive rate = FP / (FP + TN): the denominator is the number of
# true negatives-by-label in the group, not the number of positives.
n_black = np.count_nonzero((lab == 0) & is_black)
print("Total negatives in black:", n_black)
print("False positive rate in black:", fp_black/n_black)

In [None]:
# Bar chart of the false positive rate per race group.
# The rates are computed here directly as FP / (number of label-0 rows in the
# group), reading the group from the 'race_code' feature column
# (0 = African-American, 1 = Caucasian).
race_col = in_df.columns.get_loc('race_code')
actual_negative = lab == 0
false_positive = actual_negative & (pred == 1)

fp_rates = []
for race_value in (0, 1):
    in_group = tensor_list[:, race_col] == race_value
    fp_rates.append(np.count_nonzero(false_positive & in_group)
                    / np.count_nonzero(actual_negative & in_group))

plt.bar(['FP Black', 'FP White'], fp_rates)
plt.ylabel('False positive rate');

In [None]:
# Mean model output for each race group among rows whose true label is 0.
# A higher mean for a group means the model leans more towards false
# positives for that group.
# The group is read from the 'race_code' feature column (0 = African-American,
# 1 = Caucasian); the previous hard-coded index 5 is 'crime_code'.
race_col = in_df.columns.get_loc('race_code')
out_black_0_label = out[(tensor_list[:, race_col] == 0) & (lab == 0)]
out_white_0_label = out[(tensor_list[:, race_col] == 1) & (lab == 0)]

# Mean predicted score for black defendants when the true label is 0
avg_b_l_0 = out_black_0_label.mean()
print(avg_b_l_0)
# Mean predicted score for white defendants when the true label is 0
avg_w_l_0 = out_white_0_label.mean()
print(avg_w_l_0)

In [None]:
# Mean model output for each race group among rows whose true label is 1.
# A lower mean for a group means the model leans more towards false
# negatives for that group.
# The group is read from the 'race_code' feature column (0 = African-American,
# 1 = Caucasian); the previous hard-coded index 5 is 'crime_code'.
race_col = in_df.columns.get_loc('race_code')
out_black_1_label = out[(tensor_list[:, race_col] == 0) & (lab == 1)]
out_white_1_label = out[(tensor_list[:, race_col] == 1) & (lab == 1)]

# Mean predicted score for black defendants when the true label is 1
avg_b_l_1 = out_black_1_label.mean()
print(avg_b_l_1)
# Mean predicted score for white defendants when the true label is 1
avg_w_l_1 = out_white_1_label.mean()
print(avg_w_l_1)

In [None]:
# Bar chart of the four group/label mean outputs; blue bars are the black
# group, red bars the white group.
colors = ['blue', 'red'] * 2
bar_names = [
    'Avg(Pred|True=0,c=Black)',
    'Avg(Pred|True=0,c=White)',
    'Avg(Pred|True=1,c=Black)',
    'Avg(Pred|True=1,c=White)',
]
bar_heights = [avg_b_l_0, avg_w_l_0, avg_b_l_1, avg_w_l_1]

plt.bar(bar_names, bar_heights, color=colors, alpha=0.5)
# Long tick labels: rotate them and anchor on the right edge.
plt.xticks(rotation=45, ha='right')
plt.ylabel('Marginal avg ')