# PySpark Tour

In [1]:
pip install pyspark

You should consider upgrading via the '/Users/priyankabharti/Documents/Projects/MOVIELENS-PYSPARK/.venv/bin/python -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one; otherwise build a new one named "MovieLens".
spark = (
    SparkSession.builder
    .appName("MovieLens")
    .getOrCreate()
)

# Display the running Spark version as the cell's output.
spark.version


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/08 16:43:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'4.0.1'

### Check that Spark is working

In [3]:
# Smoke test: build a single-column DataFrame of ids 0..4 and print it.
spark.range(0, 5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



### Read the data into DataFrames

In [35]:
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

# Declare the schema up front: inferSchema=True costs an extra full pass over the
# CSV just to discover the column types. The names and types below mirror what
# inference produced for this file. With header=True the header row is skipped;
# by default (enforceSchema) Spark uses these names rather than the header's.
RATINGS_SCHEMA = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("ratings", DoubleType(), True),
    StructField("timestamp", IntegerType(), True),  # Unix epoch seconds (see from_unixtime below)
])

ratings_df = spark.read.csv("data/ratings.csv", header=True, schema=RATINGS_SCHEMA)
ratings_df.show()

+------+-------+-------+----------+
|userId|movieId|ratings| timestamp|
+------+-------+-------+----------+
|     1|      2|    3.5|1112486027|
|     1|     29|    3.5|1112484676|
|     1|     32|    3.5|1112484819|
|     1|     47|    3.5|1112484727|
|     1|     50|    3.5|1112484580|
|     1|    112|    3.5|1094785740|
|     1|    151|    4.0|1094785734|
|     1|    223|    4.0|1112485573|
|     1|    253|    4.0|1112484940|
|     1|    260|    4.0|1112484826|
|     1|    293|    4.0|1112484703|
|     1|    296|    4.0|1112484767|
|     1|    318|    4.0|1112484798|
|     1|    337|    3.5|1094785709|
|     1|    367|    3.5|1112485980|
|     1|    541|    4.0|1112484603|
|     1|    589|    3.5|1112485557|
|     1|    593|    3.5|1112484661|
|     1|    653|    3.0|1094785691|
|     1|    919|    3.5|1094785621|
+------+-------+-------+----------+
only showing top 20 rows


In [36]:
# Pull the first five rows to the driver as a list of Row objects.
ratings_df.take(5)

[Row(userId=1, movieId=2, ratings=3.5, timestamp=1112486027),
 Row(userId=1, movieId=29, ratings=3.5, timestamp=1112484676),
 Row(userId=1, movieId=32, ratings=3.5, timestamp=1112484819),
 Row(userId=1, movieId=47, ratings=3.5, timestamp=1112484727),
 Row(userId=1, movieId=50, ratings=3.5, timestamp=1112484580)]

In [37]:
# Print the column names, data types and nullability Spark assigned to ratings_df.
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- ratings: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [38]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Declare the schema up front instead of inferSchema=True, which needs an extra
# full pass over the CSV. Names and types mirror what inference produced here.
# Quoted titles that contain commas are still handled by the CSV reader's default quoting.
MOVIES_SCHEMA = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("titles", StringType(), True),
    StructField("genres", StringType(), True),  # pipe-separated, e.g. "Comedy|Romance"
])

movies_df = spark.read.csv("data/movies.csv", header=True, schema=MOVIES_SCHEMA)
movies_df.show()

+-------+--------------------+--------------------+
|movieId|              titles|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [39]:
# Print the column names, data types and nullability Spark assigned to movies_df.
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- titles: string (nullable = true)
 |-- genres: string (nullable = true)



In [40]:
# Look up the StructField named "titles" in the schema and display its Spark SQL data type.
movies_df.schema["titles"].dataType

StringType()

In [41]:
# Project just the title column and print its first five values.
movies_df.select(movies_df["titles"]).show(n=5)

+--------------------+
|              titles|
+--------------------+
|    Toy Story (1995)|
|      Jumanji (1995)|
|Grumpier Old Men ...|
|Waiting to Exhale...|
|Father of the Bri...|
+--------------------+
only showing top 5 rows


In [42]:
# Titles of the five highest movieIds: sort ascending, keep only the title column,
# then tail() collects the last five rows to the driver as a list of Rows.
(
    movies_df
    .sort("movieId")
    .select("titles")
    .tail(5)
)

[Row(titles="Kein Bund für's Leben (2007)"),
 Row(titles='Feuer, Eis & Dosenbier (2002)'),
 Row(titles='The Pirates (2014)'),
 Row(titles='Rentun Ruusu (2001)'),
 Row(titles='Innocence (2014)')]

In [43]:
# Keep only the rating rows whose score is exactly 5.0. Each row is still one user's
# rating of one movie, not a distinct movie.
five_star_movies_df = ratings_df.where(ratings_df.ratings == 5)
five_star_movies_df.show(n=5)

+------+-------+-------+----------+
|userId|movieId|ratings| timestamp|
+------+-------+-------+----------+
|     1|   4993|    5.0|1112484682|
|     1|   5952|    5.0|1112484619|
|     1|   7153|    5.0|1112484633|
|     1|   8507|    5.0|1094786027|
|     2|     62|    5.0| 974820598|
+------+-------+-------+----------+
only showing top 5 rows


In [44]:
from pyspark.sql.functions import col

In [45]:
# 5-star ratings, newest first: show the movie and when it was rated.
(
    ratings_df
    .where(col("ratings") == 5)
    .select("movieId", "timestamp")
    .sort(col("timestamp").desc())
    .show(n=5)
)

+-------+----------+
|movieId| timestamp|
+-------+----------+
| 112852|1427741102|
| 112552|1427738795|
| 116797|1427738764|
| 104374|1427738762|
| 109487|1427738756|
+-------+----------+
only showing top 5 rows


In [46]:
from pyspark.sql import functions as F

# Most recent 5-star rating time for each movie.
# Spark functions are accessed through the `F` namespace so that importing Spark's
# `max` does not shadow Python's built-in `max` in the notebook's global namespace.
(
    ratings_df
    .filter(F.col("ratings") == 5)
    .groupBy("movieId")
    .agg(F.max("timestamp").alias("latest_timestamp"))
    .show(5)
)

+-------+----------------+
|movieId|latest_timestamp|
+-------+----------------+
|   1580|      1391462380|
|   1591|      1230782724|
|    471|      1376794048|
|   8638|      1427471611|
|   2366|      1337619094|
+-------+----------------+
only showing top 5 rows


In [47]:
from pyspark.sql import functions as F

# Find the movie(s) that received the single most recent 5-star rating.
# The 5-star filter is computed once and reused for both the aggregate and the join.
# `F.max` is used so Python's built-in `max` is not shadowed.
five_star_ratings_df = ratings_df.filter(F.col("ratings") == 5)

# One-row DataFrame holding the latest 5-star timestamp (null if there are no 5-star rows).
max_ts = five_star_ratings_df.select(F.max("timestamp").alias("max_timestamp"))

# Join each 5-star rating against that single value; only rows at the max timestamp survive.
# The two sides have distinct column names, so plain F.col references are unambiguous.
latest_five_star_df = (
    five_star_ratings_df
    .join(max_ts, F.col("timestamp") == F.col("max_timestamp"))
    .select("movieId", "timestamp")
)

latest_five_star_df.show()

+-------+----------+
|movieId| timestamp|
+-------+----------+
| 112852|1427741102|
+-------+----------+



In [48]:
from pyspark.sql.functions import from_unixtime

# Show raw epoch seconds next to a human-readable rendering. from_unixtime formats the
# instant in the Spark session time zone (spark.sql.session.timeZone, which defaults to
# the JVM's local zone), so the readable column depends on where the notebook runs.
(
    ratings_df
    .select(
        "timestamp",
        from_unixtime("timestamp").alias("readable_time"),
    )
    .show(5, truncate=False)
)


+----------+-------------------+
|timestamp |readable_time      |
+----------+-------------------+
|1112486027|2005-04-03 01:53:47|
|1112484676|2005-04-03 01:31:16|
|1112484819|2005-04-03 01:33:39|
|1112484727|2005-04-03 01:32:07|
|1112484580|2005-04-03 01:29:40|
+----------+-------------------+
only showing top 5 rows
