For many users, it may be valuable to pull together particular properties that can each be summarized as a single value per species record, and to view them in a spreadsheet program of one kind or another in order to slice the data in various ways or run reports. This notebook runs through all of the data generated, making a number of key decisions about how to summarize it, and generates a flat CSV output in the cache folder for this type of use.

In [1]:
import pandas as pd
import json
from IPython.display import display

In [2]:
# Build one lookup of taxonomic summaries from the cached ITIS and WoRMS
# results. Each entry holds the source name, the rank of the valid/accepted
# record, and one key per rank listed in that record's "biological_taxonomy".
taxonomy_lookup = dict()

with open('../cache/itis.json', 'r') as f:
    itis_species = json.load(f)

for record in itis_species:
    if "data" not in record:
        continue
    # Only the first document flagged as valid/accepted is summarized.
    valid_itis_doc = next(
        (doc for doc in record["data"] if doc["usage"] in ("valid", "accepted")),
        None,
    )
    if valid_itis_doc is None:
        continue
    # The lookup key is the last value in the record's query parameters.
    parameter_values = list(record["parameters"].values())
    if not parameter_values:
        # No query parameter means there is no key to file this record under;
        # skip it rather than reuse an identifier left over from an earlier
        # record.
        continue
    itis_lookup_identifier = parameter_values[-1]
    taxonomy_lookup[itis_lookup_identifier] = {
        "taxonomic_reference": "ITIS",
        "taxonomic_rank": valid_itis_doc["rank"]
    }
    for taxon in valid_itis_doc["biological_taxonomy"]:
        taxonomy_lookup[itis_lookup_identifier][taxon["rank"]] = taxon["name"]

with open('../cache/worms.json', 'r') as f:
    worms_species = json.load(f)

for w in worms_species:
    if w["processing_metadata"]["status"] == "error":
        continue
    worms_accepted = next(
        (doc for doc in w["data"] if doc["status"] == "accepted"),
        None,
    )
    if worms_accepted is None:
        continue
    # The lookup key is the last path segment of the API URL, minus any
    # query string.
    lookup_name = w["processing_metadata"]["api"].split("/")[-1].split("?")[0]
    taxonomy_lookup[lookup_name] = {
        "taxonomic_reference": "WoRMS",
        "taxonomic_rank": worms_accepted["rank"]
    }
    for taxon in worms_accepted["biological_taxonomy"]:
        taxonomy_lookup[lookup_name][taxon["rank"]] = taxon["name"]

In [3]:
# Load the workplan species list and copy the taxonomy summary onto each
# record. A match on "ITIS TSN" is preferred; "Lookup Name" is the fallback.
with open('../cache/workplan_species.json', 'r') as f:
    workplan_species = json.load(f)

for spp in workplan_species:
    taxonomy_record = taxonomy_lookup.get(spp["ITIS TSN"])
    if taxonomy_record is None:
        taxonomy_record = taxonomy_lookup.get(spp["Lookup Name"])
    if taxonomy_record is not None:
        spp.update(taxonomy_record)

In [4]:
# Summarize the cached TESS results per lookup identifier and copy the
# summary onto each workplan species.
with open('../cache/tess.json', 'r') as f:
    tess_data = json.load(f)


def _joined_unique(records, field):
    """Return the distinct values of *field* across *records*, comma-joined.

    Values are sorted so the resulting string is the same on every run; plain
    set iteration order for strings changes between interpreter sessions.
    """
    return ",".join(sorted({rec[field] for rec in records}))


tess_lookup = dict()
for r in tess_data:
    if "data" not in r:
        continue
    # The identifier is whatever follows the last "=" in the API URL, with
    # any closing bracket and double quotes stripped.
    lookup_id = r["processing_metadata"]["api"].split("=")[-1].replace("]","").replace('"','')
    sp_records = r["data"]["SPECIES_DETAIL"]
    # SPECIES_DETAIL may be a single record rather than a list; normalize.
    if not isinstance(sp_records, list):
        sp_records = [sp_records]
    tess_lookup[lookup_id] = {
        "STATUS": _joined_unique(sp_records, "STATUS"),
        "STATUS_TEXT": _joined_unique(sp_records, "STATUS_TEXT"),
        "ECOS_SPCODE": _joined_unique(sp_records, "SPCODE")
    }

# A match on "ITIS TSN" is preferred; "Lookup Name" is the fallback.
for spp in workplan_species:
    tess_record = tess_lookup.get(spp["ITIS TSN"])
    if tess_record is None:
        tess_record = tess_lookup.get(spp["Lookup Name"])
    if tess_record is not None:
        spp.update(tess_record)

In [5]:
# Summarize the successful IUCN results by scientific name and copy the
# summary onto each workplan species whose "Lookup Name" matches.
with open('../cache/iucn.json', 'r') as f:
    iucn_data = json.load(f)

iucn_lookup = dict()
for r in iucn_data:
    if r["processing_metadata"]["status"] != "success":
        continue
    iucn_doc = r["data"]
    iucn_lookup[r["parameters"]["Scientific Name"]] = {
        "iucn_reference": iucn_doc["doi"],
        "iucn_status_code": iucn_doc["iucn_status_code"],
        "iucn_status_name": iucn_doc["iucn_status_name"],
        "iucn_population_trend": iucn_doc["iucn_population_trend"],
        "iucn_record_date": iucn_doc["record_date"]
    }

for spp in workplan_species:
    if spp["Lookup Name"] in iucn_lookup:
        iucn_record = iucn_lookup[spp["Lookup Name"]]
        spp.update(iucn_record)

In [6]:
# Summarize the successful NatureServe results by scientific name and copy
# the summary onto each workplan species whose "Lookup Name" matches.
with open('../cache/natureserve.json', 'r') as f:
    natureserve_data = json.load(f)

natureserve_lookup = dict()
for r in natureserve_data:
    if r["processing_metadata"]["status"] != "success":
        continue
    natureserve_summary = {
        "natureserve_reference": r["data"]["natureServeGlobalConcept"]["natureServeExplorerURI"],
        "natureserve_status": r["data"]["roundedNationalConservationStatus"]
    }
    # NOTE(review): the review date is read from r["NatureServe Species"],
    # while every other field comes from r["data"]. If that top-level key is
    # never present this always falls back to "unknown" -- confirm the
    # intended path against the cached records.
    try:
        natureserve_summary["natureserve_last_review_date"] = r["NatureServe Species"]["nationalConservationStatus"]["@lastReviewedDate"]
    except (KeyError, TypeError):
        # Missing key, or an intermediate value that is not a mapping.
        natureserve_summary["natureserve_last_review_date"] = "unknown"
    natureserve_lookup[r["parameters"]["Scientific Name"]] = natureserve_summary

for spp in workplan_species:
    if spp["Lookup Name"] in natureserve_lookup:
        natureserve_record = natureserve_lookup[spp["Lookup Name"]]
        spp.update(natureserve_record)

# Drop the references to the loaded data now that it has been merged.
natureserve_data = None
natureserve_lookup = None

In [7]:
# Attach the 2005 and 2015 SGCN state lists to each workplan species whose
# "Lookup Name" matches the scientific name of a successful SGCN result.
with open('../cache/sgcn.json', 'r') as f:
    sgcn_data = json.load(f)

sgcn_lookup = dict()
for r in sgcn_data:
    if r["processing_metadata"]["status"] != "success":
        continue
    sgcn_doc = r["data"]
    sgcn_lookup[r["parameters"]["Scientific Name"]] = {
        "sgcn_statelist_2005": sgcn_doc["statelist_2005"],
        "sgcn_statelist_2015": sgcn_doc["statelist_2015"]
    }

for spp in workplan_species:
    if spp["Lookup Name"] in sgcn_lookup:
        spp.update(sgcn_lookup[spp["Lookup Name"]])

# Drop the references to the loaded data now that it has been merged.
sgcn_data = None
sgcn_lookup = None

In [8]:
# Summarize the successful GBIF results by scientific name (reference,
# taxonomic status, occurrence count) and copy the summary onto each
# workplan species whose "Lookup Name" matches.
with open('../cache/gbif.json', 'r') as f:
    gbif_data = json.load(f)

gbif_lookup = dict()
for r in gbif_data:
    if r["processing_metadata"]["status"] != "success":
        continue
    gbif_doc = r["data"]
    gbif_lookup[r["parameters"]["Scientific Name"]] = {
        "gbif_reference": gbif_doc["resolvable_identifier"],
        "gbif_taxonomic_status": gbif_doc["TaxonomicStatus"],
        # An occurrence summary without a "count" is reported as zero records.
        "gbif_occurrence_records": gbif_doc["Occurrence Summary"].get("count", 0)
    }

for spp in workplan_species:
    if spp["Lookup Name"] in gbif_lookup:
        spp.update(gbif_lookup[spp["Lookup Name"]])

# Drop the references to the loaded data now that it has been merged.
gbif_data = None
gbif_lookup = None

In [9]:
# For each successful xDD result, record the API URL that was queried and
# the number of items returned, keyed by search term; then copy that summary
# onto each workplan species whose "Lookup Name" equals the search term.
with open('../cache/xdd.json', 'r') as f:
    xdd_data = json.load(f)

xdd_lookup = dict()
for r in xdd_data:
    if r["processing_metadata"]["status"] != "success":
        continue
    xdd_lookup[r["parameters"]["Search Term"]] = {
        "xdd_reference": r["processing_metadata"]["api"],
        "xdd_number_docs": len(r["data"])
    }

for spp in workplan_species:
    if spp["Lookup Name"] in xdd_lookup:
        spp.update(xdd_lookup[spp["Lookup Name"]])

# Drop the references to the loaded data now that it has been merged.
xdd_data = None
xdd_lookup = None

In [10]:
# For each cached data-release result that has a "data" section, record the
# API URL that was queried and the number of items returned, keyed by the
# "q" query parameter; then copy that summary onto each workplan species
# whose "Lookup Name" equals the query.
with open('../cache/sb_datarelease.json', 'r') as f:
    sb_data = json.load(f)

sb_lookup = dict()
for r in sb_data:
    if "data" not in r:
        continue
    sb_lookup[r["parameters"]["q"]] = {
        "sb_reference": r["processing_metadata"]["api"],
        "sb_number_items": len(r["data"])
    }

for spp in workplan_species:
    if spp["Lookup Name"] in sb_lookup:
        spp.update(sb_lookup[spp["Lookup Name"]])

# Drop the references to the loaded data now that it has been merged.
sb_data = None
sb_lookup = None

In [11]:
# Attach the GAP species code and the list of states with habitat to each
# workplan species that matches a cached GAP record by scientific name.
with open('../cache/gap.json', 'r') as f:
    gap_data = json.load(f)

with open('../cache/gap_metrics.json', 'r') as f:
    gap_metrics = json.load(f)

# Index the metrics by species code once instead of rescanning the list for
# every GAP record. setdefault keeps the first record seen for a code, which
# matches a first-match scan of the list.
gap_metrics_by_code = dict()
for metrics in gap_metrics:
    gap_metrics_by_code.setdefault(metrics["GAP_SpeciesCode"], metrics)

gap_lookup = list()
for r in gap_data:
    if "data" not in r:
        continue
    species_code = r["data"]["GAP_SpeciesCode"]
    species_metrics = gap_metrics_by_code.get(species_code)
    # A species with no metrics record gets an empty state list rather than
    # aborting the whole cell.
    state_metrics = species_metrics["State Metrics"] if species_metrics is not None else []
    gap_lookup.append({
        "scientific_name_source": r["parameters"]["Name Source"],
        "scientific_name": r["parameters"]["Scientific Name"],
        "GAP_SpeciesCode": species_code,
        "GAP_StatesWithHabitat": ",".join([st["state_name"] for st in state_metrics])
    })

for spp in workplan_species:
    # Match on the lookup name, and also on the "Species" value when the
    # record has one.
    check_list = [spp["Lookup Name"]]
    if "Species" in spp:
        check_list.append(spp["Species"])
    gap_spp = next((g for g in gap_lookup if g["scientific_name"] in check_list), None)
    if gap_spp is not None:
        spp["GAP_SpeciesCode"] = gap_spp["GAP_SpeciesCode"]
        spp["GAP_StatesWithHabitat"] = gap_spp["GAP_StatesWithHabitat"]


In [12]:
# Show the workplan species records that picked up a GAP species code.
[spp_record for spp_record in workplan_species if "GAP_SpeciesCode" in spp_record]

[{'Guild': 'Amphibians',
  'Species Name (Common)': 'Cascade torrent salamander',
  'Scientific Name': 'Rhyacotriton cascadae',
  'Lead FWS Regional Office': 'Region 1 - Pacific (Northwest)',
  'Proposed FWS Decision Timeframe (Fiscal Year)': 2023,
  'Range': 'OR, WA',
  'Bin': None,
  'Lookup Name': 'Rhyacotriton cascadae',
  'ECOS Link': 'https://ecos.fws.gov/ecp/species/1375',
  'ITIS TSN': '550250',
  'taxonomic_reference': 'ITIS',
  'taxonomic_rank': 'Species',
  'Kingdom': 'Animalia',
  'Subkingdom': 'Bilateria',
  'Infrakingdom': 'Deuterostomia',
  'Phylum': 'Chordata',
  'Subphylum': 'Vertebrata',
  'Infraphylum': 'Gnathostomata',
  'Superclass': 'Tetrapoda',
  'Class': 'Amphibia',
  'Order': 'Caudata',
  'Family': 'Rhyacotritonidae',
  'Genus': 'Rhyacotriton',
  'Species': 'Rhyacotriton cascadae',
  'STATUS': 'UR',
  'STATUS_TEXT': 'Under Review in the Candidate or Petition Process',
  'ECOS_SPCODE': 'D03E',
  'iucn_reference': 'http://dx.doi.org/10.2305/IUCN.UK.2004.RLTS.T594

In [13]:
# Flatten the enriched species records into a table and write it to the cache
# folder. to_csv() returns None when given a file path, so the DataFrame is
# bound to its own name first instead of to the return value of the write.
df_wp_spp = pd.DataFrame(workplan_species)
df_wp_spp.to_csv('../cache/summarized_data.csv', index=False)