# Custom pipette class

In [21]:
class CustomPipette:
    """Wrap a pipette object with solvent-aware liquid-handling helpers.

    The wrapper stores a set of handling parameters (flow-rate multipliers,
    delays, withdrawal speed, blow-out rate and touch-tip settings) chosen
    with :meth:`set_solvent` and applies them in the ``c_*`` methods.

    NOTE: ``c_aspirate`` and ``c_dispense`` call ``protocol.delay``.
    ``protocol`` is not defined in this class and must exist as a global
    (presumably the notebook's protocol context -- confirm). Likewise
    ``math``, ``opentrons`` and ``split_seq`` are resolved from the
    surrounding module.
    """

    # Handling parameters per solvent. The key names are part of the
    # runtime contract (``solvent_params`` is indexed by them).
    _SOLVENT_LIBRARY = {
        "default": {
            "asp_rate": 1,  # times default flow_rate
            "asp_delay": 0,  # seconds
            "asp_with": 40,  # mm/s
            "disp_rate": 1,  # x default flow_rate
            "disp_delay": 0,  # seconds
            "blowout_rate": 30,  # ul/s
            "touch_tip_disp_offset": -1,
            "touch_tip_disp_speed": 60,
        },
        "buffer": {
            "asp_rate": 0.8,  # times default flow_rate
            "asp_delay": 1,  # seconds
            "asp_with": 10,  # mm/s
            "disp_rate": 0.8,  # x default flow_rate
            "disp_delay": 1,  # seconds
            "blowout_rate": 10,  # ul/s
            "touch_tip_disp_offset": 0,
            "touch_tip_disp_speed": 20,
        },
    }

    def __init__(self, pipette, name):
        """Store the wrapped ``pipette`` and a display ``name``."""
        self.pipette = pipette
        self.name = name
        # Start from the default parameters so the c_* methods work even if
        # set_solvent() is never called explicitly.
        self.set_solvent("default")

    def pick_up_tip(self):
        """Pick up a tip, retrying once after dropping the current tip.

        If the first attempt raises, the tip is dropped and the pick-up is
        attempted one more time; an error from the retry propagates.
        """
        try:
            self.pipette.pick_up_tip()
        except Exception:
            # presumably a tip is still attached -- TODO confirm
            self.pipette.drop_tip()
            self.pipette.pick_up_tip()

    def drop_tip(self):
        """Drop the current tip via the wrapped pipette."""
        self.pipette.drop_tip()

    def set_solvent(self, solvent="default"):
        """Select the handling parameters used by the ``c_*`` methods.

        Args:
            solvent: ``"default"`` or ``"buffer"``.

        Raises:
            KeyError: if ``solvent`` is not a known parameter set.
        """
        # Copy so that edits to ``solvent_params`` never leak into the
        # shared library.
        self.solvent_params = dict(self._SOLVENT_LIBRARY[solvent])

    def c_aspirate(self, vol, well):
        """Aspirate ``vol`` from ``well``, wait, then move to the well top.

        Uses the ``asp_rate``, ``asp_delay`` and ``asp_with`` parameters.
        """
        params = self.solvent_params
        self.pipette.aspirate(vol, well, rate=params["asp_rate"])
        protocol.delay(params["asp_delay"])
        self.pipette.move_to(well.top(), speed=params["asp_with"])

    def c_dispense(
        self, vol, well, disp_blow_out=None, touch_tip=None, top_dispense=None
    ):
        """Dispense ``vol`` into ``well`` with optional blow-out and touch-tip.

        Args:
            vol: volume to dispense.
            well: destination well.
            disp_blow_out: ``"Yes"`` to blow out afterwards at the solvent's
                ``blowout_rate``; the previous blow-out rate is restored.
            touch_tip: ``"Yes"`` to touch the tip afterwards using the
                solvent's touch-tip offset and speed.
            top_dispense: ``"Yes"`` to dispense at ``well.top()``; in that
                case the final move to the well top is skipped.
        """
        params = self.solvent_params
        dispense_at_top = top_dispense == "Yes"
        target = well.top() if dispense_at_top else well

        self.pipette.dispense(vol, target, rate=params["disp_rate"])
        protocol.delay(params["disp_delay"])

        if disp_blow_out == "Yes":
            # Temporarily lower the blow-out rate, and always restore it.
            previous_rate = self.pipette.flow_rate.blow_out
            self.pipette.flow_rate.blow_out = params["blowout_rate"]
            try:
                self.pipette.blow_out()
            finally:
                self.pipette.flow_rate.blow_out = previous_rate

        if touch_tip == "Yes":
            self.pipette.touch_tip(
                v_offset=params["touch_tip_disp_offset"],
                speed=params["touch_tip_disp_speed"],
            )

        if not dispense_at_top:
            # Return to the well top at the withdrawal speed.
            self.pipette.move_to(well.top(), speed=params["asp_with"])

    def c_mix(self, vol, well, reps, mix_rate):
        """Mix ``well`` with ``reps`` aspirate/dispense cycles of ``vol``.

        The first ``reps - 1`` cycles aspirate at ``well.bottom(2)`` and
        dispense at ``well.top(-3)``; the last cycle dispenses at
        ``well.bottom(2)`` and is followed by one extra dispense of
        ``vol / 5`` (presumably to expel residual liquid -- confirm).
        """
        for _ in range(reps - 1):
            self.pipette.aspirate(vol, well.bottom(2), rate=mix_rate)
            self.pipette.dispense(vol, well.top(-3), rate=mix_rate)

        self.pipette.aspirate(vol, well.bottom(2), rate=mix_rate)
        self.pipette.dispense(vol, well.bottom(2), rate=mix_rate)
        self.pipette.dispense(vol / 5, well.bottom(2), rate=mix_rate)

    def c_transfer(
        self,
        vol,
        source,
        well,
        mix=None,
        mix_vol=None,
        mix_reps=5,
        mix_rate=5,
        touch_tip=None,
        top_dispense=None,
        blow_out=None,
        disp_blow_out=None,
    ):
        """Transfer ``vol`` from ``source`` to ``well`` with optional mixing.

        Args:
            vol: volume to transfer.
            source: well to aspirate from.
            well: well to dispense into.
            mix: ``"before"`` mixes ``source`` first, ``"after"`` mixes
                ``well`` afterwards, ``"before_after"`` does both.
            mix_vol: mixing volume; defaults to half the pipette's
                ``max_volume``.
            mix_reps: number of mixing cycles.
            mix_rate: flow-rate multiplier passed to :meth:`c_mix`.
            touch_tip, top_dispense, disp_blow_out: forwarded to
                :meth:`c_dispense`.
            blow_out: accepted for interface compatibility; not used.
        """
        if mix_vol is None:
            mix_vol = self.pipette.max_volume / 2

        if mix in ("before", "before_after"):
            self.c_mix(mix_vol, source, mix_reps, mix_rate)

        self.c_aspirate(vol, source)
        self.c_dispense(vol, well, disp_blow_out, touch_tip, top_dispense)

        if mix in ("after", "before_after"):
            self.c_mix(mix_vol, well, mix_reps, mix_rate)

    def c_multidispense(
        self,
        vol,
        source,
        wells,
        touch_tip=None,
        top_dispense=None,
        blow_out=None,
        disp_blow_out=None,
    ):
        """Dispense ``vol`` into each of ``wells`` from a shared aspiration.

        The wells are split into groups whose combined volume fits into 90 %
        of the pipette's ``max_volume``. For each group 108 % of the group
        volume is aspirated from ``source`` and ``vol`` is dispensed into
        every well of the group.

        Args:
            vol: volume per destination well.
            source: well to aspirate from.
            wells: a single well or a sequence of wells.
            touch_tip, top_dispense, disp_blow_out: forwarded to
                :meth:`c_dispense`.
            blow_out: accepted for interface compatibility; not used.

        Raises:
            ValueError: if ``vol`` is not positive or exceeds 90 % of the
                pipette's ``max_volume`` (previously such a call silently
                dispensed nothing).
        """
        if isinstance(wells, opentrons.protocol_api.labware.Well):
            wells = [wells]

        usable_vol = self.pipette.max_volume * 0.9
        if vol <= 0:
            raise ValueError(f"dispense volume must be positive, got {vol}")

        wells_per_load = math.floor(usable_vol / vol)
        if wells_per_load < 1:
            raise ValueError(
                f"dispense volume {vol} exceeds the usable pipette volume "
                f"{usable_vol}"
            )

        for idx, group in enumerate(split_seq(wells, wells_per_load)):
            group_vol = vol * len(group)
            if idx > 0:
                # Return liquid to the source before reloading (presumably
                # the excess left over from the previous group -- confirm).
                self.c_dispense(round(group_vol * 0.2, 2), source)

            self.c_aspirate(round(group_vol * 1.08, 2), source)

            for well in group:
                self.c_dispense(vol, well, disp_blow_out, touch_tip, top_dispense)

    def c_serial_dilute(
        self,
        vol,
        sd_factor,
        source,
        wells,
        mix_rate=5,
    ):
        """Carry a serial dilution from ``source`` through ``wells``.

        The carried volume is ``vol * sd_factor / (1 - sd_factor)`` rounded
        to two decimals. It is aspirated from ``source``; then, for every
        well in ``wells``, it is dispensed, mixed with five cycles, and
        aspirated again for the next well.

        Args:
            vol: volume already present in each destination well
                (assumed from the formula -- confirm against caller).
            sd_factor: dilution factor used in the formula above.
            source: well to start from.
            wells: iterable of destination wells, in dilution order.
            mix_rate: flow-rate multiplier passed to :meth:`c_mix`.
        """
        transfer_vol = round((vol * sd_factor) / (1 - sd_factor), 2)

        self.c_aspirate(transfer_vol, source)

        for well in wells:
            self.c_dispense(transfer_vol, well)
            self.c_mix(transfer_vol, well, 5, mix_rate=mix_rate)
            self.c_aspirate(transfer_vol, well)

# helper functions

In [None]:
def get_cols(ot_plate):
    """Return the wells used to address the columns of ``ot_plate``.

    For a plate with 96 wells this is the first row (``rows()[0]``); for a
    plate with 384 wells it is a new list holding the first row followed by
    the second row. Any other well count yields ``None``.
    """
    well_count = len(ot_plate.wells())

    if well_count == 96:
        return ot_plate.rows()[0]

    if well_count == 384:
        rows = ot_plate.rows()
        return list(rows[0] + rows[1])

    return None


def split_seq(iterable, size):
    """Yield consecutive lists of up to ``size`` items taken from ``iterable``.

    The last list may be shorter. Nothing is yielded for an empty iterable
    or when ``size`` is 0.
    """
    source = iter(iterable)
    # Two-argument iter(): keep pulling batches until an empty one appears.
    yield from iter(lambda: list(itertools.islice(source, size)), [])


def chunk_list_no_single_n(lst, n):
    """Yield chunks of ``lst``, treating a chunk size of 1 specially.

    For ``n > 1`` consecutive slices of length ``n`` are yielded (the last
    one may be shorter). For ``n == 1`` the whole list is yielded as a
    single chunk instead of one-element chunks. Any smaller ``n`` yields
    nothing.
    """
    if n == 1:
        yield lst
        return

    if n > 1:
        yield from (lst[start : start + n] for start in range(0, len(lst), n))


def user_wants_to_continue():
    """Prompt until the user answers Y or N and return the choice as a bool.

    The answer is stripped and compared case-insensitively; any other input
    prints a hint and asks again.
    """
    decisions = {"y": True, "n": False}

    while True:
        reply = input("Do you want to continue? (Y/N): ").strip().lower()
        if reply in decisions:
            return decisions[reply]
        print("Invalid input. Please enter 'Y' or 'N'.")

# plate classes

In [None]:
class ResPlate:
    """Reservoir plate description: reagent positions and active columns.

    Positions are given as indices into the list returned by ``get_cols``
    and are resolved to wells by :meth:`assign_ot`.
    """

    def __init__(self, reagents=None, active_cols=None):
        """Store the layout.

        Args:
            reagents: optional dict mapping a reagent name to a column index.
            active_cols: optional index, slice, or tuple of ``slice``
                arguments (e.g. ``(0, 4)``) selecting the active columns.
        """
        self.reagents = reagents
        self.active_cols = active_cols

        # A tuple is shorthand for slice(*tuple); anything else is used
        # as the index as-is.
        if isinstance(active_cols, tuple):
            self.active_cols_slice = slice(*active_cols)
        else:
            self.active_cols_slice = active_cols

    def assign_ot(self, ot_plate, print_info=False):
        """Bind the layout to ``ot_plate`` and resolve indices to wells.

        Replaces every value of ``self.reagents`` (in place, in the same
        dict that was passed to the constructor) and ``self.active_cols``
        with the corresponding entries of ``get_cols(ot_plate)``.

        NOTE: because reagent indices are overwritten with wells, calling
        this twice on the same instance will index ``self.cols`` with the
        previously resolved wells.

        Args:
            ot_plate: labware object understood by ``get_cols``.
            print_info: when true, print the resolved reagent locations and
                active columns.
        """
        self.cols = get_cols(ot_plate)

        if self.reagents is not None:
            for key, value in self.reagents.items():
                self.reagents[key] = self.cols[value]

        if self.active_cols is not None:
            self.active_cols = self.cols[self.active_cols_slice]

        if print_info:
            # The original message read an attribute (solvent_col) that is
            # never assigned; report the resolved reagents instead.
            print("The reagents are in:")
            print("")
            pprint(self.reagents)
            print("")
            print("The active cols are:")
            print("")
            pprint(self.active_cols)
            print("")

In [14]:
class GeneralPlate:
    """Plate description with concentration/volume bookkeeping.

    A plate either selects its own active columns (``active_cols``) or
    mirrors the column selection of another plate (``map_to``).
    """

    def __init__(
        self,
        start_conc=None,
        start_vol=None,
        end_conc=None,
        end_vol=None,
        active_cols=None,
        map_to=None,
    ):
        """Store the plate parameters.

        Args:
            start_conc, start_vol: initial concentration and volume.
            end_conc, end_vol: target concentration and volume.
            active_cols: index, slice, or tuple of ``slice`` arguments
                selecting the active columns.
            map_to: optional plate whose ``active_cols_slice`` and
                ``active_cols`` this plate is mapped onto.
        """
        self.start_conc = start_conc
        self.start_vol = start_vol
        self.end_conc = end_conc
        self.end_vol = end_vol
        self.map_to = map_to

        # A tuple is shorthand for slice(*tuple).
        if isinstance(active_cols, tuple):
            self.active_cols_slice = slice(*active_cols)
        else:
            self.active_cols_slice = active_cols

    def assign_ot(self, ot_plate, print_info=False):
        """Bind ``ot_plate`` and resolve the active columns to wells.

        When ``map_to`` is set, its ``active_cols_slice`` is applied to
        this plate's columns; otherwise this plate's own slice is used.
        ``self.active_cols`` always ends up as a list.

        Args:
            ot_plate: labware object understood by ``get_cols``.
            print_info: accepted for interface compatibility; not used.
        """
        self.ot_plate = ot_plate
        self.cols = get_cols(self.ot_plate)

        if self.map_to is not None:
            selection = self.map_to.active_cols_slice
        else:
            selection = self.active_cols_slice
        self.active_cols = self.cols[selection]

        # A plain integer index yields a single well; wrap it in a list.
        if not isinstance(self.active_cols, list):
            self.active_cols = [self.active_cols]

    def make_transfer_map(self, print_info=False):
        """Build ``self.transfer_map`` as ``[source, destination]`` pairs.

        Pairs the active columns of ``map_to`` with this plate's active
        columns in order. Without a ``map_to`` plate a message is printed
        and ``transfer_map`` is left untouched.

        Args:
            print_info: accepted for interface compatibility; not used.
        """
        if self.map_to is None:
            print("no map_to plate assigned")
            return

        self.transfer_map = [
            [src, dst] for src, dst in zip(self.map_to.active_cols, self.active_cols)
        ]

    ## plate operations ##

In [13]:
class MeasurePlate:
    """Measurement plate(s) fed from the active columns of another plate.

    Each active column of ``map_to`` is expanded into ``points * repeats``
    columns on the measurement plate(s).
    """

    def __init__(self, map_to, points=1, repeats=1):
        """Store the mapping parameters.

        Args:
            map_to: plate whose ``active_cols`` are the transfer sources.
            points: number of points per source column.
            repeats: number of repeats per source column.
        """
        self.map_to = map_to
        self.points = points
        self.repeats = repeats

        # Number of source columns already consumed by make_transfer_map().
        self.used_cols = 0

    def _select_active_cols(self):
        """Set ``active_cols`` to the leading columns needed for all sources."""
        needed = len(self.map_to.active_cols) * self.points * self.repeats
        self.active_cols = self.cols[:needed]

    def assign_ot(self, ot_plate):
        """Bind a single ``ot_plate`` and select the active columns."""
        self.ot_plate = ot_plate
        self.cols = get_cols(self.ot_plate)
        self._select_active_cols()

    def assign_ots(self, ot_plates):
        """Bind several plates, concatenating their columns in order.

        Args:
            ot_plates: iterable of labware objects understood by
                ``get_cols``.
        """
        self.ot_plates = ot_plates

        # Use the argument (the original read an unrelated global here).
        per_plate_cols = [get_cols(plate) for plate in ot_plates]
        self.cols = list(itertools.chain.from_iterable(per_plate_cols))
        self._select_active_cols()

    def make_transfer_map(self, reset=False):
        """Build ``self.transfer_map`` as ``[source, destination_chunk]`` pairs.

        Sources are the ``map_to`` active columns starting at
        ``used_cols``, each listed ``repeats`` times. Destinations are
        chunks of ``self.cols`` produced by ``chunk_list_no_single_n`` with
        a chunk size of ``repeats`` when ``points == 1`` and ``points``
        otherwise. ``used_cols`` is then advanced by the number of source
        columns that were mapped.

        NOTE: destination chunks are always taken from the start of
        ``self.cols`` (not ``active_cols``), independent of ``used_cols``.

        Args:
            reset: when true, start again from the first source column.
        """
        if reset:
            self.used_cols = 0

        # Measurement-plate side: group destination columns.
        chunk_size = self.repeats if self.points == 1 else self.points
        chunked_cols = list(chunk_list_no_single_n(self.cols, chunk_size))

        # Source side: every remaining source column once per repeat.
        map_to_cols = [
            col
            for col in self.map_to.active_cols[self.used_cols :]
            for _ in range(self.repeats)
        ]

        self.transfer_map = [
            [src, dst] for src, dst in zip(map_to_cols, chunked_cols)
        ]

        # Each source column contributes ``repeats`` entries to the map.
        self.used_cols = int(self.used_cols + len(self.transfer_map) / self.repeats)

# custom protocol class

In [22]:
class CustomProtocol:
    """High-level plate operations driven by two ``CustomPipette`` objects.

    Every operation prints what it is about to do, asks the user for
    confirmation via ``user_wants_to_continue`` and only then moves liquid.
    """

    def __init__(self, solvent_location, m20, m300):
        """Store the pipettes and the default solvent source.

        Both pipettes are switched to the ``"default"`` solvent parameters.

        Args:
            solvent_location: source used when an operation gets no
                explicit ``source``.
            m20: pipette used for volumes up to 18 µL.
            m300: pipette used for volumes above 18 µL.
        """
        self.m20 = m20
        self.m300 = m300
        self.solvent_location = solvent_location

        self.m300.set_solvent("default")
        self.m20.set_solvent("default")

    def choose_pipette(self, vol):
        """Return ``m20`` for ``vol <= 18`` and ``m300`` otherwise."""
        if vol <= 18:
            return self.m20
        return self.m300

    @staticmethod
    def _confirmed():
        """Ask the user whether to proceed and echo the decision."""
        if user_wants_to_continue():
            print("continuing...")
            return True

        print("exiting...")
        return False

    def add(self, plate, vol, source=None, top_dispense=None):
        """Dispense ``vol`` of ``source`` into every active column of ``plate``.

        For a ``GeneralPlate`` with ``start_conc`` and ``start_vol`` set,
        the resulting concentration is announced beforehand and stored in
        ``plate.end_conc`` / ``plate.end_vol`` afterwards.

        NOTE(review): the concentration is computed with ``vol`` as the
        final volume (``end_vol = vol``), not ``start_vol + vol`` --
        confirm this is intended.

        Args:
            plate: plate exposing ``active_cols``.
            vol: volume per column.
            source: well to aspirate from; defaults to
                ``solvent_location``.
            top_dispense: forwarded to ``c_multidispense``.
        """
        pipette = self.choose_pipette(vol)
        print(
            f"This will add the specified source to the active columns of the plate using the {pipette.name} pipette."
        )

        tracks_conc = (
            isinstance(plate, GeneralPlate)
            and plate.start_conc is not None
            and plate.start_vol is not None
        )

        if tracks_conc:
            end_conc = plate.start_conc * plate.start_vol / vol
            print(f'The plate will be diluted to {end_conc} µM')

        if not self._confirmed():
            return

        if source is None:
            source = self.solvent_location

        pipette.pick_up_tip()
        pipette.c_multidispense(
            vol,
            source,
            plate.active_cols,
            top_dispense=top_dispense,
        )
        pipette.drop_tip()
        print("done")

        if tracks_conc:
            plate.end_vol = vol
            plate.end_conc = plate.start_conc * plate.start_vol / plate.end_vol
            print(f'plate diluted to {plate.end_conc} µM')

    def dilution(self, start_plate, end_plate, source=None):
        """Dilute ``start_plate`` into ``end_plate`` using its transfer map.

        The transfer volume is ``end_conc / start_plate.end_conc`` times
        ``end_plate.end_vol``; the remainder of ``end_vol`` is solvent,
        which is dispensed first. Each transfer uses a fresh tip and mixes
        before and after.

        Args:
            start_plate: plate providing the starting ``end_conc``.
            end_plate: plate providing ``end_conc``, ``end_vol``,
                ``active_cols`` and ``transfer_map``.
            source: solvent well; defaults to ``solvent_location``.
        """
        dil_factor = end_plate.end_conc / start_plate.end_conc
        transfer_vol = dil_factor * end_plate.end_vol
        solvent_vol = end_plate.end_vol - transfer_vol

        print(
            f"This will transfer {transfer_vol} µL and combine it with {solvent_vol} µL of solvent for a end concentration of {end_plate.end_conc} µM."
        )

        if not self._confirmed():
            return

        if source is None:
            source = self.solvent_location

        solvent_pipette = self.choose_pipette(solvent_vol)
        solvent_pipette.pick_up_tip()
        solvent_pipette.c_multidispense(solvent_vol, source, end_plate.active_cols)
        solvent_pipette.drop_tip()

        # The transfer volume is constant, so pick the pipette once.
        transfer_pipette = self.choose_pipette(transfer_vol)
        for src, dst in end_plate.transfer_map:
            transfer_pipette.pick_up_tip()
            transfer_pipette.c_transfer(transfer_vol, src, dst, mix='before_after')
            transfer_pipette.drop_tip()

        print("done")

    def serial_dilute(self, plate, vol, sd_factor, source=None):
        """Run a serial dilution along ``plate.transfer_map``.

        ``vol`` of solvent is dispensed into every active column first;
        then ``c_serial_dilute`` is run for each transfer-map entry with a
        fresh tip. The pipette is chosen for ``vol / sd_factor``.

        Args:
            plate: plate exposing ``active_cols`` and ``transfer_map``.
            vol: solvent volume per well.
            sd_factor: dilution factor.
            source: solvent well; defaults to ``solvent_location``.
        """
        transfer_vol = round((vol * sd_factor) / (1 - sd_factor), 2)
        start_vol = transfer_vol + vol
        pipette = self.choose_pipette(vol / sd_factor)

        print(
            f"This will use the transfer map of the specified plate to carry out a serial dilution using the {pipette.name} pipette. "
            f"The transfer_vol will be {transfer_vol} µL, which mean the wells will require a capacity of {start_vol} µL."
        )

        if not self._confirmed():
            return

        if source is None:
            source = self.solvent_location

        pipette.pick_up_tip()
        pipette.c_multidispense(vol, source, plate.active_cols)
        pipette.drop_tip()

        for src, dst in plate.transfer_map:
            pipette.pick_up_tip()
            pipette.c_serial_dilute(vol, sd_factor, src, dst)
            pipette.drop_tip()

        print("done")

    def transfer(self, plate, vol, top_dispense=None):
        """Transfer ``vol`` along every entry of ``plate.transfer_map``.

        Each entry is handled with a fresh tip via ``c_multidispense``.

        Args:
            plate: plate exposing ``transfer_map``.
            vol: volume per destination.
            top_dispense: forwarded to ``c_multidispense``.
        """
        pipette = self.choose_pipette(vol)
        print(
            f"This will use the transfer map of the specified plate to carry out a transfer using the {pipette.name} pipette."
        )

        if not self._confirmed():
            return

        for src, dst in plate.transfer_map:
            pipette.pick_up_tip()
            pipette.c_multidispense(vol, src, dst, top_dispense=top_dispense)
            pipette.drop_tip()

        print("done")