In [None]:
# Mount Google Drive inside the Colab VM at /content/drive. The database and
# JSON paths configured further down all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install librosa==0.8.0
import numpy as np
import json as json
import librosa
import os, os.path

In [None]:
# Run configuration. Every value below is entered as a string through a Colab
# form field (#@param); the numeric ones are converted to int at the bottom.
#   destination_db    - directory scanned for .wav files; sorted files are moved
#                       into subdirectories of it. It must end with "/" because
#                       paths are built by plain string concatenation.
#   dest_datasize     - expected number of files; only used when printing progress.
#   slice_size        - NOTE(review): not referenced by the other visible cells.
#   hop_length        - hop length handed to the librosa spectral / RMS features.
#   sr                - sample rate used when loading audio.
#   output_filename   - JSON file the analysis is saved to and resumed from.
#   analysis_filename, trial_filename
#                     - NOTE(review): not referenced by the other visible cells.
destination_db = "/content/drive/My Drive/IRCMS_GAN_collaborative_database/Experiments/colab-violingan/archon-analysis/0-5s/" #@param {type:"string"}
dest_datasize = "186704" #@param {type:"string"}
slice_size = "2" #@param {type:"string"}
hop_length = "2048" #@param {type:"string"}
sr = "44100" #@param {type:"string"}
output_filename = "/content/drive/My Drive/analysis_500ms.json" #@param {type:"string"}
analysis_filename = "/content/drive/My Drive/analysis_f.json" #@param {type:"string"}
trial_filename = "/content/drive/My Drive/trial.json" #@param {type:"string"}

# Integer versions of the form values. `hop_length` itself stays a string (the
# int lives in `hop`); `sr`, `slice_size` and `dest_datasize` are overwritten
# in place with their int value.
hop = int(hop_length)
sr = int(sr)
slice_size = int(slice_size)
dest_datasize = int(dest_datasize)


In [None]:
# OPTIONAL: recount the regular files in destination_db instead of relying on
# the configured dest_datasize. Uncomment the statement below to use it.
# dest_datasize = len(
#  [name for name in os.listdir(destination_db) if os.path.isfile(
#  os.path.join(destination_db, name))])
# NOTE: depending on the size of the folder, the listing can error out several
# times. Colab will cache the results, so retry until it succeeds and write the
# number down for future attempts.
print(dest_datasize)

186704


In [None]:
## STORE ANALYSIS AS JSON

def export_to_json (data, savefile = output_filename):
  """Write `data` to `savefile` as a single indented JSON document.

  Any previous content of `savefile` is replaced. (Appending a second JSON
  document to an existing file would make the file unparseable by json.load.)

  The document is first written to a sibling "<savefile>.tmp" file and then
  swapped into place with os.replace, so an interruption while writing leaves
  the previous version of `savefile` untouched instead of a truncated file.

  Args:
    data: any value json.dump can serialize.
    savefile: destination path (str or path-like).

  Raises:
    TypeError: if `data` is not JSON-serializable (raised by json.dump).
    OSError: if the file cannot be written or replaced.
  """
  target = os.fspath(savefile)
  scratch = target + ".tmp"

  with open(scratch, 'w') as outfile:
    json.dump(data, outfile, indent=2)

  # Only replace the real file once the full document is safely on disk.
  os.replace(scratch, target)


## IMPORT JSON FILE

def json_load (filename):
  """Read `filename` and return the decoded JSON value.

  The file is opened in a `with` block so the handle is closed as soon as
  parsing finishes, including when json.load raises.

  Args:
    filename: path of the JSON file to read.

  Returns:
    The decoded JSON value (dict, list, str, number, bool or None).

  Raises:
    OSError: if the file cannot be opened.
    json.JSONDecodeError: if the content is not valid JSON.
  """
  with open(filename) as infile:
    return json.load(infile)

In [None]:
## GRAB DESCRIPTORS FROM AUDIODB

def grab_descriptors(filename, sr = sr):
  """Load one audio file and summarise it as a dict of string descriptors.

  Each spectral / energy feature is computed per frame (using the notebook-level
  `hop` as hop length) and reduced to its median over all frames.

  Args:
    filename: path (or path-like) of the audio file, passed to librosa.load.
    sr: sample rate to load the audio at; defaults to the configured `sr`.

  Returns:
    dict with the keys
      "cent"    - median spectral centroid, as a string
      "flat"    - median spectral flatness, as a string
      "rolloff" - median spectral rolloff, as a string
      "rms"     - median RMS, as a string
      "pitch"   - note name of the median voiced f0 (with an ASCII "#"),
                  or "unpitched" when the median of the voiced flags is not 1.
  """
  y, sr = librosa.load(filename, sr = sr)

  def _median_of(frames):
    # Collapse a per-frame feature array to one representative value.
    return np.median(np.ndarray.flatten(frames))

  cent = _median_of(librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=hop))
  flat = _median_of(librosa.feature.spectral_flatness(y=y, hop_length=hop))
  rolloff = _median_of(librosa.feature.spectral_rolloff(y=y, sr=sr, hop_length=hop))
  rms = _median_of(librosa.feature.rms(y=y, hop_length=hop))

  # pyin assumes sr=22050 unless told otherwise. The signal was loaded at `sr`,
  # so the real rate must be passed or every f0 estimate is scaled by 22050/sr
  # (an octave too low at 44100 Hz).
  f0, voiced_flag, voiced_probs = librosa.pyin(y,
                                fmin=librosa.note_to_hz('C2'),
                                fmax=librosa.note_to_hz('C7'),
                                sr=sr)

  # Median of the per-frame voiced flags: equals 1 only when more than half of
  # the frames are flagged as voiced (an exact tie yields 0.5 -> unpitched).
  voiced = np.median(np.ndarray.flatten(voiced_flag))

  if (voiced == True):
    # Unvoiced frames carry NaN in f0; drop them before taking the median.
    f0 = f0[~np.isnan(f0)]
    pitch = np.median(np.ndarray.flatten(f0))
    pitch = str(librosa.hz_to_note(pitch))
    # Use an ASCII sharp so the name is safe as a directory name / JSON value.
    pitch = pitch.replace("♯", "#")

  else:
    pitch = "unpitched"

  dict_ = {
        "cent": str(cent),
        "flat": str(flat),
        "rolloff": str(rolloff),
        "rms": str(rms),
        "pitch": pitch
        }

  return dict_


def analyze_db(db = destination_db, samplerate = 44100, outfile = output_filename):
  """Analyze every .wav file directly inside `db` and save the results as JSON.

  If `outfile` already exists it is loaded first and files whose name is
  already a key in it are skipped, so an interrupted run can be resumed.
  Progress is saved to `outfile` every 5000 directory entries and once more at
  the end.

  Args:
    db: directory to scan (non-recursive, via os.scandir).
    samplerate: accepted for backward compatibility. NOTE(review): it is not
      forwarded; grab_descriptors is called with its own default sample rate.
    outfile: JSON file used both to resume from and to save to.

  Returns:
    dict mapping file name -> descriptor dict (as produced by grab_descriptors),
    including any entries loaded from a previous run.
  """
  dict_ = {}
  skiplen = 0  # number of entries recovered from a previous run
  print ("...looking for previous progress... WARN: this can take VERY LONG depending on directory size & layout!!")

  if (os.path.exists(outfile)):
    print("...loading previous progress...")
    dict_ = json_load(outfile)
    skiplen = len(dict_.keys())
    print("OK: done, found " + str(skiplen) + " entries.")
  else:
    print ("WARN: no previous progress found.")

  def _save_progress():
    # export_to_json may append to an existing file, so clear it first to keep
    # the file a single valid JSON document.
    if (os.path.exists(outfile)): os.remove(outfile)
    export_to_json(dict_, outfile)

  counter = 0      # every directory entry seen, .wav or not
  skipcounter = 0  # .wav files skipped because they were already analyzed

  for entry in os.scandir(db):
    if (entry.path.endswith(".wav")):
      if (str(entry.name) in dict_): skipcounter += 1
      else:
        try:
          dict_[str(entry.name)] = grab_descriptors(entry)
        # Best effort: one unreadable file must not abort the whole run, but
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        except Exception as err:
          print ("ERR: file " + str(entry.name) + " could not be processed. (" + repr(err) + ")")

    counter += 1
    if ((counter % 5000) == 0):

      print("OK: " + str(counter) + " completed out of " + str(dest_datasize) + ".")
      if (0 < skipcounter < skiplen): print("OK: skipped " + str(skipcounter) + " files that have already been processed.")

      if (skipcounter != counter):
        print("...saving... ")
        _save_progress()
        print("OK: saved!")
      else: print("OK: still processing skipped files, no need to save.")

  _save_progress()
  print("OK: successfully completed!")
  return dict_


In [None]:
## TO ANALYZE DB AND EXPORT TO JSON
# Scans destination_db and analyzes every .wav file in it. analyze_db writes
# the results to the JSON file itself, so the file on disk is the output that
# the later cells read back.
complete_analysis = analyze_db(destination_db)

In [None]:
## FIRST PASS MAY TAKE A BIT AND/OR ERROR OUT IF DIR IS HUGE - JUST BE PATIENT AND RETRY, IT WILL GO THROUGH EVENTUALLY

def _pitch_subdir (sample):
  """Return the subdirectory (relative to the database root, ending in "/") for one analysis entry."""
  pitch = sample.get("pitch")

  if (pitch != "unpitched"):
    # Note names look like "C#4": the last character is the octave digit.
    octave = int(pitch[-1])
    note = pitch.replace(str(octave), "")
    return note + "/" + str(octave) + "/"

  # Unpitched material is bucketed by three descriptor thresholds instead.
  subdir = pitch + "/"
  subdir += "high_cent/" if float(sample.get("cent")) > 4000.0 else "low_cent/"
  subdir += "high_flat/" if float(sample.get("flat")) > 0.01 else "low_flat/"
  # "low_rollof/" (sic) is kept exactly as is: "unsort" recomputes this path, so
  # correcting the spelling would strand files that were sorted earlier.
  subdir += "high_rolloff/" if float(sample.get("rolloff")) > 8000 else "low_rollof/"
  return subdir


def sort_by_pitch (unsort_or_sort, dict_):
  """Move analyzed files into (or back out of) pitch-based subdirectories.

  Pitched files go to "<destination_db><note>/<octave>/"; unpitched files go to
  "<destination_db>unpitched/<cent bucket>/<flat bucket>/<rolloff bucket>/".

  Args:
    unsort_or_sort: "sort" moves files from destination_db into their
      subdirectory; any other value moves them back to destination_db.
    dict_: mapping of file name -> descriptor dict with string values under
      "pitch", "cent", "flat" and "rolloff" (the layout written by the analysis).

  Files that are already at their target are reported and left alone. Files
  that exist in neither location are reported and skipped instead of aborting
  the whole run.
  """
  counter = 0
  print ("...loading directory... WARN: this can take VERY LONG depending on directory size & layout!!")

  for filename, sample in dict_.items():
    counter += 1

    pitch_dir = destination_db + _pitch_subdir(sample)
    unsorted_path = destination_db + filename
    sorted_path = pitch_dir + filename

    if (unsort_or_sort == "sort"):
      if os.path.exists(sorted_path):
        print ("OK: File " + sorted_path + " has already been moved.")
      elif os.path.exists(unsorted_path):
        os.makedirs(pitch_dir, exist_ok=True)
        os.replace(unsorted_path, sorted_path)
      else:
        # Stale entry (file deleted or moved by hand): keep going.
        print ("WARN: File " + unsorted_path + " not found, skipping.")

    else:
      if os.path.exists(sorted_path):
        os.rename(sorted_path, unsorted_path)
        print(unsorted_path)

    if (counter == 1): print("OK: beginning to sort.")
    if ((counter % 5000) == 0): print("OK: processed " + str(counter) + " files.")

  print("OK: successfully completed! WARN: you need to flush and unmount the VM for changes to fully take effect!")

In [None]:
## TO MOVE FILES BY PITCH INTO SUBDIRECTORIES
# Reload the saved analysis from output_filename and move each file into its
# subdirectory, then flush and unmount Drive (sort_by_pitch's closing message
# states this is required for the changes to fully take effect).
sort_by_pitch ("sort", json_load(output_filename))
print("OK: sorted")
drive.flush_and_unmount()
print("OK: successfully unmounted VM!")