Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Clean Trollduction.run_single() and DataProcessor.save_images()
Browse files Browse the repository at this point in the history
  • Loading branch information
pnuu committed Nov 17, 2016
1 parent a71df89 commit e5e7b4d
Showing 1 changed file with 48 additions and 27 deletions.
75 changes: 48 additions & 27 deletions trollduction/producer.py
Expand Up @@ -815,6 +815,26 @@ def get_parameters(self, item=None):

return params

def _draw_image(self, product):
"""Inner worker for draw_images()"""
# Collect optional composite parameters from config
composite_params = {}
cp = product.find('composite_params')
if cp is not None:
composite_params = dict(
(item.tag, helper_functions.eval_default(item.text))
for item in cp.getchildren())

# Check if this combination is defined
func = getattr(self.local_data.image, product.attrib['id'])
LOGGER.debug("Generating composite \"%s\"", product.attrib['id'])
img = func(**composite_params)
img.info.update(self.global_data.info)
img.info["product_name"] = \
product.attrib.get("name", product.attrib["id"])

return img

def draw_images(self, area):
'''Generate images from local data using given area name and
product definitions.
Expand Down Expand Up @@ -862,22 +882,7 @@ def draw_images(self, area):
continue

try:
# Collect optional composite parameters from config
composite_params = {}
cp = product.find('composite_params')
if cp is not None:
composite_params = dict(
(item.tag, helper_functions.eval_default(item.text))
for item in cp.getchildren())

# Check if this combination is defined
func = getattr(self.local_data.image, product.attrib['id'])
LOGGER.debug("Generating composite \"%s\"",
product.attrib['id'])
img = func(**composite_params)
img.info.update(self.global_data.info)
img.info["product_name"] = \
product.attrib.get("name", product.attrib["id"])
img = self._draw_image(product)
except AttributeError as err:
# Log incorrect product funcion name
LOGGER.error('Incorrect product id: %s for area %s (%s)',
Expand Down Expand Up @@ -1416,6 +1421,29 @@ def shutdown(self):
'''
self.stop()

def _get_sensors(self, msg_data):
"""Get sensors from the message data"""
if isinstance(msg_data['sensor'], (list, tuple, set)):
sensors = set(msg_data['sensor'])
else:
sensors = set((msg_data['sensor'], ))

return sensors

def _is_overpass_processed(self, msg_data):
"""Check if the data has """
process_only_once = self.td_config.get('process_only_once',
"false").lower() in \
["true", "yes", "1"]
platforms_match = self._previous_pass["platform_name"] == \
msg_data["platform_name"]
start_times_match = self._previous_pass["start_time"] == \
msg_data["start_time"]

if process_only_once and platforms_match and start_times_match:
return True
return False

def run_single(self):
"""Run trollduction.
"""
Expand All @@ -1425,30 +1453,23 @@ def run_single(self):
try:
msg = self.listener.output_queue.get(True, 5)
except AttributeError:
# Maybe the queue has a different name
msg = self.listener.queue.get(True, 5)
except KeyboardInterrupt:
self.stop()
raise
except Queue.Empty:
continue
LOGGER.debug(str(msg))
if isinstance(msg.data['sensor'], (list, tuple, set)):
sensors = set(msg.data['sensor'])
else:
sensors = set((msg.data['sensor'], ))

sensors = self._get_sensors(msg.data)

prev_pass = self._previous_pass
if (msg.type in ["file", 'collection', 'dataset'] and
sensors.intersection(
self.td_config['instruments'].split(','))):
try:
if self.td_config.get('process_only_once',
"false").lower() in \
["true", "yes", "1"] and \
self._previous_pass["platform_name"] == \
msg.data["platform_name"] and \
self._previous_pass["start_time"] == \
msg.data["start_time"]:
if self._is_overpass_processed(msg.data):
LOGGER.info(
"File was already processed. Skipping.")
continue
Expand Down

0 comments on commit e5e7b4d

Please sign in to comment.