In [1]:
### Script de teste para busca de um padrão temporal em uma série temporal
# Utiliza a biblioteca STUMPY para realizar as buscas
# A variável Q_df é a série padrão e T_df é a série na qual serão localizados os trechos que equivalem a Q_df
# Esse algoritmo foi desenvolvido com base no tutorial em https://stumpy.readthedocs.io/en/latest/Tutorial_Pattern_Matching.html

In [2]:
import time
import os
import numpy as np
import sys
import pandas as pd
import glob
import matplotlib.pyplot as plt
import cv2
import ast
import stumpy
from stumpy import config
import numpy as np
import numpy.testing as npt
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

In [3]:
# Optional STUMPY setting, left disabled: overrides the exclusion-zone denominator.
#config.STUMPY_EXCL_ZONE_DENOM = np.inf  # i ± int(np.ceil(m / {1}))
# Global matplotlib text settings applied to every figure created after this cell.
plt.rcParams['font.size'] = 10
plt.rcParams["font.family"] = "Times New Roman"
# Apply the style sheet hosted in the STUMPY repository (loaded from this URL).
plt.style.use('https://raw.githubusercontent.com/TDAmeritrade/stumpy/main/docs/stumpy.mplstyle')

In [4]:
# -------------

In [5]:
# Scan Folder and Find
def list_scan_from_path(baseDir, file_name):
    """Recursively collect the ``*.CSV`` paths under ``baseDir`` that contain ``file_name``.

    Parameters
    ----------
    baseDir : str
        Folder whose tree is scanned (the folder itself and all sub-folders).
    file_name : str
        Substring that must occur somewhere in the matched path.

    Returns
    -------
    list of str
        Matching paths, in the order produced by ``glob.iglob`` (not sorted).
    """
    search_pattern = baseDir + os.sep + '**' + os.sep + '*.CSV'
    return [
        candidate
        for candidate in glob.iglob(search_pattern, recursive=True)
        if file_name in candidate
    ]
#<

In [6]:
# Collect the base
def collect_current_path(original_path):
    """Return ``original_path`` without its last component, keeping the trailing separator.

    The path is split on ``os.sep``; every part except the last one is written
    back followed by ``os.sep``. A path with no separator yields ``''``.
    """
    leading_parts = original_path.split(os.sep)[:-1]
    return ''.join(piece + os.sep for piece in leading_parts)
#<chg

In [7]:
# Collect the base
def collect_basex(original_path):
    """Strip the final component of ``original_path`` and keep the trailing separator.

    Everything up to and including the last ``os.sep`` is returned; when the
    path contains no separator the result is an empty string.
    """
    head, separator, _last_component = original_path.rpartition(os.sep)
    return head + separator
#<chg

In [8]:
# Get frames from video input
def LOAD_VIDEO_FRAMES(path, start_frame=None, end_frame=None, EXTRACT_ALL_FRAMES=True):
    """Load frames of a video as RGB images, keyed by their frame index.

    Frame indices count sequential reads from the beginning of the video,
    starting at 0.

    Parameters
    ----------
    path : str
        Video file passed to ``cv2.VideoCapture``.
    start_frame, end_frame : int, optional
        Inclusive frame range to load. Required when ``EXTRACT_ALL_FRAMES`` is
        False and ignored otherwise.
    EXTRACT_ALL_FRAMES : bool
        When True, load from frame 0 onwards; when False, load only
        ``start_frame..end_frame``.

    Returns
    -------
    dict
        ``{frame_index: frame converted with cv2.COLOR_BGR2RGB}``. Empty when the
        video cannot be opened. At most ``MAX_FRAMES`` frames are returned.

    Raises
    ------
    ValueError
        If a range is requested without both ``start_frame`` and ``end_frame``.
    """
    MAX_FRAMES = 5000  # Max number of frames to load

    if not EXTRACT_ALL_FRAMES and (start_frame is None or end_frame is None):
        raise ValueError("start_frame and end_frame are required when EXTRACT_ALL_FRAMES=False")

    frames = {}
    cap = cv2.VideoCapture(path)
    try:
        if not cap.isOpened():
            print("Error opening video")
            return frames

        if EXTRACT_ALL_FRAMES:
            first_frame, last_frame = 0, None
        else:
            first_frame, last_frame = start_frame, end_frame

        # Advance to the first requested frame without decoding the skipped
        # ones, so the dictionary keys match the real position in the video.
        frame_number = 0
        while frame_number < first_frame:
            if not cap.grab():
                print("Error capturing frame")
                return frames
            frame_number += 1

        while len(frames) < MAX_FRAMES:
            if last_frame is not None and frame_number > last_frame:
                break

            ret, frame = cap.read()  # Cap frame
            if not ret:
                # Running out of frames is the normal stop condition when the
                # whole video is loaded; it is only reported for a range request.
                if last_frame is not None:
                    print("Error capturing frame")
                break

            frames[frame_number] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_number += 1
    finally:
        # Release the capture on every exit path, including early returns.
        cap.release()

    return frames

In [9]:
def DISPLAY_FRAMES(frames, start_frame=None, end_frame=None, max_col=5, DISPLAY_ALL_FRAMES=False):
    """Show frames in a grid of matplotlib subplots, each labelled with its frame number.

    Parameters
    ----------
    frames : dict
        ``{frame_number: image}`` where each image is accepted by ``plt.imshow``.
    start_frame, end_frame : int, optional
        Inclusive range of frame numbers to show. Required when
        ``DISPLAY_ALL_FRAMES`` is False and ignored otherwise.
    max_col : int
        Number of frames displayed in each row.
    DISPLAY_ALL_FRAMES : bool
        When True, every entry of ``frames`` is shown.

    Raises
    ------
    ValueError
        If a range is requested without both ``start_frame`` and ``end_frame``.
    """
    if DISPLAY_ALL_FRAMES:
        frames_range = frames
    else:
        if start_frame is None or end_frame is None:
            raise ValueError("start_frame and end_frame are required when DISPLAY_ALL_FRAMES=False")
        frames_range = {numero_frame: frame for numero_frame, frame in frames.items() if start_frame <= numero_frame <= end_frame}

    # Nothing to draw: avoid creating a zero-height figure.
    if not frames_range:
        return

    # Ceiling division so the grid always has room for every frame, for any max_col.
    n_rows = (len(frames_range) + max_col - 1) // max_col
    fig_width = 15
    # Height scales with the number of frames actually drawn.
    fig_height = 0.5 * len(frames_range)
    plt.figure(figsize=(fig_width, fig_height))
    # Subplot positions are 1-based.
    for i, (frame_number, frame) in enumerate(frames_range.items(), start=1):
        plt.subplot(n_rows, max_col, i)
        plt.imshow(frame)
        plt.text(0, -10, f"frame: {frame_number}")
        plt.axis('off')

In [10]:
# Function 2
def UPDATE_LABEL_DF(init_in, endd_in, label_name_in, label_measur_in, data_frame_in):
    """Add or overwrite one label in the ``label_measures`` cell of a range of rows.

    Each ``label_measures`` cell holds the string form of a dict. For every
    index label from ``init_in`` to ``endd_in`` (both inclusive) the cell is
    parsed, ``label_name_in`` is set to ``label_measur_in`` and the dict is
    written back as a string.

    Parameters
    ----------
    init_in, endd_in : int
        First and last index labels to update.
    label_name_in : hashable
        Label (dict key) to insert or overwrite.
    label_measur_in : object
        Value stored for the label.
    data_frame_in : pandas.DataFrame
        Frame with a ``label_measures`` column; it is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``data_frame_in`` object, after the update.

    Raises
    ------
    KeyError
        If an index label in the range is not present in the frame.
    """
    # Iterate over the function's own arguments rather than notebook globals.
    for index_x in range(init_in, endd_in + 1):
        dicct_current = ast.literal_eval(data_frame_in.loc[index_x, 'label_measures'])
        # Insert or overwrite the label
        dicct_current.update({label_name_in: label_measur_in})
        # Store the dict back as text in the current data frame
        data_frame_in.loc[index_x, 'label_measures'] = str(dicct_current)

    return data_frame_in
#

In [11]:
#
# Function 3 - Get ALL LABEL CLASSES
def GET_ALL_CLASSES(data_frame_out):
    """List every label name found in the ``label_measures`` column.

    Each cell is parsed with ``ast.literal_eval`` and merged into a single
    dict; the keys of that dict are returned in first-seen order.
    """
    merged_labels = {}
    for parsed_cell in map(ast.literal_eval, data_frame_out['label_measures']):
        merged_labels.update(parsed_cell)
    return list(merged_labels)
#

In [12]:
#
# Function 4 - Get Frames from Classes
def GET_FRAMES_FROM_CLASS(data_frame_in, class_in):
    """Return the ``frame_seq`` values of the rows labelled with ``class_in``.

    Every row's ``label_measures`` cell is parsed into a dict and the row's
    ``frame_seq`` is filed under each label it carries. A ``KeyError`` is
    raised when no row carries ``class_in``.
    """
    frames_by_label = {}
    for _, record in data_frame_in.iterrows():
        for label in ast.literal_eval(record['label_measures']):
            frames_by_label.setdefault(label, []).append(record['frame_seq'])
    return frames_by_label[class_in]
#

In [13]:
#
# Function 5 - Get Measures from Classes
def GET_MEASURES_FROM_CLASS(data_frame_out, class_in):
    """Return the measure stored for ``class_in`` in the ``label_measures`` column.

    All cells are parsed and merged in row order, so when several rows carry
    the label the value from the last of them is returned. A ``KeyError`` is
    raised when the label never occurs.
    """
    parsed_rows = [ast.literal_eval(cell) for cell in data_frame_out['label_measures']]
    measures_by_label = {}
    for row_labels in parsed_rows:
        measures_by_label.update(row_labels)
    return measures_by_label[class_in]
#

In [14]:
def check_file_exist(local_path, file_name):
    """Check whether ``file_name`` exists next to ``local_path``.

    The last component of ``local_path`` is removed with ``collect_basex`` and
    ``file_name`` is joined to what remains.

    Returns
    -------
    tuple of (bool, str)
        Whether the candidate path exists, and the candidate path itself.
    """
    candidate = os.path.join(collect_basex(local_path), file_name)
    return os.path.exists(candidate), candidate

In [15]:
# ---------------

In [16]:
# Variables
# Root folder of the dataset: '../Dataset', relative to the working directory
baseDir = os.path.join('..', 'Dataset')

In [17]:
#
## Search paths of the MEASURE files; starts empty and is reassigned further down
MAIN_LIST_MEASURE = []

In [18]:
# File name to look for
MEASURE_FILE_PATH = 'VD_WORDS_OCCURRENCE.CSV'

########## - Search only in DD-Local and YT-Online
# Step 1 - Only specific sub-folders are scanned, which leaves the references folder out
# Variables
# Step 1.1 - DD-Local: the scan below is disabled, so its list stays empty
#baseDir_local = os.path.join(baseDir, 'DD-Local')
# Call the basic function
#MAIN_LIST_MEASURE_local = list_scan_from_path (baseDir_local, MEASURE_FILE_PATH)
MAIN_LIST_MEASURE_local = []
# Variables
# Step 1.2 - YT-Online: scan this sub-folder of baseDir
baseDir_yt = os.path.join(baseDir, 'YT-Online')
# Collect every CSV path under baseDir_yt that contains MEASURE_FILE_PATH
MAIN_LIST_MEASURE_yt = list_scan_from_path (baseDir_yt, MEASURE_FILE_PATH)

# Join the two lists (DD-Local entries first, then YT-Online)
MAIN_LIST_MEASURE = MAIN_LIST_MEASURE_local + MAIN_LIST_MEASURE_yt

In [19]:
#MAIN_LIST_MEASURE

In [20]:
# Keep only the measure files whose folder also contains a subtitles file.
SUB_FILE_NAME = 'VD_SUBTITLES.CSV'
MAIN_LIST_MEASURE_FILTER = []
for current_path in MAIN_LIST_MEASURE:
    # Folder part of the measure file path, ending with a separator
    path = collect_basex(current_path)
    # check_file_exist strips the last component again; for a path that already
    # ends with a separator this leaves the folder unchanged
    check, _ = check_file_exist(path, SUB_FILE_NAME)
    if check:
        MAIN_LIST_MEASURE_FILTER.append(current_path)  

In [21]:
MAIN_LIST_MEASURE_FILTER

['..\\Dataset\\YT-Online\\VD_Y_0000000001\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000002\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000003\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000004\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000005\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000006\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000007\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000008\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000009\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000011\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000012\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000013\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000014\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_0000000015\\VD_WORDS_OCCURRENCE.CSV',
 '..\\Dataset\\YT-Online\\VD_Y_000

In [57]:
# Build one table (TOTAL) with the word occurrences of every selected video.
# Each measure file is read, tagged with the video_id found in the VD_INFO.CSV
# of the same folder, and all tables are concatenated once after the loop.
VD_INFO_DT = 'VD_INFO.CSV'
occurrence_frames = []  # one DataFrame per readable measure file
for current_path in MAIN_LIST_MEASURE_FILTER:
    path_dir = collect_current_path(current_path)
    vd_info_path = os.path.join(path_dir, VD_INFO_DT)

    vd_info = pd.read_csv(vd_info_path)
    # The saved index column is only present when the CSV was written with its index
    if 'Unnamed: 0' in vd_info.columns:
        vd_info.drop(columns=['Unnamed: 0'], inplace=True)
    video_id = vd_info.video_id.iloc[0]

    # Only read_csv can raise EmptyDataError, so the try covers just that call
    try:
        VD_WORDS_OCCURRENCE = pd.read_csv(current_path)
    except pd.errors.EmptyDataError:
        print("O arquivo CSV está vazio.", current_path)
        continue

    if 'Unnamed: 0' in VD_WORDS_OCCURRENCE.columns:
        VD_WORDS_OCCURRENCE.drop(columns=['Unnamed: 0'], inplace=True)

    VD_WORDS_OCCURRENCE.insert(0, 'video_id', video_id)
    occurrence_frames.append(VD_WORDS_OCCURRENCE)
    # A file with a header but no rows is still reported as empty
    if VD_WORDS_OCCURRENCE.empty:
        print("O arquivo CSV está vazio.", current_path)

# Single concatenation instead of growing TOTAL inside the loop; pd.concat
# rejects an empty list, so fall back to an empty frame in that case.
TOTAL = pd.concat(occurrence_frames) if occurrence_frames else pd.DataFrame()

O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000001\VD_WORDS_OCCURRENCE.CSV
O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000003\VD_WORDS_OCCURRENCE.CSV
O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000012\VD_WORDS_OCCURRENCE.CSV
O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000017\VD_WORDS_OCCURRENCE.CSV
O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000018\VD_WORDS_OCCURRENCE.CSV
O arquivo CSV está vazio. ..\Dataset\YT-Online\VD_Y_0000000022\VD_WORDS_OCCURRENCE.CSV


In [117]:
# Word whose occurrences are selected from TOTAL
word='HAVE'

In [118]:
# Rows of TOTAL whose selected_word is exactly equal to `word`
FILTER = TOTAL[TOTAL['selected_word'] == word]

In [119]:
# Number of filtered rows per video_id
contagem = FILTER.video_id.value_counts()
# Same counts as a two-column table: video_id and quantidade (the count)
filter_count = pd.DataFrame({'video_id': contagem.index, 'quantidade': contagem.values})

In [120]:
filter_count

Unnamed: 0,video_id,quantidade
0,8,8
1,37,7
2,6,7
3,36,6
4,15,5
5,11,4
6,35,4
7,2,4
8,27,3
9,28,3


In [121]:
# Save the filtered rows to '<word>.CSV' (path relative to the working directory)
FILTER.to_csv(f'{word}.CSV')

In [122]:
len(FILTER)

77

In [123]:
FILTER

Unnamed: 0,video_id,selected_word,start_time_seconds,end_time_seconds,ini_frame,end_frame,text
0,2,HAVE,41.120,46.399,985,1113,FREE VACCINES HAVE BEEN AVAILABLE IN 80
1,2,HAVE,46.399,50.079,1112,1201,WE STILL HAVE NEARLY 80 MILLION
2,2,HAVE,48.160,51.600,1154,1238,AMERICANS WHO HAVE FAILED TO GET THE
3,2,HAVE,106.560,110.399,2554,2647,LARGE MAJORITY OF AMERICANS WHO HAVE
0,4,HAVE,11.370,17.460,341,524,HAVE DEVELOPED A FORMULA TO HELP YOU ACE
...,...,...,...,...,...,...,...
8,37,HAVE,72.760,76.220,1819,1906,EXCITED ABOUT THE CONVERSATION\nWE'LL GET TO H...
9,37,HAVE,88.190,90.660,2204,2267,WE HAVE NO SLIDES TODAY.
10,37,HAVE,106.140,108.830,2653,2721,I SAY THAT BECAUSE YOU\nCOULD HAVE BEEN HOME
11,37,HAVE,151.000,152.590,3775,3815,"THAT WAY, ALL YOU'LL HAVE TO DO"
