In [1]:
import subprocess
import shlex
import json
import os
import math
import logging as log
from typing import *
import multiprocessing

In [14]:
# Root logger configuration for the notebook.
# With the threshold at ERROR, the log.info() calls made elsewhere are
# filtered out; only error-level records (and above) are emitted.
log.basicConfig(
    level=log.ERROR,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="[%(levelname)s][%(asctime)s.%(msecs)03d][%(filename)s:%(lineno)s,%(funcName)s()] %(message)s",
)

In [3]:
def proc_run(arg: List[str], encoding: str = 'CP949'):
    """Run an external command and return its captured output.

    Args:
        arg: The command as an argv list (program followed by its arguments).
        encoding: Codec used to decode the child's stdout/stderr. Defaults to
            'CP949', the value this notebook was written with (Windows).

    Returns:
        A ``(stdout, stderr)`` tuple of decoded strings. Non-empty stdout is
        logged at INFO level and non-empty stderr at ERROR level.

    Note:
        The child's exit status is not inspected; callers only see its output.
    """
    # shell=False: an argv list combined with shell=True only executes the
    # first element on POSIX, and on Windows routes every argument (including
    # file names) through cmd.exe, where characters such as & or % are
    # interpreted. Passing the list directly avoids both problems.
    process = subprocess.Popen(arg,
                               shell=False,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               encoding=encoding,
                               # Tool output is not guaranteed to be valid in
                               # `encoding`; substitute undecodable bytes
                               # instead of raising UnicodeDecodeError.
                               errors='replace',
                               universal_newlines=True)
    stdout, stderr = process.communicate()
    if stdout:
        log.info(stdout)
    if stderr:
        log.error(stderr)
    return stdout, stderr

In [10]:
def path_listdir(dir:str):
    """Lazily yield the absolute path of every file below `dir`.

    The tree is traversed with os.walk, so files in nested sub-directories
    are included; directories themselves are never yielded.
    """
    for root, _subdirs, names in os.walk(dir):
        yield from (os.path.abspath(os.path.join(root, entry)) for entry in names)

def path_split(path:str) -> Tuple[str, str, str]:
    """Break a path into ``(directory, stem, extension)``.

    The path is made absolute first. The extension is whatever
    os.path.splitext reports (it includes the leading dot, or is '' when
    there is none); the stem is the file name without that extension.
    """
    stem_path, extension = os.path.splitext(os.path.abspath(path))
    parent, stem = os.path.split(stem_path)
    return parent, stem, extension

def path_join(dir:str, name:str, ext:str) -> str:
    """Assemble a path from a directory, a file stem and an extension.

    `ext` is appended to `name` verbatim, so it must carry its own leading
    dot (e.g. '.ogg').
    """
    filename = name + ext
    return os.path.join(dir, filename)

def path_sanitize(filepath: str) -> str:
    """Return the absolute form of `filepath` after checking it is an existing file.

    Args:
        filepath: Path to validate; may be relative.

    Returns:
        The absolute path.

    Raises:
        FileNotFoundError: Nothing exists at the path.
        IsADirectoryError: The path refers to a directory.

    Both exceptions carry errno, message and the offending path (available
    as ``.filename``) so a failure identifies which file was rejected.
    """
    import errno  # local import: only needed to build the error objects

    filepath = os.path.abspath(filepath)
    if not os.path.exists(filepath):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filepath)
    if os.path.isdir(filepath):
        raise IsADirectoryError(errno.EISDIR, os.strerror(errno.EISDIR), filepath)
    return filepath

In [13]:
def ffprobe_json(infile: str) -> Dict[str, Any]:
    """Return ffprobe's stream description of `infile` as parsed JSON.

    Runs ``ffprobe -v quiet -print_format json -show_streams`` on the file
    and decodes its stdout with json.loads.

    Args:
        infile: Path to an existing media file (validated by path_sanitize).

    Returns:
        The decoded JSON document produced by ffprobe.

    Raises:
        FileNotFoundError / IsADirectoryError: From path_sanitize.
        json.JSONDecodeError: If ffprobe's stdout is not valid JSON. Because
            of ``-v quiet`` ffprobe prints no diagnostics, so an empty stdout
            ends up here.
    """
    infile = path_sanitize(infile)
    arg = shlex.split('ffprobe -v quiet -print_format json -show_streams') + [infile]
    out, _ = proc_run(arg)
    return json.loads(out)

def ffprobe_video_time(infile: str) -> float:
    """Return the duration that ffprobe reports for `infile`, as a float.

    The file is validated with path_sanitize, then ffprobe is asked to print
    only the ``format=duration`` entry; its stdout is converted with float(),
    so non-numeric output raises ValueError.
    """
    target = path_sanitize(infile)
    command = shlex.split('ffprobe -v error -select_streams v:0 -show_entries format=duration -of default=noprint_wrappers=1:nokey=1')
    command.append(target)
    stdout, _stderr = proc_run(command)
    return float(stdout)

def ffprobe_video_frame(infile: str) -> int:
    """Return the packet count of the first video stream of `infile`.

    ffprobe is run with ``-count_packets`` and asked to print only
    ``stream=nb_read_packets`` for stream ``v:0``; the printed value is
    returned as an integer, matching the declared return type.

    Args:
        infile: Path to an existing media file (validated by path_sanitize).

    Raises:
        FileNotFoundError / IsADirectoryError: From path_sanitize.
        ValueError: If ffprobe's stdout is not a single integer.
    """
    infile = path_sanitize(infile)
    arg = shlex.split('ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0') + [infile]
    out, _ = proc_run(arg)
    # int() tolerates the surrounding whitespace/newline ffprobe prints.
    return int(out)

In [16]:
def ffmpeg_vp9(infile:str, outfile: str):
    """Encode `infile` to VP9 with ffmpeg and write the result to `outfile`.

    Args:
        infile: Path to an existing media file (validated by path_sanitize).
        outfile: Destination path. It does not need to exist; ffmpeg is run
            with ``-y`` so an existing file is overwritten.

    Raises:
        FileNotFoundError / IsADirectoryError: If `infile` is missing or a
            directory.
        IsADirectoryError: If `outfile` is an existing directory.

    Returns:
        None. ffmpeg's output is logged by proc_run.
    """
    infile = path_sanitize(infile)
    # The destination usually does not exist yet, so it must not go through
    # path_sanitize() (which rejects missing files); just normalise it.
    outfile = os.path.abspath(outfile)
    if os.path.isdir(outfile):
        raise IsADirectoryError(outfile)
    arg = shlex.split('ffmpeg -hide_banner -loglevel warning -y -i')+[infile]
    arg+=shlex.split('-c:v libvpx-vp9 -b:v 0 -pix_fmt:v yuv420p -crf:v 27')+[outfile]
    proc_run(arg)

# extract and transcode audio
def ffmpeg_audio(infile:str) -> str:
    """Transcode the first audio stream of `infile` to Opus in an .ogg file.

    The output is written next to the input as ``<stem>_temp.ogg``. The
    returned path is the intended output location; this function does not
    check whether ffmpeg actually produced the file.
    """
    source = path_sanitize(infile)
    folder, stem, _ext = path_split(source)
    outfile = path_join(folder, stem+'_temp', '.ogg')

    command = [
        *shlex.split('ffmpeg -hide_banner -loglevel warning -y -i'),
        source,
        *shlex.split('-c:a libopus -b:a 128k -map 0:a:0?'),
        outfile,
    ]
    proc_run(command)
    return outfile

def ffmpeg_video_split(infile:str, files:int) -> List[str]:
    """Split the first video stream of `infile` into segments with ffmpeg.

    The segment length is ``ceil(duration / files)`` seconds but never less
    than 20, so fewer than `files` segments may be produced. Segments are
    written next to the input as ``<stem>_<index><ext>`` using stream copy
    (``-c:v copy -map 0:v:0``), i.e. only the first video stream is kept.

    Args:
        infile: Path to an existing media file (validated by path_sanitize).
        files: Desired number of segments. Zero raises ZeroDivisionError.

    Returns:
        Paths of the expected segment files that actually exist on disk, in
        index order.
    """
    infile = path_sanitize(infile)
    folder, stem, ext = path_split(infile)

    # plan for splitting video
    duration = ffprobe_video_time(infile)
    unit_time = max(20, math.ceil(duration / float(files)))
    # Recompute the count: the 20-second floor can reduce it below `files`.
    files = math.ceil(duration / unit_time)

    # split video
    arg = shlex.split('ffmpeg -hide_banner -loglevel warning -y -i')+[infile]
    arg+=shlex.split('-f segment -segment_time') +[str(unit_time)]
    arg+=shlex.split('-reset_timestamps 1 -c:v copy -map 0:v:0')+[path_join(folder, stem+'_%d', ext)]
    proc_run(arg)

    # Keep only the segments ffmpeg really wrote (isfile implies exists).
    expected = (path_join(folder, stem+'_'+str(i), ext) for i in range(files))
    return [fp for fp in expected if os.path.isfile(fp)]

def ffmpeg_segment(infile:str, worker:int):
    """Unimplemented placeholder: does nothing with its arguments and returns None."""
    # TODO: audio mux -- earlier draft kept below for reference.
    #arg = shlex.split('ffmpeg -hide_banner -loglevel warning -y -i')+[infile]
    #arg+=shlex.split('-c:a libopus -b:a 128k -map 0:a:0')+[path_join(dir, +'_temp', '.ogg')]
    #proc_run(arg)
    return None
        

In [17]:
# Driver cell: run the audio extraction on every file found (recursively)
# under the hard-coded test directory. Each call writes a '<stem>_temp.ogg'
# file beside its input.
# NOTE(review): the directory is an absolute, machine-specific path; consider
# moving it to a configurable constant.
for fp in path_listdir('C:/Users/heeyong/Desktop/test'):
    # Video splitting is currently disabled.
    #ffmpeg_video_split(fp, 12)
    ffmpeg_audio(fp)

In [6]:
# Scratch check: pair each number with the same constant label.
list(zip([1, 2, 3], ['a'] * 3))

[(1, 'a'), (2, 'a'), (3, 'a')]

In [11]:
# Sample list of encoded segment paths used by the following cell.
fps_video_encode = [
    'C:\\Users\\heeyong\\Desktop\\test\\0_video_012_encoded.webm',
    'C:\\Users\\heeyong\\Desktop\\test\\0_video_1_encoded.webm',
    'C:\\Users\\heeyong\\Desktop\\test\\0_video_2_encoded.webm',
    'C:\\Users\\heeyong\\Desktop\\test\\0_video_3_encoded.webm',
]

In [13]:
import re

import src.path as path

# Derive the shared base name of a set of encoded segment files.
# NOTE(review): src.path.split / src.path.join are assumed to behave like
# path_split / path_join in this notebook, i.e. (dir, name, ext) -- confirm.
dir, common_name, ext = path.split(fps_video_encode[0])

# Every segment must sit in the same directory and share one extension;
# otherwise the derived name would be meaningless, so stop with a clear error
# instead of printing a message and carrying on.
for fp in fps_video_encode:
    _dir, _, _ext = path.split(fp)
    if not (_dir == dir and _ext == ext):
        raise ValueError(
            'segment %r does not share directory %r and extension %r with the first segment'
            % (fp, dir, ext))

# Drop the '_<index>_encoded' suffix to recover the original stem.
common_name = re.sub('_[0-9]*_encoded', '', common_name)
print(common_name)
path.join(dir, common_name, '.txt')

0_video


'C:\\Users\\heeyong\\Desktop\\test\\0_video.txt'