In [1]:
import os
import shutil
from moviepy.editor import VideoFileClip

#비디오를 길이가 짧은 순서대로 정렬한 후 '1.mp4', '2.mp4' ... 이름으로 다시 저장하고 각 비디오의 오디오 트랙을 1.wav, 2.wav...로 추출하여 저장하는 함수
def sort_videos_by_length_and_extract_audio(input_folder, output_audio_folder, output_video_folder):
    # input_folder에서 비디오 파일 리스트 가져오기
    video_files = [f for f in os.listdir(input_folder) if f.endswith(('.mp4', '.avi', '.mov'))]
    video_paths = [os.path.join(input_folder, f) for f in video_files]
    
    # 비디오 파일마다 길이 저장
    video_lengths = []
    for video_path in video_paths:
        with VideoFileClip(video_path) as video:
            video_length = video.duration
        video_lengths.append((video_path, video_length))
    
    # 비디오 길이에 따라 정렬
    sorted_videos = sorted(video_lengths, key=lambda x: x[1])
    
    # 정렬된 순서대로 파일 복사 및 이름 변경, 오디오 추출
    for i, (video_path, _) in enumerate(sorted_videos, start=1):
        # 새로운 파일 이름 설정
        new_video_name = f"{i}{os.path.splitext(video_path)[1]}"
        
        # 원본 비디오 파일을 새 위치로 복사
        new_video_path = os.path.join(output_video_folder, new_video_name)
        shutil.copy(video_path, new_video_path)
        
        # 오디오 추출
        with VideoFileClip(new_video_path) as video:
            audio_path = os.path.join(output_audio_folder, f"{i}.wav")
            video.audio.write_audiofile(audio_path)

In [2]:
import numpy as np
from scipy.signal import correlate
from scipy.io import wavfile
import os

#기준 영상과 비교 영상의 오디오 싱크를 맞췄을 때 비교 영상의 앞부분에서 얼만큼의 시간을 조정해야 하는지를 계산하는 함수
def cal_adjustment_time(audio_path):
    # audio_path 내의 모든 wav 파일 리스트 가져오기
    audio_files = [f for f in os.listdir(audio_path) if f.endswith('.wav')]
    audio_paths = [os.path.join(audio_path, f) for f in audio_files]

    # 기준 영상 대비 비교 영상의 앞부분이 얼마만큼 밀려있는지를 저장하는 리스트
    front_adjustment_lst = []

    # 기준 영상의 길이 계산
    _, base_audio = wavfile.read(audio_paths[0])
    base_duration = len(base_audio) / 44100  # 기준 오디오 파일의 길이(초)

    # audio_path 내의 모든 target audio 파일에 대해 adjusted_front_duration 계산
    for audio_path in audio_paths[1:]:
        rate, target_audio = wavfile.read(audio_path)
        target_duration = len(target_audio) / rate  # target 오디오 파일의 길이(초)

        base_waveform = base_audio  # 기준 무대 영상의 waveform
        target_waveform = target_audio  # 비교할 무대 영상의 waveform

        # waveform 정규화
        base_waveform = base_waveform / np.max(np.abs(base_waveform))
        target_waveform = target_waveform / np.max(np.abs(target_waveform))

        # 상호 상관 함수 계산
        correlation = correlate(target_waveform, base_waveform, mode='same')

        # 상호 상관 함수에서 피크값 찾기
        peak_index = np.argmax(correlation)

        # base_audio와 target_audio간의 adjusted_front_duration 계산
        adjusted_front_duration = ((peak_index - len(base_waveform)) // 2) / rate
        front_adjustment_lst.append(adjusted_front_duration)

    return front_adjustment_lst

## 순차처리

In [None]:
from moviepy.editor import VideoFileClip, concatenate_videoclips, ColorClip
from PIL import Image
import os

# 기준 영상과 오디오 싱크를 맞추고 영상 길이가 기준 영상보다 길 경우 잘라내며 최종적으로 프레임을 추출하여 이미지로 저장하는 함수
def trim_and_extract_frames(input_video_paths, output_frame_paths, front_adjustment_lst, resize_size, fps=30):
    # input_video_paths에 있는 모든 비디오 파일들의 경로를 불러옴
    video_files = [f for f in os.listdir(input_video_paths) if f.endswith('.mp4')]
    video_paths = [os.path.join(input_video_paths, f) for f in video_files]

    # 첫 번째 영상(기준 영상)에 대한 처리를 위해 더미 값(0)을 앞에 추가
    front_adjustment_lst = [0] + front_adjustment_lst

    # 첫 번째 영상(기준 영상)의 프레임 수를 계산
    base_video = VideoFileClip(video_paths[0])
    base_video_frames = int(base_video.duration * fps)

    # 각 영상 파일에 대해 앞부분을 조정하고 프레임 추출 작업 진행
    for idx, video_path in enumerate(video_paths):
        front_adjustment = front_adjustment_lst[idx] #비교 영상이 기준 영상에 비해 얼마나 지연됐는가에 대한 변수
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        frames_folder = os.path.join(output_frame_paths, video_name + '_frames')
        os.makedirs(frames_folder, exist_ok=True)

        with VideoFileClip(video_path) as video:
            # 영상의 앞부분이 기준 영상보다 짧을 경우 시작 부분에 추가될 검은색 프레임의 수 계산
            front_black_frames = int(abs(front_adjustment) * fps) if front_adjustment < 0 else 0

            # 영상의 앞부분이 기준 영상보다 짧을 경우 시작 부분에 추가될 검은색 프레임
            black_frame = np.zeros((video.size[1], video.size[0], 3), dtype=np.uint8)

            # 프레임 추출 및 저장
            for i in range(base_video_frames): #추출될 프레임의 갯수를 base_video_frames의 갯수에 맞춰줌
                if front_black_frames != 0:
                    if i < front_black_frames: #오디오 싱크를 맞췄을 때 영상의 앞부분이 기준 영상보다 짧은 경우에만 임의로 검은색 프레임 추가
                        frame = black_frame
                    else:
                        frame_time = i / fps - front_adjustment
                        frame = video.get_frame(frame_time)
                else:
                    frame_time = i / fps + front_adjustment
                    frame = video.get_frame(frame_time)
                
                # 이미지 크기 축소
                frame_image = Image.fromarray(frame).resize(resize_size, Image.Resampling.LANCZOS)

                # 이미지 저장
                frame_path = os.path.join(frames_folder, f"frame_{i:05d}.jpg")
                frame_image.save(frame_path, 'JPEG', quality=70) #quality도 임의로 지정함


In [None]:
# 폴더 경로 설정
input_folder = 'input_videos'
output_audios_folder = 'output_audios' #길이순으로 정렬된 영상에서 추출한 오디오 트랙들이 저장되는 폴더
output_videos_folder = 'output_videos' #길이순으로 정렬된 영상이 새로운 이름으로 저장되는 폴더
output_frames_folder = 'output_frames' # 추출한 프레임들이 저장될 폴더

# 프레임 추출 특성 설정
resize_size = (1280, 720)  # 이미지 리사이즈 크기
fps = 3

# 함수 실행
sort_videos_by_length_and_extract_audio(input_folder, output_audios_folder, output_videos_folder) #input_folder에 있는 영상들을 길이순으로 정렬하고 오디오 트랙 추출 및 이름 변경하여 새롭게 저장
front_adjustment_lst = cal_adjustment_time(output_audios_folder) #길이가 가장 짧은 무대 영상을 기준으로 했을 때 비교할 영상이 앞뒤로 얼마만큼 조정해야지를 계산
trim_and_extract_frames(output_videos_folder, output_frames_folder, front_adjustment_lst, resize_size, fps) #비교 영상이 기준 영상과 길이가 같고 오디오 싱크가 맞도록 조정한 후 프레임을 추출하여 저장

### CPU 병렬처리

In [None]:
from trim_vids_and_extract_frames import trim_and_extract_frames_parallel

# 폴더 경로 설정
input_folder = 'input_videos'
output_audios_folder = 'output_audios' #길이순으로 정렬된 영상에서 추출한 오디오 트랙들이 저장되는 폴더
output_videos_folder = 'output_videos' #길이순으로 정렬된 영상이 새로운 이름으로 저장되는 폴더
output_frames_folder = 'output_frames' # 추출한 프레임들이 저장될 폴더

# 프레임 추출 특성 설정
fps = 3 # 추출할 초당 프레임수 지정
resize_size = (1280, 720)  # 이미지 리사이즈 크기

# 함수 실행
sort_videos_by_length_and_extract_audio(input_folder, output_audios_folder, output_videos_folder) #input_folder에 있는 영상들을 길이순으로 정렬하고 오디오 트랙 추출 및 이름 변경하여 새롭게 저장
front_adjustment_lst = cal_adjustment_time(output_audios_folder) #길이가 가장 짧은 무대 영상을 기준으로 했을 때 비교할 영상이 앞뒤로 얼마만큼 조정해야지를 계산
trim_and_extract_frames_parallel(output_videos_folder, output_frames_folder, front_adjustment_lst, resize_size, fps) #비교 영상이 기준 영상과 길이가 같고 오디오 싱크가 맞도록 조정한 후 프레임을 추출하여 저장