Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a Mask class #409

Merged
merged 10 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Other
/deprecated
# Tutorials stuff
tutorials/products/Henize_2-10_cube.fits.gz
tutorials/products/*.fits.gz
tutorials/*.fits

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
2 changes: 2 additions & 0 deletions src/pykoala/cubing.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ def to_fits(self, fname=None, primary_hdr_kw=None):
data=self.intensity, name='INTENSITY', header=self.wcs.to_header()))
hdu_list.append(fits.ImageHDU(
data=self.variance, name='VARIANCE', header=hdu_list[-1].header))
# Store the mask information
hdu_list.append(self.mask.dump_to_hdu())
# Save fits
hdul = fits.HDUList(hdu_list)
hdul.writeto(fname, overwrite=True)
Expand Down
112 changes: 111 additions & 1 deletion src/pykoala/data_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
This module contains the parent class that represents the data used during the
reduction process
"""
import numpy as np

from astropy.io.fits import Header
from astropy.io.fits import Header, ImageHDU
from astropy.nddata import bitmask

from pykoala.exceptions.exceptions import NoneAttrError

Expand Down Expand Up @@ -253,6 +255,113 @@ class Parameter(object):
def __init__(self) -> None:
pass

class DataMask(object):
"""A mask to store the pixel flags of DataContainers.

Description
-----------
A mask to store the pixel flags of DataContainers.

Attributes
----------
- flag_map: dict, default={"CR": 2, "HP": 4, "DP": 8}
A mapping between the flag names and their numerical values expressed
in powers of two.
- bitmask: (np.ndarray)
The array containing the bit pixel mask.
- masks: dict
A dictionary that stores the individual mask in the form of boolean arrays
for each flag name.

Methods
-------
- flag_pixels
- get_flag_map_from_bitmask
- get_flag_map
"""
def __init__(self, shape, flag_map=None):
if flag_map is None:
self.flag_map = {"BAD": (2, "Generic bad pixel flag")}
else:
self.flag_map = flag_map
# Initialise the mask with all pixels being valid
self.bitmask = np.zeros(shape, dtype=int)
self.masks = {}
for key in self.flag_map.keys():
self.masks[key] = np.zeros(shape, dtype=bool)

def __decode_bitmask(self, value):
return np.bitwise_and(self.bitmask, value) > 0

def flag_pixels(self, mask, flag_name):
"""Add a pixel mask corresponding to a flag name.

Description
-----------
Add a pixel mask layer in the bitmask. If the mask already contains
information about the same flag, it will be overriden by the
new values.

Parameters
----------
- mask: np.ndarray
Input pixel flag. It must have the same shape as the bitmask.
"""
if flag_name not in self.flag_map:
raise NameError(f"Input flag name {flag_name} does not exist")
# Check that the bitmask does not already contain this flag
bit_flag_map = self.get_flag_map_from_bitmask(flag_name)
self.bitmask[bit_flag_map] -= self.flag_map[flag_name][0]
self.bitmask[mask] += self.flag_map[flag_name][0]
# Store the individual boolean map
self.masks[flag_name] = mask

def get_flag_map_from_bitmask(self, flag_name):
"""Get the boolean mask for a given flag name from the bitmask."""
return self.__decode_bitmask(self.flag_map[flag_name][0])

def get_flag_map(self, flag_name=None):
"""Return the boolean mask that corresponds to the input flags.

Parameters
----------
- flag_name: str or iterable, default=None
The flags to be used for constructing the mask. It can be a single
flag name or an iterable. If None, the mask will comprise every flag
that is included on the bitmask.

Returns
-------
- mask: np.ndarray
An array containing the boolean values for every pixel.
"""
if flag_name is not None:
if type(flag_name) is str:
return self.masks[flag_name]
else:
mask = np.zeros_like(self.bitmask, dtype=bool)
for flag in flag_name:
mask |= self.masks[flag]
return mask
else:
return self.bitmask > 0

def dump_to_hdu(self):
"""Return a ImageHDU containig the mask information.

Returns
-------
- hdu: ImageHDU
An ImageHDU containing the bitmask information.
"""
header = Header()
header['COMMENT'] = "Each flag KEY is stored using the convention FLAG_KEY"
for flag_name, value in self.flag_map.items():
# Store the value and the description
header[f"FLAG_{flag_name}"] = value
header['COMMENT'] = "A value of 0 means unmasked"
hdu = ImageHDU(name='BITMASK', data=self.bitmask, header=header)
return hdu

class DataContainer(object):
"""
Expand Down Expand Up @@ -287,6 +396,7 @@ def __init__(self, **kwargs):
self.info = dict()
self.log = kwargs.get("log", HistoryLog())
self.fill_info()
self.mask = kwargs.get("mask", DataMask(shape=self.intensity.shape))

def fill_info(self):
"""Check the keywords of info and fills them with placeholders."""
Expand Down
50 changes: 40 additions & 10 deletions src/pykoala/rss.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
# =============================================================================
import numpy as np
import os

from datetime import datetime
# =============================================================================
# Astropy and associated packages
# =============================================================================
from astropy.io import fits
# =============================================================================
# KOALA packages
# =============================================================================
# Modular
from pykoala import __version__
from pykoala.ancillary import vprint
from pykoala.data_container import DataContainer

Expand Down Expand Up @@ -138,7 +138,7 @@ def update_coordinates(self, new_coords=None, offset=None):
# =============================================================================
# Save an RSS object (corrections applied) as a separate .fits file
# =============================================================================
def to_fits(self, filename, overwrite=False, checksum=False):
def to_fits(self, filename, primary_hdr_kw=None, overwrite=False, checksum=False):
"""
Writes a RSS object to .fits

Expand All @@ -149,21 +149,51 @@ def to_fits(self, filename, overwrite=False, checksum=False):
----------
name: path-like object
File to write to. This can be a path to file written in string format with a meaningful name.
layer: TODO
overwrite: bool, optional
If True, overwrite the output file if it exists. Raises an OSError if False and the output file exists. Default is False.
checksum: bool, optional
If True, adds both DATASUM and CHECKSUM cards to the headers of all HDU’s written to the file.

Returns
-------
"""
# TODO: This needs to
primary_hdu = fits.PrimaryHDU(data=self.intensity)
if filename is None:
filename = 'cube_{}.fits.gz'.format(
datetime.now().strftime("%d_%m_%Y_%H_%M_%S"))
if primary_hdr_kw is None:
primary_hdr_kw = {}

# Create the PrimaryHDU with WCS information
primary = fits.PrimaryHDU()
for key, val in primary_hdr_kw.items():
primary.header[key] = val

# Include general information
primary.header['pykoala0'] = __version__, "PyKOALA version"
primary.header['pykoala1'] = datetime.now().strftime(
"%d_%m_%Y_%H_%M_%S"), "creation date / last change"

# Fill the header with the log information
primary.header = self.dump_log_in_header(primary.header)

# primary_hdu.header = self.header
primary_hdu.verify('fix')
primary_hdu.writeto(name=filename, overwrite=overwrite, checksum=checksum)
primary_hdu.close()
# Create a list of HDU
hdu_list = [primary]
# Change headers for variance and INTENSITY
hdu_list.append(fits.ImageHDU(
data=self.intensity, name='INTENSITY',
#TODO: rescue the original header?
#header=self.wcs.to_header()
)
)
hdu_list.append(fits.ImageHDU(
data=self.variance, name='VARIANCE', header=hdu_list[-1].header))
# Store the mask information
hdu_list.append(self.mask.dump_to_hdu())
# Save fits
hdul = fits.HDUList(hdu_list)
hdul.verify('fix')
hdul.writeto(filename, overwrite=overwrite, checksum=checksum)
hdul.close()
print(f"[RSS] File saved as {filename}")


Expand Down
158 changes: 158 additions & 0 deletions tutorials/data_container_mask.ipynb

Large diffs are not rendered by default.