In [None]:
from twisted.internet.protocol import Protocol, ReconnectingClientFactory
from twisted.internet import task
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet import threads
from twisted.python import log

import argparse
import Queue
import os
import sys
import numpy as np
import datetime

from astropy.io import fits
from astropy.time import Time
from astropy.visualization import ZScaleInterval

from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
from matplotlib import cm

from atv.streamprotocol import StreamProtocol
from atv.rms import sigmaclip
from atv.constellations import Constellations

In [None]:
# Number of bytes that make up one complete image on the wire.
# 4204800 == 1460 * 2880; presumably header + image padded to whole
# 2880-byte FITS blocks -- TODO confirm against the sender.
FILE_SIZE = 4204800
# Edge length, in pixels, of the (square) video frame handed to ffmpeg.
IMAGE_RES = 1024
# Frames per second; used for the ffmpeg frame rate and the encode timer.
FPS = 25
# Names of sources / constellations; not referenced by the code in this
# section (presumably used for overlays elsewhere -- verify).
SOURCES = ['Cas.A', 'Cyg.A', 'Tau.A', 'Vir.A', 'Sun', 'Moon']
CONSTELLATIONS = ['Ursa Minor']
# ffmpeg argument list. The output target is intentionally missing: the list
# ends with the container format, so a destination has to be appended by
# whoever launches the process.
CMD = ["ffmpeg",
       # for ffmpeg always first set input then output

       # silent audio
       '-f', 'lavfi',
       '-i', 'anullsrc=channel_layout=stereo:sample_rate=44100',

       # image
       '-re',
       '-f', 'rawvideo',           # probably required for reading from stdin
       # frame size is derived from IMAGE_RES so the two cannot drift apart
       '-s', '%dx%d' % (IMAGE_RES, IMAGE_RES),
       '-pix_fmt', 'rgba',
       '-i', '-',                  # read from stdin

       # encoding settings
       "-r", str(FPS),             # the framerate
       "-vcodec", "libx264",       # probably required for flv & rtmp
       "-preset", "ultrafast",     # the encoding quality preset
       "-g", "20",
       "-codec:a", "libmp3lame",   # mp3 for audio
       "-ar", "44100",             # 44k audio rate
       "-threads", "6",
       "-bufsize", "512k",
       "-f", "flv",                # required for rtmp
       ]

In [None]:
def get_configuration(argv=None):
    """
    Returns a populated configuration

    Parameters
    ----------
    argv : list of str, optional
        Arguments to parse instead of the process command line. With the
        default (None) argparse reads ``sys.argv[1:]``, exactly as before.
        Pass an explicit list (e.g. ``[]``) when running inside a notebook
        kernel, whose own command-line arguments are not known to this
        parser and would make it exit.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``host``, ``port``, ``threshold``,
        ``outdir`` and ``nproc``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--host', type=str, default='localhost',
                        help="host (tcp ip4)")
    parser.add_argument('--port', type=int, default=9000,
                        help="port")
    parser.add_argument('--threshold', type=float, default=5.0,
                        help="RMS Threshold to reject image.")
    parser.add_argument('--outdir', type=str, default="./",
                        help="Destination directory.")
    parser.add_argument('--nproc', type=int, default=3,
                        help="Max number of processes.")

    return parser.parse_args(argv)

In [None]:
class Stream(Protocol):
    """
    Twisted protocol that cuts the incoming byte stream into images of
    FILE_SIZE bytes, screens each image on a worker thread and writes the
    accepted ones to ``outdir`` as ``<timestamp>.fits``.

    Data flow: ``dataReceived`` fills a buffer -> ``process`` (worker thread)
    puts the verdict on a priority queue keyed by observation time ->
    ``encode`` (timer, FPS times per second) pops one entry and saves it.
    """

    def __init__(self, cfg):
        """
        Set up buffers, the output directory and the periodic encode timer.

        cfg must provide ``nproc``, ``threshold`` and ``outdir`` attributes.
        Raises OSError if the output directory cannot be created.
        """
        # Entries handed to the queue but not yet consumed by encode().
        # NOTE(review): incremented on worker threads (process) and
        # decremented on the reactor thread (encode) without a lock --
        # confirm whether lost updates matter here.
        self.num_processing = 0
        # Two receive buffers: one is filled while the other may still be
        # in use by the image parsed from it.
        self.b1 = np.zeros(FILE_SIZE, dtype=np.uint8)
        self.b2 = np.zeros(FILE_SIZE, dtype=np.uint8)
        self.bb1 = np.getbuffer(self.b1)
        self.bb2 = np.getbuffer(self.b2)
        self.bytes_received = 0
        self.pqueue = Queue.PriorityQueue()
        self.imgdata = None
        self.nproc = cfg.nproc
        self.threshold = cfg.threshold
        # Worker jobs started but not yet finished; only touched on the
        # reactor thread.
        self._pending = 0
        # Set by connectionLost(); lets encode() stop its own timer once
        # all outstanding work has been flushed.
        self._disconnected = False

        # Normalise to a path with a trailing slash; an empty value falls
        # back to the current directory instead of raising IndexError.
        outdir = cfg.outdir or "./"
        if not outdir.endswith("/"):
            outdir += "/"
        self.outdir = outdir

        if not os.path.isdir(self.outdir):
            # makedirs also creates missing parent directories
            os.makedirs(self.outdir)

        self.encode_task = task.LoopingCall(self.encode)
        # Log (rather than silently drop) anything that stops the timer.
        self.encode_task.start(1.0/FPS).addErrback(log.err)


    def process(self, fitsimg):
        """
        Check image RMS

        Queues ``(unix time, fitsimg)`` when the NaN-ignoring standard
        deviation of ``fitsimg.data[0, 0, ::-1, :]`` is below the threshold,
        otherwise ``(unix time, None)``. The image is skipped entirely when
        ``nproc`` entries are already outstanding. Called via deferToThread.
        """
        if self.num_processing >= self.nproc:
            log.msg("Skipping %s [%i]" % (fitsimg.header['DATE-OBS'], self.num_processing))
            return

        t = Time(fitsimg.header['DATE-OBS'])
        self.num_processing += 1

        accepted = None
        try:
            if np.nanstd(fitsimg.data[0, 0, ::-1, :]) < self.threshold:
                accepted = fitsimg
        finally:
            # Always queue an entry, even if the check raised: encode() is
            # the only place that releases the slot taken above, so skipping
            # the put would leak it and eventually make every image "Skipping".
            self.pqueue.put((t.unix, accepted))


    def encode(self):
        """
        Save fits file

        Pops at most one queue entry per call. Accepted images are written
        to ``outdir`` unless a file of that name already exists; rejected
        entries are only logged.
        """
        if self.pqueue.qsize() < 1:
            # Nothing queued. After a disconnect, stop the timer as soon as
            # no worker job can still deliver a result.
            if (self._disconnected and self._pending == 0
                    and self.encode_task.running):
                self.encode_task.stop()
            return

        t, imgdata = self.pqueue.get(block=False)
        self.num_processing -= 1
        self.imgdata = imgdata

        filename = '%s.fits' % Time(t, format='unix').isot
        path = self.outdir + filename

        if imgdata is None:
            log.msg("Processed %s [%i], not saved, RMS." % (filename, self.num_processing))
            return

        if os.path.isfile(path):
            log.msg("Processed %s [%i], not saved, exists." % (filename, self.num_processing))
            return

        try:
            imgdata.writeto(path)
        except (IOError, OSError):
            # An exception escaping here would stop the LoopingCall for
            # good; log the failure and keep serving later images.
            log.err(None, "Processed %s [%i], not saved, write failed."
                    % (filename, self.num_processing))
            return

        log.msg("Processed %s [%i], saved." % (filename, self.num_processing))


    def connectionLost(self, reason=None):
        """
        Mark the connection as closed so encode() can stop its timer once
        the queue is drained; without this every reconnect would leave
        another LoopingCall running.
        """
        self._disconnected = True


    def _process_finished(self, result):
        """
        Callback/errback for a worker job; runs on the reactor thread.
        """
        self._pending -= 1
        # pass the result (or Failure) on unchanged so errors still surface
        return result


    def dataReceived(self, data):
        """
        Append received bytes to the current buffer; each time FILE_SIZE
        bytes have accumulated, parse them as a FITS HDU, hand the HDU to
        process() on a worker thread and switch to the other buffer.
        """
        offset = 0
        total = len(data)
        # Loop so a chunk that completes one image and carries (any amount
        # of) the following data is consumed completely.
        while offset < total:
            n = min(FILE_SIZE - self.bytes_received, total - offset)
            self.bb1[self.bytes_received:self.bytes_received+n] = data[offset:offset+n]
            self.bytes_received += n
            offset += n

            if self.bytes_received >= FILE_SIZE:
                # NOTE(review): the HDU is built from the buffer object
                # itself; it presumably keeps referencing that memory, which
                # is overwritten again two images later -- confirm that
                # images never stay queued that long.
                fitsimg = fits.PrimaryHDU.fromstring(self.bb1)
                # process on another thread
                self._pending += 1
                d = threads.deferToThread(self.process, fitsimg)
                d.addBoth(self._process_finished)
                # swap buffers
                self.bb1, self.bb2 = self.bb2, self.bb1
                self.bytes_received = 0


In [None]:
class StreamFactory(ReconnectingClientFactory):
    """
    Client factory that creates a Stream protocol per connection and relies
    on ReconnectingClientFactory to retry after a lost or failed connection.
    """

    def __init__(self, cfg):
        """
        cfg: configuration object passed unchanged to every Stream.
        """
        self.cfg = cfg


    def startedConnecting(self, connector):
        """
        Report that a connection attempt has begun.
        """
        print('Started to connect.')


    def buildProtocol(self, addr):
        """
        Create the protocol for a newly established connection.
        """
        print('Connected.')
        # connection succeeded: start the next retry sequence from scratch
        self.resetDelay()
        protocol = Stream(self.cfg)
        # same back-reference the default buildProtocol would have set
        protocol.factory = self
        return protocol


    def clientConnectionLost(self, connector, reason):
        """
        Report the loss, then let the base class schedule a reconnect.
        """
        # format into one string: print('a', b) shows a tuple on Python 2
        print('Lost connection.  Reason: %s' % (reason,))
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)


    def clientConnectionFailed(self, connector, reason):
        """
        Report the failure, then let the base class schedule a retry.
        """
        print('Connection failed. Reason: %s' % (reason,))
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
                                                         reason)


In [None]:
if __name__ == "__main__":
    # Parse the command line first so a bad argument aborts before any
    # logging or networking is set up.
    cfg = get_configuration()

    # Send Twisted's log to stdout and record the effective settings.
    log.startLogging(sys.stdout)
    log.msg(str(cfg))

    # Register the client connection, then hand control to the reactor;
    # run() blocks until the reactor is stopped.
    reactor.connectTCP(
        cfg.host,
        cfg.port,
        StreamFactory(cfg),
    )
    reactor.run()
