<a href="https://colab.research.google.com/github/k7sung/clap2choir/blob/master/sync.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# --- Notebook configuration -------------------------------------------------
# Lines ending in "#@param" are Colab form fields; keep those trailing
# annotations intact when editing.

# Song to process; also used as the folder name under ./inputs and ./outputs.
SONG_NAME = "A Promise" #@param ["A Promise", "On Eagle's Wings"]

import pytz
from datetime import datetime, timezone
# Time zone used only to decide which calendar day names the output folder.
time_zone = 'US/Central' #@param ["US/Central", "Europe/London", "Asia/Taipei"] {allow-input:true}
tz = pytz.timezone(time_zone)
now = datetime.now(tz)
# Zero-padded month and day (no year), e.g. "0422".
TODAY_DATE = now.strftime("%m%d")

# Location of the shared folder, given relative to the Drive mount point and
# then turned into an absolute path under /content/drive/.
SHARED_FOLDER_PATH = 'Shared drives/CR ESSC Virtual Choir' #@param {type:"string"}
SHARED_FOLDER_PATH = "/content/drive/" + SHARED_FOLDER_PATH
# The paths below are relative; the mount cell changes the working directory
# to SHARED_FOLDER_PATH before they are used.
RAW_PATH = f"./inputs/{SONG_NAME}/"           # original uploads
INPUT_PATH = f"./inputs/temp/{SONG_NAME}/"    # downscaled/cached copies
OUTPUT_PATH = f"./outputs/{TODAY_DATE}/{SONG_NAME}/"  # per-day results
# Glob patterns used to pick up video and audio-only inputs from INPUT_PATH.
VID_FILE_GLOBS = ["*.mov","*.MOV", "*.mp4"]
AUD_FILE_GLOBS = ["*.m4a", "*.mp3"]

# Resolution of the final tiled video, parsed into integer width and height.
VIDEO_RES = "1920x1080" #@param ["1920x1080", "960x540", "540x360"] 
W,H = [int(t) for t in VIDEO_RES.split("x")]



### would be nice:

* Loudness normalization (currently only peak normalization is applied); ffmpeg already supports this, e.g. via its `loudnorm` filter.
* Easier mounting mechanism of Google Drive?
* Better clap detection that also uses amplitude and time since last peak
* Noise removal









In [0]:
from google.colab import drive
# Prerequisite: the shared folder must first be added to your own Drive
# ("Add to my drive" in the Google Drive UI) so it is visible after mounting.
drive.mount("/content/drive")
# Work from inside the shared folder so the relative RAW_PATH / INPUT_PATH /
# OUTPUT_PATH settings resolve against it.
%cd "$SHARED_FOLDER_PATH"
# List the cached inputs as a quick sanity check.
%ls "$INPUT_PATH"
# Create today's output folder plus the sub-folders used by the
# comparison-track and solo-track cells.
%mkdir -p "$OUTPUT_PATH"
%mkdir -p "$OUTPUT_PATH"/compare
%mkdir -p "$OUTPUT_PATH"/solo


### Resize and cache videos for faster processing later

In [0]:
import os
from os.path import getmtime
from pathlib import Path

# Work out which raw files must be (re)cached and which cached files are stale.
#
# Produces:
#   to_process - names of raw files that have no cached copy yet, or whose raw
#                file is newer than the cached copy
#   to_delete  - names of cached files that no longer exist in RAW_PATH

# Create the cache folder on first use; without it listing the folder below
# (and writing the cached copies later) fails for a song that has never been
# processed before.
Path(INPUT_PATH).mkdir(parents=True, exist_ok=True)

raw_paths = list(Path(RAW_PATH).iterdir())
cached_paths = list(Path(INPUT_PATH).iterdir())
# cached file name -> modification time of the cached copy
cached_files = {cached.name: getmtime(cached) for cached in cached_paths}

to_process = []
for raw in raw_paths:
  # Files prefixed with '_' are deliberately excluded from processing.
  if raw.name.startswith('_'):
    continue
  cached_mtime = cached_files.get(raw.name)
  # Create when there is no cached copy, update when the raw file is newer.
  if cached_mtime is None or getmtime(raw) > cached_mtime:
    to_process.append(raw.name)

# Anything cached whose raw source has disappeared should be removed.
to_delete = cached_files.keys() - {raw.name for raw in raw_paths}

print(to_process)
print(to_delete)

[]
set()


In [0]:
# Remove cached copies whose raw source file is gone.
for stale_name in to_delete:
  stale_path = INPUT_PATH+stale_name
  print("removing ", stale_name)
  os.remove(stale_path)


In [0]:
from moviepy.editor import *
import shutil

# Per-video preprocessing: shrink to a quarter of 1080p height and
# peak-normalize the audio track.
vFx = lambda v:(v.resize(height=1080/4)
  .fx(afx.audio_normalize)
  )

# Refresh the cache for every file flagged in to_process.
for filename in to_process:
  # Audio-only files need no resizing; copy them across unchanged.
  if (filename.split('.')[-1].lower() in ["mp3", "wav", "m4a"]):
    print(f"copy {filename} to temp folder...")
    shutil.copyfile(RAW_PATH+filename, INPUT_PATH+filename)
    print("done")
    continue
  v = VideoFileClip(RAW_PATH+filename)
  try:
    vo = vFx(v)
    vo.write_videofile(INPUT_PATH+os.path.basename(v.filename), fps=30, codec='libx264', ffmpeg_params=['-crf','18'])
  finally:
    # Release the readers held by the source clip before opening the next
    # file, so a long batch does not accumulate open resources.
    v.close()



In [0]:
import numpy as np
from numpy import mean, median, var, diff, correlate as corr, zeros, transpose
from moviepy.editor import *
from moviepy.audio.AudioClip import AudioArrayClip

import glob
import os
import matplotlib.pyplot as plt
from itertools import combinations as comb
from random import shuffle, randint


### function to find the clapping sounds ###

In [0]:
def get_regular_patterns(ts):
    """Return the four entries of ``ts`` whose spacing is most uniform.

    Every way of choosing four positions from ``ts`` (in their given order)
    is scored by the variance of the gaps between the chosen values; the
    selection with the smallest variance wins. Ties are resolved in favour of
    the smaller index tuple. ``min`` raises ``ValueError`` when ``ts`` has
    fewer than four entries.
    """
    def gap_variance(picks):
        # Variance of the differences between consecutive chosen values.
        return var(diff([ts[i] for i in picks]))

    scored = ([gap_variance(picks), picks] for picks in comb(range(len(ts)), 4))
    _, best_picks = min(scored)
    return [ts[i] for i in best_picks]

def get_tops(peak_ts, peak_vals, d_peak_vals, top_n):
    """Rank peaks and return the ``top_n`` best as ``(time, score)`` tuples.

    The score is a weighted sum of the peak value and its change from the
    previous peak; with the current weights (0.0 and 1.0) only the change
    counts. The result is ordered by descending score and is also printed.
    """
    level_weight, rise_weight = 0.0, 1.0
    scored = []
    for t, v, dv in zip(peak_ts, peak_vals, d_peak_vals):
        scored.append((t, level_weight*v+rise_weight*dv))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    tops = scored[:top_n]
    print("tops: ", tops)
    return tops

def estimate_start(audio, name="", search_secs=20, is_finetune=False):
    """Estimate a recording's start time from its four count-in claps.

    The first ``search_secs`` seconds of ``audio`` are read in 0.1 s chunks.
    For each chunk the largest sample of the first channel is taken as that
    chunk's peak, and peaks are ranked by how far they rise above the
    previous chunk's peak (see ``get_tops``). Four ranked peaks are then
    chosen as the claps:

    * master track (basename of ``name`` starts with "m"/"M" and
      ``is_finetune`` is false): the four latest of the top 8 peaks;
    * fine-tune pass (``is_finetune`` true): the four highest-ranked peaks;
    * otherwise: the four most evenly spaced of the top 10 peaks (see
      ``get_regular_patterns``), unless ``cheat_list`` holds a manual
      override for this file name.

    Progress is printed and two diagnostic plots are shown: peak level over
    time with the ranked peaks marked, and peak rise over time with the
    chosen claps marked.

    Args:
        audio: clip providing ``iter_chunks(chunk_duration=...)`` and ``fps``
            (assumed to yield 2-D sample arrays, samples x channels).
        name: file name of the recording; used for logging, master detection
            and ``cheat_list`` lookup.
        search_secs: how many seconds from the beginning to search.
        is_finetune: True for the second pass on an already trimmed clip.

    Returns:
        The mean clap time minus two mean clap intervals, in seconds from the
        beginning of ``audio``. For perfectly regular claps this is half an
        interval before the first clap.

    Raises:
        ValueError: if fewer than four chunks could be read, so four claps
            cannot be picked.
    """
    print("\n"+name+"\n")
    c_dur = 0.1  # length of one analysis chunk in seconds

    # Read at most search_secs worth of chunks. zip() simply stops when the
    # clip is shorter than the search range instead of letting StopIteration
    # escape from a manual next() call.
    n_chunks = int(search_secs/c_dur)
    chunks = [chunk[:,0] for _, chunk in
              zip(range(n_chunks), audio.iter_chunks(chunk_duration=c_dur))]
    if len(chunks) < 4:
        raise ValueError(
            f"{name!r}: only {len(chunks)} audio chunk(s) available, "
            "need at least 4 to detect the claps")

    # Peak value and absolute peak time of every chunk. The time is derived
    # from the running sample count, so the offset inside the chunk is kept
    # in seconds (it must not be scaled by c_dur) and no assumption is made
    # that every chunk is exactly c_dur long.
    peak_vals = []
    peak_ts = []
    samples_seen = 0
    for chunk in chunks:
        peak_idx = chunk.argmax()
        peak_vals.append(chunk[peak_idx])
        peak_ts.append((samples_seen + peak_idx)/audio.fps)
        samples_seen += len(chunk)
    # Rise of each peak over the previous one; the first chunk has no
    # predecessor and gets 0.
    d_peaks = [0]+[t1-t0 for t0, t1 in zip(peak_vals[:-1], peak_vals[1:])]

    is_master = os.path.basename(name).lower().startswith('m') and not is_finetune

    if is_master:
        tops = get_tops(peak_ts, peak_vals, d_peaks, 8)
        clap_times = sorted([t for t, v in tops[:8]])[-4:]
    elif is_finetune:
        tops = get_tops(peak_ts, peak_vals, d_peaks, 8)
        clap_times = get_regular_patterns(sorted([t for t, v in tops[:4]]))
    else:
        tops = get_tops(peak_ts, peak_vals, d_peaks, 10)
        clap_times = get_regular_patterns(sorted([t for t, v in tops]))
        # Manual overrides: file name -> indices into the time-sorted top
        # peaks that should be used as the four claps.
        cheat_list = {#"A_Lina_1.MOV":[4,5,6,7],
                      #"T_Preston.mp4":[1,2,3,4] #,
                      #"T_Enoch_1.mp4":[2,3,4,5]
                      }
        if os.path.basename(name) in cheat_list:
            sorted_t = sorted([t for t, v in tops])
            clap_times = [sorted_t[i] for i in cheat_list[os.path.basename(name)]]

    print ("detected claps at ", np.array(clap_times))

    diffs = [t1-t0 for t0, t1 in zip(clap_times[0:3], clap_times[1:4])]
    print("intervals: ", diffs)
    est_mean = mean(diffs)
    # Project every clap forward to the same instant (half an interval after
    # the fourth clap), average the four projections, then step back four
    # intervals.
    est = [clap_times[k]+est_mean*(3.5-k) for k in range(4)]

    start_est = mean(est) - est_mean*4
    print("start est:", start_est)

    tops = np.array(tops)
    plt.subplot(211)
    plt.plot(peak_ts, peak_vals,  '-' ,color = "red")
    plt.plot(tops[:,0], tops[:,1], "x")
    plt.subplot(212)
    plt.plot(peak_ts, d_peaks,'-', color = 'blue')
    plt.plot(clap_times, [0]*len(clap_times), "x")
    plt.show()
    return start_est


In [0]:
def get_vid_array_dim(n):
  """Return a (rows, cols) grid size with room for ``n`` tiles.

  ``cols`` is the smallest positive integer whose square is at least ``n``;
  ``rows`` is one less than ``cols`` when that smaller grid still holds all
  ``n`` tiles, otherwise equal to ``cols``.
  """
  side = 1
  while side*side < n:
    side += 1
  # Drop a row when (side-1) x side is already enough.
  rows = side-1 if (side-1)*side >= n else side
  return (rows, side)

# Collect the cached videos (names starting with '_' are skipped), open them
# and get a coarse start estimate for each from its audio track.
filenames = [
    path
    for pattern in VID_FILE_GLOBS
    for path in glob.glob(INPUT_PATH+pattern)
    if not os.path.basename(path).startswith('_')
]
print("Videos: ", filenames)
input_vids = [VideoFileClip(path) for path in filenames]
vid_start_times = [estimate_start(clip.audio, clip.filename) for clip in input_vids]
# Grid layout for the final tiled video.
ROWS,COLS = get_vid_array_dim(len(filenames))

# Same again for the audio-only inputs.
filenames = [
    path
    for pattern in AUD_FILE_GLOBS
    for path in glob.glob(INPUT_PATH+pattern)
    if not os.path.basename(path).startswith('_')
]
input_auds = [AudioFileClip(path) for path in filenames]
print("Audios: ", filenames)
aud_start_times = [estimate_start(clip, clip.filename) for clip in input_auds]


### Finetune start time

In [0]:

# Second pass: trim every video at its coarse estimate (never before t=0) and
# search the first 5 seconds of the trimmed clip again. The result is an
# offset relative to the trimmed clip.
input_vids_t = [
    clip.subclip(t_start=max(coarse, 0))
    for clip, coarse in zip(input_vids, vid_start_times)
]
vid_start_times_fine = [
    estimate_start(clip.audio, clip.filename, search_secs=5, is_finetune=True)
    for clip in input_vids_t
]

# Audio-only inputs are not fine-tuned yet; the equivalent would be:
# input_auds_t = [a.subclip(t_start=t) for a, t in zip(input_auds, aud_start_times)]
# aud_start_times_fine = [estimate_start(
#     a, a.filename, search_secs=5, is_finetune=True) for a in input_auds_t]


### Just syncing audio for now ###

In [0]:
def tFx(v, startTime):
  """Trim ``v`` to begin at ``startTime`` and scale its volume by 1/sqrt(number of videos)."""
  gain = 1/(len(input_vids))**0.5
  return v.subclip(startTime).volumex(gain)

# Final start = coarse estimate (clamped at 0) + fine-tune offset.
out_vids = [
    tFx(clip, max(coarse, 0)+fine)
    for clip, coarse, fine in zip(input_vids, vid_start_times, vid_start_times_fine)
]
# Audio-only inputs are not mixed in yet:
#out_auds = [tFx(a, st) for a, st, dt in zip(input_auds, aud_start_times, aud_start_times_fine)]
#cc = CompositeAudioClip([v.audio for v in out_vids]+out_auds)
aligned_tracks = [clip.audio for clip in out_vids]
cc = CompositeAudioClip(aligned_tracks)
cc.write_audiofile(OUTPUT_PATH+"output.mp3", fps=44100, codec='mp3')
#ipython_display(cc, fps=44100, maxduration=360)


[MoviePy] Writing audio in ./outputs/0422/A Promise/output.mp3


100%|██████████| 5146/5146 [01:08<00:00, 75.02it/s] 

[MoviePy] Done.





### Create solo-group stereo track for comparison and study


In [0]:
# For every singer (master tracks, whose file name starts with "m", are
# skipped) write a two-channel file with the solo voice in the first channel
# and the full group mix in the second.
#
# Both signals are sampled at the same explicit rate, which is also the rate
# declared for the combined array; sampling the solo at the clip's own rate
# would misalign it whenever that rate is not 44100 Hz.
sample_rate = 44100
group = cc.to_soundarray(fps=sample_rate)[:,0]
for v in out_vids:
  vfname = os.path.basename(v.filename)
  if vfname.lower().startswith("m"):
    continue
  a = v.audio.to_soundarray(fps=sample_rate)[:,0]
  # Cut both channels to the shorter of the two signals.
  mlen = min(len(a), len(group))
  mix = AudioArrayClip(np.transpose([a[:mlen], group[:mlen]]), fps=sample_rate)
  mix.write_audiofile(OUTPUT_PATH+"compare/"+vfname+"_output.mp3", codec='mp3')

### Sync each audio track by trimming its start; export the tracks separately without mixing ###


In [0]:
def tFx(v, startTime):
  """Trim ``v`` to begin at ``startTime`` and scale its volume by 1/sqrt(number of videos)."""
  gain = 1/(len(input_vids))**0.5
  return v.subclip(startTime).volumex(gain)

# Final start = coarse estimate (clamped at 0) + fine-tune offset.
#out_vids = [tFx(v, start_time) for v, start_time in zip(input_vids, vid_start_times)]
out_vids = [
    tFx(clip, max(coarse, 0)+fine)
    for clip, coarse, fine in zip(input_vids, vid_start_times, vid_start_times_fine)
]

#out_auds = [tFx(a, start_time) for a, start_time in zip(input_auds, aud_start_times)]

# One aligned mp3 per singer in the "solo" folder.
for vid in out_vids:
  solo_target = OUTPUT_PATH+"solo/"+os.path.basename(vid.filename)+".mp3"
  vid.audio.write_audiofile(solo_target, codec='mp3')
#for aud in out_auds:
#  aud.write_audiofile(OUTPUT_PATH+os.path.basename(aud.filename)+".mp3", codec='mp3')

### Mixing clips using clip_array ###

In [0]:
def SATB_order(v):
  """Sort key that groups clips by voice part.

  The first letter of the clip's file name (case-insensitive) selects the
  rank: s=0, a=1, t=2, b=3. Clips without a ``filename`` attribute, or whose
  name starts with any other character, get a random rank from 0 to 3.
  """
  voice_rank = {'s':0,'a':1,'t':2,'b':3}
  initial = ''
  if 'filename' in v.__dict__:
    initial = os.path.basename(v.filename)[0].lower()
  try:
    return voice_rank[initial]
  except KeyError:
    return randint(0,3)

def vFx(v, startTime):
  """Turn one input video into a tile of the output grid.

  Steps: trim to ``startTime``, crop to the output frame size (W x H) around
  the clip centre, resize to the tile height (H // ROWS), crop to the tile
  width (W // COLS) around the centre, and scale the volume by
  1/sqrt(number of videos).

  Args:
    v: the source video clip.
    startTime: time in seconds at which the tile should begin.

  Returns:
    The processed clip.
  """
  clip = (v.subclip(startTime)
    .crop(width=W, height=H, x_center=v.w//2, y_center=v.h//2)
    .resize(height=H//ROWS)
    )
  # The width has changed after the crop and resize above, so the final crop
  # must be centred on the resized clip's own width, not on the source's.
  clip = clip.crop(width=W//COLS, x_center=clip.w//2)
  return clip.volumex(1/len(input_vids)**0.5)

## TODO: mix out_auds into the final output ##
# Build one tile per video, starting at coarse estimate (clamped at 0) plus
# fine-tune offset.
#out_vids = [vFx(v, start_time) for v, start_time in zip(input_vids, vid_start_times)]
out_vids = [
    vFx(clip, max(coarse, 0)+fine)
    for clip, coarse, fine in zip(input_vids, vid_start_times, vid_start_times_fine)
]

# Fill the unused grid cells with black tiles lasting as long as the longest clip.
clip_duration = max(clip.duration for clip in out_vids)
empty_clip = ColorClip((W//COLS,H//ROWS), (0,0,0), duration=clip_duration)
out_vids.extend([empty_clip]*(ROWS*COLS-len(out_vids)))

# Randomise first, then sort by voice part; the sort is stable, so clips with
# the same key keep their shuffled order.
shuffle(out_vids)
out_vids.sort(key=SATB_order)

# Reshape as (COLS, ROWS) and transpose, so the sorted clips fill the grid
# column by column and the result has ROWS rows of COLS tiles.
tiles = np.transpose(np.reshape(out_vids, (COLS, ROWS)))
cc = clips_array(tiles, cols_widths=[W//COLS]*COLS)


#ipython_display(cc, t=30)

cc.write_videofile(OUTPUT_PATH+"output.mp4", fps=30, codec='libx264', ffmpeg_params=['-crf','18'], threads=4)