In [None]:
#minimizing Shannon entropy to determine
#wavelet decomposition level
def decomp_entropy(obj_id, band, levels=7, wavelet='db1'):
    """Print the Shannon entropy of a signal at successive DWT levels.

    The signal returned by ``interpolate_signal(obj_id, band)`` is sampled on
    100 evenly spaced points, then repeatedly replaced by the approximation
    coefficients of a single-level discrete wavelet transform. The entropy of
    the value counts is printed for the sampled signal and after every level,
    so ``levels + 1`` numbers are printed in total.

    Parameters
    ----------
    obj_id, band
        Forwarded unchanged to ``interpolate_signal`` (defined elsewhere in
        this notebook).
    levels : int, optional
        Number of decomposition levels to apply. Defaults to 7.
    wavelet : str, optional
        Wavelet name handed to ``pywt.dwt``. Defaults to ``'db1'``.

    Returns
    -------
    None
        The entropies are only printed.
    """
    #interpolate_signal yields a callable plus the bounds used for the grid
    interp, tmin, tmax = interpolate_signal(obj_id, band)
    x = np.linspace(tmin, tmax, 100)
    signal = pd.Series(interp(x))

    #value_counts treats every distinct sample value as one symbol
    print(stats.entropy(signal.value_counts()))

    for _ in range(levels):
        #keep the approximation coefficients, discard the detail ones
        approx, _detail = pywt.dwt(signal, wavelet)
        signal = pd.Series(approx)
        print(stats.entropy(signal.value_counts()))

    return

#example run: obj_id 615, band 2 (entropies are printed, nothing is returned)
decomp_entropy(615, 2)

In [None]:
#difficult to implement for unevenly sampled time series data
#define function to perform continuous wavelet transform
#was supposed to be paired with gaussian process regressor
#which would also incorporate flux_err into the interpolation
def cwt(y):
    """Plot continuous wavelet transforms for objects whose target is ``y``.

    A 2x3 grid of axes is created. The i-th object id of the class is paired
    with passband i (i = 0..5), so at most six objects are drawn and each one
    is shown in a different band. For every pair the flux samples are ordered
    by ``mjd`` and transformed with the 'mexh' wavelet over scales 1..64.

    Reads the notebook globals ``train_metadata`` and ``train_series``.

    Parameters
    ----------
    y
        Value compared against ``train_metadata['target']``.

    Returns
    -------
    None
        The figure is drawn as a side effect.
    """
    #organizing plots for different passbands
    fig, axs = plt.subplots(nrows=2, ncols=3, sharex=True, sharey=True)

    class_separate = train_metadata.loc[train_metadata['target'] == y]
    class_indices = class_separate.object_id.unique()

    #choosing train_series rows where the label = y
    X = train_series[train_series['object_id'].isin(class_indices)].drop(
        columns=['flux_err', 'detected'])

    for ax, index, signal in zip(axs.flat, class_indices, [0, 1, 2, 3, 4, 5]):
        #element-wise "and" on boolean Series is `&`; each side needs parentheses
        mask = (X['passband'] == signal) & (X['object_id'] == index)
        #pywt.cwt expects the 1-D signal itself, not a masked table
        flux = X.loc[mask].sort_values('mjd')['flux'].to_numpy()
        if flux.size == 0:
            #nothing observed for this object/band pair; leave the axis empty
            continue

        coeffs, _ = pywt.cwt(flux, np.arange(1, 65), wavelet='mexh')

        #columns of coeffs follow sample order, not evenly spaced time
        ax.imshow(coeffs, cmap='coolwarm', aspect='auto')
        ax.set_title('CWT of Object {} in band {}'.format(index, signal))
        ax.spines['right'].set_visible(False)
        ax.spines['top'].set_visible(False)
        ax.set_ylabel('Scale')
        ax.set_xlabel('Time')

    plt.tight_layout()

    return

In [None]:
#couldn't get tsfresh library to work
from tsfresh import extract_features, select_features
from tsfresh.utilities.dataframe_functions import impute, check_for_nans_in_columns
from tsfresh.feature_extraction import MinimalFCParameters

#define function for tsfresh analysis of all classes
#useful for extracting features like peaks, outliers, etc. in time series data
#need to fix this method with tsfresh statistical features.
#difficult to implement for unevenly sampled time series data
#has to do with finding the right versions for the right packages.

def statistical_features():
    """Extract tsfresh features from ``train_series`` and keep the relevant ones.

    Reads the notebook globals ``train_metadata`` (one ``target`` per
    ``object_id``) and ``train_series`` (columns ``object_id``, ``mjd``,
    ``passband``, ``flux``).

    Returns
    -------
    The output of ``select_features`` applied to the imputed feature matrix
    and the per-object targets.
    """
    #one label per object, indexed by object id; built in a single pass
    #instead of one boolean scan of train_metadata per object
    targets = (
        train_metadata.drop_duplicates(subset='object_id')
        .set_index('object_id')['target']
        .astype(int)
    )

    #extracting and selecting the most relevant features
    extracted_features = extract_features(
        train_series,
        column_id='object_id',
        column_value='flux',
        column_sort='mjd',
        column_kind='passband',
        default_fc_parameters=MinimalFCParameters(),
    )
    extracted_features = impute(extracted_features)

    #present the labels in the same order, and for the same ids, as the rows
    #of the feature matrix
    targets = targets.reindex(extracted_features.index)
    features_filtered = select_features(extracted_features, targets)

    return features_filtered

In [None]:
#implement the bazin parametric fit
#simple model with no physical basis but an ability to reproduce the behavior of most light curves
#pre-processing for supernova classification

def bazin(t, A, t0, trise, tfall):
    """Evaluate the Bazin light-curve model.

    Computes ``A * exp(-(t - t0) / tfall) / (1 + exp(-(t - t0) / trise))``.

    Parameters
    ----------
    t : scalar or array-like
        Times at which to evaluate the model.
    A : float
        Amplitude factor.
    t0 : float
        Reference time subtracted from ``t``.
    trise, tfall : float
        Rise and fall time scales.

    Returns
    -------
    Model values with the shape of ``t``. When ``tfall <= trise`` an array of
    zeros with that shape is returned instead.
    """
    if tfall <= trise:
        #np.shape also handles scalars and lists, which have no .shape
        return np.zeros(np.shape(t))
    #np.subtract accepts lists as well as arrays and Series
    dt = np.subtract(t, t0)
    return A * np.exp(-dt / tfall) / (1 + np.exp(-dt / trise))

In [None]:
#recover parameters for a given object
#model by zheng, kelly, and filippenko: specific to Ia SN
def get_zkffit(timeseries, metadata, obj_id):
    """Rebase one object's per-band observation times to start at zero.

    No fit is performed here: the function only prepares the per-band time
    series and returns the one for passband 0.

    Parameters
    ----------
    timeseries : pandas.DataFrame
        Observations with at least ``object_id``, ``passband`` and ``mjd``.
    metadata : pandas.DataFrame
        Per-object table with ``object_id`` and ``hostgal_photoz``.
    obj_id
        Identifier of the object to process.

    Returns
    -------
    pandas.DataFrame
        If the mean ``hostgal_photoz`` of the object equals 0, a one-row frame
        holding ``object_id`` and NaN placeholders for ``bazin_trise``,
        ``bazin_tfall_<band>`` and ``bazin_A_<band>``. Otherwise the passband 0
        rows of the object with ``mjd`` shifted so the earliest one is 0.
    """
    bands = [0, 1, 2, 3, 4, 5]

    #placeholder row returned when the object is rejected
    columns = {'object_id': [obj_id], 'bazin_trise': np.nan}
    for band in bands:
        columns['bazin_tfall_{}'.format(band)] = np.nan
        columns['bazin_A_{}'.format(band)] = np.nan
    res = pd.DataFrame(columns)

    #creating new dataframes with the object's recorded events
    obj_ts = timeseries[timeseries['object_id'] == obj_id]
    obj_meta = metadata[metadata['object_id'] == obj_id]
    #applying naive benchmark that supernovae are extragalactic
    if obj_meta['hostgal_photoz'].mean() == 0:
        print('This is not extragalactic!')
        return res

    #shift every band so that its first observation sits at t=0;
    #work on copies so the caller's table is never written to
    rebased = {}
    for band in bands:
        band_ts = obj_ts[obj_ts['passband'] == band].copy()
        band_ts['mjd'] = band_ts['mjd'] - band_ts['mjd'].min()
        rebased[band] = band_ts

    #only the passband 0 series is handed back
    return rebased[0]

In [None]:
#example call: supernovae42_ts / supernovae42_meta are passed as the
#timeseries / metadata arguments, 130659834 as obj_id
#NOTE(review): get_bazin is defined in a later cell, which has to be run first
get_bazin(supernovae42_ts, supernovae42_meta, 130659834)

In [None]:
#recover bazin parameters for a given object
def get_bazin(timeseries, metadata, obj_id):
    """Fit the Bazin model to every passband of one object.

    The fit runs in two passes. First, all four parameters are fitted per
    band; the medians of the fitted ``t0`` and ``trise`` values are then taken
    as shared values. Second, each band is refitted for ``A`` and ``tfall``
    only, with ``t0`` and ``trise`` held at those medians.

    Parameters
    ----------
    timeseries : pandas.DataFrame
        Observations with ``object_id``, ``passband``, ``mjd``, ``flux`` and
        ``flux_err`` columns.
    metadata : pandas.DataFrame
        Per-object table with ``object_id`` and ``hostgal_photoz``.
    obj_id
        Identifier of the object to fit.

    Returns
    -------
    pandas.DataFrame or None
        A one-row frame with ``object_id``, ``bazin_trise`` and, per band,
        ``bazin_tfall_<band>`` / ``bazin_A_<band>`` (NaN where no acceptable
        fit exists). The all-NaN frame is returned when the object's mean
        ``hostgal_photoz`` equals 0. ``None`` is returned when fewer than two
        bands yield an accepted ``t0``.
    """
    bands = [0, 1, 2, 3, 4, 5]

    #setting up the one-row frame that will contain the parameters
    columns = {'object_id': [obj_id], 'bazin_trise': np.nan}
    for band in bands:
        columns['bazin_tfall_{}'.format(band)] = np.nan
        columns['bazin_A_{}'.format(band)] = np.nan
    res = pd.DataFrame(columns)

    #creating new dataframes with the object's recorded events
    obj_ts = timeseries[timeseries['object_id'] == obj_id]
    obj_meta = metadata[metadata['object_id'] == obj_id]
    #applying naive benchmark that supernovae are extragalactic
    if obj_meta['hostgal_photoz'].mean() == 0:
        print('This is not extragalactic!')
        return res

    #number of synthetic points added before and after the observations
    offset = 11
    gap_pad = 100 * np.ones(offset)

    t0s = []
    trises = []
    #one record per successfully fitted band; carrying the band number keeps
    #the second pass aligned even when some bands fail
    first_pass = []

    for band in bands:
        single_ts = obj_ts[obj_ts['passband'] == band]
        if single_ts.empty:
            continue

        mjd = single_ts['mjd'].to_numpy(dtype=float)
        flux = single_ts['flux'].to_numpy(dtype=float)
        flux_err = single_ts['flux_err'].to_numpy(dtype=float)

        #gap to the previous / next observation, 100 where there is none;
        #both are padded so they can be indexed like the padded signal
        steps = np.diff(mjd)
        mjd_delta_prev = np.concatenate(
            (gap_pad, [100.0], steps, gap_pad))
        mjd_delta_next = np.concatenate(
            (gap_pad, steps, [100.0], gap_pad))

        #taking the time range and average flux error of the band
        x_min = mjd.min()
        x_max = mjd.max()
        y_err_mean = flux_err.mean()

        #pad the light curve 450-500 days on both sides with random values
        #scaled by the mean flux error of the band
        x = np.concatenate((
            np.linspace(x_min - 500, x_min - 450, offset),
            mjd,
            np.linspace(x_max + 450, x_max + 500, offset),
        ))
        y = np.concatenate((
            np.random.randn(offset) * y_err_mean,
            flux,
            np.random.randn(offset) * y_err_mean,
        ))
        y_err = np.concatenate((
            y_err_mean * np.ones(offset),
            flux_err,
            y_err_mean * np.ones(offset),
        ))

        idxmax = np.argmax(y)  #index of maximum flux
        t_peak = x[idxmax]  #time of maximum flux
        A = y.max()  #value of maximum flux

        #default bounds for the amplitude and starting guesses
        Amin = A * 1.3
        Amax = 4 * A
        tstart = -5
        trise = 5
        tfall = 10

        #widen the guesses when the peak is far from its neighbours
        if mjd_delta_prev[idxmax] > 50:
            tstart = -50
            Amin = 2 * A
            Amax = 5 * A
            trise = 20
            tfall = 40
        if mjd_delta_next[idxmax] > 50:
            Amin = 2 * A
            Amax = 5 * A
            trise = 20
            tfall = 40

        p0 = [(Amin + Amax) / 2, t_peak + tstart, trise, tfall]
        bmin = [Amin, t_peak + tstart - 100, trise / 10, tfall / 10]
        bmax = [Amax, t_peak + tstart, trise * 10, tfall * 10]

        try:
            params, params_covariance = curve_fit(
                bazin, x, y, p0, y_err, bounds=(bmin, bmax), max_nfev=1000)
        except (RuntimeError, ValueError):
            #no convergence, or unusable bounds/residuals (e.g. A <= 0)
            continue

        median_cov = np.abs(np.median(params_covariance / A))

        #band 5 never contributes to the shared t0 estimate
        if median_cov <= 10 and band in range(0, 5):
            t0s.append(params[1])
        trises.append(params[2])
        first_pass.append({
            'band': band,
            'A': A,
            'p0': [params[0], params[3]],
            'bmin': [bmin[0], bmin[3]],
            'bmax': [bmax[0], bmax[3]],
            'x': x,
            'y': y,
            'y_err': y_err,
        })

    #if there is no valid maximum value in the data,
    #the fit has failed and the object is not a supernova.
    if len(t0s) <= 1:
        print('The fit has failed!')
        return None

    t00 = np.median(t0s)
    trise = np.median(trises)
    res['bazin_trise'] = trise

    def bazin_shared(t, A, tfall):
        #same model with t0 and trise frozen at the cross-band medians
        return bazin(t, A, t00, trise, tfall)

    for fit in first_pass:
        try:
            params, params_covariance = curve_fit(
                bazin_shared, fit['x'], fit['y'], fit['p0'], fit['y_err'],
                bounds=(fit['bmin'], fit['bmax']), max_nfev=1000)
        except (RuntimeError, ValueError):
            continue

        median_cov = np.abs(np.median(params_covariance / fit['A']))
        if median_cov <= 10:
            res['bazin_tfall_{}'.format(fit['band'])] = params[1]
            res['bazin_A_{}'.format(fit['band'])] = params[0]

    print(res)
    return res