<a href="https://colab.research.google.com/github/vu-bigdata-2020/lectures/blob/master/06_mapreduce_and_spark/example/spark-example/imdb-example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IMDB File Processing

This example shows the basic RDD processing.



In [0]:
#Basic installation requires you to setup java. Colab does not have java - so we install.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [12]:
#install spark. we are using the one that uses hadoop as the underlying scheduler.
!wget -q https://downloads.apache.org/spark//spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!ls -l


total 454484
-rw-r--r--  1 root root    308767 Mar 16 19:54 imdb.csv
drwxr-xr-x  1 root root      4096 Mar  3 18:11 sample_data
drwxr-xr-x 13 1000 1000      4096 Feb  2 19:47 spark-2.4.5-bin-hadoop2.7
-rw-r--r--  1 root root 232530699 Feb  2 20:27 spark-2.4.5-bin-hadoop2.7.tgz
-rw-r--r--  1 root root 232530699 Feb  2 20:27 spark-2.4.5-bin-hadoop2.7.tgz.1


In [0]:
#Provides findspark.init() to make pyspark importable as a regular library.
os.environ["SPARK_HOME"] = "spark-2.4.5-bin-hadoop2.7"
!pip install -q findspark
import findspark
findspark.init()

In [0]:
#The entry point to using Spark SQL is an object called SparkSession. It initiates a Spark Application which all the code for that Session will run on
# to larn more see https://blog.knoldus.com/spark-why-should-we-use-sparksession/
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

# Now the installation is complete. And we will start with processing.



In [0]:
#upload the ibmdb movie data. It is in the data folder.
#https://github.com/vu-bigdata-2020/lectures/tree/master/06_mapreduce_and_spark/example/spark-example/data
from google.colab import files
files.upload()

In [0]:
#this will infer the schema -- for example column names. It creates an RDD
data = spark.read.csv('imdb.csv',inferSchema=True, header=True)

In [21]:
#just shows the basic count of rows and number of columns
data.count(), len(data.columns)

(1000, 12)

In [22]:
#To view a DataFrame, use the .show() method:
data.show(5)

+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|Runtime (Minutes)|Rating| Votes|Revenue (Millions)|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-----------------+------+------+------------------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|2014|              121|   8.1|757074|            333.13|     76.0|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|2012|              124|     7|485820|            126.46|     65.0|
|   3|               Split|     Horror,Thriller|Three girls are k...|  M. Night Shyamalan|James McAvoy, Any...

In [25]:
#to see data schema
data.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Runtime (Minutes): string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Votes: string (nullable = true)
 |-- Revenue (Millions): double (nullable = true)
 |-- Metascore: double (nullable = true)



In [26]:
#We can also selectively choose which columns we want to display with the .select() method
data.select("Title","Genre","Director","Year").show(5, truncate=False) #truncate=False parameter that adjusts the size of columns to prevent values from being cut off.

+-----------------------+------------------------+--------------------+----+
|Title                  |Genre                   |Director            |Year|
+-----------------------+------------------------+--------------------+----+
|Guardians of the Galaxy|Action,Adventure,Sci-Fi |James Gunn          |2014|
|Prometheus             |Adventure,Mystery,Sci-Fi|Ridley Scott        |2012|
|Split                  |Horror,Thriller         |M. Night Shyamalan  |2016|
|Sing                   |Animation,Comedy,Family |Christophe Lourdelet|2016|
|Suicide Squad          |Action,Adventure,Fantasy|David Ayer          |2016|
+-----------------------+------------------------+--------------------+----+
only showing top 5 rows



In [27]:
#summary statistics
data.describe(["Runtime (Minutes)","Rating","Title"]).show()

+-------+--------------------+------------------+--------------------+
|summary|   Runtime (Minutes)|            Rating|               Title|
+-------+--------------------+------------------+--------------------+
|  count|                1000|              1000|                1000|
|   mean|  126.65829145728644|15.428728728728741|   635.6666666666666|
| stddev|  160.08961216341075|126.98048279758784|   860.1559548515994|
|    min| teamed up on a j...|               1.9|(500) Days of Summer|
|    max|Taraneh Alidoosti...| Baltasar Kormákur|            Zootopia|
+-------+--------------------+------------------+--------------------+



In [28]:
#we can rename columns
df2 = data.withColumnRenamed("Runtime (Minutes)","runtime").withColumnRenamed("Revenue (Millions)","revenue")
df2.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Actors: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Votes: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- Metascore: double (nullable = true)



In [29]:
#group by is a transformation 
# https://backtobazics.com/big-data/spark/apache-spark-groupby-example/
df2.groupBy("Director").count().orderBy("count", ascending=False).show(10)

+------------------+-----+
|          Director|count|
+------------------+-----+
|      Ridley Scott|    8|
|M. Night Shyamalan|    6|
|       David Yates|    6|
|       Michael Bay|    6|
|Paul W.S. Anderson|    6|
|       Danny Boyle|    5|
|       Zack Snyder|    5|
|       J.J. Abrams|    5|
|        Justin Lin|    5|
|     Antoine Fuqua|    5|
+------------------+-----+
only showing top 10 rows



In [31]:
#filtering -- another transformation
condition1 = (df2.runtime.isNotNull()) |  (df2.revenue.isNotNull())
df3 = df2.filter(condition1).show(5)

+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------+------+------+-------+---------+
|Rank|               Title|               Genre|         Description|            Director|              Actors|Year|runtime|Rating| Votes|revenue|Metascore|
+----+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------+------+------+-------+---------+
|   1|Guardians of the ...|Action,Adventure,...|A group of interg...|          James Gunn|Chris Pratt, Vin ...|2014|    121|   8.1|757074| 333.13|     76.0|
|   2|          Prometheus|Adventure,Mystery...|Following clues t...|        Ridley Scott|Noomi Rapace, Log...|2012|    124|     7|485820| 126.46|     65.0|
|   3|               Split|     Horror,Thriller|Three girls are k...|  M. Night Shyamalan|James McAvoy, Any...|2016|    117|   7.3|157606| 138.12|     62.0|
|   4|                Sing|Animation,Comedy,...|In a city 

In [0]:
df4=df3.filter(title.isNotNull)

In [0]:
#Zip an RDD with its element indices.
#https://amplab-extras.github.io/SparkR-pkg/rdocs/1.2/zipWithIndex.html
rdd=df2.rdd.zipWithIndex()
print(rdd.take(2))

In [0]:
print('RDD Count:', rdd.count())
print('RDD Collect as map:',  rdd.collectAsMap())
print('RDD Num Partitions:', rdd.getNumPartitions())