##### Copyright 2023 The MediaPipe Authors. All Rights Reserved.

In [1]:
  #@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Audio Classification

In this notebook you will use the MediaPipe Tasks API to classify audio.

In [2]:
!pip install -q sounddevice==0.4.4
!pip install -q mediapipe==0.10.0
!pip install pydub
!pip install ipywidgets
!pip install ffmpeg
!pip install scikit-image

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.9/33.9 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m16.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi
Successfully installed jedi-0.19.1
Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ffmpeg
  Building wheel for ffmpeg (setup.py) ... [?25l[?25hdone
  Created wheel for ffmpeg: filename=ffmpeg-1.4-py3-none-any.whl size=6080 sha256=98b764a0924e106dd642f4aae59ef8a2b4aadc20b9dc4a5cf9aad82f62a9d5cf
  Stored in directory: /root/.cache/pip/wheels/8e/7a/69/cd6

In [3]:
!wget -O classifier.tflite -q https://storage.googleapis.com/mediapipe-models/audio_classifier/yamnet/float32/1/yamnet.tflite
!wget -q -O efficientdet.tflite -q https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite

In [4]:
import urllib
import ipywidgets as widgets
from IPython.display import display, Audio
import numpy as np
from pydub import AudioSegment
from mediapipe.tasks import python
from mediapipe.tasks.python.components import containers
from mediapipe.tasks.python import audio
from scipy.io import wavfile
from mediapipe.tasks.python import vision
#############################
from moviepy.editor import VideoFileClip
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt
import scipy.signal
import scipy
from google.colab.patches import cv2_imshow
from skimage import segmentation
from skimage import io, color
import skimage

In [10]:
# Customize and associate model for Classifier
base_options = python.BaseOptions(model_asset_path='classifier.tflite')
options = audio.AudioClassifierOptions(
    base_options=base_options, max_results=4)

uploader = widgets.FileUpload(
    accept='.mp4',  # Accept .mp4 files only
    multiple=False  # Allow only single file upload
)

Sound = []

def on_upload_change(change):
    # Once file is uploaded, use the uploaded file
    audio_data = next(iter(uploader.value.values()))['content']


    file_type = next(iter(uploader.value.values()))['metadata']['name'].split('.')[-1]

    if file_type == 'mp4':
        with open('uploaded_video.mp4', 'wb') as f:
            f.write(audio_data)

        video = VideoFileClip('uploaded_video.mp4')
        video.audio.write_audiofile('uploaded_audio.wav')
        file_path = 'uploaded_audio.wav'


    # Step 2: Use the uploaded file for audio classification
    file_path = 'uploaded_audio.wav'
    display(Audio(file_path, autoplay=False))

    # Continue with the rest of your audio classification code
    duration = get_audio_duration(file_path)
    print("The duration of audio is ：%s seconds" % duration)

    with audio.AudioClassifier.create_from_options(options) as classifier:
        sample_rate, wav_data = wavfile.read(file_path)
        audio_clip = containers.AudioData.create_from_array(
            wav_data.astype(float) / np.iinfo(np.int16).max, sample_rate)
        classification_result_list = classifier.classify(audio_clip)

        # Calculate the number of segments based on audio duration and segment length
        segment_length = 1
        num_segments = int(duration / segment_length)

        # Iterate through clips to display classifications
        for idx in range(num_segments):
            timestamp = idx * segment_length * 1000
            classification_result = classification_result_list[idx]
            top_category = classification_result.classifications[0].categories[0]
            print(classification_result.classifications[0].categories[0].category_name)
            print(f'Timestamp {timestamp}: {top_category.category_name.lower()} ({top_category.score:.2f})')

            Sound.append(top_category.category_name.lower())
# Display the upload button
display(uploader)

# Attach the on_upload_change function to the uploader
uploader.observe(on_upload_change, names='_counter')

# Rest of your code (functions and model loading)

def get_audio_duration(file_path):
    audio = AudioSegment.from_file(file_path)
    duration = len(audio) / 1000
    return duration

FileUpload(value={}, accept='.mp4', description='Upload')

MoviePy - Writing audio in uploaded_audio.wav


                                                        

MoviePy - Done.




The duration of audio is ：5.02 seconds
Speech
Timestamp 0: speech (0.96)
Speech
Timestamp 1000: speech (0.98)
Speech
Timestamp 2000: speech (0.99)
Speech
Timestamp 3000: speech (0.94)
Speech
Timestamp 4000: speech (0.94)


In [6]:
model_path = '/absolute/path/to/lite-model_efficientdet_lite0_detection_metadata_1.tflite'

In [11]:
base_options = python.BaseOptions(model_asset_path='efficientdet.tflite')
options = vision.ObjectDetectorOptions(base_options=base_options, score_threshold=0.5)
detector = vision.ObjectDetector.create_from_options(options)
frame_index = 0
symn = [['car','vehicle','motor vehicle (road)'],['speech','person'],['animal','dog']]
#symn = ['speech','person']
def visualize(image, detection_result,frame_index,fps):

    frame_second = int(frame_index/fps)
    if frame_second > get_audio_duration('/content/uploaded_audio.wav')-1:
      frame_second = int(frame_index/fps)-1
    for detection in detection_result.detections:
      for idx in range(len(symn)):
        if Sound[frame_second] in symn[idx] and detection.categories[0].category_name in symn[idx]:
        #  if Sound[frame_second] == detection.categories[0].category_name:
            bbox = detection.bounding_box
            start_point = bbox.origin_x, bbox.origin_y
            end_point = bbox.origin_x + bbox.width, bbox.origin_y + bbox.height
            cv2.rectangle(image, start_point, end_point, (0, 0, 255), 3)

            category = detection.categories[0]
            result_text = f"{category.category_name} ({round(category.score, 2)})"
            text_location = (10 + bbox.origin_x, 10 + 10 + bbox.origin_y)
            cv2.putText(image, result_text, text_location, cv2.FONT_HERSHEY_PLAIN, 1, (0, 0, 255), 1)
        else:
            image = image



    return image

video_path = 'uploaded_video.mp4'  # Replace with your video file path
cap = cv2.VideoCapture(video_path)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_video.mp4', fourcc, 20.0, (int(cap.get(3)), int(cap.get(4))))
fps = cap.get(cv2.CAP_PROP_FPS)


while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_frame = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
    detection_result = detector.detect(mp_frame)
    annotated_frame = visualize(frame, detection_result,frame_index,fps)
    out.write(annotated_frame)
    frame_index = frame_index + 1

cap.release()
out.release()
cv2.destroyAllWindows()
original_video_clip = VideoFileClip('uploaded_video.mp4')
output_video_clip = VideoFileClip('output_video.mp4')

output_video_clip = output_video_clip.set_duration(original_video_clip.duration)

final_clip = output_video_clip.set_audio(original_video_clip.audio)

final_clip.write_videofile('final_output_video.mp4', codec='libx264', audio_codec='aac')

Moviepy - Building video final_output_video.mp4.
MoviePy - Writing audio in final_output_videoTEMP_MPY_wvf_snd.mp4




MoviePy - Done.
Moviepy - Writing video final_output_video.mp4





Moviepy - Done !
Moviepy - video ready final_output_video.mp4
