In [7]:
%pip install pyspark==3.1.3

Collecting pyspark==3.1.3
  Downloading pyspark-3.1.3.tar.gz (214.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m214.0/214.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.5
    Uninstalling py4j-0.10.9.5:
      Successfully uninstalled py4j-0.10.9.5
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.3.1
    Uninstalling pyspark-3.3.1:
      Successfully uninstalled pyspark-3.3.1
[33m  DEPRECATION: pyspark is being installed using the legacy 'setup.py install' method, because it does not have a 'pyproject.toml' and the 'wheel' package is not ins

In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession\
    .builder\
    .appName("twitter_silver")\
    .getOrCreate()

In [6]:
caminho_dados_silver = "/home/pedro/projetos/projetos_python/proj_twitter_pipeline/datalake/silver/twitter_datascience/tweet/"

df_tweet = spark.read.json(caminho_dados_silver)

In [7]:
df_tweet.printSchema()

root
 |-- author_id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)
 |-- process_date: date (nullable = true)



In [8]:
from pyspark.sql import functions as f

In [9]:
# Cria uma consulta sobre os dados da camada silver agregando informacoes
# Para cada dia analisado, retorna: 
# - dia (Y-m-d)
# - qtd de pessoas distintas que estão fazendo Tweets relacionados ao tema data science
# - somatorio de curtidas (likes)
# - somatorio de citações (quote)
# - somatorio de respostas (reply)
# - somatorio de retransmições (retweets) 
# - dia da semana

tweet_conversas = df_tweet.alias("tweet")\
                  .groupBy(f.to_date("created_at").alias("created_date"))\
                  .agg(
                      f.countDistinct("author_id").alias("n_tweets"),
                      f.sum("like_count").alias("n_like"),
                      f.sum("quote_count").alias("n_quote"),
                      f.sum("reply_count").alias("n_reply"),
                      f.sum("retweet_count").alias("n_retweet") )\
                  .withColumn("weekday", f.date_format("created_date", "E"))
                  

In [10]:
tweet_conversas.show()

[Stage 2:>                                                          (0 + 2) / 2]

+------------+--------+------+-------+-------+---------+-------+
|created_date|n_tweets|n_like|n_quote|n_reply|n_retweet|weekday|
+------------+--------+------+-------+-------+---------+-------+
|  2023-11-08|       8|   529|    537|    469|      460|    Wed|
|  2023-11-12|     101| 49924|  49582|  49279|    49996|    Sun|
|  2023-11-10|     101| 49987|  49155|  49611|    50626|    Fri|
|  2023-11-09|      10|   557|    501|    529|      502|    Thu|
|  2023-11-13|       9|   466|    562|    543|      504|    Mon|
|  2023-11-11|      10|   580|    703|    641|      389|    Sat|
+------------+--------+------+-------+-------+---------+-------+



                                                                                