Skip to content

Commit

Permalink
Merge pull request #27 from adybbroe/correct-message-level-duplicate-…
Browse files Browse the repository at this point in the history
…scene

Correct message level duplicate scene
  • Loading branch information
adybbroe committed Jan 11, 2022
2 parents 01277a0 + 1715766 commit 7628c15
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 49 deletions.
63 changes: 60 additions & 3 deletions aapp_runner/helper_functions.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 - 2021 Pytroll Community
# Copyright (c) 2014 - 2022 Pytroll Community
#
# Author(s):
#
Expand Down Expand Up @@ -30,6 +30,63 @@
LOGGER = logging.getLogger(__name__)


def check_if_scene_is_unique(config):
"""Check if the Scene is unique.
The scene is checked against the register (holding already processed
scenes). If it overlaps in time with a previously processed scene and the
area-id's (used to collect the data (granules) are the same, then return
False - the scene is then not unique and should not be processed further.
"""
LOGGER.debug("config.job_register: %s", str(config.job_register))
LOGGER.debug("config platform_name: %s", str(config['platform_name']))
LOGGER.debug("config - collection_area_id: %s", str(config['collection_area_id']))

# Use sat id, start and end time and area_id as the unique identifier of the scene!
if (config['platform_name'] in config.job_register and
len(config.job_register[config['platform_name']]) > 0):

# Go through list of start,end time tuples and see if the current
# scene overlaps with any - only if the area ids are the same

# Get registered start and end times with area id equal to current area_id
registered_times = []
for start_t, end_t, area_id in config.job_register[config['platform_name']]:
if area_id == config['collection_area_id']:
registered_times.append((start_t, end_t))

# Get overlap status
status = overlapping_timeinterval(
(config['starttime'], config['endtime']), registered_times)

if status:
info_msg = ("Processing of scene " + config['platform_name'] +
" " + str(status[0]) + " " + str(status[1]) +
" with overlapping time has been"
" launched previously. Skip it!")
LOGGER.info(info_msg)
return False

LOGGER.debug("No overlap with any recently processed scenes...")

return True


def create_scene_id(config):
"""Create a unique scene specific ID to identify the scene process for later.
The id is created from the platform name and start and end times of the
scene available in the process config dictionary.
"""
scene_id = (str(config['platform_name']) + '_' +
config['starttime'].strftime('%Y%m%d%H%M%S') +
'_' + config['endtime'].strftime('%Y%m%d%H%M%S'))
LOGGER.debug("scene_id = " + str(scene_id))
return scene_id


def overlapping_timeinterval(start_end_times, timelist):
"""From a list of start and end times check if the current time interval
overlaps with one or more"""
Expand All @@ -39,7 +96,7 @@ def overlapping_timeinterval(start_end_times, timelist):
if ((tstart <= starttime and tend > starttime) or
(tstart < endtime and tend >= endtime)):
return tstart, tend
elif (tstart >= starttime and tend <= endtime):
if (tstart >= starttime and tend <= endtime):
return tstart, tend

return False
Expand All @@ -50,7 +107,7 @@ def run_shell_command(command, use_shell=False, use_shlex=True, my_cwd=None,
"""Run the given command as a shell and get the return code, stdout and stderr
Returns True/False and return code.
"""
from subprocess import Popen, PIPE
from subprocess import PIPE, Popen

if stdin is not None:
stdin = stdin.encode('utf-8')
Expand Down
116 changes: 116 additions & 0 deletions aapp_runner/tests/test_helper_functions.py
@@ -0,0 +1,116 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2022 Pytroll developers

# Author(s):

# Adam Dybbroe <Firstname.Lastname @ smhi.se>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unittesting the helper functions for the AAPP-runner.
"""

import logging
import unittest
from datetime import datetime
from unittest.mock import patch

from aapp_runner.helper_functions import check_if_scene_is_unique
from aapp_runner.read_aapp_config import AappL1Config, AappRunnerConfig
from aapp_runner.tests.test_config import (TEST_YAML_CONTENT_OK,
create_config_from_yaml)


class TestProcessConfigChecking(unittest.TestCase):
"""Test various functions checking on the (non-static) config during processing."""

def setUp(self):
self.config_complete = create_config_from_yaml(TEST_YAML_CONTENT_OK)

@patch('aapp_runner.read_aapp_config.load_config_from_file')
def test_check_if_scene_is_unique_return_value(self, config):
"""Test checking if the current scene is unique or if it has been processed earlier."""
config.return_value = self.config_complete
myfilename = "/tmp/mytestfile"
aapp_run_config = AappRunnerConfig(myfilename, 'norrkoping', 'xl-band')
aapp_config = AappL1Config(aapp_run_config.config, 'xl-band')

aapp_config['platform_name'] = 'metop03'
aapp_config['collection_area_id'] = 'euron1'
aapp_config['starttime'] = datetime(2022, 1, 8, 12, 49, 50)
aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0, 26)

aapp_config.job_register = {}

result = check_if_scene_is_unique(aapp_config)
assert result

aapp_config.job_register = {'metop03': [(datetime(2022, 1, 8, 12, 49, 50),
datetime(2022, 1, 8, 13, 0, 26), 'euron1')]}
# An EARS scene (same platform and overlapping time interval and over
# the same area of interest) arrives shortly after:
aapp_config['platform_name'] = 'metop03'
aapp_config['collection_area_id'] = 'euron1'
aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50)
aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0)

result = check_if_scene_is_unique(aapp_config)
assert not result

@patch('aapp_runner.read_aapp_config.load_config_from_file')
def test_check_if_scene_is_unique_logging(self, config):
"""Test the logging when checking if the current scene is unique."""
config.return_value = self.config_complete
myfilename = "/tmp/mytestfile"
aapp_run_config = AappRunnerConfig(myfilename, 'norrkoping', 'xl-band')
aapp_config = AappL1Config(aapp_run_config.config, 'xl-band')

aapp_config.job_register = {'metop03': [(datetime(2022, 1, 8, 12, 49, 50),
datetime(2022, 1, 8, 13, 0, 26), 'euron1')]}
# An EARS scene (same platform and overlapping time interval and over
# the same area of interest) arrives shortly after:
aapp_config['platform_name'] = 'metop03'
aapp_config['collection_area_id'] = 'euron1'
aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50)
aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0)

expected_logging = ['INFO:aapp_runner.helper_functions:first message',
'INFO:aapp_runner.helper_functions:Processing of scene metop03 2022-01-08 12:49:50 2022-01-08 13:00:26 with overlapping time has been launched previously. Skip it!']

with self.assertLogs('aapp_runner.helper_functions', level='INFO') as cm:
logging.getLogger('aapp_runner.helper_functions').info('first message')
_ = check_if_scene_is_unique(aapp_config)

self.assertEqual(cm.output, expected_logging)

with self.assertLogs('aapp_runner.helper_functions', level='WARNING') as cm:
logging.getLogger('aapp_runner.helper_functions').warning('first message')
_ = check_if_scene_is_unique(aapp_config)

self.assertEqual(len(cm.output), 1)

# Scene is different (different satellite) from previous:
aapp_config['platform_name'] = 'metop01'
aapp_config['collection_area_id'] = 'euron1'
aapp_config['starttime'] = datetime(2022, 1, 8, 12, 50)
aapp_config['endtime'] = datetime(2022, 1, 8, 13, 0)

with self.assertLogs('aapp_runner.helper_functions', level='INFO') as cm:
logging.getLogger('aapp_runner.helper_functions').info('first message')
result = check_if_scene_is_unique(aapp_config)

assert result
self.assertEqual(len(cm.output), 1)
53 changes: 7 additions & 46 deletions bin/aapp_dr_runner.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2014 - 2021 Pytroll Community
# Copyright (c) 2014 - 2022 Pytroll Community

# Author(s):

Expand Down Expand Up @@ -51,11 +51,12 @@
from posttroll.publisher import Publish
from trollsift.parser import compose

from aapp_runner.aapp_runner_tools import set_collection_area_id
from aapp_runner.config_helpers import generate_process_config
from aapp_runner.do_commutation import do_decommutation
from aapp_runner.exceptions import DecommutationError, SatposError, TleError
from aapp_runner.helper_functions import (overlapping_timeinterval,
run_shell_command)
from aapp_runner.helper_functions import (check_if_scene_is_unique,
create_scene_id, run_shell_command)
from aapp_runner.read_aapp_config import AappL1Config, AappRunnerConfig
from aapp_runner.tle_satpos_prepare import do_tle_satpos, do_tleing

Expand Down Expand Up @@ -92,7 +93,6 @@ def reset_job_registry(objdict, key, start_end_times_area):
return



def cleanup_aapp_logfiles_archive(config):
"""Loop over the aapp log files directories and remove expired directories accordingly."""
try:
Expand Down Expand Up @@ -487,46 +487,6 @@ def check_pass_length(msg, config):
return True


def create_and_check_scene_id(msg, config):
"""Create a scene specific ID to identify the scene process for later."""
LOG.debug("config.job_register: %s", str(config.job_register))
LOG.debug("config platform_name: %s", str(config['platform_name']))
LOG.debug("config - collection_area_id: %s", str(config['collection_area_id']))

# Use sat id, start and end time and area_id as the unique identifier of the scene!
if (config['platform_name'] in config.job_register and
len(config.job_register[config['platform_name']]) > 0):

# Go through list of start,end time tuples and see if the current
# scene overlaps with any - only if the area ids are the same

# Get registed start and end times with area id equal to current area_id
registed_times = []
for start_t, end_t, area_id in config.job_register[config['platform_name']]:
if area_id == config['collection_area_id']:
registed_times.append((start_t, end_t))

# Get overlap status
status = overlapping_timeinterval(
(config['starttime'], config['endtime']), registed_times)

if status:
LOG.warning("Processing of scene " + config['platform_name'] +
" " + str(status[0]) + " " + str(status[1]) +
" with overlapping time has been"
" launched previously")
LOG.info("Skip it...")
return False
else:
LOG.debug("No overlap with any recently processed scenes...")

scene_id = (str(config['platform_name']) + '_' +
config['starttime'].strftime('%Y%m%d%H%M%S') +
'_' + config['endtime'].strftime('%Y%m%d%H%M%S'))
LOG.debug("scene_id = " + str(scene_id))
return scene_id


def which(program):
"""Check if executable is available in the system environment path."""
# Check if needed executable are available in the
Expand Down Expand Up @@ -858,10 +818,11 @@ def publish_level1(publisher, config, msg, filelist, station_name, environment):
if not generate_process_config(msg, aapp_config):
continue

scene_id = create_and_check_scene_id(msg, aapp_config)
if not scene_id:
scene_is_unique = check_if_scene_is_unique(aapp_config)
if not scene_is_unique:
continue

scene_id = create_scene_id(aapp_config)
try:
if not setup_aapp_processing(aapp_config):
raise Exception("setup_aapp_processing returned False. See above lines for details.")
Expand Down

0 comments on commit 7628c15

Please sign in to comment.