In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.window import Window

In [2]:
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [48]:
# df_ratings = spark.read.csv("../data/tp/movielens/ratings.csv", header=True)

In [None]:
# df_movies = spark.read.csv("../data/tp/movielens/movies.csv", header=True)

In [None]:
# Disabled build step for the movie table that the notebook later loads from
# '../data/tp/movielens/full_movie': average the ratings per movie, join the
# movie metadata, and take the first all-digit parenthesised group in the
# title as `year`.
# The regex is shown as a raw string so `\(` and `\)` reach the regex engine
# without Python reporting an invalid escape sequence if this is re-enabled.
# df = (
#     df_ratings
#     .groupby('movieId')
#     .agg(f.mean('rating').alias('averageRating'))
#     .join(df_movies, 'movieId')
#     .withColumn('year', f.regexp_extract(f.col('title'), r'(?<=\()[0-9]+(?=\))', 0).cast('int'))
# )

In [None]:
# df.write.save('../data/tp/movielens/full_movie')

## Leitura dos Dados

In [3]:
# Load the pre-built movie table. No format is passed, so the session's
# default data source is used.
df = spark.read.load(path='../data/tp/movielens/full_movie')

In [4]:
# Pull only the first five rows to the driver for a rich pandas preview.
(
    df
    .limit(5)
    .toPandas()
)

Unnamed: 0,movieId,averageRating,title,genres,year
0,1674,3.842865,Witness (1985),Drama|Romance|Thriller,1985
1,3156,3.149579,Bicentennial Man (1999),Drama|Romance|Sci-Fi,1999
2,85020,3.46936,"Mechanic, The (2011)",Action|Drama|Thriller,2011
3,54881,3.826231,"King of Kong, The (2007)",Documentary,2007
4,671,3.674146,Mystery Science Theater 3000: The Movie (1996),Comedy|Sci-Fi,1996


In [5]:
# Print each column's name, type, and nullability.
df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)



## Resolução Simulada

### Questão 6

In [7]:
# One row per (movie, genre): split the pipe-delimited `genres` string and
# explode the resulting array. The split pattern is a regex, so the pipe must
# be escaped; the raw string keeps the backslash without triggering Python's
# invalid-escape-sequence warning.
df_q6 = df.withColumn('genre', f.explode(f.split('genres', r'\|')))

In [8]:
# Preview the exploded frame: each movie now repeats once per genre.
(
    df_q6
    .limit(5)
    .toPandas()
)

Unnamed: 0,movieId,averageRating,title,genres,year,genre
0,1674,3.842865,Witness (1985),Drama|Romance|Thriller,1985,Drama
1,1674,3.842865,Witness (1985),Drama|Romance|Thriller,1985,Romance
2,1674,3.842865,Witness (1985),Drama|Romance|Thriller,1985,Thriller
3,3156,3.149579,Bicentennial Man (1999),Drama|Romance|Sci-Fi,1999,Drama
4,3156,3.149579,Bicentennial Man (1999),Drama|Romance|Sci-Fi,1999,Romance


In [12]:
# List the distinct genre labels produced by the explode.
(
    df_q6
    .select('genre')
    .distinct()
    .toPandas()
)

Unnamed: 0,genre
0,Crime
1,Romance
2,Thriller
3,Adventure
4,Drama
5,War
6,Documentary
7,Fantasy
8,Mystery
9,Musical


Filmes de ação e aventura em 2015

In [13]:
# Count exploded rows that are Action or Adventure in 2015. `year` is an
# integer column, so it is compared with an integer literal instead of relying
# on an implicit string-to-int cast.
# Note: this counts (movie, genre) rows, so a film tagged with both genres is
# counted twice.
df_q6.filter('genre in ("Action", "Adventure") and year = 2015').count()

378

In [14]:
# Same count expressed as two chained filters (equivalent to a single AND).
# `year` is an integer column, so it is compared with an integer literal.
# Still counts (movie, genre) rows, not distinct movies.
df_q6.filter('genre in ("Action", "Adventure")').filter('year = 2015').count()

378

In [15]:
# Distinct movies that are Action or Adventure in 2015: counting distinct
# movieId removes the double count for films carrying both genres.
# `year` is an integer column, so it is compared with an integer literal.
(
    df_q6
    .filter('genre in ("Action", "Adventure")')
    .filter('year = 2015')
    .agg(f.countDistinct('movieId').alias('count'))
    .toPandas()
)

Unnamed: 0,count
0,320


### Questão 7

In [16]:
# Most frequent genre: explode the pipe-delimited genres, count rows per genre,
# and keep the single largest. The split pattern is a regex, written as a raw
# string so the escaped pipe does not trigger an invalid-escape warning.
(
    df
    .withColumn('genre', f.explode(f.split('genres', r'\|')))
    .groupby('genre')
    .count()
    .orderBy(f.desc('count'))
    .limit(1)
    .toPandas()
)

Unnamed: 0,genre,count
0,Drama,24465


In [19]:
# Number of titles per genre. The split pattern is a regex, written as a raw
# string so the escaped pipe does not trigger an invalid-escape warning.
df_count = (
    df
    .withColumn('genre', f.explode(f.split('genres', r'\|')))
    .groupby('genre')
    .count()
)

In [17]:
# Largest per-genre count, brought to the driver as a plain Python value by
# unpacking the single column of the first collected row.
(maximo,) = df_count.agg(f.max('count')).collect()[0]

In [20]:
# Keep every genre whose count equals the maximum (ties included). The
# comparison goes through the Column API rather than splicing a Python value
# into a SQL expression string.
df_count.filter(f.col('count') == maximo).toPandas()

Unnamed: 0,genre,count
0,Drama,24465


### Questão 8

In [21]:
# One row per (movie, genre). The split pattern is a regex, written as a raw
# string so the escaped pipe does not trigger an invalid-escape warning.
df_exploded = df.withColumn('genre', f.explode(f.split('genres', r'\|')))

In [23]:
# Scratch check: f.col returns a Column object that refers to the column by name.
f.col('averageRating')

Column<'averageRating'>

In [24]:
# Scratch check: alias wraps the same column expression under a new output name.
f.col('averageRating').alias('best_rating')

Column<'averageRating AS `best_rating`'>

In [27]:
# Genre with the highest unweighted mean of its movies' average ratings.
(
    df_exploded
    .groupBy('genre')
    .agg(f.mean('averageRating').alias('best_rating'))
    .orderBy('best_rating', ascending=False)
    .limit(1)
    .toPandas()
)

Unnamed: 0,genre,best_rating
0,Documentary,3.382493


### Questão 9

In [28]:
# Five best-rated Fantasy titles of 2003. `year` is an integer column, so it is
# compared with an integer literal instead of relying on an implicit
# string-to-int cast.
(
    df_exploded
    .filter('genre = "Fantasy" and year = 2003')
    .orderBy(f.desc('averageRating'))
    .limit(5)
    .toPandas()
)

Unnamed: 0,movieId,averageRating,title,genres,year,genre
0,151569,5.0,The Old Fairy Tale: When the Sun Was God (2003),Drama|Fantasy,2003,Fantasy
1,7153,4.09034,"Lord of the Rings: The Return of the King, The...",Action|Adventure|Drama|Fantasy,2003,Fantasy
2,153346,4.0,Woodenhead (2003),Fantasy,2003,Fantasy
3,6773,3.833869,"Triplets of Belleville, The (Les triplettes de...",Animation|Comedy|Fantasy,2003,Fantasy
4,187027,3.8,Inuyasha the Movie 3: Swords of an Honorable R...,Animation|Fantasy|Sci-Fi,2003,Fantasy


### Questão 11

In [34]:
# Number of titles per release year, computed on the un-exploded frame.
df_agg = (
    df
    .groupBy('year')
    .count()
    .withColumnRenamed('count', 'n_titles')
)

In [36]:
# Share of 2018 titles tagged Comedy. `n_titles` comes from the un-exploded
# frame, so the denominator is not inflated by multi-genre rows. The split
# pattern is a regex, written as a raw string so the escaped pipe does not
# trigger an invalid-escape warning.
(
    df.join(df_agg, 'year', 'left')
    .withColumn('genre', f.explode(f.split('genres', r'\|')))
    .filter("year = 2018")
    .groupby('year', 'genre', 'n_titles')
    .count()
    .withColumn('perc', f.col('count')/f.col('n_titles'))
    .filter('genre = "Comedy"')
    .toPandas()
)

Unnamed: 0,year,genre,n_titles,count,perc
0,2018,Comedy,2018,493,0.244301


### Questão 14

#### Errado

In [37]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def sqr_divide(value):
    """Return half of the square of ``value``.

    True division is used, so the result is a float even for integer input.
    """
    squared = value ** 2
    return squared / 2
# Deliberately wrong for this section: the function returns a Python float but
# the UDF is declared as IntegerType, and the query below comes back empty.
sqr_divide_udf = udf(f=sqr_divide, returnType=IntegerType())

In [38]:
# Mean of the transformed ratings using the IntegerType UDF; the result shown
# below is empty because of the declared return type.
(
    df
    .select(sqr_divide_udf('averageRating').alias('averageRating'))
    .agg(
        f.mean('averageRating').alias('averageRating')
    )
    .toPandas()
)

Unnamed: 0,averageRating
0,


#### Correto

In [39]:
# Sanity check: true division yields a float even when both operands are ints.
type((5**2)/2)

float

In [40]:
from pyspark.sql.functions import udf
from pyspark.sql.types import  DoubleType

def sqr_divide(value):
    """Return half of the square of ``value`` as a float.

    Args:
        value: A number, or ``None`` when the source column is null.

    Returns:
        ``(value ** 2) / 2``, or ``None`` when ``value`` is ``None``.
    """
    # The input column is nullable; pass nulls through instead of letting
    # ``None ** 2`` raise a TypeError inside the UDF.
    if value is None:
        return None
    return (value ** 2) / 2
# DoubleType matches the float that sqr_divide returns.
sqr_divide_udf = udf(f=sqr_divide, returnType=DoubleType())

In [41]:
# Mean of the transformed ratings using the DoubleType UDF.
(
    df
    .select(sqr_divide_udf('averageRating').alias('averageRating'))
    .agg(
        f.mean('averageRating').alias('averageRating')
    )
    .toPandas()
)

Unnamed: 0,averageRating
0,4.990346
