In [223]:
import numpy as np
import scipy
from scipy.io import wavfile
from scipy import signal
from pydub import AudioSegment
import tensorflow as tf
from tensorflow import keras

In [178]:
path = 'sample_audio/'
filename = 'divinecomedy_longfellow_01.wav'
filename_it = 'divinacommedia_01.wav'

In [183]:
''' Convert wav file to TFRecord of short clips
INPUT
    file_in: location of wav file
    file_out: destination for TFRecord output
    lang: language identifier, must be int
        currently using 0=en, 1=it
    seconds: length of each clip
OUTPUT
    None, writes file to destination
'''
def wav2TFRecord(file_in, file_out, lang, seconds=3):
    
    # read data and get sampling rate
    fs, read = wavfile.read(file_in)
    data = read.copy()
    
    # collapse channels into one (in case using stereo)
    data = np.mean(data, axis=1)
    
    # define time interval in samp. rate units
    time_break = fs * seconds
    
    # pad data to break evenly when splitting
    pad = time_break - (len(data) % time_break)
    data = np.pad(data, (0,pad), mode='constant', constant_values=0)
    
    # reshape into (n_samples, data_per_samle)
    # each row is a sample
    data = data.reshape((len(data)//time_break, time_break))
    
    # write data to TFRecord with compression
    options = tf.io.TFRecordOptions(compression_type='GZIP')
    with tf.io.TFRecordWriter(file_out, options=options) as writer:
        
        # for each clip
        ## create spectrogram
        ## convert to feature formatted TF Example
        ## write to file
        for i,row in enumerate(data):
            freq, time, sxx = signal.spectrogram(row, fs)
            example = tf.train.Example(
                features=tf.train.Features(feature={
                    'frequency':tf.train.Feature(
                        float_list=tf.train.FloatList(value=freq)
                    ),
                    'time':tf.train.Feature(
                        float_list=tf.train.FloatList(value=time)
                    ),
                    'sxx':tf.train.Feature(
                        float_list=tf.train.FloatList(value=sxx.flatten())
                    ),
                    'language':tf.train.Feature(
                        int64_list=tf.train.Int64List(value=[lang])
                    )
                })
            )
            writer.write(example.SerializeToString())

In [160]:
# sample for function usage
#wav2TFRecord(path+filename, 'sample_audio/example_zip.tfrecord', 0)

In [184]:
#wav2TFRecord(path+filename_it, 'sample_audio/example_it_zip.tfrecord', 1)

In [214]:
# filenames of all records
filenames = ['sample_audio/example_zip.tfrecord',
            'sample_audio/example_it_zip.tfrecord']

# feature dict used to parse records
feature_description = {
    'frequency': tf.io.FixedLenFeature([129], tf.float32),
    'time': tf.io.FixedLenFeature([295], tf.float32),
    'sxx': tf.io.FixedLenFeature([38055], tf.float32),
    'language': tf.io.FixedLenFeature([1], tf.int64)
}

# function to parse record
## takes in encoded TFRecord entry
## returns  tuple: (sxx reshaped into 2-d array, language label)
def parse_data(record):
    parsed = tf.io.parse_single_example(record, feature_description)
    sxx = parsed['sxx']
    sxx = tf.reshape(
        sxx, 
        (len(parsed['frequency']), len(parsed['time']))
    )
    lang = parsed['language']
    return (sxx, lang)

# use function to parse records into dataset
dataset = tf.data.TFRecordDataset(
    filenames,
    compression_type='GZIP'
).map(
    parse_data
).shuffle(
    buffer_size=1000,
    reshuffle_each_iteration=False
)

In [216]:
# get total number or samples for splitting
n_samples = 0
for item in dataset:
    n_samples += 1
n_samples

1566

In [218]:
# define splits
train_split = int(0.7*n_samples)
val_split = int(0.15*n_samples)
test_split = int(0.15*n_samples)

# batch size
batch_size = 16

# split into train, val, test
train_ds = dataset.take(train_split).batch(batch_size)
test_ds = dataset.skip(train_split)
val_ds = test_ds.take(val_split).batch(batch_size)
test_ds = test_ds.skip(val_split).batch(batch_size)

In [222]:
# verify shape
for i,j in train_ds.take(2):
    print(i.shape)

(16, 129, 295)
(16, 129, 295)


In [225]:
keras.backend.clear_session()

inputs = keras.Input(shape=[129,295,1])
x = keras.layers.Conv2D(32, 16, padding='same')(inputs)
x = keras.layers.ReLU()(x)
x = keras.layers.MaxPooling2D((2,2), padding='same')(x)
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(2, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=x)

model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 129, 295, 1)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 129, 295, 32)      8224      
_________________________________________________________________
re_lu (ReLU)                 (None, 129, 295, 32)      0         
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 65, 148, 32)       0         
_________________________________________________________________
flatten (Flatten)            (None, 307840)            0         
_________________________________________________________________
dense (Dense)                (None, 2)                 615682    
Total params: 623,906
Trainable params: 623,906
Non-trainable params: 0
_______________________________________________________

In [227]:
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.fit(train_ds, epochs=3, validation_data=val_ds, steps_per_epoch=20)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x1a3450ed070>

In [228]:
model.evaluate(test_ds)



[30.87372589111328, 0.9279661178588867]

In [233]:
for i,j in test_ds.take(1):
    pred = model.predict(i)
    true_label = j

In [234]:
pred

array([[0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [1., 0.],
       [0., 1.],
       [1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.],
       [0., 1.]], dtype=float32)

In [235]:
true_label

<tf.Tensor: shape=(16, 1), dtype=int64, numpy=
array([[1],
       [1],
       [0],
       [1],
       [0],
       [0],
       [1],
       [0],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [0],
       [0]], dtype=int64)>