# In [1]:
%run -i GMD.py
%run -i iman_refine.ipynb
from types import SimpleNamespace
import warnings
warnings.filterwarnings('ignore')
import pims

# Absolute, machine-specific location of the ND2 acquisition used in this notebook.
path = r"C:\Users\Xinran Yu\OneDrive\Desktop\DATA\project\20180123 MCF10 EKAR Fra T4 ratio expt downstairs.nd2"
# Open the file through the pims Bio-Formats reader, selecting series 60.
# NOTE(review): whether `series` is 0- or 1-based is defined by pims - confirm
# that 60 addresses the intended XY position.
dao =  pims.bioformats.BioformatsReader(path, series = 60)
# Request the y, x and c axes together in every item read from `dao`; the
# indexing `dao[t][:, :, channel]` used later in this file relies on that layout.
dao.bundle_axes = 'yxc'

# In [39]:
class BkVault:
    """Vault of background definitions, cached background values and flatteners.

    Once ``ip`` has been loaded, ``self.xy`` is a list holding one dict per XY
    position with the keys ``reg``, ``dyn``, ``fix``, ``bkonly``, ``altxy``,
    ``value`` and ``flat``.

    Conventions used consistently by every method of this class:

    * XY numbers (``h``, ``altxy``, ``op["xypos"]``, ``op["ff"]["flatxy"]``)
      are 1-based; XY ``n`` is stored at ``self.xy[n - 1]``.
    * Channel numbers in ``c`` are 1-based.
    * ``t`` is used as given to index the reader (``dao[t]``) and is forced to
      ``0`` for backgrounds that are not dynamic.
      NOTE(review): confirm callers pass a 0-based frame index.
    """

    # Fields copied from each XY definition in ip["bkg"].
    _BKG_FIELDS = ('reg', 'dyn', 'fix', 'bkonly', 'altxy')

    def __init__(self, ip=None, op=None, dao=None):
        """Build the vault, optionally loading definitions and flatteners.

        Args:
            ip: Mapping with ``indsz`` (index sizes; ``xy`` and ``c`` may be
                scalars or one-element sequences), ``bkg`` (one background
                definition per XY) and optionally ``bkmd`` and ``bval``.
            op: Processing options. ``xypos`` is required; ``flatten``,
                ``ff``, ``trng`` and ``objbias`` are read when present.
                ``op["ff"]`` is created/filled in place when flattening.
            dao: Data access object, required only when ``op["flatten"]`` is
                set.

        Raises:
            ValueError: If a background definition is invalid, if a requested
                XY relies on an alternate XY that is not processed, if no XY
                is usable for flat-fielding, if ``op`` is given without
                ``ip``, or if flattening is requested without a valid DAO.
        """
        # Properties
        self.indsz = {}
        self.meta = {}
        self.bval = 0
        self.xy = self._blank_entry()
        self.version = 'v1.0'

        if ip:
            self._load_definitions(ip)
        if op:
            # Without definitions there is no per-XY list to validate against.
            if not isinstance(self.xy, list):
                raise ValueError("Processing options (op) were provided without background definitions (ip).")
            self._apply_options(op, dao)

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _blank_entry():
        """Return a fresh per-XY record so that no list is shared between XYs."""
        return {
            'reg': [],
            'dyn': [],
            'fix': [],
            'bkonly': [],
            'altxy': [],
            'value': [],
            'flat': [],
        }

    @staticmethod
    def _first_int(value):
        """Return ``value`` as an int, accepting a scalar or a sequence (first item)."""
        try:
            return int(value[0])
        except (TypeError, IndexError):
            return int(value)

    @staticmethod
    def _is_blank(value):
        """Return True for empty/zero/None values; multi-element arrays are not blank."""
        try:
            return not value
        except ValueError:
            # Raised by numpy arrays with more than one element.
            return False

    @staticmethod
    def _is_objbias(candidate):
        """Return True when ``candidate`` is a numeric stack nested three levels deep."""
        try:
            probe = candidate[0][0][0]
        except (IndexError, KeyError, TypeError):
            return False
        return isinstance(probe, (int, float))

    # ------------------------------------------------------------------
    # Construction steps
    # ------------------------------------------------------------------
    def _load_definitions(self, ip):
        """Copy and validate the per-XY background definitions from ``ip``."""
        entries = []
        for i in range(self._first_int(ip["indsz"]["xy"])):
            entry = self._blank_entry()
            for field in self._BKG_FIELDS:
                value = ip["bkg"][i][field]
                # Replace empty entries with 0
                entry[field] = 0 if self._is_blank(value) else value
            entries.append(entry)
        self.xy = entries

        n_chan = self._first_int(ip["indsz"].get("c", 0))

        # Flags for wells using alternate XYs; those need no region of their own.
        uses_alt = [bool(entry["altxy"]) for entry in entries]
        valid = list(uses_alt)
        for idx, entry in enumerate(entries):
            if uses_alt[idx]:
                continue
            reg = entry["reg"]
            if not hasattr(reg, "__len__"):
                # Empty definitions were replaced by 0 above: nothing to sample.
                valid[idx] = False
            elif entry["fix"]:
                # Fixed values: one value per channel.
                valid[idx] = len(reg) == n_chan
            else:
                # Sampled region: two corner points.
                valid[idx] = len(reg) == 2

        # An XY used as an alternate must exist and must not itself use one.
        for idx, entry in enumerate(entries):
            alt = entry["altxy"]
            if not alt:
                continue
            if not 1 <= alt <= len(entries):
                valid[idx] = False
            elif uses_alt[alt - 1]:
                valid[alt - 1] = False

        # Report 1-based XY numbers, matching altxy/xypos numbering.
        bad = [str(idx + 1) for idx, ok in enumerate(valid) if not ok]
        if bad:
            raise ValueError(f"An XY indicated to provide background is not properly defined. Check background structure (bkg) for XY(s) {', '.join(bad)}.")

        self.indsz = ip["indsz"]
        self.meta = ip.get("bkmd", {})
        self.bval = ip.get("bval", 0)

    def _apply_options(self, op, dao):
        """Validate ``op`` against the definitions and set up flattening."""
        requested = set(op["xypos"])
        # Alternate XYs (0 = none) needed by the XYs that will be processed.
        alts = sorted({
            entry["altxy"]
            for number, entry in enumerate(self.xy, start=1)
            if number in requested
        })
        allowed = requested | {0}
        missing = [str(alt) for alt in alts if alt not in allowed]
        if missing:
            raise ValueError(f"An XY listed for processing requests background from an alternate XY, which is not included. Add {', '.join(missing)} to the list of XYs to process (xypos).")

        if op.get("flatten"):
            self._build_flatteners(op, alts, dao)
        elif op.get("objbias"):
            # IF using objective bias: get and store the model in XY 1.
            cam = self.meta["cam"]
            im = np.zeros((cam["PixNumY"], cam["PixNumX"], self._first_int(self.indsz["c"])))
            _, self.xy[0]["flat"] = iman_refine(im, self.bval, self.meta)

    def _build_flatteners(self, op, alts, dao):
        """Create one FieldFlattener per XY listed (or inferred) in ``op["ff"]``."""
        ff = op.setdefault("ff", {})
        if not ff.get("flatxy"):
            # No explicit list: use the alternates that are background-only wells.
            candidates = [alt for alt in alts if alt != 0]
            usable = [bool(self.xy[alt - 1]["bkonly"]) for alt in candidates]

            # Check and warn for bad definitions
            rejected = [str(alt) for alt, ok in zip(candidates, usable) if not ok]
            if rejected:
                print(f"Warning: A well ({', '.join(rejected)}) indicated for field flattener calculation is not marked as background only. It will not be used. Ensure background parameters are properly defined.")

            # Set list of XYs to use
            ff["flatxy"] = [alt for alt, ok in zip(candidates, usable) if ok]
            if not ff["flatxy"]:
                raise ValueError("No XYs suitable for flatfielding were identified. Recheck background parameters or set the op.ff.flatxy parameter.")

        # Allow outlier removal call to be passed along
        ff.setdefault("remout", False)

        # Parse time range: a pair is an inclusive [first, last] range.
        trng = op["trng"]
        if len(trng) == 2:
            trng = list(range(trng[0], trng[1] + 1))

        # Ensure DAO is available for Field Flattening
        # NOTE(review): ImageAccess is not defined in this file; presumably it is
        # provided by one of the scripts run at the top of the notebook.
        if not dao or not isinstance(dao, ImageAccess):
            raise ValueError("A valid Data Access Object must be provided when initializing a Field Flattener via the Background Vault.")

        cam = self.meta["cam"]
        flp = {
            "camsize": [
                cam["CamSizeX"] / cam["BinSizeX"],
                cam["CamSizeY"] / cam["BinSizeY"],
            ]
        }
        # Add Flattening parameters if provided
        flp.update(ff)

        for sk in ff["flatxy"]:
            # Calculate average frame for flattening
            aim = iman_meanframe(dao, xy=sk, t=trng, z=list(range(1, dao.imax["z"] + 1)), remout=ff["remout"])

            # First, remove camera baseline
            aim -= self.bval

            # flatxy holds 1-based XY numbers; flat() reads self.xy[number - 1].
            self.xy[sk - 1]["flat"] = FieldFlattener(aim, **flp)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def alt(self, h):
        """
        Returns the (1-based) number of the alternate XY used as a background
        for XY ``h``, if it exists. Returns ``h`` itself if the XY stands
        as its own background.
        """
        alternate = self.xy[h - 1]['altxy']
        return alternate if alternate else h

    def sample(self, dao, h, t, c=None, z=None):
        """Return the mean background value per channel for XY ``h``.

        Args:
            dao: Either an image (``numpy.ndarray`` or nested list, indexed
                ``[row, column, channel]``), which is used as is, or a
                ``pims.bioformats.BioformatsReader`` from which frame ``t``
                is read and refined with ``iman_refine`` before sampling.
            h: 1-based XY number; alternates are resolved first.
            t: Frame index passed to the reader as ``dao[t]``.
            c: 1-based channel numbers; defaults to all channels.
            z: Accepted for interface compatibility; not used.

        Returns:
            Mean over the background region, one value per channel.

        Raises:
            TypeError: If ``dao`` is neither an image nor a supported reader.
        """
        if c is None:
            c = range(1, self._first_int(self.indsz['c']) + 1)

        # Check for alternates to assert relevant index
        h = self.alt(h)

        # Images are checked first so that plain arrays never touch the reader API.
        if isinstance(dao, (np.ndarray, list)):
            im = np.asarray(dao)
        elif isinstance(dao, pims.bioformats.BioformatsReader):
            channels = list(c)
            im = np.zeros((dao.sizes['y'], dao.sizes['x'], len(channels)))
            dao.iter_axes = 't'
            frame = np.array(dao[t])  # read the frame once for all channels
            for sc, ch in enumerate(channels):
                im[:, :, sc] = frame[:, :, ch - 1]
            # Refine image prior to sampling
            im, _ = iman_refine(im, self.bval, self.flat(h), c)
        else:
            raise TypeError(f"Cannot sample background from an object of type {type(dao).__name__}.")

        # Region is [[x0, y0], [x1, y1]] with 1-based, inclusive corners.
        reg = self.xy[h - 1]['reg']
        x0, y0 = reg[0][0], reg[0][1]
        x1, y1 = reg[1][0], reg[1][1]
        return np.mean(im[y0 - 1:y1, x0 - 1:x1, :], axis=(0, 1))

    def pull(self, dao, h, t, c=None, z=None):
        """Return the background for XY ``h`` at time ``t``, sampling if needed.

        Fixed backgrounds return the stored per-channel values minus the
        camera baseline ``bval``. Other backgrounds are sampled with
        :meth:`sample`; for non-dynamic backgrounds the sampled value is
        cached in ``self.xy[...]['value']`` and reused, while dynamic
        backgrounds are re-sampled on every call so that a value from a
        different time point is never returned.

        NOTE(review): the cache does not record which channels ``c`` it was
        sampled for.

        Returns:
            Tuple ``(out, self)``.
        """
        # Check for alternates to get the relevant (1-based) XY
        h = self.alt(h)
        entry = self.xy[h - 1]

        # If this Background is not Dynamic, only consider the first frame
        if not entry['dyn']:
            t = 0
        # If Channels not provided, use all
        if c is None:
            c = range(1, self._first_int(self.indsz['c']) + 1)

        if entry['fix']:
            # If value is fixed, return it after removing baseline
            reg = np.asarray(entry['reg'], dtype=float)
            # 1-D: one value per channel. 2-D: first axis is taken as time,
            # following the original [t, c] index order - TODO confirm.
            per_channel = reg if reg.ndim == 1 else reg[t]
            out = per_channel[[ch - 1 for ch in c]] - self.bval
        else:
            cached = entry['value']
            reusable = (
                not entry['dyn']
                and np.size(cached) > 0
                and bool(np.any(np.asarray(cached) != 0))
            )
            if reusable:
                out = cached
            else:
                # If value is not stored, collect it and update the Vault
                out = self.sample(dao, h, t, c, z)
                entry['value'] = out

        return out, self

    def flat(self, h):
        """
        Returns the Field Flattener mapped for the (1-based) XY number ``h``.

        If the XY (after resolving alternates) has no flattener of its own
        and an Objective Bias model is stored in XY 1, that model is returned.
        """
        idx = self.alt(h) - 1  # Check for alternate XY first
        own = self.xy[idx]['flat']

        # Check for Objective Bias model, stored in XY 1
        if idx > 0 and self._is_blank(own) and self._is_objbias(self.xy[0]['flat']):
            return self.xy[0]['flat']
        return own


# NOTE(review): the statement that used to open this cell was an untranslated
# MATLAB call. It could not run as Python: `imd` is never defined, `p` was used
# before being created, and the method is named `pull`, not `Pull`. It is kept
# here for reference only:
#   [bkit, imd.bkg] = imd.bkg.Pull(imd.dao, p.xy, p.t, op.cind, p.z);
# TODO: translate once the channel-index option is known, presumably as
#   bkit, bkf = bkf.pull(dao, p.xy, p.t, <channel indices>, p.z)

# Display/segmentation parameters for the frame being inspected.
# (This namespace used to be built twice with identical contents.)
p = SimpleNamespace(
    nclr=[0, 0, 1],
    cclr=[0, 1, 0.5],
    pastinfo=[],
    imthresh=95,
    t=1,
    c=op['seg']['chan'][0],
    xy=1,
    z=1,
    display=True,
)

# Usage
# `ip` and `op` are not defined in this file; they must already exist in the
# notebook namespace (presumably created by the scripts run in the first cell).
bkf = BkVault(ip, op, dao)

# for later correction if needed

## 1
# NOTE(review): scratch copy of the body of BkVault.__init__, pasted at module
# level. It reads and writes `self`, which is not defined at this scope, so it
# cannot run where it stands. Unlike the method, it stores the image metadata in
# the bare names `indsz`, `meta` and `bval`, while the flattening section
# further down still reads `self.meta` and `self.bval`.
if ip:
    # One record per XY; dict.copy() is shallow, so the list values of the
    # template are shared until a field is reassigned.
    self.xy = [self.xy.copy() for _ in range(ip["indsz"]["xy"][0])]
    fields = ['reg', 'dyn', 'fix', 'bkonly', 'altxy']
    for field in fields:
            for i in range(len(self.xy)):
                self.xy[i][field] = ip["bkg"][i][field]
                # Replace empty entries with 0
                if not self.xy[i][field]:
                    self.xy[i][field] = 0
    # Set flags for wells using alternate XYs
    uax = [bool(item["altxy"]) for item in self.xy]
    # Check validity of background definitions
    bkvalid = uax.copy()
    for idx, item in enumerate(self.xy):
        if not uax[idx]:
                # NOTE(review): len() is evaluated before the int check, so an
                # empty region (replaced by 0 above) raises TypeError here.
                valid_condition = (item["fix"] and len(item["reg"]) == ip["indsz"].get("c", 0)) or \
                                        (not item["fix"] and len(item["reg"]) == 2 and type(item["reg"]) != int)
                bkvalid[idx] = valid_condition
    # Check for mislabeled alternates: altxy values are used as 1-based
    # positions into the list (hence the -1).
    unaltXYi = list(set(item["altxy"] for item in self.xy if item["altxy"]))
    for alt in unaltXYi:
        if uax[alt-1]:
            bkvalid[alt-1] = False

    if not all(bkvalid):
                # The reported numbers are 0-based list positions.
                invalid_indices = [str(i) for i, valid in enumerate(bkvalid) if not valid]
                raise ValueError(f"An XY indicated to provide background is not properly defined. Check background structure (bkg) for XY(s) {', '.join(invalid_indices)}.")

    indsz = ip["indsz"]
    meta = ip.get("bkmd", {})
    bval = ip.get("bval", 0)
if op:
    # Check running validity
    # NOTE(review): `item` is a per-XY dict, so `item in op["xypos"]` compares a
    # dict against the entries of xypos - presumably the XY number was meant.
    unaltXYi = list(set([item["altxy"] for item in self.xy if item in op["xypos"]]))
    bkvalid = all(alt in [0] + op["xypos"] for alt in unaltXYi)

    if not bkvalid:
        invalid_alts = [str(alt) for alt in unaltXYi if alt not in [0] + op["xypos"]]
        raise ValueError(f"An XY listed for processing requests background from an alternate XY, which is not included. Add {', '.join(invalid_alts)} to the list of XYs to process (xypos).")
    if "flatten" in op and op["flatten"]:
        if "ff" not in op or "flatxy" not in op["ff"] or not op["ff"]["flatxy"]:
            unaltXYi = [alt for alt in unaltXYi if alt != 0]
            # NOTE(review): same dict-versus-number membership test as above.
            isok = [item["bkonly"] for item in self.xy if item in unaltXYi]

            # Check and warn for bad definitions
            if not all(isok):
                invalid_indices = [str(i) for i, valid in enumerate(isok) if not valid]
                print(f"Warning: A well ({', '.join(invalid_indices)}) indicated for field flattener calculation is not marked as background only. It will not be used. Ensure background parameters are properly defined.")

            # Set list of XYs to use
            # NOTE(review): raises KeyError when op has no "ff" entry.
            op["ff"]["flatxy"] = [unalt for unalt, valid in zip(unaltXYi, isok) if valid]
            if not op["ff"]["flatxy"]:
                raise ValueError("No XYs suitable for flatfielding were identified. Recheck background parameters or set the op.ff.flatxy parameter.")

        # Allow outlier removal call to be passed along
        if "remout" not in op["ff"]:
            op["ff"]["remout"] = False

        # Parse time range: a two-element trng is expanded as an inclusive range
        if len(op["trng"]) == 2:
            trng = list(range(op["trng"][0], op["trng"][1] + 1))
        else:
            trng = op["trng"]
        # Ensure DAO is available for Field Flattening
        # NOTE(review): ImageAccess is not defined anywhere in this file.
        if not dao or not isinstance(dao, ImageAccess):
            raise ValueError("A valid Data Access Object must be provided when initializing a Field Flattener via the Background Vault.")


        # Make a Flattener for each XY indicated
        flp = {
                "camsize": [
                    self.meta["cam"]["CamSizeX"] / self.meta["cam"]["BinSizeX"],
                    self.meta["cam"]["CamSizeY"] / self.meta["cam"]["BinSizeY"]
                ]
            }
        # Add Flattening parameters if provided
        if op and "ff" in op:
            for key, value in op["ff"].items():
                flp[key] = value

        # Create flattener for each XY needed
        if op and "flatten" in op and op["flatten"]:
            for sk in op["ff"]["flatxy"]:
                # Calculate average frame for flattening
                aim = iman_meanframe(dao, xy=sk, t=trng, z=list(range(1, dao.imax["z"] + 1)), remout=op["ff"]["remout"])

                # First, remove camera baseline
                aim -= self.bval

                # Create Flattener
                # NOTE(review): sk is used directly as a list position here,
                # whereas altxy values are treated as 1-based above.
                self.xy[sk]["flat"] = FieldFlattener(aim, **flp)

    # IF using objective bias
    elif op and "objbias" in op and op["objbias"]:
        im = np.zeros((meta["cam"]["PixNumY"], meta["cam"]["PixNumX"], ip["indsz"]["c"][0]))

        # Get and store objbias matrix (second value returned by iman_refine)
        _, self.xy[0]["flat"] = iman_refine(im, bval, meta)
        
        
        
 ## 2
 
        # NOTE(review): second scratch variant of the BkVault.__init__ body.
        # Because of its indentation Python parses it as a continuation of the
        # `elif` body directly above it. It refers to `self`, which is not
        # defined at module scope, and it subscripts `p`, a name this notebook
        # also binds to a SimpleNamespace.
        # Differences from the class constructor visible here: `p` replaces
        # `ip`; indsz["xy"] is read as a scalar; every XY receives the same
        # p["bkg"].get(field, []) value; altxy is used as a list position
        # without the -1 offset; the `op` handling is nested inside `if p:` and
        # stops after the DAO check.
        if p:
            # Initialize the xy list
            self.xy = [self.xy.copy() for _ in range(p["indsz"]["xy"])]

            # Populate xy with data from p['bkg']
            fields = ['reg', 'dyn', 'fix', 'bkonly', 'altxy']
            for field in fields:
                for item in self.xy:
                    item[field] = p["bkg"].get(field, [])
                    # Replace empty entries with 0
                    if not item[field]:
                        item[field] = 0

            # Set flags for wells using alternate XYs
            uax = [bool(item["altxy"]) for item in self.xy]

            # Check validity of background definitions
            bkvalid = uax.copy()
            for idx, item in enumerate(self.xy):
                if not uax[idx]:
                    # NOTE(review): len() fails with TypeError when reg was
                    # replaced by 0 above.
                    valid_condition = (item["fix"] and len(item["reg"]) == p["indsz"].get("c", 0)) or \
                                      (not item["fix"] and len(item["reg"]) == 2)
                    bkvalid[idx] = valid_condition

            # Check for mislabeled alternates
            unaltXYi = list(set(item["altxy"] for item in self.xy if item["altxy"]))
            for alt in unaltXYi:
                if uax[alt]:
                    bkvalid[alt] = False

            if not all(bkvalid):
                invalid_indices = [str(i) for i, valid in enumerate(bkvalid) if not valid]
                raise ValueError(f"An XY indicated to provide background is not properly defined. Check background structure (bkg) for XY(s) {', '.join(invalid_indices)}.")

            # Fill relevant image metadata
            self.indsz = p["indsz"]
            self.meta = p.get("bkmd", {})
            self.bval = p.get("bval", 0)

            # If op is also provided
            if op:
                # Check running validity
                # NOTE(review): `item` is a per-XY dict, so this membership test
                # compares dicts with the entries of op["xypos"].
                unaltXYi = list(set([item["altxy"] for item in self.xy if item in op["xypos"]]))
                bkvalid = all(alt in [0] + op["xypos"] for alt in unaltXYi)

                if not bkvalid:
                    invalid_alts = [str(alt) for alt in unaltXYi if alt not in [0] + op["xypos"]]
                    raise ValueError(f"An XY listed for processing requests background from an alternate XY, which is not included. Add {', '.join(invalid_alts)} to the list of XYs to process (xypos).")

                # Process Flattening methods
                if "flatten" in op and op["flatten"]:
                    if "ff" not in op or "flatxy" not in op["ff"] or not op["ff"]["flatxy"]:
                        unaltXYi = [alt for alt in unaltXYi if alt != 0]
                        isok = [item["bkonly"] for item in self.xy if item in unaltXYi]

                        # Check and warn for bad definitions
                        if not all(isok):
                            invalid_indices = [str(i) for i, valid in enumerate(isok) if not valid]
                            print(f"Warning: A well ({', '.join(invalid_indices)}) indicated for field flattener calculation is not marked as background only. It will not be used. Ensure background parameters are properly defined.")

                        # Set list of XYs to use
                        op["ff"]["flatxy"] = [unalt for unalt, valid in zip(unaltXYi, isok) if valid]
                        if not op["ff"]["flatxy"]:
                            raise ValueError("No XYs suitable for flatfielding were identified. Recheck background parameters or set the op.ff.flatxy parameter.")

                    # Allow outlier removal call to be passed along
                    if "remout" not in op["ff"]:
                        op["ff"]["remout"] = False

                    # Parse time range: a two-element trng is expanded as an
                    # inclusive range. `trng` is not used after this point in
                    # this fragment.
                    if len(op["trng"]) == 2:
                        trng = list(range(op["trng"][0], op["trng"][1] + 1))
                    else:
                        trng = op["trng"]

                    # Ensure DAO is available for Field Flattening
                    # NOTE(review): ImageAccess is not defined in this file.
                    if not dao or not isinstance(dao, ImageAccess):
                        raise ValueError("A valid Data Access Object must be provided when initializing a Field Flattener via the Background Vault.")


def alt(self, h):
    """
    Resolve the XY that supplies the background for entry ``h``.

    Looks up ``self.xy[h]['altxy']``: when that value is truthy it is
    returned, otherwise ``h`` itself is returned unchanged.
    """
    alternate = self.xy[h]['altxy']
    return alternate if alternate else h
    
def sample(self, dao, h, t, c=None, z=None):
    """Return the mean background value per channel for entry ``h``.

    Args:
        dao: Either an image (``numpy.ndarray`` or nested list, indexed
            ``[row, column, channel]``), which is used as is, or a
            ``pims.bioformats.BioformatsReader`` from which frame ``t - 1``
            is read and refined with ``iman_refine`` before sampling.
        h: XY index; passed through ``self.alt`` and then used directly as
            the position in ``self.xy``.
            NOTE(review): confirm the index base expected by callers.
        t: Time point; the reader is indexed with ``t - 1``.
        c: 1-based channel numbers; defaults to ``1..self.indsz['c']``.
        z: Accepted for interface compatibility; not used.

    Returns:
        Mean over the background region, one value per channel.

    Raises:
        TypeError: If ``dao`` is neither an image nor a supported reader.
    """
    # If Channels not provided, use all
    if c is None:
        c = range(1, self.indsz['c'] + 1)

    # Check for alternates to assert relevant index
    h = self.alt(h)

    # Images are checked first so that plain arrays never touch the reader API.
    if isinstance(dao, (np.ndarray, list)):
        # A nested list cannot be sliced with [a:b, c:d, :]; convert it.
        im = np.asarray(dao)
    elif isinstance(dao, pims.bioformats.BioformatsReader):
        channels = list(c)
        im = np.zeros((dao.sizes['y'], dao.sizes['x'], len(channels)))
        dao.iter_axes = 't'
        dao.bundle_axes = 'yxc'
        frame = np.array(dao[t - 1])  # read the frame once for all channels
        for sc, ch in enumerate(channels):
            im[:, :, sc] = frame[:, :, ch - 1]
        # iman_refine returns two values elsewhere in this file; only the
        # refined image is needed here.
        im, _ = iman_refine(im, self.bval, self.flat(h), c)
    else:
        raise TypeError(f"Cannot sample background from an object of type {type(dao).__name__}.")

    # Region is [[x0, y0], [x1, y1]] with 1-based, inclusive corners.
    reg = self.xy[h]['reg']
    x0, y0 = reg[0][0], reg[0][1]
    x1, y1 = reg[1][0], reg[1][1]
    return np.mean(im[y0 - 1:y1, x0 - 1:x1, :], axis=(0, 1))
    
    
def pull(self, dao, h, t, c=None, z=None):
    """Return the background for XY ``h`` at time ``t``, sampling if needed.

    Fixed backgrounds return the stored per-channel values minus the camera
    baseline ``self.bval``. Other backgrounds are obtained from
    ``self.sample``; for non-dynamic backgrounds the sampled value is cached
    in ``self.xy[...]['value']`` and reused, while dynamic backgrounds are
    re-sampled on every call so a value from another time point is never
    returned.

    NOTE(review): ``t`` is forced to 0 for non-dynamic backgrounds, while the
    module-level ``sample`` in this file indexes the reader with ``t - 1``;
    confirm the intended time base. The cache also does not record which
    channels ``c`` it was sampled for.

    Returns:
        Tuple ``(out, self)``.
    """
    # Check for alternates to get relevant index
    h = self.alt(h) - 1
    entry = self.xy[h]

    # If this Background is not Dynamic, only consider the first frame
    if not entry['dyn']:
        t = 0
    # If Channels, Z-slice not provided, use all. indsz and the XY entries
    # are dicts, so they are subscripted rather than accessed by attribute.
    if c is None:
        c = range(1, self.indsz['c'] + 1)
    if z is None:
        z = range(1, self.indsz['z'] + 1)

    if entry['fix']:
        # If value is fixed, return it after removing baseline
        reg = np.asarray(entry['reg'], dtype=float)
        # 1-D: one value per channel. 2-D: first axis is taken as time,
        # following the original [t, c] index order - TODO confirm.
        per_channel = reg if reg.ndim == 1 else reg[t]
        out = per_channel[[ch - 1 for ch in c]] - self.bval
    else:
        cached = entry['value']
        reusable = (
            not entry['dyn']
            and np.size(cached) > 0
            and bool(np.any(np.asarray(cached) != 0))
        )
        if reusable:
            out = cached
        else:
            # If value is not stored, collect it and update the Vault
            out = self.sample(dao, h, t, c, z)
            entry['value'] = out

    return out, self
    
def flat(self, h):
    """
    Returns the Field Flattener mapped for the XY index (h).

    The index is first passed through ``self.alt``. When ``h > 1``, the
    entry has no flattener of its own and ``self.xy[0]['flat']`` holds an
    Objective Bias model (a numeric stack nested three levels deep), that
    model is returned instead. An empty or non-indexable
    ``self.xy[0]['flat']`` simply means no model is stored.

    NOTE(review): ``h`` is used directly as the position in ``self.xy``;
    confirm the index base expected by callers.
    """
    h = self.alt(h)  # Check for alternate XY first
    own = self.xy[h]['flat']

    # Check for Objective Bias model, stored in XY 1
    if h > 1 and not own:
        shared = self.xy[0]['flat']
        try:
            probe = shared[0][0][0]
        except (IndexError, KeyError, TypeError):
            probe = None  # nothing stored: fall back to the entry's own value
        if isinstance(probe, (int, float)):
            return shared

    return own
