# Data analysis with Apache Spark - Dataframe 2

## 1. Data analysis with Spark DataFrames: loading datasets

In [1]:
import findspark
findspark.init("/home/alumno/spark-3.2.2-bin-hadoop2.7")
from pyspark import SparkContext, SparkConf
conf=SparkConf().setAppName("intro").setMaster("local")
sc = SparkContext(conf=conf)
from pyspark.sql import SparkSession
spark=SparkSession(sc)

In [2]:
! ls ml-latest-small

links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [3]:
#load movies and ratings datasets to start the analysis
movies_df = spark.read.option("inferSchema","true").option("header", "true").csv("ml-latest-small/movies.csv")
ratings_df = spark.read.option("inferSchema","true").option("header", "true").csv("ml-latest-small/ratings.csv")

In [4]:
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [5]:
movies_df.show(5, truncate=False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [6]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



## 2. Training Datasets

In [7]:
#join both datasets as if they were relational tables to get a new movie_ratings DataFrame
movie_ratings = ratings_df.join(movies_df, ["movieId"], "left")
movie_ratings.show(5, truncate=False)

+-------+------+------+---------+---------------------------+-------------------------------------------+
|movieId|userId|rating|timestamp|title                      |genres                                     |
+-------+------+------+---------+---------------------------+-------------------------------------------+
|1      |1     |4.0   |964982703|Toy Story (1995)           |Adventure|Animation|Children|Comedy|Fantasy|
|3      |1     |4.0   |964981247|Grumpier Old Men (1995)    |Comedy|Romance                             |
|6      |1     |4.0   |964982224|Heat (1995)                |Action|Crime|Thriller                      |
|47     |1     |5.0   |964983815|Seven (a.k.a. Se7en) (1995)|Mystery|Thriller                           |
|50     |1     |5.0   |964982931|Usual Suspects, The (1995) |Crime|Mystery|Thriller                     |
+-------+------+------+---------+---------------------------+-------------------------------------------+
only showing top 5 rows
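
To make the semantics of the "left" join above concrete, here is a minimal plain-Python sketch (not Spark code). The first two ratings and the titles are taken from the outputs above; the rating for movieId 999 is a hypothetical row added only to show what happens when the right-hand table has no match. The helper name `left_join` is also an assumption made for this illustration.

```python
# Plain-Python sketch of a left join on "movieId" (illustration only, not Spark code).
ratings = [
    {"userId": 1, "movieId": 1, "rating": 4.0},
    {"userId": 1, "movieId": 3, "rating": 4.0},
    {"userId": 1, "movieId": 999, "rating": 2.0},  # hypothetical row with no matching movie
]
# Movies indexed by their key; this assumes movieId is unique in the movies table.
movies = {
    1: {"title": "Toy Story (1995)"},
    3: {"title": "Grumpier Old Men (1995)"},
}

def left_join(left_rows, right_by_key, key, right_columns):
    """Keep every left row; fill the right-hand columns with None when no match exists."""
    joined = []
    for row in left_rows:
        match = right_by_key.get(row[key])
        extra = {c: (match[c] if match else None) for c in right_columns}
        joined.append({**row, **extra})
    return joined

movie_ratings_sketch = left_join(ratings, movies, "movieId", ["title"])
for row in movie_ratings_sketch:
    print(row)
```

Every rating survives the join, which is why movie_ratings keeps one record per rating; a rating without a matching movie would simply carry empty movie columns.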



In [8]:
movie_ratings.count()

100836

### **Question 1:**  
### Can you obtain a basic list of summary statistics for our new movie ratings DataFrame? Interesting information includes the count, mean, max, and some selected percentiles. For each question of the tutorial, you must provide the following information:
- What command are you going to use? Why?
- Which Spark operation solves the question?
- What output does your Spark command produce (3 lines max.)?


In [9]:
basic_summary = movie_ratings.describe()
basic_summary.show()

+-------+----------------+------------------+------------------+--------------------+--------------------+------------------+
|summary|         movieId|            userId|            rating|           timestamp|               title|            genres|
+-------+----------------+------------------+------------------+--------------------+--------------------+------------------+
|  count|          100836|            100836|            100836|              100836|              100836|            100836|
|   mean|19435.2957177992|326.12756356856676| 3.501556983616962|1.2059460873684695E9|                null|              null|
| stddev|35530.9871987003| 182.6184914635004|1.0425292390606342|2.1626103599513078E8|                null|              null|
|    min|               1|                 1|               0.5|           828124615|"11'09""01 - Sept...|(no genres listed)|
|    max|          193609|               610|               5.0|          1537799250|À nous la liberté...|           W

In [10]:
from pyspark.sql.functions import expr, percentile_approx

percentiles = movie_ratings.select(expr("percentile_approx(rating, array(0.25, 0.5, 0.75))").alias('25%, 50%, 75%'))
percentiles.show()

+---------------+
|  25%, 50%, 75%|
+---------------+
|[3.0, 3.5, 4.0]|
+---------------+
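
As a rough guide to what these statistics mean, the sketch below recomputes them in plain Python for the five ratings shown earlier by ratings_df.show(5). It is only an illustration under stated assumptions: `nearest_rank_percentile` is a hypothetical helper that computes an exact nearest-rank percentile, which is not the algorithm behind percentile_approx, and the standard deviation is the sample standard deviation.

```python
import math
import statistics

# The five ratings shown in the ratings_df.show(5) output above.
ratings = [4.0, 4.0, 4.0, 5.0, 5.0]

def nearest_rank_percentile(values, p):
    """Smallest value such that at least a fraction p of the data is at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))
    return ordered[rank - 1]

summary = {
    "count": len(ratings),
    "mean": statistics.mean(ratings),
    "stddev": statistics.stdev(ratings),  # sample standard deviation (divides by n - 1)
    "min": min(ratings),
    "max": max(ratings),
    "percentiles": [nearest_rank_percentile(ratings, p) for p in (0.25, 0.5, 0.75)],
}
print(summary)
```

In PySpark itself, `DataFrame.summary()` accepts statistic names such as "count", "mean", "25%", "50%", "75%" and "max", so it can report the percentiles together with the other statistics in a single call.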



Next, we create training and validation datasets to evaluate the Spark tools for building prediction models. We do this by randomly splitting the initial DataFrame.

In [11]:
help(movie_ratings.randomSplit)
(train_df, v_df) = movie_ratings.randomSplit([0.8, 0.2], 0)
movie_ratings.count()

Help on method randomSplit in module pyspark.sql.dataframe:

randomSplit(weights, seed=None) method of pyspark.sql.dataframe.DataFrame instance
    Randomly splits this :class:`DataFrame` with the provided weights.
    
    .. versionadded:: 1.4.0
    
    Parameters
    ----------
    weights : list
        list of doubles as weights with which to split the :class:`DataFrame`.
        Weights will be normalized if they don't sum up to 1.0.
    seed : int, optional
        The seed for sampling.
    
    Examples
    --------
    >>> splits = df4.randomSplit([1.0, 2.0], 24)
    >>> splits[0].count()
    2
    
    >>> splits[1].count()
    2



100836

In [12]:
train_df.count()

80487

In [13]:
v_df.count()

20349
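
Note that the split is not exactly 80/20: 80487 of 100836 records is about 79.8%. One way to picture why is the plain-Python sketch below, which normalizes the weights and assigns each row to a split with an independent random draw. This is an illustration of the idea only; the function name `random_split_sketch` is hypothetical and the code makes no claim to reproduce Spark's sampler or its results for a given seed.

```python
import random
from itertools import accumulate

def random_split_sketch(rows, weights, seed):
    """Assign each row to one split by drawing a uniform random number per row."""
    total = sum(weights)
    # Normalize the weights and turn them into cumulative upper bounds, e.g. [0.8, 1.0].
    bounds = list(accumulate(w / total for w in weights))
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0.0, 1.0)
        for index, upper in enumerate(bounds):
            if draw < upper:
                splits[index].append(row)
                break
    return splits

rows = list(range(1000))  # stand-in for the DataFrame rows
train_rows, validation_rows = random_split_sketch(rows, [0.8, 0.2], seed=0)
print(len(train_rows), len(validation_rows))
```

Because each row is decided independently, the split sizes only approximate the requested proportions, while a fixed seed keeps the result reproducible.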

In [14]:
v_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



To avoid having empty recommendations, we need to be sure that the users and movies in the validation dataset are also available in the training dataset. Validation records that fail this check are moved back into the training dataset.

In [15]:
validation_df = ( v_df
    .join(train_df, ["userId"], "left_semi")
    .join(train_df, ["movieId"], "left_semi")
)
non_matching_recs = v_df.join(validation_df, ["movieId", "userId"], "left_anti")
train_df = train_df.union(non_matching_recs)

### **Question 2:** 
### What kind of join operations are left semi and left anti? Can you explain these operations with our validation example?

The left_semi join returns the rows of the left DataFrame that have at least one match in the right DataFrame on the join columns, keeping only the left DataFrame's columns, whereas left_anti returns the rows of the left DataFrame that have no match. In the example, we build validation_df with a left semi join that keeps the rows of v_df whose userId also appears in train_df, and then apply a second left semi join to that result to keep the rows whose movieId also appears in train_df. We then use a left anti join on movieId and userId to get the rows of v_df that are not in validation_df, and union them with train_df so that the records dropped from validation are not lost.
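
The same steps can be sketched in plain Python on a handful of rows. Everything here is hypothetical and for illustration only: the (userId, movieId) pairs are made up, and `left_semi` and `left_anti` are small helpers written for this sketch, not Spark functions.

```python
# Plain-Python sketch of left_semi and left_anti joins (illustration only).
# All rows are hypothetical (userId, movieId) pairs.
train = [(1, 10), (1, 20), (2, 10)]
held_out = [(2, 20), (3, 10), (2, 30)]

USER, MOVIE = 0, 1

def left_semi(left, right, column):
    """Rows of `left` whose value in `column` also appears in `right`."""
    keys = {row[column] for row in right}
    return [row for row in left if row[column] in keys]

def left_anti(left, right):
    """Rows of `left` that have no match in `right` on the full (userId, movieId) pair."""
    keys = set(right)
    return [row for row in left if row not in keys]

# Keep held-out rows whose user AND movie are both known to the training set.
validation = left_semi(left_semi(held_out, train, USER), train, MOVIE)
# Everything that was filtered out goes back to the training set.
non_matching = left_anti(held_out, validation)
train_extended = train + non_matching  # plays the role of union()

print(validation)           # → [(2, 20)]
print(non_matching)         # → [(3, 10), (2, 30)]
print(len(train_extended))  # → 5
```

User 3 and movie 30 never appear in the training rows, so their records cannot be validated and are added to the training rows instead.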

### **Question 3:**  
### Does train_df now have more or fewer records than initially? Why?

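train_df now has more records than initially because we have added the v_df records that were excluded from validation_df, that is, those whose userId or movieId did not appear in train_df. The counts per rating shown in Question 4 add up to 81311 records, compared with the initial 80487.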

## 3. Managing columns of validation datasets

In [16]:
#import some functions available at pyspark
from pyspark.sql.functions import col, countDistinct, mean, split, explode, count

In [17]:
#review the data by sorting the dataset in descending order of rating
validation_df.sort(col("rating").desc()).show(5)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|    193|   243|   5.0| 837155377|    Showgirls (1995)|               Drama|
|   2997|   392|   5.0|1027524319|Being John Malkov...|Comedy|Drama|Fantasy|
|    296|   540|   5.0|1179108599| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|    161|   243|   5.0| 837155121| Crimson Tide (1995)|  Drama|Thriller|War|
|   1221|   392|   5.0|1027524082|Godfather: Part I...|         Crime|Drama|
+-------+------+------+----------+--------------------+--------------------+
only showing top 5 rows



### **Question 4:**
### Create a new DF derived from train_df grouping all records with the same rating, count them, and sort by the rating column in descending order. The expected output of this transformation should be:

In [18]:
from pyspark.sql.functions import expr, desc, asc

In [19]:
ratings_distribution = train_df.groupBy("rating").count().orderBy(desc("rating"))
ratings_distribution.show()

+------+-----+
|rating|count|
+------+-----+
|   5.0|10644|
|   4.5| 6826|
|   4.0|21655|
|   3.5|10569|
|   3.0|16128|
|   2.5| 4508|
|   2.0| 6089|
|   1.5| 1468|
|   1.0| 2325|
|   0.5| 1099|
+------+-----+



In our dataset, we have a special column called "genres" that contains a list of words separated by "|" describing the genres of each movie. If we are interested in making predictions for a specific kind of movie, we need to create a new column called genre that holds a single label per record, so that each movie appears once for every genre applied to it. To create that column, we need two steps.


In [20]:
#First, use the split function to create a new column with an array of genre values.
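#the pattern is a regular expression, so the pipe is escaped; the raw string avoids Python's invalid "\|" escape warning
train_with_genres_array = train_df.withColumn("genres_array", split("genres", r"\|"))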
train_with_genres_array.show(5, truncate=False)

+-------+------+------+----------+----------------+-------------------------------------------+-------------------------------------------------+
|movieId|userId|rating|timestamp |title           |genres                                     |genres_array                                     |
+-------+------+------+----------+----------------+-------------------------------------------+-------------------------------------------------+
|1      |1     |4.0   |964982703 |Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |5     |4.0   |847434962 |Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |7     |4.5   |1106635946|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|
|1      |15    |2.5   |1510577970|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Childr

In [21]:
#Second, "explode" the array column into separate records, each one with a value taken from the array
train_with_genres_exploded = (
train_with_genres_array
.select("movieId", "userId", "rating", "genres", "genres_array")
.withColumn("genre", explode("genres_array"))
)
train_with_genres_exploded.show(5, truncate=False)

+-------+------+------+-------------------------------------------+-------------------------------------------------+---------+
|movieId|userId|rating|genres                                     |genres_array                                     |genre    |
+-------+------+------+-------------------------------------------+-------------------------------------------------+---------+
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Adventure|
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Animation|
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Children |
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Comedy, Fantasy]|Comedy   |
|1      |1     |4.0   |Adventure|Animation|Children|Comedy|Fantasy|[Adventure, Animation, Children, Come
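
The two steps can also be pictured in plain Python. The sketch below is an illustration only, not Spark code; it uses the first row of the output above and mimics split and explode with a string split and a list comprehension.

```python
# Plain-Python sketch of split + explode (illustration only, not Spark code).
# The row is taken from the output above.
row = {
    "movieId": 1,
    "userId": 1,
    "rating": 4.0,
    "genres": "Adventure|Animation|Children|Comedy|Fantasy",
}

# Step 1: split the delimited string into a list. str.split takes a literal
# separator, whereas Spark's split takes a regular expression, which is why
# the pipe has to be escaped in the Spark cell.
genres_array = row["genres"].split("|")

# Step 2: "explode" the list into one record per element, copying the other columns.
exploded = [{**row, "genre": genre} for genre in genres_array]

for record in exploded:
    print(record["movieId"], record["userId"], record["rating"], record["genre"])
```

One rating record becomes five records, one per genre, each carrying the same movieId, userId and rating.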

Now we can start asking questions about movie categories. For example, what is the mean rating of each movie category? Are there types of movies that users prefer?

In [22]:
mean_genre_rating = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(mean(col("rating")).alias("genre_rating"))
)
mean_genre_rating.show(10, truncate=False)

+-----------+------------------+
|genre      |genre_rating      |
+-----------+------------------+
|Crime      |3.6534308211473565|
|Romance    |3.499146874146874 |
|Thriller   |3.489255925733943 |
|Adventure  |3.5100289495450787|
|Drama      |3.6531895378424757|
|War        |3.8031265887137775|
|Documentary|3.7878937007874014|
|Fantasy    |3.4873953974895398|
|Mystery    |3.6281553398058253|
|Musical    |3.570217917675545 |
+-----------+------------------+
only showing top 10 rows



Now we want to check these ratings together with the actual number of movies rated by users. We can add a new column using count()

In [23]:
mean_genre_rating_movies = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(
            mean(col("rating")).alias("genre_rating"),
            count("movieId").alias("num_movies")
        )
)
mean_genre_rating_movies.show(10, truncate=False)

+-----------+------------------+----------+
|genre      |genre_rating      |num_movies|
+-----------+------------------+----------+
|Crime      |3.6534308211473565|13335     |
|Romance    |3.499146874146874 |14652     |
|Thriller   |3.489255925733943 |21221     |
|Adventure  |3.5100289495450787|19344     |
|Drama      |3.6531895378424757|33798     |
|War        |3.8031265887137775|3934      |
|Documentary|3.7878937007874014|1016      |
|Fantasy    |3.4873953974895398|9560      |
|Mystery    |3.6281553398058253|6180      |
|Musical    |3.570217917675545 |3304      |
+-----------+------------------+----------+
only showing top 10 rows



There is a problem with the previous result. count("movieId") counts every rating record in the group, so the same movie is counted once for each user who rated it, which adds duplicates to our results. It is much more useful to count each movie only once per genre, so we need to replace count with the countDistinct aggregate function.

In [24]:
mean_genre_rating_movies = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(
            mean(col("rating")).alias("genre_rating"),
            countDistinct("movieId").alias("num_movies")
        )
)
mean_genre_rating_movies.show(10, truncate=False)

+-----------+------------------+----------+
|genre      |genre_rating      |num_movies|
+-----------+------------------+----------+
|Crime      |3.6534308211473565|1196      |
|Romance    |3.499146874146874 |1591      |
|Thriller   |3.489255925733943 |1889      |
|Adventure  |3.5100289495450787|1262      |
|Drama      |3.6531895378424757|4349      |
|War        |3.8031265887137775|381       |
|Documentary|3.7878937007874014|438       |
|Fantasy    |3.4873953974895398|778       |
|Mystery    |3.6281553398058253|573       |
|Musical    |3.570217917675545 |333       |
+-----------+------------------+----------+
only showing top 10 rows



### **Question 5:**
### Extend the previous DataFrame with a new column containing the number of unique ratings for each genre. You need to use countDistinct on both "movieId" and "userId" so that each user's rating of a movie is counted only once.

In [25]:
mean_ratings  = (
    train_with_genres_exploded
        .groupBy("genre")
        .agg(
                mean(col("rating")).alias("genre_rating"),
                countDistinct("movieId").alias("num_movies"),
                countDistinct("userId", "movieId").alias("num_ratings")
        )
)
mean_ratings.show(10)

+-----------+------------------+----------+-----------+
|      genre|      genre_rating|num_movies|num_ratings|
+-----------+------------------+----------+-----------+
|      Crime|3.6534308211473565|      1196|      13335|
|    Romance| 3.499146874146874|      1591|      14652|
|   Thriller| 3.489255925733943|      1889|      21221|
|  Adventure|3.5100289495450787|      1262|      19344|
|      Drama|3.6531895378424757|      4349|      33798|
|        War|3.8031265887137775|       381|       3934|
|Documentary|3.7878937007874014|       438|       1016|
|    Fantasy|3.4873953974895398|       778|       9560|
|    Mystery|3.6281553398058253|       573|       6180|
|    Musical| 3.570217917675545|       333|       3304|
+-----------+------------------+----------+-----------+
only showing top 10 rows
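
The difference between count, countDistinct on one column, and countDistinct on a pair of columns, as used in the cells above, can be sketched in plain Python. The records below are hypothetical and the code is only an illustration of the three aggregations, not Spark code.

```python
from collections import defaultdict

# Hypothetical exploded records: (genre, movieId, userId, rating).
records = [
    ("Comedy", 1, 1, 4.0),
    ("Comedy", 1, 5, 4.0),
    ("Comedy", 3, 1, 4.0),
    ("Fantasy", 1, 1, 4.0),
    ("Fantasy", 1, 5, 4.0),
]

rows_per_genre = defaultdict(int)
movies_per_genre = defaultdict(set)
pairs_per_genre = defaultdict(set)

for genre, movie_id, user_id, _rating in records:
    rows_per_genre[genre] += 1                       # like count("movieId")
    movies_per_genre[genre].add(movie_id)            # like countDistinct("movieId")
    pairs_per_genre[genre].add((user_id, movie_id))  # like countDistinct("userId", "movieId")

for genre in rows_per_genre:
    print(genre, rows_per_genre[genre], len(movies_per_genre[genre]), len(pairs_per_genre[genre]))
# → Comedy 3 2 3
# → Fantasy 2 1 2
```

When every (userId, movieId) pair occurs only once per genre, the pair count equals the plain row count, which matches the num_ratings column above repeating the earlier count("movieId") values.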



### **Question 6:**
### Can you program a top 10 list of the genres with the best average rating? And a top 10 list of the genres with the most ratings?

In [26]:
top10_genre_rating = mean_ratings.orderBy(desc("genre_rating")).select('genre', "genre_rating").limit(10)
top10_genre_rating.show()

+-----------+------------------+
|      genre|      genre_rating|
+-----------+------------------+
|  Film-Noir| 3.908440629470672|
|        War|3.8031265887137775|
|Documentary|3.7878937007874014|
|      Crime|3.6534308211473565|
|      Drama|3.6531895378424757|
|       IMAX| 3.633771275007465|
|  Animation| 3.630434782608696|
|    Mystery|3.6281553398058253|
|    Western|3.6067807351077312|
|    Musical| 3.570217917675545|
+-----------+------------------+



In [27]:
top10_num_ratings = mean_ratings.orderBy(desc("num_ratings")).select('genre', "num_ratings").limit(10)
top10_num_ratings.show()

+---------+-----------+
|    genre|num_ratings|
+---------+-----------+
|    Drama|      33798|
|   Comedy|      31487|
|   Action|      24555|
| Thriller|      21221|
|Adventure|      19344|
|  Romance|      14652|
|   Sci-Fi|      13734|
|    Crime|      13335|
|  Fantasy|       9560|
| Children|       7424|
+---------+-----------+
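
The orderBy(desc(...)).limit(n) pattern used for both lists amounts to sorting in descending order and keeping the first n rows. The plain-Python sketch below illustrates this with the ten rows printed by mean_ratings.show(10) earlier; since those are not the full result, the rankings here differ from the real top 10 lists. The helper name `top_n` is an assumption made for this illustration.

```python
# Plain-Python sketch of orderBy(desc(...)).limit(n) (illustration only).
# Rows are (genre, genre_rating, num_ratings), copied from the ten rows
# printed by mean_ratings.show(10); this is not the full result.
rows = [
    ("Crime", 3.6534308211473565, 13335),
    ("Romance", 3.499146874146874, 14652),
    ("Thriller", 3.489255925733943, 21221),
    ("Adventure", 3.5100289495450787, 19344),
    ("Drama", 3.6531895378424757, 33798),
    ("War", 3.8031265887137775, 3934),
    ("Documentary", 3.7878937007874014, 1016),
    ("Fantasy", 3.4873953974895398, 9560),
    ("Mystery", 3.6281553398058253, 6180),
    ("Musical", 3.570217917675545, 3304),
]

def top_n(rows, column_index, n):
    """Sort by the chosen column in descending order and keep the first n rows."""
    return sorted(rows, key=lambda row: row[column_index], reverse=True)[:n]

best_rated = [row[0] for row in top_n(rows, 1, 3)]
most_rated = [row[0] for row in top_n(rows, 2, 3)]
print(best_rated)  # → ['War', 'Documentary', 'Crime']
print(most_rated)  # → ['Drama', 'Thriller', 'Adventure']
```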

