In [1]:
import h5py
import numpy as np
import pandas as pd
import json
import matplotlib.pyplot as plt
import os
import sys
from sklearn.model_selection import train_test_split
import time

import random


%matplotlib inline

In [2]:
class BigDataLoader:
    """Index the merged HDF5 waveform file and its CSV metadata.

    On construction the loader reads the CSV into ``info_file``, opens the
    HDF5 file read-only, lists the datasets in its ``data`` group, derives a
    label for every event name and optionally balances the classes.

    PARAMETERS:
    ------------------------------
    upsample:   bool  - Forwarded to Balancer.balance_dataset.
    downsample: bool  - Forwarded to Balancer.balance_dataset.
    frac_diff:  float - Forwarded to Balancer.balance_dataset; only consulted
                        when upsample or downsample is set.
    test:       bool  - If True, keep a shuffled 2 % subset of the event names.
    seed:       int or None - Seed used for balancing and, when given, for the
                        test-mode shuffle.
    """

    # Trailing token of an event name -> class label.
    _SUFFIX_TO_LABEL = {'EV': 'earthquake' , 'NO' : 'noise'}

    def __init__(self, upsample = False, downsample = False, frac_diff = 1, test = False, seed = None):
        self.upsample = upsample
        self.downsample = downsample
        self.balance = self.upsample and self.downsample
        self.seed = seed
        self.test = test
        # Raw strings yield exactly the same paths as before, but no longer
        # depend on "\T", "\L" and "\m" happening to be invalid escape sequences.
        self.source_path = r"F:\Thesis_ssd"

        #TODO: Make these two lines more generic
        self.filename = self.source_path + r"\LargeDataset\merge.hdf5"
        self.csv_file = self.source_path + r"\LargeDataset\merge.csv"

        self.info_file = self.parse_csv(self.csv_file)
        # Keep the File object itself so the handle can be closed explicitly.
        self.h5_file = h5py.File(self.filename, 'r',  rdcc_nbytes = 10**9)
        self.data_file = self.h5_file.get('data')

        self.event_names = list(self.data_file.keys())
        if self.test:
            if self.seed is None:
                # No seed given: keep using NumPy's global random state.
                np.random.shuffle(self.event_names)
            else:
                # Seeded shuffle so the 2 % test subset is reproducible.
                np.random.RandomState(self.seed).shuffle(self.event_names)
            self.event_names = self.event_names[0:int(len(self.event_names)*0.02)]
        self.name_label = self.create_name_label_array(self.event_names)
        self.handler = BigDataHandler(self)
        self.balancer = Balancer(self, self.handler)

        if upsample or downsample:
            self.name_label = self.balancer.balance_dataset(self.name_label, downsample, upsample, frac_diff = frac_diff)

    def parse_csv(self, csv_file):
        """Read the metadata CSV at ``csv_file`` with explicit column dtypes.

        Columns listed in the mapping below are read with the given dtype;
        every other column found in the header is read as ``str``.

        Returns the resulting pandas DataFrame.
        """
        # Header-only read to discover the column names.
        col_names = pd.read_csv(csv_file, nrows=0).columns
        # NOTE(review): 'receiver_latititude' looks like a typo; if the CSV
        # header spells it 'receiver_latitude' that column falls through to
        # ``str`` below. Confirm against the real header before renaming.
        non_string_or_time = {
                      'receiver_latititude' : float,
                      'receiver_longitude' : float,
                      'p_weight' : float,
                      'p_travel_secs' : float,
                      'source_latitude' : float,
                      'source_longitude' : float,
                      'source_magnitude' : float,
                      'source_distance_deg' : float,
                      'source_distance_km' : float,
                      'back_azimuth_deg' : float,
                      'snr_db' : object,
                      'code_end_sample' : object}
        non_string_or_time.update({col: str for col in col_names if col not in non_string_or_time})
        # Read the file that was passed in, not self.csv_file, so the header
        # and the body always come from the same CSV.
        df = pd.read_csv(csv_file, dtype = non_string_or_time)
        return df


    def get_label_by_name(self, event_name):
        """Return 'earthquake' or 'noise' from the name's last '_'-separated token.

        Raises KeyError when the token is neither 'EV' nor 'NO'.
        """
        # Fastest way to get label
        return self._SUFFIX_TO_LABEL[event_name.split('_')[-1]]


    def create_name_label_array(self, name_list):
        """Return a (len(name_list), 2) '<U32' array of [name, label] rows.

        The '<U32' dtype truncates any string longer than 32 characters.
        """
        name_label = np.empty((len(name_list), 2), dtype='<U32')
        for idx, name in enumerate(name_list):
            name_label[idx] = [name, self.get_label_by_name(name)]
        return name_label

    def get_dataset_distribution(self, name_labels):
        """Return (unique labels, counts) taken from column 1 of ``name_labels``."""
        labels = [x[1] for x in name_labels]
        uniques, counts = np.unique(labels, return_counts = True)
        return uniques, counts

In [3]:
class BigDataHandler():
    """Trace-level helpers built on top of a BigDataLoader.

    Holds the loader's HDF5 'data' group and converts event names and batches
    of [name, label] rows into NumPy arrays.
    """

    # Stored trace layout (timesteps, channels) assumed only when an empty
    # batch leaves no trace to measure.
    _DEFAULT_TRACE_SHAPE = (6000, 3)

    def __init__(self, data_loader):
        self.loader = data_loader
        self.label_dict = {'earthquake':0, 'noise': 1}
        self.source_path = self.loader.source_path
        self.seed = self.loader.seed
        self.data_file = self.loader.data_file

    def create_train_val_test(self, name_label, val_test_size = 0.1, val_test_prop = 0.5, seed = None, shuffle = False):
        """Split ``name_label`` into train, validation and test parts.

        ``val_test_size`` is the fraction held out of the training set;
        ``val_test_prop`` is the fraction of that hold-out that becomes the
        test set. ``seed`` and ``shuffle`` are forwarded to both
        train_test_split calls; shuffle defaults to False here.
        """
        train, val_test = train_test_split(name_label, test_size = val_test_size, random_state = seed, shuffle = shuffle)
        val, test = train_test_split(val_test, test_size = val_test_prop, random_state = seed, shuffle = shuffle)
        return train, val, test

    def name_to_trace(self, name):
        """Return the full array stored under ``name`` in the data group.

        Raises KeyError when no such entry exists.
        """
        dataset = self.data_file.get(name)
        if dataset is None:
            # .get() returns None for a missing key; fail with a clear error
            # instead of "'NoneType' object is not subscriptable".
            raise KeyError(f"No trace named {name!r} in the data file")
        return dataset[:]

    def csv_to_trace_label(self, data_file, info_file, index):
        """Return (trace, label) for row ``index`` of ``info_file``.

        ``data_file`` must expose a 'data' group; the name is read from the
        'trace_name' column.
        """
        name = info_file['trace_name'][index]
        event = data_file.get('data').get(name)[:]
        # The label lookup lives on the loader, not on this handler.
        label = self.loader.get_label_by_name(name)
        return event, label

    def get_csv_row_by_name(self, event_name):
        """Return the metadata rows whose 'trace_name' equals ``event_name``."""
        # The loader stores the parsed CSV as ``info_file``.
        info = self.loader.info_file
        return info[info['trace_name'] == event_name].values

    def get_trace_shape(self, df):
        """Return (num_channels, num_timesteps) of the first trace named in ``df``."""
        some_name = df[0][0]
        trace = np.transpose(self.name_to_trace(some_name))
        num_channels, num_timesteps = trace.shape
        return num_channels, num_timesteps

    def batch_to_trace_binary_label(self, batch):
        """Load the traces of a batch and encode their labels.

        ``batch`` holds [name, label] rows. Returns ``(traces, labels)`` where
        ``traces`` has shape (len(batch), channels, timesteps) and ``labels``
        has shape (len(batch), 1) with values taken from ``label_dict``.

        Raises KeyError for a label missing from ``label_dict``.
        """
        n_rows = len(batch)
        batch_info = np.empty((n_rows, 1))
        if n_rows == 0:
            timesteps, channels = self._DEFAULT_TRACE_SHAPE
            return np.empty((0, channels, timesteps)), batch_info

        batch = np.asarray(batch)
        names = batch[:,0]
        labels = batch[:,1]

        # Size the buffer from the first trace instead of hard-coding (6000, 3).
        first_trace = self.name_to_trace(names[0])
        batch_trace = np.empty((n_rows,) + first_trace.shape)
        batch_trace[0] = first_trace
        for idx in range(1, n_rows):
            batch_trace[idx] = self.name_to_trace(names[idx])
        for idx, label in enumerate(labels):
            batch_info[idx] = self.label_dict[label]

        # Swap the last two axes. np.reshape would keep the flat element order
        # and therefore interleave the channels rather than separate them.
        batch_trace = np.ascontiguousarray(np.transpose(batch_trace, (0, 2, 1)))
        return batch_trace, batch_info

    def transform_batch(self, scaler, batch_X):
        """Return a copy of ``batch_X`` with ``scaler.transform`` applied per sample.

        The input array is left untouched.
        """
        # NOTE(review): each sample is passed as (channels, timesteps), while
        # BigStandardScalerFitter.fit_scaler feeds (timesteps, channels) traces
        # to partial_fit. Confirm which layout the scaler is meant to see.
        transformed_X = np.array(batch_X, copy = True)
        for i in range(len(batch_X)):
            transformed_X[i] = scaler.transform(batch_X[i])
        return transformed_X
        
        

In [4]:

class Balancer():
    """Balance a two-column (name, label) dataset by down- and/or upsampling."""

    def __init__(self, data_loader, handler):
        self.loader = data_loader
        self.handler = handler
        self.seed = data_loader.seed

    @staticmethod
    def _stack(kept_df, sampled_df):
        # Non-target rows first, then the sampled target rows, on a fresh index.
        columns = ["name", "label"]
        return pd.concat([kept_df[columns], sampled_df[columns]], ignore_index = True)

    def _distribution(self, balancing):
        # Label counts for the current frame, via the loader's helper.
        return self.loader.get_dataset_distribution(np.array(balancing, dtype = '<U32'))

    def downsample_label(self, target_label, name_label_df, n_samples, seed):
        """Keep only ``n_samples`` randomly chosen rows of ``target_label``.

        Rows of other labels are kept unchanged and come first in the result.
        ``seed`` is used as the random state of the sample.
        """
        is_target = name_label_df["label"] == target_label
        # Use the seed that was passed in rather than silently falling back
        # to self.seed.
        sampled = name_label_df.loc[is_target].sample(n_samples, random_state = seed)
        return self._stack(name_label_df.loc[~is_target], sampled)

    def upsample_label(self, target_label, name_label_df, n_samples, seed):
        """Replace the rows of ``target_label`` by ``n_samples`` rows drawn with replacement.

        Rows of other labels are kept unchanged and come first in the result.
        """
        # Selects n_samples from the target label to include in the new df in addition to the non-target label dps.
        is_target = name_label_df["label"] == target_label
        sampled = name_label_df.loc[is_target].sample(n_samples, replace = True, random_state = seed)
        return self._stack(name_label_df.loc[~is_target], sampled)

    def frac_diff_n_samples(self, frac_diff, min_counts, max_counts):
        """Return int(min_counts + (max_counts - min_counts) * frac_diff)."""
        diff = max_counts - min_counts
        n_samples = int(min_counts + diff*frac_diff)
        return n_samples


    def balance_dataset(self, name_label, downsample, upsample, frac_diff = 1):
        """
        Balance the dataset. Downsample, upsample or both.

        PARAMETERS:
        ------------------------------
        name_label: np.array - array of all event names and their respective label.
        downsample: bool -     True then will downsample
        upsample:   bool -     True then will upsample such that the length of the least occuring label is
                               equal to the most occuring
        frac_diff: float -     Fraction of the count difference between the most and the least occuring
                               label that the most occuring label keeps when downsampling. Must lie in
                               [0, 1]. 0 will mean that it will be downsampled so that its length is equal
                               to that of the least occuring label; 1 leaves its size unchanged.

        RETURNS:
        ------------------------------
        np.array of [name, label] rows, shuffled with self.seed.

        RAISES:
        ------------------------------
        ValueError if downsample is True and frac_diff is outside [0, 1].
        """
        balancing = pd.DataFrame(np.asarray(name_label, dtype = '<U32'), columns = ["name", "label"])
        if downsample:
            if not 0 <= frac_diff <= 1:
                raise ValueError(f"frac_diff must be within [0, 1], got {frac_diff}")
            uniques, counts = self._distribution(balancing)
            # argmax returns the first maximum, matching the previous tie-break.
            most_occuring_label = uniques[np.argmax(counts)]
            frac_diff_n_samples = self.frac_diff_n_samples(frac_diff, min(counts), max(counts))
            balancing = self.downsample_label(most_occuring_label, balancing, frac_diff_n_samples, self.seed)

        if upsample:
            uniques, counts = self._distribution(balancing)
            # Already balanced (or a single class): resampling with replacement
            # would only trade unique events for duplicates, so skip it.
            if min(counts) != max(counts):
                least_occuring_label = uniques[np.argmin(counts)]
                n_samples_for_balance = int(max(counts))
                balancing = self.upsample_label(least_occuring_label, balancing, n_samples_for_balance, self.seed)
        balancing = balancing.sample(frac = 1, random_state = self.seed).reset_index(drop=True)
        return np.array(balancing)

In [5]:
from keras.utils import np_utils

class BigDataGenerator():
    """Endless batch generator over [name, label] rows.

    Traces are loaded and labels encoded through the loader's handler.
    """

    def __init__(self, data_loader):
        self.loader = data_loader
        self.handler = self.loader.handler

    def _fill_rows(self, ds, count):
        # Rows used to pad a short final batch: a random contiguous slice when
        # the dataset is large enough, otherwise the dataset repeated as needed.
        num_samples = len(ds)
        if count <= num_samples:
            i_start = random.randint(0, num_samples - count)
            return ds[i_start:i_start + count]
        repeats = -(-count // num_samples)
        return np.concatenate([ds] * repeats)[:count]

    def _one_hot(self, y, num_classes):
        # One-hot encode integer class ids of shape (n, 1) into (n, num_classes)
        # int64, the layout keras' to_categorical produces for such input.
        try:
            class_ids = np.asarray(y).astype(np.int64).ravel()
            if class_ids.size and (class_ids.min() < 0 or class_ids.max() >= num_classes):
                raise ValueError("class id outside [0, num_classes)")
            return np.eye(num_classes, dtype = np.int64)[class_ids]
        except Exception as exc:
            # Chain the original error instead of hiding it behind a bare except.
            raise Exception(f'Error when doing to_categorical. Inputs are y: {y} and num_classes: {num_classes}') from exc

    def data_generator(self, ds, batch_size, use_scaler = False, scaler = None):
        """Yield ``(X, y)`` batches of exactly ``batch_size`` rows forever.

        PARAMETERS:
        ------------------------------
        ds:         np.array - rows of [name, label].
        batch_size: int      - number of rows per yielded batch.
        use_scaler: bool     - if True, ``scaler`` is applied to every batch.
        scaler:     object   - passed to the handler's transform_batch.

        ``y`` is one-hot encoded with one column per entry of the handler's
        ``label_dict``, so its width does not depend on which classes happen
        to be present in a batch.

        Raises ValueError (on the first ``next``) when ``ds`` is empty.
        """
        num_samples = len(ds)
        if num_samples == 0:
            # Without this guard the loop below would spin forever without yielding.
            raise ValueError("data_generator needs a non-empty dataset")
        num_classes = len(self.handler.label_dict)
        while True:
            for offset in range(0, num_samples, batch_size):
                # Get the samples you'll use in this batch
                batch = ds[offset:offset+batch_size]

                # Handle what happens when asking for a batch but theres no more new data
                shortfall = batch_size - len(batch)
                if shortfall > 0:
                    batch = np.concatenate([batch, self._fill_rows(ds, shortfall)])
                self.batch_samples = batch
                # Preprocessinng
                X, y = self.preprocessing(self.batch_samples, use_scaler, scaler)
                y = self._one_hot(y, num_classes)
                yield X, y

    def preprocessing(self, batch_samples, use_scaler, scaler):
        """Load traces and labels for ``batch_samples``; scale the traces if requested."""
        batch_trace, batch_label = self.handler.batch_to_trace_binary_label(batch_samples)
        if use_scaler:
            batch_trace = self.handler.transform_batch(scaler, batch_trace)
        return batch_trace, batch_label
        

In [6]:
from joblib import dump, load
from os import path
import sys

class BigScalerFitter():
    """Shared helpers for fitting, applying and persisting a scaler.

    ``save_fit`` and ``load_fit`` read ``self.scaler_folder`` and
    ``self.scaler_name``; this base class does not set them, so a subclass
    (or the caller) has to.
    """

    def __init__(self, train_ds, scaler, data_loader):
        self.train_ds = train_ds
        self.scaler = scaler
        self.loader = data_loader
        # The methods below go through self.handler; take it from the loader.
        self.handler = getattr(data_loader, "handler", None)

    def _fit_path(self, scaler_name):
        # Location of the persisted scaler for ``scaler_name`` and the handler's seed.
        return path.join(self.scaler_folder, f'{scaler_name}_{self.handler.seed}')

    def subsample(self, ds, shuffle = False, subsample_rate = 0.2):
        """Load the first ``int(len(ds) * subsample_rate)`` traces of ``ds``.

        ``ds`` holds [name, label] rows. With ``shuffle`` the rows are first
        permuted using the handler's seed.

        Returns ``(X, y)`` with X of shape (n, channels, timesteps) and y of
        shape (n, 1) holding the label strings.
        """
        ds = np.asarray(ds)
        channels, timesteps = self.handler.get_trace_shape(ds)
        num_samples = int(len(ds) * subsample_rate)
        if shuffle:
            # ds is an array, so permute row indices instead of DataFrame.sample.
            order = np.random.RandomState(self.handler.seed).permutation(len(ds))
            ds = ds[order]
        subsample_X = np.empty((num_samples, channels, timesteps))
        subsample_y = np.empty((num_samples,1), dtype=np.dtype('<U10'))
        # Only walk the rows that fit into the pre-allocated buffers.
        for idx, (name, label) in enumerate(ds[:num_samples]):
            # Transposed to match the (channels, timesteps) layout that
            # get_trace_shape reports.
            subsample_X[idx] = np.transpose(self.handler.name_to_trace(name))
            subsample_y[idx] = label
        return subsample_X, subsample_y

    def transform_subsample(self, ds, subsample_rate = 0.2, shuffle = False):
        """Return a subsample of ``ds`` with ``self.scaler.transform`` applied per trace."""
        # NOTE(review): each trace is handed to the scaler as
        # (channels, timesteps); confirm this matches how the scaler was fitted.
        subsamples_X, subsamples_y = self.subsample(ds, shuffle, subsample_rate)
        for i in range(len(subsamples_X)):
            subsamples_X[i] = self.scaler.transform(subsamples_X[i])
        return subsamples_X, subsamples_y


    def transform_sample(self, sample_X):
        """Return ``self.scaler.transform(sample_X)``."""
        return self.scaler.transform(sample_X)

    def save_fit(self, scaler):
        """Persist ``scaler`` with joblib under ``self.scaler_name`` and the handler's seed."""
        dump(scaler, self._fit_path(self.scaler_name))

    def load_fit(self, scaler_type):
        """Return the persisted scaler named ``scaler_type``, or None if no file exists."""
        # Check and load the same path; previously the existence check and the
        # load could point at different files.
        fit_path = self._fit_path(scaler_type)
        if path.exists(fit_path):
            return load(fit_path)
        else:
            return None

    def progress_bar(self, current, total, barLength = 20):
        """Print a one-line text progress bar for ``current`` out of ``total``."""
        if total <= 0:
            # Nothing to report; avoids a ZeroDivisionError.
            return
        percent = float(current) * 100 / total
        arrow   = '-' * int(percent/100 * barLength - 1) + '>'
        spaces  = ' ' * (barLength - len(arrow))
        print('Fitting scaler progress: [%s%s] %d %%' % (arrow, spaces, percent), end='\r')

from sklearn.preprocessing import StandardScaler
class BigStandardScalerFitter(BigScalerFitter):
    """Fit a StandardScaler incrementally over the training traces and cache it on disk."""

    def __init__(self, train_ds, handler):
        self.train_ds = train_ds
        self.handler = handler
        # The base-class attribute is populated as well, when the handler exposes its loader.
        self.loader = getattr(handler, "loader", None)
        self.scaler_name = "BigStandardScaler"
        # Raw string: same value as before, without invalid "\M" / "\S" escapes.
        self.scaler_folder = self.handler.source_path + r"\MasterThesis\Scalers"
        self.scaler = StandardScaler()

    def fit_scaler(self):
        """Return a fitted scaler, loading a saved one when it exists.

        Otherwise every trace named in ``train_ds`` is read and fed to
        ``partial_fit``; the fitted scaler is then saved and returned.

        Raises ValueError when there is nothing to fit on.
        """
        # Load once and reuse the result instead of reading the file twice.
        cached = self.load_fit(self.scaler_name)
        if cached is not None:
            self.scaler = cached
            return self.scaler

        ds = np.array(self.train_ds)
        num_samples = len(ds)
        if num_samples == 0:
            raise ValueError("Cannot fit a scaler on an empty training set")

        for i in range(num_samples):
            # progress_bar's third argument is the bar length (an int); the
            # label it prints is already built in, so no string is passed.
            self.progress_bar(i, num_samples)
            # NOTE(review): traces are fitted as stored, (timesteps, channels),
            # whereas the handler's transform_batch passes (channels, timesteps)
            # samples. Confirm the intended layout.
            X = self.handler.name_to_trace(ds[i][0])

            self.scaler.partial_fit(X)
        self.save_fit(self.scaler)
        return self.scaler

In [27]:
# Build the loader over the full dataset (test = False) without class balancing.
# frac_diff is only consulted when upsample or downsample is True, so it has no effect here.
data_loader = BigDataLoader(test = False, upsample = False, downsample = False, frac_diff = 0, seed = 2)

In [8]:
# Split the name/label rows into train / validation / test (defaults give 90 % / 5 % / 5 %).
# create_train_val_test defaults to shuffle = False, so rows keep their existing order.
train, val, test = data_loader.handler.create_train_val_test(data_loader.name_label, seed = data_loader.seed)

In [9]:
# Sanity check: shape of one stored training trace.
data_loader.handler.name_to_trace(train[9][0]).shape

(6000, 3)

In [28]:
# Unique labels and their counts over all loaded name/label rows.
data_loader.get_dataset_distribution(data_loader.name_label)

(array(['earthquake', 'noise'], dtype='<U10'),
 array([1030231,  235426], dtype=int64))

## Optimizing speed

In [10]:
# Get a StandardScaler for the training split: fit_scaler loads a previously saved
# scaler when one exists on disk and otherwise fits one trace by trace.
scaler = BigStandardScalerFitter(train, data_loader.handler).fit_scaler()



In [11]:
# Batch generator over the training rows: batches of 64 with the scaler applied.
data_gen = BigDataGenerator(data_loader)
train_gen = data_gen.data_generator(train, 64, use_scaler = True, scaler = scaler)

In [12]:
def time_n_iterations(gen, n_iter):
    """Pull ``n_iter`` items from ``gen`` and print the elapsed wall-clock time.

    ``gen`` is any iterator; the items themselves are discarded. Nothing is
    returned. StopIteration propagates if ``gen`` runs out early.
    """
    # perf_counter is monotonic and has the highest available resolution,
    # which makes it the right clock for measuring short durations.
    start = time.perf_counter()
    for _ in range(n_iter):
        next(gen)
    end = time.perf_counter()
    print(f"Exectuted {n_iter} iterations in {end - start} seconds.")

In [13]:
# Time 10 batches from the generator that applies the scaler.
time_n_iterations(train_gen, 10)

Exectuted 10 iterations in 24.51870083808899 seconds.


In [14]:
# Same generator with scaling switched off; preprocessing only touches the scaler
# when use_scaler is True, so the scaler argument is unused here.
train_gen_no_scaler = data_gen.data_generator(train, 64, use_scaler = False, scaler = scaler)

In [15]:
# Time 10 batches without scaling, for comparison with the scaled run.
time_n_iterations(train_gen_no_scaler, 10)

Exectuted 10 iterations in 20.894123792648315 seconds.


In [16]:
def n_simple_traces(names, n, handler = None, trace_shape = (6000, 3)):
    """Read the first ``n`` traces of ``names`` one by one and print the elapsed time.

    PARAMETERS:
    ------------------------------
    names:       sequence - event names to look up; must hold at least ``n`` entries.
    n:           int      - number of traces to read.
    handler:     object   - anything with ``name_to_trace(name)``; defaults to the
                            notebook-level ``data_loader.handler``.
    trace_shape: tuple    - shape of a single trace, used to size the buffer.

    Nothing is returned; the traces are only read to measure lookup time.
    """
    if handler is None:
        # Fall back to the notebook global, as before.
        handler = data_loader.handler
    traces = np.empty((n,) + tuple(trace_shape))
    # perf_counter is the recommended clock for measuring short durations.
    start = time.perf_counter()
    for i in range(n):
        traces[i] = handler.name_to_trace(names[i])
    end = time.perf_counter()
    print(f"Exectuted {n} iterations in {end - start} seconds.")

In [17]:
# Read 640 traces directly (the same count as 10 batches of 64) for comparison
# with the generator timings.
n_simple_traces(data_loader.name_label[:,0][0:64*10], 64*10)

Exectuted 640 iterations in 20.94095802307129 seconds.


In [18]:
# Inspect the event-name column of the name/label array.
data_loader.name_label[:,0]

array(['B046.PB_201204121520_NO', 'GDXB.NC_201105111239_NO',
       'YFT.SN_20140216034642_EV', ..., 'SPNN.AV_20180221012506_EV',
       'ARSB.KR_20180115121130_NO', 'OMMB.NN_20140608093441_EV'],
      dtype=object)

In [19]:
# Notebook-level alias for the HDF5 'data' group opened by the loader.
data_file = data_loader.data_file

In [20]:
# Count the members of the 'data' group directly; building a list of every
# key just to take its length is unnecessary.
num_events = len(data_file)

In [21]:
# Shape the whole dataset would have as a single dense array:
# (events, timesteps, channels), using the per-trace shape (6000, 3) seen above.
data_shape = (num_events, 6000, 3)

In [22]:
# Display the computed shape.
data_shape

(1265657, 6000, 3)

In [23]:
# Path of the HDF5 file, as resolved by the loader.
filename = data_loader.filename

In [24]:
# Display the resolved path.
filename

'F:\\Thesis_ssd\\LargeDataset\\merge.hdf5'

In [25]:
# FIXME: incomplete statement. The object before `.get` and the key are missing,
# so this cell raises SyntaxError. Complete it or delete the cell.
loaded = .get('')

SyntaxError: invalid syntax (<ipython-input-25-9412b3c58d8d>, line 1)

In [None]:
# NOTE(review): `loaded` is only assigned by the cell directly before this one,
# which does not parse; on a fresh kernel this raises NameError.
print(loaded)

In [None]:
# Re-open the HDF5 file with rdcc_nbytes = 10**11 and rebind the notebook-level
# `data_file` to its "data" group.
# NOTE(review): the File object itself is not kept, so this handle cannot be closed
# explicitly, and the handle opened by the loader stays open as well.
# NOTE(review): 10**11 bytes is a very large chunk-cache setting; confirm it is intended.
data_file = h5py.File(filename, 'r',rdcc_nbytes = 10**11)["data"]
def name_to_trace(name):
    """Return the full contents of the entry ``name`` in the notebook-level ``data_file``."""
    return data_file[name][:]

In [None]:
# Time individual trace lookups through the notebook-level name_to_trace.
n_lookups = 1000
# perf_counter is the recommended clock for measuring short durations.
start = time.perf_counter()
for i in range(n_lookups):
    # The result is discarded. It is deliberately not bound to `dump`: that name
    # is joblib's dump, imported earlier and used by the scaler fitter's save_fit.
    _trace = name_to_trace(data_loader.event_names[i])
end = time.perf_counter()
total_time = end - start
print(f"Exectuted {n_lookups} iterations in {total_time} seconds.")
print(f"Each lookup took {total_time/n_lookups} seconds")