In [None]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# Common imports
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl

from joblib import Parallel, delayed
from scipy.signal import savgol_filter as sgf
from scipy.optimize import curve_fit
from sklearn import linear_model
from sklearn import metrics

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "classification"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

In [None]:
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.linear_model import LinearRegression, LogisticRegression, Ridge, Lasso
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor, MLPClassifier
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.utils import shuffle
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np

# featurize:

In [None]:
Synth = pd.read_csv('CsPbBr_synthesis.csv', index_col = 0)

def conc_factor(di=1,mw=1,density=1,conc=0):
    result = 0
    if conc != 0:
        return conc/500/di
    else:
        volume = 1 #in ul
        mass = (volume/1000000)/di*density #g
        amount = mass/mw #mol
        result = amount*1000*1000000/500
        return result #converting into mM

In [None]:
#Dilution ratio:
#Cs Pb 200,OLA 100, Br OA 80
SynData_new = pd.DataFrame()
#Cesium Concentration in mM
SynData_new['Cs'] = Synth['Cs']*conc_factor(di=200,conc=1000)
#Lead Concentration in mM
SynData_new['Pb'] = Synth['Pb']*conc_factor(di=200,conc=667)
#Oleylamine Concentration in mM
SynData_new['OLA'] = Synth['OLA']*conc_factor(di=100,mw=267.5,density=813)
#Oleic Acid Concentration in mM
SynData_new['OA'] = (Synth['OA']+Synth['Cs']+Synth['Pb'])*conc_factor(di=80,mw=282.5,density=895)
#Benzoyl Bromide Concentration in mM
SynData_new['Br'] = Synth['Br']*conc_factor(di=80,mw=185,density=1570)
#Temperature in Celcius
SynData_new['Temp'] = Synth['Temp'] + 273
#Normalized Concentration of Products
SynData_new['PbBr3-'] = Synth['PbBr3-']
SynData_new['Cs4PbBr6'] = Synth['Cs4PbBr6']
SynData_new['CsPb2Br5'] = Synth['CsPb2Br5']
SynData_new['PbBr2'] = Synth['PbBr2']
SynData_new['1ML'] = Synth['1 ML']
SynData_new['2ML'] = Synth['2 ML']
SynData_new['3ML'] = Synth['3 ML']
SynData_new['4ML'] = Synth['4 ML']
SynData_new['CsPbBr3'] = Synth['CsPbBr3']

In [None]:
product = ['Cs4PbBr6', 'CsPb2Br5', 'CsPbBr3', 'PbBr3-', 'PbBr2', '1ML', '2ML', '3ML', '4ML']
id = 8

X = np.array(SynData_new[['Cs', 'Pb', 'OLA', 'OA', 'Br', 'Temp']])
y = np.array(SynData_new[product[id]])
y_logged = np.log(1 + np.log( 1 + np.log( 1 + np.log(1 + y))))


# 'Cs4PbBr6', 'CsPb2Br5', 'CsPbBr3', 'PbBr3-', 'PbBr2', '1ML', '2ML', '3ML', '4ML'

scaler = preprocessing.StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)


In [None]:

# for id,name in enumerate(product):
#     y = np.array(SynData_new[product[id]])
#     y_logged = np.log( 1 + np.log(1 + y))
#     bin = 30
#     fig, ax = plt.subplots(1,2, sharey=True, figsize=(16,5))
#     ax[0].hist(y, bins=bin)
#     ax[1].hist(y_logged, bins=bin)
#     ax[0].set_title('Original')
#     ax[1].set_title('Logged')
#     fig.suptitle(f'{product[id]}')
#     fig.savefig(f'images/hist_{product[id]}.png')


In [None]:
# # split train and test set:

# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size = 0.2, random_state=42)

# Optimize hyper parameters for MLP:

In [None]:
# optimize hyper parameter:

from hyperopt import hp
from hyperopt import STATUS_OK
from hyperopt import fmin, tpe, space_eval, Trials
from sklearn.model_selection import cross_val_score


def hyperoptoutput2param(best):
    
    '''Change hyperopt output to dictionary with values '''
    
    for key in best.keys():
        if key in hyper_dict.keys():
            best[key] = hyper_dict[key][ best[key] ] 
            
    return best

# Define a dicionary for each parameter range 

hyper_dict = {
    "depth": [1, 2, 3],
    "width": [32, 64, 128, 192],
    "optimizer": ['adam'],
    "activation": ['relu', 'logistic', 'tanh'],
    "alpha":[0.02, 0.04, 0.08, 0.16]
}

parameter_space =  { "depth": hp.choice("depth", hyper_dict['depth']),
                    "width": hp.choice("width", hyper_dict['width']),
                    "optimizer": hp.choice("optimizer", hyper_dict['optimizer']), 
                    "activation": hp.choice("activation", hyper_dict['activation']), 
                    "alpha": hp.choice("alpha", hyper_dict['alpha'])
                    }


# Evaludation function 

def model_eval(args):

    '''Take suggested arguments and perform model evaluation'''

    # generate tuple input for hidden_layer_sizes 
    hidden_layers = tuple( [args['width']] * args['depth'] )

    # your code here to train MLPRegressors and trun CV score on the training data 
    activation = args['activation']
    solver = args['optimizer']
    alpha = args['alpha']
    
    mlp = MLPRegressor(hidden_layer_sizes=hidden_layers, activation=activation, solver=solver, alpha=alpha, early_stopping=False, max_iter=10000, random_state=2) # Increase "max_iter" to ensure convergence
    cv_score = -cross_val_score(mlp, X_train, y_train, cv=5, scoring='r2').mean()
    return cv_score


print("Start trials") 

trials = Trials()
best = fmin(model_eval, parameter_space, algo=tpe.suggest, max_evals=40, trials=trials) # this will take a while to run 
best = hyperoptoutput2param(best)

print("best parameter set: {}".format(best))
print("best cv {}".format(trials.best_trial['result']['loss']))

In [None]:
# Train a MLP
regr_new = MLPRegressor(hidden_layer_sizes= (192, 192, 192), activation='relu', solver='adam', alpha=0.08, max_iter=10000, early_stopping=False, random_state=2)
regr_new.fit(X_train, y_train)
train_prediction = regr_new.predict(X_train)
train_truth = y_train
test_prediction = regr_new.predict(X_test)
test_truth = y_test

# Construct neural network:

In [None]:
# Build Datasets
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Generate dataset 
class SequenceDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.Tensor(X)  # store X as a pytorch Tensor
        self.y = torch.Tensor(y)  # store y as a pytorch Tensor
        self.len=len(self.X)                # number of samples in the data 

    def __getitem__(self, index):
        # your implementation here: 
        return self.X[index], self.y[index] # implement your __getitem__function here 

    def __len__(self):
        return self.len

In [None]:
from sklearn.model_selection import train_test_split

# Define dataset 
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_logged, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

#Build dataset 
traindata = SequenceDataset(X=X_train, y=y_train)
valdata = SequenceDataset(X=X_val, y=y_val)
testdata = SequenceDataset(X=X_test, y=y_test)

# Build dataloader 
batchsize = 128
train_loader = DataLoader(dataset=traindata,batch_size=batchsize,shuffle=True)
val_loader = DataLoader(dataset=valdata,batch_size=batchsize,shuffle=True)
test_loader = DataLoader(dataset=testdata,batch_size=batchsize,shuffle=True)

In [None]:
from torch import nn

class LMPseq(torch.nn.Module) :

    def __init__(self, input_dim, activation = 'relu', hidden_dim = (128, 128, 128)) :
        super().__init__()
        
        # define a mlp regressor 
        if activation == 'relu':

            activation = nn.ReLU()

        if activation == 'tanh':

            activation = nn.Tanh()

        if activation == 'logistic':

            activation = nn.LogSigmoid()

        # print(activation)

        mlp = nn.Sequential(nn.Linear(input_dim, hidden_dim[0]), 
                            activation)

        for idx, dim in enumerate(hidden_dim[1:]):

            mlp.append(nn.Linear(hidden_dim[idx - 1], dim))
            mlp.append(activation)

        self.mlp = mlp

        outlayer = nn.Sequential(nn.Linear(dim, 1),
                                 nn.Sigmoid())        
        
        self.outlayer = outlayer
        
        
    def forward(self, x):
        
        # Apply lstm 
        # lstm_out, (ht, ct) = self.lstm(x)
        # pass ouput into a MLP 
        output = self.mlp(x)
        # transform output into probabilites 
        output = self.outlayer(output)
        # return probabilities 
        
        return output

In [None]:
from sklearn.metrics import roc_auc_score
import torch.nn.functional as F

def train(model, dataloader, optimizer):
    
    '''
    A function train on the entire dataset for one epoch .
    
    Args: 
        model (torch.nn.Module): your model from before 
        dataloader (torch.utils.data.DataLoader): DataLoader object for the train data
        optimizer (torch.optim.Optimizer(()): optimizer object to interface gradient calculation and optimization 
        device (str): Your device (usually 'cuda:0' for your GPU)
        
    Returns: 
        float: loss averaged over all the batches 
    
    '''

    epoch_loss = []
    model.train() # Set model to training mode 
    
    for batch in dataloader:    
        X, y = batch
        X = X
        y = y
        
        # train your model on each batch here 
        y_pred = model(X)
        
        loss = F.binary_cross_entropy(y_pred.squeeze(), y.squeeze())# fill in loss here
        # loss = F.mse_loss(y_pred.squeeze(), y.squeeze())# fill in loss here
        epoch_loss.append(loss.item())
        # run backpropagation given the loss you defined
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return np.array(epoch_loss).mean()


def validate(model, dataloader):
    
    '''
    A function validate on the validation dataset for one epoch .
    
    Args: 
        model (torch.nn.Module): your model for before 
        dataloader (torch.utils.data.DataLoader): DataLoader object for the validation data
        device (str): Your device (usually 'cuda:0' for your GPU)
        
    Returns: 
        float: loss averaged over all the batches 
    
    '''
    
    val_loss = []
    model.eval() # Set model to evaluation mode 
    with torch.no_grad():    
        for batch in dataloader:
            X, y = batch
            X = X
            y = y
            
            # validate your model on each batch here 
            y_pred = model(X)

            loss = F.binary_cross_entropy(y_pred.squeeze(), y.squeeze())# fill in loss here
            # loss = F.mse_loss(y_pred.squeeze(), y.squeeze())# fill in loss here
            val_loss.append(loss.item())
            
    return np.array(val_loss).mean()

In [None]:
model = LMPseq(6, activation='relu', hidden_dim = (128,128,128))

optimizer = torch.optim.Adam(list(model.parameters()), lr=0.002)

val_loss_curve = []
train_loss_curve = []
difference = 1

for epoch in range(30):
    
    # Compute train your model on training data
    epoch_loss = train(model, train_loader, optimizer)
    
    # Validate your on validation data 
    val_loss = validate(model, val_loader) 
    
    # Record train and loss performance 
    train_loss_curve.append(epoch_loss)
    val_loss_curve.append(val_loss)
    
    # print(epoch, epoch_loss, val_loss)

In [None]:
import matplotlib.pyplot as plt

plt.plot(val_loss_curve, label="validation")
plt.plot(train_loss_curve, label="train")
plt.legend()

In [None]:
# Code to compute AUC on test data 
pred_train = model(traindata.X)
pred_train = pred_train.detach().numpy()[:,0]
test_score = sklearn.metrics.r2_score(y_train, pred_train.squeeze())
print("R2 on the train dataset is {}".format(test_score) ) 

pred_test = model(testdata.X)
pred_test = pred_test.detach().numpy()[:,0]
test_score = sklearn.metrics.r2_score(y_test, pred_test.squeeze())
print("R2 on the test dataset is {}".format(test_score) ) 

In [None]:
def evaluate(model, train_loader, optimizer, max_iteration = 400, lr = 0.002, tol=0.0001):
    
    convergence = True

    optimizer = torch.optim.Adam(list(model.parameters()), lr=lr)
    val_loss_curve = []
    train_loss_curve = []
    difference = 1

    for epoch in range(max_iteration):
        
        if difference > tol:
    
            # Compute train your model on training data
            epoch_loss = train(model, train_loader, optimizer)
            
            # Validate your on validation data 
            val_loss = validate(model, val_loader) 

            difference = np.abs(val_loss - epoch_loss)
            
            # Record train and loss performance 
            train_loss_curve.append(epoch_loss)
            val_loss_curve.append(val_loss)
        
    if difference > tol:

        print('fail to converge')
        convergence = False

    return model, convergence

model = LMPseq(6, activation='relu', hidden_dim = (128,128,128))
trained_model, convergence = evaluate(model, train_loader, optimizer, max_iteration = 400, lr = 0.002, tol=0.00005)


In [None]:
# Generate different trails:

num_trail = 100

train_score_list = []
test_score_list = []

for i in range(num_trail):

    model = LMPseq(6, activation='relu', hidden_dim = (128,128,128))
    trained_model, convergence = evaluate(model, train_loader, optimizer, max_iteration = 400, lr = 0.002, tol=0.0001)

    if convergence:

        pred_train = trained_model(traindata.X)
        pred_train = pred_train.detach().numpy()[:,0]
        train_score = sklearn.metrics.r2_score(y_train, pred_train.squeeze())
        # print("R2 on the train dataset is {}".format(train_score) ) 

        pred_test = trained_model(testdata.X)
        pred_test = pred_test.detach().numpy()[:,0]
        test_score = sklearn.metrics.r2_score(y_test, pred_test.squeeze())
        # print("R2 on the test dataset is {}".format(test_score))

        train_score_list.append(train_score)
        test_score_list.append(test_score)


train_score_list = np.array(train_score_list)
test_score_list = np.array(test_score_list)

In [None]:
print(train_score_list.mean())
print(train_score_list.std())
print(test_score_list.mean())
print(test_score_list.std())

In [None]:
# scatter plot R2:

pred_train = trained_model(traindata.X)
pred_train = pred_train.detach().numpy()[:,0]
train_score = sklearn.metrics.r2_score(y_train, pred_train.squeeze())
print("R2 on the train dataset is {}".format(train_score) ) 

pred_test = trained_model(testdata.X)
pred_test = pred_test.detach().numpy()[:,0]
test_score = sklearn.metrics.r2_score(y_test, pred_test.squeeze())
print("R2 on the test dataset is {}".format(test_score))

fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(5, 5))
ax.scatter(pred_train.squeeze(), y_train, label='train', alpha=0.5)
ax.scatter(pred_test.squeeze(), y_test, label='test', alpha=0.5, c='orange')

ax.set_ylabel(f"True yield")
ax.set_xlabel(f"Predicted yield")
ax.set_title(f'{product[id]}')

ax.text(0.5,0, 'R2_test:{:.2f}'.format(test_score))
ax.text(0.55,0.05, 'R2_train:{:.2f}'.format(train_score))

# fig.savefig(f'result_{product[id]}.png')