In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# ---
# 1. Preparação dos dados
# ---
df_posts = spark.read.table("default.posts_creator")

# Conversão do timestamp Unix em uma data legível e garante o tipo timestamp
df_posts_clean = df_posts.withColumn("published_date", from_unixtime(col("published_at")).cast("timestamp"))

# Usa a coluna 'yt_user' como a chave de análise e renomeia para 'user_id'
df_combined = df_posts_clean.withColumnRenamed("yt_user", "user_id")
print("DataFrame combinado:")
df_combined.show(5, False)

# Define o período dos últimos 6 meses
six_months_ago = datetime.now() - timedelta(days=180)
# Filtra o DataFrame para posts dos últimos 6 meses
df_recent_posts = df_combined.filter(col("published_date") >= six_months_ago)
# ---
# 2. Análise dos Top Posts (Likes e Views)
# ---
# Define a janela de ranking para cada usuário
window_spec_likes = Window.partitionBy("user_id").orderBy(col("likes").desc())
window_spec_views = Window.partitionBy("user_id").orderBy(col("views").desc())
# Top 3 posts por likes
print("Top 3 posts por likes de cada creator nos últimos 6 meses:")
df_ranked_likes = df_recent_posts.withColumn("rank", rank().over(window_spec_likes))
df_ranked_likes.filter(col("rank") <= 3) \
    .select("user_id", "title", "likes", "rank") \
    .orderBy("user_id", "rank") \
    .show(truncate=False)

# Top 3 posts por views
print("Top 3 posts por views de cada creator nos últimos 6 meses:")
df_ranked_views = df_recent_posts.withColumn("rank", rank().over(window_spec_views))
df_ranked_views.filter(col("rank") <= 3) \
    .select("user_id", "title", "views", "rank") \
    .orderBy("user_id", "rank") \
    .show(truncate=False)
# ---
# 3. Análise de usuários sem correspondência
# ---

print("Usuários em 'posts_creator' que não foram encontrados na Wikipedia:")
df_users_from_wiki = spark.read.table("default.users_yt")
df_missing_users = df_posts_clean.join(df_users_from_wiki, df_posts_clean.yt_user == df_users_from_wiki.user_id, "left_anti")

# Mostra os usuários que não têm um user_id na tabela users_yt
df_missing_users.select("yt_user").distinct().show(truncate=False)

# ---
# 4. Análise de publicações por mês
# ---

print("Quantidade de publicações por mês de cada creator:")
df_posts_by_month = df_combined.withColumn("month_year", date_format(col("published_date"), "yyyy-MM")) \
    .groupBy("user_id", "month_year") \
    .agg(count("*").alias("post_count")) \
    .orderBy("user_id", "month_year")

df_posts_by_month.show(truncate=False)

# ---
# Desafio 1: Mostra 0 nos meses que não teve vídeo.
# ---

print("Publicações por mês, incluindo meses com 0 vídeos:")
# Cria um DataFrame com todos os user_id e todos os meses no período
all_users = df_combined.select("user_id").distinct()
start_date = df_combined.select(min("published_date")).collect()[0][0]
end_date = df_combined.select(max("published_date")).collect()[0][0]

months_list = []
current_month = datetime(start_date.year, start_date.month, 1)
end_month = datetime(end_date.year, end_date.month, 1)

while current_month <= end_month:
    months_list.append(current_month)
    current_month += relativedelta(months=1)

df_all_months = spark.createDataFrame(months_list, "timestamp") \
    .withColumnRenamed("value", "date") \
    .withColumn("month_year", date_format(col("date"), "yyyy-MM")) \
    .select("month_year").distinct()

# Combina todos os usuários com todos os meses para criar um esqueleto completo
df_skeleton = all_users.crossJoin(df_all_months)

# Left outer join para preencher os meses sem posts com 0.
df_posts_by_month_full = df_skeleton.join(df_posts_by_month, ["user_id", "month_year"], "left_outer") \
    .fillna(0, subset=["post_count"]) \
    .orderBy("user_id", "month_year")

df_posts_by_month_full.show(truncate=False)

# ---
# Transformando a tabela no formato onde a primeira coluna é o user_id e há uma coluna para cada mês.
# ---

print("Tabela pivotada de publicações por mês:")
df_pivoted = df_posts_by_month_full.groupBy("user_id").pivot("month_year").agg(first("post_count"))
df_pivoted.show(truncate=False)