Before you submit your work on Blackboard, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name, student id (including the '@'), and the names of the other people in your group in the collaborators field (different names should be separated by semicolons ';') below:

In [1]:
NAME = "Karishma Prasad"
COLLABORATORS = ""

---
# Spark SQL

We will use the historical World Cup player dataset, which is in JSON format and available alongside this notebook on Blackboard. Run this notebook on your VM, and place the file on the VM's local file system in `/home/cloudera/Downloads/`. First, we check that you're running on the VM.

In [2]:
assert sc.version == "1.6.0", "This notebook must be run on the VM (use of Spark 1.x is assumed)"

Next, import `SQLContext` and create one from the existing SparkContext `sc`, so that we can use Spark SQL.

In [3]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

The cell below reads the data into a dataframe named `playersDF`.

In [4]:
import os

assert os.path.exists('/home/cloudera/Downloads/all-world-cup-players.json'), "You need to download the all-world-cup-players.json file into the /home/cloudera/Downloads directory on your VM"

# Reformat the JSON for Spark 1.x (which assumes one JSON record per line)
import json

if os.path.exists('/home/cloudera/Downloads/all-world-cup-players.reformat.json'):
    os.remove('/home/cloudera/Downloads/all-world-cup-players.reformat.json')

with open('/home/cloudera/Downloads/all-world-cup-players.json') as jsonfile:
    js = json.load(jsonfile)
    
with open('/home/cloudera/Downloads/all-world-cup-players.reformat.json', 'w') as outfile:
    for record in js:
        json.dump(record, outfile)
        outfile.write('\n')
    
# Now read the reformatted data in  
playersDF = sqlCtx.read.json('file:/home/cloudera/Downloads/all-world-cup-players.reformat.json')

assert playersDF.count() == 9443, "Something has gone wrong with the reading process"
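Spark 1.x's `read.json` expects one JSON object per line (a layout often called JSON Lines), which is why the cell above rewrites the file's top-level JSON array before reading it. The sketch below isolates that conversion as a small Python 3 function. The function name `json_array_to_json_lines` and the two-record sample are illustrative assumptions, and in-memory `io.StringIO` buffers stand in for the real input and output files.

```python
import io
import json


def json_array_to_json_lines(src, dst):
    """Rewrite the JSON array read from `src` as one JSON object per line in `dst`.

    Returns the number of records written.
    """
    records = json.load(src)      # parse the whole array into memory
    count = 0
    for record in records:
        json.dump(record, dst)    # without `indent`, each record is written on a single line
        dst.write("\n")           # line delimiter expected by Spark 1.x read.json
        count += 1
    return count


# Hypothetical two-record sample shaped like the World Cup player records.
sample = io.StringIO('[{"Team": "Brazil", "Year": 2014}, {"Team": "Italy", "Year": 2014}]')
out = io.StringIO()
n = json_array_to_json_lines(sample, out)
lines = out.getvalue().splitlines()
print(n, lines)
```

Note that `json.load` reads the entire array at once, which is fine for a file of this size (9443 records) but would not scale to inputs larger than memory.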

We will now explore three different ways to extract the same information from the data. 
1. Via DataFrames directly
2. Via Views
3. Via RDDs

Let's start with the DataFrames. Use DataFrame operations to create `teamNamesFromDF`, containing all the team names from 2014 (only). (You may want to look at the DataFrame you have read in first.)

In [5]:
# ----------------------------------------------
# 1.a Checking the Schema of the dataframe 
# ----------------------------------------------
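playersDF.printSchema()

#raise NotImplementedError()

root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)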

In [6]:
# ----------------------------------------------
# 1.b. Checking the dataframe by using .show()
# ----------------------------------------------
playersDF.show(5)

+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|                Club|ClubCountry|Competition|DateOfBirth|         FullName|IsCaptain|Number|Position|     Team|Year|
+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+
|Club AtlÃ©tico Ta...|  Argentina|  World Cup|   1905-5-5|     Ãngel Bossio|    false|      |      GK|Argentina|1930|
|Quilmes AtlÃ©tico...|  Argentina|  World Cup| 1908-10-23|     Juan Botasso|    false|      |      GK|Argentina|1930|
|          Boca Junio|  Argentina|  World Cup|  1907-2-23|   Roberto Cherro|    false|      |      FW|Argentina|1930|
|Central Norte TucumÃ|  Argentina|  World Cup|  1907-2-23|Alberto Chividini|    false|      |      DF|Argentina|1930|
|Club Atletico Est...|  Argentina|  World Cup|  1909-3-19|                 |    false|    10|      FW|Argentina|1930|
+--------------------+-----------+-----------+-----------+-----------------+---------+------+--------+---------+----+

In [7]:
# -----------------------------------------------------
# 1.c. Extracting team names from 2014 using distinct()
# -----------------------------------------------------

teamNamesFromDF = playersDF.select("Team", "Year").where("Year = 2014").distinct()
teamNamesFromDF.show(2)      # Check the output
teamNamesFromDF.count()      # Number of records

+--------------------+----+
|                Team|Year|
+--------------------+----+
|              Russia|2014|
|Bosnia and Herzeg...|2014|
+--------------------+----+
only showing top 2 rows



32

In [8]:
from pyspark.sql import DataFrame
assert isinstance(teamNamesFromDF, DataFrame), "Your answer should be a dataframe"
assert teamNamesFromDF.count() == 32, "Unexpected number of teams"
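Because the `where` clause pins `Year` to 2014, keeping `Year` in the projection does not change the number of distinct rows: every surviving (Team, Year) pair has the same year. The plain-Python sketch below (no Spark required; the five-row `players` list is made up for illustration) mirrors the `where` → `select` → `distinct` chain with comprehensions and shows that both projections give the same count.

```python
# Hypothetical miniature of playersDF: one dict per player row.
players = [
    {"Team": "Brazil", "Year": 2014, "FullName": "A"},
    {"Team": "Brazil", "Year": 2014, "FullName": "B"},
    {"Team": "Italy", "Year": 2014, "FullName": "C"},
    {"Team": "Brazil", "Year": 2010, "FullName": "D"},
    {"Team": "Uruguay", "Year": 1930, "FullName": "E"},
]

# where("Year = 2014"): keep only rows from the 2014 tournament.
rows_2014 = [p for p in players if p["Year"] == 2014]

# select("Team", "Year").distinct(): unique (Team, Year) pairs.
team_year_pairs = {(p["Team"], p["Year"]) for p in rows_2014}

# select("Team").distinct(): unique team names only.
team_names = {p["Team"] for p in rows_2014}

print(sorted(team_year_pairs), sorted(team_names))
```

Within Spark itself, `playersDF.where("Year = 2014").select("Team").distinct()` would return just the team-name column.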

Now do the same by registering a "table" called `players` from the data, querying it with SQL, and naming the resulting DataFrame `teamNamesFromTable`.

In [9]:
# -----------------------------------------------------
# 2. Table construction
# -----------------------------------------------------
playersDF.registerTempTable("players")        # Create a temporary table called 'players'
assert 'players' in sqlCtx.tableNames()       # Check that the table exists
teamNamesFromTable = sqlCtx.sql("""
                           SELECT DISTINCT Team, Year
                           FROM players
                           WHERE Year = 2014
                           """)                # SQL query for the distinct team names of 2014
teamNamesFromTable.show(10)

+--------------------+----+
|                Team|Year|
+--------------------+----+
|              Russia|2014|
|Bosnia and Herzeg...|2014|
|       United States|2014|
|             Germany|2014|
|              Mexico|2014|
|             Nigeria|2014|
|                Iran|2014|
|         Ivory Coast|2014|
|         South Korea|2014|
|              Brazil|2014|
+--------------------+----+
only showing top 10 rows



In [10]:
# Check the table was created
assert ('players' in sqlCtx.tableNames()), "You have either not created your table or you have named it something other than players"
assert teamNamesFromTable.count() == 32, "Unexpected number of teams"
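The same `SELECT DISTINCT ... WHERE` query can be tried outside Spark. The sketch below swaps in Python's built-in `sqlite3` module (not Spark SQL) with an in-memory table of four made-up rows, purely to show how `DISTINCT` collapses duplicate (Team, Year) rows. The `ORDER BY Team` clause is an addition for deterministic output; Spark's `DISTINCT` results come back in no guaranteed order, as the `Russia`-first listing above suggests.

```python
import sqlite3

# In-memory SQLite database standing in for Spark's temporary table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE players (Team TEXT, Year INTEGER, FullName TEXT)")
conn.executemany(
    "INSERT INTO players VALUES (?, ?, ?)",
    [
        ("Brazil", 2014, "A"),
        ("Brazil", 2014, "B"),
        ("Italy", 2014, "C"),
        ("Brazil", 2010, "D"),
    ],
)

# Same query shape as the Spark SQL cell; DISTINCT removes the duplicate Brazil row.
rows = conn.execute(
    "SELECT DISTINCT Team, Year FROM players WHERE Year = 2014 ORDER BY Team"
).fetchall()
conn.close()
print(rows)
```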

Your third implementation should go via RDDs: i.e. you'll need to create a (Row) RDD from the data and perform `.map`, `.filter`, etc. operations to obtain the same result. Your resulting RDD should be named `teamNamesFromRDD`.

In [11]:
# -----------------------------------------------------
# 3.a. Converting the DataFrame to an RDD
# -----------------------------------------------------
myRDD = playersDF.rdd     # Convert the DataFrame to an RDD of Row objects
myRDD.take(2)             # Checking the output using .take()

#raise NotImplementedError()

[Row(Club=u'Club Atl\xc3\xa9tico Talleres de Remedios de Escalada', ClubCountry=u'Argentina', Competition=u'World Cup', DateOfBirth=u'1905-5-5', FullName=u'\xc3ngel Bossio', IsCaptain=False, Number=u'', Position=u'GK', Team=u'Argentina', Year=1930),
 Row(Club=u'Quilmes Atl\xc3\xa9tico Club', ClubCountry=u'Argentina', Competition=u'World Cup', DateOfBirth=u'1908-10-23', FullName=u'Juan Botasso', IsCaptain=False, Number=u'', Position=u'GK', Team=u'Argentina', Year=1930)]

In [12]:
# -----------------------------------------------------
# 3.b. Reducing the RDD to team name and year (extractedRDD)
# -----------------------------------------------------
extractedRDD = myRDD.map(lambda record: [record.Team, record.Year])  # RDD of [team name, year] pairs
extractedRDD.take(5)                                                

[[u'Argentina', 1930],
 [u'Argentina', 1930],
 [u'Argentina', 1930],
 [u'Argentina', 1930],
 [u'Argentina', 1930]]

In [13]:
# -----------------------------------------------------
# 3.c. Filtering to 2014 and keeping distinct team names (teamNamesFromRDD)
# -----------------------------------------------------
filteredRDD = extractedRDD.filter(lambda keyValue: keyValue[1] == 2014)  # Year = 2014
teamNamesFromRDD = filteredRDD.keys().distinct()  # Distinct team names (the first element of each pair)
teamNamesFromRDD.take(2)                          # Check the output

[u'Brazil', u'Italy']

In [14]:
from pyspark.rdd import RDD
assert isinstance(teamNamesFromRDD, RDD), "Your result should be an RDD"
assert teamNamesFromRDD.count() == 32, "Unexpected number of teams"

lineage = teamNamesFromRDD.toDebugString()
assert 'MapPartitionsRDD' in lineage, "Did you really manage to answer this question via RDDs without a map?"
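The RDD pipeline (`map` → `filter` → `keys` → `distinct`) can be mirrored with Python 3's built-in `map`, `filter` and `set`, which may help when reasoning about what each step does. In this sketch, `Row` is a hypothetical `namedtuple` stand-in for `pyspark.sql.Row` carrying only the two fields used, and the four records are made up. As with RDD transformations, Python 3's `map` and `filter` are lazy: nothing is evaluated until `set()` consumes them, much as an RDD only computes when an action such as `count()` or `take()` runs.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row with just the fields the pipeline uses.
Row = namedtuple("Row", ["Team", "Year"])

records = [
    Row("Brazil", 2014),
    Row("Brazil", 2014),
    Row("Italy", 2014),
    Row("Argentina", 1930),
]

# map: project each Row to a [team, year] pair (like extractedRDD).
pairs = map(lambda r: [r.Team, r.Year], records)

# filter: keep only the 2014 pairs (like filteredRDD).
pairs_2014 = filter(lambda kv: kv[1] == 2014, pairs)

# keys() then distinct(): take the first element of each pair and drop duplicates.
team_names = sorted(set(kv[0] for kv in pairs_2014))
print(team_names)
```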