In [5]:
from pyspark.sql.types import StructType, LongType,StringType, IntegerType, DoubleType

# Explicit schema for movies.csv so each column gets the declared type.
# The third argument to add() (True) marks the column as nullable.
# Parentheses are used for the multi-line chain instead of trailing
# backslashes: a backslash left on the last line would silently glue the
# next statement onto this expression.
movieSchema = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("title", StringType(), True)
    .add("genres", StringType(), True)
)

# header=True: the first line of the file holds column names, not data.
movieDf = (
    spark.read.format("csv")
    .option("header", True)
    .schema(movieSchema)
    .load("s3://gk-movielens2/movies.csv")
)

movieDf.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows

In [6]:
# Explicit schema for ratings.csv; every column is declared nullable.
# Note that timestamp is declared as a string column here.
ratingSchema = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("movieId", IntegerType(), True)
    .add("rating", DoubleType(), True)
    .add("timestamp", StringType(), True)
)

# Read the CSV with the schema above; the first line is treated as a header.
ratingDf = (
    spark.read.format("csv")
    .option("header", True)
    .schema(ratingSchema)
    .load("s3://gk-movielens2/ratings.csv")
)

ratingDf.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
+------+-------+------+---------+
only showing top 2 rows

In [7]:
# Project just two of the four columns into a new DataFrame.
df2 = ratingDf.select(ratingDf["movieId"], ratingDf["rating"])
df2.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------+
|movieId|rating|
+-------+------+
|      1|   4.0|
|      3|   4.0|
+-------+------+
only showing top 2 rows

In [8]:
# Total number of rows in ratingDf.
# The message is formatted into a single string so it prints as plain text on
# both Python 2 and Python 3; print("a", b) on a Python 2 kernel prints a tuple.
print("Count {}".format(ratingDf.count()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('Count ', 100836)

In [9]:
# Column names, as a plain Python list.
# Formatted into one string so it prints as text rather than a tuple when the
# kernel is Python 2.
print("Columns {}".format(ratingDf.columns))
# Full schema: a StructType listing each field's name, type and nullability.
print(ratingDf.schema)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('Columns', ['userId', 'movieId', 'rating', 'timestamp'])
StructType(List(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true),StructField(timestamp,StringType,true)))

In [10]:

# Bring the first two rows back to the driver as a list of Row objects.
movieDf.head(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(movieId=1, title=u'Toy Story (1995)', genres=u'Adventure|Animation|Children|Comedy|Fantasy'), Row(movieId=2, title=u'Jumanji (1995)', genres=u'Adventure|Children|Fantasy')]

In [11]:
# Derive a new column from existing data: rating_adjusted = rating + 0.2.
# The result is assigned to df3; ratingDf keeps its original four columns.
df3 = ratingDf.withColumn("rating_adjusted", ratingDf["rating"] + 0.2)
df3.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+---------------+
|userId|movieId|rating|timestamp|rating_adjusted|
+------+-------+------+---------+---------------+
|     1|      1|   4.0|964982703|            4.2|
+------+-------+------+---------+---------------+
only showing top 1 row

In [12]:
# Rename a column. Arguments are (existing name, new name); the renamed
# result is a new DataFrame assigned to df2.
df2 = ratingDf.withColumnRenamed(
    "rating",
    "ratings",
)
df2.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+-------+---------+
|userId|movieId|ratings|timestamp|
+------+-------+-------+---------+
|     1|      1|    4.0|964982703|
+------+-------+-------+---------+
only showing top 1 row

In [14]:
# Variants of select.

# 1. "*" selects every column.
df2 = ratingDf.select("*")
df2.show(1)

# 2. A subset of columns, given by name.
df2 = ratingDf.select(["movieId", "rating"])
df2.show(1)

# 3. A computed expression; .alias gives the resulting column its name.
df2 = ratingDf.select(
    ratingDf["userId"],
    (ratingDf["rating"] + 0.2).alias("rating_adjusted"),
)
df2.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
+------+-------+------+---------+
only showing top 1 row

+-------+------+
|movieId|rating|
+-------+------+
|      1|   4.0|
+-------+------+
only showing top 1 row

+------+---------------+
|userId|rating_adjusted|
+------+---------------+
|     1|            4.2|
+------+---------------+
only showing top 1 row

In [15]:
# Keep only the rows that satisfy a predicate.
# filter and where are aliases of each other, so both calls below give the
# same result.
df2 = ratingDf.filter(ratingDf["rating"] > 4)
df2.show(3)

df2 = ratingDf.where(ratingDf["rating"] > 4)
df2.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|    101|   5.0|964980868|
+------+-------+------+---------+
only showing top 3 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|    101|   5.0|964980868|
+------+-------+------+---------+
only showing top 3 rows

In [16]:
# Combine several conditions with &. Each comparison needs its own
# parentheses because & binds more tightly than >= and <= in Python.
df2 = ratingDf.filter(
    (ratingDf["rating"] >= 3) & (ratingDf["rating"] <= 4)
)
df2.show(4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     70|   3.0|964982400|
+------+-------+------+---------+
only showing top 4 rows

In [17]:
# The same range condition, written as a Spark SQL expression string.
df2 = ratingDf.where("rating >= 3 AND rating <= 4")
df2.show(4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     70|   3.0|964982400|
+------+-------+------+---------+
only showing top 4 rows

In [21]:
# Won't work: Python evaluates "rating" >= 3 on a plain str before Spark ever
# sees it, so no Column expression is built.
#ratingDf.filter( "rating"  >=3).show(4)

from pyspark.sql.functions import col
# col("rating") builds a Column by name, so the comparison yields a Column
# expression that filter can use.
ratingDf.filter(col("rating") >= 3).show(4)

# Both are the same thing: a Column referring to "rating".
# Formatted into one string so it prints as text rather than a tuple when the
# kernel is Python 2.
print("{} {}".format(ratingDf.rating, col("rating")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
+------+-------+------+---------+
only showing top 4 rows

(Column<rating>, Column<rating>)

In [22]:
from pyspark.sql.functions import col, asc, desc
# Sort ascending: the default when only a column name is given.
df2 = ratingDf.orderBy("rating")
df2.show(5)
# Sort ascending, stated explicitly.
df2 = ratingDf.orderBy(asc("rating"))
df2.show(5)
# Sort descending.
df2 = ratingDf.orderBy(desc("rating"))
df2.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     3|     31|   0.5|1306463578|
|     3|    914|   0.5|1306463567|
|     3|    527|   0.5|1306464275|
|     3|    688|   0.5|1306464228|
|     3|    720|   0.5|1306463595|
+------+-------+------+----------+
only showing top 5 rows

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     3|     31|   0.5|1306463578|
|     3|    914|   0.5|1306463567|
|     3|    527|   0.5|1306464275|
|     3|    688|   0.5|1306464228|
|     3|    720|   0.5|1306463595|
+------+-------+------+----------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|    101|   5.0|964980868|
|     1|    216|   5.0|964981208|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|     47|   5.0|964983815|
+------+-------+------+---------+
only showing t

In [28]:
# Aggregation: number of ratings per movie.
from pyspark.sql.functions import col, desc, avg, count
# "Popular" here means rated by many users; the average rating is not
# considered. Only movies with at least 100 ratings are kept, most-rated first.
mostPopularDf = (
    ratingDf
    .groupBy("movieId")
    # Name the aggregate directly rather than renaming Spark's generated
    # "count(userId)" column afterwards.
    .agg(count("userId").alias("total_ratings"))
    .filter(col("total_ratings") >= 100)
    .sort(desc("total_ratings"))
)

mostPopularDf.show(200)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------+
|movieId|total_ratings|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|     50|          204|
|   2858|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
|   1210|          196|
|    858|          192|
|    457|          190|
|    592|          189|
|   5952|          188|
|   2028|          188|
|   7153|          185|
|    588|          183|
|    608|          181|
|   2762|          179|
|    380|          178|
|     32|          177|
|    364|          172|
|   1270|          171|
|    377|          171|
|   4306|          170|
|   3578|          170|
|   1580|          165|
|    590|       

In [31]:
# Join the per-movie rating counts with movieDf to pick up each movie's title.
# Joining on the column name keeps a single movieId column in the result.
# The rows are sorted again after the join because a join does not promise to
# keep the order of its inputs; without this, show(5) need not be the five
# most-rated movies.
mostPopularMoviesDf = (
    mostPopularDf
    .join(movieDf, "movieId")
    .select("movieId", "title", "total_ratings")
    .sort("total_ratings", ascending=False)
)

mostPopularMoviesDf.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-------------+
|movieId|               title|total_ratings|
+-------+--------------------+-------------+
|    356| Forrest Gump (1994)|          329|
|    318|Shawshank Redempt...|          317|
|    296| Pulp Fiction (1994)|          307|
|    593|Silence of the La...|          279|
|   2571|  Matrix, The (1999)|          278|
+-------+--------------------+-------------+
only showing top 5 rows

In [32]:
# Two aggregates per movie: how many ratings it received (count) and the
# average of those ratings (avg).
from pyspark.sql.functions import col, desc, avg, count
# Keep movies that have at least 100 ratings and an average rating of 3 or
# more, ordered from most-rated to least-rated.
mostPopularDf = (
    ratingDf
    .groupBy("movieId")
    .agg(
        count("userId").alias("total_ratings"),
        avg("rating").alias("avg_rating"),
    )
    .filter(col("total_ratings") >= 100)
    .filter(col("avg_rating") >= 3)
    .sort(desc("total_ratings"))
)

mostPopularDf.show(200)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------------+------------------+
|movieId|total_ratings|        avg_rating|
+-------+-------------+------------------+
|    356|          329| 4.164133738601824|
|    318|          317| 4.429022082018927|
|    296|          307| 4.197068403908795|
|    593|          279| 4.161290322580645|
|   2571|          278| 4.192446043165468|
|    260|          251| 4.231075697211155|
|    480|          238|              3.75|
|    110|          237| 4.031645569620253|
|    589|          224| 3.970982142857143|
|    527|          220|             4.225|
|   2959|          218| 4.272935779816514|
|      1|          215|3.9209302325581397|
|   1196|          211|4.2156398104265405|
|     50|          204| 4.237745098039215|
|   2858|          204| 4.056372549019608|
|     47|          203|3.9753694581280787|
|    780|          202|3.4455445544554455|
|    150|          201| 3.845771144278607|
|   1198|          200|            4.2075|
|   4993|          198| 4.106060606060606|
|   1210|  

In [33]:
# Join the per-movie aggregates with movieDf to pick up each movie's title.
# Joining on the column name keeps a single movieId column in the result.
# The rows are sorted again after the join because a join does not promise to
# keep the order of its inputs; without this, show(5) need not be the five
# most-rated movies.
mostPopularMoviesDf = (
    mostPopularDf
    .join(movieDf, "movieId")
    .select("movieId", "title", "total_ratings", "avg_rating")
    .sort("total_ratings", ascending=False)
)

mostPopularMoviesDf.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-------------+-----------------+
|movieId|               title|total_ratings|       avg_rating|
+-------+--------------------+-------------+-----------------+
|    356| Forrest Gump (1994)|          329|4.164133738601824|
|    318|Shawshank Redempt...|          317|4.429022082018927|
|    296| Pulp Fiction (1994)|          307|4.197068403908795|
|    593|Silence of the La...|          279|4.161290322580645|
|   2571|  Matrix, The (1999)|          278|4.192446043165468|
+-------+--------------------+-------------+-----------------+
only showing top 5 rows