<a href="https://colab.research.google.com/github/joestanis/USGS-SAC-25-12553801-DE-RM/blob/main/JoeStanis_USGS_SeismicData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Supervisory Computer Scientist, Vacancy Announcement USGS-SAC-25-12553801-DE-RM

## Purpose

This project was designed and developed against specific objectives, emphasizing code efficiency, readability, and adherence to best practices.

Python 3 is the language used for program code, with coding style following recommendations of *Python Enhancement Proposal 8* [ https://peps.python.org/pep-0008/ ].

## Import the required libraries.

In [1]:
# Install the external ObsPy library. If the installation updates other dependencies, Google Colab might require a restart of the session.
!pip install obspy

# Import critical modules
import obspy
import sqlite3
import requests
import xml.etree.ElementTree
import re



## Initialize the global variables.

In [2]:
# Array defining paths to each miniSEED file.
mseed_files: list[str] = [
    'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP01.mseed',
    'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP02.mseed',
    'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP03.mseed',
]

# Name of the database where miniSEED data will be stored.
mseed_db_filename: str = 'usgs_miniseed_data.sqlite.db'

# Array which will hold the consolidated list of all miniSEED traces.
mseed_traces: list = []

# Hold the metadata about each file in a separate variable for use with the final visualization.
# Each entry is a dictionary (the annotation previously used a slice, `dict[str:any]`, by mistake).
mseed_sources_metadata: list[dict[str, any]] = []

# For modularity define the database table names in a dictionary rather than hardcoding them directly into the scripts.
# Keys are the logical names used by the code; values are the physical table names in SQLite.
table_names: dict[str, str] = {
    'network_codes': 'network_codes',
    'station_codes': 'station_codes',
    'location_codes': 'location_codes',
    'channel_codes': 'channel_codes',
    'stations': 'stations',
    'mseed_traces': 'mseed_traces',
    'mseed_samples': 'mseed_samples',
    'site_info': 'site_info',
}

# Define the base URL to query IRIS API.
iris_api_url_base: str = 'https://service.iris.edu/fdsnws/station/1/query'


## Define the utility functions.

### Function to execute a general SQL statement against the database.

In [3]:
def sql_exec(sql: str = ';', params: tuple = ()) -> list:
    """Executes a general SQLite statement.

    :param sql: SQL statement to execute
    :param params: Optional values bound to the ``?`` placeholders in the statement
    :return: List containing the selected data, if any
    :raises sqlite3.Error: If the connection or the statement fails
    """

    # Connect outside the try block: if the connection itself fails there is nothing to close,
    # and the original error propagates instead of being masked by the cleanup code.
    db_conn = sqlite3.connect(database=mseed_db_filename)

    try:
        db_conn.row_factory = sqlite3.Row

        # The connection context manager commits on success and rolls back on error; it does NOT close.
        with db_conn:
            return db_conn.execute(sql, params).fetchall()

    finally:
        db_conn.close()


### Function to insert a single data row into a database table.

In [4]:
def sql_insert(table_name: str = None,
               insert_data: dict[str, any] = None) -> int:
    """Inserts a single data row into a specified table and returns the ID of the inserted row.

    The statement is an ``INSERT OR IGNORE``, so a row that violates a UNIQUE constraint is silently skipped.

    :param table_name: Table where the data will be inserted
    :param insert_data: Dictionary of name-value pairs, where key is the field name and value is the data to insert
    :return: ID of the inserted row, or 0 if no row was inserted
    :raises ValueError: If the table name or the data dictionary is missing or empty
    :raises sqlite3.Error: If the connection or the statement fails
    """

    if table_name is None:
        raise ValueError('Missing the table name for data insert.')

    # Covers both None and an empty dictionary.
    if not insert_data:
        raise ValueError('Missing {field:value} dictionary to insert.')

    # Field names and values come from the same dictionary, so iteration order keeps them paired.
    row_fields = ','.join(f"[{key}]" for key in insert_data)
    row_values = list(insert_data.values())
    param_placeholders = ','.join('?' * len(row_values))
    sql_query = f"INSERT OR IGNORE INTO [{table_name}] ({row_fields}) VALUES ({param_placeholders});"

    # Connect outside the try block so a failed connection is not masked by the cleanup code.
    db_conn = sqlite3.connect(database=mseed_db_filename, isolation_level='DEFERRED')

    try:
        # Commits on success, rolls back on error.
        with db_conn:
            db_cursor = db_conn.execute(sql_query, row_values)

        # rowcount is 0 when the insert was ignored, so only then trust lastrowid.
        if db_cursor.rowcount > 0 and db_cursor.lastrowid:
            return db_cursor.lastrowid

        return 0

    finally:
        db_conn.close()


### Function to fast insert multiple data rows into a database table.

In [5]:
def sql_insert_many(table_name: str = None,
                    field_names: list[str] = None,
                    field_values: list[tuple[any]] = None) -> int:
    """Bulk inserts data into a specified table and returns a count of inserted rows.

    The statement is an ``INSERT OR IGNORE``, so rows that violate a UNIQUE constraint are skipped and not counted.

    :param table_name: Table where the data will be inserted
    :param field_names: List of field names to target for data insertion
    :param field_values: Data to insert into the table, passed as tuple list items, with data in the same order as the field names
    :return: Count of the inserted rows (0 when there was nothing to insert)
    :raises ValueError: If the table name, field names, or field values are missing
    :raises sqlite3.Error: If the connection or the statement fails
    """

    if table_name is None:
        raise ValueError('Missing the table name for data insert.')

    if not field_names:
        raise ValueError('Missing names of fields for insert.')

    if field_values is None:
        raise ValueError('Missing values to insert for each field.')

    # An empty batch is valid input; there is simply nothing to do.
    if not field_values:
        return 0

    # One placeholder per target column; SQLite rejects any tuple whose length does not match.
    param_placeholders = ','.join('?' * len(field_names))
    sql_query = f"INSERT OR IGNORE INTO [{table_name}] ({','.join(field_names)}) VALUES ({param_placeholders});"

    # Connect outside the try block so a failed connection is not masked by the cleanup code.
    db_conn = sqlite3.connect(database=mseed_db_filename, isolation_level='DEFERRED')

    try:
        # A single transaction for the whole batch: commits on success, rolls back on error.
        with db_conn:
            db_cursor = db_conn.executemany(sql_query, field_values)

        # executemany() does not update lastrowid; rowcount holds the total number of rows inserted.
        return max(db_cursor.rowcount, 0)

    finally:
        db_conn.close()


### Function to fetch rows from a database table.

In [6]:
def sql_fetch_rows(table_name: str = None,
                   search_values: dict[str, any] = None,
                   return_fields: list[str] = None,
                   limit_rows: int = 0,
                   insert_if_missing: bool = False) -> list:
    """Returns the rows matching the provided search values. Optionally inserts a row with the search values if none was found.

    :param table_name: Table containing the data to search
    :param search_values: Criteria to use for searching the rows, as {field name: value}; all criteria must match
    :param return_fields: List containing the fields of data to return from the row. If omitted, None, or an empty list then all fields for that row are returned.
    :param limit_rows: Maximum number of rows to return. If omitted or not positive then all matching rows are returned.
    :param insert_if_missing: If True, and the provided value is not found, then try to insert a new row with this value
    :return: List of sqlite3.Row objects for the matching rows, or an empty list if there is no match
    :raises ValueError: If the table name or the search criteria are missing
    :raises sqlite3.Error: If the connection or the statement fails
    """

    if table_name is None:
        raise ValueError('Missing the table name to search.')

    if search_values is None:
        raise ValueError('Missing the search criteria.')

    # An empty or missing field list selects every column.
    select_fields = ','.join(f"[{field}]" for field in return_fields) if return_fields else '*'

    # Values are bound as parameters rather than interpolated, so quotes in the data cannot break the statement.
    where_fields = ['1=1']
    query_params = []
    for key, val in search_values.items():
        if val is None:
            # "= NULL" never matches in SQL; NULL has to be tested with IS NULL.
            where_fields.append(f"[{key}] IS NULL")
        else:
            where_fields.append(f"[{key}]=?")
            query_params.append(val)

    sql_query = f"SELECT {select_fields} FROM [{table_name}] WHERE {' AND '.join(where_fields)}"
    if limit_rows > 0:
        sql_query += ' LIMIT ?'
        query_params.append(int(limit_rows))
    sql_query += ';'

    # Connect outside the try block so a failed connection is not masked by the cleanup code.
    db_conn = sqlite3.connect(database=mseed_db_filename)

    try:
        db_conn.row_factory = sqlite3.Row
        result_rows = db_conn.execute(sql_query, query_params).fetchall()

    finally:
        # Close before any insert below, which opens its own connection.
        db_conn.close()

    if result_rows or not insert_if_missing:
        # Either found matching rows, or found none and the caller did not ask for an insert.
        return result_rows

    # No matching record: insert the search values as a new row, then fetch once more (without inserting again).
    sql_insert(table_name=table_name, insert_data=search_values)
    return sql_fetch_rows(table_name=table_name, search_values=search_values,
                          return_fields=return_fields, limit_rows=limit_rows)


### Function to empty a database table.

In [7]:
def sql_truncate_table(table_name: str = None) -> list:
    """Empties all data from a specified table, essentially truncating the table.

    SQLite has no TRUNCATE statement, so an unqualified DELETE is used instead.

    :param table_name: Table to empty of data
    :return: Returns whatever result was received from the call to sql_exec
    :raises ValueError: If the table name is missing
    """

    if table_name is None:
        raise ValueError('Missing the name of the table to truncate.')

    # Errors raised by sql_exec propagate unchanged to the caller.
    return sql_exec(sql=f"DELETE FROM [{table_name}];")


### Function to search for a given column value and return the corresponding row ID.

In [8]:
def fields_to_id(table_name: str = None,
                 search_values: dict[str, any] = None,
                 insert_if_missing: bool = False) -> 'int | None':
    """Search a table for specific column values. If found, return the first row ID in the set.

    :param table_name: Table to search
    :param search_values: Dictionary with keys as field names and values as search criteria
    :param insert_if_missing: If True, and the provided value is not found, then try to insert the search data into a new row
    :return: ID of the matching row, or None if no row matches (and none could be inserted)
    :raises ValueError: If the table name or the search data is missing
    """

    if table_name is None:
        raise ValueError('Missing the table name to search.')

    if search_values is None:
        raise ValueError('Missing the search data to locate the row ID.')

    sql_result = sql_fetch_rows(table_name=table_name, search_values=search_values, return_fields=['id'],
                                limit_rows=1, insert_if_missing=insert_if_missing)

    # Callers test the result against None, so report "not found" that way instead of failing on the index.
    if not sql_result:
        return None

    return sql_result[0]['id']


### Function to locate a row ID and return the requested column's value.

In [9]:
def id_to_fields(table_name: str = None,
                 search_id: int = 0,
                 return_fields: list = None) -> dict[str, any]:
    """Search a table for a given row ID. If found, return the requested column data.

    :param table_name: Table to search
    :param search_id: ID of the row to locate
    :param return_fields: List containing fields to retrieve if found a matching row
    :return: The matching row, addressable by field name, or an empty dictionary if no row has that ID
    :raises ValueError: If the table name or the list of fields is missing
    """

    if table_name is None:
        raise ValueError('Missing the table name to search.')

    if return_fields is None:
        raise ValueError('Missing which field column to retrieve and return the data.')

    sql_result = sql_fetch_rows(table_name=table_name, search_values={'id': search_id},
                                return_fields=return_fields, limit_rows=1)

    # An unknown ID yields no rows; report that as an empty result instead of failing on the index.
    if not sql_result:
        return {}

    return sql_result[0]


### Function to query the IRIS API and return parsed XML.

In [10]:
def fetch_iris_api(url_params: list[dict[str, str]] = None,
                   timeout: float = 30.0) -> xml.etree.ElementTree.Element:
    """Call the IRIS API endpoint and return parsed XML.

    :param url_params: List of dictionaries containing URL filtering parameters
    :param timeout: Seconds to wait for the server before giving up
    :return: XML Element at the root of the parsed response
    :raises ConnectionError: If the API answers with any status other than HTTP 200
    :raises requests.RequestException: If the request itself fails or times out
    :raises xml.etree.ElementTree.ParseError: If the response body is not well-formed XML
    """

    from urllib.parse import urlencode

    # Flatten the list of dictionaries into ordered (name, value) pairs, always asking for XML first.
    query_pairs = [('format', 'xml')]
    for current_param in url_params or []:
        query_pairs.extend(current_param.items())

    # Percent-encode the values, but keep the characters the query uses as list separators and wildcards readable.
    api_url = f"{iris_api_url_base}?{urlencode(query_pairs, safe=',*:')}"

    # Without a timeout a stalled server would hang the notebook indefinitely.
    api_response = requests.get(api_url, timeout=timeout)

    if api_response.status_code != 200:
        # Response was not OK (HTTP 200), unexpected result.
        raise ConnectionError(f"HTTP Code {api_response.status_code}, Problem connecting to the IRIS API")

    # Parse the XML response body.
    response_body = api_response.content.decode('utf-8')
    return xml.etree.ElementTree.fromstring(response_body)


## Import the miniSEED files into memory.

In [11]:
try:
    if len(mseed_files) < 1:
        print('Error: No miniSEED files to process.')

    else:
        print(f"Attempting to import data from {len(mseed_files)} miniSEED files.")

        for source_url in mseed_files:
            print(f"Processing '{source_url}'")

            try:
                # ObsPy reads local paths and URLs alike and returns a stream of traces.
                stream = obspy.read(pathname_or_url=source_url)
                print(f"Loaded {len(stream)} traces from the file.")

                # Remember the miniSEED header of the first trace as the metadata for the whole file.
                first_trace_header = stream.traces[0].meta.mseed
                mseed_sources_metadata.append({'mseed_file': source_url,
                                               'mseed_metadata': first_trace_header})

                # Add every trace of this file to the consolidated list.
                mseed_traces.extend(stream)

            except IOError:
                # A file that cannot be opened is reported and skipped; the remaining files are still processed.
                print(f"Exception: Unable to open file '{source_url}'.")

        print(f"Imported a total of {len(mseed_traces)} traces from all sources.")

except Exception as e:
    print(f"Exception: {e}")


Attempting to import data from 3 miniSEED files.
Processing 'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP01.mseed'
Loaded 15 traces from the file.
Processing 'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP02.mseed'
Loaded 5 traces from the file.
Processing 'https://github.com/joestanis/USGS-SAC-25-12553801-DE-RM/raw/refs/heads/main/data/SEP03.mseed'
Loaded 12 traces from the file.
Imported a total of 32 traces from all sources.


## Provision a database with proper normal form and constraints.

### Create and initialize the code lookup tables.

In [12]:
try:

    # Each lookup table has the same shape: an ID plus one unique code column.
    # Entries are (key into table_names, code column name, description of the code).
    lookup_tables = (
        ('network_codes', 'network_code',
         'A one or two character code identifying the owner of the data'),
        ('station_codes', 'station_code',
         'A one to five character identifier for the station recording the data'),
        ('location_codes', 'location_code',
         'A two character code used to uniquely identify different data streams at a single station'),
        ('channel_codes', 'channel_code',
         'A three character code identifying the band, instrument type, and sensor orientation'),
    )

    for lookup_key, code_column, column_note in lookup_tables:
        lookup_table = table_names[lookup_key]

        # Create the database schema for this foreign key table.
        print(f"Creating the [{lookup_table}] table")
        sql_query = f"""CREATE TABLE IF NOT EXISTS [{lookup_table}] (
                id INTEGER PRIMARY KEY,
                {code_column} TEXT NOT NULL, -- {column_note}
                CONSTRAINT {code_column}_unique UNIQUE ({code_column}));"""
        sql_result = sql_exec(sql=sql_query)

        if sql_result is not None:
            # Reset the table so that every run of the notebook starts from empty lookup data.
            print(f"Initializing the [{lookup_table}] table.")
            sql_result = sql_truncate_table(table_name=lookup_table)

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


Creating the [network_codes] table
Initializing the [network_codes] table.
Creating the [station_codes] table
Initializing the [station_codes] table.
Creating the [location_codes] table
Initializing the [location_codes] table.
Creating the [channel_codes] table
Initializing the [channel_codes] table.


### Create and initialize the table to contain information about each trace set.

In [13]:
try:

    # Create the database schema for the TRACES table.
    # One row per miniSEED trace; the four SEED codes and the site are stored as foreign keys into lookup tables.
    print(f"Creating the [{table_names['mseed_traces']}] table")
    sql_query = f"""CREATE TABLE IF NOT EXISTS [{table_names['mseed_traces']}] (
                id INTEGER PRIMARY KEY,
                network_id INTEGER NOT NULL, -- Foreign key to NETWORK.id
                station_id INTEGER NOT NULL, -- Foreign key to STATION.id
                location_id INTEGER NOT NULL, -- Foreign key to LOCATION.id
                channel_id INTEGER NOT NULL, -- Foreign key to CHANNEL.id
                starttime REAL, -- Unix timestamp UTC
                endtime REAL, -- Unix timestamp UTC
                sampling_rate REAL, -- Rate of samples captured, in Hz
                delta REAL, -- Time increment between data points
                npts INTEGER, -- Number of sampling points in the trace
                calib REAL, -- Trace calibration
                site_info_id INTEGER NOT NULL, -- Foreign key to SITE_INFO.id
                FOREIGN KEY (network_id) REFERENCES {table_names['network_codes']}(id),
                FOREIGN KEY (station_id) REFERENCES {table_names['station_codes']}(id),
                FOREIGN KEY (location_id) REFERENCES {table_names['location_codes']}(id),
                FOREIGN KEY (channel_id) REFERENCES {table_names['channel_codes']}(id),
                FOREIGN KEY (site_info_id) REFERENCES {table_names['site_info']}(id));"""
    sql_result = sql_exec(sql=sql_query)
    if sql_result is not None:
        # Reset the TRACES table.
        print(f"Initializing the [{table_names['mseed_traces']}] table.")
        sql_result = sql_truncate_table(table_name=table_names['mseed_traces'])

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


Creating the [mseed_traces] table
Initializing the [mseed_traces] table.


### Create and initialize the table for holding the miniSEED samples data.

In [14]:
try:

    # Build the schema for the SAMPLES table: one row per data point, each pointing back at its trace.
    samples_table = table_names['mseed_samples']
    print(f"Creating the [{samples_table}] table")

    traces_table = table_names['mseed_traces']
    sql_query = (
        f"CREATE TABLE IF NOT EXISTS [{samples_table}] (\n"
        "                id INTEGER PRIMARY KEY,\n"
        "                trace_id INTEGER, -- Foreign key to TRACES.id\n"
        "                sequence_num INTEGER, -- Sequence number of the individual sample\n"
        "                sample_value INTEGER, -- Value of the individual sample\n"
        f"                FOREIGN KEY (trace_id) REFERENCES {traces_table}(id));"
    )

    sql_result = sql_exec(sql=sql_query)
    if sql_result is not None:
        # Empty the SAMPLES table so that a re-run does not duplicate data points.
        print(f"Initializing the [{samples_table}] table.")
        sql_result = sql_truncate_table(table_name=samples_table)

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


Creating the [mseed_samples] table
Initializing the [mseed_samples] table.


### Create and initialize the table for holding information about the trace sites.

In [15]:
try:

    # Create the database schema for the SITE_INFO table containing metadata about each target site (network + station).
    # The UNIQUE constraint guarantees a single row per network and station pair.
    print(f"Creating the [{table_names['site_info']}] table")
    sql_query = f"""CREATE TABLE IF NOT EXISTS [{table_names['site_info']}] (
                id INTEGER PRIMARY KEY,
                network_id INTEGER NOT NULL, -- Foreign key to NETWORK.id
                station_id INTEGER NOT NULL, -- Foreign key to STATION.id
                latitude REAL, -- Decimal degrees of the site North-South location
                longitude REAL, -- Decimal degrees of the site East-West location
                site_name TEXT, -- Descriptive name for the site
                CONSTRAINT site_info_unique UNIQUE (network_id, station_id),
                FOREIGN KEY (network_id) REFERENCES {table_names['network_codes']}(id),
                FOREIGN KEY (station_id) REFERENCES {table_names['station_codes']}(id));"""
    sql_result = sql_exec(sql=sql_query)
    if sql_result is not None:
        # Reset the SITE_INFO table.
        print(f"Initializing the [{table_names['site_info']}] table.")
        sql_result = sql_truncate_table(table_name=table_names['site_info'])

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


Creating the [site_info] table
Initializing the [site_info] table.


## Import the miniSEED data into the newly created database.

In [16]:
try:
    if not mseed_traces:
        print('Error: no miniSEED traces to import.')
    else:
        # Column order used for every bulk insert of samples; defined once, before the loop,
        # so it is always available when the samples are written.
        samples_field_names = [
            'trace_id',
            'sequence_num',
            'sample_value'
        ]
        total_traces = len(mseed_traces)

        # enumerate keeps the progress counter correct even when a trace is skipped.
        for counter_traces, current_trace in enumerate(mseed_traces, start=1):
            print(
                f"Importing trace {counter_traces} of {total_traces} into the [{table_names['mseed_traces']}] table.")

            # Resolve each code to the ID of its lookup row, creating the row on first use.
            network_id = fields_to_id(table_name=table_names['network_codes'],
                                      search_values={'network_code': current_trace.stats.network},
                                      insert_if_missing=True)
            if network_id is None:
                # Failed to fetch the required ID, skip to the next trace.
                continue

            station_id = fields_to_id(table_name=table_names['station_codes'],
                                      search_values={'station_code': current_trace.stats.station},
                                      insert_if_missing=True)
            if station_id is None:
                # Failed to fetch the required ID, skip to the next trace.
                continue

            location_id = fields_to_id(table_name=table_names['location_codes'],
                                       search_values={'location_code': current_trace.stats.location},
                                       insert_if_missing=True)
            if location_id is None:
                # Failed to fetch the required ID, skip to the next trace.
                continue

            channel_id = fields_to_id(table_name=table_names['channel_codes'],
                                      search_values={'channel_code': current_trace.stats.channel},
                                      insert_if_missing=True)
            if channel_id is None:
                # Failed to fetch the required ID, skip to the next trace.
                continue

            # A site is the combination of network and station; its location details are filled in later.
            site_info_id = fields_to_id(table_name=table_names['site_info'],
                                        search_values={'network_id': network_id, 'station_id': station_id},
                                        insert_if_missing=True)
            if site_info_id is None:
                # Failed to fetch the required ID, skip to the next trace.
                continue

            trace_data = {
                'network_id': network_id,
                'station_id': station_id,
                'location_id': location_id,
                'channel_id': channel_id,
                'starttime': current_trace.stats.starttime.timestamp,
                'endtime': current_trace.stats.endtime.timestamp,
                'sampling_rate': current_trace.stats.sampling_rate,
                'delta': current_trace.stats.delta,
                'npts': current_trace.stats.npts,
                'calib': current_trace.stats.calib,
                'site_info_id': site_info_id
            }

            trace_id_last_insert = sql_insert(table_name=table_names['mseed_traces'], insert_data=trace_data)

            if not trace_id_last_insert:
                # Without a trace row the samples would have no parent, so do not store them.
                print(f"Warning: trace {counter_traces} was not inserted, skipping its samples.")
                continue

            # Positive row ID, implying a successful insert, so proceed with loading the associated trace samples.
            print(
                f"Importing {len(current_trace.data)} samples into the [{table_names['mseed_samples']}] table.")

            # Build the samples data for bulk import: (parent trace, position in the trace, value).
            samples_data = [
                (trace_id_last_insert, sequence_num, sample_value)
                for sequence_num, sample_value in enumerate(current_trace.data.tolist())
            ]

            # For improved performance use sql_insert_many to bulk import the sample data points rather than importing one at a time using sql_insert.
            sql_insert_many(table_name=table_names['mseed_samples'], field_names=samples_field_names,
                            field_values=samples_data)

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


Importing trace 1 of 32 into the [mseed_traces] table.
Importing 221810 samples into the [mseed_samples] table.
Importing trace 2 of 32 into the [mseed_traces] table.
Importing 380100 samples into the [mseed_samples] table.
Importing trace 3 of 32 into the [mseed_traces] table.
Importing 107800 samples into the [mseed_samples] table.
Importing trace 4 of 32 into the [mseed_traces] table.
Importing 2300 samples into the [mseed_samples] table.
Importing trace 5 of 32 into the [mseed_traces] table.
Importing 87700 samples into the [mseed_samples] table.
Importing trace 6 of 32 into the [mseed_traces] table.
Importing 80700 samples into the [mseed_samples] table.
Importing trace 7 of 32 into the [mseed_traces] table.
Importing 100 samples into the [mseed_samples] table.
Importing trace 8 of 32 into the [mseed_traces] table.
Importing 100 samples into the [mseed_samples] table.
Importing trace 9 of 32 into the [mseed_traces] table.
Importing 100 samples into the [mseed_samples] table.
Impor

## Retrieve location data for each site which provided sample data.

### Build a list containing dictionaries for each site identified during the import.

In [17]:
try:
    # Start from an empty container so later cells always find the variable, even if the query fails.
    sites_info: list[dict[str, any]] = []

    site_tbl = table_names['site_info']
    network_tbl = table_names['network_codes']
    station_tbl = table_names['station_codes']

    # Join each site to its network and station rows to recover the human-readable codes.
    sites_sql_query = f"""SELECT * FROM [{site_tbl}]
                        INNER JOIN [{network_tbl}] on [{network_tbl}].id = [{site_tbl}].network_id
                        INNER JOIN [{station_tbl}] on [{station_tbl}].id = [{site_tbl}].station_id;"""

    sites_sql_result = sql_exec(sql=sites_sql_query)
    if sites_sql_result is not None:
        # Copy only the fields of interest out of each joined row, in a fixed order.
        wanted_fields = ('network_code', 'station_code', 'latitude', 'longitude', 'site_name')
        for site_row in sites_sql_result:
            sites_info.append({field: site_row[field] for field in wanted_fields})

except sqlite3.Error as e:
    print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


### For each site query the IRIS API to retrieve the latitude, longitude, and site name.

In [18]:
try:
    # Nothing to look up when no sites were identified; an empty filter must not be sent to the API.
    if sites_info:

        # Extract the distinct network codes and station codes to separate lists, keeping first-seen order.
        network_codes_list = list(dict.fromkeys(temp['network_code'] for temp in sites_info))
        station_codes_list = list(dict.fromkeys(temp['station_code'] for temp in sites_info))

        # Call the IRIS API.
        print('Fetching data from the IRIS API.')
        url_params = [{
            'network': ','.join(network_codes_list),
            'station': ','.join(station_codes_list)
        }]
        xml_data = fetch_iris_api(url_params=url_params)

        # IRIS results are namespaced so extract the current namespace from the root tag.
        xmlns = re.compile(r'\{(.*)\}').findall(xml_data.tag)[0]
        iris_ns = {'iris': xmlns}

        # Locate each site in the results and add missing information to the dictionary.
        for current_site in sites_info:
            xpath = f".//iris:Network[@code='{current_site['network_code']}']/iris:Station[@code='{current_site['station_code']}']"
            found_site = xml_data.find(xpath, iris_ns)

            if found_site is None:
                # The API does not know this site; keep its current values and carry on with the others.
                print(f"Warning: no IRIS data for site {current_site['network_code']}.{current_site['station_code']}.")
                continue

            current_site['latitude'] = found_site.find('./iris:Latitude', iris_ns).text
            current_site['longitude'] = found_site.find('./iris:Longitude', iris_ns).text
            current_site['site_name'] = found_site.find('./iris:Site/iris:Name', iris_ns).text

except Exception as e:
    print(f"Exception: {e}")


Fetching data from the IRIS API.


### Query the IRIS API for additional HOA and SUG stations of interest.

In [19]:
try:
    if sites_info is not None:
        # Ask the API for every station named HOA or SUG, regardless of network.
        url_params = [dict(station='HOA,SUG')]

        # Call the IRIS API.
        print('Fetching data from the IRIS API.')
        xml_data = fetch_iris_api(url_params=url_params)

        # The response is namespaced; recover the namespace URI from the root tag for use in the searches below.
        xmlns = re.findall('{(.*)}', xml_data.tag)[0]
        iris_ns = {'iris': xmlns}

        # Walk every network in the response and record each of its stations as a new site.
        for network_elem in xml_data.findall('.//iris:Network', iris_ns):
            for station_elem in network_elem.findall('./iris:Station', iris_ns):
                sites_info.append({
                    'network_code': network_elem.get('code'),
                    'station_code': station_elem.get('code'),
                    'latitude': station_elem.find('./iris:Latitude', iris_ns).text,
                    'longitude': station_elem.find('./iris:Longitude', iris_ns).text,
                    'site_name': station_elem.find('./iris:Site/iris:Name', iris_ns).text
                })

except Exception as e:
    print(f"Exception: {e}")


Fetching data from the IRIS API.


### Add any new network and station codes to the database and update site information.

In [20]:
try:
    if sites_info:

        try:
            # Iterate the sites dictionary to add new codes and update site information.
            print('Adding new codes to the database and updating site information.')

            # Collect (latitude, longitude, site_name, id) for every site before touching the table.
            site_updates = []

            for current_site in sites_info:

                network_id = fields_to_id(table_name=table_names['network_codes'],
                                          search_values={'network_code': current_site['network_code']},
                                          insert_if_missing=True)
                if network_id is None:
                    # Failed to fetch the required ID, skip to the next site.
                    continue

                station_id = fields_to_id(table_name=table_names['station_codes'],
                                          search_values={'station_code': current_site['station_code']},
                                          insert_if_missing=True)
                if station_id is None:
                    # Failed to fetch the required ID, skip to the next site.
                    continue

                site_id = fields_to_id(table_name=table_names['site_info'],
                                       search_values={'network_id': network_id, 'station_id': station_id},
                                       insert_if_missing=True)
                if site_id is None:
                    # Failed to fetch the required ID, skip to the next site.
                    continue

                site_updates.append((current_site['latitude'], current_site['longitude'],
                                     current_site['site_name'], site_id))

            if site_updates:
                # Update the site info. Values are bound as parameters because site names come from an
                # external service and may contain quotes; numeric text is stored as REAL by column affinity.
                sql_query = f"""UPDATE [{table_names['site_info']}] SET [latitude] = ?,
                                [longitude] = ?,
                                [site_name] = ?
                                WHERE [id] = ?;"""

                db_conn = sqlite3.connect(database=mseed_db_filename)
                try:
                    # One transaction for all sites: commits on success, rolls back on error.
                    with db_conn:
                        db_conn.executemany(sql_query, site_updates)
                finally:
                    # Only reached once the connection exists, so there is always something to close.
                    db_conn.close()

        except sqlite3.Error as e:
            print(f"Exception: SQLite {' '.join(e.args)}.")

except Exception as e:
    print(f"Exception: {e}")


## Create a visualization for the data.

In [21]:
# Placeholder: the visualization has not been implemented yet, so this cell only prints a marker.
print('TODO')

TODO
