## 1. Locate a Station Near The Event

### Global Variable Constants 

In [106]:
from obspy.clients.fdsn import Client
from obspy.core.utcdatetime import UTCDateTime
from types import MappingProxyType

def Day(num):
    return 60*60*24*num

def get_event_info(name):
    """
    Provides a dictionary (MappingProxyType) of information to populate the global variables needed in the file.
    The chosen event is in a seismologically active area. A station is chosen near the event to get data.
    @param name: Can be one of - "Amatrice", "Patitirion"
    @return: ProxyMapType, containing the information about the event.
    """
    info: MappingProxyType = None
    if name == 'Amatrice':
        info = dict(
            Latitude='42.790',
            Longitude='13.150',
            Time="2016-08-24 03:36:32",
            StartTime="2012-08-24",
            EndTime="2018-08-24",
            Client="INGV",
            Name="Amatrice",
            StationRadius=.2,
            LocalEventRadius=1
        )

    elif name == 'Oklahoma':
        info = dict(
            Latitude='37.081',
            Longitude='-97.309',
            Time="2018-5-15 20:14:49",
            StartTime="2015-10-15",
            EndTime="2017-10-15",
            Client="IRIS",
            Name="Oklahoma",
            StationRadius=1,
            LocalEventRadius=1
        )
    
    elif name == "SouthAmerica":
        info = dict(
            Latitude='-25.3',
            Longitude='-71',
            Time="2018-02-20 23:18:32",
            StartTime="2013-10-15",
            EndTime="2018-01-15",
            Client="IRIS",
            Name="SouthAmerica",
            StationRadius=3,
            LocalEventRadius=1
        )
    
    elif name == "California":
        info = dict(
            Latitude='33.557',
            Longitude='-115.888',
            Time="2018-02-20 23:18:32",
            StartTime="2016-10-15",
            EndTime="2018-01-15",
            Client="IRIS",
            Name="California",
            StationRadius=1,
            LocalEventRadius=1
        )
    
    elif name == 'Alaska':
        info = dict(
            Latitude='63.134',
            Longitude='-151.115',
            Time="2018-02-20 23:18:32",
            StartTime="2010-01-01",
            EndTime="2017-02-26",
            Client="IRIS",
            Network="NP",
            Station="8040",
            Name="Alaska",
            StationRadius=6,
            LocalEventRadius=1.5
        )
    
    if info: 
        return MappingProxyType(info)
    raise ValueError(f"Name '{name}' not found")

In [107]:
# CHANGE THIS VARIABLE TO CHANGE EVERYTHING IN THE FILE
event_name = 'Alaska'  

In [108]:
# Retrieving the quake information            
event = get_event_info(event_name)   # dict containing the information to populate the variables

# Constants
LATITUDE = event['Latitude']                # Latitude of Event
LONGITUDE = event['Longitude']              # Longitude of Event
STARTTIME = UTCDateTime(event['StartTime']) # Starttime of Data Collection
ENDTIME = UTCDateTime(event['EndTime'])     # Endtime of Data Collection
CLIENT_NAME = event['Client']               # Client to retrieve event from
NAME = event['Name'] + 'Quakes'             # Folder name to write spectrograms to

STATION_MAX_RADIUS = event['StationRadius'] # Pick as station within this distance of the event
MAX_RADIUS = event['LocalEventRadius']      # How far local events can be from the station
NONLOCAL_MIN_RADIUS = 6                     # Min Radius for Nonlocal Events
NUM_EVENTS = 2000                            # How many events to retrieve and write                     # Event time window duration
NUM_NOISE_EVENTS = NUM_EVENTS
DURATION = 15          

NETWORK = event.get('Network')
STATION = event.get('Station')

# Filtering Constants for the Spectrograms
MIN_FREQ = 3
MAX_FREQ = 30

### Get Nearest Station 

In [109]:
from obspy.core.inventory import Inventory, Station, Network

# Types
inventory: Inventory
network: Network
station: Station
    

# Code
client = Client(CLIENT_NAME)

inventory = client.get_stations(startbefore=STARTTIME, 
                               endafter=ENDTIME,
                               latitude=LATITUDE,
                               longitude=LONGITUDE,
                               maxradius=STATION_MAX_RADIUS
                              )

network = (inventory.select(NETWORK) if NETWORK else inventory)[0]

station = (network.select(STATION) if STATION else network)[0]

In [110]:
# Verify that waveform FDSN works, so you don't waste time getting to the bottom 
def get_channel_names(stream):
    """ Returns a list of names of each component, in the order they appear in the stream """
    return [trace.stats.channel for trace in stream]

def verify_fsdn(network, station):
    """ Makes a call to the server """
    client = Client(CLIENT_NAME)
    client.get_waveforms(network.code, station.code, "*", "HN*", STARTTIME, STARTTIME + DURATION)
    return True

def get_valid_stations(inventory):
    """ Attempts to get waveforms from a station, with the veriy_fsdn function. Returns valid stations """
    valid = []
    for network in inventory:
        for station in network:
            try:
                verify_fsdn(network, station)
            except Exception:
                print("Failed station:", station.code)
            else:
                print("Succeeded station:", station.code)
                valid.append(station)
    return valid
                
assert verify_fsdn(network, station)

In [111]:
# Optional, Visualize the Station
print(station)
# inventory.plot(projection='local')
# inventory.plot()

Station 8040 (AK:Anchorage;R B Atwood Bld)
	Station Code: 8040
	Channel Count: 0/106 (Selected/Total)
	2003-09-05T18:00:00.000000Z - 2599-12-31T23:59:59.000000Z
	Access: open 
	Latitude: 61.21, Longitude: -149.89, Elevation: -17.4 m
	Available Channels:



## 2. Retrieve Local and Non-Local Events From The Station 

### Functions for getting the noise times

In [112]:
import random
import math
from bisect import bisect
from collections import namedtuple
from typing import List


# Holds a UTCDateTime start and end times of an event
Time = namedtuple('Time', ('start', 'end'))   

def find_closest_index(a, x):
    """
    Find index of rightmost value less than x via binary search
    @param a: Indexable object, the list to look through - MUST BE SORTED
    @param x: Any, the item to find the closest index for in a
    """
    i = bisect(a, x) 
    if i:
        return i
    raise ValueError

def overlaps(time: Time, time2: Time):
    """
    Determines whether two times overlap.
    @returns: boolean, True if the time1 overlaps with time2  
    """
    # Starts after the second time ends, or ends before the second time starts
    return not ((time.start > time2.end) or (time.end < time2.start))

def encompassed(time: Time, sorted_times: List[Time]):
    """
    Determines whether time overlaps with any times in sorted_times
    @param time: Time
    @param sorted_times: sorted list of Time - the list MUST be sorted for this to work
    @return: boolean, True if the time overlaps
    """
    index = find_closest_index(sorted_times, time)
    try:
        left = overlaps(time, sorted_times[index-1])
        curr = overlaps(time, sorted_times[index])
        right = overlaps(time, sorted_times[index+1])
        return any((left, curr, right))
    except IndexError:
        return True     

def get_noise_times(times_to_exclude: List[UTCDateTime], 
                    startafter: UTCDateTime, endbefore: UTCDateTime, amount: int, duration: float):
    """
    Generates a list of noise times, which are not in the times_to_exclude
    @param times_to_exclude: a list of times of events 
    @param startafter: Generate noise windows after this time
    @param endbefore: Generate noise windows before this time
    @param amount: How many noise windows to generate
    @param duration: How long each event in times_to_exclude and noise window should be, to prevent overlapping
    """
    exclude = [Time(time, time + duration) for time in times_to_exclude]
    exclude.sort()
    noise_times = []
    
    def random_time():
        return startafter + random.randint(0, math.floor(endbefore - startafter))
    
    # Generate a random time
    while len(noise_times) < amount:
        rt = random_time()
        rand_time = Time(rt, rt + duration)
                         
        if not encompassed(rand_time, exclude):
            noise_times.append(rt)
            index = bisect(exclude, rand_time)
            exclude = exclude[:index] + [rand_time] + exclude[index:]
        
    return noise_times




In [113]:
from obspy.clients.fdsn import Client
from obspy.core.event import Catalog


# Types
local_catalog_events: Catalog
nonlocal_catalog_events: Catalog
    

# Code
client = Client(CLIENT_NAME)

# Local Events
local_catalog_events = client.get_events(latitude=station.latitude, 
                                 longitude=station.longitude,
                                 maxradius=MAX_RADIUS,  # Local
                                 starttime=STARTTIME,
                                 limit=NUM_EVENTS,
                                 endtime=ENDTIME
                                 )

# Nonlocal Events
nonlocal_catalog_events = client.get_events(latitude=station.latitude, 
                                 longitude=station.longitude,
                                 minradius=NONLOCAL_MIN_RADIUS,  # Nonlocal
                                 limit=NUM_EVENTS,
                                 starttime=STARTTIME,
                                 endtime=ENDTIME
                                 )

print("LOCAL EVENTS:", len(local_catalog_events))
print("NONLOCAL EVENTS:", len(nonlocal_catalog_events))

LOCAL EVENTS: 2000
NONLOCAL EVENTS: 2000


In [114]:
local_catalog = local_catalog_events[:NUM_EVENTS]  
nonlocal_catalog = nonlocal_catalog_events[:NUM_EVENTS]

print("LOCAL EVENTS:", len(local_catalog))
print("NONLOCAL EVENTS:", len(nonlocal_catalog))

LOCAL EVENTS: 2000
NONLOCAL EVENTS: 2000


### Visualize Events Local and NonLocal

In [26]:
# Plot Local Events
#local_catalog.plot()
#local_catalog.plot(projection="local")

In [27]:
# Plot Nonlocal Events
# nonlocal_catalog.plot()

## 3. Retrieve Waveforms 

### Get times where events occur

In [128]:
from obspy.core import Stream
from pprint import pprint


# Functions
def get_event_times(catalog: Catalog) -> List[UTCDateTime]:
    """
    Gets event times from a catalog and returns a list of them.
    """
    return [event.origins[0].time for event in catalog]


# Types
local_times: List[UTCDateTime]
non_local_times: List[UTCDateTime]
noise_times: List[UTCDateTime]
    
    
# Code
catalog_local_times = get_event_times(local_catalog_events)
catalog_nonlocal_times = get_event_times(nonlocal_catalog_events)


# Get times for each category
noise_times = get_noise_times(catalog_local_times + catalog_nonlocal_times, 
                              min(catalog_nonlocal_times), # startafter
                              max(catalog_nonlocal_times), # endbefore
                              NUM_NOISE_EVENTS, 
                              DURATION)

local_times =  get_event_times(local_catalog)
nonlocal_times = get_event_times(nonlocal_catalog)

print("Got times")

Got times


### Prepare data for the client

In [133]:
# Functions
end = lambda start_time: start_time + DURATION

def create_bulk(times):
    return [(network.code, station.code, "*", "HN*", time, end(time)) for time in times]


# Code
local_bulk = create_bulk(local_times)
nonlocal_bulk = create_bulk(nonlocal_times)
noise_bulk = create_bulk(noise_times)
print("Created Bulks")

Created Bulks


### Get Waveforms 

#### Need to do a time test to compare the next two cells 

In [134]:
##### The non-async way #####

# Get Waveforms From Server

#waveforms = client.get_waveforms_bulk(bulk) # Can't figure out how to plot this correctly
#waveforms = [client.get_waveforms(*data, attach_response=True) for data in local_bulk]

# print("Got Waveforms")

In [135]:
import asyncio  
import time
from functools import wraps
from obspy.clients.fdsn.header import FDSNNoDataException


def slide(iterable, num, stagger=None):
    """ 
    A sliding window over an iterator, yielding sub arrays.
    @param iterable: Iterable
    @param num: int, the size of each subarray
    @param stagger: int, the number of elements to start the next subarray by 
    
    Example:
    for arr in slide(list(range(98)), 5):
        print(arr)
    """
    stagger = num - stagger if stagger else 0
    start = 0
    end = num
    
    while start + stagger < len(iterable):
        yield iterable[start:end]
        start = end - stagger
        end += num - stagger
        
        

class AsyncClient(Client):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop = asyncio.get_event_loop()
    
    async def async_get_waveforms(self, *args, **kwargs):
        """ 
        Get waveforms asyncronously 
        @param *args, **kwargs: the arguments passed to Client.get_waveforms
        @return: future
        """
        
        # kwargs can't be passed to loop.run_in_executor, so create a wrapper function.
        @wraps(self.get_waveforms)
        def call_with_kwargs(*args):
            return self.get_waveforms(*args, **kwargs)
            
        future = self.loop.run_in_executor(None, call_with_kwargs, *args)
        return await future
    
    def _get_batch(self, batch: List, bulk_kwargs: dict):
        """
        Retrieves a batch of waveforms by calling 'get_waveforms' for each list of args in the batch
        @param batch: list[list],each sublist is arguments to be passed to get_waveforms
        @param bulk_kwargs: keyword arguments to pass the get_waveforms function. Will be applied to every batch.
        """
        tasks = [self.async_get_waveforms(*data, **bulk_kwargs) for data in batch] # List of Asyncio Tasks
        results = self.loop.run_until_complete(asyncio.gather(*tasks))             # Blocks until finished 
        return results
    
    def get_waveforms_bulk(self, bulk: List[List], batch_size=10, separation=1, bulk_kwargs=None, skip_errors=True):
        """
        Gets waveforms in a bulk asynchronously.
        @param bulk: List[List], data to pass get_waveforms like at - https://docs.obspy.org/packages/autogen/obspy.clients.fdsn.client.Client.get_waveforms_bulk.html
        @param batch_size: how many simultaneous requests are sent at one. Keep this number < 10 
        @param bulk_kwargs: dict, kwargs to pass to each get_waveforms request because you can't store them in the bulk list.
        @param skip_errors: boolean, if False raises FSDN error if the server is missing a waveform  
        @return: List[Stream], a list of the waveforms
        """
        if batch_size > 10: raise ValueError("Batch size too high, could overload server.")
            
        waveforms = []
        bulk_kwargs = bulk_kwargs or {}
        
        generate_waveforms = self.yield_waveforms_bulk(bulk, batch_size, separation, bulk_kwargs, skip_errors)
        
        for i, result in enumerate(generate_waveforms):
            waveforms.extend(result)
            print(f"Got batch {i+1}: {len(waveforms)}/{len(bulk)}", end="\r")
        
        print("\nDone.")
        return waveforms
    
    def yield_waveforms_bulk(self, bulk, batch_size=10, separation=1, bulk_kwargs=None, skip_errors=True):
        """
        Yields a list of streams of size batch_size. Each call to the generator makes another call to the server.
        This way you can be more strategic about how you get your data, hold it in memory, etc.
        @yield: List[Stream]
        """
        bulk_kwargs = bulk_kwargs or {}

        if batch_size > 10: 
            raise ValueError("Batch size too high, could overload server.")

        for i, batch in enumerate(slide(bulk, batch_size)): 
            try:
                yield self._get_batch(batch, bulk_kwargs)
                time.sleep(separation)
            except FDSNNoDataException as e:
                if skip_errors:
                    print(f"Skipping batch {i} FDSNNoDataException")
                    continue
                raise e

                


In [136]:
# Get Waveforms
import obspy
client = AsyncClient(CLIENT_NAME)

local_waveforms = client.get_waveforms_bulk(local_bulk, 
                                            batch_size=5, 
                                            bulk_kwargs=dict(attach_response=True),
                                            skip_errors=True)


nonlocal_waveforms = client.get_waveforms_bulk(nonlocal_bulk, 
                                               batch_size=5, 
                                               bulk_kwargs=dict(attach_response=True),
                                               skip_errors=True)

noise_waveforms = client.get_waveforms_bulk(noise_bulk, 
                                               batch_size=5, 
                                               bulk_kwargs=dict(attach_response=True),
                                               skip_errors=True)


print("\nRetrieved Local:", len(local_waveforms))
print("\nRetrieved Nonlocal:", len(nonlocal_waveforms))
print("\nRetrieved Noise:", len(noise_waveforms))

Skipping batch 159 FDSNNoDataException
Skipping batch 177 FDSNNoDataException
Skipping batch 194 FDSNNoDataException
Skipping batch 195 FDSNNoDataException
Skipping batch 196 FDSNNoDataException
Skipping batch 197 FDSNNoDataException
Skipping batch 198 FDSNNoDataException
Skipping batch 199 FDSNNoDataException
Skipping batch 200 FDSNNoDataException
Skipping batch 201 FDSNNoDataException
Skipping batch 202 FDSNNoDataException
Skipping batch 203 FDSNNoDataException
Skipping batch 204 FDSNNoDataException
Skipping batch 205 FDSNNoDataException
Skipping batch 206 FDSNNoDataException
Skipping batch 210 FDSNNoDataException
Skipping batch 239 FDSNNoDataException
Skipping batch 264 FDSNNoDataException
Skipping batch 265 FDSNNoDataException
Skipping batch 266 FDSNNoDataException
Skipping batch 267 FDSNNoDataException
Skipping batch 268 FDSNNoDataException
Skipping batch 269 FDSNNoDataException
Skipping batch 270 FDSNNoDataException
Skipping batch 271 FDSNNoDataException
Skipping batch 272 FDSNNo

## 4. Filter 

In [137]:


def filter_waveforms(waveforms: List[Stream]) -> List[Stream]:
    """ 
    Filters the waveforms (in place) 
    @return: generator -> Stream
    """
    waveforms = map(lambda stream: stream.remove_response().filter('bandpass',  
                                                                   freqmin=MIN_FREQ, 
                                                                   freqmax=MAX_FREQ)\
                                                            .taper(max_percentage=.5), waveforms) 
    return waveforms
    

local_waves = list(filter_waveforms(local_waveforms.copy()))
nonlocal_waves = list(filter_waveforms(nonlocal_waveforms.copy()))
noise_waves = list(filter_waveforms(noise_waveforms.copy())) 
print("Filtered")

Filtered


## 5. Write Events to Spectrogram Image 

### Functions for Writing

In [138]:
from typing import List
from obspy import Stream
from multiprocessing import Pool
import matplotlib.pyplot as plt
from matplotlib.ticker import NullLocator
import glob
import os

def create_directory(path):
    if not os.path.exists(path):
        os.makedirs(path)
        
        
def write_waveforms(waveforms: List[Stream], path):
    create_directory(path)
    for i, stream in enumerate(waveforms):
        stream.write(os.path.join(path, f'{i}.mseed'), format="MSEED")
        
        
def write_waveform(stream, file_path):
    print("Writing: ", os.path.basename(file_path), end='\r')
    stream.write(file_path, format="MSEED")
    

def async_write_waveforms(waveforms: List[Stream], path):
    print("Writing Waveforms...")
    create_directory(path)

    work = [] 
    
    # Prep the file paths
    for i, stream in enumerate(waveforms):
        file_path = os.path.join(path, f'{i}.mseed')
        
        # Add the args for each stream
        work.append(
            (stream, file_path)  
        )

    # Run with multiprocess
    pool = Pool()
    pool.starmap(write_waveform, work)
    
    print("\nWrote Waveforms")
    

def remove_borders(path):
    """ 
    Removes borders of all .png files in curr directory  
    Must have ImageMagik installed.
    """
    png_paths = glob.glob(f"{path}/*.png")
    
    for file in png_paths:
        print("Trimming: ", file, end='\r')
        !convert $file -trim +repage $file;
        

def write_spectrogram(stream, path):
    """
    @param stream: Stream, the file to make a spectrogram from.
                   Each trace should be one component.
    
    @param path: Str, the directory to write into.
    
    Each component of the stream is written as its own figure, a .png file.
    Components eg. "NNE" are saved as the filename eg. "NNE.png".
    """
    create_directory(path)
    figure_list = stream.spectrogram(show=False, title="")
    
    for figure, channel in zip(figure_list, get_channel_names(stream)):
        axes = figure.gca() 
        axes.set_xlim([0, DURATION])
        axes.set_ylim([0, MAX_FREQ])

        # Get Rid of Whitespace
        axes.axis("off")
        axes.xaxis.set_major_locator(NullLocator())
        axes.yaxis.set_major_locator(NullLocator())
        figure.savefig(f"{path}/{channel}.png", bbox_inches='tight', pad_inches=0, transparent=True)
        
        # Free memory
        figure.clf()
        plt.close(figure) 

    remove_borders(path)  # Remove borders of all the spectrograms
    
    

def write_spectrograms(waveforms, path):   
    for i, stream in enumerate(waveforms):
        dir_path = os.path.join(path, str(i))
        create_directory(dir_path)
        write_spectrogram(stream, dir_path)
    

def async_write_spectrograms(waveforms, path):
    """
    @param waveforms: List[Stream], the waveforms to make spectrograms from.
    @param path: str, The directory to write to
    
    1. Calls 'write_spectrogram' on each Stream. 
    2. Each stream is written to its own folder
    3. All components are written to their own file as a spectrogram .png image.
    """
    print("Writing Files...")

    work = []   # Args that will be passed to the write_spectrogram function
    
    # Prep the file paths
    for i, stream in enumerate(waveforms):
        dir_path = os.path.join(path, str(i))
        create_directory(dir_path)
        
        # Add the args for each stream
        work.append(
            (stream, os.path.join(path, str(i)))  # arguments to the 'write_spectrogram' function
        )

    # Run with multiprocess
    pool = Pool()
    pool.starmap(write_spectrogram, work)             # Map the 'write_spectrogram' function to the work
    
    print("\nWrote Files")


In [139]:
import warnings

# Constants
DIR_PATH = os.path.join(os.getcwd(), f"waveforms/{NAME}")

# Write Waveforms
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    async_write_waveforms(local_waves, os.path.join(DIR_PATH, "local"))   
    async_write_waveforms(nonlocal_waves, os.path.join(DIR_PATH, "non_local"))   
    async_write_waveforms(noise_waves, os.path.join(DIR_PATH, "noise"))   
    
print("Finished Writing")

Writing Waveforms...

Wrote Waveforms
Writing Waveforms...

Wrote Waveforms
Writing Waveforms...

Wrote Waveforms
Finished Writing


In [None]:
#### READ WAVEFORMS ####

# from obspy import read
# import glob
# import os

# DIR_PATH = os.path.join(os.getcwd(), f"waveforms/{NAME}/noise")
# def read_waveforms(dir_path):
#     files = glob.glob(os.path.join(dir_path, '*.mseed'))
#     waveforms = [read(file) for file in files]
#     return waveforms

# noise_waves = read_waveforms(DIR_PATH)

In [None]:
# Constants
DIR_PATH = os.path.join(os.getcwd(), f"spectrograms/{NAME}")

# Write Waveforms
async_write_spectrograms(local_waves, os.path.join(DIR_PATH, "local"))   
async_write_spectrograms(nonlocal_waves, os.path.join(DIR_PATH, "non_local"))   
async_write_spectrograms(noise_waves, os.path.join(DIR_PATH, "noise"))   


Writing Files...




Trimming:  /data/pytorch_earthquake_recognition/spectrograms/AlaskaQuakes/local/0/HNZ.png



Trimming:  /data/pytorch_earthquake_recognition/spectrograms/AlaskaQuakes/local/1565/HNE.png
Wrote Files
Writing Files...




Trimming:  /data/pytorch_earthquake_recognition/spectrograms/AlaskaQuakes/non_local/63/HNZ.pngg



Trimming:  /data/pytorch_earthquake_recognition/spectrograms/AlaskaQuakes/non_local/0/HNE.pngng



Trimming:  /data/pytorch_earthquake_recognition/spectrograms/AlaskaQuakes/non_local/319/HNE.png

# Tests

In [None]:
import unittest 

class TestMethods(unittest.TestCase):
 

    def test_overlaps(self):
        time1 = UTCDateTime(2015, 1, 1, 10, 0, 10)
        time2 = UTCDateTime(2015, 1, 1, 10, 0, 19)
        time3 = UTCDateTime(2015, 1, 1, 10, 0, 30)
        t1 = Time(time1, time1 + 10)
        t2 = Time(time2, time2 + 10)
        t3 = Time(time3, time3 + 10)
        self.assertTrue(overlaps(t1, t2))
        self.assertFalse(overlaps(t2, t3))

        
    def test_encompassed(self):
        t = UTCDateTime(2015, 1, 1, 10, 0, 0) 
        times = [Time(t + 10*i, t + 10*(i+1)) for i in range(1, 7)]  # 1 minute of times 
        times.append(Time(t+1000, t+1010))
        times.append(Time(t+1010, t+1020))
        times.sort()                           # sorted just to remember that the function requires sorted times
        
        
        # Encompassed within the times
        self.assertTrue(encompassed(Time(t+30, t+40), times))
        
        # Not encompassed
        self.assertFalse(encompassed(Time(t+990, t+999), times))
        
        # Value should be within the range of times... otherwise it is invalid.
        # This is because we only have knowledge of events between the start and end time
        # we can't generate noise times outside this range.
        with self.assertRaises(ValueError):
            encompassed(Time(t-10, t), times) 
            
            
    def test_find_closest_index(self):
        lst = [1, 2, 3, 5, 6]
        self.assertEqual(find_closest_index(lst, 2), 2)  # defaults to finding index to the right  
        self.assertEqual(find_closest_index(lst, 4), 3) 
        
        with self.assertRaises(ValueError):
            find_closest_index(lst, 0)
            
            
            
    def test_get_noise_times(self):
        t = UTCDateTime(2015, 1, 1, 10, 0, 0) 
        exclude = [t + 60*i for i in range(1, 11)]  # 10 times in a 10 minute range 
    
        times = get_noise_times(exclude, startafter=exclude[0], endbefore=exclude[-1], amount=10, duration=10)
        
        
        # Make sure none of the noise times overlap with the event (exclude) times
        exclude_times = [Time(time, time+10) for time in exclude]
        self.assertFalse(any((encompassed(Time(time, time+10), exclude_times)) for time in times))
        
        # Make sure that the times are generated in the correct time window
        times.sort()
        self.assertTrue((times[0] > exclude[0]) and (times[-1] < exclude[-1]))
        
        # Should generate the correct amount
        self.assertEqual(len(times), 10)
        
        
    def test_slide(self):
        gen = slide(list(range(7)), 3)
        self.assertEqual(next(gen), [0, 1, 2])
        self.assertEqual(next(gen), [3, 4, 5])
        self.assertEqual(next(gen), [6])
        
        res = list(slide(list(range(7)), 3, 2))
        self.assertEqual(res, [[0, 1, 2], [2, 3, 4], [4, 5, 6]])
    

In [None]:
methods = TestMethods()

suite = unittest.TestLoader().loadTestsFromModule(methods)
unittest.TextTestRunner().run(suite)
