In [None]:
from importlib import reload
from pathlib import Path
import os
import numpy as np

from obspy.clients.filesystem.sds import Client as SDSClient
from obspy.clients.fdsn import RoutingClient
from obspy.core import UTCDateTime as UTC, read

import matplotlib.pyplot as plt
plt.style.use('tableau-colorblind10')

from data_quality_control import sds_db, base, util, analysis, dqclogging
from data_quality_control.analysis import Analyzer

## Define parameters

In [None]:
# NSLC code of the stream to process
# (presumably network.station.location.channel -- confirm).
nslc_code = "GR.BFO..BHZ"

# Spectral settings; all of these are forwarded to SDSProcessor below.
sampling_rate = 20
nperseg = 2048
overlap = 60  # 3600
winlen_in_s = 3600      # forwarded as winlen_seconds
proclen = 24 * 3600     # forwarded as proclen_seconds (one day)

# Frequency band forwarded as amplitude_frequencies=(fmin, fmax).
fmin = 4
fmax = 14

# Where results are written and where the input SDS archive lives.
outdir = 'output'
sds_root = os.path.abspath('../../sample_sds/')

# Routing type handed to SDSProcessor for the station inventory.
inventory_routing_type = "eida-routing"

In [None]:
# Time span used by the processing and analysis cells below
# (passed to processor.process() and lyza.get_data()).
# NOTE(review): whether enddate is inclusive is decided by those methods -- confirm.
startdate = UTC("2020-12-24")
enddate = UTC("2021-01-15")

In [None]:
# Reload the project modules so that on-disk edits are picked up without
# restarting the kernel. Dependencies go first: importlib.reload() does not
# rebind names that other modules already imported, so reloading sds_db
# before base/util would leave it pointing at stale code.
# NOTE(review): assumes sds_db builds on base/util (not the reverse) -- confirm.
reload(base)
reload(util)
reload(sds_db)

# Configure the processor with the parameters defined in the cells above.
processor = sds_db.SDSProcessor(
        nslc_code,
        inventory_routing_type,
        sds_root, outdir=outdir,
        overlap=overlap, nperseg=nperseg,
        winlen_seconds=winlen_in_s,
        proclen_seconds=proclen,
        amplitude_frequencies=(fmin, fmax),
        sampling_rate=sampling_rate)

print(processor)

# force_new_file=True is passed through to process(); presumably it
# overwrites existing output files -- verify against SDSProcessor.process.
processor.process(startdate, enddate, force_new_file=True)

In [None]:
# Reload base before analysis, the same order the Interpolator cell further
# down uses: importlib.reload() does not rebind names a module has already
# imported, so the dependent module has to be reloaded last.
# NOTE(review): assumes analysis imports from base -- confirm.
reload(base)
reload(analysis)
#reload(util)

# Analyzer over the processor output in `outdir`, using yearly files.
lyza = analysis.Analyzer(outdir, nslc_code,
                            fileunit="year")

# Load the requested time span into the analyzer.
lyza.get_data(startdate, enddate)

In [None]:
# Spectrogram of the analyzer set up above (reading from `outdir`); the
# trailing ';' keeps Jupyter from echoing the call's return value.
lyza.plot_spectrogram();

In [None]:
# Reload dependencies first, then the module that uses them.
reload(base)
reload(util)
reload(analysis)

polly = analysis.Interpolator(outdir, nslc_code)

# Derive the log file and target directory from `outdir` instead of repeating
# the literal "output", so changing `outdir` in the parameter cell keeps
# every path consistent.
dqclogging.configure_handlers(polly.logger, "INFO", "INFO",
                              os.path.join(outdir, "polly.log"), True)

# NOTE(review): 6 and 3 are passed positionally; their meaning is defined by
# Interpolator.interpolate -- confirm and consider naming them.
# The trailing "" keeps the trailing path separator of the original argument.
polly.interpolate(6, 3, os.path.join(outdir, "interpolated", ""),
                  force_new_file=True,
                  #starttime=startdate, endtime=enddate
                  )

In [None]:
# Show the Interpolator's text representation after the interpolation run.
print(polly)

In [None]:
# Reload base before analysis (same order as the Interpolator cell above) so
# analysis is re-executed against the fresh base module.
# NOTE(review): assumes analysis imports from base -- confirm.
reload(base)
reload(analysis)
#reload(util)

# Analyzer over the interpolated files. The directory is derived from
# `outdir` rather than hard-coded, so it follows the parameter cell.
lyza = analysis.Analyzer(Path(outdir) / "interpolated", nslc_code,
                            fileunit="year")

# Load the requested time span into the analyzer.
lyza.get_data(startdate, enddate)

In [None]:
# Show the text representation of the Analyzer that reads the interpolated files.
print(lyza)

In [None]:
# Spectrogram of the analyzer reading the interpolated files; the trailing
# ';' keeps Jupyter from echoing the call's return value.
lyza.plot_spectrogram();

# Decorator experiments

In [None]:
# Inline prototype of the check: a window length is only legal for yearly
# files if it tiles both a 365-day and a 366-day year without remainder.
winlen_in_seconds = 8*3600
fileunit = "year"
if fileunit == "year":
    if any((24*3600*n_days % winlen_in_seconds) != 0
           for n_days in (365, 366)):
        raise UserWarning("Illegal winlen")

In [None]:
# Same check for monthly files: the window must tile every month length
# listed here (28, 30 and 31 days) without remainder.
winlen_in_seconds = 8*3600
fileunit = "month"
if fileunit == "month":
    if any((24*3600*n_days % winlen_in_seconds) != 0
           for n_days in (28, 30, 31)):
        raise UserWarning("Illegal winlen")

In [None]:
# Same check for daily files: the window must tile one day without remainder.
winlen_in_seconds = 2*3600
fileunit = "day"
if fileunit == "day":
    if any((24*3600*n_days % winlen_in_seconds) != 0
           for n_days in (1,)):
        raise UserWarning("Illegal winlen")

In [None]:
# Same check for hourly files: one hour is expressed as 1/24 of a day, so
# the duration is a float here.
winlen_in_seconds = 150 #1*3600
fileunit = "hour"
if fileunit == "hour":
    if any((24*3600*day_fraction % winlen_in_seconds) != 0
           for day_fraction in (1/24,)):
        raise UserWarning("Illegal winlen")

In [None]:
def assert_integer_quotient_of_winlen_for_fileunit(fileunit, winlen_seconds=None):
    """Raise if the window length does not tile every duration of `fileunit`.

    Parameters
    ----------
    fileunit : str
        One of "year", "month", "day" or "hour".
    winlen_seconds : number, optional
        Window length in seconds. The original body read this from a
        module-level name that no cell defines; it is now an explicit
        argument. When omitted, a module-level ``winlen_seconds`` is still
        honoured so existing one-argument calls behave as before.

    Raises
    ------
    UserWarning
        If any possible duration of `fileunit` is not an integer multiple
        of `winlen_seconds`.
    NameError
        If `winlen_seconds` is neither passed nor defined at module level.
    KeyError
        If `fileunit` is not one of the known units.
    """
    if winlen_seconds is None:
        try:
            winlen_seconds = globals()["winlen_seconds"]
        except KeyError:
            raise NameError("name 'winlen_seconds' is not defined") from None

    # Durations in whole seconds; integers avoid the float that
    # 24*3600*(1/24) would produce for the hour.
    seconds_per_day = 24 * 3600
    possible_durs = {
        "year": [365 * seconds_per_day, 366 * seconds_per_day],
        "month": [n * seconds_per_day for n in (28, 30, 31)],
        "day": [seconds_per_day],
        "hour": [3600],
    }
    if any(dur % winlen_seconds != 0 for dur in possible_durs[fileunit]):
        raise UserWarning("Illegal winlen")
    

In [None]:

def assert_integer_quotient_of_winlen_for_fileunit(func):
    """Decorate ``func(x, fileunit, winlen_seconds)`` with a window-length check.

    The wrapped function is called first; afterwards the wrapper verifies
    that `winlen_seconds` divides every possible duration of `fileunit`
    and raises ``UserWarning("Illegal winlen")`` otherwise. The wrapped
    function's return value is passed through, and its name/docstring are
    preserved on the wrapper.
    """
    from functools import wraps  # local import keeps the cell self-contained

    @wraps(func)
    def wrapper(x, fileunit, winlen_seconds):
        # Call first, validate afterwards (same order as before), so any
        # state set by `func` is already in place when the check raises.
        result = func(x, fileunit, winlen_seconds)
        possible_durs = {"year": [365, 366],
                        "month": [28, 30, 31],
                        "day": [1],
                        "hour": [1/24]}
        # Convert day counts to seconds.
        possible_durs = {k: [24*3600*i for i in v]
                         for k, v in possible_durs.items()}
        if any((dur % winlen_seconds) != 0
               for dur in possible_durs[fileunit]):
            raise UserWarning("Illegal winlen")
        return result
    return wrapper

In [None]:
class DecorationTest:
    """Minimal class for trying the decorator on a setter method."""

    def __init__(self, win, fileunit):
        # Note the swap: set_value receives (fileunit, win), which is the
        # argument order the decorator's wrapper expects.
        self.set_value(fileunit, win)

    @assert_integer_quotient_of_winlen_for_fileunit
    def set_value(self, val1, val2):
        """Store both values; the decorator validates them afterwards."""
        self.val1, self.val2 = val1, val2

In [None]:
# 5 h divides a 365-day year (1752 windows) but not a 366-day year, so the
# decorated setter is expected to reject it. Catch and show the error so
# the notebook still survives "Restart & Run All".
try:
    x = DecorationTest(5*3600, "year")
except UserWarning as err:
    print("Rejected:", err)

In [None]:
def assert_integer_quotient_of_winlen_for_fileunit(func):
    """Decorate a method so the instance's window length is checked after it runs.

    After calling the wrapped method, the wrapper reads
    ``xclass.winlen_seconds`` and ``xclass.fileunit`` from the instance and
    raises ``UserWarning("Illegal winlen")`` unless the window length
    divides every possible duration of the file unit. The wrapped method's
    return value is passed through.
    """
    from functools import wraps  # local import keeps the cell self-contained

    @wraps(func)
    def wrapper(xclass, *args, **kwargs):
        # Fixed: keyword arguments are forwarded with ** (the original used
        # *kwargs, which passed the keyword *names* as positional values).
        result = func(xclass, *args, **kwargs)
        possible_durs = {"year": [365, 366],
                        "month": [28, 30, 31],
                        "day": [1],
                        "hour": [1/24]}
        # Convert day counts to seconds.
        possible_durs = {k: [24*3600*i for i in v]
                         for k, v in possible_durs.items()}
        # Validation happens after the call because the attributes it reads
        # may only be set by the wrapped method itself (e.g. __init__).
        if any((dur % xclass.winlen_seconds) != 0
               for dur in possible_durs[xclass.fileunit]):
            raise UserWarning("Illegal winlen")
        return result
    return wrapper

In [None]:
class DecorationTest:
    """Test class whose methods are validated through instance attributes."""

    @assert_integer_quotient_of_winlen_for_fileunit
    def __init__(self, win, fileunit):
        # The decorator reads these two attributes once __init__ has returned.
        self.winlen_seconds, self.fileunit = win, fileunit

    @assert_integer_quotient_of_winlen_for_fileunit
    def set_value(self, val1, val2):
        """Store two unrelated values; the decorator re-checks the instance."""
        self.val1, self.val2 = val1, val2

In [None]:
# 2 h divides both a 365-day and a 366-day year, so with the decorator
# defined above this construction should pass the check.
x = DecorationTest(2*3600, "year")

In [None]:
# Re-import util so that on-disk edits are picked up without a kernel
# restart; the function below reads util.possible_durations_per_fileunit.
reload(util)
def assert_integer_quotient_of_wins_per_fileunit(winlen_seconds, fileunit):
    """Raise UserWarning unless `winlen_seconds` divides every duration of `fileunit`.

    The candidate durations are looked up in
    ``util.possible_durations_per_fileunit[fileunit]``.
    """
    durations = util.possible_durations_per_fileunit[fileunit]
    # Compute every remainder up front, then test them.
    remainders = [dur % winlen_seconds for dur in durations]
    if any(rest != 0 for rest in remainders):
        raise UserWarning("Illegal winlen")
    

def decorator_assert_integer_quotient_of_wins_per_fileunit(func):
    """Validate the ``winlen_seconds``/``fileunit`` arguments of `func` after it runs.

    The wrapper calls `func`, resolves the two arguments by name and hands
    them to ``assert_integer_quotient_of_wins_per_fileunit``. Unlike the
    first draft it

    * accepts the arguments positionally as well as by keyword,
    * returns the wrapped function's result, and
    * no longer prints the keyword dict (leftover debugging output).

    Raises
    ------
    KeyError
        If neither the call nor the signature of `func` provides
        ``winlen_seconds`` or ``fileunit``.
    """
    # Local imports keep the cell self-contained.
    import functools
    import inspect

    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        # Map positional arguments and defaults onto parameter names.
        # Explicit keywords win, which also covers functions that collect
        # them in **kwargs.
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        resolved = {**bound.arguments, **kwargs}
        winlen_seconds = resolved["winlen_seconds"]
        fileunit = resolved["fileunit"]
        assert_integer_quotient_of_wins_per_fileunit(winlen_seconds, fileunit)
        return result
    return wrapper

In [None]:
class DecorationTest:
    """Test class combining the keyword-based and the attribute-based decorator."""

    @decorator_assert_integer_quotient_of_wins_per_fileunit
    def __init__(self, winlen_seconds=None, fileunit=None):
        # Validated by the decorator from the arguments of this call.
        self.winlen_seconds, self.fileunit = winlen_seconds, fileunit

    @assert_integer_quotient_of_winlen_for_fileunit
    def set_value(self, val1, val2):
        """Store two values; this decorator checks the instance attributes."""
        self.val1, self.val2 = val1, val2

In [None]:
# A 1 h window is expected to pass the check.
x = DecorationTest(winlen_seconds=1*3600, fileunit="year")

# A 5 h window is expected to be rejected (presumably
# util.possible_durations_per_fileunit["year"] covers 365 and 366 days, and
# 5 h does not divide the latter -- confirm). Catch and show the error so the
# notebook still survives "Restart & Run All"; `x` keeps the valid instance.
try:
    x = DecorationTest(winlen_seconds=5*3600, fileunit="year")
except UserWarning as err:
    print("Rejected:", err)

### Pass an instance attribute as an argument to a decorator on a class method

In [None]:

def assert_integer_quotient_of_winlen_for_fileunit(func):
    """Wrap `func` so callers pass ``fileunit`` and ``winlen_seconds`` up front.

    The returned wrapper has the signature
    ``wrapper(fileunit, winlen_seconds, *args, **kwargs)``: the first two
    arguments are consumed by the check and the remaining ones are
    forwarded to `func`. `func` is called first; afterwards
    ``UserWarning("Illegal winlen")`` is raised unless `winlen_seconds`
    divides every possible duration of `fileunit`. The result of `func` is
    passed through.
    """
    from functools import wraps  # local import keeps the cell self-contained

    # NOTE: wraps() copies name/docstring only; the wrapper still expects the
    # two extra leading arguments that `func` itself does not take.
    @wraps(func)
    def wrapper(fileunit, winlen_seconds, *args, **kwargs):
        result = func(*args, **kwargs)
        possible_durs = {"year": [365, 366],
                        "month": [28, 30, 31],
                        "day": [1],
                        "hour": [1/24]}
        # Convert day counts to seconds.
        possible_durs = {k: [24*3600*i for i in v]
                         for k, v in possible_durs.items()}
        if any((dur % winlen_seconds) != 0
               for dur in possible_durs[fileunit]):
            raise UserWarning("Illegal winlen")
        return result
    return wrapper

In [None]:
class DecorationTest():
    """Test class that feeds its own values into the argument-taking decorator.

    The original draft decorated ``set_value`` with
    ``@decorator(self.fileunit, self.winlen_seconds)``. That expression is
    evaluated while the class body runs, where no ``self`` exists, so the
    class definition itself failed with a NameError. The wrapping is now
    done at call time, when the instance and its values are available.
    """

    def __init__(self, win, fileunit):
        self.set_value(fileunit, win)

    def set_value(self, val1, val2):
        """Store `val1` as fileunit and `val2` as winlen_seconds, then validate.

        Raises UserWarning (from the decorator's wrapper) if the window
        length is illegal for the file unit.
        """
        # The wrapper consumes (fileunit, winlen_seconds) for the check and
        # forwards the remaining arguments to the wrapped callable.
        checked_assign = assert_integer_quotient_of_winlen_for_fileunit(
            self._assign)
        checked_assign(val1, val2, val1, val2)

    def _assign(self, fileunit, winlen_seconds):
        """Plain attribute assignment, without validation."""
        self.fileunit = fileunit
        self.winlen_seconds = winlen_seconds

In [None]:
# 5 h does not divide a 366-day year, so this construction is expected to
# be rejected with UserWarning. Catch and show it so the notebook still
# survives "Restart & Run All"; any other exception still propagates.
try:
    x = DecorationTest(5*3600, "year")
except UserWarning as err:
    print("Rejected:", err)