# **Exercise 2: Spark SQL**
#### This second exercise will introduce database operations with Spark. 


### During the exercises, the following resources might come in handy:
* #### Documentation of the [PySpark API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD)
* #### Documentation of the [Python API](https://docs.python.org/2.7/)
* #### Documentation of the [Spark SQL API](http://spark.apache.org/docs/latest/sql-programming-guide.html)
* #### Documentation of [Hive SQL](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF)

### To run code in Jupyter, press: 
* #### `Ctrl-Enter` to run the code in the currently selected cell
* #### `Shift-Enter` to run the code in the currently selected cell and jump to the next cell

### **Helper: Displays rows from a Spark SQL object as HTML**
#### The following code displays a list of rows or a dataframe as an HTML table, which gives a human-friendly view of our data. Skimming this code is enough; you do not need to understand it in detail for this exercise.

In [None]:
from IPython.display import display, HTML
import warnings

def displayRows(rowDf):
    headers = []
    rows = []
    if(str(type(rowDf)) == "<class 'pyspark.sql.dataframe.DataFrame'>"):
        rows = rowDf.limit(10000).collect() #Let's limit the output just in case!
        if(len(rows) == 10000):
            if(rowDf.limit(10001).count() == 10001):
                warnings.warn("More than 10 000 rows were returned, only showing the first 10 000.")
                
        headers = list(rowDf.columns)
    else:
        rows = rowDf
        if(len(rows) > 10000):
            warnings.warn("Rows has {0} elements, only showing the first 10 000.".format(len(rows)))
            rows = rows[0:10000]
            
        #Computes the sorted, unique set of keys across all rows (works on Python 2.7 and 3)
        headers = sorted({key for row in rows for key in row.asDict()})
            
    tableHead = ["<th>{0}</th>".format(key) for key in headers]
    tableBody = ["<tr>{0}</tr>".format(
                    "".join(["<td>{0}</td>".format(rowDict.get(header)) 
                            for rowDict 
                            in (row.asDict(),) 
                            for header 
                            in headers])
                    ) for row in rows]
    
    display(HTML(
    u"""<table>
    <thead><tr>{0}</tr></thead>
    <tbody>{1}</tbody>
    </table>
    """.format("".join(tableHead), "".join(tableBody))))

## Part 1: Learning Spark SQL

#### This part introduces you to Spark SQL by having you write SQL queries.

The cells below first demonstrate `enumerate`, which is used to number the rows, and then generate the data you will write queries for.

In [None]:
my_list = ['apple', 'banana', 'grapes', 'pear']
counter_list = list(enumerate(my_list, 1))
print(counter_list)

In [None]:

#Top 20 boy and girl names 2014 in random order.
names = ["Caden", "Kaylee", "Lucas", "Ethan", "Alexander", "Jackson", 
         "Aiden", "Madelyn", "Michael", "Avery", "Luke", "Isabella", 
         "Chloe", "Elijah", "Abigail", "Madison", "Jacob", "Zoe", "Emily", 
         "Jayden", "Liam", "Mason", "Mia", "Sophia", "Benjamin", "Layla", 
         "Emma", "Lily", "Charlotte", "Caleb", "James", "Noah", "Ella", 
         "Jack", "Jayce", "Aubrey", "Olivia", "Harper", "Logan", "Ava"]
#A-G in phonetic alphabet
groups = ["Alpha","Bravo", "Charlie", "Delta", "Echo", "Foxtrot", "Golf"]

#Some numeric magic to generate not so uniform random data.
tblUserRdd = sc.parallelize(map(lambda i: (i, ((i*104729)^131) % 7, 26500 + ((i*104729)^96587) % 6367), range(1,51)))
# Parallelize data using 4 partitions
tblNamesRdd = sc.parallelize(enumerate(names, 1), 4)
# Parallelize data using 2 partitions
tblGroupNamesRdd = sc.parallelize(enumerate(groups), 2)

#Create dataframes from the RDDs
tblNames      = sqlContext.createDataFrame(tblNamesRdd,      ["userId", "name"])
tblUsers      = sqlContext.createDataFrame(tblUserRdd,       ["id", "groupId", "salary"])
tblGroupNames = sqlContext.createDataFrame(tblGroupNamesRdd, ["id", "name"])

#Register the dataframes as tables so they can be queried with traditional SQL.
sqlContext.registerDataFrameAsTable(tblGroupNames, "tblGroupNames")
sqlContext.registerDataFrameAsTable(tblUsers, "tblUsers")
sqlContext.registerDataFrameAsTable(tblNames, "tblNames")
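
To see what the "numeric magic" in the generation cell produces, the sketch below reproduces the same arithmetic in plain Python, without Spark. The function name `make_user` is hypothetical and introduced only for illustration; the constants are copied from the cell above.

```python
def make_user(i):
    """Return the (id, groupId, salary) tuple for user i.

    `make_user` is a hypothetical helper; it mirrors the lambda
    used to build tblUserRdd in the data generation cell.
    """
    scrambled = i * 104729                        # multiply by a large constant
    group_id = (scrambled ^ 131) % 7              # XOR, then fold into groups 0..6
    salary = 26500 + (scrambled ^ 96587) % 6367   # salaries fall in 26500..32866
    return (i, group_id, salary)

first_user = make_user(1)
print(first_user)     # (1, 5, 26623)
print(make_user(11))  # (11, 0, 27978)
```

Because the values depend only on `i`, every run of the notebook generates the same table, which is what allows the assertion cells further down to check for fixed results.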

#### First, let's get some basic information about each dataframe

Dataframes are structured, meaning that their columns and types are well-defined. If you have read the data generation cell, you might have noticed that the types were not specified; Spark infers them from the data.

#### Dataframes provide a very handy function called `printSchema()`. As its name implies, it shows the schema of the data, including column names and types.

In [None]:
tblUsers.printSchema()

#### Dataframes support a number of operations. Like RDDs, they have a `count()` action, which returns the number of rows in the dataframe.

In [None]:
tblUsers.count()

In [None]:
tblGroupNames.printSchema()

In [None]:
tblGroupNames.count()

In [None]:
tblNames.printSchema()

In [None]:
tblNames.count()

#### The next three cells display the contents of each dataframe using the helper function *displayRows*

In [None]:
displayRows(tblUsers)

In [None]:
displayRows(tblNames)

In [None]:
displayRows(tblGroupNames)

#### A dataframe also has a built-in function for displaying its contents: *show()*
However, by default it prints only the first 20 rows and truncates long column values, so it is mainly useful for debugging.

In [None]:
tblUsers.show()

#### Now, the first query you will write

### 1.a) Write a query that selects all user ids in the group with id 0

In [None]:
# Replace <FILL IN> with the proper code
q1a = sqlContext.sql("""
SELECT id 
FROM tblUsers 
WHERE <FILL IN>
""")

displayRows(q1a)

In [None]:
assert set(map(lambda row: row.id, q1a.collect())) == set([11,26,27])

### 1.b) Write a query that finds the min and max userId grouped by groupId

The result should have the following columns:

1. minUserId: The min user id per group
2. maxUserId: The max user id per group
3. groupId: The group id

**Hint:** Use `GROUP BY`, `MIN`, and `MAX`
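
As a rough illustration of what grouping followed by `MIN` and `MAX` computes, here is a minimal plain-Python sketch. The toy rows and all names in it (`toy_rows`, `ids_per_category`, `min_max`) are hypothetical and unrelated to the exercise tables; Spark SQL performs the equivalent work for you when you write the query.

```python
from collections import defaultdict

# Hypothetical toy rows of (itemId, category); not the exercise data.
toy_rows = [(4, "x"), (1, "x"), (7, "y"), (3, "y"), (9, "y")]

# Grouping step: collect the itemIds that belong to each category.
ids_per_category = defaultdict(list)
for item_id, category in toy_rows:
    ids_per_category[category].append(item_id)

# Aggregation step: MIN and MAX are evaluated once per group,
# so the result has exactly one entry per category.
min_max = {cat: (min(ids), max(ids)) for cat, ids in ids_per_category.items()}
print(sorted(min_max.items()))  # [('x', (1, 4)), ('y', (3, 9))]
```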

In [None]:
q1b = sqlContext.sql("""
SELECT 
    <FILL IN> AS minUserId, 
    <FILL IN> AS maxUserId,
    <FILL IN> 
FROM tblUsers 
<FILL IN>
""")

displayRows(q1b)

In [None]:
minIds = {0: 11,
 1: 12,
 2: 6,
 3: 8,
 4: 24,
 5: 1,
 6: 4}

maxIds = {0: 27,
 1: 43,
 2: 46,
 3: 39,
 4: 40,
 5: 47,
 6: 50}

assert all(map(lambda row: minIds[row.groupId] == row.minUserId, q1b.collect()))
assert all(map(lambda row: maxIds[row.groupId] == row.maxUserId, q1b.collect()))

### 1.c) Compute the global average salary

When you use an aggregate function such as `AVG(<column name>)` without a `GROUP BY` clause, the aggregation is performed over the entire result and a single row is returned.
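
The difference between aggregating with and without `GROUP BY` can be tried out in any SQL engine. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, purely so that it runs without a cluster; the table `demo` and its columns are hypothetical.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for Spark SQL (an assumption
# made for illustration; the exercise itself uses sqlContext.sql).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (grp INTEGER, value INTEGER)")
conn.executemany("INSERT INTO demo VALUES (?, ?)",
                 [(0, 10), (0, 20), (1, 30), (1, 60)])

# No GROUP BY: the aggregate covers the whole table and yields one row.
global_rows = conn.execute("SELECT AVG(value) AS avgValue FROM demo").fetchall()

# With GROUP BY: the aggregate is evaluated once per group.
grouped_rows = conn.execute(
    "SELECT grp, AVG(value) AS avgValue FROM demo GROUP BY grp ORDER BY grp"
).fetchall()

print(global_rows)   # [(30.0,)]
print(grouped_rows)  # [(0, 15.0), (1, 45.0)]
```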

In [None]:
avgSalary = sqlContext.sql("""
SELECT <FILL IN> AS avgSalary 
FROM tblUsers
""").collect()[0].avgSalary

avgSalary

In [None]:
assert abs(avgSalary - 29707.34) < 1e-6

### 1.d) Aggregate salaries per group

Group by `groupId`, compute the minimum, average, and maximum salary per group, and sort the result by average salary in descending order.

**Hint:** Use `MIN`, `AVG`, `MAX`, and `GROUP BY`; you can also sort by computed columns.
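
Sorting by a computed column means that the `ORDER BY` clause may refer to the alias of an aggregate. The following minimal sketch shows the idea on a hypothetical table `demo`, again using Python's built-in `sqlite3` module as a stand-in for Spark SQL.

```python
import sqlite3

# In-memory SQLite database as a stand-in for Spark SQL (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (grp INTEGER, value INTEGER)")
conn.executemany("INSERT INTO demo VALUES (?, ?)",
                 [(0, 10), (0, 20), (1, 30), (1, 60), (2, 25)])

# avgValue is computed per group and then reused as the sort key.
ranked = conn.execute("""
SELECT grp, AVG(value) AS avgValue
FROM demo
GROUP BY grp
ORDER BY avgValue DESC
""").fetchall()

print(ranked)  # [(1, 45.0), (2, 25.0), (0, 15.0)]
```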

In [None]:
q1d = sqlContext.sql("""
SELECT 
    groupId,
    COUNT(id) AS NumUsers,
    <FILL IN> AS MinSalary, 
    <FILL IN> AS AvgSalary,
    <FILL IN> AS MaxSalary,
    AVG(salary) - {} AS GlobalAvgDelta
FROM tblUsers
<FILL IN>
ORDER BY <FILL IN>
""".format(avgSalary))

displayRows(q1d)

In [None]:
groups = [
    (5, 26623, 30573, 32862), 
    (4, 26923, 29898, 31849),
    (1, 26600, 29833, 32234),
    (2, 27784, 29537, 32796),
    (6, 26973, 29490, 32245),
    (3, 27858, 29447, 32531),
    (0, 26784, 28369, 30346)
]

q1dresult = q1d.collect()
assert len(q1dresult) == 7
# Wrap each check in all(): a bare map(...) is always truthy, so the asserts could never fail.
assert all(q1dresult[i].groupId == groups[i][0] for i in range(len(q1dresult))), "GroupID column does not match."
assert all(q1dresult[i].MinSalary == groups[i][1] for i in range(len(q1dresult))), "MinSalary column does not match."
assert all(int(q1dresult[i].AvgSalary) == groups[i][2] for i in range(len(q1dresult))), "AvgSalary column does not match."
assert all(q1dresult[i].MaxSalary == groups[i][3] for i in range(len(q1dresult))), "MaxSalary column does not match."