In [None]:
# this file resides in the extractors folder
# it should resemble the utils file at the toolchain root folder

In [21]:
# constants 
packages=[]
failed_packages=[]
colab=''

In [22]:
def import_packages(names, packages=packages, failed_packages=failed_packages, vb=False):
    # tries to import a package from names list and add its name to the packages list if successful
    # adds failed package names to failed_packages list
    if not names:
        return
    if not isinstance(names, list): name=[name] 
    if not isinstance (names[0], list):
        names=[names]
    failed=[]
    for ls in names:
        for name in ls:
            
            try: 
                # create import statement
                n=name
                if not 'from' in name:
                    n = 'import '+ name
                # package name
                pn=n.split()[1]
                # module name (possibly submodule or renamed by 'as')
                mn = n.split()[-1]
                if vb: print(f'trying {n}')
                exec(n)
                if vb: print(eval(mn))
                globals()[mn] = eval(mn)
                packages.append(pn)
                #if vb: print(f'imported: {pn}')
            except Exception as e:
                if vb:
                    print(f'error: ', pn)
                    print(e)
                failed_packages.append(pn)

In [38]:
builtin_requirements=['os','time', 'sys','datetime','importlib','subprocess']
python_requirements=['from contextlib import contextmanager','logging as lg','import_ipynb',  'json', 'from collections import defaultdict',
                    'from time import perf_counter',]
video_requirments=['cv2','from moviepy.video.io.VideoFileClip import VideoFileClip', 'ffmpeg']
data_requirements=['pandas as pd',]
os_requirements=['magic', ]
config_requirements=['settings']
requirements=[builtin_requirements, python_requirements, video_requirments, data_requirements, os_requirements, config_requirements]

import_packages(requirements)

In [39]:
# ffmpeg
# !pip install ffmpeg-python

In [40]:
def msg_failed_packages(msg):
    for pn in failed_packages:
        if pn in msg:
            print(f'user warning: failed to import {msg[pn]}')

In [41]:
msg={'magic':f'python magic. filetype cannot be checked. will assume video type is mp4', 
    'cv2': f'opencv. save_frames will fail', 'ffmpeg':'ffmpeg. video editing functions may not be available. install using pip install '
            'ffmpeg-python. also requires ffmpeg executable, see https://ffmpeg.org/download.html'}
msg_failed_packages(msg)



In [44]:
# constants
colab=''
vb=None
if 'settings' in globals():
    vb=settings.vb    

In [33]:
# load config data
# returns defaultdict with None as default value
# this function has never been used and will probably be retired
def load_conf(conf_path='config.txt'):
    conf = open(conf_path, 'r').read()
    d={}
    for line in conf.split('\n'):
        line=line.strip()
        if line and not line.startswith('#'):
            sp = line.split(' ')
            # print(sp)
            var, val = (sp[0], sp[1]) if len(sp)>1 else (sp[0], '')
            try:
                val=int(val)
            except:
                if ',' in val:
                    try:
                        val=val.split(',')
                        val=tuple(None if 'None' in v else int(v) for v in val)
                    except:
                        raise ValueError(f'expected int tuple but got {val}')
            if val=='True':
                val=True
            elif val=='False':
                val=False
            elif val == 'None':
                    val=None
            d[var]=val
    d = defaultdict(lambda:None, **d)
    return d

In [35]:
# print stack trace
def print_trace(e):
    r=[]
    t = e.__traceback__
    while t:
        print(f'{t.tb_frame}, \nt.tb_lasti: {t.tb_lasti},   \ndir:{dir(t)}, \ntb.frame-f.code: {t.tb_frame.f_code}, \nfilename: {t.tb_frame.f_code.co_filename}: \nline: {t.tb_lineno}')#{getattr(t)},
        t = t.tb_next

In [29]:
def catch(func, vb=None):
    def w():
        try:
            func()
        except Exception as e:
            if vb: print(f'{func.__name__} error: {e}')
    return w

In [30]:
# check video file type
def check_filetype(path):
    try:
        return magic.from_file(path)
    except:
        return None

#### video functions

In [31]:
# load data
def get_videos(in_folder, min_size=50, start=0, end=None, extensions=None):
    # return files (filenames) from in_folder
    # min_size: min file size in kB 
    # start, end: start, end index of file list. default: entire file list
    # extensions: iterable of extensions. files without such extensions are excluded 
    # return: list of filenames (string)
    
    if not in_folder or not os.path.isdir(in_folder):
        return []
    extensions = extensions if extensions else ['flv', 'f4v', 'f4p', 'f4a', 'f4b', 'nsv', 'roq', 'mxf', '3g2', '3gp', 'svi', 'm4v', 'mpg', 'mpeg', 'm2v', 'mpg', 'mp2', 'mpeg', 'mpe', 'mpv', 'mp4', 'm4p', 'm4v', 'amv', 'asf', 'rmvb', 'rm', 'yuv', 'wmv', 'mov', 'qt', 'MTS', 'M2TS', 'TS', 'avi', 'mng', 'gifv', 'gif', 'drc', 'ogv', 'ogg', 'vob', 'flv', 'flv', 'mkv']
    videos = os.listdir(in_folder)
    
    videos = [v for v in videos if '.' in v and os.path.splitext(v)[1][1:] in extensions]
    videos = [v for v in videos if os.path.getsize(os.path.join(in_folder, v))/1000 > min_size]
    videos.sort()
    l = len(videos)
    end = l if not isinstance(end,int) or end < 0 or end > l else end 
    start = 0 if (not isinstance(start,int) or start < 0 or start > l) else start 

    return videos[start:end]

In [34]:
def get_frames(path, skip_frame=10):
    # yield video frames
    cap = cv2.VideoCapture(path)
    r,c = True,0
    while r:
        r, frame = cap.read()
        if not r:break 
        if c%skip_frame==0:
            yield frame
        c+=1

In [45]:
# helper for saving specified frames      
def save_frames(video, name, out_folder, frames, skip_frame=None, vb=False):
    # saves specified frames from video
    # frames are either specified as int, list (frame) or as by indicating number of frames to be skipped (skip_frame)
    # skip_frame takes priority over frame
    # returns True if successful
    # save format: out_folder/name_<frame_number>.jpg
    cap=cv2.VideoCapture(video)
    max_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if skip_frame:
        frames=[f for f in range(max_frames) if f%skip_frame==0]
    r=True
    if isinstance(frames, int):
        frames=[frames]
    image_dir=os.path.join(out_folder, name)
    os.makedirs(image_dir, exist_ok=True)
    for fr in frames:
        if fr>=max_frames:
            return True
        cap.set(cv2.CAP_PROP_POS_FRAMES, fr)
        _, image = cap.read()
        image_path=os.path.join(out_folder, name, '_'+str(fr)+".jpg")
        if vb:print(f'writing image {fr} to {image_path}')
        if not cv2.imwrite(os.path.join(out_folder, name, '_'+str(fr)+".jpg"), image):
            r=False
    return r

In [53]:
def mkv2mp4(path, vb=False):
    # see: https://stackoverflow.com/questions/64519818/converting-mkv-files-to-mp4-with-ffmpeg-python
    # converts mkv file to mp4
    if not isinstance(path, str):
        return
    if not os.path.isfile(path):
        raise ValueError(f'cant stat {path}')
    name, ext = os.path.splitext(path)
    bn,dn=os.path.basename(name), os.path.dirname(name)
    if not ext=='.mkv':
        raise ValueError(f'expected .mkv extension but got {ext}')
    ffmpeg.input(path).output(f'{name}.mp4').run()
    if vb:print(f'converted {name}.mkv to {name}.mp4 in {dn}')

#### exec functions

In [46]:
def passthrough_and_capture_output(args):
    process = subprocess.Popen(args, stdout=subprocess.PIPE, universal_newlines=True)
    # universal_newlines means that the output of the process will be interpreted as text
    capture = ''
    s = process.stdout.read(1)
    while len(s) > 0:
        sys.stdout.write(s)
        sys.stdout.flush()
        capture += s
        s = process.stdout.read(1)
    return capture

In [47]:
def execute(cmd, timeout=3):
    from subprocess import Popen, PIPE
    p = Popen(cmd, stdout = PIPE)
    while p.poll() is None:
        time.sleep(0.2)
    stdout, stderr = process.communicate()
    return p.poll(), stdout, stderr

In [32]:
# timer
# see https://stackoverflow.com/questions/33987060/python-context-manager-that-measures-time
try:
    @contextmanager
    def timer():
        start = perf_counter()
        yield lambda: perf_counter() - start
except Exception as e:
    print(e)

name 'contextmanager' is not defined


In [33]:
# silencing output
# see https://stackoverflow.com/questions/4178614/suppressing-output-of-module-calling-outside-library
try:
    @contextmanager
    def suppress_stdout():
        with open(os.devnull, "w") as devnull:
            old_stdout,old_stderr = sys.stdout, sys.stderr
            sys.stdout, sys.stderr = devnull, devnull
            try:  
                yield
            finally:
                sys.stdout = old_stdout
                sys.stderr = old_stderr
except Exception as e:
    print(e)

name 'contextmanager' is not defined


#### logging functions

In [42]:
class _status_logger:
    def __init__(self, log_file):
        self.log_file = log_file

    def log(self, message):
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_entry = f'{timestamp} - {message}\n'
        with open(self.log_file, 'a') as file:
            file.write(log_entry)

    def info(self, message):
        self.log(f'INFO: {message}')

    def warning(self, message):
        self.log(f'WARNING: {message}')

    def error(self, message):
        self.log(f'ERROR: {message}')

    def critical(self, message):
        self.log(f'CRITICAL: {message}')

In [43]:
try:
    def get_sys_logger(name='google_extractor', fn=None, mode='w', level=lg.DEBUG, fmt=None):
        # logging
        lr = lg.getLogger(name)
        f_h = lg.FileHandler(fn, mode=mode)
        f_h.setLevel(level)
        if fmt is None:
            fmt = lg.Formatter(fmt='%(asctime)s -  %(message)s',datefmt='%Y-%m-%d,%H:%M:%S', )
            #lg.Formatter('%(asctime)s - %(message)s')
        f_h.setFormatter(fmt)
        lr.addHandler(f_h)
        return lr
except Exception as e:
    print(e)

In [44]:
def get_status_logger(filename=None):
    if not filename:
        raise ValueError('cant initialize logger: no filename')
    lr=_status_logger(filename)
    return lr

In [47]:
def _check_colab():
    try:
        from google.colab import drive
        drive.mount('drive')
        colab='/content/drive/MyDrive/toolchain'
    except Exception as e:
        print(e)
        colab=''
        in_folder='../in_folder'

In [45]:
# test get_videos
if __name__=='__main__':
    if not colab:
        try:
            from google.colab import drive
            drive.mount('drive')
            colab='/content/drive/MyDrive/toolchain'
        except Exception as e:
            print(e)
            colab=''
            in_folder='../in_folder'
    if colab:
        os.chdir(colab)
    # test get_videos
    v=get_videos(in_folder)
    print(v)

No module named 'google'
[]


In [36]:
# test print_trace
if __name__=='__main__':
    try:
        a=1/0
    except Exception as e:
        print_trace(e)

<frame at 0x104af8ca0, file '/var/folders/bf/vqv15x3s20jfm_p_wzxnz6km0000gn/T/ipykernel_3141/1144046883.py', line 6, code <module>>, 
t.tb_lasti: 20,   
dir:['tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next'], 
tb.frame-f.code: <code object <module> at 0x1043fd7d0, file "/var/folders/bf/vqv15x3s20jfm_p_wzxnz6km0000gn/T/ipykernel_3141/1144046883.py", line 1>, 
filename: /var/folders/bf/vqv15x3s20jfm_p_wzxnz6km0000gn/T/ipykernel_3141/1144046883.py: 
line: 4


In [37]:
# test print_trace
if __name__=='__main__':
    try:
        r=get_videos('../in_folder', start=1, end=2)
        print(r)
    except Exception as e:
        print(e)

[]


In [40]:
# test save_frames
if __name__=='__main__':
    try:
        r=save_frames(video='../../videos/archive 0-999/video10.mp4', name='video10',out_folder='out_folder/', frames=[1,10])
        print(r)
    except Exception as e:
        print(e)

name 'cv2' is not defined
