In [1]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from isodate import parse_duration
import pyspark.sql.types as t
from typing import Tuple, List
import os

In [2]:
@f.udf(returnType=t.FloatType())
def converter_minutos(tempo: str) -> float:
    """Converte para o total de seguntos

    Args:
        tempo (str): tempo

    Returns:
        int: total segundos
    """
    duracao = parse_duration(tempo)
    total_segundos = duracao.total_seconds() 
    total_minutos = total_segundos / 60
   
    return round(total_minutos, 2)


In [3]:
spark = SparkSession.builder.appName('tratamento_camada_ouro').getOrCreate()
spark

24/02/02 21:29:18 WARN Utils: Your hostname, rodrigo-nitro-AN515-54 resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface wlp8s0)
24/02/02 21:29:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/02 21:29:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [17]:
def criar_particao_ouro(dataframe: DataFrame, nome_arquivo: str ):
    caminho_datalake_ouro = '../dados/ouro'
    dataframe.write. \
                parquet(os.path.join(caminho_datalake_ouro, nome_arquivo),  )

In [23]:
def transformacao_dataframe_trends(dataframe_transform: DataFrame):
    dataframe_transform = dataframe_transform.select(
        'data_extracao',
        'DATA_PUBLICACAO',
        'ID_CANAL',
        'NM_CANAL',
        'ID_CATEGORIA',
        'ID_VIDEO',
        'TITULO_VIDEO',
        'DESCRICAO',
        'TAGS',
        'DURACAO_VIDEOS',
        'TOTAL_VISUALIZACOES',
        'TOTAL_LIKES',
        'TOTAL_FAVORITOS',
        'TOTAL_COMENTARIOS',
        
     )
    dataframe_transform  = dataframe_transform.na.drop('all')
    dataframe_transform = dataframe_transform.withColumn("data_hora_extracao", f.col('data_extracao'))
    dataframe_transform = dataframe_transform.withColumn("data_extracao", f.date_format(f.unix_timestamp("data_extracao", "yyyy-MM-dd HH:mm:ss").cast("timestamp"), "yyyy-MM-dd" ))
    dataframe_transform = dataframe_transform.withColumn('DURACAO_VIDEO_MINUTOS', converter_minutos(f.col('DURACAO_VIDEOS')))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_VISUALIZACOES', f.col('TOTAL_VISUALIZACOES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_LIKES', f.col('TOTAL_LIKES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_FAVORITOS', f.col('TOTAL_FAVORITOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_COMENTARIOS', f.col('TOTAL_COMENTARIOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_CARACTERE_VIDEO', f.length('TITULO_VIDEO'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_TAGS', f.when(f.size('TAGS') > 0, f.size('TAGS')).otherwise(0))
    dataframe_transform = dataframe_transform.withColumn('HORA_EXTRACAO', f.hour("data_hora_extracao"))
    dataframe_transform = dataframe_transform.withColumn(
        'TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.HORA_EXTRACAO >=6) & (dataframe_transform.HORA_EXTRACAO < 12), 'Manhã')
            .when((dataframe_transform.HORA_EXTRACAO >= 12) & (dataframe_transform.HORA_EXTRACAO < 18), 'Tarde')
            .otherwise('Noite')
        )
    )
    dataframe_transform = dataframe_transform.withColumn(
        'INDICE_TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.TURNO_EXTRACAO == 'Manhã'), 1)
            .when((dataframe_transform.TURNO_EXTRACAO == 'Tarde'), 2)
            .otherwise(3)
        )
    )
    return dataframe_transform
  

In [5]:
def transformacao_dataframe(dataframe_transform: DataFrame):
    dataframe_transform = dataframe_transform.select(
        'ASSUNTO',
        'data_extracao',
        'DATA_PUBLICACAO',
        'ID_CANAL',
        'NM_CANAL',
        'ID_CATEGORIA',
        'ID_VIDEO',
        'TITULO_VIDEO',
        'DESCRICAO',
        'TAGS',
        'DURACAO_VIDEOS',
        'TOTAL_VISUALIZACOES',
        'TOTAL_LIKES',
        'TOTAL_FAVORITOS',
        'TOTAL_COMENTARIOS',
        
     )
    dataframe_transform  = dataframe_transform.na.drop('all')
    dataframe_transform = dataframe_transform.withColumn("data_hora_extracao", f.col('data_extracao'))
    dataframe_transform = dataframe_transform.withColumn("data_extracao", f.date_format(f.unix_timestamp("data_extracao", "yyyy-MM-dd HH:mm:ss").cast("timestamp"), "yyyy-MM-dd" ))
    dataframe_transform = dataframe_transform.withColumn('DURACAO_VIDEO_MINUTOS', converter_minutos(f.col('DURACAO_VIDEOS')))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_VISUALIZACOES', f.col('TOTAL_VISUALIZACOES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_LIKES', f.col('TOTAL_LIKES').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_FAVORITOS', f.col('TOTAL_FAVORITOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_COMENTARIOS', f.col('TOTAL_COMENTARIOS').cast('int'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_CARACTERE_VIDEO', f.length('TITULO_VIDEO'))
    dataframe_transform = dataframe_transform.withColumn('TOTAL_TAGS', f.when(f.size('TAGS') > 0, f.size('TAGS')).otherwise(0))
    dataframe_transform = dataframe_transform.withColumn('HORA_EXTRACAO', f.hour("data_hora_extracao"))
    dataframe_transform = dataframe_transform.withColumn(
        'TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.HORA_EXTRACAO >=6) & (dataframe_transform.HORA_EXTRACAO < 12), 'Manhã')
            .when((dataframe_transform.HORA_EXTRACAO >= 12) & (dataframe_transform.HORA_EXTRACAO < 18), 'Tarde')
            .otherwise('Noite')
        )
    )
    dataframe_transform = dataframe_transform.withColumn(
        'INDICE_TURNO_EXTRACAO',
        (
            f.when(
                (dataframe_transform.TURNO_EXTRACAO == 'Manhã'), 1)
            .when((dataframe_transform.TURNO_EXTRACAO == 'Tarde'), 2)
            .otherwise(3)
        )
    )
    return dataframe_transform
  

In [6]:
@f.udf(returnType=t.StringType())
def traduzir_dia_da_semana(dia):
    traducao = {
        'Sunday': 'Domingo',
        'Monday': 'Segunda-feira',
        'Tuesday': 'Terça-feira',
        'Wednesday': 'Quarta-feira',
        'Thursday': 'Quinta-feira',
        'Friday': 'Sexta-feira',
        'Saturday': 'Sábado'
    }
    return traducao.get(dia, dia)

In [7]:
@f.udf(returnType=t.IntegerType())
def indice_semana(dia):
    traducao = {
        'Domingo': 1,
        'Segunda-feira': 2,
        'Terça-feira': 3,
        'Quarta-feira': 4,
        'Quinta-feira': 5,
        'Sexta-feira': 6,
        'Sábado': 7
    }
    return traducao.get(dia, dia)

In [8]:
df_estatisticas_gerais = spark.read.parquet('../dados/prata/arquivos_assuntos.parquet')
df_estatisticas_gerais.columns

                                                                                

['data_extracao',
 'ID_VIDEO',
 'DATA_PUBLICACAO',
 'ID_CANAL',
 'NM_CANAL',
 'ID_CATEGORIA',
 'TITULO_VIDEO',
 'DESCRICAO',
 'TAGS',
 'DURACAO_VIDEOS',
 'TOTAL_VISUALIZACOES',
 'TOTAL_LIKES',
 'TOTAL_FAVORITOS',
 'TOTAL_COMENTARIOS',
 'ASSUNTO']

In [9]:
df_estatisticas_gerais = transformacao_dataframe(dataframe_transform=df_estatisticas_gerais)
df_estatisticas_gerais.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+-------------+--------------+---------------------+
|             ASSUNTO|data_extracao|     DATA_PUBLICACAO|            ID_CANAL|            NM_CANAL|ID_CATEGORIA|   ID_VIDEO|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS| data_hora_extracao|DURACAO_VIDEO_MINUTOS|TOTAL_CARACTERE_VIDEO|TOTAL_TAGS|HORA_EXTRACAO|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|
+--------------------+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+--------

                                                                                

In [10]:
def construir_depara_canal(dataframe: DataFrame):
    dataframe.select(
        dataframe.ID_CANAL,
        dataframe.NM_CANAL,
        dataframe.ID_VIDEO,
        dataframe.TITULO_VIDEO
    ).distinct().sort('ID_CANAL').write.\
        parquet(f'../dados/ouro/depara_canal.parquet')


In [11]:
def obter_total_publicacao_video(dataframe : DataFrame) -> DataFrame:
   dataframe =  dataframe.select(
    'DATA_PUBLICACAO',
    'ID_CANAL',
    'NM_CANAL'
    ).withColumn(
       'DATA_PUBLICACAO', f.date_format('DATA_PUBLICACAO', 'yyyy-MM-dd')
    ) \
        .sort('ID_CANAL') \
        .withColumn(
            'SEMANA_TRADUZIDA', 
            traduzir_dia_da_semana(
                f.date_format('DATA_PUBLICACAO', 'EEEE')
            )
            ).distinct() \
                .groupBy('DATA_PUBLICACAO','SEMANA_TRADUZIDA', 'ID_CANAL', 'NM_CANAL'). \
                    count(). \
                        alias('TOTAL_VIDEOS'). \
                            withColumn('INDICE_SEMANA', indice_semana('SEMANA_TRADUZIDA')) \
                    .sort('INDICE_SEMANA')\
                        .select('DATA_PUBLICACAO','SEMANA_TRADUZIDA' ,'ID_CANAL', 'NM_CANAL', f.col('TOTAL_VIDEOS.COUNT').alias('TOTAL_VIDEOS'))       
   return dataframe

In [12]:
df_estatisticas_gerais.show()

+--------------------+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+-------------+--------------+---------------------+
|             ASSUNTO|data_extracao|     DATA_PUBLICACAO|            ID_CANAL|            NM_CANAL|ID_CATEGORIA|   ID_VIDEO|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS| data_hora_extracao|DURACAO_VIDEO_MINUTOS|TOTAL_CARACTERE_VIDEO|TOTAL_TAGS|HORA_EXTRACAO|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|
+--------------------+-------------+--------------------+--------------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+--------

In [16]:
df_estatisticas_gerais.count()

                                                                                

12068

In [19]:
criar_particao_ouro(
        dataframe=df_estatisticas_gerais, 
        nome_arquivo='dados_tratado_estatisticas_gerais.parquet',
    )

[Stage 6:>                                                          (0 + 8) / 8]

24/02/02 21:32:14 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
24/02/02 21:32:14 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
24/02/02 21:32:15 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers


                                                                                

In [25]:
df_trends = spark.read.parquet('/home/rodrigo/Documentos/projetos/analise_dados_youtube/dados/prata/arquivos_assuntos_trends.parquet')
df_trends = transformacao_dataframe_trends(df_trends)
criar_particao_ouro(dataframe=df_trends, nome_arquivo='estatisticas_trends.parquet')
df_trends.show()

                                                                                

+-------------+--------------------+--------------------+------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-------------------+---------------------+---------------------+----------+-------------+--------------+---------------------+
|data_extracao|     DATA_PUBLICACAO|            ID_CANAL|          NM_CANAL|ID_CATEGORIA|   ID_VIDEO|        TITULO_VIDEO|           DESCRICAO|                TAGS|DURACAO_VIDEOS|TOTAL_VISUALIZACOES|TOTAL_LIKES|TOTAL_FAVORITOS|TOTAL_COMENTARIOS| data_hora_extracao|DURACAO_VIDEO_MINUTOS|TOTAL_CARACTERE_VIDEO|TOTAL_TAGS|HORA_EXTRACAO|TURNO_EXTRACAO|INDICE_TURNO_EXTRACAO|
+-------------+--------------------+--------------------+------------------+------------+-----------+--------------------+--------------------+--------------------+--------------+-------------------+-----------+---------------+-----------------+-----------

In [26]:
spark.stop()