# Pipeline Stage 3
The messages in what I called the "mq_sgcn_items" message queue contain records from ScienceBase, each of which should have a new file to process. Those files contain names of species that the states and territories consider to be of greatest conservation need. The data are messy: some records have no known scientific name, and some spreadsheets carry extra information that gets in the way of cleanly aligning the names with taxonomic authorities and putting together other details.

The initial processing step that slurps up files is encapsulated in the process_sgcn_source_item function from the pysgcn package. It picks up a message, reads the specified file via URL into memory, does some basic processing to harmonize the slightly variable files as much as possible, and infuses a little bit of additional metadata from the root collection item.

The two pieces of additional information the function infuses come from files attached to the root collection:

* An indication of whether the name was included in the 2005 SWAP-based list, taken from a master list stored from the original processing. This helps us maintain consistency in total counts of species.
* ITIS TSN identifiers for a number of names that had to be tracked down with a bit of research. These serve as overrides to the name matching process.
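The two lookups can be pictured with a minimal sketch like the one below. It is not the pysgcn implementation: the function name, the record fields (`scientific_name`, `in_2005_swap_list`, `itis_override_tsn`), and the sample values are all hypothetical placeholders, and it uses plain dictionaries rather than a dataframe so that it runs with only the standard library.

```python
def infuse_reference_data(records, swap_2005_names, tsn_overrides):
    """Return copies of the records annotated with the two reference lookups.

    Hypothetical shapes: `records` is a list of dicts, `swap_2005_names` is a
    set of names from the 2005 master list, and `tsn_overrides` maps a name
    to a researched ITIS TSN.
    """
    infused = []
    for record in records:
        # Strip stray whitespace so lookups are not defeated by messy cells.
        name = record["scientific_name"].strip()
        annotated = dict(record)  # copy, so the input is left untouched
        annotated["in_2005_swap_list"] = name in swap_2005_names
        # Only attach an override when one was researched for this name.
        if name in tsn_overrides:
            annotated["itis_override_tsn"] = tsn_overrides[name]
        infused.append(annotated)
    return infused


# Placeholder data only; these are not real species names or TSNs.
records = [
    {"scientific_name": "Name alpha", "state": "Louisiana"},
    {"scientific_name": "Name beta ", "state": "Louisiana"},
]
swap_2005_names = {"Name alpha"}
tsn_overrides = {"Name beta": "000000"}

infused = infuse_reference_data(records, swap_2005_names, tsn_overrides)
```

Keeping the override as a separate field, rather than overwriting anything, would let a later name-matching step decide how to apply it.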

Each dataset can be processed at this stage and stand on its own somewhere. The point here is to get the original data file from ScienceBase, make sure we can read it into memory as a dataframe, infuse the extra information from the reference files stored at the collection, and then stash the data somewhere online for further processing in later steps. I put all of that logic together into the cache_item_data function, which calls the more fundamental processing function, process_sgcn_source_item. When called, the caching function first checks whether the cache is intact and, if so, deletes the message. Otherwise, it fires the processing function to retrieve and process the file and then writes the result to cache.

For this exercise, I cache the data as a feather binary file in a folder within the space designated by the DATA_CACHE environment variable. Feather files are lightweight, fast, and easy to work with across programs. For the online instantiation of this workflow, we will probably want to write the data to a relational database instead, as we can later assemble them from there into final usable data.
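The check-the-cache-or-process pattern described above might look roughly like this sketch. It is an illustration under stated assumptions, not the cache_item_data code: `cache_item`, the `state_year` file naming, and `fake_process` are hypothetical, and JSON is swapped in for feather so the example runs without pyarrow.

```python
import json
import os
import tempfile


def cache_item(item, process_fn, cache_dir):
    """Return (path, was_processed) for one queue item.

    Hypothetical sketch: reuses an intact cache file when one exists,
    otherwise calls process_fn(item) and writes the result.
    """
    os.makedirs(cache_dir, exist_ok=True)
    # Assumed naming scheme: one cache file per state and year.
    path = os.path.join(cache_dir, f"{item['state']}_{item['year']}.json")

    if os.path.exists(path):
        try:
            with open(path) as f:
                json.load(f)  # "intact" here only means the file parses
            return path, False
        except (OSError, ValueError):
            pass  # unreadable or corrupt entry: fall through and rebuild

    rows = process_fn(item)
    # Write to a temporary name, then rename, so an interrupted run
    # cannot leave a half-written file that looks like a cache hit.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(rows, f)
    os.replace(tmp_path, path)
    return path, True


calls = []


def fake_process(item):
    # Stand-in for the real retrieval and processing step.
    calls.append(item["state"])
    return [{"scientific_name": "Name alpha"}]


demo_dir = tempfile.mkdtemp()  # stands in for the DATA_CACHE location
item = {"state": "Louisiana", "year": "2005"}
first = cache_item(item, fake_process, demo_dir)
second = cache_item(item, fake_process, demo_dir)  # served from cache
```

The second call returns without invoking the processing function, which is the behavior that makes it safe to re-run the loop after a partial failure.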

Here I run the process as a while loop over the messages in temporary storage, but in the online workflow it will run as lambdas operate on the messages and flush them from the queue.
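For comparison with the while loop, here is a rough sketch of what a lambda-style entry point could look like. It assumes the queue ends up being something like Amazon SQS triggering AWS Lambda, which this notebook does not specify; in that arrangement each record's `body` arrives as a JSON string. `make_handler` and the injected `cache_fn` are hypothetical names for illustration.

```python
import json


def make_handler(cache_fn):
    """Build a Lambda-style handler around a caching function.

    Hypothetical sketch: cache_fn stands in for something like
    sgcn.cache_item_data and receives the decoded message body.
    """
    def handler(event, context):
        processed = 0
        # Assumed SQS-style event: a "Records" list with JSON string bodies.
        for record in event.get("Records", []):
            body = json.loads(record["body"])
            cache_fn(body)
            processed += 1
        return {"processed": processed}

    return handler


# Local dry run with a fake event; no cloud service is involved.
seen = []
handler = make_handler(seen.append)
event = {
    "Records": [
        {"messageId": "example-1",
         "body": json.dumps({"state": "Louisiana", "year": "2005"})}
    ]
}
result = handler(event, None)
```

Injecting the caching function keeps the handler testable locally, and the same processing code can then serve both the notebook loop and the deployed function.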

Note: I have experienced some variation in the speed of this step, which seems to track back to ScienceBase variability.

In [1]:
import pysgcn
sgcn = pysgcn.sgcn.Sgcn()

After running into a problem that seemed to crop up randomly with urlopen errors when trying to read files directly from ScienceBase, I wrote a function to cache the raw data locally for processing. I use the ScienceBase-generated path on disk as the file name and stash the file in a configured path. This could be changed to send the files to S3 or some other storage adjacent to the processing engine.
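As a rough illustration of the naming idea, the sketch below derives a cache file name from the `f` parameter of a ScienceBase file URL and only downloads when the file is not already cached. It is not the cache_raw_data implementation: `raw_cache_path`, `fetch_if_missing`, the retry count, and the wait time are assumptions made for the example.

```python
import os
import tempfile
import time
from urllib.error import URLError
from urllib.parse import parse_qs, urlparse
from urllib.request import urlopen


def raw_cache_path(source_file_url, cache_dir):
    """Map a ScienceBase file URL to a local cache path (hypothetical helper)."""
    query = parse_qs(urlparse(source_file_url).query)
    disk_path = query["f"][0]  # parse_qs already decodes %2F into "/"
    file_name = disk_path.rsplit("/", 1)[-1]  # last segment of the disk path
    return os.path.join(cache_dir, file_name)


def fetch_if_missing(source_file_url, cache_dir, retries=3, wait_seconds=2):
    """Return (path, was_downloaded); retries must be at least 1."""
    path = raw_cache_path(source_file_url, cache_dir)
    if os.path.exists(path):
        return path, False
    os.makedirs(cache_dir, exist_ok=True)
    for attempt in range(retries):
        try:
            with urlopen(source_file_url) as response:
                data = response.read()
            break
        except URLError:
            # Re-raise on the last attempt, otherwise pause and try again.
            if attempt == retries - 1:
                raise
            time.sleep(wait_seconds)
    with open(path, "wb") as f:
        f.write(data)
    return path, True


url = (
    "https://www.sciencebase.gov/catalog/file/get/5787cd0ae4b0d27deb3754f2"
    "?f=__disk__b4%2Fae%2Fbf%2Fb4aebf82009a2aaadaa4d7b84fdcade7589c722b"
)
demo_dir = tempfile.mkdtemp()  # stands in for the configured raw data path
path = raw_cache_path(url, demo_dir)
```

Because the file name depends only on the URL, a repeated run can tell what is already cached without contacting ScienceBase at all.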

In [2]:
sgcn.cache_raw_data()

{'files_written': [],
 'files_in_cache': ['/Users/sbristol/data/raw/b4aebf82009a2aaadaa4d7b84fdcade7589c722b',
  '/Users/sbristol/data/raw/2673b2cbcd76e24a33be947897708d1b5a74f49b',
  '/Users/sbristol/data/raw/e9690e1a26d17f001245f4199396cccaea7fb7fe',
  '/Users/sbristol/data/raw/e797a83ec94951289141e98776670f989628f250',
  '/Users/sbristol/data/raw/9d676d655fb463f971cfe28ffa322a1d493858d4',
  '/Users/sbristol/data/raw/6eec7ce1a534122f801f1ca671ce629543bfde66',
  '/Users/sbristol/data/raw/e58b61cae166b1965b572e8748189199ad848491',
  '/Users/sbristol/data/raw/f3915e1455b3499a00945519fd8867abd7c034e8',
  '/Users/sbristol/data/raw/0843ec239115e47e91a2946837546a3c2a61fafc',
  '/Users/sbristol/data/raw/6550d55f0af2d0e96e47be64d1c39f508cd33918',
  '/Users/sbristol/data/raw/6103e0e72d3f616072e0c5b46d41d0753a51e0de',
  '/Users/sbristol/data/raw/f1a6b317c5c33d6077e2db27735a1351a5b87a27',
  '/Users/sbristol/data/raw/4bcbd5fd378583d8ab9d2eaa7ca802e0f83441a3',
  '/Users/sbristol/data/raw/1b946780c

In [3]:
processable_item = sgcn.get_message("mq_sgcn_items")
processable_item

{'id': '50394f41e31de134720d2388035db965f4f6f8cf',
 'date_inserted': '2019-12-20T20:26:44.085406',
 'body': {'sciencebase_item_id': 'https://www.sciencebase.gov/catalog/item/5787cd0ae4b0d27deb3754f2',
  'state': 'Louisiana',
  'year': '2005',
  'source_file_url': 'https://www.sciencebase.gov/catalog/file/get/5787cd0ae4b0d27deb3754f2?f=__disk__b4%2Fae%2Fbf%2Fb4aebf82009a2aaadaa4d7b84fdcade7589c722b',
  'source_file_date': '2016-07-14T17:33:42.000Z'}}

In [4]:
%%time
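# Work through the queue; the loop ends once get_message returns None.
while processable_item is not None:
    # Process the item's source file and write the result to the cache.
    sgcn.cache_item_data(processable_item["body"])
    # Remove the handled message, then fetch the next one.
    sgcn.delete_message("mq_sgcn_items", processable_item["id"])
    processable_item = sgcn.get_message("mq_sgcn_items")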

CPU times: user 2min 14s, sys: 1min 59s, total: 4min 14s
Wall time: 6min 35s
