# Legolas Demo Code

Use this code to test your calibration values and as a reference for solving the challenges

In [None]:
#Not needed if you have already installed GPy and GPyOpt
!pip install GPy
# GPFlow, GPyTorch
!pip install GPyOpt

In [None]:
from core import *
import utils
import time as time
from pathlib import Path
# import time is in core
# import Path is in core

In [None]:
# Restart the rpyc server on every host; use this when you hit "port already in use".
# The addresses below must be changed to the actual host names.
host_1 = "192.168.1.11"
host_2 = "192.168.1.14"

for _host in (host_1, host_2):
    utils.restart_server(host=_host)

In [None]:
# Load the stage, the deposition (syringe) device, the pH device, the two connection
# objects and the config from config.yaml. Every later cell uses these names.
stage, depo_device, pH_device, conn1, conn2, config = load_from_config("config.yaml")

In [None]:
#!dir .

In [None]:
# Print every serial port reported through conn1; use this to find the USB port if a problem occurs.
list_ports = conn1.modules['serial.tools.list_ports']

ports = list_ports.comports()
for port_info in sorted(ports):
    port, desc, hwid = port_info
    print(port, desc, hwid)

## Test Device Functionality

In [None]:
# Reset the rig: goes to the home position and does other things.
# Be careful with this command.
reset(stage=stage, pH_device=pH_device, depo_device=depo_device)
# stage.home()

In [None]:
# Send the stage to its home position.
stage.home()

In [None]:
# Lower the pH meter fully, then raise it back up (don't run if the pH path is obstructed).
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Raise the pH meter fully (don't run if the pH path is obstructed).
pH_device.to_zpos("full_up")

In [None]:
# Lower the pH meter fully and leave it down (don't run if the pH path is obstructed).
pH_device.to_zpos("full_down")

In [None]:
# Lower the deposition device fully, then raise it back up (don't run if the syringe path is obstructed).
depo_device.to_zpos("full_down")
depo_device.to_zpos("full_up")

In [None]:
# Raise the pH meter, home the stage, move the stage to cell (row 0, col 1),
# then lower the pH meter there.
pH_device.to_zpos("full_up")
stage.home()
stage.move_to_cell(0,1)
pH_device.to_zpos("full_down")

## Test Cell Mapping and Accuracy
Use this code to see if pH device is aligning with the cell map properly

If calibrated properly with the pH sensor, "stage" and "pH_device" will move the cart to the same spot

In [None]:
# Home the stage before checking the cell mapping.
stage.home()

In [None]:
# Move the stage to cell (row 0, col 0).
stage.move_to_cell(0,0)

In [None]:
# Query the stage's XY location; the result is shown as the cell output.
stage.get_XYloc()

In [None]:
# Lower the pH meter fully to check its alignment with the current cell.
pH_device.to_zpos("full_down")

In [None]:
# Raise the pH meter fully before moving the stage again.
pH_device.to_zpos("full_up")

In [None]:
# Move the stage to cell (row 1, col 0).
stage.move_to_cell(1,0)

In [None]:
# Test positioning along the first row (row index 0, columns 0-5); start with everything full_up.
# At each cell the pH meter is lowered and raised once.
pH_device.to_zpos("full_up")
stage.home()
#stage.move_to_cell(0,0)
for i in range(6):
    stage.move_to_cell(0,i)
    pH_device.to_zpos("full_down")
    pH_device.to_zpos("full_up")
    #time.sleep(2)

In [None]:
# Test positioning in every cell: rows j = 0-3, columns i = 0-5.
# At each cell the indices are printed and the pH meter is lowered and raised once.
pH_device.to_zpos("full_up")
stage.home()
#stage.move_to_cell(0,0)
for j in range (4):
    for i in range(6):
        stage.move_to_cell(j,i)
        print("j,i =",j,",",i)
        pH_device.to_zpos("full_down")
        #time.sleep(2)
        pH_device.to_zpos("full_up")
        #time.sleep(2)

In [None]:
# Spot check: move the stage to cell (row 3, col 0), then lower and raise the pH meter.
stage.move_to_cell(3, 0)
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Move the deposition device to cell (row 0, col 0), then lower and raise it.
depo_device.move_to_cell(0,0)
depo_device.to_zpos("full_down")
depo_device.to_zpos("full_up")

In [None]:
# Move the stage to the "clean" location, then lower and raise the deposition device.
# NOTE(review): this uses stage.move_to_loc, whereas the syringe section uses
# depo_device.move_to_loc("clean") — confirm the syringe is aligned before lowering.
stage.move_to_loc("clean")
depo_device.to_zpos("full_down")
depo_device.to_zpos("full_up")
#pH_device.to_zpos("full_down")
#pH_device.to_zpos("full_up")

## Test pH device

In [None]:
# This is the pH meter; evaluating it displays the device object as the cell output.
pH_device

In [None]:
# Raise the pH meter fully, then home the stage.
pH_device.to_zpos("full_up")
stage.home()

In [None]:
# the cleaning location can be used to wash off any acidic or basic solution from the probe prior to measurements
# it is not necessary to use this in your code, but if you are getting incorrect pH values you may need to clean with DI
# The probe is dipped twice (down/up, down/up) at the "clean" location.
pH_device.move_to_loc("clean")
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Move the pH meter to the cell at row 0, col 1.
pH_device.move_to_cell(row=0, col=1)

In [None]:
# Check pH values: lower the probe fully; the measurement itself is taken in the next cell.
pH_device.to_zpos("full_down")

In [None]:
# Take a reading with the pH meter and keep it in `voltage`.
# NOTE(review): 40 is presumably the stable_time argument (other cells pass stable_time=...) — confirm.
voltage = pH_device.pH_measure(40)
# Example of converting the reading to pH with a previously fitted line (disabled):
#pH = 16.746*voltage-40.188
#print(pH)

In [None]:
# Raise the pH meter fully after the measurement.
pH_device.to_zpos("full_up")

In [None]:
# Cleaning in a well: move the pH meter to the cell at row 0, col 0 and dip it once.
pH_device.move_to_cell(row=0, col=0)
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Repeated measurements of standard.
# Clean first.
# Measures the same well (row 3, col 0) six times and prints each reading;
# `index` only counts repetitions and is not used in the loop body.
row=3
col=0
for index in range(6):
    pH_device.move_to_cell(row, col)
    pHmeas = pH_device.pH_measure(20)
    print(f"row {row} col {col} pH {pHmeas}")     

In [None]:
# Home the stage after the repeated measurements.
stage.home()

In [None]:
# Blot the probe: raise it, home the stage, move to the "blot" location,
# lower the probe for one second, then raise it again.
pH_device.to_zpos("full_up")
stage.home()
pH_device.move_to_loc("blot")
pH_device.to_zpos("full_down")
time.sleep(1)
pH_device.to_zpos("full_up")

In [None]:
# Calibrate the pH sensor: fit a straight line that maps the probe reading (x) to pH (y).
# `line` holds [slope, intercept] and is the calibration used later as
# pH = line[0]*voltage + line[1].

y = [4,4,4,7,7,7]                      # nominal pH of the standards
x = [2.67,2.63,2.63,2.84,2.81,2.79]    # probe readings recorded for those standards

line = np.polyfit(x,y,1)
print(f'slope: {line[0]}, intercept: {line[1]}')
trend = np.poly1d(line)
plt.plot(x,y, 'ro', x, trend(x), '-k')
# plt.show must be called; the bare attribute reference in the original did nothing.
plt.show()

## Test depo_device (syringe) functionality

In [None]:
# This is the syringe; evaluating it displays the device object as the cell output.
depo_device

In [None]:
# Home the stage before testing the syringe.
stage.home()

In [None]:
# Move the syringe to cell (row 0, col 0) and lower it fully; it is left down
# (the raise is done in the next cell).
depo_device.move_to_cell(0,0)
depo_device.to_zpos("full_down")
#depo_device.to_zpos("full_up")

In [None]:
# Raise the syringe fully (counterpart of the previous cell, which leaves it down).
#depo_device.move_to_cell(0,0)
#depo_device.to_zpos("full_down")
depo_device.to_zpos("full_up")

In [None]:
# Range-of-motion test: visit the first two cells of row 0 and lower/raise the syringe at each.
#depo_device.move_to_cell(0,0)
#depo_device.to_zpos("full_down")
#depo_device.to_zpos("full_up")
for i in range(2):
    depo_device.move_to_cell(0,i)
    depo_device.to_zpos("full_down")
    depo_device.to_zpos("full_up")

In [None]:
# Go to a reservoir location and test the syringe's range of motion.
# This cell uses "clean"; "acid" and "base" are the other locations used in this notebook.
depo_device.move_to_loc("clean")
depo_device.to_zpos("full_down")
depo_device.to_zpos("full_up")

In [None]:
# acquire 0.5 mL from the acid reservoir (home the stage first)
stage.home()
depo_device.acquire(0.5, location="acid")

In [None]:
# deposit 0.6 mL into the row 0, col 4 sample well
# NOTE(review): the preceding acquire cell draws 0.5, while 0.6 is deposited here — confirm the volumes.
# calib is lost at this step. Regain by going home first.
stage.home()
depo_device.deposition(0.6, row=0, col=4)

## Example Loops

Use these loops as a reference for ways to construct your autonomous experiments and the functions needed

In [None]:
# Minimal acquire/deposit sequence: home, acquire 0.5 of acid, home again, deposit it in (row 0, col 0).
stage.home() #GO HOME BEFORE EVERYTHING
depo_device.acquire(0.5, location="acid") #RUN ACQUIRE BEFORE DEPOSITION
stage.home()
depo_device.deposition(0.5, row=0, col=0)
#depo_device.deposition(0.2, row=0, col=2)
#depo_device.deposition(0.3, row=0, col=3)

In [None]:
# Acquire 0.6 of acid once, then split it over three wells of row 0
# (0.3 + 0.1 + 0.2 = 0.6), homing the stage before every deposition.
stage.home()
depo_device.acquire(0.6, location="acid")
stage.home()
depo_device.deposition(0.3, row=0, col=1)
stage.home()
depo_device.deposition(0.1, row=0, col=2)
stage.home()
depo_device.deposition(0.2, row=0, col=3)

In [None]:
# Lower the pH meter fully at the current position.
pH_device.to_zpos("full_down")

In [None]:
# Example loop: for each selected well, acquire 0.6 of acid, deposit it into the well,
# then move the pH meter to that well. The pH reading itself is stubbed out (pH=10).
#for row in range(4):
for row in range(1):
    stage.home()
    for col in range(3,5):  #6 for full row
        depo_device.acquire(0.6, location="acid")
        depo_device.deposition(0.6, row=row, col=col)
        
        # The bare string below is disabled code (move the pH meter one column to the left);
        # as a string literal it has no effect.
        """
        if col==0:
            colleft=0
        else: 
            colleft=col-1
        pH_device.move_to_cell(row=row, col=colleft)
        """
        pH_device.move_to_cell(row=row, col=col)
        #pH = pH_device.pH_measure(stable_time=1)
        pH=10  # placeholder; the real measurement on the line above is commented out
        print(f"row {row} col {col} pH {pH}")

In [None]:
def pHcorrection(pHmeas, p1, p2):
    """Correct a raw pH reading with a two-point linear calibration.

    The line is defined by the two standards: a reading of ``p1`` maps to pH 4 and a
    reading of ``p2`` maps to pH 7. The calibration parameters and the result are printed.

    Parameters
    ----------
    pHmeas : float
        Measured pH with no correction.
    p1 : float
        Measured value of the pH 4 standard.
    p2 : float
        Measured value of the pH 7 standard.

    Returns
    -------
    float
        The corrected pH, ``m*pHmeas + b``.

    Raises
    ------
    ValueError
        If ``p1 == p2``; two identical standard readings do not define a line.
    """
    if p1 == p2:
        raise ValueError(
            "p1 and p2 must differ: identical readings for the pH 4 and pH 7 "
            "standards do not define a calibration line"
        )
    y1 = 4
    y2 = 7
    m = (y1 - y2) / (p1 - p2)    # slope
    b = y1 - m * p1              # y-intercept
    pHcorrected = m * pHmeas + b
    print('p1, p2, slope, b, pHmeas, pHcorr: ', p1, p2, m, b, pHmeas, pHcorrected)

    return pHcorrected

# Quick check of pHcorrection: correct a raw reading of 4.6 using standards that read 4.95 and 7.09.
test = pHcorrection(4.6, 4.95, 7.09)
print('test=', test)


In [None]:
# 7/21/23 Loop through pH measurement. Clean first. Clean after each measurement.

# Run previous cell (def pHcorrection) first to correct pH measurement.

for row in range(1):
    stage.home()
    pH_device.move_to_cell(3,3)   #clean first
    # dip the probe twice in the cleaning well
    pH_device.to_zpos("full_down")
    pH_device.to_zpos("full_up")
    pH_device.to_zpos("full_down")
    pH_device.to_zpos("full_up")
    
    for col in range(0,6):  #6 for full row
    #for col in range(4,-1,-1):
        pH_device.move_to_cell(row=row, col=col)
        # Three consecutive measurements; each overwrites pHmeas, so only the last one is used.
        pHmeas = pH_device.pH_measure(stable_time=20)
        pHmeas = pH_device.pH_measure(stable_time=20)
        pHmeas = pH_device.pH_measure(stable_time=20)
        # 4.95 and 7.09 are the readings recorded for the pH 4 and pH 7 standards
        pHcorrected = pHcorrection(pHmeas, 4.95, 7.09)
        print(f"row {row} col {col} pHcorrected {pHcorrected}")
        pH_device.move_to_cell(3,3)  #contains water. Clean after measurement.
        pH_device.to_zpos("full_down")
        pH_device.to_zpos("full_up")
        pH_device.to_zpos("full_down")
        pH_device.to_zpos("full_up")
        

In [None]:
# Measure a single well, correct the reading, then rinse the probe.
# row/col are set here so the printout reports the well actually measured instead of
# whatever values an earlier cell left in these globals.
row, col = 1, 1
pH_device.move_to_cell(row=row, col=col)
pHmeas = pH_device.pH_measure(stable_time=20)
# NOTE(review): 5.09 / 5.57 are passed as the pH 4 / pH 7 standard readings here,
# unlike the 4.95 / 7.09 used in other cells — confirm.
pHcorrected = pHcorrection(pHmeas, 5.09, 5.57)
print(f"row {row} col {col} pHcorrected {pHcorrected}")
pH_device.move_to_cell(3,3)  #contains water. Clean after measurement.
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Rinse the probe on its own: dip it twice in cell (3, 3).
pH_device.move_to_cell(3,3)  #contains water. Clean after measurement.
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Recheck pH4, pH7 standards.
# The reading is not stored, and because the call is not the last expression of the cell
# its return value is not displayed either.
pH_device.move_to_cell(row=1, col=3)
pH_device.pH_measure(20)

# NOTE(review): the rinse here uses cell (3, 5), while other cells rinse in (3, 3) — confirm which well holds water.
pH_device.move_to_cell(3,5)  #contains water. Clean after measurement.
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")
pH_device.to_zpos("full_down")
pH_device.to_zpos("full_up")

In [None]:
# Home the stage after the measurements.
stage.home()

In [None]:
#pH_device.pH_positions

In [None]:
#pH_device.motor_pH.get_position()

In [None]:
# Fill and measure every well of row 1: acquire 0.6 of acid, deposit it in the well,
# move the pH meter there, measure with stable_time=10 and print the reading.
stage.home()
row = 1
for col in range(6):
    depo_device.acquire(0.6, location="acid")
    depo_device.deposition(0.6, row=row, col=col)
    pH_device.move_to_cell(row=row, col=col)
    pH = pH_device.pH_measure(stable_time=10)
    print(f"row {row} col {col} pH {pH}")

In [None]:
# Dry run: reset the rig, then run a 0.6 deposition at (row 0, col 0).
# No acquire is performed in this cell.
reset(stage=stage, pH_device=pH_device, depo_device=depo_device)
depo_device.deposition(0.6, row=0, col=0)

In [None]:
# Save-path smoke test: create the plot folder and write one empty, titled figure into it.
# NOTE(review): this is a machine-specific absolute path; point it at a folder that exists
# on your machine (or a path relative to the notebook) before running.
path= 'c:/Users/mlowe/Documents/AlphaImmersion_I_attendedLEGOLAS2023/LEGOLAS new4 Scripts/Plots'
SAVE= Path(path)
# parents=True also creates missing parent folders instead of raising FileNotFoundError.
SAVE.mkdir(parents=True, exist_ok=True)
plt.figure()
plt.title('Acquisition func, next ratio:')
iterations=1
plt.savefig(SAVE/f"{iterations}.png", facecolor='white')
plt.show()
plt.close()

In [None]:
# [slope, intercept] used by BO_get_data as pH = line[0]*voltage + line[1].
# Running this cell overwrites the `line` fitted with np.polyfit in the calibration cell.
line = [4.6,-.77] #fake line for testing code

In [None]:
# Home the stage before starting the optimization run.
stage.home()

In [None]:
import GPy
#path = 'c:\Users\grufi\OneDrive\Desktop\legolas\Legolas_textCellLoc-main\plots'
#SAVE = Path(path)
#SAVE.mkdir(exist_ok=True)

def Bayesian_optimization_pH(n_iterations=3):
    """Actively sample acid/base ratios with a GPy Gaussian Process as surrogate model.

    The first ratio (0.1) is deposited and measured. Then, for ``n_iterations`` rounds,
    a GP is fitted to all measurements so far and the unmeasured ratio with the largest
    predictive variance is measured next (pure exploration). Each round plots the GP fit
    and the acquisition function.

    Relies on names defined elsewhere in the notebook: ``GPy``, ``np``, ``plt``,
    ``BO_get_data`` and ``plot_gp``.

    Parameters
    ----------
    n_iterations : int, optional
        Number of active-learning rounds after the initial measurement (default 3).
        Each measurement uses a new well counter value (1, 2, ...), so this is limited by
        the wells BO_get_data can address and by the number of candidate ratios.

    Returns
    -------
    None
    """
    # If True, this will simulate deposition and measurement
    # NOTE(review): no simulation path exists below; the flag only selects the subplot layout.
    simulate_val = True

    # X_grid is the column vector of all candidate acid/base ratios.
    # It is the array indexed with sample_index.
    low_ratios = np.linspace(0.1, 1, 20)
    high_ratios = np.linspace(2, 10, 20)
    X_grid = np.concatenate((low_ratios, high_ratios))[:, None]
    Dsize = X_grid.shape[0]  # number of candidate ratios
    print(f'x grid: {X_grid}, Dsize : {Dsize}')

    # set up variables
    ratio = [0.1]     # ratio of initial sample to study, 0 is no acid
    sample_index = 0  # index of ratio in X_grid
    count = 1         # well counter; starts at 1, so well (row 0, col 0) is not used

    # deposit first ratio in well and collect pH
    pH = BO_get_data(ratio, count)
    print('counter value:', count)
    print('Acid/Base ratio measured:', ratio)
    print('pH Value:', pH)

    measured = np.atleast_1d(sample_index)  # indices of ratios that have been measured
    full_indices = np.arange(Dsize)         # integer indices of all candidate ratios
    unmeasured = np.setdiff1d(full_indices, measured).astype(int)  # indices still to be measured
    X_samples = np.atleast_1d(ratio)[:, None]  # ratios already studied
    Y_samples = np.atleast_1d(pH)[:, None]     # corresponding pH values

    # iteration loop for active learning (GP with pure exploration)
    for iterations in range(n_iterations):
        if unmeasured.size == 0:
            # every candidate ratio has been measured; nothing left to explore
            break

        # Regression
        kernel = GPy.kern.RBF(1)
        gp_model = GPy.models.GPRegression(X_samples, Y_samples, kernel)
        gp_model.optimize_restarts(5, robust=True)
        mean_full, variance_full = gp_model.predict(X_grid)        # prediction for all ratios
        mean, variance = gp_model.predict(X_grid[unmeasured])      # prediction for unmeasured ratios only

        # Active learning: the GP variance (uncertainty) is the acquisition function.
        alpha_full = variance_full  # variance for all ratios
        alpha = variance            # variance for unmeasured ratios
        ##alpha = (9/(iterations+1))*variance - np.absolute(mean-4.75)
        ##alpha_full = (9/(iterations+1))*variance_full - np.absolute(mean_full-4.75)
        sample_index = unmeasured[np.argmax(alpha)]  # index of next ratio in X_grid
        ratio = X_grid[sample_index, :]              # next ratio
        print('The next ratio to investigate is', ratio)

        # plot
        plt.figure(figsize=(7, 3))
        # `~simulate_val` was always truthy (~True == -2); `not` is the intended boolean test.
        num_subplots = 3 if not simulate_val else 2
        plt.subplot(1, num_subplots, 1)
        plot_gp(X_grid, mean_full, variance_full, training_points=(X_samples, Y_samples))
        bottom, top = plt.ylim()
        plt.plot([ratio, ratio], [bottom, top], 'm')  # indicate the next ratio to investigate
        plt.title('GP model for pH')

        plt.subplot(1, num_subplots, 2)
        plt.plot(X_grid, alpha_full)  # acquisition function for all ratios
        plt.plot([ratio, ratio], [np.min(alpha_full), np.max(alpha_full)], 'm')  # next ratio
        plt.title(f'Acquisition func, next ratio:{ratio}')

        plt.show()

        count += 1  # move to next well

        # collect data: run the next experiment.
        # BO_get_data reads the calibration `line` itself; passing it as a third positional
        # argument raised TypeError against the two-parameter definition.
        pH = BO_get_data(ratio, count)
        print('counter value:', count)
        print('Acid/Base ratio measured:', ratio)
        print('pH Value:', pH)
        measured = np.append(measured, sample_index)  # add experiment ratio to the measured set
        unmeasured = np.setdiff1d(full_indices, measured).astype(int)
        X_samples = np.append(X_samples, ratio)[:, None]
        Y_samples = np.append(Y_samples, pH)[:, None]
    
def BO_get_data(ratio, count, calibration=None):
    """Prepare one acid/base mixture in a well and return its pH.

    The well is chosen from ``count`` (6 wells per row, 4 rows: count 0-23). The acid and
    base volumes come from ``ratio_conversion(ratio)`` and are dispensed in portions of at
    most 0.6. The pH meter then measures the well and the reading is converted with a
    linear calibration.

    Relies on notebook globals ``depo_device``, ``pH_device``, ``ratio_conversion`` and,
    when ``calibration`` is not given, ``line``.

    Parameters
    ----------
    ratio : sequence
        One-element sequence holding the acid/base ratio (passed to ratio_conversion).
    count : int
        Well counter, 0-23; row = count // 6, column = count % 6.
    calibration : sequence of two floats, optional
        ``[slope, intercept]`` mapping the meter reading to pH. Defaults to the
        notebook-level ``line``.

    Returns
    -------
    float
        ``calibration[0] * reading + calibration[1]``.

    Raises
    ------
    ValueError
        If ``count`` is outside 0-23 (previously this left the row/column unbound).
    """
    if not 0 <= count <= 23:
        raise ValueError(f"count must be between 0 and 23 (4 rows x 6 wells), got {count}")
    r, c = divmod(count, 6)

    acid_vol, base_vol = ratio_conversion(ratio)
    print('acid_vol', acid_vol)
    print('base_vol', base_vol)

    _dispense_in_portions(acid_vol, "acid", r, c)
    _dispense_in_portions(base_vol, "base", r, c)

    pH_device.move_to_cell(row=r, col=c)
    pH_device.to_zpos("full_down")
    voltage = pH_device.pH_measure(stable_time=20)
    pH_device.to_zpos("full_up")
    if calibration is None:
        calibration = line  # notebook-level [slope, intercept]
    pH = calibration[0]*voltage + calibration[1]
    print(f"row {r} col {c} pH {pH}")

    #pH_device.move_to_loc("clean")
    #pH_device.to_zpos("full_down")
    #pH_device.to_zpos("full_up")

    return pH


def _dispense_in_portions(volume, location, row, col, max_portion=0.6):
    """Move ``volume`` from reservoir ``location`` into a well, at most ``max_portion`` per trip."""
    # NOTE(review): 0.6 is presumably the largest volume the syringe handles per acquire — confirm.
    while volume > max_portion:
        depo_device.acquire(max_portion, location=location)
        depo_device.deposition(max_portion, row=row, col=col)
        volume -= max_portion
    # Skip a zero or floating-point-residue remainder so the rig is not sent on an empty trip.
    if volume > 1e-9:
        depo_device.acquire(volume, location=location)
        depo_device.deposition(volume, row=row, col=col)



def ratio_conversion(ratio, total_vol=2.0):
    """Split a total volume into acid and base parts for a given acid/base ratio.

    Parameters
    ----------
    ratio : sequence
        Its first element is the acid-to-base ratio (acid_vol / base_vol).
    total_vol : float, optional
        Combined volume of acid and base (default 2.0, the original fixed value).

    Returns
    -------
    tuple of float
        ``(acid_vol, base_vol)`` with ``acid_vol + base_vol == total_vol``.

    Raises
    ------
    ValueError
        If the ratio is negative, which would produce a negative volume (or a division by
        zero at -1).
    """
    r = ratio[0]
    if r < 0:
        raise ValueError(f"acid/base ratio must be non-negative, got {r}")
    acid_vol = float(total_vol*r/(1+r))
    base_vol = float(total_vol - acid_vol)
    return acid_vol,base_vol
            
    
def plot_gp(X, m, C, training_points=None):
    """Draw a GP mean with its 95% confidence band on the current matplotlib axes.

    Parameters
    ----------
    X : ndarray, shape (N, 1)
        Input locations.
    m : ndarray, shape (N, 1)
        Predicted mean at ``X``.
    C : ndarray
        Predictive uncertainty: either per-point variances (shape (N, 1) or (N,), as
        returned by the ``predict`` call in this notebook) or a full (N, N) covariance
        matrix, whose diagonal is used.
    training_points : tuple (X_, Y_), optional
        Measured points to overlay as black crosses.

    The figure is not shown here; the caller keeps drawing on the same axes (reading the
    y-limits, marking the next ratio) and calls ``plt.show()`` once the figure is complete.
    """
    C = np.asarray(C)
    if C.ndim == 2 and C.shape[0] == C.shape[1]:
        var = np.diag(C)      # full covariance matrix: take the per-point variances
    else:
        # np.diag on an (N, 1) array returns only its first element, which made the band
        # use a single variance for every point; flatten to one variance per point instead.
        var = C.reshape(-1)
    half_width = 1.96*np.sqrt(var)

    # Plot 95% confidence interval
    plt.fill_between(X[:,0], m[:,0] - half_width, m[:,0] + half_width, alpha=0.5)
    plt.plot(X, m, "-")  # Plot GP mean
    plt.xlabel("x")
    plt.ylabel("f")
    if training_points is not None:  # Plot training points if included
        X_, Y_ = training_points
        plt.plot(X_, Y_, "kx", mew=2)
        
    
        

In [None]:
# Run the active-learning loop. This moves the hardware and uses the calibration `line`.
Bayesian_optimization_pH()

In [None]:
# Rebuild the ratio grid used by Bayesian_optimization_pH and print its first ratio.
X_grid = np.linspace(0.1,1,20)[:,None]
other = np.linspace(2,10,20)[:,None]
X_grid = np.append(X_grid,other)[:,None] # set of all acid and base ratios
ratio = X_grid[0,:]
# ratio is a one-element array; index it, because float() on an array with ndim > 0 is deprecated in NumPy.
print(float(ratio[0]))

In [None]:
###test printing

# NOTE(review): this is a machine-specific absolute path; point it at a folder that exists
# on your machine (or a path relative to the notebook) before running.
path= 'c:/Users/mlowe/Documents/AlphaImmersion_I_attendedLEGOLAS2023/LEGOLAS new4 Scripts/Plots'
SAVE= Path(path)
# parents=True also creates missing parent folders instead of raising FileNotFoundError.
SAVE.mkdir(parents=True, exist_ok=True)

def f(t):
    """Return the damped cosine exp(-t) * cos(2*pi*t); used as placeholder plot data."""
    decay = np.exp(-t)
    oscillation = np.cos(2*np.pi*t)
    return decay * oscillation


def testBayesian_optimization_pH():
    """Hardware-free dry run of the plotting and saving part of the optimization loop.

    Prints a placeholder first measurement, then for two iterations draws a two-panel
    placeholder figure and saves it as ``Acquisition{iterations}.png`` in the
    notebook-level ``SAVE`` directory. No GP is fitted and no device is moved.

    Relies on notebook globals ``np``, ``plt``, ``SAVE`` and ``f``.
    """
    # If True, this will simulate deposition and measurement
    simulate_val = True

    # set up variables
    ratio = [0.1]  # ratio of initial sample to study, 0 is no acid
    count = 0      # counter for moving to appropriate well

    # placeholder instead of depositing the first ratio and measuring it
    pH = 10  #pH = BO_get_data(ratio,count)
    print('counter value:', count)
    print('Acid/Base ratio measured:', ratio)
    print('pH Value:', pH)

    # placeholder curves; identical on every iteration, so build them once
    t1 = np.arange(0.0, 5.0, 0.1)
    t2 = np.arange(0.0, 5.0, 0.02)

    for iterations in range(2):
        print('The next ratio to investigate is', ratio)

        # plot
        plt.figure(figsize=(7, 3))
        # `~simulate_val` was always truthy (~True == -2); `not` is the intended boolean test.
        num_subplots = 3 if not simulate_val else 2
        plt.subplot(1, num_subplots, 1)
        plt.plot(t1, f(t1), 'bo', t2, f(t2), 'k')
        plt.title('model for pH')

        plt.subplot(1, num_subplots, 2)
        plt.plot(t2, np.cos(2*np.pi*t2), 'r--')
        plt.title('acq func')

        plt.tight_layout()
        plt.savefig(SAVE/f"Acquisition{iterations}.png", facecolor='white')
        plt.show()
        plt.close()
              
        
# Run the dry run; it writes one PNG per iteration into SAVE.
testBayesian_optimization_pH()    
        

In [None]:
# Stand-alone matplotlib subplot demo (two stacked panels); no hardware involved.
# f is defined again here, identically to the definition in the test cell earlier in the notebook.
def f(t):
    """Return the damped cosine exp(-t) * cos(2*pi*t)."""
    return np.exp(-t) * np.cos(2*np.pi*t)

# coarse grid for the markers, fine grid for the smooth curves
t1 = np.arange(0.0, 5.0, 0.1)
t2 = np.arange(0.0, 5.0, 0.02)

plt.figure()
plt.subplot(2,1,1)  
plt.plot(t1, f(t1), 'bo', t2, f(t2), 'k')

# 212 is shorthand for subplot(2, 1, 2)
plt.subplot(212)
plt.plot(t2, np.cos(2*np.pi*t2), 'r--')
plt.show()