Skip to content

Commit

Permalink
Merge pull request #1 from pyobs/develop
Browse files Browse the repository at this point in the history
v0.13
  • Loading branch information
thusser committed Apr 30, 2021
2 parents fb80b7e + 501290c commit d244cc1
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 39 deletions.
5 changes: 5 additions & 0 deletions pyobs_asi/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
"""
TODO: write doc
"""
__title__ = 'ASI camera modules'

from .asicamera import AsiCamera, AsiCoolCamera
180 changes: 142 additions & 38 deletions pyobs_asi/asicamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@
import math
import threading
from datetime import datetime
import time
from astropy.io import fits
from typing import List, Tuple, Any, Dict, Optional

import numpy as np
import zwoasi as asi

from pyobs.interfaces import ICamera, ICameraWindow, ICameraBinning, ICooling
from pyobs.interfaces import ICamera, ICameraWindow, ICameraBinning, ICooling, IImageFormat
from pyobs.modules.camera.basecamera import BaseCamera

from pyobs.utils.enums import ImageFormat, ExposureStatus
from pyobs.images import Image

log = logging.getLogger(__name__)


class AsiCamera(BaseCamera, ICamera, ICameraWindow, ICameraBinning):
# map of image formats
FORMATS = {
ImageFormat.INT8: asi.ASI_IMG_RAW8,
ImageFormat.INT16: asi.ASI_IMG_RAW16,
ImageFormat.RGB24: asi.ASI_IMG_RGB24
}


class AsiCamera(BaseCamera, ICamera, ICameraWindow, ICameraBinning, IImageFormat):
"""A pyobs module for ASI cameras."""
__module__ = 'pyobs_asi'

def __init__(self, camera: str, sdk: str = '/usr/local/lib/libASICamera2.so', *args, **kwargs):
"""Initializes a new AsiCamera.
Expand All @@ -29,12 +39,13 @@ def __init__(self, camera: str, sdk: str = '/usr/local/lib/libASICamera2.so', *a
# variables
self._camera_name = camera
self._sdk_path = sdk
self._camera = None
self._camera_info = None
self._camera: Optional[asi.Camera] = None
self._camera_info: Dict[str, Any] = {}

# window and binning
self._window = None
self._binning = None
# window and binning and mode
self._window = (0, 0, 0, 0)
self._binning = 1
self._image_format = ImageFormat.INT16

def open(self):
"""Open module."""
Expand Down Expand Up @@ -89,31 +100,31 @@ def close(self):
"""Close the module."""
BaseCamera.close(self)

def get_full_frame(self, *args, **kwargs) -> (int, int, int, int):
def get_full_frame(self, *args, **kwargs) -> Tuple[int, int, int, int]:
"""Returns full size of CCD.
Returns:
Tuple with left, top, width, and height set.
"""
return 0, 0, self._camera_info['MaxWidth'], self._camera_info['MaxHeight']

def get_window(self, *args, **kwargs) -> (int, int, int, int):
def get_window(self, *args, **kwargs) -> Tuple[int, int, int, int]:
"""Returns the camera window.
Returns:
Tuple with left, top, width, and height set.
"""
return self._window

def get_binning(self, *args, **kwargs) -> (int, int):
def get_binning(self, *args, **kwargs) -> Tuple[int, int]:
"""Returns the camera binning.
Returns:
Tuple with x and y.
"""
return self._binning, self._binning

def set_window(self, left: float, top: float, width: float, height: float, *args, **kwargs):
def set_window(self, left: int, top: int, width: int, height: int, *args, **kwargs):
"""Set the camera window.
Args:
Expand Down Expand Up @@ -141,7 +152,20 @@ def set_binning(self, x: int, y: int, *args, **kwargs):
self._binning = x
log.info('Setting binning to %dx%d...', x, x)

def _expose(self, exposure_time: float, open_shutter: bool, abort_event: threading.Event) -> fits.PrimaryHDU:
def list_binnings(self, *args, **kwargs) -> List[Tuple[int, int]]:
"""List available binnings.
Returns:
List of available binnings as (x, y) tuples.
"""

if 'SupportedBins' in self._camera_info:
# create list of tuples
return [(b, b) for b in self._camera_info['SupportedBins']]
else:
return []

def _expose(self, exposure_time: float, open_shutter: bool, abort_event: threading.Event) -> Image:
"""Actually do the exposure, should be implemented by derived classes.
Args:
Expand All @@ -153,18 +177,25 @@ def _expose(self, exposure_time: float, open_shutter: bool, abort_event: threadi
The actual image.
"""

# no camera?
if self._camera is None:
raise ValueError('No camera initialised.')

# get image format
image_format = FORMATS[self._image_format]

# set window, divide width/height by binning
width = int(math.floor(self._window[2]) / self._binning)
height = int(math.floor(self._window[3]) / self._binning)
log.info("Set window to %dx%d (binned %dx%d with %dx%d) at %d,%d.",
self._window[2], self._window[3], width, height, self._binning, self._binning,
self._window[0], self._window[1])
self._camera.set_roi(int(self._window[0]), int(self._window[1]), width, height,
self._binning, asi.ASI_IMG_RAW16)
self._binning, image_format)

# set status and exposure time in ms
self._change_exposure_status(ICamera.ExposureStatus.EXPOSING)
self._camera.set_control_value(asi.ASI_EXPOSURE, int(exposure_time * 1000))
self._change_exposure_status(ExposureStatus.EXPOSING)
self._camera.set_control_value(asi.ASI_EXPOSURE, int(exposure_time * 1e6))

# get date obs
log.info('Starting exposure with %s shutter for %.2f seconds...',
Expand All @@ -173,12 +204,13 @@ def _expose(self, exposure_time: float, open_shutter: bool, abort_event: threadi

# do exposure
self._camera.start_exposure()
self.closing.wait(0.01)

# wait for image
while self._camera.get_exposure_status() == asi.ASI_EXP_WORKING:
# aborted?
if abort_event.is_set():
self._change_exposure_status(ICamera.ExposureStatus.IDLE)
self._change_exposure_status(ExposureStatus.IDLE)
raise ValueError('Aborted exposure.')

# sleep a little
Expand All @@ -191,44 +223,71 @@ def _expose(self, exposure_time: float, open_shutter: bool, abort_event: threadi

# get data
log.info('Exposure finished, reading out...')
self._change_exposure_status(ICamera.ExposureStatus.READOUT)
self._change_exposure_status(ExposureStatus.READOUT)
buffer = self._camera.get_data_after_exposure()
whbi = self._camera.get_roi_format()

# decide on image format
shape = [whbi[1], whbi[0]]
data = np.frombuffer(buffer, dtype=np.uint16).reshape(shape)
if image_format == asi.ASI_IMG_RAW8:
data = np.frombuffer(buffer, dtype=np.uint8)
elif image_format == asi.ASI_IMG_RAW16:
data = np.frombuffer(buffer, dtype=np.uint16)
elif image_format == asi.ASI_IMG_RGB24:
shape.append(3)
data = np.frombuffer(buffer, dtype=np.uint8)
else:
raise ValueError('Unknown image format.')

# reshape
data = data.reshape(shape)

# special treatment for RGB images
if image_format == asi.ASI_IMG_RGB24:
# convert BGR to RGB
data = data[:, :, ::-1]

# now we need to separate the R, G, and B images
# this is easiest done by shifting the RGB axis from last to first position
# i.e. we go from RGBRGBRGBRGBRGB to RRRRRGGGGGBBBBB
data = np.moveaxis(data, 2, 0)

# create FITS image and set header
hdu = fits.PrimaryHDU(data)
hdu.header['DATE-OBS'] = (date_obs, 'Date and time of start of exposure')
hdu.header['EXPTIME'] = (exposure_time / 1000., 'Exposure time [s]')
image = Image(data)
image.header['DATE-OBS'] = (date_obs, 'Date and time of start of exposure')
image.header['EXPTIME'] = (exposure_time, 'Exposure time [s]')

# instrument and detector
hdu.header['INSTRUME'] = (self._camera_name, 'Name of instrument')
image.header['INSTRUME'] = (self._camera_name, 'Name of instrument')

# binning
hdu.header['XBINNING'] = hdu.header['DET-BIN1'] = (self._binning, 'Binning factor used on X axis')
hdu.header['YBINNING'] = hdu.header['DET-BIN2'] = (self._binning, 'Binning factor used on Y axis')
image.header['XBINNING'] = image.header['DET-BIN1'] = (self._binning, 'Binning factor used on X axis')
image.header['YBINNING'] = image.header['DET-BIN2'] = (self._binning, 'Binning factor used on Y axis')

# window
hdu.header['XORGSUBF'] = (self._window[0], 'Subframe origin on X axis')
hdu.header['YORGSUBF'] = (self._window[1], 'Subframe origin on Y axis')
image.header['XORGSUBF'] = (self._window[0], 'Subframe origin on X axis')
image.header['YORGSUBF'] = (self._window[1], 'Subframe origin on Y axis')

# statistics
hdu.header['DATAMIN'] = (float(np.min(data)), 'Minimum data value')
hdu.header['DATAMAX'] = (float(np.max(data)), 'Maximum data value')
hdu.header['DATAMEAN'] = (float(np.mean(data)), 'Mean data value')
image.header['DATAMIN'] = (float(np.min(data)), 'Minimum data value')
image.header['DATAMAX'] = (float(np.max(data)), 'Maximum data value')
image.header['DATAMEAN'] = (float(np.mean(data)), 'Mean data value')

# pixels
hdu.header['DET-PIXL'] = (self._camera_info['PixelSize'] / 1000., 'Size of detector pixels (square) [mm]')
hdu.header['DET-GAIN'] = (self._camera_info['ElecPerADU'], 'Detector gain [e-/ADU]')
image.header['DET-PIXL'] = (self._camera_info['PixelSize'] / 1000., 'Size of detector pixels (square) [mm]')
image.header['DET-GAIN'] = (self._camera_info['ElecPerADU'], 'Detector gain [e-/ADU]')

# Bayer pattern?
if image_format in [asi.ASI_IMG_RAW8, asi.ASI_IMG_RAW16]:
image.header['BAYERPAT'] = image.header['COLORTYP'] = ('GBRG', 'Bayer pattern for colors')

# biassec/trimsec
self.set_biassec_trimsec(hdu.header, *self._window)
self.set_biassec_trimsec(image.header, *self._window)

# return FITS image
log.info('Readout finished.')
self._change_exposure_status(ICamera.ExposureStatus.IDLE)
return hdu
self._change_exposure_status(ExposureStatus.IDLE)
return image

def _abort_exposure(self):
"""Abort the running exposure. Should be implemented by derived class.
Expand Down Expand Up @@ -264,7 +323,7 @@ def open(self):
# activate cooling
self.set_cooling(True, self._temp_setpoint)

def get_cooling_status(self, *args, **kwargs) -> (bool, float, float):
def get_cooling_status(self, *args, **kwargs) -> Tuple[bool, float, float]:
"""Returns the current status for the cooling.
Returns:
Expand All @@ -273,6 +332,12 @@ def get_cooling_status(self, *args, **kwargs) -> (bool, float, float):
SetPoint (float): Setpoint for the cooling in celsius.
Power (float): Current cooling power in percent or None.
"""

# no camera?
if self._camera is None:
raise ValueError('No camera initialised.')

# return
enabled = self._camera.get_control_value(asi.ASI_COOLER_ON)[0]
temp = self._camera.get_control_value(asi.ASI_TARGET_TEMP)[0]
power = self._camera.get_control_value(asi.ASI_COOLER_POWER_PERC)[0]
Expand All @@ -284,6 +349,12 @@ def get_temperatures(self, *args, **kwargs) -> dict:
Returns:
Dict containing temperatures.
"""

# no camera?
if self._camera is None:
raise ValueError('No camera initialised.')

# return
return {
'CCD': self._camera.get_control_value(asi.ASI_TEMPERATURE)[0] / 10.
}
Expand All @@ -299,6 +370,10 @@ def set_cooling(self, enabled: bool, setpoint: float, *args, **kwargs):
ValueError: If cooling could not be set.
"""

# no camera?
if self._camera is None:
raise ValueError('No camera initialised.')

# log
if enabled:
log.info('Enabling cooling with a setpoint of %.2f°C...', setpoint)
Expand All @@ -308,5 +383,34 @@ def set_cooling(self, enabled: bool, setpoint: float, *args, **kwargs):
log.info('Disabling cooling...')
self._camera.set_control_value(asi.ASI_COOLER_ON, 1)

def set_image_format(self, format: ImageFormat, *args, **kwargs):
"""Set the camera image format.
Args:
format: New image format.
Raises:
ValueError: If format could not be set.
"""
if format not in FORMATS:
raise ValueError('Unsupported image format.')
self._image_format = format

def get_image_format(self, *args, **kwargs) -> ImageFormat:
"""Returns the camera image format.
Returns:
Current image format.
"""
return self._image_format

def list_image_formats(self, *args, **kwargs) -> List[str]:
"""List available image formats.
Returns:
List of available image formats.
"""
return [f.value for f in FORMATS.keys()]


__all__ = ['AsiCamera', 'AsiCoolCamera']
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='pyobs-asi',
version='0.1',
version='0.13',
description='pyobs component for ASI cameras',
author='Tim-Oliver Husser',
author_email='thusser@uni-goettingen.de',
Expand Down

0 comments on commit d244cc1

Please sign in to comment.