In [1]:
!pip install pyspark



In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
# Leia o arquivo 'videos-preparados.snappy.parquet' no dataframe 'df_video'

df_video = spark.read.option('header', 'true').parquet('videos-preparados.snappy.parquet')

In [6]:
# Leia o arquivo ‘video-comments-tratados.snappy.parquet' no dataframe 'df_comments'

df_comments = spark.read.option('header', 'true').parquet('videos-comments-tratados.snappy.parquet')

In [7]:
# Crie tabelas temporárias para ambos os dataframe

df_video.createOrReplaceTempView('video')
df_comments.createOrReplaceTempView('comments')

In [10]:
df_video.printSchema()
df_comments.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Keyword Index: double (nullable = true)
 |-- Features PCA: vector (nullable = true)
 |-- Features Normal: vector (nullable = true)
 |-- Features: vector (nullable = true)

root
 |-- Video ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Sentiment: integer (

In [20]:
# Faça um join das tabelas criadas anteriormente utilizando o spark.sql no dataframe ‘join_video_comments’

query = """
SELECT v.*, c.*
FROM video v
JOIN comments c
ON v.`Video ID` = c.`Video ID`
"""
join_video_comments = spark.sql(query)
join_video_comments.show()

+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|               Title|   Video ID|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|   Video ID|               Title|Published At|Keyword|Likes|Comments|  Views|Interaction|Year|             Comment|Sentiment|Likes Comment|
+--------------------+-----------+------------+-------+-----+--------+-------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+-----------+--------------------+------------+-------+-----+--------+-------+-----------+----+--------------------+---------+-------------+
|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23

In [23]:
# Faça as mesmas etapas anteriores (1,2,3,4) utilizando repartition e coalesce

# repartition
df_video = spark.read.option('header', 'true').parquet('videos-preparados.snappy.parquet')
df_video = df_video.repartition(8)  # exemplo: ajusta para 8 partições

# coalesce
df_video = spark.read.option('header', 'true').parquet('videos-preparados.snappy.parquet')
df_video = df_video.coalesce(2)  # exemplo: une para 2 partições

# Tabela Temporária
df_video.createOrReplaceTempView('video')

In [24]:
# repartition
df_comments = spark.read.option('header', 'true').parquet('videos-comments-tratados.snappy.parquet')
df_comments = df_comments.repartition(8)

# coalesce
df_comments = spark.read.option('header', 'true').parquet('videos-comments-tratados.snappy.parquet')
df_comments = df_comments.coalesce(2)

# Tabela Temporária
df_comments.createOrReplaceTempView('comments')

In [25]:
# Join

query = """
SELECT v.*, c.*
FROM video v
JOIN comments c
ON v.`Video ID` = c.`Video ID`
"""
join_video_comments = spark.sql(query)
join_video_comments.show()

In [30]:
# Utilize o explain para entender melhor as duas formas de realizar as etapas e refaça novamente as etapas anteriores (1,2,3,4), utilizando tudo que você já aprendeu para realizar o join e filter apenas com os dados necessários.

df_video.explain()
df_comments.explain()
join_video_comments.explain()

== Physical Plan ==
Coalesce 2
+- *(1) ColumnarToRow
   +- FileScan parquet [Title#171,Video ID#172,Published At#173,Keyword#174,Likes#175,Comments#176,Views#177,Interaction#178,Year#179,Month#180,Keyword Index#181,Features PCA#182,Features Normal#183,Features#184] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...


== Physical Plan ==
Coalesce 2
+- *(1) ColumnarToRow
   +- FileScan parquet [Video ID#197,Title#198,Published At#199,Keyword#200,Likes#201,Comments#202,Views#203,Interaction#204,Year#205,Comment#206,Sentiment#207,Likes Comment#208] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-comments-tratados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Video ID:

In [32]:
# 1. Leitura dos dados
df_video = spark.read.option('header', 'true').parquet('videos-preparados.snappy.parquet')
df_comments = spark.read.option('header', 'true').parquet('videos-comments-tratados.snappy.parquet')

In [33]:
# 2. Seleção somente das colunas necessárias antes do join
df_video_sel = df_video.select('Video ID', 'Title')
df_comments_sel = df_comments.select('Video ID', 'Comments')

In [36]:
# 3. Criação das views temporárias para usar SQL
df_video_sel.createOrReplaceTempView('video')
df_comments_sel.createOrReplaceTempView('comments')

In [43]:
# 4. Realização do join usando apenas as colunas de interesse
query = """
SELECT v.`Video ID`, v.Title, c.Comments
FROM video v
JOIN comments c
ON v.`Video ID` = c.`Video ID`
"""
join_video_comments_otimizado = spark.sql(query)
join_video_comments_otimizado.show()

+-----------+--------------------+--------+
|   Video ID|               Title|Comments|
+-----------+--------------------+--------+
|_XFJpfSFcpU|Nadler Celebrates...|    1653|
|_XFJpfSFcpU|Nadler Celebrates...|    1653|
|dCfCJ1kAPeQ|Stocks moving in ...|      40|
|dCfCJ1kAPeQ|Stocks moving in ...|      40|
|2WPA1L9uJqo|What is the Schrö...|     896|
|2WPA1L9uJqo|What is the Schrö...|     896|
|xHWuoy4yJGo|FIDE Dunks On Mag...|     167|
|xHWuoy4yJGo|FIDE Dunks On Mag...|     167|
|xHWuoy4yJGo|FIDE Dunks On Mag...|     167|
|PeMlggyqz0Y|Machine Learning ...|     282|
|PeMlggyqz0Y|Machine Learning ...|     282|
|CzzKaMWA_xE|I Trolled the WHO...|     603|
|CzzKaMWA_xE|I Trolled the WHO...|     603|
|CzzKaMWA_xE|I Trolled the WHO...|     603|
|rOYnwhKwlw8|Xbox &amp; Bethes...|    5254|
|rOYnwhKwlw8|Xbox &amp; Bethes...|    5254|
|GkKDCgjHIOk|We Build a Bunk B...|    1286|
|GkKDCgjHIOk|We Build a Bunk B...|    1286|
|GkKDCgjHIOk|We Build a Bunk B...|    1286|
|dK2FJQX24Io|Saint Punk - Empt..

In [44]:
# checar a eficiência

df_video_sel.explain()
df_comments_sel.explain()
join_video_comments_otimizado.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=361]
   +- Project [Video ID#224, Title#223]
      +- FileScan parquet [Title#223,Video ID#224] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Title:string,Video ID:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=369]
   +- FileScan parquet [Video ID#237,Comments#242] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-comments-tratados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Video ID:string,Comments:int>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [Video ID#224, Title#223, Comments#242]
   +- BroadcastHashJoin [Video I

In [45]:
# Salve o seu join otimizado como 'join-videos-comments-otimizado' no formato parquet

join_video_comments_otimizado.write.mode('overwrite').parquet('join-videos-comments-otimizado')

In [46]:
join_video_comments_otimizado.coalesce(1) \
    .write.mode('overwrite') \
    .parquet('join-videos-comments-otimizado')