# AGSL Metadata Update

### Modified 2023-08-17

In [71]:
import arcpy
import requests
import re

import xml.etree.ElementTree as ET

from arcpy import metadata as md
from datetime import datetime
from pathlib import Path


# Metadata Update Stuff

In [72]:
def get_dataset_metadata(dataset) -> tuple[str,md.Metadata,ET.Element]:
    """Load a dataset's metadata in three parallel representations.

    On success the result is a 3-tuple holding:
    - the metadata XML as a single string
    - the arcpy.metadata.Metadata object built from ``dataset``
    - the xml.etree.ElementTree Element parsed from that XML (the root element)

    When the Metadata object's ``isReadOnly`` is ``None`` or ``True`` a message
    is printed and ``None`` is returned instead of a tuple, so callers that
    unpack the result should be prepared for that.

    :param str dataset: The path to the dataset as a string
    :returns: the XML string, the Metadata object and the root Element, or None on failure
    :rtype: tuple[str,md.Metadata,ET.Element]
    """
    metadata_object = md.Metadata(dataset)
    read_only = metadata_object.isReadOnly

    # Guard clauses: bail out unless the metadata is writable.
    if read_only is None:
        # Per the original author's note: nothing was passed to the constructor.
        print("A blank metadata object was created")
        return
    if read_only is True:
        # Per the original author's note: a URI was passed but it is not a valid XML document.
        print("Not a valid URI")
        return

    # Parse the XML once for ElementTree-style editing, then hand back all three views.
    root_element = ET.fromstring(metadata_object.xml)
    return metadata_object.xml, metadata_object, root_element

In [73]:
def check_if_existing_identifier(rootElement, find_string) -> bool:
    """Report whether at least one element matches a path expression.

    The expression is handed to ``rootElement.findall()``; the answer is True
    when that call yields one or more elements and False when it yields none.

    :param xml.etree.ElementTree.Element rootElement: The root Element of the metadata
    :param str find_string: a string using xml.etree.ElementTree.findall syntax
    :returns: True if one or more matching elements are found, False otherwise.
    :rtype: bool
    """
    return len(rootElement.findall(find_string)) > 0

In [74]:
def get_alt_title(metadata_root_element) -> str:
    """Return the text of the record's single ``resAltTitle`` element.

    The whole tree is searched (``.//resAltTitle``). The text is returned only
    when exactly one such element exists; with zero or several matches the
    function returns None.

    :param xml.etree.ElementTree.Element metadata_root_element: The root Element of the metadata
    :returns: the alternate title text, or None when there is not exactly one match
    :rtype: str
    """
    alt_title_elements = metadata_root_element.findall('.//resAltTitle')
    return alt_title_elements[0].text if len(alt_title_elements) == 1 else None

In [70]:
def write_identifiers(rootElement, arkid, right_string, *identifiers) -> ET.Element:
    """Write ARK-derived values into the requested identifier elements.

    Valid identifiers: "mdFileID", "dataSetURI", "identCode"

    - ``identCode``: the first ``.//citId/identCode`` element receives the
      geodiscovery ARK URI. If no such element exists, a new ``citId/identCode``
      pair is appended under ``dataIdInfo/idCitation``.
    - ``mdFileID``: the root-level ``mdFileID`` element (created if absent)
      receives ``ark:/77981/<assigned name>``.
    - ``dataSetURI``: the root-level ``dataSetURI`` element (created if absent)
      receives the download URL built from ``right_string`` and the record's
      alternate title (see ``get_alt_title``).

    Every precondition is checked before the Element is modified, so a call
    that is rejected (a message is printed and None is returned) leaves
    ``rootElement`` untouched. The download URL is only built, and only
    reported on the console, when ``dataSetURI`` is one of the requested
    identifiers; records without an alternate title can therefore still
    receive ``identCode`` and ``mdFileID``.

    TODO: Test to ensure the ark that is passed is valid? With a REGEX or maybe even a request?
    TODO: Access constraints
        dct_accessrights_s - XSLT looks at legalconstraints/other constraints
        /MD_Metadata/identificationInfo/MD_DataIdentification/resourceConstraints[13]/MD_LegalConstraints/otherConstraints/gco:CharacterString
    TODO: Update the dataSetURI to use the download URL rather than the application URL

    :param xml.etree.Element rootElement: The root Element of the metadata
    :param str arkid: the full arkid as a string (e.g. '77981/gmgsfq9q48t')
    :param str right_string: 'public', 'restricted-uwm', 'restricted-uw-system' # Matches directory on the Apache server
    :param str *identifiers: strings indicating which identifiers to target. Controlled vocabulary.
    :returns: The root Element (metadata), having been updated, or None if the request was rejected.
    :rtype: xml.etree.ElementTree.Element
    :raises IndexError: if ``mdFileID`` is requested and ``arkid`` contains no '/'
    """
    valid_identifiers = ("mdFileID", "dataSetURI", "identCode")
    ident_code_path = './/citId/identCode'

    # --- Validation phase: nothing below this block runs unless every request can be honoured ---
    if any(identifier not in valid_identifiers for identifier in identifiers):
        print("Valid options for identifier are 'mdFileID', 'dataSetURI', or 'identCode'.")
        return

    ark_URI = 'https://geodiscovery.uwm.edu/ark:-' + arkid.replace('/','-')

    md_file_id = None
    if 'mdFileID' in identifiers:
        md_file_id = f'ark:/77981/{arkid.split("/")[1]}'

    download_URI = None
    if 'dataSetURI' in identifiers:
        alt_title = get_alt_title(rootElement)
        if alt_title is None:
            print("Could not find exactly one resAltTitle with text, so the download URL cannot be built.")
            return
        download_URI = 'https://geodata.uwm.edu/' + right_string + '/' + alt_title + '.zip'

    id_citation = None
    if 'identCode' in identifiers and rootElement.find(ident_code_path) is None:
        # A new citId/identCode pair needs a parent to hang from.
        id_citation = rootElement.find('./dataIdInfo/idCitation')
        if id_citation is None:
            print("Could not find dataIdInfo/idCitation, so identCode cannot be added.")
            return

    # --- Write phase ---
    for identifier in identifiers:
        if identifier == 'identCode':
            target = rootElement.find(ident_code_path)
            if target is None:
                # NOTE(review): 'xmls' looks like a typo for 'xmlns'; kept as-is because
                # changing it alters the XML that gets written - confirm before fixing.
                citId_Element = ET.SubElement(id_citation, 'citId', xmls="")
                target = ET.SubElement(citId_Element, 'identCode')
            target.text = ark_URI
        else:
            # mdFileID and dataSetURI live directly under the root element.
            target = rootElement.find(identifier)
            if target is None:
                target = ET.SubElement(rootElement, identifier)
            target.text = md_file_id if identifier == 'mdFileID' else download_URI

    print(f'The URI for the dataset is {ark_URI}')
    if download_URI is not None:
        print(f'The Download URL for the dataset is {download_URI}')
    return rootElement

In [12]:
def dual_metadata_export(dataset, md_outputdir=None, md_filename=None) -> tuple[Path,Path]:
    """Write the dataset's metadata out twice: once as ISO and once as FGDC XML.

    The output directory is ``md_outputdir`` when given (it must be an existing
    directory). Otherwise it is the dataset's parent directory, or the
    grandparent when the parent has a ``.gdb`` suffix. The two files are named
    ``<name>_ISO.xml`` and ``<name>_FGDC.xml`` where ``<name>`` is
    ``md_filename`` when given, else the dataset path's stem.

    If no usable output directory can be determined a message is printed and
    None is returned. Each output path is printed just before it is exported.

    :param str dataset: The path to the dataset as a string
    :param str md_outputdir: The directory to store the 2 new files. Default is the parent directory, or grandparent if parent is a FileGeodatabase.
    :param str md_filename: A AGSL format filename (e.g. geography_theme_year) to use as the metadata filename.
    :returns: a tuple containing the Path to the ISO and FGDC XML Files
    :rtype: tuple[Path,Path]
    """
    # Build the Metadata object first, exactly as before, so a bad dataset fails early.
    dataset_md = md.Metadata(dataset)
    dataset_parent = Path(dataset).parent

    # Resolve the directory that will receive both files.
    if md_outputdir is not None:
        if not Path(md_outputdir).is_dir():
            print("specified output directory is not a valid directory!")
            return
        target_dir = Path(md_outputdir)
    elif dataset_parent.suffix == ".gdb":
        # Datasets inside a File Geodatabase export next to the geodatabase itself.
        target_dir = dataset_parent.parent
    elif not dataset_parent.is_dir():
        print("Parent is not a dir!")
        return
    else:
        target_dir = dataset_parent

    # Pick the filename prefix shared by both outputs.
    base_name = Path(dataset).stem if md_filename is None else md_filename
    output_ISO_Path = target_dir / f'{base_name}_ISO.xml'
    output_FGDC_Path = target_dir / f'{base_name}_FGDC.xml'

    # Announce and export each format in turn (ISO first, then FGDC).
    for output_path, export_format in ((output_ISO_Path, 'ISO19139_GML32'),
                                       (output_FGDC_Path, 'FGDC_CSDGM')):
        print(output_path)
        dataset_md.exportMetadata(output_path, export_format, metadata_removal_option='REMOVE_ALL_SENSITIVE_INFO')

    return output_ISO_Path, output_FGDC_Path

In [13]:
def searchForID(text) -> list:
    """Find the first arkid (five digits, a slash, eleven word characters) in a string.

    Used in mintArk()

    The value handed back is the result of ``re.search``: a match object when
    an arkid is present, otherwise None. The match object can be indexed like
    a sequence:
    - [0] the full arkid
    - [1] the Name Authority Number
    - [2] the Assigned Name

    :param str text: The returned text from the minter
    :returns: the regex match (indexable by group number), or None if no arkid is found
    :rtype: list
    """
    return re.search(r"(\d{5})\/(\w{11})", text)

# Noid stuff

In [14]:
def mintArk(minter) -> dict:
    """Send the request to the minter. Catch connection errors.

    Returns a dictionary with three items:
    - "arc": the full arkid
    - "nan": the Name Authority Number
    - "asn": the Assigned Name

    A message is printed and None is returned when the request fails, when the
    minter answers with a status other than 200, or when no arkid can be found
    in the response text.

    :param str minter: The URL for the minter
    :returns dict arkDict: a dictionary with regex results, or None on failure
    :rtype: dict
    """
    try:
        r = requests.get(minter)
    except requests.exceptions.RequestException:
        # Covers connection failures as well as malformed/invalid URLs.
        print("There was a connection error! Check the URL you used to request the id!")
        return

    # Check the status code the minter returns. It should be 200 if the request was completed.
    if r.status_code != 200:
        # status_code is an int, so it must be formatted rather than concatenated.
        print(f"There was a non-200 status code from the minter: {r.status_code}")
        return

    # Pull the arkid out of the response body; searchForID yields None when nothing matches.
    regex_result = searchForID(r.text)
    if regex_result is None:
        print("The minter response did not contain an ark id!")
        return

    return {
        "arc": regex_result[0],
        "nan": regex_result[1],
        "asn": regex_result[2],
    }

In [15]:
def get_date_range(metadata) -> str:
    """Extract the temporal extent of a dataset from its metadata XML.

    If the XML contains both a ``tmBegin`` and a ``tmEnd`` element, the result
    is ``'<tmBegin>/<tmEnd>'``. Otherwise the text of the first ``tmPosition``
    element is returned.

    :param arcpy.metadata.Metadata metadata: an ArcGIS Metadata object
    :returns: the date range or single date; None if ``metadata`` is not a Metadata object
    :rtype: str
    :raises AttributeError: if the range is incomplete and no ``tmPosition`` element exists
    """
    if not isinstance(metadata, arcpy.metadata.Metadata):
        return

    root_Element = ET.fromstring(metadata.xml)
    try:
        tmBegin = root_Element.find('.//tmBegin').text
        tmEnd = root_Element.find('.//tmEnd').text
    except AttributeError:
        # find() returned None for tmBegin or tmEnd: fall back to a single position.
        return root_Element.find('.//tmPosition').text
    return f'{tmBegin}/{tmEnd}'

In [16]:
def create_bind_params(metadata, right_string) -> dict:
    '''Build the key/value pairs that will be bound to an Ark through the NOID binder.

    The values come from the Metadata object (``credits``, ``title``), from its
    XML (the root-level ``mdFileID`` element and the date range reported by
    ``get_date_range``), from the current local time, and from ``right_string``.
    Every value is stored as a string.

    :param arcpy.metadata.Metadata metadata: an ArcGIS Metadata object
    :param str right_string: the rights value stored under the "rights" key
    :returns: the parameter dictionary, or None if ``metadata`` is not a Metadata object
    :rtype: dict
    '''
    # Anything other than an arcpy Metadata object is rejected up front.
    if metadata.__class__ != arcpy.metadata.Metadata:
        return

    xml_root = ET.fromstring(metadata.xml)
    md_file_id = xml_root.find('mdFileID').text
    # The "where" value is the mdFileID with its slashes turned into dashes.
    where_uri = 'https://www.geodiscovery.uwm.edu/' + md_file_id.replace('/','-')
    when_value = get_date_range(metadata)
    bound_at = datetime.now().replace(microsecond=0).isoformat()

    bind_pairs = [
        ("who", str(metadata.credits)),
        ("what", str(metadata.title)),
        ("when", str(when_value)),
        ("where", str(where_uri)),
        ("meta-who", "University of Wisconsin-Milwaukee Libraries"),
        ("meta-when", str(bound_at)),
        ("rights", str(right_string)),
    ]
    return dict(bind_pairs)

In [17]:
def construct_bind_request(arkid, bind_params) -> str:
    '''Construct Noid Bind request string
    This function will take an Ark ID and bind parameters and format the request data needed for the bind POST request.
    Each parameter becomes one line of the form ``bind set <arkid> <key> "<value>"``.
    A message is printed and None is returned when ``arkid`` is not a string,
    when ``bind_params`` is not a dict, or when the dict is empty.
    :param str arkid: from the metadata formatted like '77981/gmgsbr8mf8v'
    :param dict bind_params: A dictionary of DC-like parameters generated by create_bind_params()
    :return str param_string: The data that will get passed to the bind request. It's noid bind set commands separated by newlines.
    '''
    # Both arguments must have the right type; the parentheses matter, because
    # ``not a and b`` would only reject a bad arkid paired with a good dict.
    if not (isinstance(arkid, str) and isinstance(bind_params, dict)):
        print("at least one function argument is the wrong class")
        return
    if not bind_params:
        print("the bind_params are a dict but it's empty.")
        return

    return ''.join(
        f'bind set {arkid} {key} "{value}"\n'
        for key, value in bind_params.items()
    )

In [18]:
def bindArk(metadata, arkid, right_string, binder) -> requests.models.Response:
    '''Bind Ark
    Sends a request to NOID to bind some parameters to an Ark ID.
    A message is printed and None is returned when the bind commands cannot be
    built, when the request fails, or when the binder answers with a status other than 200.
    :param arcpy.metadata.Metadata metadata: an ArcGIS Metadata object
    :param str arkid: the ArkID formatted like '77981/gmgsbr8mf8v'
    :param str right_string: 'public', 'restricted-uw-system', 'restricted-uwm'
    :param str binder: The URL for the post request. Since we're adding multiple parameters, we use a - as its only HTTP arg and then it expects a newline separated list of NOID commands
    :return requests.models.Response response: A response object from the requests lib, or None on failure
    '''
    bind_params = create_bind_params(metadata, right_string)
    bind_params_commands = construct_bind_request(arkid, bind_params)

    # Without commands there is nothing meaningful to send to the binder.
    if bind_params_commands is None:
        print("No bind commands could be built, so nothing was sent to the binder!")
        return

    try:
        r = requests.post(binder, data=bind_params_commands)
    except requests.exceptions.RequestException:
        # Covers connection failures as well as malformed/invalid URLs.
        print("There was a connection error! Check the URL you used to bind the id!")
        return

    # Check the status code the binder returns. It should be 200 if the request was completed.
    if r.status_code != 200:
        # status_code is an int, so it must be formatted rather than concatenated.
        print(f"There was a non-200 status code from the binder: {r.status_code}")
        return

    return r


# Main Update Function

In [19]:
def agsl_metadata_update(dataset,
                         minter=r'https://digilib-dev.uwm.edu/noidu_gmgs?mint+1',
                         right_string ='public',
                         binder = r'https://digilib-dev.uwm.edu/noidu_gmgs?-',
                         md_output_directory=None,
                         md_filename=None
                        ) -> tuple[md.Metadata, str]:
    """AGSL Metadata Update

    Take a dataset, an optional minter URL, an optional rights string, an optional binder URL,
    an optional output directory for metadata, and an optional custom filename for the metadata outputs.

    Mints an arkID, inserts it into the metadata, exports ISO and FGDC format metadata, and then binds some core metadata parameters to the ArkID using NOID.

    The four steps run in that order. If any step raises or reports failure, a
    message is printed and None is returned; steps that already completed
    (for example the saved metadata) are not rolled back.

    :param str dataset: the path to the dataset as a string
    :param str minter: The URL for the minter
    :param str right_string: 'public', 'restricted-uwm', 'restricted-uw-system'
    :param str binder: The URL for the bind request
    :param str md_output_directory: The directory to store the 2 new files. Default is the parent directory, or grandparent if parent is a FileGeodatabase.
    :param str md_filename: A AGSL format filename (e.g. geography_theme_year) to use as the metadata filename.
    :returns: a tuple with the arcpy.metadata.Metadata object and a string representation of the arkid, or None on failure
    :rtype: tuple[arcpy.metadata.Metadata, str]
    """

    # Mint the Ark ID
    try:
        ark = mintArk(minter)
    except Exception as err:
        print("There was a problem with the mint request")
        print(f"  {type(err).__name__}: {err}")
        return
    if ark is None:
        # mintArk already printed the specific reason.
        print("There was a problem with the mint request")
        return
    print(f"The full ARK is: {ark['arc']}")
    print(f"The Name Authority # is: {ark['nan']}")
    print(f"The Asssigned Name is: {ark['asn']}")
    arkid = ark['arc']

    # Access and write the item Metadata
    try:
        metadata_views = get_dataset_metadata(dataset)
        if metadata_views is None:
            print("there was a problem getting the dataset metadata")
            return
        _, dataset_Metadata, dataset_Element = metadata_views
        updated_Element = write_identifiers(dataset_Element, arkid, right_string, 'identCode', 'mdFileID', 'dataSetURI')
        if updated_Element is None:
            print("there was a problem getting the dataset metadata")
            return
        dataset_Metadata.xml = ET.tostring(updated_Element, encoding='unicode')
        dataset_Metadata.save()
    except Exception as err:
        print("there was a problem getting the dataset metadata")
        print(f"  {type(err).__name__}: {err}")
        return

    # Export Metadata to FGDC and ISO formats
    try:
        exported_paths = dual_metadata_export(dataset, md_output_directory, md_filename)
    except Exception as err:
        print("there was a problem exporting the metadata")
        print(f"  {type(err).__name__}: {err}")
        return
    if exported_paths is None:
        # dual_metadata_export already printed why no output directory was usable.
        print("there was a problem exporting the metadata")
        return

    # Bind the ArkID
    try:
        bind_response = bindArk(dataset_Metadata, arkid, right_string, binder)
    except Exception as err:
        print("There was a problem with the bind request")
        print(f"  {type(err).__name__}: {err}")
        return
    if bind_response is None:
        # bindArk reports failure by returning None rather than raising.
        print("There was a problem with the bind request")
        return

    return dataset_Metadata, arkid

# Listing and testing directories

In [20]:
def list_all_dirs(rootdir) -> list[tuple[Path,int]]:
    '''List All Directories
    - Walks ``rootdir`` recursively and keeps every entry that is a directory
      (a File Geodatabase stored as a ``.gdb`` folder counts as a directory)
    - Entries are visited in sorted path order
    - Returns a list of ``(path, depth)`` tuples, where ``path`` is a pathlib.Path
      and ``depth`` is the number of path components below ``rootdir``
      (1 for a direct child)
    '''
    base = Path(rootdir)
    return [
        (entry, len(entry.relative_to(base).parts))
        for entry in sorted(base.rglob("*"))
        if entry.is_dir()
    ]

In [21]:
def test_dataset_type(rootdir) -> int:
    '''Test Dataset Type
    - Iterates through all subpaths of ``rootdir``
    - Counts paths by file extension (``.gdb``, ``.shp``, ``.adf``) to determine type
    - Returns an integer for type:
        - 0: Error (none of the three extensions found)
        - 1: Shapefile (exactly one ``.shp`` and nothing else)
        - 2: FileGeodatabase (exactly one ``.gdb`` and nothing else)
        - 3: ArcGRID Raster (one or more ``.adf`` and nothing else)
        - 4: Other/Multiple
    '''
    tally = {".gdb": 0, ".shp": 0, ".adf": 0}
    for subpath in Path(rootdir).rglob("*"):
        if subpath.suffix in tally:
            tally[subpath.suffix] += 1

    found = (tally[".gdb"], tally[".shp"], tally[".adf"])

    if found == (0, 0, 0):
        return 0
    if found == (0, 1, 0):
        return 1
    if found == (1, 0, 0):
        return 2
    if found[0] == 0 and found[1] == 0:
        # Only .adf files remain possible here. Note that there might be more than one .adf file!
        return 3
    # Any other combination: several datasets or a mix of types.
    return 4

In [22]:
def fetch_dataset_from_directory(directory) -> Path:
    '''Fetch Dataset From Directory
    Pass this function a directory Path or path as string and it will test what type of dataset it is and then return a Path to the dataset.

    None is returned when the directory's type is 0 (error) or 4 (other/multiple),
    and when a geodatabase or raster workspace does not list exactly one
    feature dataset / raster.

    Side effect: for geodatabases and rasters, ``arcpy.env.workspace`` is set
    to the geodatabase / directory and left that way.

    :param pathlib.Path/str directory: a path to the directory as a pathlib.Path object or as a string
    :return pathlib.Path dataset: a pathlib.Path to the dataset that can be used in the arcpy.metadata.Metadata constructor
    '''
    directory = Path(directory) # This ensures that the directory is a Path (A Path(Path) is still a Path)
    dataset_type = test_dataset_type(directory)

    if dataset_type == 1: # Shapefile
        dataset = next(directory.rglob("*.shp"))
    elif dataset_type == 2: # FileGeodatabase
        geodatabase = next(directory.rglob("*.gdb")) # Path Representation of the geodatabase
        arcpy.env.workspace = str(geodatabase) # This can't be a Path, it has to be a path as string.
        feature_dataset_list = arcpy.ListDatasets("*","feature")
        # Exactly one feature dataset is required; an empty (or missing) list
        # must not reach the [0] lookup below.
        if not feature_dataset_list or len(feature_dataset_list) > 1:
            return
        dataset = geodatabase / feature_dataset_list[0]
    elif dataset_type == 3: # Raster Dataset... ArcGrid only for now.
        arcpy.env.workspace = str(directory) # This can't be a Path, it has to be a path as string.
        raster_dataset_list = arcpy.ListRasters("*")
        # Same rule as above: exactly one raster, never an empty list.
        if not raster_dataset_list or len(raster_dataset_list) > 1:
            return
        dataset = directory / raster_dataset_list[0]
    else:
        # 0 means there was an error, 4 means other/multiple: nothing to fetch.
        return

    # Constructing the Metadata object doubles as a check that arcpy accepts the path.
    if isinstance(md.Metadata(dataset), arcpy.metadata.Metadata):
        return dataset
    return

# Run it on the test fixture:

In [59]:
# Process every dataset directory that sits directly under the test fixture root.
for dataset_directory in list_all_dirs(r"C:\\Users\\srappel\\Desktop\\Test Fixture Data"): # Gets all the directories
    if dataset_directory[1] != 1: # Filters it to only children of the root directory
        continue
    dataset = fetch_dataset_from_directory(dataset_directory[0]) # fetches the dataset path
    if dataset is None:
        continue
    print(f'Processing: {dataset}:')
    update_result = agsl_metadata_update(dataset) # updates the metadata; None means a step failed
    if update_result is None:
        # Keep going with the remaining datasets instead of crashing on None[1].
        print(f'Skipping {dataset}: the metadata update did not complete' + '\n')
        continue
    arkid = update_result[1] # the arkid is the second item of the returned tuple
    print(f'NOID URL is: https://digilib-dev.uwm.edu/noidu_gmgs?get+{arkid}' + '\n')
        
            

Processing: C:\Users\srappel\Desktop\Test Fixture Data\DoorCounty_Lighthouses_2010_UW\DoorCounty_Lighthouses_2010.shp:
The full ARK is: 77981/gmgsqv3c348
The Name Authority # is: 77981
The Asssigned Name is: gmgsqv3c348
The URI for the dataset is https://www.geodiscovery.uwm.edu/ark:-77981-gmgsqv3c348
C:\Users\srappel\Desktop\Test Fixture Data\DoorCounty_Lighthouses_2010_UW\DoorCounty_Lighthouses_2010_ISO.xml
C:\Users\srappel\Desktop\Test Fixture Data\DoorCounty_Lighthouses_2010_UW\DoorCounty_Lighthouses_2010_FGDC.xml
NOID URL is: https://digilib-dev.uwm.edu/noidu_gmgs?get+77981/gmgsqv3c348

Processing: C:\Users\srappel\Desktop\Test Fixture Data\Milwaukee_AldermanicWards_1896-1901\Milwaukee_AldermanicWards_1896-1901.shp:
The full ARK is: 77981/gmgsm61bp34
The Name Authority # is: 77981
The Asssigned Name is: gmgsm61bp34
The URI for the dataset is https://www.geodiscovery.uwm.edu/ark:-77981-gmgsm61bp34
C:\Users\srappel\Desktop\Test Fixture Data\Milwaukee_AldermanicWards_1896-1901\Milwau