In [1]:
#default_exp pleiades_prep_worker

# Pleiades Job

> Data prep workflow via redis: testing and deployment.

This notebook is used to troubleshoot the the data prep workflow when deployed as job to worker via redis queue. Thanks to using nbdev it is also used as the actual deployment script exported from this notebook: `pleiades_prep_worker`.

In [2]:
#hide
%load_ext autoreload
%autoreload 2

In [3]:
#hide
from nbdev.showdoc import *

## Prep Workflow

In [4]:
#export
import json
from sac_stac.pleiades import prep_pleiades

In [5]:
#export
def process_scene(json_data):
    loaded_json = json.loads(json_data)
    prep_pleiades(**loaded_json)

## Job Processing

In [6]:
#export
import os
import logging
import datetime

from sac_stac.rediswq import RedisWQ

In [7]:
#export
level = os.getenv("LOGLEVEL", "ERROR").upper()
logging.basicConfig(format="%(asctime)s %(levelname)-8s %(name)s %(message)s", 
                    datefmt="%Y-%m-%d %H:%M:%S", level=level)

In [8]:
#export
host = os.getenv("REDIS_SERVICE_HOST", "redis-master")
q = RedisWQ(name="jobPL", host=host)

In [10]:
#export
logger = logging.getLogger("worker")
logger.info(f"Worker with sessionID: {q.sessionID()}")
logger.info(f"Initial queue state: empty={q.empty()}")

2020-11-11 14:12:27 INFO     worker Worker with sessionID: 8b8d4e5b-d69f-4333-a58d-c0a13a8597e8
2020-11-11 14:12:27 INFO     worker Initial queue state: empty=True


In [11]:
#export
while not q.empty():
    item = q.lease(lease_secs=1800, block=True, timeout=600)
    if item is not None:
        itemstr = item.decode("utf=8")
        logger.info(f"Working on {itemstr}")
        start = datetime.datetime.now().replace(microsecond=0)

        process_scene(itemstr)
        q.complete(item)

        end = datetime.datetime.now().replace(microsecond=0)
        logger.info(f"Total processing time {end - start}")
    else:
        logger.info("Waiting for work")

In [12]:
#export
logger.info("Queue empty, exiting")

2020-11-11 14:12:31 INFO     worker Queue empty, exiting


## Export

In [13]:
#hide
from nbdev.export import notebook2script; notebook2script()

Converted 00_rediswq.ipynb.
Converted 00_utils.ipynb.
Converted 01A_pleiades.ipynb.
Converted 01B_pleiades_prep_worker.ipynb.
Converted 02A_spot.ipynb.
Converted index.ipynb.
