# Earthquake data to TFRecords

In [1]:
import pandas as pd
import numpy as np
from multiprocessing import Pool, Process, Queue
import os.path as osp
import tensorflow as tf
tf.enable_eager_execution()
import glob
from collections import defaultdict
import time
from tqdm import tqdm_notebook
import pickle

In [2]:
SEGMENTS_DIR = '/workspace/persistent-data/earthquake/segments'

In [3]:
SPLIT_SEGMENTS = 1
NORMALIZE = False
RANGE = [-1, 1]

In [4]:
segments = glob.glob(osp.join(SEGMENTS_DIR, '*.pickle'))

In [5]:
def timestamp():
    return time.strftime('%Y-%m-%d %H:%M:%S')

In [6]:
if NORMALIZE:
    # Get range values
    range_pickle_filename = osp.join(osp.dirname(SEGMENTS_DIR), 'range.pickle')
    ranges = {}
    for i, segment in enumerate(segments):
        df = pd.read_pickle(segment)
        for c in df.columns:
            if c not in ranges:
                ranges[c] = {}
                ranges[c]['min'] = 999999999
                ranges[c]['max'] = -999999999
            ranges[c]['min'] = min(ranges[c]['min'], df[c].min())
            ranges[c]['max'] = max(ranges[c]['max'], df[c].max())
        
        n = i+1
        if n % 10000 == 0:
            print('[%s] Examined %d segments' % (timestamp(), n))
    with open(range_pickle_filename, 'wb') as f:
        pickle.dump(ranges, f)
    print('wrote ranges to %s' % range_pickle_filename)

In [7]:
with open(osp.join(osp.dirname(SEGMENTS_DIR), 'range.pickle'), 'rb') as f:
    ranges = pickle.load(f)
ranges

{'acoustic_data': {'max': 5444, 'min': -5515},
 't_minus': {'max': -9.550396316600001e-05, 'min': -16.1074}}

In [8]:
len(segments)

153600

Train/test split

In [9]:
segments = np.random.permutation(segments)

In [10]:
N_TRAIN = int(len(segments) * 0.85)
trainsegs = segments[:N_TRAIN]
testsegs = segments[N_TRAIN:]

In [11]:
def get_tfrecord_defs(segfiles, name):
    return [{'%s-%d' % (name, i): segfiles[i]} for i in range(len(segfiles))]

In [12]:
len(trainsegs), len(testsegs)

(130560, 23040)

In [13]:
train_parts = get_tfrecord_defs(trainsegs, 'train')

In [14]:
test_parts = get_tfrecord_defs(testsegs, 'test')

In [15]:
len(train_parts), len(test_parts)

(130560, 23040)

In [16]:
test_parts[0]

{'test-0': '/workspace/persistent-data/earthquake/segments/54036.pickle'}

In [17]:
test_parts[-1]

{'test-23039': '/workspace/persistent-data/earthquake/segments/111027.pickle'}

# Convert to `TFRecords`

In [18]:
# The following functions can be used to convert a value to a type compatible
# with tf.Example.

# The following functions come from the TFRecords example [https://www.tensorflow.org/tutorials/load_data/tf-records]
# and a Float64 hack from https://github.com/tensorflow/tensorflow/issues/12876

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))

def _float64_feature(value):
    """Returns a bytes_list of encoded float64"""
    value = [str(x).encode() for x in value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

In [19]:
def serialize_example(acoustic_data, tminus):
    """
    Creates a tf.Example message ready to be written to a file.
    """
    
    # Create a dictionary mapping the feature name to the tf.Example-compatible
    # data type.
    
    feature = {
        'acousticdata': _int64_feature(acoustic_data),
        'tminus': _float64_feature(tminus)
    }
    
    # Create a Features message using tf.train.Example.
    
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()


In [20]:
def serialize_df(df):
    df['acoustic_data'] = df['acoustic_data'].astype(np.int16)
    df['t_minus'] = df['t_minus'].astype(np.float64)
    example = serialize_example(df['acoustic_data'].values, df['t_minus'].values)
    return example

In [21]:
int(np.log(-pd.read_pickle(segments[0])['t_minus'].min()))

2

In [31]:
!rm -rf /workspace/persistent-data/earthquake/tfrecords2

In [22]:
SCALES = ['1e0', '1e1', '1e2', '1e3', '1e4', '1e-9', '1e-8', '1e-7',
           '1e-6', '1e-5', '1e-4', '1e-3', '1e-2', '1e-1']

In [34]:
for i in SCALES:
    CMD='mkdir -p /workspace/persistent-data/earthquake/tfrecords2/train/%s' % i
    !{CMD}
    
    CMD='mkdir -p /workspace/persistent-data/earthquake/tfrecords2/test/%s' % i
    !{CMD}

In [23]:
def serialize_segment_package(q):
    
    while True:
        try:
            elem = q.get()
        except:
            break
        
        tfrecords_dir = osp.join(osp.dirname(SEGMENTS_DIR), 'tfrecords2')
        name = list(elem.keys())[0]
        segfile = elem[name]
        df = pd.read_pickle(segfile)
        last_label_scale = int(np.log(-df['t_minus'].min()))
        last_label_scale_str = SCALES[last_label_scale]
        traintest, segnum = name.split('-')
        filename = traintest + '-' + segnum
        tfrecords_location = osp.join(tfrecords_dir, traintest, last_label_scale_str, filename + '.tfrecords')

        with tf.python_io.TFRecordWriter(tfrecords_location) as writer:
            writer.write(serialize_df(df))

In [24]:
NUM_PROCESSES = 28
q = Queue(maxsize=len(segments))

In [25]:
for i in train_parts:
    q.put(i)
for i in test_parts:
    q.put(i)

In [26]:
processes = [Process(name='serializer-%d' % num,
                     target=serialize_segment_package,
                     args=(q,)) for num in range(NUM_PROCESSES)]

for p in processes:
    p.start()

In [27]:
while True:
    sz = q.qsize()
    print('[%s] Queue size=%d' % (timestamp(), sz))
    if sz == 0:
        break
    time.sleep(20)
for p in processes:
    p.terminate()

[2019-01-21 06:18:41] Queue size=152445
[2019-01-21 06:19:01] Queue size=126829
[2019-01-21 06:19:21] Queue size=100843
[2019-01-21 06:19:42] Queue size=74929
[2019-01-21 06:20:02] Queue size=48885
[2019-01-21 06:20:22] Queue size=22709
[2019-01-21 06:20:42] Queue size=0


In [54]:
import os
trainlens = {}
testlens = {}
for i in SCALES:
    traindir='/workspace/persistent-data/earthquake/tfrecords2/train/%s' % i
    testdir='/workspace/persistent-data/earthquake/tfrecords2/test/%s' % i
    trainlen = len(os.listdir(traindir))
    testlen = len(os.listdir(testdir))
    trainlens[i] = trainlen
    testlens[i] = testlen
for x in sorted(trainlens.keys()):
    print('%s\ttrain=%d  \ttest=%d' % (x, trainlens[x], testlens[x]))

1e-1	train=2967  	test=528
1e-2	train=1096  	test=192
1e-3	train=392  	test=80
1e-4	train=145  	test=31
1e-5	train=53  	test=11
1e-6	train=18  	test=4
1e-7	train=9  	test=1
1e-8	train=2  	test=0
1e-9	train=1  	test=1
1e0	train=29086  	test=5086
1e1	train=55648  	test=9889
1e2	train=41143  	test=7217
1e3	train=0  	test=0
1e4	train=0  	test=0


In [31]:
np.exp(2)

7.38905609893065

In [33]:
np.exp(-9)

0.00012340980408667956

# Read a few records back

In [34]:
#record_iterator = tf.python_io.tf_record_iterator('/workspace/persistent-data/earthquake/tfrecords/train/train-0.tfrecords')

In [35]:
#features

In [36]:
def deserialize(serialized_example):
    #example = tf.train.Example()
    #example.ParseFromString(string_record)
    features = {
        'acousticdata': tf.FixedLenFeature((4096), tf.int64),
        'tminus': tf.FixedLenFeature((4096), tf.string)
    }
    features = tf.parse_single_example(
        serialized_example,
        features=features
    )
    
    features['tminus'] = tf.strings.to_number(string_tensor=features['tminus'], out_type=tf.float64)
    
    return features

In [49]:
files_ds = tf.data.Dataset.from_tensor_slices(glob.glob(r'/workspace/persistent-data/earthquake/tfrecords2/train/1e-2/*.tfrecords'))

In [50]:
ds = tf.data.TFRecordDataset(files_ds)

In [51]:
ds = ds.map(deserialize, num_parallel_calls=8)

In [52]:
for f in ds.take(3):
    print(pd.Series(f['tminus'].numpy().shape, f['acousticdata'].numpy().shape))
    print(pd.Series(f['tminus'].numpy()).max())

4096    4096
dtype: int64
-0.082595512439
4096    4096
dtype: int64
-0.060695491095000005
4096    4096
dtype: int64
-0.089995510513


In [53]:
tf.__version__

'1.12.0-rc2'