# Estudos Spark

In [35]:
# inicia o findspark
import findspark
findspark.init()

In [36]:
# importa bibliotecas para lidar com o spark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

In [37]:
# cria o objeto spark
spark = SparkSession.builder.getOrCreate()

## Leitura dos Dados

In [38]:
df_titles = spark.read.csv('title_basics.tsv', header=True, sep='\t')
df_ratings = spark.read.csv('title_ratings.tsv', header=True, sep='\t')

# Perguntas

### Quantos filmes foram lançados em 2015, incluindo os da televisão?

In [14]:
df = (
    df_titles.select('titleType', 'startYear')
    .filter((f.col('titleType').contains('movie')) & (f.col('startYear') == 2015))
)

In [7]:
df_titles.filter('titleType in ("movie")').filter('startYear = "2015"').count()

16429

In [45]:
df_titles.filter('titleType in ("movie", "tvMovie")').filter('startYear = "2015"').count()

19987

### Qual o gênero de filmes mais frequente?

In [16]:
generos = df_titles.select('genres')
lista_generos = generos.withColumn('genres', f.split('genres', ','))
generos_por_linha = lista_generos.withColumn('genres', f.explode(f.col('genres')))

In [17]:
freq = generos_por_linha.groupBy('genres').count()
freq.orderBy(f.desc('count')).show(5)

+-----------+-------+
|     genres|  count|
+-----------+-------+
|      Drama|2247995|
|     Comedy|1653725|
|      Short|1021850|
|  Talk-Show| 900198|
|Documentary| 764885|
+-----------+-------+
only showing top 5 rows



### Qual o gênero com a melhor nota média de títulos?

In [6]:
df_titles_subset = df_titles.select('tconst', 'originalTitle', 'genres')
df_ratings_subset = df_ratings.select('tconst', 'averageRating')

In [7]:
df_join = df_titles_subset.join(df_ratings_subset, 'tconst')

In [8]:
df_join.show(5)

+---------+--------------------+---------------+-------------+
|   tconst|       originalTitle|         genres|averageRating|
+---------+--------------------+---------------+-------------+
|tt0000658|Le cauchemar de F...|Animation,Short|          6.5|
|tt0001732|The Lighthouse Ke...|    Drama,Short|          7.1|
|tt0002253|          Home Folks|    Drama,Short|          4.0|
|tt0002473|    The Sands of Dee|  Romance,Short|          6.5|
|tt0002588|Zigomar contre Ni...| Crime,Thriller|          6.0|
+---------+--------------------+---------------+-------------+
only showing top 5 rows



In [9]:
# split e explode
df_join = df_join.withColumn('genres', f.split('genres', ','))
df_join = df_join.withColumn('genres', f.explode(f.col('genres')))

In [11]:
df_join.toPandas()

Unnamed: 0,tconst,originalTitle,genres,averageRating
0,tt0000658,Le cauchemar de Fantoche,Animation,6.5
1,tt0000658,Le cauchemar de Fantoche,Short,6.5
2,tt0001732,The Lighthouse Keeper,Drama,7.1
3,tt0001732,The Lighthouse Keeper,Short,7.1
4,tt0002253,Home Folks,Drama,4.0
...,...,...,...,...
2330822,tt9913584,As long as you're happy,Animation,8.3
2330823,tt9913584,As long as you're happy,Comedy,8.3
2330824,tt9913584,As long as you're happy,Fantasy,8.3
2330825,tt9916038,Eco,Drama,5.6


In [15]:
# agrupo por gênero
df_join = df_join.groupBy('genres')

In [18]:
# usando agg e avg nas notas
df_join.agg(f.avg('averageRating')).orderBy(f.desc('avg(averageRating)')).limit(1).toPandas()

Unnamed: 0,genres,avg(averageRating)
0,History,7.35378


### Qual o video game do gênero aventura mais bem avaliado dm 2020?

In [18]:
df = (
    df_titles.select('tconst', 'titleType', 'originalTitle', 'startYear', 'genres')
    .filter((f.col('titleType') == 'videoGame') & (f.col('startYear') == 2020) & (f.col('genres').contains('Adventure')))
)

In [20]:
game_ratings_2020 = df.join(df_ratings, 'tconst')

In [21]:
game_ratings_2020.select('originalTitle', 'averageRating').orderBy(f.desc('averageRating')).show(5)

+--------------------+-------------+
|       originalTitle|averageRating|
+--------------------+-------------+
|     Half-Life: Alyx|          9.5|
|   Ghost of Tsushima|          9.3|
|               Omori|          9.2|
|Ori and the Will ...|          9.1|
|Final Fantasy VII...|          9.1|
+--------------------+-------------+
only showing top 5 rows



### Usando UDF
### Elevar a média de notas ao quadrado e depois agregar tirando a média de todos os valores da coluna

In [22]:
def sqr_divide(value):
    return (value**2)/2

sqr_divide_udf = f.udf(sqr_divide, t.DoubleType())

In [23]:
(
    df_ratings
    .withColumn('averageRating', f.col('averageRating').cast('double'))
    .select(sqr_divide_udf('averageRating').alias('averageRating'))
    .agg(f.mean('averageRating').alias('averageRating'))
    .show()
)

+------------------+
|     averageRating|
+------------------+
|24.899137999842086|
+------------------+

