In [27]:
# findspark has to run *before* anything from pyspark is imported: init()
# locates the Spark installation and puts its Python bindings on sys.path.
# Importing pyspark first either fails (when pyspark is only reachable via
# SPARK_HOME) or may bind to a different pyspark than the one Spark launches.
import findspark

findspark.init()

from pyspark import SparkContext  # noqa: E402  (must follow findspark.init())
from pyspark.sql import SparkSession  # noqa: E402
from pyspark.sql import functions as func  # noqa: E402
from pyspark.sql import types  # noqa: E402

# Local-mode session; "local[*]" asks Spark for one worker thread per core.
# getOrCreate() returns the already-running session when the cell is re-run.
spark: SparkSession = (
    SparkSession.builder
    .appName("JupyterLocalSpark")
    .master("local[*]")
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session created above.
sc: SparkContext = spark.sparkContext

In [39]:
# Schema for the movie catalogue. Only movieID and movieName are declared,
# so those are the only two columns the resulting DataFrame exposes.
movies_schema = types.StructType([
    types.StructField("movieID", types.IntegerType(), nullable=False),
    types.StructField("movieName", types.StringType(), nullable=False),
])

df_movies = (
    spark.read
    .option("sep", "|")
    # The MovieLens 100k item file is ISO-8859-1 encoded; Spark's default
    # UTF-8 decoding would garble titles containing accented characters.
    .option("encoding", "ISO-8859-1")
    .schema(movies_schema)
    .csv("data/ml-100k/u.item")
)
df_movies.show(5)

+-------+-----------------+
|movieID|        movieName|
+-------+-----------------+
|      1| Toy Story (1995)|
|      2| GoldenEye (1995)|
|      3|Four Rooms (1995)|
|      4|Get Shorty (1995)|
|      5|   Copycat (1995)|
+-------+-----------------+
only showing top 5 rows



In [40]:
# Column layout of the tab-separated ratings file: one row per
# (userID, movieID) review with its rating and a numeric timestamp.
schema = (
    types.StructType()
    .add("userID", types.IntegerType(), nullable=False)
    .add("movieID", types.IntegerType(), nullable=False)
    .add("rating", types.IntegerType(), nullable=True)
    .add("timestamp", types.LongType(), nullable=True)
)

# Load the raw reviews with the explicit schema instead of inferring types.
df_reviews = spark.read.csv("data/ml-100k/u.data", schema=schema, sep="\t")
df_reviews.show(n=5)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
+------+-------+------+---------+
only showing top 5 rows



In [32]:
# Attach the movie title to every review. The movie table is wrapped in a
# broadcast hint so Spark can ship it to the executors rather than shuffle
# the reviews.
# NOTE(review): df_reviews is overwritten with its own join result, so
# re-running this cell joins the movie table a second time. Re-run the
# loading cell first to reset it.
df_reviews = df_reviews.join(
    func.broadcast(df_movies),
    on="movieID",
    how="inner",
)
df_reviews.show(n=5)

+-------+------+------+---------+--------------------+
|movieID|userID|rating|timestamp|           movieName|
+-------+------+------+---------+--------------------+
|    242|   196|     3|881250949|        Kolya (1996)|
|    302|   186|     3|891717742|L.A. Confidential...|
|    377|    22|     1|878887116| Heavyweights (1994)|
|     51|   244|     2|880606923|Legends of the Fa...|
|    346|   166|     1|886397596| Jackie Brown (1997)|
+-------+------+------+---------+--------------------+
only showing top 5 rows



In [38]:
# Number of reviews per movie, most-reviewed titles first.
# NOTE(review): this replaces df_reviews with the aggregated frame, so the
# cell is not idempotent. Re-run the load and join cells before re-running it.
df_reviews = (
    df_reviews
    .select("movieName", "movieID")
    .groupBy("movieName", "movieID")
    .count()
    .orderBy(func.col("count").desc())
)

df_reviews.show(n=5)

+--------------------+-------+-----+
|           movieName|movieID|count|
+--------------------+-------+-----+
|    Star Wars (1977)|     50|  583|
|      Contact (1997)|    258|  509|
|        Fargo (1996)|    100|  508|
|Return of the Jed...|    181|  507|
|    Liar Liar (1997)|    294|  485|
+--------------------+-------+-----+
only showing top 5 rows

