# Spark Project Level 1

Siny P Raphel

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [0]:
spark = SparkSession.builder.master('local').appName('fifa').getOrCreate()

In [0]:
players = spark.read.csv('/FileStore/tables/wc2018_players.csv', inferSchema=True, header=True)
players.show(2)

+---------+---+----+------------------+----------+----------+--------------------+------+------+
|     Team|  #|Pos.| FIFA Popular Name|Birth Date|Shirt Name|                Club|Height|Weight|
+---------+---+----+------------------+----------+----------+--------------------+------+------+
|Argentina|  3|  DF|TAGLIAFICO Nicolas|31.08.1992|TAGLIAFICO|      AFC Ajax (NED)|   169|    65|
|Argentina| 22|  MF|    PAVON Cristian|21.01.1996|     PAVÓN|CA Boca Juniors (...|   169|    65|
+---------+---+----+------------------+----------+----------+--------------------+------+------+
only showing top 2 rows



In [0]:
players.printSchema()

root
 |-- Team: string (nullable = true)
 |-- #: integer (nullable = true)
 |-- Pos.: string (nullable = true)
 |-- FIFA Popular Name: string (nullable = true)
 |-- Birth Date: string (nullable = true)
 |-- Shirt Name: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Weight: integer (nullable = true)



In [0]:
players.columns

Out[14]: ['Team',
 '#',
 'Pos.',
 'FIFA Popular Name',
 'Birth Date',
 'Shirt Name',
 'Club',
 'Height',
 'Weight']

In [0]:
players = players.withColumnRenamed('FIFA Popular Name', 'Name')

*   Shows the names and height by adding 1 to the height column.

In [0]:
players.selectExpr('Name', 'Height + 1' ).show(5)

+------------------+------------+
|              Name|(Height + 1)|
+------------------+------------+
|TAGLIAFICO Nicolas|         170|
|    PAVON Cristian|         170|
|    LANZINI Manuel|         168|
|    SALVIO Eduardo|         168|
|      MESSI Lionel|         171|
+------------------+------------+
only showing top 5 rows



*    shows the player name and simultaneously checks whether or not they have height >170

In [0]:
players.selectExpr('Name', 'Height > 170').show(7)

+------------------+--------------+
|              Name|(Height > 170)|
+------------------+--------------+
|TAGLIAFICO Nicolas|         false|
|    PAVON Cristian|         false|
|    LANZINI Manuel|         false|
|    SALVIO Eduardo|         false|
|      MESSI Lionel|         false|
|  ANSALDI Cristian|          true|
|      BIGLIA Lucas|          true|
+------------------+--------------+
only showing top 7 rows



3.    Show FIFA Popular Name and 0 or 1 depending on Height>170

In [0]:
players.select('Name', F.expr('case when height > 170 then 1 else 0 end').alias('Height Check')).show(7)

+------------------+------------+
|              Name|Height Check|
+------------------+------------+
|TAGLIAFICO Nicolas|           0|
|    PAVON Cristian|           0|
|    LANZINI Manuel|           0|
|    SALVIO Eduardo|           0|
|      MESSI Lionel|           0|
|  ANSALDI Cristian|           1|
|      BIGLIA Lucas|           1|
+------------------+------------+
only showing top 7 rows



4.    name of  shortest player

In [0]:
players.agg({'height':'min'}).show()

+-----------+
|min(height)|
+-----------+
|        165|
+-----------+



In [0]:
players.filter('height == 165').select('Shirt Name').show()

+----------+
|Shirt Name|
+----------+
|  QUINTERO|
|     YAHIA|
|   SHAQIRI|
+----------+



In [0]:
players.orderBy('Height').select('Shirt Name').alias('Name').show(1)

+----------+
|Shirt Name|
+----------+
|     YAHIA|
+----------+
only showing top 1 row



5.    who is tallest of all. First we find the value of maximum height and then get the details of that player

In [0]:
max_ht =players.agg({'height' : 'max'}).first()['max(height)']

In [0]:
type(max_ht)

Out[123]: int

In [0]:
players.filter(F.col('height') == max_ht).show()

+-------+---+----+-------------+----------+----------+--------------+------+------+
|   Team|  #|Pos.|         Name|Birth Date|Shirt Name|          Club|Height|Weight|
+-------+---+----+-------------+----------+----------+--------------+------+------+
|Croatia| 12|  GK|KALINIC Lovre|03.04.1990|L. KALINIĆ|KAA Gent (BEL)|   201|    96|
+-------+---+----+-------------+----------+----------+--------------+------+------+



6.    average height of the players in Argentina team.

In [0]:
players.groupBy('team').mean('height').where('team == "Argentina"').show()

+---------+------------------+
|     team|       avg(height)|
+---------+------------------+
|Argentina|178.43478260869566|
+---------+------------------+



In [0]:
from pyspark.sql import functions as F

In [0]:
players.selectExpr('min(height)').show()

+-----------+
|min(height)|
+-----------+
|        165|
+-----------+



In [0]:
players.select(F.max('height')).show()

+-----------+
|max(height)|
+-----------+
|        201|
+-----------+



## Movies Spark Project Level 2 - Spark SQL

In [0]:
movies = spark.read.csv('/FileStore/tables/movies.csv', inferSchema=True, header=True)
ratings = spark.read.csv('/FileStore/tables/ratings.csv', inferSchema=True, header=True)

movies.show(2)
ratings.show(2)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
+------+-------+------+----------+
only showing top 2 rows



In [0]:
movies.tail(5)

Out[180]: [Row(movieId=162672, title='Mohenjo Daro (2016)', genres='Adventure|Drama|Romance', year=2016),
 Row(movieId=163056, title='Shin Godzilla (2016)', genres='Action|Adventure|Fantasy|Sci-Fi', year=2016),
 Row(movieId=163949, title='The Beatles: Eight Days a Week - The Touring Years (2016)', genres='Documentary', year=2016),
 Row(movieId=164977, title='The Gay Desperado (1936)', genres='Comedy', year=1936),
 Row(movieId=164979, title="Women of '69, Unboxed", genres='Documentary', year=69)]

In [0]:
movies = movies.select('*', F.regexp_extract('title', r'\(([0-9]{4})', 1).cast('int').alias('year'))

In [0]:
# spark.sql('select *, regexp_extract(title, r"\(([0-9]{4})", 1) as year from movies_sql limit 3').show(2)

+-------+----------------+--------------------+----+
|movieId|           title|              genres|year|
+-------+----------------+--------------------+----+
|      1|Toy Story (1995)|Adventure|Animati...|1995|
|      2|  Jumanji (1995)|Adventure|Childre...|1995|
+-------+----------------+--------------------+----+
only showing top 2 rows



In [0]:
movies.show(2)

+-------+----------------+--------------------+----+
|movieId|           title|              genres|year|
+-------+----------------+--------------------+----+
|      1|Toy Story (1995)|Adventure|Animati...|1995|
|      2|  Jumanji (1995)|Adventure|Childre...|1995|
+-------+----------------+--------------------+----+
only showing top 2 rows



In [0]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- year: integer (nullable = true)



In [0]:
movies.createOrReplaceTempView('movies_sql')
ratings.createOrReplaceTempView('ratings_sql')

In [0]:
movies.select('title','year').orderBy('year').show(truncate=False)

+--------------------------------------------------------------------+----+
|title                                                               |year|
+--------------------------------------------------------------------+----+
|Stranger Things                                                     |null|
|Women of '69, Unboxed                                               |null|
|Hyena Road                                                          |null|
|The Lovers and the Despot                                           |null|
|Trip to the Moon, A (Voyage dans la lune, Le) (1902)                |1902|
|Birth of a Nation, The (1915)                                       |1915|
|Intolerance: Love's Struggle Throughout the Ages (1916)             |1916|
|20,000 Leagues Under the Sea (1916)                                 |1916|
|Immigrant, The (1917)                                               |1917|
|Dog's Life, A (1918)                                                |1918|
|Billy Blaze

Find the list of  oldest released movies.

In [0]:
spark.sql('select year, collect_list(title) from movies_sql where year is not null group by year order by year').show(7, False)

+----+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|year|collect_list(title)                                                                                                                                      |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|1902|[Trip to the Moon, A (Voyage dans la lune, Le) (1902)]                                                                                                   |
|1915|[Birth of a Nation, The (1915), Camille Claudel 1915 (2013)]                                                                                             |
|1916|[Intolerance: Love's Struggle Throughout the Ages (1916)]                                                                                                |
|1917|[Immigrant, The (1917)]     

How many movies are released each year?

In [0]:
spark.sql('select year, count(title) as `movie count` from movies_sql where year is not null group by year order by `movie count` desc ').show(5)

+----+-----------+
|year|movie count|
+----+-----------+
|2000|        264|
|1996|        263|
|1997|        256|
|1995|        256|
|1998|        255|
+----+-----------+
only showing top 5 rows



How many number of movies are there for each rating?

In [0]:
spark.sql('select rating, count(movieid) as count from ratings_sql group by rating order by rating').show()

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1101|
|   1.0| 3326|
|   1.5| 1687|
|   2.0| 7271|
|   2.5| 4449|
|   3.0|20064|
|   3.5|10538|
|   4.0|28750|
|   4.5| 7723|
|   5.0|15095|
+------+-----+



In [0]:
ratings.groupBy('rating').count().orderBy('rating').show()

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1101|
|   1.0| 3326|
|   1.5| 1687|
|   2.0| 7271|
|   2.5| 4449|
|   3.0|20064|
|   3.5|10538|
|   4.0|28750|
|   4.5| 7723|
|   5.0|15095|
+------+-----+



How many users have rated each movie?

In [0]:
spark.sql('select ms.title, count(userid) as `user count` from movies_sql ms inner join ratings_sql rs on ms.movieid = rs.movieid group by ms.movieid, title order by `user count` desc').show(5)

+--------------------+----------+
|               title|user count|
+--------------------+----------+
| Forrest Gump (1994)|       341|
| Pulp Fiction (1994)|       324|
|Shawshank Redempt...|       311|
|Silence of the La...|       304|
|Star Wars: Episod...|       291|
+--------------------+----------+
only showing top 5 rows



In [0]:
movies.join(ratings, on='movieid', how='inner').\
groupBy('movieid', 'title').count().orderBy('count', ascending=False).show(5)

+-------+--------------------+-----+
|movieid|               title|count|
+-------+--------------------+-----+
|    356| Forrest Gump (1994)|  341|
|    296| Pulp Fiction (1994)|  324|
|    318|Shawshank Redempt...|  311|
|    593|Silence of the La...|  304|
|    260|Star Wars: Episod...|  291|
+-------+--------------------+-----+
only showing top 5 rows



What is the total rating for each movie?

In [0]:
spark.sql('select ms.title, sum(rating) as `total rating` from movies_sql ms inner join ratings_sql rs on ms.movieid = rs.movieid group by ms.movieid, title order by `total rating` desc').show(5)

+--------------------+------------+
|               title|total rating|
+--------------------+------------+
|Shawshank Redempt...|      1395.5|
| Forrest Gump (1994)|      1382.5|
| Pulp Fiction (1994)|      1379.0|
|Silence of the La...|      1258.0|
|Star Wars: Episod...|      1228.5|
+--------------------+------------+
only showing top 5 rows



In [0]:
movies.join(ratings, on='movieid', how='inner').groupby('movieid', 'title').sum('rating').orderBy('sum(rating)').\
select('title',F.col('sum(rating)').alias('total rating')).show(5)

+--------------------+------------+
|               title|total rating|
+--------------------+------------+
|First Day of the ...|         0.5|
|Tunnel, The (Tunn...|         0.5|
|   100 Rifles (1969)|         0.5|
|Indestructible Ma...|         0.5|
|     Trespass (2011)|         0.5|
+--------------------+------------+
only showing top 5 rows



What is the average rating for each movie?

In [0]:
spark.sql('select ms.title, mean(rating) as `avg rating` from movies_sql ms inner join ratings_sql rs on ms.movieid = rs.movieid group by ms.movieid, title order by `avg rating` desc, title asc').show(5)

+--------------------+----------+
|               title|avg rating|
+--------------------+----------+
|'night Mother (1986)|       5.0|
| 10 Attitudes (2001)|       5.0|
|    16 Wishes (2010)|       5.0|
| 29th and Gay (2005)|       5.0|
|3 Women (Three Wo...|       5.0|
+--------------------+----------+
only showing top 5 rows



In [0]:
movies.join(ratings, on='movieid', how='inner').\
groupby('movieid', 'title').mean('rating').orderBy(['avg(rating)', 'title'], ascending=[False, True]).select('title', F.col('avg(rating)').alias('avg rating')).show(5)

+--------------------+----------+
|               title|avg rating|
+--------------------+----------+
|'night Mother (1986)|       5.0|
| 10 Attitudes (2001)|       5.0|
|    16 Wishes (2010)|       5.0|
| 29th and Gay (2005)|       5.0|
|3 Women (Three Wo...|       5.0|
+--------------------+----------+
only showing top 5 rows

