In [1]:
%matplotlib inline

In [16]:
import pyspark
from pyspark.sql import SparkSession
import io, os
import numpy as np
import soundfile as sf
import IPython.display as ipd
import matplotlib.pyplot as plt
import librosa.display
from pyspark.sql.functions import *

In [None]:
# ! pip install librosa
# ! pip install soundfile
# ! pip install pyspark --upgrade

## librosa
[librosa](https://librosa.github.io/librosa/index.html) is a Python package for music and audio processing by [Brian McFee](https://bmcfee.github.io/). A large portion was ported from [Dan Ellis's Matlab audio processing examples](http://www.ee.columbia.edu/~dpwe/resources/matlab/).

If you receive an error with librosa.load, you may need to [install ffmpeg](https://librosa.github.io/librosa/install.html#ffmpeg).

## IPython.display.Audio
[IPython.display.Audio](https://ipython.org/ipython-doc/stable/api/generated/IPython.display.html#IPython.display.Audio) lets you play audio directly in an IPython notebook.



## Apache Spark
[Spark](https://spark.apache.org/docs/latest/) is used to pre-process the audio data in parallel.

## Test File

In [None]:
# Sanity check: confirm the sample TED-LIUM .sph file exists before loading it.
# NOTE(review): path is specific to the author's home-directory layout.
!ls -alh ~/vitaFlow/TEDLiumTestDataset/raw_data/train/sph/AaronHuey_2010X.sph

In [None]:
# Directory of training .sph files (fed to Spark below) and one file from it
# used for single-file exploration. The trailing "" keeps a trailing separator
# on the directory path.
train_sph_dir = os.path.join(os.path.expanduser("~"), "vitaFlow", "TEDLiumTestDataset",
                             "raw_data", "train", "sph", "")
test_sph_file = os.path.join(train_sph_dir, "AaronHuey_2010X.sph")

## Loading the audio from binary data

In [None]:
# Decode one .sph file from an in-memory byte buffer, the same way the Spark
# step below decodes the raw bytes it receives for each file.
with open(test_sph_file, mode="rb") as fh:
    raw_bytes = fh.read()
data, samplerate = sf.read(io.BytesIO(raw_bytes))

In [None]:
# Sample rate and shape of the decoded sample array returned by sf.read.
samplerate, data.shape

## Visualizing Audio

In [None]:
# Waveform of the full recording: amplitude against time.
# (waveplot is the pre-0.8 librosa API this notebook targets.)
plt.figure(figsize=(14, 5))
librosa.display.waveplot(data, sr=samplerate)
plt.title("Waveform")
plt.ylabel("Amplitude")
plt.show()

## Displaying the Spectrogram

In [None]:
# Log-magnitude (dB) spectrogram of the recording.
# np.abs is used explicitly: `from pyspark.sql.functions import *` rebinds the
# builtin `abs` to Spark's column function, which rejects NumPy arrays.
X = librosa.stft(data)
Xdb = librosa.amplitude_to_db(np.abs(X))
plt.figure(figsize=(14, 5))
librosa.display.specshow(Xdb, sr=samplerate, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')
plt.title("Spectrogram")
plt.show()

## Playing Audio
Using [IPython.display.Audio](https://ipython.org/ipython-doc/2/api/generated/IPython.lib.display.html#IPython.lib.display.Audio), you can play the decoded samples directly in the notebook:

In [None]:
# Render an inline audio player for the decoded samples at their native rate.
ipd.Audio(data, rate=samplerate)

## Parallel Preprocessing with Spark

In [3]:
# Local Spark session with 4 worker threads; `sc` exposes the RDD API
# (binaryFiles, parallelize) used in the cells below.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("shabda")
    .getOrCreate()
)
sc = spark.sparkContext
spark

In [None]:
# RDD with one (file path, file contents as bytes) record per file in the directory.
wav_files_n_data = sc.binaryFiles(train_sph_dir)

In [None]:
# files_bin_data = wav_files_n_data.map(lambda xy : xy).collect()

In [None]:
def to_clips(file_path, data, num_clips=128, duration=20, output_dir="/tmp/", seed=None):
    """Write ``num_clips`` randomly placed fixed-length clips cut from one audio file.

    Intended to be applied to each ``(path, bytes)`` record produced by
    ``SparkContext.binaryFiles``. The audio is decoded from ``data`` itself, so
    ``file_path`` is only used to name the output directory and is never opened.

    Args:
        file_path: Path of the source file as reported by Spark; any ``file:``
            substring is stripped. The basename up to its first ``.`` is used
            as the speaker name.
        data: Raw bytes of the audio file, in any format soundfile can decode.
        num_clips: Number of clips to write (``0.wav`` ... ``<num_clips - 1>.wav``).
        duration: Clip length in whole seconds.
        output_dir: Clips are written to ``<output_dir>/<speaker>/``, which is
            created if missing.
        seed: Optional seed for the clip-offset RNG. ``None`` (the default)
            seeds from fresh OS entropy on every call.

    Returns:
        None. Clips are written as mono 32-bit float WAV files.

    Raises:
        ValueError: if ``data`` decodes to zero samples.
    """
    file_path = file_path.replace("file:", "")

    # Decode as float32 and down-mix to mono, matching librosa.load's defaults.
    samples, sampling_rate = sf.read(io.BytesIO(data), dtype="float32")
    if samples.ndim > 1:
        samples = samples.mean(axis=1)
    if samples.size == 0:
        raise ValueError("no audio samples decoded from %s" % file_path)

    speaker = file_path.split("/")[-1].split(".")[0]
    speaker_dir = os.path.join(output_dir, speaker)
    os.makedirs(speaker_dir, exist_ok=True)

    clip_len = duration * sampling_rate
    # Latest whole-second start that still leaves room for a full clip, so no
    # clip runs past the end of the recording. If the recording is shorter
    # than ``duration`` the only start is 0 and the clip is the whole file.
    max_start = max(len(samples) // sampling_rate - duration, 0)
    rng = np.random.RandomState(seed)
    for j in range(num_clips):
        start = rng.randint(0, max_start + 1) * sampling_rate
        wav_file = os.path.join(speaker_dir, str(j)) + ".wav"
        # soundfile is used because librosa.output.write_wav was removed in
        # librosa 0.8; subtype FLOAT keeps 32-bit float samples.
        sf.write(wav_file, samples[start:start + clip_len], sampling_rate, subtype="FLOAT")

    return None

In [None]:
# files_bin_data[1][0]

In [None]:
# to_clips(files_bin_data[0][0], files_bin_data[0][1])

In [None]:
# Action: apply to_clips to every (path, bytes) record; foreach returns nothing,
# the clips are produced purely as a side effect of to_clips.
wav_files_n_data.foreach(lambda path_and_bytes: to_clips(*path_and_bytes))

In [None]:
# Small pair RDD for experimenting with zipWithIndex.
rdd = sc.parallelize([("1","2"),("1","3"),("2","4"),("4","5")])

In [None]:
# Pair every element with its index: element -> (element, index).
# NOTE(review): rebinding `rdd` to its own zipWithIndex() makes this cell
# non-idempotent; re-running it wraps each element in another (element, index)
# pair. A new name (e.g. `indexed_rdd`) would be safer.
rdd = rdd.zipWithIndex()

In [None]:
# Drop the index, then keep the first field of each original pair.
rdd.keys().map(lambda pair: pair[0]).collect()

In [32]:
# Two tiny frames with the same column names in opposite order, used to show
# how DataFrame.union matches columns.
df_1 = spark.createDataFrame([['a',1],['a', 2]], ['string_col', 'int_col'])
df_2 = spark.createDataFrame([[2,'b'], [1, 'b']], ['int_col', 'string_col'])

In [33]:
# Column order: string_col, int_col.
df_1.show()

+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
+----------+-------+



In [34]:
# Column order: int_col, string_col (reversed relative to df_1).
df_2.show()

+-------+----------+
|int_col|string_col|
+-------+----------+
|      2|         b|
|      1|         b|
+-------+----------+



In [35]:
# Pitfall demo: DataFrame.union resolves columns by position, not by name, so
# df_2's int values land under string_col and its strings under int_col.
df_3 = df_1.union(df_2)
df_3.show()

+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
|         2|      b|
|         1|      b|
+----------+-------+



In [40]:
# Fix: reorder df_2's columns to match df_1 before the positional union.
# (DataFrame.unionByName performs this alignment itself in Spark >= 2.3.)
df_3 = df_1.union(df_2.select(df_1.columns))
df_3.show()

+----------+-------+
|string_col|int_col|
+----------+-------+
|         a|      1|
|         a|      2|
|         b|      2|
|         b|      1|
+----------+-------+



In [44]:
# Toy (Group, Date) data for the "latest row per group" examples below.
# NOTE(review): reuses the name `df_1` for unrelated data, so the union cells
# above no longer work if re-run after this one; a distinct name would help.
df_1 = spark.createDataFrame([['A',2000],['A',2002], ['A',2007], ['B',1999], ['B',2015]], ['Group', 'Date'])
df_1.show()

+-----+----+
|Group|Date|
+-----+----+
|    A|2000|
|    A|2002|
|    A|2007|
|    B|1999|
|    B|2015|
+-----+----+



In [45]:
from pyspark.sql.window import *
from pyspark.sql.functions import row_number

In [53]:
# Number the rows of each Group from the newest Date (1) to the oldest.
by_date_desc = Window.partitionBy("Group").orderBy(desc("Date"))
df_final = df_1.withColumn("rownum", row_number().over(by_date_desc))
df_final.show()

+-----+----+------+
|Group|Date|rownum|
+-----+----+------+
|    B|2015|     1|
|    B|1999|     2|
|    A|2007|     1|
|    A|2002|     2|
|    A|2000|     3|
+-----+----+------+



In [52]:

# Keep only the most recent row of each Group: number rows newest-first,
# retain row 1, then discard the helper column. row_number breaks ties on
# Date arbitrarily, so equal dates keep a single, unspecified row.
newest_first = Window.partitionBy("Group").orderBy(desc("Date"))
df_final = (
    df_1
    .withColumn("rownum", row_number().over(newest_first))
    .where("rownum ==1")
    .drop("rownum")
)
df_final.show()

+-----+----+
|Group|Date|
+-----+----+
|    B|2015|
|    A|2007|
+-----+----+

