In [1]:
pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


In [2]:
import cv2
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras import layers, models, Sequential
import zipfile
import sklearn
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import pydub
from pydub import AudioSegment
import scipy
from scipy import signal
from scipy.io import wavfile
import random

In [3]:
def open_zip(zip_file, dest=None):
  """Extract every member of a zip archive.

  Args:
    zip_file: Path (or file-like object) of the archive to extract.
    dest: Directory to extract into. ``None`` (the default) extracts into
      the current working directory.

  Prints "Extracted all" on success. If the archive is missing, unreadable
  or not a valid zip file, prints "Invalid file" instead of raising.
  """
  try:
    with zipfile.ZipFile(zip_file) as z:
      z.extractall(dest)
      print("Extracted all")
  except (zipfile.BadZipFile, OSError):
    # Only swallow failures that mean "this is not a usable archive"; a bare
    # `except:` would also hide KeyboardInterrupt and programming errors.
    print("Invalid file")

To create the dummy data, we use the environmental sound dataset (ESC-50.zip), which is hosted on the Harvard Dataverse:

https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/YDEPUT

In [3]:
# dataset.zip should contain the numpy arrays of audio and image.
# Each archive is unpacked with the open_zip helper; the built-in open()
# would only create an unused file handle and extract nothing.
for zf in ["dataset.zip", "env_sound.zip"]:
  open_zip(zf)

Extracted all


In [5]:
# Read the pre-computed audio and image arrays from the extracted dataset.
# The audio values are scaled by 0.01 immediately after loading.
audio = 0.01 * np.load('/content/content/dataset/audio/audio.npy')
image = np.load('/content/content/dataset/image/image.npy')

In [10]:
# Accumulator for the image/audio dot products (filled by the cells below).
audio_and_image = list()

In [11]:
# Fuse every (image, audio) pair from the dataset: the transposed image is
# contracted with its 30-element audio vector and the result is collected.
size = audio.shape[0]
for i in range(size):
  au = audio[i].reshape(30)
  im = image[i].T
  audio_and_image.append(np.dot(im, au))

The following helper functions are used for both the dummy audio and the user input.

In [12]:
def log_spectogram(wav_path):
  """Return the log-scaled spectrogram of a WAV file.

  Args:
    wav_path: Path of the WAV file to read.

  Returns:
    2-D array holding ``log10(Sxx + 1)``, where ``Sxx`` is the spectrogram
    computed by ``scipy.signal.spectrogram`` with its default settings.
  """
  sample_rate, samples = wavfile.read(wav_path)
  if samples.ndim > 1:
    # wavfile.read returns (n_samples, n_channels) for multi-channel files,
    # while signal.spectrogram works along the last axis. Mix down to mono so
    # the transform runs over time instead of over the channel axis.
    samples = samples.mean(axis=1)
  _, _, Sxx = signal.spectrogram(samples, sample_rate)
  # Adding 1 before the log keeps the result finite (and >= 0) for empty bins.
  return np.log10(Sxx + 1)

In [13]:
# Layers of the audio encoder, created on first use and then shared by every
# call to audio_conv.
_AUDIO_ENCODER_LAYERS = None


def audio_conv(audio_log):
  """Compress a log-spectrogram into a 30-value feature tensor.

  The spectrogram is forced to 127 x 196 values with np.resize (which repeats
  or truncates the flattened data as needed), given batch and channel axes,
  and passed through two Conv2D/MaxPool2D stages, a Flatten and a 30-unit
  Dense layer.

  The layers are built once and reused. Building them inside every call gave
  each clip its own randomly initialised weights, so encodings of different
  clips were produced by different networks and were not comparable.

  Args:
    audio_log: Array-like log-spectrogram, e.g. the output of log_spectogram.

  Returns:
    Tensor of shape (1, 30).
  """
  global _AUDIO_ENCODER_LAYERS
  if _AUDIO_ENCODER_LAYERS is None:
    _AUDIO_ENCODER_LAYERS = [
        layers.Conv2D(32, 5, activation='relu'),
        layers.MaxPool2D(),
        layers.Conv2D(32, 5, activation='relu'),
        layers.MaxPool2D(),
        layers.Flatten(),
        layers.Dense(30, activation='relu'),
    ]
  features = np.resize(audio_log, (127, 196))
  # Add the batch axis in front and the channel axis at the end: (1, 127, 196, 1).
  features = np.reshape(features, (1,) + features.shape + (1,))
  for layer in _AUDIO_ENCODER_LAYERS:
    features = layer(features)
  return features

In [14]:
# convert mp4 file to numpy array
def video_processing(mp4_file):
  """Decode every frame of a video file into one float32 array.

  Args:
    mp4_file: Path of the video file to decode.

  Returns:
    float32 array with the decoded frames stacked along axis 0.

  Raises:
    ValueError: If no frame could be read (missing, unreadable or empty file).
  """
  frames = []
  cap = cv2.VideoCapture(mp4_file)
  try:
    while True:
      ret, img = cap.read()
      if not ret:
        break
      frames.append(img)
  finally:
    # Always free the decoder, even if reading fails part-way through.
    cap.release()
  if not frames:
    # np.stack would fail on an empty list with an unhelpful message.
    raise ValueError("no frames could be read from: " + str(mp4_file))
  video = np.stack(frames, axis=0)
  return video.astype('float32')

In [15]:
# reduce a decoded video to a (30, 90, 160) gray-scale array
def image_conv(video):
  """Convert a colour video array to a down-sampled gray-scale array.

  The first three channels are combined with fixed weights, the result is
  forced to (30, 720, 1280, 1) with np.resize, max-pooled three times (each
  default MaxPool2D halves both spatial axes: 720 -> 90, 1280 -> 160) and
  finally returned with shape (30, 90, 160).

  np.resize repeats or truncates the flattened data instead of rescaling
  frames, so the input is presumably expected to hold 30 frames of
  720 x 1280 pixels -- TODO confirm against the callers.

  NOTE(review): the weights are applied to the channels in the order given.
  OpenCV decodes frames as BGR, so if `video` comes straight from
  cv2.VideoCapture the red and blue weights are swapped -- confirm against
  how the training images were produced before changing this.

  Args:
    video: Array whose last axis holds at least three colour channels.

  Returns:
    Array of shape (30, 90, 160).
  """
  luma_weights = [0.2989, 0.5870, 0.1140]
  gray = np.dot(video[..., :3], luma_weights)
  pooled = np.resize(gray, (30, 720, 1280, 1))
  for _ in range(3):
    pooled = layers.MaxPool2D()(pooled)
  return np.resize(pooled, (30, 90, 160))

In [16]:
# Container for the encoded dummy (environment sound) audio.
dummy_audio = list()

In [17]:
# convert dummy audio (= env sound) to log_spectogram
# and apply CNN to compress the information.
def process_dummy(path):
  """Encode every audio file in `path` and store the result in `dummy_audio`.

  Each file is converted with log_spectogram and compressed with audio_conv.
  Files that cannot be processed are reported and skipped. The encodings are
  stacked into one array, scaled by 0.01 and assigned to the module-level
  `dummy_audio`.

  Args:
    path: Directory containing the audio files (with or without a trailing
      separator).
  """
  # Without this declaration the assignment at the end makes `dummy_audio`
  # local to the function, and every use of it raises UnboundLocalError.
  global dummy_audio
  encoded = []
  # Sorted so the order of the encodings does not depend on the file system.
  for f in sorted(os.listdir(path)):
    p = os.path.join(path, f)
    try:
      spectogram = log_spectogram(p)
      encoded.append(audio_conv(spectogram))
    except Exception:
      # Best effort: skip unreadable files, but let KeyboardInterrupt through.
      print("failed: " + f)
  dummy_audio = np.multiply(np.array(encoded), 0.01)

In [None]:
# Encode the environment sounds found in the env_sound directory.
process_dummy('/content/env_sound/')

In [20]:
# Pair every image with a randomly drawn env_sound encoding and collect the
# dot product of the transposed image and that 30-element encoding.
for i in range(size):
  s = dummy_audio.shape[0]
  r = random.randint(0, s - 1)
  au = dummy_audio[r].reshape(30)
  im = image[i].T
  audio_and_image.append(np.dot(im, au))

In [21]:
# Turn the collected list of dot products into a single numpy array.
audio_and_image = np.array(audio_and_image)

In [22]:
# Labels: `size` ones (true) followed by `size` zeros (false), as a column
# vector of shape (size*2, 1).
labels = np.array([np.ones(size), np.zeros(size)]).reshape(size*2, 1)

In [23]:
# Hold out 10% of the samples for testing; the fixed random_state makes the
# split repeatable.
trainX, testX, trainY, testY = train_test_split(
    audio_and_image,
    labels,
    test_size=0.1,
    random_state=32,
)

In [24]:
# Binary classifier: three Conv2D + MaxPooling2D stages (dropout after the
# first two), flattened into a single sigmoid unit.
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(160, 90, 1)),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.5),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.5),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(1, activation='sigmoid'),
])

In [25]:
# Binary cross-entropy matches the single sigmoid output; accuracy is tracked
# as the reporting metric.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [26]:
# Set the optimizer's learning rate explicitly. Assigning the attribute goes
# through the optimizer the model was compiled with, so it needs neither a
# separate `keras` backend import (the model is built from tensorflow.keras)
# nor the legacy backend.set_value helper.
model.optimizer.learning_rate = 0.001

In [None]:
# Train for 100 epochs; the held-out test split doubles as validation data.
history = model.fit(
    x=trainX,
    y=trainY,
    validation_data=(testX, testY),
    epochs=100,
)

In [34]:
from google.colab import files
def process_input(a_path, i_path):
  """Predict whether an audio clip and a video clip belong together.

  The video is decoded and reduced with video_processing / image_conv, the
  audio is encoded with log_spectogram / audio_conv, and the two are fused
  with the same transposed-image dot product used to build the training
  data. The fused sample is passed to `model` and the verdict is printed.

  Args:
    a_path: Path of the audio (WAV) file.
    i_path: Path of the video file.
  """
  v_numpy = image_conv(video_processing(i_path))
  # The training audio (matching and dummy) is scaled by 0.01 before being
  # fused with the images, so the user's audio gets the same scaling.
  a_numpy = np.multiply(np.array(audio_conv(log_spectogram(a_path))), 0.01)
  # Fuse the clips that were just processed. The previous code used the
  # leftover globals `im` and `au` here, which ignored the user's input.
  fused = np.dot(v_numpy.transpose(), a_numpy.reshape(30))
  batch = np.array([fused])
  predict = model.predict(batch)
  # model.predict returns one row per sample; read the single probability.
  if predict[0][0] > 0.5:
    print("image and audio matches!")
  else:
    print("image and audio doesn't match!")

In [None]:
# Run the matcher on one audio/video pair (audio path first, video path second).
process_input('/content/audio0 81sec.wav', '/content/video29 31sec.mp4')