# iNaturalist API Example: Update Project Definition
- Link: https://jumear.github.io/stirpy/lab?path=iNat_update_project.ipynb
- GitHub Repo: https://github.com/jumear/stirpy

This example shows how to update a project via an undocumented API endpoint, `PUT /v1/projects/{id}`.

The primary use case for this is to add large sets of project rules to a collection project. For example, if you have a lot of taxa that you want to include in your project, you can define a source set of taxa as (1) your own custom list of ids, (2) a set extracted from another project, or (3) a set extracted from an iNaturalist taxon list.

There is also some bonus code near the end that shows how to update other parts of a project, and you can use / adapt parts of the code here to get lists of taxa from a project or an iNat list.

## Get / Update Data from the iNaturalist API

In [None]:
# load required modules
import asyncio # used for asynchronous fetching
import math # used for a ceiling method
from functools import partial # used for pre-loading functions with some arguments
from datetime import datetime # used to convert string datetimes into actual datetimes
import json # used to create the body for PUT and POST requests that require a payload

# use Pyodide's pyfetch module if possible, but fall back to urllib3 outside of Pyodide
try:
    from pyodide.http import pyfetch # Pyodide's fetch function (asynchronous)
except ImportError:
    # only a failed import should trigger the fallback; a bare except would also hide unrelated errors
    #!pip install urllib3
    import urllib3 # fall back to urllib3 if pyfetch isn't available. it can be made asynchronous using asyncio.to_thread().

In [None]:
# define custom functions used for getting data

def params_to_dict(params_string):
    """Convert a parameter string into a dict.
    ex.: 'taxon_id=1&user_id=kueda,loarie&field:Eating' => {'taxon_id': ['1'], 'user_id': ['kueda', 'loarie'], 'field:Eating': None}

    Each '&'-separated segment is split on its first '=' only, so a value may itself contain '='.
    The value is then split on ',' into a list of strings. A segment without '=' maps to None.
    Empty segments (ex. from an empty string or a trailing '&') are skipped.
    No URL-decoding is performed.
    """
    params_dict = {}
    for segment in params_string.split('&'):
        if not segment:
            continue # nothing to record for an empty segment
        key, has_value, value = segment.partition('=')
        params_dict[key] = value.split(',') if has_value else None
    return params_dict

def url_with_params(url_base, params=None):
    """Combine a base url with a set of parameters. Can handle the following types of cases:
    1. 'https://api.inaturalist.org/v1/observations' + {'taxon_id': [1], 'user_id': ['kueda','loarie']} => 'https://api.inaturalist.org/v1/observations?taxon_id=1&user_id=kueda,loarie'
    2. 'https://api.inaturalist.org/v1/places/{id}' + {'id': [1,2,3], 'admin_level': [0,10]} => 'https://api.inaturalist.org/v1/places/1,2,3?admin_level=0,10'

    Each parameter value may be a list of values (joined with ','), a single string (used as is), or None (the
    parameter is appended to the query string as a bare name without '=value').
    Values are converted with str() and are not URL-encoded.
    Raises TypeError if a parameter that fills a '{name}' placeholder in the url has a value of None.
    """
    if params is None:
        params = {}
    url = url_base
    for p, v in params.items():
        if v is None:
            pv = None
        elif isinstance(v, str):
            pv = v # a bare string is one value, not a sequence of characters to join
        else:
            pv = ','.join(map(str, v)) # str() so that numeric ids can be passed directly
        placeholder = f'{{{p}}}'
        if placeholder in url:
            if pv is None:
                raise TypeError(f'parameter {p!r} fills a placeholder in the url and therefore needs a value')
            url = url.replace(placeholder, pv)
        else:
            s = '&' if '?' in url else '?'
            url += f'{s}{p}' if pv is None else f'{s}{p}={pv}'
    return url

async def fetch_data(url, method='GET', use_authorization=False, delay=0, body=None):
    """Wait for the given delay, send one HTTP request, and return the response body parsed as JSON.
    Request headers are sent only when use_authorization is set and the global jwt has a value; in that case they
    are the global req_headers_base plus an Authorization header carrying jwt. Otherwise no headers are sent.
    The request goes through pyfetch when that name exists in globals, and through urllib3 (run in a thread) otherwise.
    """
    await asyncio.sleep(delay)
    if use_authorization and jwt:
        headers = dict(req_headers_base, Authorization=jwt) # a new dict, so the shared base headers stay untouched
    else:
        headers = {}
    has_pyfetch = 'pyfetch' in globals()
    if has_pyfetch:
        response = await pyfetch(url, method=method, headers=headers, body=body)
        payload = await response.json()
    else:
        # urllib3.request is blocking, so run it in a worker thread
        response = await asyncio.to_thread(urllib3.request, method, url, headers=headers, body=body)
        payload = response.json()
    print(f'Fetch complete: {method} {url}')
    return payload

async def get_total_results(endpoint, params=None, use_authorization=False, delay=0):
    """Ask the API how many records match the given parameters and return that count as an int.
    The count is read from the response field named by endpoint['record_count_field'] ('total_results' if not defined).
    """
    # drop any per_page from the caller's parameters and request 0 records, since only the count is needed
    count_params = {k: v for k, v in (params or {}).items() if k != 'per_page'}
    count_params['per_page'] = ['0']
    count_url = url_with_params(endpoint['url'], count_params)
    data = await fetch_data(count_url, use_authorization=use_authorization, delay=delay)
    count_field = endpoint.get('record_count_field', 'total_results')
    total_results = int(data[count_field])
    print(f'Total records: {total_results}')
    return total_results

async def get_results_single_page(endpoint, params=None, use_authorization=False, parse_function=None, pre_parse_filter_function=None, post_parse_filter_function=None, delay=0):
    """Fetch one page of records from the API. Usually called by get_results, but can also be called directly.
    The records are read from the response field named by endpoint['record_array'] ('results' if not defined).
    They are then optionally filtered, parsed, and filtered again, in that order, before being returned.
    """
    page_params = {} if params is None else params.copy()
    page_url = url_with_params(endpoint['url'], page_params)
    data = await fetch_data(page_url, use_authorization=use_authorization, delay=delay)
    records = data.get(endpoint.get('record_array', 'results'), [])
    if pre_parse_filter_function:
        records = [r for r in records if pre_parse_filter_function(r)]
    if parse_function:
        records = parse_function(records)
    if post_parse_filter_function:
        records = [r for r in records if post_parse_filter_function(r)]
    return records

async def get_results(endpoint, params=None, get_all_pages=False, use_authorization=False, parse_function=None, pre_parse_filter_function=None, post_parse_filter_function=None):
    """Fetch records from the API and return them as one combined list.
    With get_all_pages=False, only a single request is made. With get_all_pages=True, the requests are planned in 1 of 2 ways:
    1. If the endpoint definition has a page_key, the values of that parameter are split into batches of up to max_per_page values.
       Suppose: endpoint = {'url': 'https://api.inaturalist.org/v1/taxa/{id}', 'page_key': 'id', 'max_per_page': 30 } and params = {'id': ['1','2','3',...,'60']}
       Then: GET https://api.inaturalist.org/v1/taxa/1,2,3,...,30; GET https://api.inaturalist.org/v1/taxa/31,32,33,...,60
       (Returns None if params has no values for the page_key.)
    2. Otherwise, pages of max_per_page records are requested, up to the endpoint's max_records limit.
       Suppose: endpoint = {'url': 'https://api.inaturalist.org/v1/observations', 'max_records': 10000, 'max_per_page': 200 } and params = {'taxon_id': ['1']}
       Then: GET https://api.inaturalist.org/v1/observations?taxon_id=1&per_page=200&page=1; GET https://api.inaturalist.org/v1/observations?taxon_id=1&per_page=200&page=2; etc...
    All requests are started together, each one delayed by its position in the sequence (0, 1, 2, ... seconds). (iNaturalist suggests limiting requests to ~1 req/second.)
    """
    base_params = {} if params is None else params

    # step 1: work out the parameter set for each request
    page_key = endpoint.get('page_key')
    if page_key:
        key_values = base_params.get(page_key)
        if not key_values:
            print(f'Cannot query from this endpoint without values for {page_key} parameter')
            return None
        batch_size = endpoint['max_per_page']
        value_count = len(key_values)
        batches = [key_values[start:start+batch_size] for start in range(0, value_count, batch_size)]
        print(f'There are {value_count} {page_key} values, requiring {len(batches)} API requests to retrieve. Retrieving {"all sets" if get_all_pages else "only the first set"}...')
        selected_batches = batches if get_all_pages else batches[:1]
        request_params = [{**base_params, page_key: batch} for batch in selected_batches]
    elif get_all_pages:
        page_limit = math.ceil(endpoint['max_records'] / endpoint['max_per_page'])
        # a small query first tells us how many records exist, and so how many pages are needed.
        # the page count is capped at the most the API endpoint will return.
        total_results = await get_total_results(endpoint, base_params, use_authorization)
        page_count = min(page_limit, math.ceil(total_results / endpoint['max_per_page']))
        print(f'Pages to retrieve: {page_count}')
        request_params = []
        for page_number in range(1, page_count + 1):
            # replace any per_page / page values from the base parameters with the max page size and this page number
            rp = {k: v for k, v in base_params.items() if k not in ('per_page', 'page')}
            rp['per_page'] = [str(endpoint['max_per_page'])]
            rp['page'] = [str(page_number)]
            request_params.append(rp)
    else:
        request_params = [base_params.copy()]

    # step 2: run the requests in parallel, staggering each start by its position in the sequence
    async with asyncio.TaskGroup() as tg: # available in Python 3.11+
        tasks = [
            tg.create_task(get_results_single_page(endpoint, params=rp, use_authorization=use_authorization, parse_function=parse_function, pre_parse_filter_function=pre_parse_filter_function, post_parse_filter_function=post_parse_filter_function, delay=position))
            for position, rp in enumerate(request_params)
        ]

    # step 3: combine the pages in request order
    results = []
    for task in tasks:
        results += task.result()
    print(f'Total records retrieved: {len(results)}')
    return results

def parse_simple(results, field_list):
    """Reduce each record to the fields named in field_list. Only top-level fields can be selected, but a selected field keeps
    whatever nested content it has. A field that a record does not have is returned as None.
    """
    trimmed = []
    for record in results:
        trimmed.append({field: record.get(field) for field in field_list})
    return trimmed

async def get_taxa_from_list(list_id, additional_params_string=None, get_all_pages=False, parse_function=None, pre_parse_filter_function=None, post_parse_filter_function=None):
    """Fetch the taxa on an iNaturalist list, given the list ID and, optionally, extra parameters (ex."rank=species").
    Results are requested in taxon_id order.
    Note: For some reason, calls to this API endpoint will require authorization. So make sure JWT is defined in global variables.
    """
    query_parts = ['order_by=taxon_id', 'id=' + str(list_id)]
    if additional_params_string:
        query_parts.append(additional_params_string)
    req_params = params_to_dict('&'.join(query_parts))
    return await get_results(endpoint_get_lists, params=req_params, get_all_pages=get_all_pages, use_authorization=True, parse_function=parse_function, pre_parse_filter_function=pre_parse_filter_function, post_parse_filter_function=post_parse_filter_function)
    
async def get_taxon_ids_from_list(list_id, additional_params_string=None):
    """Fetch every page of an iNaturalist list and return its taxon_ids with duplicates removed.
    Takes the list ID and, optionally, extra parameters (ex."rank=species").
    Note: For some reason, calls to this API endpoint will require authorization. So make sure JWT is defined in global variables.
    """
    print(f'Getting taxa from list {list_id}...')
    keep_only_taxon_id = partial(parse_simple, field_list=['taxon_id'])
    listed_taxa = await get_taxa_from_list(list_id, additional_params_string, get_all_pages=True, parse_function=keep_only_taxon_id)
    unique_taxon_ids = {entry['taxon_id'] for entry in listed_taxa}
    return list(unique_taxon_ids)

async def update_project(project_id, body=None):
    """Send a PUT request for the given project_id with the given payload, and return the parsed response.
    Note: This requires authorization. So make sure JWT is defined in global variables.
    """
    id_params = params_to_dict(f'id={project_id}')
    target_url = url_with_params(endpoint_put_projects_id['url'], id_params)
    return await fetch_data(target_url, method='PUT', use_authorization=True, body=body)

async def update_project_rules(project_id, project_rules_updates, max_batch_size=500):
    """Update a project's rules, given a project id and a list of rules updates.
    The updates are sent in consecutive batches of up to max_batch_size, one PUT request per batch, because
    API requests seem to be blocked if too many deletes are included in a single request.
    Returns a list with the parsed response of each batch request, in batch order (an empty list if there are no updates).
    Note: This requires authorization. So make sure JWT is defined in global variables.
    """
    rules_updates_count = len(project_rules_updates)
    batch_count = math.ceil(rules_updates_count / max_batch_size)
    print(f'Processing project rule updates in {batch_count} batches of up to {max_batch_size} updates...')
    batch_results = []
    for batch_number, start in enumerate(range(0, rules_updates_count, max_batch_size), start=1):
        print(f'Executing batch {batch_number}...')
        project_update = {
            'project': {
                'project_observation_rules_attributes': project_rules_updates[start:start+max_batch_size]
            }
        }
        # batches run one after another (not in parallel), and every response is kept for the caller
        batch_results.append(await update_project(project_id, body=json.dumps(project_update)))
    return batch_results

async def get_project_rules(project_id, operand_type=None, operator=None, parse_function=None, pre_parse_filter_function=None, post_parse_filter_function=None):
    """Get the observation rules of a project, optionally limited to one operand_type and / or one operator.
    The project is requested with rule_details=true, and the rules are read from the 'project_observation_rules'
    field of the first project returned. A filter that is left as None is not applied.
    The parse and filter functions are passed to get_results, where they apply to the project records (not to the individual rules).
    Returns an empty list if no project record is returned.
    """
    req_params = params_to_dict(f'rule_details=true&id={project_id}')
    project_details = await get_results(endpoint_get_projects_id, req_params, parse_function=parse_function, pre_parse_filter_function=pre_parse_filter_function, post_parse_filter_function=post_parse_filter_function)
    if not project_details:
        print(f'No project details were returned for project {project_id}')
        return []
    project_rules = [
        r for r in project_details[0]['project_observation_rules']
        # each filter is checked against its own argument, so either one can be used without the other
        if (operand_type is None or r['operand_type'] == operand_type)
        and (operator is None or r['operator'] == operator)
    ]
    return project_rules

async def get_project_rules_operand_ids(project_id, operand_type, operator, pre_parse_filter_function=None, post_parse_filter_function=None):
    """Get the operand_ids, with duplicates removed, of a project's rules that match the given operand_type and operator.
    The optional filter functions are passed through to get_project_rules.
    """
    print(f'Getting project rules from project {project_id} for operand_type={operand_type} and operator={operator}...')
    results = await get_project_rules(project_id, operand_type, operator, pre_parse_filter_function=pre_parse_filter_function, post_parse_filter_function=post_parse_filter_function)
    return list({r['operand_id'] for r in results})

def items_to_batches(items, max_batch_size=500, separator=',', prefix=''):
    """Split a list of items into consecutive batches of up to max_batch_size items, and turn each batch into one string:
    the prefix followed by the batch's items joined with the separator. Each batch string is printed and the list of them is returned.
    (The original intended use case is to create URLs linking to the iNaturalist Explore or Identification page, filtered for batches of specific observations.)
    """
    batches = []
    for batch_number, start in enumerate(range(0, len(items), max_batch_size), start=1):
        chunk = items[start:start+max_batch_size]
        batch_string = prefix + separator.join(str(item) for item in chunk)
        print(f'Batch {batch_number}: {batch_string}')
        batches.append(batch_string)
    return batches

In [None]:
# settings for your request: the project to update, and the headers that accompany authorized calls
project_id = 'pisum-s-personal-project'
req_headers_base = {'Content-Type': 'application/json', 'Accept': 'application/json'}

# authorized calls need jwt to hold the "api_token" value from https://www.inaturalist.org/users/api_token.
# the JWT is valid for 24 hours. it can be used to do / access anything your iNat account can access. so keep it safe, and don't share it.
# the API request must also be made with use_authorization=True.
jwt = None

# endpoint definitions: the url template, plus the paging limits and response field names that the get functions read
endpoint_get_lists = dict(
    method='GET',
    url='https://www.inaturalist.org/lists/{id}.json',
    max_records=10000,
    max_per_page=200,
    record_array='listed_taxa',
    record_count_field='total_entries',
)
endpoint_get_projects_id = dict(
    method='GET',
    url='https://api.inaturalist.org/v1/projects/{id}',
    max_records=10000,
    max_per_page=200,
)
endpoint_put_projects_id = dict(
    method='PUT',
    url='https://api.inaturalist.org/v1/projects/{id}',
)

In [None]:
# define the model rules that you want to replicate into your project.
# each rule type (taxon / place / user) has an operand_type plus an include set and an exclude set, each with its own operator.
# set source_rules[type][include/exclude]['ids'] = None (or just don't define ids) if you want to skip updates related to these rules
# set source_rules[type][include/exclude]['ids'] = [] if you want to delete all existing related rules
# use get_project_rules_operand_ids(project_id, operand_type, operator) to get a list of operand ids from another project's rules
# use get_taxon_ids_from_list(list_id, additional_params_string) to get a list of taxon ids from an iNaturalist list

source_rules = {
    'taxon': {
        'operand_type': 'Taxon',
        'include': {'operator': 'in_taxon?'},
        'exclude': {'operator': 'not_in_taxon?'},
    },
    'place': {
        'operand_type': 'Place',
        'include': {'operator': 'observed_in_place?'},
        'exclude': {'operator': 'not_observed_in_place?'},
    },
    'user': {
        'operand_type': 'User',
        'include': {'operator': 'observed_by_user?'},
        'exclude': {'operator': 'not_observed_by_user?'},
    },
}

# taxa to include: pick one of the alternatives below (the active one reads species-rank taxa from an iNaturalist list)
source_rules['taxon']['include']['ids'] = await get_taxon_ids_from_list(946645, additional_params_string='rank=species')
#source_rules['taxon']['include']['ids'] = await get_project_rules_operand_ids('dangerous-plants-animals-and-fungi-of-the-united-kingdom', operand_type=source_rules['taxon']['operand_type'], operator=source_rules['taxon']['include']['operator'])
#source_rules['taxon']['include']['ids'] = []
#source_rules['taxon']['include']['ids'] = None

# taxa to exclude: read from another iNaturalist list
source_rules['taxon']['exclude']['ids'] = await get_taxon_ids_from_list(4347551)
#source_rules['taxon']['exclude']['ids'] = []


In [None]:
# Compare the source rules vs the existing target project rules to determine the changes to make on the target project rules

project_rules = await get_project_rules(project_id)
#print(project_rules)

project_rules_updates = [];
for source_type_key, source_type in source_rules.items():
    for source_set_key, source_set in source_type.items():
        if isinstance(source_set, dict) and (operator := source_set.get('operator')) and (ids := source_set.get('ids') is not None) and (operand_type := source_type.get('operand_type')):
            existing_rules = [r for r in project_rules if (r['operator'] == operator and r['operand_type'] == operand_type)]
            print(f'For {source_set_key} {source_type_key} rules:')
            # ignore existing rules which are also in the source
            rules_to_ignore = [{'id': r['id'], 'operator': r['operator'], 'operand_type': r['operand_type'], 'operand_id': r['operand_id'], '_destroy': False} for r in existing_rules if r['operand_id'] in source_set['ids']]
            print(f'- ignore {str(len(rules_to_ignore))} existing rules')
            # delete existing rules which are not in the source
            rules_to_delete = [{'id': r['id'], 'operator': r['operator'], 'operand_type': r['operand_type'], 'operand_id': r['operand_id'], '_destroy': True} for r in existing_rules if r['operand_id'] not in source_set['ids']]
            print(f'- delete {str(len(rules_to_delete))} existing rules')
            # add new rules for taxa from the source which do not already exist
            rules_to_add = [{'operator': operator, 'operand_type': operand_type, 'operand_id': s} for s in source_set['ids'] if s not in [r['operand_id'] for r in existing_rules]]
            print(f'- add {str(len(rules_to_add))} new rules')
            project_rules_updates += rules_to_delete + rules_to_add

In [None]:
# Execute the request(s) that will finalize the changes
results = await update_project_rules(project_id, project_rules_updates)

In [None]:
# get a new snapshot of the project rules
project_rules_after_update = await get_project_rules(project_id)

project_rules_flattened = [
    {
        'id': r['id'],
        'operator': r['operator'],
        'operand_type': r['operand_type'],
        'operand_id': r['operand_id'],
        'id': r['id'],
        'taxon_name': d['name'] if (d := r.get('taxon')) else None,
        'taxon_rank': d['rank'] if (d := r.get('taxon')) else None,
        'place_slug': d['slug'] if (d := r.get('place')) else None,
        'place_name': d['name'] if (d := r.get('place')) else None,
        'user_login': d['login'] if (d := r.get('user')) else None,
    }
    for r in project_rules_after_update
]
#print(project_rules_flattened)

In [None]:
# this is just some bonus code to show how to get taxa from a list

# list_taxa = await get_taxa_from_list(946645, additional_params_string='rank=species', get_all_pages=False)
# list_taxa_flattened = [{'id': t['id'], 'taxon_id': t['taxon_id'], 'name': t['taxon']['name'], 'rank': t['taxon']['rank']} for t in list_taxa]

#print(list_taxa_flattened)

In [None]:
# this is just some bonus code that shows how to update other parts of the project definition (in this case, the project description)

# project_definition_updates = {
#     'project': {
#         'description': 'test'
#     }
# }
# results = await update_project(project_id, body=json.dumps(project_definition_updates))

## Write Data to CSV

This takes the results retrieved above and writes them to a CSV file. The file will appear in the main folder of the file tree (the topmost tab in the left pane of the JupyterLab interface). Files generated in JupyterLite are saved to the browser's storage, so they will need to be downloaded to a more permanent location if they need to be archived.

In [None]:
# load required modules
import csv # used to output CSV files

In [None]:
def data_to_csv(data, csv_filename='export.csv'):
    """Write a list of dict records to a CSV file, one row per record, with a header row.
    The columns are the keys of all records, in order of first appearance. A record that lacks a column gets an empty value there.
    The file is written as UTF-8. If data is empty, a message is printed and no file is created.
    """
    if not data:
        print(f'No records to write, so {csv_filename} was not created.')
        return
    # collect the fields from every record (not just the first), so a key that first appears in a later record doesn't fail the write
    csv_fields = list(dict.fromkeys(field for record in data for field in record))
    with open(csv_filename, 'w', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=csv_fields)
        csv_writer.writeheader()
        csv_writer.writerows(data)
    print(f'Created CSV file {csv_filename} with {len(data)} records.')

In [None]:
# write the flattened project rules to a CSV file
data_to_csv(project_rules_flattened, csv_filename='project_rules.csv')
#data_to_csv(list_taxa_flattened,'list_taxa.csv')