Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Cleanup producer.py
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Raspaud <martin.raspaud@smhi.se>
  • Loading branch information
mraspaud committed Aug 24, 2016
1 parent c997cfc commit db42355
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions trollduction/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,39 @@
(crude/nearest/<something new>)
'''

from .listener import ListenerContainer
from mpop.satellites import GenericFactory as GF
import time
import errno
import glob
from mpop.projector import get_area_def
from threading import Thread
from pyorbital import astronomy
import numpy as np
import os
import Queue
import logging
import logging.handlers
import os
import Queue
import shutil
import socket
import tempfile
import time
from fnmatch import fnmatch
from trollduction import helper_functions
from trollsift import compose
from struct import error as StructError
from threading import Thread
from urlparse import urlparse, urlunsplit
import socket
import shutil
from xml.etree.ElementTree import tostring

import netifaces
import numpy as np
import pyinotify

from mpop.projector import get_area_def
from mpop.satellites import GenericFactory as GF
from mpop.satout.cfscene import CFScene
from posttroll.publisher import Publish
from posttroll.message import Message
from posttroll.publisher import Publish
from pyorbital import astronomy
from pyresample.utils import AreaNotFound
from trollduction import helper_functions
from trollsched.boundary import AreaDefBoundary, Boundary
from trollsched.satpass import Pass
from trollsched.boundary import Boundary, AreaDefBoundary
import errno
import netifaces
import tempfile
from trollsift import compose

from .listener import ListenerContainer

try:
from mipp import DecodeError
Expand All @@ -79,8 +85,6 @@
import gc
import pprint

from xml.etree.ElementTree import tostring
from struct import error as StructError

try:
from dwd_extensions.tools.view_zenith_angle import ViewZenithAngleCacheManager
Expand All @@ -92,8 +96,6 @@

# Config watcher stuff

import pyinotify


def get_local_ips():
inet_addrs = [netifaces.ifaddresses(iface).get(netifaces.AF_INET)
Expand Down Expand Up @@ -646,7 +648,7 @@ def run(self, product_config, msg):
keywords["resolution"] = int(group.resolution)

self.check_ready_to_read(req_channels)

self.global_data.load(req_channels, **keywords)
LOGGER.debug("loaded data: %s", str(self.global_data))
except (IndexError, IOError, DecodeError, StructError):
Expand Down Expand Up @@ -678,7 +680,7 @@ def run(self, product_config, msg):
try:
try:
actual_srch_radius = \
int(area_item.attrib["srch_radius"])
int(area_item.attrib["srch_radius"])
LOGGER.debug("Overriding search radius %s with %s",
str(srch_radius), str(actual_srch_radius))
except KeyError:
Expand Down Expand Up @@ -802,8 +804,8 @@ def check_ready_to_read(self, channels_to_load):
info_dict = self.get_parameters()
pattern = compose(wait_for_ch_cfg['pattern'], info_dict)
if self.wait_until_exists(pattern,
wait_for_ch_cfg['timeout'],
wait_for_ch_cfg['wait_after_found']):
wait_for_ch_cfg['timeout'],
wait_for_ch_cfg['wait_after_found']):
LOGGER.debug('found %s', pattern)
else:
LOGGER.error('timeout! did not found %s', pattern)
Expand Down Expand Up @@ -856,7 +858,7 @@ def check_satellite(self, config):

return True

def get_parameters(self, item = None):
def get_parameters(self, item=None):
"""Get the parameters for filename sifting.
"""

Expand Down Expand Up @@ -1219,8 +1221,7 @@ def run(self):
for key in ["output_dir",
"thumbnail_name",
"thumbnail_size"]:
if key in attrib:
del attrib[key]
attrib.pop(key, None)
if 'format' not in attrib:
attrib.setdefault('format',
os.path.splitext(item.text.strip())[1][1:])
Expand Down Expand Up @@ -1344,17 +1345,17 @@ def get_save_arguments(self, fileelem, params):
if 'blocksize' not in save_kwords:
blk_sz = fileelem.attrib.get("blocksize", None)
if blk_sz is not None:
save_kwords['blocksize'] = blk_sz
save_kwords['blocksize'] = blk_sz

if 'nbits' in save_kwords:
save_kwords['tags'] = {'NBITS': save_kwords['nbits']}
del save_kwords['nbits']
elif 'nbits' in params:
save_kwords['tags'] = {'NBITS':
params['nbits']}

return save_kwords

def write(self, obj, item, params):
"""Write to queue."""
self.prod_queue.put((obj, list(item), params.copy()))
Expand Down Expand Up @@ -1446,7 +1447,7 @@ def update_td_config(self):
# self.listener.restart_listener('file')
self.listener.restart_listener(self.td_config['topics'].split(','))
LOGGER.info("Listener restarted")

self.set_wait_for_channel_cfg()

try:
Expand Down Expand Up @@ -1491,7 +1492,7 @@ def set_wait_for_channel_cfg(self):
self.wait_for_channel_cfg = wait_for_channel_cfg
if self.data_processor is not None:
self.data_processor.set_wait_for_channel_cfg(wait_for_channel_cfg)

def cleanup(self):
'''Cleanup Trollduction before shutdown.
'''
Expand All @@ -1505,7 +1506,7 @@ def cleanup(self):
self.config_watcher.stop()
if self.listener is not None:
self.listener.stop()

if self.viewZenCacheManager is not None:
self.viewZenCacheManager.shutdown()
self.viewZenCacheManager = None
Expand Down

0 comments on commit db42355

Please sign in to comment.