In [1]:
!python --version
%pip install numpy==2.0.1 -q
%pip install pandas==2.2.2 -q
%pip install pytubefix==6.8.1 -q
%pip install tqdm==4.66.4 -q
%pip install opencv-contrib-python==4.10.0.84 -q
%pip install scikit-learn==1.5.1 -q
%pip install matplotlib==3.9.1 -q


Python 3.9.2
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import shutil
import tempfile
import warnings

from io import BytesIO
from typing import Dict, Tuple

import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt

from tqdm import tqdm
from pytubefix import Search, YouTube
from sklearn.cluster import KMeans
from sklearn.neighbors import KNeighborsClassifier


In [3]:

def criar_pasta(pasta: str) -> str:
    """
        Ensure that `pasta` exists as an empty directory.

        If the directory already exists it is removed together with
        everything inside it (files and sub-directories) and then
        recreated, so the caller always gets a clean folder.

        # Parameters

        * `pasta`: Path of the directory to (re)create.

        # Returns

        The same path that was received.

        # Raises

        `FileExistsError` if `pasta` exists but is not a directory.
    """

    # rmtree handles nested directories and, unlike os.removedirs, never
    # prunes empty parent directories above `pasta`.
    if os.path.isdir(pasta):
        shutil.rmtree(pasta)

    os.makedirs(pasta)

    return pasta

# (Re)create the working directories; criar_pasta deletes any existing
# content, so re-running this cell starts from empty folders.
path_videos = criar_pasta('videos')
path_frames = criar_pasta('frames')


In [4]:

def carregar_videos_youtube(query: str, max_results: int = 3) -> Dict[str, BytesIO]:
    """
        Download the videos of a YouTube search into in-memory buffers.

        Downloading is best-effort: a video that cannot be downloaded is
        reported with a warning and skipped, the remaining ones are still
        processed.

        # Parameters

        * `query`: Text searched on YouTube.

        * `max_results`: Maximum number of search results to download.
        Only the results exposed by `Search(query).videos` are considered,
        so fewer videos may be returned.

        # Returns

        Dictionary mapping each video's watch URL to a `BytesIO`
        (positioned at offset 0) holding its lowest-resolution stream.
    """

    buffers = {}

    search = Search(query)
    # Clamp to zero so that a negative limit yields no videos instead of
    # slicing from the end of the list.
    videos = list(search.videos)[:max(max_results, 0)]

    for video in tqdm(videos):

        try:
            # Stream lookup is inside the try block as well: it performs
            # network requests and may fail for unavailable videos.
            stream = video.streams.get_lowest_resolution()

            if stream is None:
                raise ValueError("no downloadable stream found")

            buffer = BytesIO()
            stream.stream_to_buffer(buffer)
            buffer.seek(0)

        except Exception as erro:
            # `except Exception` keeps KeyboardInterrupt usable to abort
            # a long download session.
            warnings.warn(f"Skipping {video.watch_url}: {erro!r}")
            continue

        buffers[video.watch_url] = buffer

    return buffers



In [5]:
# Download up to 30 search results for this query into memory
# (slow: every video is fetched over the network).
buffers = carregar_videos_youtube("Pontos Turisticos Curitiba", 30)

100%|██████████| 20/20 [03:39<00:00, 10.96s/it]


In [6]:
# Inspect the mapping of watch URL -> in-memory video buffer.
buffers

{'https://youtube.com/watch?v=LpnIr7NT1JI': <_io.BytesIO at 0x7f7958271040>,
 'https://youtube.com/watch?v=dnRi_jrqPdE': <_io.BytesIO at 0x7f78d7c12950>,
 'https://youtube.com/watch?v=xRBaHnSfLYo': <_io.BytesIO at 0x7f78d75a7f90>,
 'https://youtube.com/watch?v=jV15pSKHvRU': <_io.BytesIO at 0x7f78d7c3f040>,
 'https://youtube.com/watch?v=iVrAYxiq-74': <_io.BytesIO at 0x7f78d7bcc860>,
 'https://youtube.com/watch?v=83rJk_84MNU': <_io.BytesIO at 0x7f78d7c3f090>,
 'https://youtube.com/watch?v=3TpC4Vv2fVA': <_io.BytesIO at 0x7f78d7be47c0>,
 'https://youtube.com/watch?v=K5Ym2UxSJss': <_io.BytesIO at 0x7f78d7bf8a90>,
 'https://youtube.com/watch?v=3zVzZEW6FAs': <_io.BytesIO at 0x7f78d7b96d10>,
 'https://youtube.com/watch?v=YcaO6Ay9Y_o': <_io.BytesIO at 0x7f78d7ba2450>,
 'https://youtube.com/watch?v=Frc5gHtMHuw': <_io.BytesIO at 0x7f78d7bb1f40>,
 'https://youtube.com/watch?v=L5gDRn5Gebg': <_io.BytesIO at 0x7f78d7c3bea0>,
 'https://youtube.com/watch?v=cQPhTBFxpyU': <_io.BytesIO at 0x7f78d7b5ad10>,

In [7]:
def extrair_caracteristicas_imagem(imagem) -> np.ndarray:
    """
        Extract local descriptors from an image.

        Keypoints are detected with the STAR detector and described with
        BRIEF (both from `cv2.xfeatures2d`).

        # Parameters

        * `imagem`: Image array as produced by `cv2.imread` or
        `cv2.VideoCapture.read`.

        # Returns

        Two-dimensional array with one row per detected keypoint. When no
        keypoint is found an empty array with zero rows is returned, so
        callers always receive a matrix.

        # Raises

        `ValueError` if `imagem` is `None` (e.g. `cv2.imread` could not
        load the file).
    """

    if imagem is None:
        raise ValueError("imagem is None: the image could not be loaded or decoded")

    STAR = cv2.xfeatures2d.StarDetector_create()
    BRIEF = cv2.xfeatures2d.BriefDescriptorExtractor_create()

    key_points = STAR.detect(imagem, None)

    key_points, features = BRIEF.compute(imagem, key_points)

    # OpenCV yields no descriptor matrix when there are no keypoints;
    # np.asarray(None) would be a 0-d object array, not a matrix.
    if features is None:
        return np.empty((0, BRIEF.descriptorSize()), dtype=np.uint8)

    return np.asarray(features)


In [8]:

def listar_caracteristicas_video(video_buffer: BytesIO,
                                 intervalo_frames: int = 60
                                 ) -> pd.DataFrame:

    """
        List the descriptors of sampled frames of a video.

        # Parameters

        * `video_buffer`: Video bytes. They are written to a temporary
        file because OpenCV opens videos by file name.

        * `intervalo_frames`: Sampling step; descriptors are extracted
        from one frame every `intervalo_frames` frames (starting at
        frame 0). Must be at least 1.

        # Returns

        `DataFrame` with one row per descriptor: the descriptor values in
        integer-labelled columns plus a `segundo` column holding
        `frame_index // int(fps)`. Empty when the video cannot be read or
        no descriptor is found.

        # Raises

        `ValueError` if `intervalo_frames` is smaller than 1.
    """

    if intervalo_frames < 1:
        raise ValueError("intervalo_frames must be >= 1")

    blocos = []
    falhas = 0

    # NOTE: the file is reopened by name while still open here; the
    # tempfile documentation states this is not possible on Windows.
    with tempfile.NamedTemporaryFile() as tmp:

        video_buffer.seek(0)
        tmp.write(video_buffer.read())
        # Without the flush, buffered bytes may not be on disk yet when
        # OpenCV opens the file through its own handle.
        tmp.flush()

        capture = cv2.VideoCapture(tmp.name)

        try:
            # Truncated to int on purpose: get_video_frame locates frames
            # with the same `contador // int(fps)` rule.
            fps = int(capture.get(cv2.CAP_PROP_FPS))

            if fps <= 0:
                warnings.warn("Could not read the video frame rate; no features extracted")
            else:
                contador = 0

                while True:

                    lido, frame = capture.read()

                    if not lido:
                        break

                    if contador % intervalo_frames == 0:
                        try:
                            features_frame = extrair_caracteristicas_imagem(frame)
                        except (cv2.error, ValueError):
                            # Best effort: a frame that cannot be
                            # processed is counted and skipped.
                            falhas += 1
                        else:
                            # Frames without keypoints contribute no rows.
                            if features_frame.ndim == 2 and len(features_frame) > 0:
                                bloco = pd.DataFrame(features_frame)
                                bloco['segundo'] = contador // fps
                                blocos.append(bloco)

                    contador += 1
        finally:
            capture.release()

    if falhas:
        warnings.warn(f"{falhas} sampled frame(s) could not be processed and were skipped")

    if not blocos:
        return pd.DataFrame()

    # Single concatenation instead of growing a DataFrame inside the loop.
    return pd.concat(blocos, ignore_index=True)



In [9]:

def listar_caracteristicas_videos(videos: Dict[str, BytesIO], **kwargs) -> Dict[str, pd.DataFrame]:
    """
        Extract the descriptors of several videos.

        # Parameters

        * `videos`: Dictionary mapping a video URL to its bytes.

        * `**kwargs`: Keyword arguments forwarded unchanged to
        `listar_caracteristicas_video` (e.g. `intervalo_frames`).

        # Returns

        Dictionary mapping each URL to the `DataFrame` returned by
        `listar_caracteristicas_video` for that video.
    """

    # `**kwargs` (not `*kwargs`): unpacking with a single star would pass
    # the keyword *names* as positional arguments.
    return {
        url: listar_caracteristicas_video(buffer, **kwargs)
        for url, buffer in videos.items()
    }


In [10]:

# Extract descriptors for every downloaded video and print a short
# excerpt of the resulting dictionary as a sanity check.
caracteristicas = listar_caracteristicas_videos(buffers)
print(str(caracteristicas)[1:100])

# Drop the reference to the raw video bytes to avoid RAM overload.
# After this line `buffers` is None, so this cell cannot be re-run
# without re-running the download cell first.
buffers = None

'https://youtube.com/watch?v=LpnIr7NT1JI':          0    1    2    3    4    5    6    7    8    9 


In [11]:

def empilhar_caracteristicas(caracteristicas: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
        Stack the per-video descriptor tables into a single table.

        # Parameters

        * `caracteristicas`: Dictionary mapping a video URL to its
        `DataFrame` of descriptors. The frames are not modified.

        # Returns

        `DataFrame` with the rows of every input frame, a fresh
        0..n-1 index and an extra `url` column identifying the source
        video. Empty when `caracteristicas` is empty.
    """

    # `assign` returns a copy, so the caller's frames do not gain a
    # `url` column as a side effect.
    quadros = [item.assign(url=url) for url, item in caracteristicas.items()]

    if not quadros:
        return pd.DataFrame()

    # Single concatenation instead of re-copying the result on each step.
    return pd.concat(quadros, ignore_index=True)


In [12]:

# Stack the per-video descriptor tables into one DataFrame.
df = empilhar_caracteristicas(caracteristicas)

# Drop the reference to the per-video tables to avoid RAM overload.
# After this line `caracteristicas` is None, so this cell cannot be
# re-run without re-running the extraction cell first.
caracteristicas = None

df.head()


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,24,25,26,27,28,29,30,31,segundo,url
0,224,104,79,171,12,111,196,139,50,234,...,13,200,78,120,211,216,195,103,0,https://youtube.com/watch?v=LpnIr7NT1JI
1,226,244,215,61,215,86,94,37,9,229,...,13,35,170,31,108,56,226,210,0,https://youtube.com/watch?v=LpnIr7NT1JI
2,230,248,215,51,211,212,94,33,73,165,...,7,35,186,31,4,176,243,208,0,https://youtube.com/watch?v=LpnIr7NT1JI
3,49,21,106,102,40,41,161,218,50,90,...,168,220,101,244,191,223,12,45,0,https://youtube.com/watch?v=LpnIr7NT1JI
4,215,53,241,197,233,201,145,209,230,28,...,242,214,125,199,184,199,29,40,0,https://youtube.com/watch?v=LpnIr7NT1JI


In [13]:

def getKMeans(df: pd.DataFrame, k: int, random_state: int) -> Tuple[KMeans, KNeighborsClassifier, pd.DataFrame]:
    """
        Fit the clustering model and the nearest-neighbour index.

        Columns of `df` labelled with integers are treated as descriptor
        values; columns labelled with strings (`segundo` and `url` are
        required) are treated as grouping keys.

        Steps:

        1. Fit `KMeans` with `k` clusters on the descriptor columns and
        replace each descriptor by its distances to the `k` centroids.
        2. Average those distances per group of string-labelled columns,
        giving one vector per (video, second).
        3. Fit a 1-nearest-neighbour classifier mapping each averaged
        vector to its (`url`, `segundo`) pair.

        # Parameters

        * `df`: Descriptor table as described above.

        * `k`: Number of clusters.

        * `random_state`: Seed passed to `KMeans`.

        # Returns

        Tuple `(kmeans, knn, df_transformada)` where `df_transformada` is
        the averaged table of step 2, with `segundo` converted to `str`.
    """

    colunas_features = [col for col in df.columns if isinstance(col, int)]
    colunas_outras = [col for col in df.columns if isinstance(col, str)]

    kmeans = KMeans(n_clusters=k, random_state=random_state)

    # fit_transform returns distances to the centroids; they are already
    # non-negative, so no sqrt(x ** 2) normalisation is needed.
    distancias = kmeans.fit_transform(df[colunas_features])
    df_transformada = pd.DataFrame(distancias)

    # Attach the key columns by position: `df_transformada` has a fresh
    # 0..n-1 index, which need not match the index of `df`.
    for coluna in colunas_outras:
        df_transformada[coluna] = df[coluna].to_numpy()

    # After grouping only the distance columns remain, so a plain mean()
    # is enough (the first positional argument of mean is `numeric_only`,
    # not a column list).
    df_transformada = df_transformada.groupby(colunas_outras).mean().reset_index()

    # Labels are stored as strings so both outputs of the classifier
    # share one type.
    df_transformada['segundo'] = df_transformada['segundo'].astype(str)

    colunas_clusters = [
        col for col in df_transformada.columns
        if str(col) not in ('url', 'segundo')
    ]

    knn = KNeighborsClassifier(n_neighbors=1)
    knn.fit(df_transformada[colunas_clusters], df_transformada[['url', 'segundo']])

    return kmeans, knn, df_transformada


In [14]:
# Fit the 64-cluster KMeans model and the 1-NN index over all descriptors;
# random_state is fixed so the clustering is reproducible.
kmeans, knn, df_transformada = getKMeans(df, k=64, random_state=2002)

df_transformada.head()

In [None]:

# Load and show the query image. cv2.imread yields BGR channel order,
# hence the conversion to RGB before plotting with matplotlib.
img = cv2.imread('Estatua.jpeg')
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))


In [None]:

def inferir(pathImagem, kmeans, knn):
    """
        Predict which video and second best match an image.

        The image descriptors are converted to distances to the KMeans
        centroids, averaged into a single vector and classified with the
        nearest-neighbour model.

        # Parameters

        * `pathImagem`: Path of the image file.

        * `kmeans`: Fitted model exposing `transform`.

        * `knn`: Fitted classifier exposing `predict` and
        `n_features_in_`.

        # Returns

        The output of `knn.predict` for the image vector.

        # Raises

        `FileNotFoundError` if `pathImagem` is not an existing file.
        `ValueError` if the file cannot be decoded as an image or no
        keypoint is detected in it.
    """

    if not os.path.isfile(pathImagem):
        raise FileNotFoundError(f"Image file not found: {pathImagem}")

    img = cv2.imread(pathImagem)

    # cv2.imread signals an unreadable file by returning None.
    if img is None:
        raise ValueError(f"Could not decode image: {pathImagem}")

    features_imagem = extrair_caracteristicas_imagem(img)

    if features_imagem.ndim != 2 or len(features_imagem) == 0:
        raise ValueError(f"No keypoints detected in image: {pathImagem}")

    # Distances are already non-negative, so they are averaged directly.
    distancias = kmeans.transform(features_imagem)
    vetor_imagem = np.mean(distancias, axis=0).reshape(1, knn.n_features_in_)

    return knn.predict(vetor_imagem)



In [None]:

# Classify the query image. Despite its name, `distancias` holds the
# output of knn.predict (the classifier was fit on ['url', 'segundo']).
distancias = inferir('Estatua.jpeg', kmeans, knn)
distancias

In [None]:

def get_video_frame(video_url, segundo):
    """
        Download a video and plot the frame found at a given second.

        Frames are mapped to seconds with `frame_index // int(fps)`. The
        last frame mapped to `segundo` is the one displayed.

        # Parameters

        * `video_url`: YouTube URL of the video.

        * `segundo`: Second of the video to display.

        # Returns

        Nothing; the frame is drawn on the current matplotlib figure,
        titled with the URL followed by `&t=<segundo>`.

        # Raises

        `ValueError` if the frame rate cannot be read or the video has no
        frame at `segundo`.
    """

    video = YouTube(video_url)
    stream = video.streams.get_lowest_resolution()

    buffer = BytesIO()
    stream.stream_to_buffer(buffer)
    buffer.seek(0)

    img = None

    with tempfile.NamedTemporaryFile() as tmp:

        tmp.write(buffer.read())
        # Make sure every byte is on disk before OpenCV opens the file
        # by name.
        tmp.flush()

        capture = cv2.VideoCapture(tmp.name)

        try:
            fps = int(capture.get(cv2.CAP_PROP_FPS))

            if fps <= 0:
                raise ValueError(f"Could not read the frame rate of {video_url}")

            contador = 0

            while True:

                lido, frame = capture.read()

                if not lido:
                    break

                segundo_atual = contador // fps

                # Seconds only grow, so nothing past the target can match
                # and decoding the rest of the video would be wasted work.
                if segundo_atual > segundo:
                    break

                if segundo_atual == segundo:
                    img = frame

                contador += 1
        finally:
            capture.release()

    if img is None:
        raise ValueError(f"No frame found at second {segundo} of {video_url}")

    plt.title(video_url + "&t=" + str(segundo))
    plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))


In [None]:
# Show the predicted match: first prediction row is indexed as
# [0] -> url and [1] -> segundo (stored as a string, hence the int()).
get_video_frame(distancias[0][0], int(distancias[0][1]))