### **Exercise SEP Developer Test**

This test is designed to assess your ability to design and develop a project from a limited set of goals, provided below. We will evaluate your project based on its fulfillment of the outlined objectives, code efficiency, readability, and adherence to best practices. Additionally, we will evaluate the structure and content of your GitHub repository according to standard practices.

1. Please perform all coding within the Google Colab environment: http://colab.research.google.com
    * Please store your project in a GitHub repository, so we can fork and run it in Colab.
        * Additionally, please create a standard GitHub repository with the expected files and directory structure.    
    * Please note that Colab doesn't automatically include additional files. To avoid potential issues, we recommend only including files that the evaluators already have access to.    
    * The evaluators will use the files SEP01.mseed, SEP02.mseed, and SEP03.mseed, which can be found in this GitHub project.
2. Import the mseed files
3. Create a database with proper normal form and constraints
4. Import the data into the newly created database
5. Create a visualization for the data. The visualization can be static but must include:    
    * A title
    * Text section displaying metadata about the miniseed data
    * Helicorder-style charts of the miniseed data (These may be built from the database data or the mseed files)
    * A map with icons indicating station locations based on the miniseed data (station lat and lon can be found on iris' API)
    * Add to the map additional stations found on IRIS' data API. Stations of interest include HOA and SUG
        * Data API information can be found at http://service.iris.edu/fdsnws/station/1/



## **Install Packages**


### Overview

Packages used in this project include:
- **ObsPy** - used for seismic data processing and analysis.
- **Folium** - used for creating interactive maps of seismic station locations.
- **SQLAlchemy** (installed via ObsPy) - supports database operations for managing seismic station and waveform data.

### Code

In [1]:
# Install necessary quietly
!pip install -q obspy folium > /dev/null 2>&1

# Confirm successful installation
print("All Packages Successfully Installed.")

All Packages Successfully Installed.


## **Database Creation and Testing**


### Overview


This database is implemented using SQLAlchemy, enabling integration with Python objects, and is set up using file-based stucture named seismic_data.db.


####Database Structure
1. **Station Table**:
   - Stores metadata about seismic stations.
   - **Columns**:
     - `station_id`: Unique identifier for each station (Primary Key).
     - `station`: Name of the station.
     - `network`: Network code.
     - `channel`: Channel code (e.g., EHZ).
     - `latitude`: Latitude of the station.
     - `longitude`: Longitude of the station.

2. **WaveformTrace Table**:
   - Stores individual seismic waveform traces.
   - **Columns**:
     - `trace_id`: Unique identifier for each trace (Primary Key).
     - `station_id`: Links to the corresponding station (Foreign Key).
     - `start_time`: Start time of the trace.
     - `end_time`: End time of the trace.
     - `trace_data`: Serialized binary data of the waveform.
     - `sampling_rate`: Sampling rate of the waveform in Hz.

####Functions in the Database Creation

1. **setup_database()**:
   - Initializes the database.
   - Creates tables for `Station` and `WaveformTrace`.
   - Returns an file-based SQLite engine and a session object.

2. **initialize_database(stream, session)**:
   - Populates the database with waveform data from an ObsPy `Stream` object.
   - For each trace:
     - Adds station metadata if it doesn’t already exist.
     - Adds the individual seismic waveform traces.

3. **get_coordinates(station_name)**:
   - Uses IRIS data API to pull lattitude and longitude if they are missing from the .mseed data
   -If IRIS data API is missing, allows user to manually record the coordinates.

####Functions in the Unit Tests


1. **test_full_trace_count**:
   - Validates that all traces from the stream are correctly stored in the database.
   - Ensures the total count matches the expected number of traces (32).

2. **test_query_by_time_range**:
   - Checks the database's ability to filter traces by a specific time range.
   - Confirms the expected number of traces fall within the queried time window.

3. **test_query_by_station**:
   - Tests filtering traces by the station name.
   - Ensures all returned traces correspond to the station "SEP".

4. **test_query_by_sampling_rate**:
   - Verifies filtering by the sampling rate.
   - Confirms all traces have the specified sampling rate.

5. **test_query_by_channel**:
   - Ensures the database can filter traces by channel.
   - Validates that all results belong to the queried channel.

6. **test_query_by_location**:
   - Confirms traces can be filtered by geographic coordinates (latitude and longitude).
   - Verifies all returned traces correspond to the specified location.

7. **test_validate_trace_metadata**:
   - Validates metadata and waveform data for individual traces.
   - Compares stored values (e.g., start time, end time, sampling rate, data length) with expected results.
   - Ensures data integrity in the database.

### Code

In [46]:
import os
from obspy import Trace, Stream, UTCDateTime, read
from obspy.clients.fdsn import Client
from sqlalchemy import create_engine, Column, Integer, String, Float, LargeBinary, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import pickle

#This database uses the version of SQLAlchemy compatible with Obspy as opposed to the newest version
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"

#Environment Setup
Base = declarative_base()
client = Client("IRIS")

class Station(Base):
    __tablename__ = 'stations'
    station_id = Column(Integer, primary_key=True, autoincrement=True)
    station = Column(String, nullable=False)
    network = Column(String, nullable=False)
    channel = Column(String, nullable=False)
    latitude = Column(Float, nullable=True)
    longitude = Column(Float, nullable=True)

class WaveformTrace(Base):
    __tablename__ = 'waveform_traces'
    trace_id = Column(Integer, primary_key=True, autoincrement=True)
    station_id = Column(Integer, ForeignKey('stations.station_id'), nullable=False)
    start_time = Column(String, nullable=False)
    end_time = Column(String, nullable=False)
    trace_data = Column(LargeBinary, nullable=False)
    sampling_rate = Column(Float, nullable=False)

def setup_database():
    #Creates database named "sqlite:///seismic_data.db"
    #create_engine("sqlite:///:memory:") could be used for faster processing but less scalability
    engine = create_engine("sqlite:///seismic_data.db")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    return engine, session

def get_coordinates(station_name):
    #Pulls coordinates from IRIS API using station name or user input if unavailable
    latitude = None
    longitude = None
    try:
        inventory = client.get_stations(station=station_name, level="station")
        station_info = inventory[0][0]
        latitude = station_info.latitude
        longitude = station_info.longitude
    except:
        print(f"Unable to retrieve coordinates automatically:")
        while latitude is None:
            try:
                latitude = float(input("Enter latitude: "))
            except ValueError:
              print("Invalid input. Please enter a numeric value for latitude.")
            while longitude is None:
              try:
                longitude = float(input("Enter longitude: "))
              except ValueError:
                print("Invalid input. Please enter a numeric value for longitude.")
    return latitude, longitude

def initialize_database(stream, session):
    #Initialize database with stream and session
    for trace in stream:
        station_entry = session.query(Station).filter_by(
            station=trace.stats.station, network=trace.stats.network
        ).first()
        #.mseed data may not have location
        if not station_entry:
            latitude = None
            longitude = None

            if hasattr(trace.stats, 'latitude') and hasattr(trace.stats, 'longitude'):
                latitude = trace.stats.latitude
                longitude = trace.stats.longitude
            else:
                #Search IRIS for station location
                latitude, longitude = get_coordinates(trace.stats.station)

            station_entry = Station(
                station=trace.stats.station,
                network=trace.stats.network,
                channel=trace.stats.channel,
                latitude=latitude,
                longitude=longitude
            )
            session.add(station_entry)
            session.commit()

        #Add each trace to the database
        new_trace = WaveformTrace(
            station_id=station_entry.station_id,
            start_time=str(trace.stats.starttime),
            end_time=str(trace.stats.endtime),
            trace_data=pickle.dumps(trace.data),
            sampling_rate=trace.stats.sampling_rate
        )
        session.add(new_trace)

    session.commit()
    print("Database initialized successfully.")

### Unit Tests

In [48]:
import unittest
from obspy import Stream, read
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import pickle
from datetime import datetime
import os

def setup_database():
    #Database setup from the previous code for unit tests
    engine = create_engine("sqlite:///seismic_data.db")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    return engine, session

class VerboseTestResult(unittest.TextTestResult):
    #Add custom success and failure messages
    def addSuccess(self, test):
        super().addSuccess(test)
        self.stream.writeln(f"✔️ SUCCESS: {test.shortDescription() or str(test)}")

    def addFailure(self, test, err):
        super().addFailure(test, err)
        self.stream.writeln(f"❌ FAILURE: {test.shortDescription() or str(test)}")

class VerboseTestRunner(unittest.TextTestRunner):
    def _makeResult(self):
        return VerboseTestResult(self.stream, self.descriptions, self.verbosity)

class TestDatabase(unittest.TestCase):
    def setUp(self):
        if os.path.exists("seismic_data.db"):
          os.remove("seismic_data.db")
        #Setup database and session for each test
        self.engine, self.session = setup_database()

        #Load traces directly from .mseed files
        self.stream = Stream()
        for file in ["SEP/SEP01.mseed", "SEP/SEP02.mseed", "SEP/SEP03.mseed"]:
            try:
                self.stream += read(file)
            except Exception as e:
                print(f"Error reading file {file}: {e}")

        #Initialize database with these traces
        initialize_database(self.stream, self.session)

    def test_full_trace_count(self):
        "Query by Full Trace Count"
        #Query database for all traces
        traces = self.session.query(WaveformTrace).all()

        #Verify all traces are in the database
        self.assertEqual(len(traces), 32, "All traces should be in the database")

    def test_query_by_time_range(self):
        "Query by Time Range"
        #Query based on a specific time range
        start_time = "2023-11-11T00:00:00.000001Z"
        end_time = "2023-11-11T02:31:38.090001Z"

        traces = self.session.query(WaveformTrace).filter(
            WaveformTrace.start_time >= start_time,
            WaveformTrace.end_time <= end_time
        ).all()

        #Expected number of traces in this range
        expected_count = 7
        self.assertEqual(len(traces), expected_count, f"Expected {expected_count} traces in the given time range.")

    def test_query_by_station(self):
        "Query by Station"
        #Query based on station name
        station_name = "SEP"
        traces = self.session.query(WaveformTrace).join(Station).filter(Station.station == station_name).all()

        #Verify all traces belong to the station "SEP"
        self.assertEqual(len(traces), 32, "All 32 traces should belong to station 'SEP'.")

    def test_query_by_sampling_rate(self):
        "Query by Sampling Rate"
        #Query based on a specific sampling rate
        sampling_rate = 100.0
        traces = self.session.query(WaveformTrace).filter(WaveformTrace.sampling_rate == sampling_rate).all()

        #Verify all traces have a sampling rate of 100.0 Hz
        self.assertEqual(len(traces), 32, "All 32 traces should have a sampling rate of 100.0 Hz.")

    def test_query_by_channel(self):
        "Query by Channel"
        #Query based on a specific channel
        channel = "EHZ"
        traces = self.session.query(WaveformTrace).join(Station).filter(Station.channel == channel).all()

        #Verify all traces belong to the channel "EHZ"
        self.assertEqual(len(traces), 32, "All 32 traces should belong to channel 'EHZ'.")

    def test_query_by_location(self):
        "Query by Location"
        #Query based on station latitude and longitude
        latitude = 46.19978
        longitude = -122.190857
        traces = self.session.query(WaveformTrace).join(Station).filter(
            Station.latitude == latitude,
            Station.longitude == longitude
        ).all()

        #Verify all traces have the correct coordinates
        self.assertEqual(len(traces), 32, "All traces should belong to the specified location.")

    def test_validate_trace_metadata(self):
        "Validation of Trace Metadata"
        #Query all traces
        traces = self.session.query(WaveformTrace).all()

        #Verify metadata for the first 7 traces
        expected_values = [
            ("2023-11-11T00:00:00.000001Z", "2023-11-11T00:36:58.090001Z", 100.0, 221810),
            ("2023-11-11T00:37:45.100001Z", "2023-11-11T01:41:06.090001Z", 100.0, 380100),
            ("2023-11-11T01:43:07.100001Z", "2023-11-11T02:01:05.090001Z", 100.0, 107800),
            ("2023-11-11T02:01:47.100001Z", "2023-11-11T02:02:10.090001Z", 100.0, 2300),
            ("2023-11-11T02:02:24.100001Z", "2023-11-11T02:17:01.090001Z", 100.0, 87700),
            ("2023-11-11T02:17:49.100001Z", "2023-11-11T02:31:16.090001Z", 100.0, 80700),
            ("2023-11-11T02:31:37.100001Z", "2023-11-11T02:31:38.090001Z", 100.0, 100)
        ]

        for trace, expected in zip(traces[:7], expected_values):
            self.assertEqual(trace.start_time, expected[0], "Start time does not match")
            self.assertEqual(trace.end_time, expected[1], "End time does not match")
            self.assertEqual(trace.sampling_rate, expected[2], "Sampling rate does not match")
            self.assertEqual(len(pickle.loads(trace.trace_data)), expected[3], "Data length does not match")

if __name__ == "__main__":
    unittest.main(testRunner=VerboseTestRunner, argv=[''], exit=False)

.

Database initialized successfully.


✔️ SUCCESS: Query by Full Trace Count
.

Database initialized successfully.


✔️ SUCCESS: Query by Channel
.

Database initialized successfully.


✔️ SUCCESS: Query by Location
.

Database initialized successfully.


✔️ SUCCESS: Query by Sampling Rate
.

Database initialized successfully.


✔️ SUCCESS: Query by Station
.

Database initialized successfully.


✔️ SUCCESS: Query by Time Range
.✔️ SUCCESS: Validation of Trace Metadata

----------------------------------------------------------------------
Ran 7 tests in 9.135s

OK


Database initialized successfully.
