In [47]:
import io
import re
import sys
import xml.etree.ElementTree as ET
from datetime import datetime

import pandas as pd
import requests
from bs4 import BeautifulSoup

class InPortValidator:
    """Check an InPort DMP XML document against InPort's lists of valid values.

    Constructing an instance loads the DMP XML (from a file path or from an
    already parsed ``ElementTree``), downloads the InPort sample DMP XML and
    builds the lists of valid values from two sources:

    * the help text stored in the sample XML elements (text after the first
      ``:``, split on commas), and
    * the tables published on the InPort help pages listed in the class
      attributes below.

    Call :meth:`validate_xml` to run every check. Problems are written to
    ``sys.stderr``.
    """

    # InPort help pages whose tables list the accepted component names.
    program_list_url = "https://www.fisheries.noaa.gov/inport/help/components/programs"
    instrument_list_url = "https://www.fisheries.noaa.gov/inport/help/components/instruments"
    platform_list_url = "https://www.fisheries.noaa.gov/inport/help/components/platforms"
    observing_system_list_url = "https://www.fisheries.noaa.gov/inport/help/components/observing-systems"

    # Seconds to wait for any single HTTP request; without a timeout a stalled
    # server would block the validator forever.
    request_timeout = 30

    def __init__(self, dmp_xml_file_path: str = None,
                 dmp_xml_element_tree: ET.ElementTree = None):
        """Load the DMP XML and every list of valid values.

        Args:
            dmp_xml_file_path: Path of the DMP XML file to validate.
            dmp_xml_element_tree: Already parsed tree to validate instead of
                a file. Exactly one of the two arguments must be given.

        Raises:
            ValueError: If neither or both arguments are provided.
            RuntimeError: If the InPort sample XML cannot be downloaded or
                parsed.
        """
        self.dmp_xml_file_path = dmp_xml_file_path
        self.dmp_xml_element_tree = dmp_xml_element_tree
        self._handle_xml_file()
        self.inport_sample_url = "https://www.fisheries.noaa.gov/inport/help/xml-loader/inport-xml-sample-dmp.xml"
        self.sample_root = self.load_xml_from_url(self.inport_sample_url)
        if self.sample_root is None:
            # load_xml_from_url already printed the cause; stop here instead
            # of failing later with an opaque AttributeError on None.
            raise RuntimeError(
                f"Could not load the InPort sample XML from {self.inport_sample_url}"
            )
        self._set_valid_values_from_sample_xml()

    def _handle_xml_file(self):
        """Set ``self.root`` from whichever XML source was given to ``__init__``.

        Raises:
            ValueError: If neither or both of ``dmp_xml_file_path`` and
                ``dmp_xml_element_tree`` are set.
        """
        has_path = self.dmp_xml_file_path is not None
        has_tree = self.dmp_xml_element_tree is not None
        if not has_path and not has_tree:
            raise ValueError("Either dmp_xml_file_path or dmp_xml_element_tree must be provided")
        if has_path and has_tree:
            # Raised explicitly (not asserted) so the check survives `python -O`.
            raise ValueError("Either dmp_xml_file_path or dmp_xml_element_tree must be provided, but not both.")

        if has_path:
            # Load XML from file path
            self.root = ET.parse(self.dmp_xml_file_path).getroot()
        else:
            # Use the provided ElementTree root element
            self.root = self.dmp_xml_element_tree.getroot()

    def _get_max_page_number_from_inport(self, url: str = "") -> int:
        """Return the page count announced by the "Page 1 of N" text at ``url``.

        Returns 0, after printing a notice, when no such text is found.
        """
        r = requests.get(url, timeout=self.request_timeout)
        soup = BeautifulSoup(r.text, "html.parser")
        found_partial = soup.find(string=re.compile('Page 1 of '))
        match = None
        if found_partial:
            # Pick out only the number so that surrounding text or thousands
            # separators in the same text node do not break int().
            match = re.search(r"Page 1 of\s*(\d[\d,]*)", found_partial)
        if match:
            return int(match.group(1).replace(",", ""))
        print("Max page not found for URL: " + url)
        return 0

    def scrape_table_from_inport(self, url: str = "") -> pd.DataFrame:
        """Download every page of the table at ``url`` and stack them.

        Returns:
            One DataFrame holding the first table of each page, or an empty
            DataFrame when the page count cannot be determined.
        """
        # Get the maximum page number from the webpage.
        max_page = self._get_max_page_number_from_inport(url=url)
        # Return an empty DataFrame if no pages are found
        if max_page == 0:
            return pd.DataFrame()

        page_tables = []
        # NOTE(review): pages are requested as 0 .. max_page - 1; confirm that
        # InPort's `page` query parameter is zero-based.
        for curr_page in range(max_page):
            r = requests.get(url + f"?page={curr_page}", timeout=self.request_timeout)
            # Parse the page that was just fetched (not the base URL again),
            # otherwise the first page is repeated max_page times.
            tables = pd.read_html(io.StringIO(r.text))
            # Access the first table in the list (index 0)
            page_tables.append(tables[0])

        # Concatenate all DataFrames in the list into a single DataFrame
        return pd.concat(page_tables, ignore_index=True)

    def load_xml_from_url(self, url):
        """
        Fetches XML content from a URL and parses it into an ElementTree root
        element.

        Returns the root element, or None (after printing the error) when the
        request fails or the content is not well-formed XML.
        """
        try:
            # Fetch the content from the URL
            response = requests.get(url, timeout=self.request_timeout)
            # Raise an exception if the request was unsuccessful
            response.raise_for_status()
            # Parse the raw bytes so the XML declaration decides the encoding
            return ET.fromstring(response.content)
        except requests.exceptions.RequestException as e:
            print(f"Error fetching URL: {e}")
            return None
        except ET.ParseError as e:
            print(f"Error parsing XML: {e}")
            return None

    def _set_valid_values_from_sample_xml(self):
        """Populate every ``valid_*`` attribute used by the validators."""
        print("LOADING VALID VALUES... ", end="")
        self._set_valid_dmp_types()
        self._set_valid_programs()
        self._set_valid_instruments()
        self._set_valid_platforms()
        self._set_valid_observing_systems()
        self._set_valid_collection_scopes()
        self._set_valid_has_data_acquisition_plan_values()
        self._set_valid_has_restricted_designations()
        self._set_management_resources_available()
        self._set_dissemination_resources_available()
        self._set_archive_resources_available()
        self._set_valid_restriction_statuses()
        self._set_valid_data_for_publication_values()
        self._set_valid_data_access_methods()
        self._set_valid_cloud_access_values()
        self._set_valid_access_level_of_service_values()
        self._set_valid_dissemination_plan_values()
        self._set_archive_location_values()
        print("LOADED.")

    def _scrape_column(self, url: str, column: str) -> list:
        """Return the values of ``column`` from the table scraped at ``url``.

        When the table or the column is unavailable a warning is written to
        stderr and an empty list is returned, so every value checked against
        that list will be reported as invalid.
        """
        table = self.scrape_table_from_inport(url=url)
        if column not in table.columns:
            print(f"Column `{column}` not found in table scraped from URL: {url}", file=sys.stderr)
            return []
        return table[column].tolist()

    def _parse_valid_values(self, xpath: str) -> list:
        """Extract the list of valid values from a sample XML element.

        The element text is cut after its first ``:`` (the whole text is used
        when there is none), split on commas, and each piece is stripped of
        surrounding spaces and periods.

        Raises:
            ValueError: If the sample XML has no text at ``xpath``.
        """
        element = self.sample_root.find(xpath)
        if element is None or element.text is None:
            raise ValueError(f"Sample XML has no text for element: {xpath}")
        text = element.text
        return [value.strip(" .") for value in text[text.find(":") + 1:].split(",")]

    def _set_valid_programs(self):
        """Load valid program names from the InPort programs table."""
        self.valid_programs = self._scrape_column(self.program_list_url, "Program")

    def _set_valid_instruments(self):
        """Load valid instrument identifiers from the InPort instruments table."""
        self.valid_instruments = self._scrape_column(self.instrument_list_url, "Identifier")

    def _set_valid_platforms(self):
        """Load valid platform identifiers from the InPort platforms table."""
        self.valid_platforms = self._scrape_column(self.platform_list_url, "Identifier")

    def _set_valid_observing_systems(self):
        """Load valid observing system names from the InPort observing-systems table."""
        self.valid_observing_systems = self._scrape_column(self.observing_system_list_url, "Name")

    def _set_valid_dmp_types(self):
        """Load the valid DMP types from the sample XML."""
        self.valid_dmp_types = self._parse_valid_values("./general-information/dmp-type")

    def _set_valid_collection_scopes(self):
        """Load the valid collection scopes from the sample XML."""
        self.valid_collection_scopes = self._parse_valid_values(
            "./coverage/temporal-coverage/collection-scope")

    def _set_valid_has_data_acquisition_plan_values(self):
        """Load the valid has-data-acquisition-plan values from the sample XML."""
        self.valid_has_data_acquisition_plan_values = self._parse_valid_values(
            "./data-acquisition/has-data-acquisition-plan")

    def _set_valid_has_restricted_designations(self):
        """Load the valid restricted designations from the sample XML."""
        self.valid_restricted_designations = self._parse_valid_values(
            "./data-acquisition/has-restricted-designations")

    def _set_management_resources_available(self):
        """Load the valid management-resources-available values from the sample XML."""
        self.valid_management_resources_available = self._parse_valid_values(
            "./resources/management-resources-available")

    def _set_dissemination_resources_available(self):
        """Load the valid dissemination-resources-available values from the sample XML."""
        self.valid_dissemination_resources_available = self._parse_valid_values(
            "./resources/dissemination-resources-available")

    def _set_archive_resources_available(self):
        """Load the valid archive-resources-available values from the sample XML."""
        self.valid_archive_resources_available = self._parse_valid_values(
            "./resources/archive-resources-available")

    def _set_valid_restriction_statuses(self):
        """Load the valid restriction statuses from the sample XML."""
        self.valid_restriction_statuses = self._parse_valid_values(
            "./data-access/restriction-status")

    def _set_valid_data_for_publication_values(self):
        """Load the valid data-for-publication values from the sample XML."""
        self.valid_data_for_publication_values = self._parse_valid_values(
            "./data-access/data-for-publication")

    def _set_valid_data_access_methods(self):
        """Load the valid data access methods from the sample XML."""
        self.valid_data_access_methods = self._parse_valid_values(
            "./data-access/data-access-methods/data-access-method")

    def _set_valid_cloud_access_values(self):
        """Load the valid cloud-access values from the sample XML."""
        self.valid_cloud_access_values = self._parse_valid_values(
            "./data-access/cloud-access")

    def _set_valid_access_level_of_service_values(self):
        """Load the valid access-level-of-service values from the sample XML."""
        self.valid_access_level_of_service_values = self._parse_valid_values(
            "./data-access/access-level-of-service")

    def _set_valid_dissemination_plan_values(self):
        """Load the valid dissemination plan values from the sample XML."""
        self.valid_dissemination_plan_values = self._parse_valid_values(
            "./data-access/dissemination-plans/dissemination-plan")

    def _set_archive_location_values(self):
        """Load the valid archive location values from the sample XML."""
        self.valid_archive_location_values = self._parse_valid_values(
            "./long-term-preservation/archive-locations/archive-location")

    def _print_sample_hint(self, xpath: str) -> None:
        """Write the sample XML's text for ``xpath`` to stderr as guidance."""
        element = self.sample_root.find(xpath)
        print(element.text if element is not None else "", file=sys.stderr)

    @staticmethod
    def _matches_any_date_format(value, formats) -> bool:
        """Return True if ``value`` parses with at least one of ``formats``."""
        for date_format in formats:
            try:
                datetime.strptime(value, date_format)
            except (TypeError, ValueError):
                # TypeError covers a None value (element present but empty).
                continue
            return True
        return False

    def _validate_dmp_type(self, dmp_type: str) -> bool:
        """Return True if ``dmp_type`` is one of the valid DMP types."""
        return dmp_type in self.valid_dmp_types

    def _validate_implementing_program(self, program: str) -> bool:
        """Return True if ``program`` is a known InPort program."""
        return program in self.valid_programs

    def _validate_funding_program(self, program: str) -> bool:
        """Return True if ``program`` is a known InPort program."""
        return program in self.valid_programs

    def _validate_partner_program(self, program: str) -> bool:
        """Return True if ``program`` is a known InPort program."""
        return program in self.valid_programs

    def _validate_collection_scope(self, collection_scope: str) -> bool:
        """Return True if ``collection_scope`` is a valid collection scope."""
        return collection_scope in self.valid_collection_scopes

    def _validate_temporal_coverage_dates(self, start_date: str, end_date: str) -> bool:
        """Check that both dates are ``YYYY-MM-DD`` or ``YYYY-MM``.

        Each invalid date is reported on stderr together with the sample
        XML's guidance text. Returns True only when both dates are valid.
        """
        accepted_formats = ("%Y-%m-%d", "%Y-%m")

        start_ok = self._matches_any_date_format(start_date, accepted_formats)
        if not start_ok:
            print(f"Invalid start-date format in temporal-coverage: `{start_date}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/temporal-coverage/start-date")

        end_ok = self._matches_any_date_format(end_date, accepted_formats)
        if not end_ok:
            print(f"Invalid end-date format in temporal-coverage: `{end_date}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/temporal-coverage/end-date")

        return start_ok and end_ok

    def _validate_geographic_coverages(self, west_bound: str, east_bound: str, south_bound: str, north_bound: str) -> bool:
        """Check that the four bounds are numbers inside their allowed ranges.

        The bounds normally arrive as XML element text, so they are converted
        with ``float()`` rather than type-checked. Every problem is reported
        on stderr. Returns True only when all four bounds are valid.
        """
        try:
            west, east, south, north = (
                float(bound)
                for bound in (west_bound, east_bound, south_bound, north_bound)
            )
        except (TypeError, ValueError):
            print(f"Invalid geographic coverage values: `{west_bound}, {east_bound}, {south_bound}, {north_bound}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/geographic-coverage")
            return False

        # Check for valid range values.
        all_in_range = True
        if not (-180 <= west < 180):
            print(f"Invalid west-bound value: `{west_bound}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/geographic-coverage/west-bound")
            all_in_range = False
        if not (-180 < east <= 180):
            print(f"Invalid east-bound value: `{east_bound}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/geographic-coverage/east-bound")
            all_in_range = False
        if not (-90 < north < 90):
            print(f"Invalid north-bound value: `{north_bound}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/geographic-coverage/north-bound")
            all_in_range = False
        if not (-90 < south < 90):
            print(f"Invalid south-bound value: `{south_bound}` ", file=sys.stderr, end="")
            self._print_sample_hint("./coverage/geographic-coverage/south-bound")
            all_in_range = False
        return all_in_range

    def _validate_has_data_acquisition_plan(self, has_data_acquisition_plan: str) -> bool:
        """Return True if the value is a valid has-data-acquisition-plan value."""
        return has_data_acquisition_plan in self.valid_has_data_acquisition_plan_values

    def _validate_restricted_designations(self, restricted_designations: str) -> bool:
        """Return True if the value is a valid restricted designation."""
        return restricted_designations in self.valid_restricted_designations

    def _validate_instrument(self, instrument: str) -> bool:
        """Return True if ``instrument`` is a known InPort instrument identifier."""
        return instrument in self.valid_instruments

    def _validate_platform(self, platform: str) -> bool:
        """Return True if ``platform`` is a known InPort platform identifier."""
        return platform in self.valid_platforms

    def _validate_observing_system(self, observing_system: str) -> bool:
        """Return True if ``observing_system`` is a known InPort observing system."""
        return observing_system in self.valid_observing_systems

    def _validate_management_resources_available(self, management_resources_available: str) -> bool:
        """Return True if the value is a valid management-resources-available value."""
        return management_resources_available in self.valid_management_resources_available

    def _validate_dissemination_resources_available(self, dissemination_resources_available: str) -> bool:
        """Return True if the value is a valid dissemination-resources-available value."""
        return dissemination_resources_available in self.valid_dissemination_resources_available

    def _validate_archive_resources_available(self, archive_resources_available: str) -> bool:
        """Return True if the value is a valid archive-resources-available value."""
        return archive_resources_available in self.valid_archive_resources_available

    def _validate_restriction_status(self, restriction_status: str) -> bool:
        """Return True if ``restriction_status`` is a valid restriction status."""
        return restriction_status in self.valid_restriction_statuses

    def _validate_data_for_publication(self, data_for_publication: str) -> bool:
        """Return True if the value is a valid data-for-publication value."""
        return data_for_publication in self.valid_data_for_publication_values

    def _validate_data_access_methods(self, data_access_methods: str) -> bool:
        """Return True if the value is a valid data access method."""
        return data_access_methods in self.valid_data_access_methods

    def _validate_cloud_access(self, cloud_access: str) -> bool:
        """Return True if ``cloud_access`` is a valid cloud-access value."""
        return cloud_access in self.valid_cloud_access_values

    def _validate_access_level_of_service(self, access_level_of_service: str) -> bool:
        """Return True if the value is a valid access-level-of-service value."""
        return access_level_of_service in self.valid_access_level_of_service_values

    def _validate_dissemination_plan(self, dissemination_plan: str) -> bool:
        """Return True if ``dissemination_plan`` is a valid dissemination plan."""
        return dissemination_plan in self.valid_dissemination_plan_values

    def _validate_dissemination_date(self, dissemination_date: str) -> bool:
        """Check that ``dissemination_date`` is ``YYYY-MM-DD``.

        An invalid date is reported on stderr together with the sample XML's
        guidance text.
        """
        if self._matches_any_date_format(dissemination_date, ("%Y-%m-%d",)):
            return True
        print(f"Invalid dissemination-date format in data-access: `{dissemination_date}` ", file=sys.stderr, end="")
        self._print_sample_hint("./data-access/dissemination-date")
        return False

    def _validate_archive_location(self, archive_location: str) -> bool:
        """Return True if ``archive_location`` is a valid archive location."""
        return archive_location in self.valid_archive_location_values

    def _run_check(self, check, arg_paths: dict, self_reporting: bool) -> bool:
        """Run one validator on element text taken from ``self.root``.

        Args:
            check: Bound validator method to call.
            arg_paths: Maps each keyword argument of ``check`` to the XPath
                of the element whose text supplies it.
            self_reporting: True when ``check`` prints its own error message;
                otherwise a generic message is printed here on failure.

        Returns:
            True when every element exists and ``check`` accepts the values.
        """
        values = {}
        found_all = True
        for arg_name, xpath in arg_paths.items():
            element = self.root.find(xpath)
            if element is None:
                # Report the missing element instead of crashing on `.text`.
                print(f"Missing element in XML: `{xpath}`", file=sys.stderr)
                found_all = False
            else:
                values[arg_name] = element.text
        if not found_all:
            return False

        if check(**values):
            return True
        if not self_reporting:
            for arg_name, xpath in arg_paths.items():
                print(f"Invalid value for `{xpath}`: `{values[arg_name]}` ", file=sys.stderr, end="")
                self._print_sample_hint(xpath)
        return False

    def validate_xml(self):
        """Run every check against the loaded DMP XML.

        Each problem (missing element or invalid value) is written to stderr.
        "XML VALIDATED." is printed once all checks have run, whatever their
        outcome; use the return value to learn the result.

        Returns:
            True when every check passed, False otherwise.
        """
        print("VALIDATING XML...")
        # Each entry: (validator, {keyword argument: XPath}, validator prints
        # its own error message).
        # TODO: some elements (e.g. program) are repeatable; only the first
        # match of each path is validated.
        checks = (
            (self._validate_dmp_type,
             {"dmp_type": "./general-information/dmp-type"}, False),
            (self._validate_implementing_program,
             {"program": "./program-information/implementing-program-information/programs/program"}, False),
            (self._validate_funding_program,
             {"program": "./program-information/funding-program-information/programs/program"}, False),
            (self._validate_partner_program,
             {"program": "./program-information/partner-program-information/programs/program"}, False),
            (self._validate_collection_scope,
             {"collection_scope": "./coverage/temporal-coverage/collection-scope"}, False),
            (self._validate_temporal_coverage_dates,
             {"start_date": "./coverage/temporal-coverage/start-date",
              "end_date": "./coverage/temporal-coverage/end-date"}, True),
            (self._validate_geographic_coverages,
             {"west_bound": "./coverage/geographic-coverage/west-bound",
              "east_bound": "./coverage/geographic-coverage/east-bound",
              "north_bound": "./coverage/geographic-coverage/north-bound",
              "south_bound": "./coverage/geographic-coverage/south-bound"}, True),
            (self._validate_has_data_acquisition_plan,
             {"has_data_acquisition_plan": "./data-acquisition/has-data-acquisition-plan"}, False),
            (self._validate_restricted_designations,
             {"restricted_designations": "./data-acquisition/has-restricted-designations"}, False),
            (self._validate_instrument,
             {"instrument": "./data-collection/instruments/instrument/identifier"}, False),
            (self._validate_platform,
             {"platform": "./data-collection/platforms/platform/identifier"}, False),
            (self._validate_observing_system,
             {"observing_system": "./data-collection/observing-systems/observing-system/system-name"}, False),
            (self._validate_management_resources_available,
             {"management_resources_available": "./resources/management-resources-available"}, False),
            (self._validate_dissemination_resources_available,
             {"dissemination_resources_available": "./resources/dissemination-resources-available"}, False),
            (self._validate_archive_resources_available,
             {"archive_resources_available": "./resources/archive-resources-available"}, False),
            (self._validate_restriction_status,
             {"restriction_status": "./data-access/restriction-status"}, False),
            (self._validate_data_for_publication,
             {"data_for_publication": "./data-access/data-for-publication"}, False),
            (self._validate_data_access_methods,
             {"data_access_methods": "./data-access/data-access-methods/data-access-method"}, False),
            (self._validate_cloud_access,
             {"cloud_access": "./data-access/cloud-access"}, False),
            (self._validate_access_level_of_service,
             {"access_level_of_service": "./data-access/access-level-of-service"}, False),
            (self._validate_dissemination_plan,
             {"dissemination_plan": "./data-access/dissemination-plans/dissemination-plan"}, False),
            (self._validate_dissemination_date,
             {"dissemination_date": "./data-access/dissemination-date"}, True),
            (self._validate_archive_location,
             {"archive_location": "./long-term-preservation/archive-locations/archive-location"}, False),
        )
        # Build the full list first (no short-circuit) so that every problem
        # in the document is reported in a single run.
        results = [
            self._run_check(check, arg_paths, self_reporting)
            for check, arg_paths, self_reporting in checks
        ]
        print("XML VALIDATED.")
        return all(results)

In [48]:
# Build a validator for the local DMP XML file (this also downloads the
# InPort reference data), then run every check against it.
validator = InPortValidator(dmp_xml_file_path="./inport-dmp-xml.xml")
# print(validator.valid_instruments)
validator.validate_xml()

LOADING VALID VALUES... LOADED.
VALIDATING XML...
XML VALIDATED.


Invalid start-date format in temporal-coverage: `Not applicable; Continuous` Enter the date in ISO 8601 format (YYYY-MM-DD), up to the appropriate granularity which is significant. For example, if the date is significant only up to the month, enter YYYY-MM.
Invalid end-date format in temporal-coverage: `Not applicable; Continuous` Enter the date in ISO 8601 format (YYYY-MM-DD), up to the appropriate granularity which is significant. For example, if the date is significant only up to the month, enter YYYY-MM.
Invalid geographic coverage values: `Not applicable; Dependent on data collection surveys, Not applicable; Dependent on data collection surveys, Not applicable; Dependent on data collection surveys, Not applicable; Dependent on data collection surveys` 
      
Invalid dissemination-date format in data-access: `Enter the date in ISO 8601 format (YYYY-MM-DD)` Enter the date in ISO 8601 format (YYYY-MM-DD)
