In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
print("======================= SparkSession Starting ========================")
# Create the session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("trab-pratico-de-igti")
    .getOrCreate()
)



21/11/02 21:40:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Take the SparkContext behind the session and raise its log level to ERROR.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [3]:
print("============================== Read data =============================")
# Both inputs are tab-separated files with a header row. No schema is passed
# and none is inferred, so every column is loaded as a string.
# NOTE(review): the sample output shows missing values as the literal text
# `\N`; no nullValue option is set here, so they remain ordinary strings.
tsv_options = {'header': True, 'sep': '\t'}
df_titles = spark.read.csv('data/title_basics.tsv', **tsv_options)
df_ratings = spark.read.csv('data/title_ratings.tsv', **tsv_options)



In [4]:
print("========================== Investigate data ==========================")

# Column names and types of the titles table, then its first five rows.
df_titles.printSchema()
df_titles.show(n=5)

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000001|    short|          Carmencita|          Carmencita|      0|     1894|     \N|             1|   Documentary,Short|
|tt0000002|    short|Le clown et ses c...|Le clown et ses c...|      0|     1892|     \N|             5|     Animation,

In [5]:
# Column names and types of the ratings table, then its first five rows.
df_ratings.printSchema()
df_ratings.show(n=5)

root
 |-- tconst: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1809|
|tt0000002|          6.0|     233|
|tt0000003|          6.5|    1560|
|tt0000004|          6.1|     152|
|tt0000005|          6.2|    2383|
+---------+-------------+--------+
only showing top 5 rows



In [6]:
# List every distinct value found in the titleType column.
(
    df_titles
    .select(col('titleType'))
    .distinct()
    .show()
)

                                                                                

+------------+
|   titleType|
+------------+
|    tvSeries|
|tvMiniSeries|
|     tvMovie|
|     tvPilot|
|   tvEpisode|
|       movie|
|   tvSpecial|
|       video|
|   videoGame|
|     tvShort|
|       short|
| radioSeries|
|radioEpisode|
+------------+



In [7]:
print("Quantos filmes (incluindo os da televisão) foram lançados no ano de 2015?")
# A "film" here is either a cinema movie or a TV movie; startYear is a string
# column, so the year is compared as text.
is_film = col('titleType').isin('tvMovie', 'movie')
released_in_2015 = col('startYear') == '2015'
print(df_titles.where(is_film).where(released_in_2015).count())

                                                                                

19987

In [8]:
print("Qual o gênero de títulos mais frequente?")
# `genres` is a comma-separated string; split it and emit one row per
# (title, genre) pair so that each genre can be counted on its own.
one_row_per_genre = (
    df_titles
    .withColumn('genres', split(col('genres'), ','))
    .select('*', explode(col('genres')).alias('unique_g'))
)
genre_counts = one_row_per_genre.groupBy('unique_g').count()
# Most frequent genre first.
genre_counts.orderBy(col('count').desc()).show()



+-----------+-------+
|   unique_g|  count|
+-----------+-------+
|      Drama|2247995|
|     Comedy|1653725|
|      Short|1021850|
|  Talk-Show| 900198|
|Documentary| 764885|
|    Romance| 724729|
|         \N| 643012|
|     Family| 571470|
|       News| 524662|
| Reality-TV| 423455|
|  Animation| 406284|
|      Music| 394008|
|      Crime| 351447|
|     Action| 334580|
|  Adventure| 324325|
|  Game-Show| 252533|
|      Adult| 242704|
|      Sport| 178594|
|    Fantasy| 174119|
|    Mystery| 162448|
+-----------+-------+
only showing top 20 rows



                                                                                

In [9]:
print("Qual o gênero com a melhor nota média de títulos?")

# Inner join keeps only titles that have a rating; each title is then
# repeated once per genre listed in its comma-separated `genres` value.
rated_titles_by_genre = (
    df_titles
    .join(df_ratings, on='tconst')
    .withColumn('genres', split(col('genres'), ','))
    .select('*', explode(col('genres')).alias('unique_g'))
)
# Mean rating per genre, highest mean first.
mean_rating_per_genre = (
    rated_titles_by_genre
    .groupBy('unique_g')
    .agg(mean(col('averageRating')).alias('media'))
)
mean_rating_per_genre.orderBy(col('media').desc()).show()



+-----------+------------------+
|   unique_g|             media|
+-----------+------------------+
|    History| 7.353780102645086|
|Documentary| 7.240198535554575|
|  Biography|  7.17553191489362|
|    Mystery| 7.170086406897942|
|      Crime|7.1598428684859385|
|  Adventure| 7.107629703351738|
|    Fantasy| 7.095145650845386|
|  Animation| 7.089381171483224|
|    Western| 7.080683426568711|
|     Family|  7.07005492603448|
|      Drama| 7.040979155040203|
|        War|7.0091151344149205|
|     Action|7.0070981387478835|
|      Sport| 6.966792418526429|
|     Comedy|6.9600165509184135|
|      Music| 6.927469624015715|
| Reality-TV| 6.892611170895967|
|  Game-Show| 6.876828101904185|
|    Romance| 6.864016164703973|
|      Short| 6.791292438368555|
+-----------+------------------+
only showing top 20 rows



                                                                                

In [10]:
print("Qual o vídeo game do gênero aventura mais bem avaliado em 2020?")
# Video games released in 2020 that list "Adventure" among their genres,
# joined with their ratings and shown best rated first.
(
    df_titles
    .filter(col('titleType')=='videoGame')
    .filter(col('startYear')=='2020')
    .join(df_ratings, 'tconst')
    .withColumn('genres', split(col('genres'),','))
    .select('*', explode(col('genres')).alias('unique_g'))
    .filter(col('unique_g')=='Adventure')
    # averageRating was loaded as a string, so ordering on it directly is
    # lexicographic and would rank "10.0" below "9.5". Order on the numeric
    # value instead; the displayed column itself is left unchanged.
    .orderBy(col('averageRating').cast('double').desc())
    .show()
)

[Stage 23:>                                                         (0 + 5) / 5]

+----------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+---------+
|    tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|averageRating|numVotes| unique_g|
+----------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+-------------+--------+---------+
|tt11321196|videoGame|     Half-Life: Alyx|     Half-Life: Alyx|      0|     2020|     \N|            \N|[Action, Adventur...|          9.5|     506|Adventure|
| tt7651352|videoGame|   Ghost of Tsushima|   Ghost of Tsushima|      0|     2020|     \N|            \N|[Action, Adventur...|          9.3|    5270|Adventure|
|tt14106780|videoGame|               Omori|               Omori|      0|     2020|     \N|            \N|[Adventure, Drama...|          9.2|      79|Adventure|
| tt8329350|videoGame|Ori and the Will .

                                                                                

In [11]:
print("Qual o percentual de títulos do gênero comédia lançados em 2018 em relação ao total de títulos lançados nesse ano?")
print("ERRADO!!!!!")
from pyspark.sql.window import Window

# Window with no partition columns: the aggregate below runs over all rows.
w = Window.partitionBy()

# Kept on purpose as the incorrect approach. After explode(), a title with
# several genres contributes one row per genre, so the denominator is the
# number of (title, genre) pairs and not the number of titles released in 2018.
genre_counts_2018 = (
    df_titles
    .where(col('startYear') == '2018')
    .withColumn('genres', split(col('genres'), ','))
    .select('*', explode(col('genres')).alias('unique_g'))
    .groupBy('unique_g')
    .count()
)
# `sum` here is the Spark SQL aggregate brought in by the star import,
# not Python's builtin.
share_of_all_genre_rows = (col('count') / sum(col('count')).over(w)) * 100
(
    genre_counts_2018
    .withColumn('percent', share_of_all_genre_rows)
    .where(col('unique_g') == 'Comedy')
    .show()
)

21/11/02 21:41:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+--------+-----+------------------+
|unique_g|count|           percent|
+--------+-----+------------------+
|  Comedy|78809|12.612244963279833|
+--------+-----+------------------+



                                                                                

In [12]:
print("Qual o percentual de títulos do gênero comédia lançados em 2018 em relação ao total de títulos lançados nesse ano?")
print("CORRETO!!!!")
# Flag each 2018 title with 1 when "Comedy" is one of its genres and 0
# otherwise; every title is counted exactly once in the denominator.
titles_2018 = (
    df_titles
    .where(col('startYear') == '2018')
    .withColumn('genres', split(col('genres'), ','))
)
comedy_flag = when(array_contains(col('genres'), 'Comedy'), 1).otherwise(0)
flagged_2018 = titles_2018.withColumn('comedy', comedy_flag)
# Flagged titles divided by all titles, as a percentage.
flagged_2018.select((sum(col('comedy')) / count(col('comedy'))) * 100).show()

[Stage 28:>                                                       (0 + 12) / 12]

+-------------------------------------+
|((sum(comedy) / count(comedy)) * 100)|
+-------------------------------------+
|                   19.592336989488967|
+-------------------------------------+



                                                                                

In [13]:
# Exercise statement, printed verbatim. The quoted snippet registers the UDF
# with IntegerType() even though `(value**2)/2` uses true division.
print("""
Considere a definição de uma udf abaixo: 

def sqr_divide(value): 

    return (value**2)/2 

sqr_divide_udf = udf(sqr_divide, IntegerType())

A definição de sqr_divide_udf possui um problema. Depois de solucionar o problema, ao executar 
""")

from pyspark.sql.types import DoubleType

def sqr_d(v):
    """Return half of the square of ``v``.

    Args:
        v: A number, or ``None``.

    Returns:
        ``(v ** 2) / 2`` computed with true division, or ``None`` when ``v``
        is ``None``.
    """
    # This function is registered as a Spark UDF over a column produced by a
    # cast; null cells reach Python as None, and `None ** 2` would raise
    # TypeError and fail the whole job. Propagate the null instead.
    if v is None:
        return None
    return (v**2)/2

# Register sqr_d as a UDF that returns a double.
sqr_d_udf = udf(sqr_d, returnType=DoubleType())

# averageRating is stored as a string, so cast it to double before it is
# handed to the UDF.
ratings_as_double = df_ratings.withColumn(
    'averageRating', col('averageRating').cast(DoubleType())
)
transformed_ratings = ratings_as_double.select(
    sqr_d_udf(col('averageRating')).alias('averageRating')
)
# Mean of the transformed ratings.
transformed_ratings.agg(mean('averageRating').alias('averageRating')).show()

[Stage 30:>                                                         (0 + 5) / 5]

+------------------+
|     averageRating|
+------------------+
|24.899137999843724|
+------------------+



                                                                                

In [14]:
print("Deseja-se utilizar um join para retornar somente as linhas referentes a títulos que estão sem nota, isto é, não aparecem no df_ratings")
# Anti join: keep only the df_titles rows whose tconst has no match in
# df_ratings; only df_titles columns are returned.
titles_without_rating = df_titles.join(df_ratings, on='tconst', how='anti')
titles_without_rating.show()

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000180|    short|  Le chemin de croix|  Le chemin de croix|      0|     1898|     \N|            \N|               Short|
|tt0000185|    short|La crèche à Bethléem|La crèche à Bethléem|      0|     1898|     \N|            \N|               Short|
|tt0000189|    short|             Dorotea|             Dorotea|      0|     1898|     \N|            \N|               Short|
|tt0000191|    short|Déménagement à la...|Déménagement à la...|      0|     1898|     \N|            \N|               Short|
|tt0000193|    short|L'entrée à Jérusalem|L'entrée à Jérusalem|      0|     1898|     \N|            \N|              

In [15]:
# Stop the Spark session now that all queries have run.
spark.stop()
print("============================== Finished ==============================")

