[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1fjiCLEnB9GlJrjJZFf-iQ53SDSw2o6W8)


This notebook reads an S3 inventory parquet file generated by AWS and builds an inventory of the `m3u8` playlist files, which is then used to create a catalogue parquet file of all `.ts` files in the S3 bucket.

In [23]:
pip install s3fs



In [27]:
import dask.dataframe as dd

In [28]:
import pandas as pd

In [29]:
import fsspec

In [30]:
from dask import delayed

In [31]:
from pathlib import Path, PurePath

In [39]:
# reading AWS inventory
inventory = dd.read_parquet('s3://orcasound-inventory/streaming-orcasound-net/orcasound-streaming-inventory/data', storage_options={'anon':True})

In [40]:
# inventory.describe().compute()

In [None]:
%%time
# selecting only files which contain m3u8: these are the playlist files containing the filenames and duration
inventory_m3u8 = inventory[inventory.key.str.contains("m3u8")].compute()

In [45]:
inventory_m3u8.shape

(68786, 5)

In [43]:
inventory_m3u8.head()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,bucket,key,size,last_modified_date
0,0,1166,streaming-orcasound-net,rpi_bush_point/hls/1626481819/live.m3u8,65243.0,2021-07-17 06:30:07+00:00
1,1,3328,streaming-orcasound-net,rpi_bush_point/hls/1626503420/live.m3u8,65148.0,2021-07-17 12:30:10+00:00
2,2,5489,streaming-orcasound-net,rpi_bush_point/hls/1626525021/live.m3u8,65248.0,2021-07-17 18:30:13+00:00
3,3,7650,streaming-orcasound-net,rpi_bush_point/hls/1626546622/live.m3u8,65254.0,2021-07-18 00:30:06+00:00
4,4,9811,streaming-orcasound-net,rpi_bush_point/hls/1626568220/live.m3u8,65221.0,2021-07-18 06:30:04+00:00


In [35]:
# store them to local file (TODO: also store them to S3)
inventory_m3u8.to_csv("inventory_m3u8.csv")

### Reading `m3u8` inventory

In [44]:
# upload file before running
inventory_m3u8 = pd.read_csv("inventory_m3u8.csv")

In [46]:
inventory_m3u8.head()

Unnamed: 0.1,Unnamed: 0,bucket,key,size,last_modified_date
0,1166,streaming-orcasound-net,rpi_bush_point/hls/1626481819/live.m3u8,65243,2021-07-17 06:30:07+00:00
1,3328,streaming-orcasound-net,rpi_bush_point/hls/1626503420/live.m3u8,65148,2021-07-17 12:30:10+00:00
2,5489,streaming-orcasound-net,rpi_bush_point/hls/1626525021/live.m3u8,65248,2021-07-17 18:30:13+00:00
3,7650,streaming-orcasound-net,rpi_bush_point/hls/1626546622/live.m3u8,65254,2021-07-18 00:30:06+00:00
4,9811,streaming-orcasound-net,rpi_bush_point/hls/1626568220/live.m3u8,65221,2021-07-18 06:30:04+00:00


In [47]:
fs = fsspec.filesystem('s3', anon=True)

Each `m3u8` file lists every `ts` file in its folder together with that segment's duration, usually close to 10 s but with some variation. The idea is to extract this information into a dataframe and merge all those dataframes into one big parquet file.
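As a minimal sketch of that extraction, the snippet below parses a small made-up playlist using only the standard library. The sample text and the helper name `parse_playlist` are illustrative assumptions, not taken from the Orcasound bucket. It pairs each `#EXTINF` duration with the segment name on the line that follows it.

```python
# Hypothetical sample playlist; real files in the bucket are much longer.
SAMPLE_M3U8 = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:EVENT
#EXTINF:10.010044,
live000.ts
#EXTINF:10.005356,
live001.ts
#EXTINF:9.984022,
live002.ts
"""


def parse_playlist(text):
    """Return a list of {'filename', 'duration'} dicts from m3u8 text."""
    rows = []
    pending_duration = None
    for line in text.splitlines():
        line = line.strip()
        if line.startswith("#EXTINF:"):
            # "#EXTINF:10.010044," -> 10.010044
            pending_duration = float(line.split(":", 1)[1].split(",", 1)[0])
        elif line.endswith(".ts") and pending_duration is not None:
            # a segment line is only kept if a duration tag preceded it
            rows.append({"filename": line, "duration": pending_duration})
            pending_duration = None
    return rows


rows = parse_playlist(SAMPLE_M3U8)
for row in rows:
    print(row["filename"], row["duration"])
```

A list of dicts like `rows` can be passed directly to `pd.DataFrame(rows)`. Pairing each duration with the line after it keeps the two columns aligned even if a tag or a segment line is missing.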

#### Process the first `m3u8` file

In [48]:
bucket = Path(inventory_m3u8.iloc[0]["bucket"])
filepath = inventory_m3u8.iloc[0]["key"]
# extract the folder timestamp from the key (TODO: avoid splitting on a hard-coded "/")
basetime = int(filepath.split("/")[-2])
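
The keys shown in the inventory above follow the layout `<hydrophone>/hls/<timestamp>/live.m3u8`. A small sketch of pulling the pieces apart with `PurePosixPath` instead of splitting on a literal `/`. The helper name `split_key` is hypothetical, and reading the folder name as Unix epoch seconds is an assumption consistent with how `basetime` is used later in this notebook.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def split_key(key):
    """Split an inventory key into (hydrophone, basetime)."""
    parts = PurePosixPath(key).parts
    hydrophone = parts[0]
    basetime = int(parts[-2])  # folder name, assumed to be epoch seconds
    return hydrophone, basetime


hydrophone, basetime = split_key("rpi_bush_point/hls/1626481819/live.m3u8")
print(hydrophone, basetime)
# Interpret the folder name as a UTC timestamp
print(datetime.fromtimestamp(basetime, tz=timezone.utc).isoformat())
```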

In [49]:
# reading the file
with fs.open(bucket / filepath, 'r') as f:
    txt = f.readlines()

In [50]:
# extract durations and filenames from the text, skipping the first 5 header lines
# (there is probably a way to convert it directly to a dataframe)
durations = [float(item.split(":")[1].split(",")[0]) for item in txt[5:] if "#EXTINF" in item]
filenames = [item.strip('\n') for item in txt[5:] if ".ts" in item]

In [51]:
# create the dataframe
df = pd.DataFrame({'filename': filenames, 'duration' : durations})

In [52]:
# create start and end time columns from durations
df["end_time"] = df["duration"].cumsum() + basetime
df["start_time"] = df["end_time"] - df["duration"] 
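
To make the arithmetic concrete: each segment's end time is the folder's base time plus the running total of durations, and its start time is that end time minus its own duration. A standard-library sketch with three made-up durations (the base time is the one from the first inventory row):

```python
from itertools import accumulate

basetime = 1626481819            # folder timestamp from the key
durations = [10.0, 10.5, 9.5]    # made-up values for illustration

# Running total of durations, shifted by the base time
end_times = [basetime + total for total in accumulate(durations)]
# Each segment starts where the previous one ended
start_times = [end - d for end, d in zip(end_times, durations)]

for start, end in zip(start_times, end_times):
    print(start, end)
```

The first start time equals `basetime`, and every later start time equals the previous end time, so the segments tile the recording without gaps under this model.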

In [53]:
# name of hydrophone
df["hydrophone"] = filepath.split("/")[0]

In [54]:
# filename does not have the full path because it comes from the m3u8 file, not from the inventory
df["fullpath"] = [str(bucket / PurePath(filepath).parent / item) for item in df["filename"]]

In [55]:
# setting the basetime of the folder
df["basetime"] = basetime

In [56]:
# function to process one m3u8 file: reads it and generates a dataframe with start and end columns
def process_m3u8(fs, bucket, m3u8_key):
  """Create a dataframe with filenames and start and end times from an m3u8 file.

  Inputs
  ------
  fs: fsspec filesystem
  bucket: bucket name (as a Path)
  m3u8_key: key (path within the bucket) of the m3u8 file

  Returns
  -------
  df: dataframe with columns: filename, duration, end_time, start_time,
      hydrophone, fullpath
  """

  # extract the folder timestamp from the key (TODO: avoid splitting on a hard-coded "/")
  basetime = int(m3u8_key.split("/")[-2])
  with fs.open(bucket / m3u8_key, 'r') as f:
    txt = f.readlines()

  # durations come from the #EXTINF tags, filenames from the .ts lines; the first 5 header lines are skipped
  durations = [float(item.split(":")[1].split(",")[0]) for item in txt[5:] if "#EXTINF" in item]
  filenames = [item.strip('\n') for item in txt[5:] if ".ts" in item]
  df = pd.DataFrame({'filename': filenames, 'duration': durations})

  # start and end times
  df["end_time"] = df["duration"].cumsum() + basetime
  df["start_time"] = df["end_time"] - df["duration"]

  # name of hydrophone
  df["hydrophone"] = m3u8_key.split("/")[0]

  # filename does not have the full path because it comes from the m3u8 file, not from the inventory
  df["fullpath"] = [str(bucket / PurePath(m3u8_key).parent / item) for item in df["filename"]]

  return df

In [None]:
# process first file
df = process_m3u8(fs, bucket, inventory_m3u8.iloc[0]["key"])

In [None]:
# reading credentials
keys = pd.read_csv("OrcaSoundKeys.csv")

In [None]:
# uploading to S3
ddf = dd.from_pandas(df, npartitions=1)
ddf.to_parquet("s3://orcasound-inventory/streaming-orcasound-net/orcasound-streaming-inventory/catalogue.parquet", 
      storage_options={"key": keys[" Access key ID"][0].strip(" "), "secret": keys["Secret access key"][0].strip(" ")})

         filename   duration      end_time    start_time      hydrophone  \
0      live000.ts  10.010044  1.626482e+09  1.626482e+09  rpi_bush_point   
1      live001.ts  10.005356  1.626482e+09  1.626482e+09  rpi_bush_point   
2      live002.ts  10.005333  1.626482e+09  1.626482e+09  rpi_bush_point   
3      live003.ts   9.984022  1.626482e+09  1.626482e+09  rpi_bush_point   
4      live004.ts  10.005344  1.626482e+09  1.626482e+09  rpi_bush_point   
...           ...        ...           ...           ...             ...   
2147  live2147.ts  10.005389  1.626503e+09  1.626503e+09  rpi_bush_point   
2148  live2148.ts  10.005378  1.626503e+09  1.626503e+09  rpi_bush_point   
2149  live2149.ts  10.005400  1.626503e+09  1.626503e+09  rpi_bush_point   
2150  live2150.ts   9.984056  1.626503e+09  1.626503e+09  rpi_bush_point   
2151  live2151.ts   9.998000  1.626503e+09  1.626503e+09  rpi_bush_point   

                                               fullpath  
0     streaming-orcasound-net

(None,)

### Processing All Files

The original idea was to process all files with the `dask` library: wrap `process_m3u8` in `delayed`, combine the results into a dask dataframe, and write that to a parquet file. Catching errors that way turned out to be tricky, so it was easier to generate the parquet file with a simple `for` loop.
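
One way to keep per-file failures from being lost, whether in a plain loop or in a task graph, is to wrap the worker so that it returns the error instead of raising it. The sketch below illustrates that pattern and is not the approach this notebook uses. `safe_call` and `fake_process` are hypothetical names, and `fake_process` stands in for `process_m3u8` so the example runs without S3 access.

```python
def safe_call(func, key):
    """Run func(key); return (key, result, None) or (key, None, error)."""
    try:
        return key, func(key), None
    except Exception as exc:  # broad on purpose: every failure should be recorded
        return key, None, exc


def fake_process(key):
    """Stand-in for process_m3u8: fails when the folder name is not an integer."""
    return int(key.split("/")[-2])


m3u8_keys = [
    "rpi_bush_point/hls/1626481819/live.m3u8",
    "rpi_bush_point/hls/not_a_timestamp/live.m3u8",  # made-up bad key
]

outcomes = [safe_call(fake_process, key) for key in m3u8_keys]
results = [res for _, res, err in outcomes if err is None]
failures = [(key, err) for key, _, err in outcomes if err is not None]

print(results)
for key, err in failures:
    print(key, type(err).__name__)
```

Because `safe_call` never raises, it could in principle also be wrapped with `dask.delayed` and the outcomes filtered after `dask.compute`; that variant is untested here.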

In [None]:
# process the remaining files with a for loop (the first file was already written above)

for filename in inventory_m3u8["key"][1:]:
  try:
    df = process_m3u8(fs, bucket, filename)
    ddf = dd.from_pandas(df, npartitions=1)
    ddf.to_parquet("s3://orcasound-inventory/streaming-orcasound-net/orcasound-streaming-inventory/catalogue.parquet", 
      storage_options={"key": keys[" Access key ID"][0].strip(" "), "secret": keys["Secret access key"][0].strip(" ")},
      append=True, ignore_divisions=True)
  except Exception as e:
    # report the failing file and continue with the next one
    print(filename)
    print(e)
    

rpi_bush_point/hls/1629894621/live.m3u8
Appended dtypes differ.
{('start_time', dtype('float64')), ('fullpath', 'object'), ('filename', 'object'), ('hydrophone', 'object'), ('fullpath', dtype('float64')), ('filename', dtype('float64')), ('end_time', 'float64'), ('end_time', dtype('float64')), ('__null_dask_index__', 'int64'), ('__null_dask_index__', dtype('int64')), ('duration', 'float64'), ('hydrophone', dtype('O')), ('start_time', 'float64'), ('duration', dtype('float64'))}
rpi_bush_point/hls/1632508221/live.m3u8
Appended dtypes differ.
{('start_time', dtype('float64')), ('fullpath', 'object'), ('filename', 'object'), ('hydrophone', 'object'), ('fullpath', dtype('float64')), ('filename', dtype('float64')), ('end_time', 'float64'), ('end_time', dtype('float64')), ('__null_dask_index__', 'int64'), ('__null_dask_index__', dtype('int64')), ('duration', 'float64'), ('hydrophone', dtype('O')), ('start_time', 'float64'), ('duration', dtype('float64'))}
rpi_bush_point/hls/1632573019/live.m3u