In [6]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from isodate import parse_duration
import pyspark.sql.types as t
from typing import Tuple, List
import os

In [7]:
@f.udf(returnType=t.FloatType())
def converter_minutos(tempo: str) -> float:
    """Converte para o total de seguntos

    Args:
        tempo (str): tempo

    Returns:
        int: total segundos
    """
    duracao = parse_duration(tempo)
    total_segundos = duracao.total_seconds() 
    total_minutos = total_segundos / 60
   
    return round(total_minutos, 2)


In [8]:
spark = (
    SparkSession.builder
    .appName('tratamento_camada_ouro')
    .config("spark.hadoop.fs.s3a.endpoint", "http://192.168.0.112:9001") 
    .config("spark.hadoop.fs.s3a.access.key", "GyRUCHFyONKd4BSb9dbS") 
    .config("spark.hadoop.fs.s3a.secret.key", "KZuHsceRBf2UrNWQWmoa444S1w6HJwJpVRrKEijD") 
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .getOrCreate()
)

In [9]:
def carregar_dataframe_prata(spark_session : SparkSession, assunto: str, metrica: str = None):
    if metrica is not None:
        df_cities_skylines = spark_session \
            .read \
                .parquet(f'/home/rodrigo/projetos/analise_dados_youtube/data/dados_youtube/prata/{assunto}/extracao_data_*/{metrica}/*.parquet')
    else:
        df_cities_skylines = spark_session \
            .read \
                .parquet(f'/home/rodrigo/projetos/analise_dados_youtube/data/dados_youtube/prata/{assunto}/extracao_data_*/*.parquet')
    return df_cities_skylines

In [21]:
def criar_particao_ouro(dataframe: DataFrame, assunto_pesquisa: str, metrica: str,  nome_arquivo: str, colunas_particao: Tuple = None ):
    caminho_datalake_ouro = 's3a://youtubedatalake:/ouro'
    caminho_completo = f'{caminho_datalake_ouro}/{assunto_pesquisa}/{metrica}/{nome_arquivo}'
    if colunas_particao == None:
        dataframe.write. \
            parquet(caminho_completo)
    else: 
        if isinstance(colunas_particao, Tuple) :
            dataframe.write. \
                partitionBy(*colunas_particao). \
                parquet(caminho_completo)
        else:
            dataframe.write. \
                partitionBy(colunas_particao). \
                parquet(caminho_completo)


In [11]:
def transformacao_dataframe(dataframe_transform: DataFrame):
    dataframe_transform = dataframe_transform.select(
        'data_extracao',
        f.explode('DATA_PUBLICACAO').alias('DATA_PUBLICACAO'),
        'ID_CANAL',
        'NM_CANAL',
        'ID_CATEGORIA',
        'ID_VIDEO',
        'TITULO_VIDEO',
        'DESCRICAO',
        'TAGS',
        'DURACAO_VIDEOS',
        'TOTAL_VISUALIZACOES',
        'TOTAL_LIKES',
        'TOTAL_FAVORITOS',
        'TOTAL_COMENTARIOS',
        
     ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                f.explode('ID_CANAL').alias('ID_CANAL'),
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                f.explode('NM_CANAL').alias('NM_CANAL'), 
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                f.explode('ID_CATEGORIA').alias('ID_CATEGORIA'),
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                f.explode('ID_VIDEO').alias('ID_VIDEO'),
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                f.explode('TITULO_VIDEO').alias('TITULO_VIDEO'),
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'TITULO_VIDEO',
                'ID_VIDEO',
                f.explode('DESCRICAO').alias('DESCRICAO'),
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                f.explode('TAGS').alias('TAGS'),
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                f.explode('DURACAO_VIDEOS').alias('DURACAO_VIDEOS'),
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                f.explode('TOTAL_VISUALIZACOES').alias('TOTAL_VISUALIZACOES'),
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                f.explode('TOTAL_LIKES').alias('TOTAL_LIKES'),
                'TOTAL_FAVORITOS',
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                f.explode('TOTAL_FAVORITOS').alias('TOTAL_FAVORITOS'),
                'TOTAL_COMENTARIOS',
        ).select(
                'data_extracao',
                'DATA_PUBLICACAO',
                'ID_CANAL',
                'NM_CANAL',
                'ID_CATEGORIA',
                'ID_VIDEO',
                'TITULO_VIDEO',
                'DESCRICAO',
                'TAGS',
                'DURACAO_VIDEOS',
                'TOTAL_VISUALIZACOES',
                'TOTAL_LIKES',
                'TOTAL_FAVORITOS',
                f.explode('TOTAL_COMENTARIOS').alias('TOTAL_COMENTARIOS'),
        )
    dataframe_transform  = dataframe_transform.na.drop('all')
    dataframe_transform = dataframe_transform.withColumn("data_hora_extracao", f.col('data_extracao'))
    dataframe_transform = dataframe_transform.withColumn("data_extracao", f.date_format(f.unix_timestamp("data_extracao", "yyyy-MM-dd HH:mm:ss").cast("timestamp"), "yyyy-MM-dd" ))
    dataframe_transform = dataframe_transform.withColumn('DURACAO_VIDEO_MINUTOS', converter_minutos(f.col('DURACAO_VIDEOS')))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_VISUALIZACOES', f.col('TOTAL_VISUALIZACOES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_LIKES', f.col('TOTAL_LIKES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_FAVORITOS', f.col('TOTAL_FAVORITOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_COMENTARIOS', f.col('TOTAL_COMENTARIOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_CARACTERE_VIDEO', f.length('TITULO_VIDEO'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_TAGS', f.when(f.size('TAGS') > 0, f.size('TAGS')).otherwise(0))
    dataframe_transform = dataframe_transform.withColumn('HORA_EXTRACAO', f.hour("data_hora_extracao"))
    dataframe_transform = dataframe_transform.withColumn(
        'TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.HORA_EXTRACAO >=6) & (dataframe_transform.HORA_EXTRACAO < 12), 'Manh√£')
            .when((dataframe_transform.HORA_EXTRACAO >= 12) & (dataframe_transform.HORA_EXTRACAO < 18), 'Tarde')
            .otherwise('Noite')
        )
    )
    dataframe_transform = dataframe_transform.withColumn(
        'INDICE_TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.TURNO_EXTRACAO == 'Manh√£'), 1)
            .when((dataframe_transform.TURNO_EXTRACAO == 'Tarde'), 2)
            .otherwise(3)
        )
    )
    return dataframe_transform
  

In [12]:
@f.udf(returnType=t.StringType())
def traduzir_dia_da_semana(dia):
    traducao = {
        'Sunday': 'Domingo',
        'Monday': 'Segunda-feira',
        'Tuesday': 'Ter√ßa-feira',
        'Wednesday': 'Quarta-feira',
        'Thursday': 'Quinta-feira',
        'Friday': 'Sexta-feira',
        'Saturday': 'S√°bado'
    }
    return traducao.get(dia, dia)

In [13]:
@f.udf(returnType=t.IntegerType())
def indice_semana(dia):
    traducao = {
        'Domingo': 1,
        'Segunda-feira': 2,
        'Ter√ßa-feira': 3,
        'Quarta-feira': 4,
        'Quinta-feira': 5,
        'Sexta-feira': 6,
        'S√°bado': 7
    }
    return traducao.get(dia, dia)

In [14]:
df_estatisticas_teste = carregar_dataframe_prata(spark_session=spark, assunto='assunto_cities_skylines', metrica='estatisticas')
df_estatisticas_teste.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------------------+-------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+
|      data_extracao|     ID_VIDEO|     DATA_PUBLICACAO|            ID_CANAL|            NM_CANAL|ID_CATEGORIA|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS|
+-------------------+-------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+
|               null|         null|                null|                null|                null|        null|                null|                null|                null|          null|               null|       null|           

                                                                                

In [15]:
df_estatisticas_teste = carregar_dataframe_prata(spark_session=spark, assunto='assunto_cities_skylines', metrica='estatisticas')
df_estatisticas_teste = transformacao_dataframe(df_estatisticas_teste)
df_estatisticas_teste.show(truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+-------------+--------------------+------------------------+----------------------------+------------+-----------+------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

<p>Depara Canal V√≠deo</p>

In [16]:
def construir_depara_canal(dataframe: DataFrame, assunto: str):
    dataframe.select(
        dataframe.ID_CANAL,
        dataframe.NM_CANAL,
        dataframe.ID_VIDEO,
        dataframe.TITULO_VIDEO
    ).distinct().sort('ID_CANAL').write.\
        parquet(f's3://youtubedatalake/ouro/depara/assunto_{assunto}/depara_canal.parquet')


In [17]:
def obter_total_publicacao_video(dataframe : DataFrame) -> DataFrame:
   dataframe =  dataframe.select(
    'DATA_PUBLICACAO',
    'ID_CANAL',
    'NM_CANAL'
    ).withColumn(
       'DATA_PUBLICACAO', f.date_format('DATA_PUBLICACAO', 'yyyy-MM-dd')
    ) \
        .sort('ID_CANAL') \
        .withColumn(
            'SEMANA_TRADUZIDA', 
            traduzir_dia_da_semana(
                f.date_format('DATA_PUBLICACAO', 'EEEE')
            )
            ).distinct() \
                .groupBy('DATA_PUBLICACAO','SEMANA_TRADUZIDA', 'ID_CANAL', 'NM_CANAL'). \
                    count(). \
                        alias('TOTAL_VIDEOS'). \
                            withColumn('INDICE_SEMANA', indice_semana('SEMANA_TRADUZIDA')) \
                    .sort('INDICE_SEMANA')\
                        .select('DATA_PUBLICACAO','SEMANA_TRADUZIDA' ,'ID_CANAL', 'NM_CANAL', f.col('TOTAL_VIDEOS.COUNT').alias('TOTAL_VIDEOS'))       
   return dataframe

In [18]:
obter_total_publicacao_video(df_estatisticas_teste).show()



+---------------+----------------+--------------------+--------------------+------------+
|DATA_PUBLICACAO|SEMANA_TRADUZIDA|            ID_CANAL|            NM_CANAL|TOTAL_VIDEOS|
+---------------+----------------+--------------------+--------------------+------------+
|     2023-10-15|         Domingo|UCrOH1V-FyMunBIMr...|     ChratosGameplay|           1|
|     2023-10-22|         Domingo|UC1mk6EtfMjxR4eEZ...|       Irm√£os Cities|           1|
|     2023-10-15|         Domingo|UC1mk6EtfMjxR4eEZ...|       Irm√£os Cities|           1|
|     2023-10-22|         Domingo|UCJ_PUBiy-NwJ-woO...|        silva tweaks|           1|
|     2023-10-15|         Domingo|UCvEj9EurcC8q5hkV...|                Sr L|           1|
|     2023-10-22|         Domingo|UCfY4ggUDPeecGSCN...|           MetalBear|           1|
|     2023-10-22|         Domingo|UCDGo_s0I-2JpO3o3...|          Simularies|           1|
|     2023-10-22|         Domingo|UCrUcdH_bCfX77xpX...|       Canal do Void|           1|
|     20

                                                                                

In [19]:
lista_assunto = [
        'Power BI',
        'Python AND dados',
        'Cities Skylines',
        'Cities Skylines 2'
    ]

<p>ETL Para obter o total de v√≠deos p√∫blicado por semaba</p>

In [22]:
for assunto in lista_assunto:
    print('ETL PARA ', assunto)
    assunto = assunto.replace(' ', '_').lower()
    df_estatisticas_etl = carregar_dataframe_prata(
        spark_session=spark, 
        assunto=f'assunto_{assunto}', 
        metrica='estatisticas'
        )
    df_estatisticas_etl = transformacao_dataframe(df_estatisticas_etl)
    df_estatisticas_etl = obter_total_publicacao_video(df_estatisticas_etl)
    criar_particao_ouro(
        dataframe=df_estatisticas_etl, 
        assunto_pesquisa=f'assunto_{assunto}', 
        metrica='total_video_publicado_semana',
        nome_arquivo='total_video_publicado_semana.parquet',
        colunas_particao=('DATA_PUBLICACAO', 'ID_CANAL')
    )
    
    

ETL PARA  Power BI


Py4JJavaError: An error occurred while calling o1191.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:461)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:558)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 25 more


In [23]:
spark.stop()

<p>Dia da semana com mais visualiza√ß√£o</p>

In [12]:
@f.udf(returnType=t.StringType())
def semana(indice: int) -> str:
    semana = {
        1: 'Domingo',
        2: 'Segunda-feira',
        3: 'Ter√ßa-feira',
        4: 'Quarta-feira',
        5: 'Quinta-feira',
        6: 'Sexta-feira',
        7: 'S√°bado'
    }
    return semana.get(indice, indice)
    

<p>V√≠sualiza√ß√µes por semana</p>

In [13]:
def obter_total_visualizacoes_semana(
        dataframe: DataFrame, 
        colunas_agrupamento: Tuple, 
        lista_campos_agupamento: List[str],
  
        ) -> DataFrame:
    window_coluna = Window.partitionBy('ID_VIDEO').orderBy('data_extracao')

    op_agrupamento = [ 
        f.max(metrica).alias(metrica) 
        for metrica in lista_campos_agupamento
        ]
    df_estatisticas_etl = dataframe \
        .groupBy(*colunas_agrupamento) \
        .agg(
            *op_agrupamento
        )
    
    for atributo_agrupamento in lista_campos_agupamento:
        df_estatisticas_etl = df_estatisticas_etl.withColumn(
            f'VARIACAO_{atributo_agrupamento}', 
            f.coalesce(
                f.lag(
                    f.col(atributo_agrupamento)).over(window_coluna), f.col(atributo_agrupamento)
            ) 
        ).withColumn(
            f'{atributo_agrupamento}_DIA',
            f.when(
                f.col(atributo_agrupamento) - f.col(f'VARIACAO_{atributo_agrupamento}') > 0, 
                f.col(atributo_agrupamento) - f.col(f'VARIACAO_{atributo_agrupamento}')
            ).otherwise(f.col(atributo_agrupamento)).cast('int')
        ).drop(f'VARIACAO_{atributo_agrupamento}')

    
    
    return df_estatisticas_etl

In [14]:
def obter_total_visualizacoes_semana_turno(
        dataframe: DataFrame, 
        colunas_agrupamento: Tuple, 
        lista_campos_agupamento: List[str],
  
        ) -> DataFrame:
    op_agrupamento = [ 
        f.max(metrica).alias(metrica) 
        for metrica in lista_campos_agupamento
        ]
    df_estatisticas_etl = dataframe \
        .groupBy(*colunas_agrupamento) \
        .agg(
            *op_agrupamento
        )
    window_coluna = Window.partitionBy('ID_VIDEO').orderBy(f.col('data_extracao'), f.col('INDICE_TURNO_EXTRACAO'))
    for atributo_agrupamento in lista_campos_agupamento:
        df_estatisticas_etl = df_estatisticas_etl.withColumn(
            f'VARIACAO_{atributo_agrupamento}', 
            f.coalesce(
                f.lag(
                    f.col(atributo_agrupamento)).over(window_coluna), f.col(atributo_agrupamento)
            ) 
        ).withColumn(
            f'{atributo_agrupamento}_TURNO',
            f.when(
                f.col(atributo_agrupamento) - f.col(f'VARIACAO_{atributo_agrupamento}') > 0, 
                f.col(atributo_agrupamento) - f.col(f'VARIACAO_{atributo_agrupamento}')
            ).otherwise(f.col(atributo_agrupamento)).cast('int')
        ).drop(f'VARIACAO_{atributo_agrupamento}')
      

    
    
    return df_estatisticas_etl

In [17]:
df_estatisticas_teste.show()

+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+-------------+--------------+---------------------+
|data_extracao|     DATA_PUBLICACAO|            ID_CANAL|            NM_CANAL|ID_CATEGORIA|   ID_VIDEO|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS| data_hora_extracao|DURACAO_VIDEO_MINUTOS|TOTAL_CARACTERE_VIDEO|TOTAL_TAGS|HORA_EXTRACAO|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|
+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-----

In [18]:
df_estatisticas_teste.printSchema()

root
 |-- data_extracao: string (nullable = true)
 |-- DATA_PUBLICACAO: string (nullable = true)
 |-- ID_CANAL: string (nullable = true)
 |-- NM_CANAL: string (nullable = true)
 |-- ID_CATEGORIA: string (nullable = true)
 |-- ID_VIDEO: string (nullable = true)
 |-- TITULO_VIDEO: string (nullable = true)
 |-- DESCRICAO: string (nullable = true)
 |-- TAGS: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- DURACAO_VIDEOS: string (nullable = true)
 |-- TOTAL_VISUALIZACOES: integer (nullable = true)
 |-- TOTAL_LIKES: integer (nullable = true)
 |-- TOTAL_FAVORITOS: integer (nullable = true)
 |-- TOTAL_COMENTARIOS: integer (nullable = true)
 |-- data_hora_extracao: string (nullable = true)
 |-- DURACAO_VIDEO_MINUTOS: float (nullable = true)
 |-- TOTAL_CARACTERE_VIDEO: integer (nullable = true)
 |-- TOTAL_TAGS: integer (nullable = false)
 |-- HORA_EXTRACAO: integer (nullable = true)
 |-- TURNO_EXTRACAO: string (nullable = false)
 |-- INDICE_TURNO_EXTRACAO: integer (n

In [19]:
df_estatisticas_teste.select('data_extracao',  'TURNO_EXTRACAO', 'INDICE_TURNO_EXTRACAO', 'TOTAL_VISUALIZACOES') \
.filter(
     df_estatisticas_teste.ID_VIDEO == 'XOUzWJ0bHuw'
).sort('data_extracao', f.col('INDICE_TURNO_EXTRACAO'), f.col('TOTAL_VISUALIZACOES').desc()).show(40)



+-------------+--------------+---------------------+-------------------+
|data_extracao|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|TOTAL_VISUALIZACOES|
+-------------+--------------+---------------------+-------------------+
|   2023-10-15|         Tarde|                    2|               2954|
|   2023-10-15|         Tarde|                    2|               2723|
|   2023-10-15|         Tarde|                    2|               2439|
|   2023-10-15|         Tarde|                    2|               2242|
|   2023-10-15|         Tarde|                    2|               2025|
|   2023-10-15|         Tarde|                    2|               1777|
|   2023-10-15|         Tarde|                    2|               1740|
|   2023-10-15|         Tarde|                    2|               1676|
|   2023-10-15|         Noite|                    3|               4562|
|   2023-10-15|         Noite|                    3|               4251|
|   2023-10-15|         Noite|                    3

                                                                                

In [20]:

obter_total_visualizacoes_semana_turno(
    dataframe=df_estatisticas_teste,
    colunas_agrupamento=('data_extracao', 'NM_CANAL', 'TITULO_VIDEO', 'ID_VIDEO', 'TURNO_EXTRACAO','INDICE_TURNO_EXTRACAO' ),
    lista_campos_agupamento=['TOTAL_VISUALIZACOES' ,'TOTAL_COMENTARIOS', 'TOTAL_LIKES'],

).filter(
    df_estatisticas_teste.ID_VIDEO == 'XOUzWJ0bHuw'
).sort('data_extracao','INDICE_TURNO_EXTRACAO', f.col('TOTAL_VISUALIZACOES').desc()).select('data_extracao',  'TURNO_EXTRACAO', 'TOTAL_VISUALIZACOES', 'TOTAL_VISUALIZACOES_TURNO', 'TOTAL_LIKES', 'TOTAL_LIKES_TURNO').show(60)

                                                                                

+-------------+--------------+-------------------+-------------------------+-----------+-----------------+
|data_extracao|TURNO_EXTRACAO|TOTAL_VISUALIZACOES|TOTAL_VISUALIZACOES_TURNO|TOTAL_LIKES|TOTAL_LIKES_TURNO|
+-------------+--------------+-------------------+-------------------------+-----------+-----------------+
|   2023-10-15|         Tarde|               2954|                     2954|        531|              531|
|   2023-10-15|         Noite|               4562|                     1608|        687|              156|
|   2023-10-16|         Manh√£|               6469|                     1907|        826|              139|
|   2023-10-16|         Tarde|               7130|                      661|        892|               66|
|   2023-10-16|         Noite|               7399|                      269|        923|               31|
|   2023-10-19|         Manh√£|               8872|                     1473|       1029|              106|
|   2023-10-19|         Tarde|     

In [191]:

obter_total_visualizacoes_semana(
    dataframe=df_estatisticas_teste,
    colunas_agrupamento=('data_extracao', 'NM_CANAL', 'TITULO_VIDEO', 'ID_VIDEO'),
    lista_campos_agupamento=['TOTAL_VISUALIZACOES' ,'TOTAL_COMENTARIOS', 'TOTAL_LIKES' ],

).filter(
    df_estatisticas_teste.ID_VIDEO == 'XOUzWJ0bHuw'
).sort('data_extracao').select('data_extracao', 'TOTAL_VISUALIZACOES', 'TOTAL_VISUALIZACOES_DIA', 'TOTAL_LIKES', 'TOTAL_LIKES_DIA').show()

+-------------+-------------------+-----------------------+-----------+---------------+
|data_extracao|TOTAL_VISUALIZACOES|TOTAL_VISUALIZACOES_DIA|TOTAL_LIKES|TOTAL_LIKES_DIA|
+-------------+-------------------+-----------------------+-----------+---------------+
|   2023-10-15|               4562|                   4562|        687|            687|
|   2023-10-16|               7399|                   2837|        923|            236|
|   2023-10-19|               8949|                   1550|       1036|            113|
|   2023-10-20|               9112|                    163|       1049|             13|
|   2023-10-21|               9263|                    151|       1060|             11|
|   2023-10-22|               9351|                     88|       1063|              3|
|   2023-10-23|               9404|                     53|       1070|              7|
|   2023-10-24|               9451|                     47|       1072|              2|
|   2023-10-25|               94

In [80]:
df_estatisticas_teste.show(3)

+-------------+--------------------+--------------------+---------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+
|data_extracao|     DATA_PUBLICACAO|            ID_CANAL|       NM_CANAL|ID_CATEGORIA|   ID_VIDEO|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS| data_hora_extracao|DURACAO_VIDEO_MINUTOS|TOTAL_CARACTERE_VIDEO|TOTAL_TAGS|
+-------------+--------------------+--------------------+---------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+
|   2023-10-27|2023-10-24T00:50:04Z|UCNv3_KHnW

<p>ETL Para visializa√ß√µes por semana<P>

In [24]:
for assunto in lista_assunto:
    print('ETL PARA ', assunto)
    assunto = assunto.replace(' ', '_').lower()
    df_estatisticas_etl = carregar_dataframe_prata(
        spark_session=spark, 
        assunto=f'assunto_{assunto}', 
        metrica='estatisticas'
        )
    
    df_estatisticas_etl = transformacao_dataframe(df_estatisticas_etl)

    # df_estatisticas_etl_total_visualizacoes = obter_total_visualizacoes_semana(
    #     dataframe=df_estatisticas_etl,
    #     colunas_agrupamento=('data_extracao', 'ID_CANAL',  'NM_CANAL','ID_VIDEO',  'TITULO_VIDEO' , 'TOTAL_CARACTERE_VIDEO', 'TAGS', 'DURACAO_VIDEO_MINUTOS', 'TOTAL_TAGS'),
    #     lista_campos_agupamento=['TOTAL_VISUALIZACOES' ,'TOTAL_COMENTARIOS', 'TOTAL_LIKES'],
    # )


    df_estatisticas_etl_total_visualizacoes = obter_total_visualizacoes_semana_turno(
        dataframe=df_estatisticas_etl,
        colunas_agrupamento=('data_extracao', 'ID_CANAL', 'INDICE_TURNO_EXTRACAO', 'NM_CANAL','ID_VIDEO',  'TITULO_VIDEO' , 'TOTAL_CARACTERE_VIDEO', 'TAGS', 'DURACAO_VIDEO_MINUTOS', 'TOTAL_TAGS', 'TURNO_EXTRACAO'),
        lista_campos_agupamento=['TOTAL_VISUALIZACOES' ,'TOTAL_COMENTARIOS', 'TOTAL_LIKES'],
    )

    criar_particao_ouro(
        dataframe=df_estatisticas_etl_total_visualizacoes, 
        assunto_pesquisa=f'assunto_{assunto}', 
        metrica='total_visualizacoes_por_semana',
        colunas_particao=('data_extracao', 'TURNO_EXTRACAO', 'ID_CANAL'),
        nome_arquivo='total_visualizacoes_por_semana.parquet',
    )

   

    

ETL PARA  Power BI


                                                                                

ETL PARA  Python AND dados


                                                                                

ETL PARA  Cities Skylines


                                                                                

ETL PARA  Cities Skylines 2


                                                                                

<p>Coment√°rios</p>

In [25]:
df_comentarios = carregar_dataframe_prata(
        spark_session=spark, 
        assunto=f'assunto_power_bi', 
        metrica='comentarios'
)

In [26]:
def extrair_comentarios(dataframe: DataFrame) -> DataFrame:
    dataframe = dataframe \
        .select('ID_CANAL', 'ID_COMENTARIO', 'ID_VIDEO', 'TEXTO_COMENTARIO')  \
        .distinct().sort('ID_VIDEO') 
    return dataframe
# Ugw-AOQ6Kh-Qof2U9bd4AaABAg - Gl60rzPP9VQ

In [27]:
extrair_comentarios(df_comentarios).show(truncate=False)



+------------------------+--------------------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID_CANAL                |ID_COMENTARIO             |ID_VIDEO   |TEXTO_COMENTARIO                                                                                                                                                                                                   |
+------------------------+--------------------------+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|UCg6yqeJJ2WMFVt9bXeUd-zA|UgwcmMNJM8lHN6YKgF94AaABAg|-Jg3j6EH7cg|Um tecn√≥logo com conhecimentos t√©cnicos e profici√™ncia em idiomas √© suficiente, desde que tamb√©m

                                                                                

<p>Extacao coment√°rio</p>

In [28]:
for assunto in lista_assunto:
    print('ETL PARA ', assunto)
    assunto = assunto.replace(' ', '_').lower()
    df_comentarios = carregar_dataframe_prata(
        spark_session=spark, 
        assunto=f'assunto_{assunto}', 
        metrica='comentarios'
    )
    df_comentarios = extrair_comentarios(df_comentarios)
    criar_particao_ouro(
        dataframe=df_comentarios, 
        assunto_pesquisa=f'assunto_{assunto}', 
        metrica='comentarios',
        colunas_particao=('ID_CANAL', 'ID_VIDEO'),
        nome_arquivo='comentarios.parquet',
    )

ETL PARA  Power BI


                                                                                

ETL PARA  Python AND dados


                                                                                

ETL PARA  Cities Skylines


                                                                                

ETL PARA  Cities Skylines 2


                                                                                

In [29]:
for assunto in lista_assunto:
    print('ETL PARA ', assunto)
    assunto = assunto.replace(' ', '_').lower()

    df_resposta_comentarios = carregar_dataframe_prata(
        assunto=f'assunto_{assunto}',
        spark_session=spark,
        metrica='resposta_comentarios'
    )   


    df_resposta_comentarios = df_resposta_comentarios.select(
        'ID_CANAL', 
        'ID_RESPOSTA_COMENTARIOS', 
        'TEXTO').withColumnRenamed(
            'ID_CANAL', 'ID_CANAL'
        ).distinct()
    criar_particao_ouro(
        dataframe=df_resposta_comentarios, 
        assunto_pesquisa=f'assunto_{assunto}', 
        metrica='reposta_comentarios',
        colunas_particao=('ID_CANAL', 'ID_RESPOSTA_COMENTARIOS'),
        nome_arquivo='reposta_comentarios.parquet',
    )

ETL PARA  Power BI


                                                                                

ETL PARA  Python AND dados


                                                                                

ETL PARA  Cities Skylines


                                                                                

ETL PARA  Cities Skylines 2


                                                                                

In [30]:
df_resposta_comentarios.select('ID_CANAL', 'ID_RESPOSTA_COMENTARIOS', 'TEXTO').distinct().show(truncate=False)

+------------------------+-------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID_CANAL                |ID_RESPOSTA_COMENTARIOS                          |TEXTO                                                                                                                                                                                                                                                                                                                         |
+------------------------+-------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------

<p>Trends do Youtube</p>

In [23]:
df_trends = carregar_dataframe_prata(
    spark_session=spark,
    assunto='top_brazil',
)

df_trends.show()

+-------------------+------------+--------------------+------------------+-----------+----------------------+--------+-------------------------+-----------------+---------------+-----------+-------------------+
|      data_extracao|ID_CATEGORIA|            ID_CANAL|          NM_CANAL|   ID_VIDEO|          TITULO_VIDEO| DURACAO|                DESCRICAO|TOTAL_COMENTARIOS|TOTAL_FAVORITOS|TOTAL_LIKES|TOTAL_VISUALIZACOES|
+-------------------+------------+--------------------+------------------+-----------+----------------------+--------+-------------------------+-----------------+---------------+-----------+-------------------+
|2023-10-23 08:32:43|          20|UCHuZmQ0lSW8TG81d...|        LipaoGamer|qvgmMih8FOc|  SPIDER MAN 2 PS5 ...|PT26M59S|     V√≠deos todos os d...|              295|              0|      17428|             500862|
|2023-10-23 08:32:43|          17|UCv-Nx8pSfG_LxbVi...|Jovem Pan Esportes|Rq1RqHl2AO0|  "CAD√ä O 'PLANO' N...|PT21M28S|     CRISE NO PALMEIRA...|          

In [24]:
df_trends = df_trends.withColumn('data_hora_extracao', df_trends['data_extracao'])
df_trends = df_trends.withColumn("data_extracao", f.date_format(f.unix_timestamp("data_extracao", "yyyy-MM-dd HH:mm:ss").cast("timestamp"), "yyyy-MM-dd"))
df_trends = df_trends.withColumn('DURACAO', converter_minutos(f.col('DURACAO')))
df_trends = df_trends.withColumn('TOTAL_VISUALIZACOES', f.col('TOTAL_VISUALIZACOES').cast('int'))


In [25]:
df_trends.show()

[Stage 14:>                                                         (0 + 1) / 1]

+-------------+------------+--------------------+------------------+-----------+----------------------+-------+-------------------------+-----------------+---------------+-----------+-------------------+-------------------+
|data_extracao|ID_CATEGORIA|            ID_CANAL|          NM_CANAL|   ID_VIDEO|          TITULO_VIDEO|DURACAO|                DESCRICAO|TOTAL_COMENTARIOS|TOTAL_FAVORITOS|TOTAL_LIKES|TOTAL_VISUALIZACOES| data_hora_extracao|
+-------------+------------+--------------------+------------------+-----------+----------------------+-------+-------------------------+-----------------+---------------+-----------+-------------------+-------------------+
|   2023-10-23|          20|UCHuZmQ0lSW8TG81d...|        LipaoGamer|qvgmMih8FOc|  SPIDER MAN 2 PS5 ...|  26.98|     V√≠deos todos os d...|              295|              0|      17428|             500862|2023-10-23 08:32:43|
|   2023-10-23|          17|UCv-Nx8pSfG_LxbVi...|Jovem Pan Esportes|Rq1RqHl2AO0|  "CAD√ä O 'PLANO' N...

                                                                                

In [26]:
df_trends = df_trends.withColumn('HORA_EXTRACAO', f.hour("data_hora_extracao"))
df_trends = df_trends.withColumn(
        'TURNO_EXTRACAO',
        (
            f.when(
                (df_trends.HORA_EXTRACAO >=6) & (df_trends.HORA_EXTRACAO < 12), 'Manh√£')
            .when((df_trends.HORA_EXTRACAO >= 12) & (df_trends.HORA_EXTRACAO < 18), 'Tarde')
            .otherwise('Noite')
        )
    )
df_trends = df_trends.withColumn(
        'INDICE_TURNO_EXTRACAO',
        (
            f.when(
                (df_trends.TURNO_EXTRACAO == 'Manh√£'), 1)
            .when((df_trends.TURNO_EXTRACAO == 'Tarde'), 2)
            .otherwise(3)
        )
    )

df_trends.show()

+-------------+------------+--------------------+------------------+-----------+----------------------+-------+-------------------------+-----------------+---------------+-----------+-------------------+-------------------+-------------+--------------+---------------------+
|data_extracao|ID_CATEGORIA|            ID_CANAL|          NM_CANAL|   ID_VIDEO|          TITULO_VIDEO|DURACAO|                DESCRICAO|TOTAL_COMENTARIOS|TOTAL_FAVORITOS|TOTAL_LIKES|TOTAL_VISUALIZACOES| data_hora_extracao|HORA_EXTRACAO|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|
+-------------+------------+--------------------+------------------+-----------+----------------------+-------+-------------------------+-----------------+---------------+-----------+-------------------+-------------------+-------------+--------------+---------------------+
|   2023-10-23|          20|UCHuZmQ0lSW8TG81d...|        LipaoGamer|qvgmMih8FOc|  SPIDER MAN 2 PS5 ...|  26.98|     V√≠deos todos os d...|              295|              0|   

                                                                                

In [27]:
df_trends.columns

['data_extracao',
 'ID_CATEGORIA',
 'ID_CANAL',
 'NM_CANAL',
 'ID_VIDEO',
 'TITULO_VIDEO',
 'DURACAO',
 'DESCRICAO',
 'TOTAL_COMENTARIOS',
 'TOTAL_FAVORITOS',
 'TOTAL_LIKES',
 'TOTAL_VISUALIZACOES',
 'data_hora_extracao',
 'HORA_EXTRACAO',
 'TURNO_EXTRACAO',
 'INDICE_TURNO_EXTRACAO']

In [28]:
df_trends = obter_total_visualizacoes_semana_turno(
    dataframe=df_trends,
    colunas_agrupamento=('data_extracao', 'ID_CATEGORIA','ID_CANAL', 'NM_CANAL',  'ID_VIDEO', 'TITULO_VIDEO', 'INDICE_TURNO_EXTRACAO', 'TURNO_EXTRACAO'),
    lista_campos_agupamento=['TOTAL_VISUALIZACOES', 'TOTAL_FAVORITOS', 'TOTAL_COMENTARIOS', 'TOTAL_LIKES'],
)
df_trends.show()

[Stage 18:>                                                         (0 + 1) / 1]

+-------------+------------+--------------------+------------+-----------+--------------------+---------------------+--------------+-------------------+---------------+-----------------+-----------+-------------------------+---------------------+-----------------------+-----------------+
|data_extracao|ID_CATEGORIA|            ID_CANAL|    NM_CANAL|   ID_VIDEO|        TITULO_VIDEO|INDICE_TURNO_EXTRACAO|TURNO_EXTRACAO|TOTAL_VISUALIZACOES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS|TOTAL_LIKES|TOTAL_VISUALIZACOES_TURNO|TOTAL_FAVORITOS_TURNO|TOTAL_COMENTARIOS_TURNO|TOTAL_LIKES_TURNO|
+-------------+------------+--------------------+------------+-----------+--------------------+---------------------+--------------+-------------------+---------------+-----------------+-----------+-------------------------+---------------------+-----------------------+-----------------+
|   2023-10-15|          20|UCOPi8iWwV6BkL-bo...|     HeyDavi|-8A4gKs9CxE|ENTREI NO C√âREBRO...|                    2|         Tarde|

                                                                                

In [35]:
df_total_vis_sem_trend = obter_total_visualizacoes_semana(
    dataframe=df_trends,
    colunas_agrupamento=('data_extracao', 'ID_CATEGORIA','ID_CANAL', 'NM_CANAL',  'ID_VIDEO', 'TITULO_VIDEO'),
    lista_campos_agupamento=['TOTAL_VISUALIZACOES', 'TOTAL_FAVORITOS', 'TOTAL_COMENTARIOS', 'TOTAL_LIKES'],
    )

df_total_vis_sem_trend.show()

+-------------+------------+--------------------+---------------+-----------+--------------------+-------------------+---------------+-----------------+-----------+-----------------------+-------------------+---------------------+---------------+
|data_extracao|ID_CATEGORIA|            ID_CANAL|       NM_CANAL|   ID_VIDEO|        TITULO_VIDEO|TOTAL_VISUALIZACOES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS|TOTAL_LIKES|TOTAL_VISUALIZACOES_DIA|TOTAL_FAVORITOS_DIA|TOTAL_COMENTARIOS_DIA|TOTAL_LIKES_DIA|
+-------------+------------+--------------------+---------------+-----------+--------------------+-------------------+---------------+-----------------+-----------+-----------------------+-------------------+---------------------+---------------+
|   2023-10-15|          20|UCOPi8iWwV6BkL-bo...|        HeyDavi|-8A4gKs9CxE|ENTREI NO C√âREBRO...|             133464|              0|              168|       5392|                 133464|                  0|                  168|           5392|
|   2023-10

In [29]:
df_trends.columns

['data_extracao',
 'ID_CATEGORIA',
 'ID_CANAL',
 'NM_CANAL',
 'ID_VIDEO',
 'TITULO_VIDEO',
 'INDICE_TURNO_EXTRACAO',
 'TURNO_EXTRACAO',
 'TOTAL_VISUALIZACOES',
 'TOTAL_FAVORITOS',
 'TOTAL_COMENTARIOS',
 'TOTAL_LIKES',
 'TOTAL_VISUALIZACOES_TURNO',
 'TOTAL_FAVORITOS_TURNO',
 'TOTAL_COMENTARIOS_TURNO',
 'TOTAL_LIKES_TURNO']

In [31]:
df_trends = df_trends.select(
    'data_extracao',
    'NM_CANAL',
    'TITULO_VIDEO',
    'INDICE_TURNO_EXTRACAO',
    'TOTAL_VISUALIZACOES',
    'TOTAL_FAVORITOS',
    'TOTAL_COMENTARIOS',
    'TOTAL_LIKES',
    'TOTAL_VISUALIZACOES_TURNO',
    'TOTAL_FAVORITOS_TURNO',
    'TOTAL_COMENTARIOS_TURNO',
    'TOTAL_LIKES_TURNO',
    'ID_CATEGORIA',
    'ID_CANAL',
    'ID_VIDEO',
    'TURNO_EXTRACAO',
)

In [33]:
criar_particao_ouro(
    dataframe=df_trends,
    assunto_pesquisa='trends_brazil',
    colunas_particao=('data_extracao', 'ID_CATEGORIA', 'ID_CANAL', 'ID_VIDEO','TURNO_EXTRACAO'),
    metrica='trends_brazil',
    nome_arquivo='trends_brazil.parquet'
)

                                                                                

In [37]:
df_trends \
    .groupBy('data_extracao', 'ID_CANAL') \
    .agg(
        f.max('data_hora_extracao'),
        f.max('TOTAL_VISUALIZACOES')
    ).show()

+-------------+--------------------+-----------------------+------------------------+
|data_extracao|            ID_CANAL|max(data_hora_extracao)|max(TOTAL_VISUALIZACOES)|
+-------------+--------------------+-----------------------+------------------------+
|   2023-10-15|UC-2Y8dQb0S6DtpxN...|    2023-10-15 22:01:13|                 1481250|
|   2023-10-15|UC-4TwmWQShd0R6YZ...|    2023-10-15 22:01:13|                  708858|
|   2023-10-15|UC-JrEOZQwdI5nvlo...|    2023-10-15 22:01:13|                 1729415|
|   2023-10-15|UC-bOPhs_XA5bETT3...|    2023-10-15 22:01:13|                 1756861|
|   2023-10-15|UC0bC92gFO25qcJcE...|    2023-10-15 22:01:13|                  195887|
|   2023-10-15|UC0uRT_armQXqds_r...|    2023-10-15 22:01:13|                  214528|
|   2023-10-15|UC12HMtO5MYph9dCZ...|    2023-10-15 16:30:58|                 1273114|
|   2023-10-15|UC1H-sZk-cj-tDpfT...|    2023-10-15 22:01:12|                  165818|
|   2023-10-15|UC2EWGw-KBjEReUbX...|    2023-10-15 22:

In [35]:
spark.stop()