## CS431/631 Data Intensive Distributed Computing
### Fall 2020 - Assignment 4
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** Yu Li
* **ID:** 20603929

---
#### Overview
For this assignment, you will be using Python and Spark to perform some simple analyses on relational (tabular) data.  You will use Spark to read tabular data from files and then answer simple queries about the data in those tables.

In addition to Python and Spark, you will need to use a little bit of SQL.  If you are already familiar with SQL, great.  If not, you will want to spend a short amount of time getting familiar with the basics.   Type "SQL tutorial" into your favorite search engine, and you will find many examples of text, interactive and video tutorials.   As a simple starting point, you might also want to look at [these slides](https://cs.uwaterloo.ca/~kmsalem/courses/cs743/F14/slides/sql.pdf), which give a short introduction to SQL.   Even this is much more than you will need for this assignment.

You will be working with tabular data based on the schema for the TPC-H benchmark, which is a standard test used to measure the performance of relational database systems.   The schema defines the tables that exist in the database, the information in each table, and relationships between information in one table and information in another.   The TPC-H schema models business information:  customers, orders, parts, suppliers, and so on. You can find it on page 13 of the [TPC-H benchmark specification](http://www.tpc.org/TPC_Documents_Current_Versions/pdf/tpc-h_v2.17.3.pdf).   The TPC-H schema is important for this assignment, so make sure that you keep this schema handy.

Finally, for this assignment you will be using Spark in a slightly different way than you have used it so far.  For previous assignments, you have used the original Spark RDD interface.   For this assignment, you will be using the newer Spark interface, which is based on DataFrames.   DataFrames are RDDs in which each element is constrained to have the same structure.   You can think of a DataFrame like a table in a relational database.   Each element of the DataFrame is a row or record in the table.   All records have the same structure.   There is a programming guide for Spark DataFrames [here](https://spark.apache.org/docs/latest/sql-programming-guide.html).  Start with that. There is also a [more detailed guide to the full programming interface](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html).

---
#### Getting Started
To get started, let's initialize Spark, load a couple of the TPC-H tables, and run some simple queries.
First, as always, we need to tell the Python interpreter where to find Spark, so run `findspark.init()`:

In [3]:
import findspark
findspark.init("/u/cs451/packages/spark")

Next, we launch Spark.  When you used the RDD interface for previous assignments, you created a `SparkContext` when you launched Spark.   To use Spark SQL and the DataFrame interface, you instead create a `SparkSession`.   You do that as shown in the next cell (run it!).    

In [4]:
from pyspark.sql import SparkSession
import random
spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

Next, let's create DataFrames from the TPC-H data files.  We have installed the TPC-H data files in the directory `/u/cs451/data/TPC-H-0.1-TXT/`. 

If you are working on your own machine, you can download the dataset [here](https://roegiest.com/bigdata-2019w/assignments/TPC-H-0.1-TXT.tar.gz) and extract it to the same location on your system, for convenience. If you decide to modify the location of the dataset, you will have to change the path provided to `spark.read.csv()` for the examples that follow. You will be able to provide a different path when answering the questions.

There is one file for each table in the TPC-H database, e.g., `nation.tbl` for the TPC-H Nation table, `customer.tbl` for the TPC-H Customer table, and so on.    These are plain text csv files, with the character `|` used as a field separator.
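
To see how a line in this format splits into fields, here is a minimal sketch that uses Python's standard `csv` module rather than Spark. The sample line is hypothetical (its comment text is made up), and it assumes that each line ends with a trailing `|`, as the TPC-H `.tbl` files do.

```python
import csv
import io

# Hypothetical line in the style of nation.tbl; the comment text is made up.
sample = "3|CANADA|1|example comment text|\n"

# csv.reader splits on the '|' delimiter, much as sep='|' asks Spark to do.
fields = next(csv.reader(io.StringIO(sample), delimiter="|"))

print(fields)       # ['3', 'CANADA', '1', 'example comment text', '']
print(len(fields))  # 5
```

The trailing separator produces an empty fifth field, so a reader that splits on `|` sees one more column than the table really has.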

Create a Spark DataFrame corresponding to the TPC-H Nation table by loading the data from the `nation.tbl` file.   Run the code in the next cell to do this.   After you run this code, `nation_raw` will refer to your new Spark DataFrame.   The Spark `show()` method will display a small (default 20) number of elements from the DataFrame, so that you can inspect them. 

In [5]:
nation_raw = spark.read.csv("/u/cs451/data/TPC-H-0.1-TXT/nation.tbl",sep='|',inferSchema=True)
nation_raw.show()

+---+----------+---+--------------------+----+
|_c0|       _c1|_c2|                 _c3| _c4|
+---+----------+---+--------------------+----+
|  0|   ALGERIA|  0| haggle. carefull...|null|
|  1| ARGENTINA|  1|al foxes promise ...|null|
|  2|    BRAZIL|  1|y alongside of th...|null|
|  3|    CANADA|  1|eas hang ironic, ...|null|
|  4|     EGYPT|  4|y above the caref...|null|
|  5|  ETHIOPIA|  0|ven packages wake...|null|
|  6|    FRANCE|  3|refully final req...|null|
|  7|   GERMANY|  3|l platelets. regu...|null|
|  8|     INDIA|  2|ss excuses cajole...|null|
|  9| INDONESIA|  2| slyly express as...|null|
| 10|      IRAN|  4|efully alongside ...|null|
| 11|      IRAQ|  4|nic deposits boos...|null|
| 12|     JAPAN|  2|ously. final, exp...|null|
| 13|    JORDAN|  4|ic deposits are b...|null|
| 14|     KENYA|  0| pending excuses ...|null|
| 15|   MOROCCO|  0|rns. blithely bol...|null|
| 16|MOZAMBIQUE|  0|s. ironic, unusua...|null|
| 17|      PERU|  1|platelets. blithe...|null|
| 18|     CHI

Now you have a DataFrame to work with.   The columns of the DataFrame correspond to the fields of the TPC-H Nation table, so have a look at the TPC-H schema diagram to see what you are dealing with.   Column `_c0` is the NATIONKEY, column `_c1` is the NAME, `_c2` is the REGIONKEY, and `_c3` is the COMMENT.   Since this is a synthetic database, you'll notice that the data in some of the fields (like the COMMENT field) consists of random words.   That's fine.   You can also ask Spark to tell you about the type of data in each column:

In [5]:
nation_raw.dtypes

[('_c0', 'int'),
 ('_c1', 'string'),
 ('_c2', 'int'),
 ('_c3', 'string'),
 ('_c4', 'string')]

Before going on, let's clean this DataFrame up a bit, to make it easier to use.   First, let's assign names to the columns, so that we can remember what information each column holds.   Second, you'll notice that Spark has created an extra final column (filled with `null` values) because each line in the input file ends with a separator character (|).  Let's drop that final column, since we don't need it.   Run the following code to do this:

In [6]:
nation = nation_raw.toDF('NationKey','Name','RegionKey','Comment','extra').drop('extra').cache()
nation.show()

+---------+----------+---------+--------------------+
|NationKey|      Name|RegionKey|             Comment|
+---------+----------+---------+--------------------+
|        0|   ALGERIA|        0| haggle. carefull...|
|        1| ARGENTINA|        1|al foxes promise ...|
|        2|    BRAZIL|        1|y alongside of th...|
|        3|    CANADA|        1|eas hang ironic, ...|
|        4|     EGYPT|        4|y above the caref...|
|        5|  ETHIOPIA|        0|ven packages wake...|
|        6|    FRANCE|        3|refully final req...|
|        7|   GERMANY|        3|l platelets. regu...|
|        8|     INDIA|        2|ss excuses cajole...|
|        9| INDONESIA|        2| slyly express as...|
|       10|      IRAN|        4|efully alongside ...|
|       11|      IRAQ|        4|nic deposits boos...|
|       12|     JAPAN|        2|ously. final, exp...|
|       13|    JORDAN|        4|ic deposits are b...|
|       14|     KENYA|        0| pending excuses ...|
|       15|   MOROCCO|      

This style of code should look familiar to you.  We started with the `nation_raw` DataFrame and applied a series of DataFrame operations (`toDF`, `drop`, and `cache`).   This is just like the RDD interface, except now we are applying DataFrame operations to DataFrames, instead of RDD operations to RDDs.

Next, let's load up the TPC-H Supplier table, and then try performing some queries:

In [7]:
supplier_raw = spark.read.csv("/u/cs451/data/TPC-H-0.1-TXT/supplier.tbl",sep='|',inferSchema=True).drop("_c7")
supplier = supplier_raw.toDF("SuppKey","Name","Address","NationKey","Phone","AcctBal","Comment").cache()
supplier.show()

+-------+------------------+--------------------+---------+---------------+-------+--------------------+
|SuppKey|              Name|             Address|NationKey|          Phone|AcctBal|             Comment|
+-------+------------------+--------------------+---------+---------------+-------+--------------------+
|      1|Supplier#000000001| N kD4on9OM Ipw3,...|       17|27-918-335-1736|5755.94|each slyly above ...|
|      2|Supplier#000000002|89eJ5ksX3ImxJQBvx...|        5|15-679-861-2259|4032.68| slyly bold instr...|
|      3|Supplier#000000003|q1,G3Pj6OjIuUYfUo...|        1|11-383-516-1199| 4192.4|blithely silent r...|
|      4|Supplier#000000004|Bk7ah4CK8SYQTepEm...|       15|25-843-787-7479|4641.08|riously even requ...|
|      5|Supplier#000000005|   Gcdm2rJRzl5qlTVzc|       11|21-151-690-3663|-283.84|. slyly regular p...|
|      6|Supplier#000000006|        tQxuVm7s7CnK|       14|24-696-997-4969|1365.79|final accounts. r...|
|      7|Supplier#000000007|s,4TicNGB4uO6PaSq...|      

---
#### Writing Queries
There are two equivalent ways of writing queries over Spark DataFrames.   The first way is to assign a "view name" to each DataFrame, and then write SQL queries that refer to those view names using the `sql` operation.  

The code below gives the view names "nation" and "supplier" to the two DataFrames we've already created.

In [8]:
supplier.createOrReplaceTempView("supplier")
nation.createOrReplaceTempView("nation")

Now, we can write SQL queries that refer to the "supplier" and "nation" views as tables.   For example, suppose we want to see the names and addresses of suppliers who have account balances above 9900.00:

In [9]:
q1_result = spark.sql("select Name, Address, AcctBal from supplier where AcctBal > 9900.00")
q1_result.show()

+------------------+--------------------+-------+
|              Name|             Address|AcctBal|
+------------------+--------------------+-------+
|Supplier#000000049|     Nvq 6macF4GtJvz|9915.24|
|Supplier#000000234|iMrk7HUD87at3IIh4rBi| 9957.0|
|Supplier#000000693|S,mnHfsroFOVieQGd...|9956.55|
|Supplier#000000855|ekQwhb9fh5VGIvMBJ...|9964.88|
|Supplier#000000892|j6prA4M3sX9a9xHem...|9993.46|
+------------------+--------------------+-------+



In the example above, the `sql` command runs the SQL query against the supplier table.   It returns the query result as a new DataFrame, which `q1_result` refers to.

Instead of writing your queries in SQL and running them using `sql`, it is possible to do the same thing by applying a sequence of DataFrame operations to the input DataFrames, as you did when you were using the RDD interface in the previous assignments.    For example, to answer the same query that we just answered using SQL, we can do the following:

In [10]:
q1_resultB = supplier.filter("AcctBal > 9900.00").select('Name','Address','AcctBal')
q1_resultB.show()

+------------------+--------------------+-------+
|              Name|             Address|AcctBal|
+------------------+--------------------+-------+
|Supplier#000000049|     Nvq 6macF4GtJvz|9915.24|
|Supplier#000000234|iMrk7HUD87at3IIh4rBi| 9957.0|
|Supplier#000000693|S,mnHfsroFOVieQGd...|9956.55|
|Supplier#000000855|ekQwhb9fh5VGIvMBJ...|9964.88|
|Supplier#000000892|j6prA4M3sX9a9xHem...|9993.46|
+------------------+--------------------+-------+



Both methods should give the same result.   Internally, Spark handles both similarly.   For this assignment, you'll be asked to try out both methods.
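
The equivalence of the two styles can be illustrated without Spark. The sketch below is only an analogy: it uses Python's built-in `sqlite3` module for the SQL style and a plain list comprehension for the operation-by-operation style. The four rows are copied from the supplier outputs shown above; the in-memory table and the variable names are assumptions made for this example.

```python
import sqlite3

# A few supplier rows copied from the outputs above (Name, Address, AcctBal).
rows = [
    ("Supplier#000000005", "Gcdm2rJRzl5qlTVzc", -283.84),
    ("Supplier#000000006", "tQxuVm7s7CnK", 1365.79),
    ("Supplier#000000049", "Nvq 6macF4GtJvz", 9915.24),
    ("Supplier#000000234", "iMrk7HUD87at3IIh4rBi", 9957.0),
]

# Declarative style: load the rows into an in-memory table and run SQL text.
conn = sqlite3.connect(":memory:")
conn.execute("create table supplier (Name text, Address text, AcctBal real)")
conn.executemany("insert into supplier values (?, ?, ?)", rows)
sql_result = conn.execute(
    "select Name, Address, AcctBal from supplier where AcctBal > 9900.00"
).fetchall()
conn.close()

# Operation style: filter the rows, then keep the chosen columns.
pipeline_result = [(name, addr, bal) for (name, addr, bal) in rows if bal > 9900.00]

print(sorted(sql_result) == sorted(pipeline_result))  # True
```

Both styles describe the same filter and projection; only the way the query is written differs.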

Now it is time for you to try writing your own queries.

### Important

###### The questions that follow ask you to implement functions whose prototypes are given to you. Do **not** change the prototypes of the functions. Do **not** write code outside of the functions. 

##### You may use specific cells, identified by `# Your tests here`, for test purposes. Code in these cells will *not* be executed when marking your assignment. 

##### We will make sure the function `load_dataset_and_set_views()` is called before your answers to other questions.

---
#### Question 0
First, you will instantiate the DataFrames and set the views needed to answer the following questions. 
Modify the code in the cell below to read the corresponding tables from disk in the same fashion as the supplier table. Keep the column names consistent with the [TPC-H schema](http://www.tpc.org/TPC_Documents_Current_Versions/pdf/tpc-h_v2.17.3.pdf) (page 13).

In [10]:
def load_dataset_and_set_views(path="/u/cs451/data/TPC-H-0.1-TXT/"):
    global supplier, orders, customer, partsupp, nation, part
    
    supplier_raw = spark.read.csv(path+"supplier.tbl",sep='|',inferSchema=True).drop("_c7")
    supplier = supplier_raw.toDF("SuppKey","Name","Address","NationKey","Phone","AcctBal","Comment").cache()
    supplier.createOrReplaceTempView("supplier")
    
    # Your solution to Question 0 here
    order_raw = spark.read.csv(path+"orders.tbl",sep='|',inferSchema=True).drop("_c9")
    orders = order_raw.toDF("OrderKey","CustKey","OrderStatus","TotalPrice","OrderDate","OrderPriority","Clerk", "ShipPriority", "Comment").cache()
    orders.createOrReplaceTempView("orders")
    
    customer_raw = spark.read.csv(path+"customer.tbl",sep='|',inferSchema=True).drop("_c8")
    customer = customer_raw.toDF("CustKey","Name","Address","NationKey","Phone","AcctBal","MktSegment","Comment").cache()
    customer.createOrReplaceTempView("customer")
    
    partsupp_raw = spark.read.csv(path+"partsupp.tbl",sep='|',inferSchema=True).drop("_c5")
    partsupp = partsupp_raw.toDF("PartKey","SuppKey","AvailQty","SupplyCost","Comment").cache()
    partsupp.createOrReplaceTempView("partsupp")
    
    nation_raw = spark.read.csv(path+"nation.tbl",sep='|',inferSchema=True).drop("_c4")
    nation = nation_raw.toDF("NationKey","Name","RegionKey", "Comment").cache()
    nation.createOrReplaceTempView("nation")
    
    part_raw = spark.read.csv(path+"part.tbl",sep='|',inferSchema=True).drop("_c9")
    part = part_raw.toDF("PartKey","Name","MFGR","Brand","Type", "Size", "Container", "RetailPrice", "Comment").cache()
    part.createOrReplaceTempView("part")

In [11]:
# Your tests here
load_dataset_and_set_views()
q1 = spark.sql("select * from orders limit 5")
q1.show()

+--------+-------+-----------+----------+-------------------+-------------+---------------+------------+--------------------+
|OrderKey|CustKey|OrderStatus|TotalPrice|          OrderDate|OrderPriority|          Clerk|ShipPriority|             Comment|
+--------+-------+-----------+----------+-------------------+-------------+---------------+------------+--------------------+
|       1|   3691|          O| 194029.55|1996-01-02 00:00:00|        5-LOW|Clerk#000000951|           0|nstructions sleep...|
|       2|   7801|          O|  60951.63|1996-12-01 00:00:00|     1-URGENT|Clerk#000000880|           0| foxes. pending a...|
|       3|  12332|          F| 247296.05|1993-10-14 00:00:00|        5-LOW|Clerk#000000955|           0|sly final account...|
|       4|  13678|          O|  53829.87|1995-10-11 00:00:00|        5-LOW|Clerk#000000124|           0|sits. slyly regul...|
|       5|   4450|          F| 139660.54|1994-07-30 00:00:00|        5-LOW|Clerk#000000925|           0|quickly. bold 

---
#### Question 1 (2/25 marks)
In the cell below, implement the function `five_highest_totalprice_orders_sql()` that will return the ORDERKEY, ORDERDATE, and TOTALPRICE of the five orders with the highest TOTALPRICE. Express your query in SQL, and use `sql` to execute it. The function must return a DataFrame with the corresponding schema `ORDERKEY, ORDERDATE, TOTALPRICE`. Use the view(s) or DataFrame(s) created in Question 0.

In [12]:
def five_highest_totalprice_orders_sql():
    query = "select OrderKey, OrderDate, TotalPrice from orders order by TotalPrice DESC limit 5"
    return spark.sql(query)


In [13]:
# Your tests here
five_highest_totalprice_orders_sql().show()

+--------+-------------------+----------+
|OrderKey|          OrderDate|TotalPrice|
+--------+-------------------+----------+
|  279812|1994-02-19 00:00:00| 479129.21|
|  370726|1996-09-29 00:00:00|  460099.4|
|   66659|1993-10-15 00:00:00| 458396.42|
|  253639|1998-01-23 00:00:00| 456532.89|
|  502886|1994-04-12 00:00:00| 456423.88|
+--------+-------------------+----------+



---
#### Question 2 (2/25 marks)
In the cell below, implement the function `five_highest_totalprice_orders_dtf()` to answer the same query you answered in Question 1, but this time do not use the `sql` method. The function must return a DataFrame with the corresponding schema `ORDERKEY, ORDERDATE, TOTALPRICE`, ordered by TOTALPRICE value. Use the view(s) or DataFrame(s) created in Question 0.

In [14]:
def five_highest_totalprice_orders_dtf():
    return orders.select('OrderKey', "OrderDate", "TotalPrice").orderBy("TotalPrice", ascending=False).limit(5)


In [15]:
# Your tests here
five_highest_totalprice_orders_dtf().show(5)

+--------+-------------------+----------+
|OrderKey|          OrderDate|TotalPrice|
+--------+-------------------+----------+
|  279812|1994-02-19 00:00:00| 479129.21|
|  370726|1996-09-29 00:00:00|  460099.4|
|   66659|1993-10-15 00:00:00| 458396.42|
|  253639|1998-01-23 00:00:00| 456532.89|
|  502886|1994-04-12 00:00:00| 456423.88|
+--------+-------------------+----------+



---
#### Question 3 (3/25 marks)
In the cell below, implement the function `cust_most_recent_order_sql(custkey)` that takes a Customer Key as an input and returns the customer's name as well as the ORDERDATE and TOTALPRICE of that customer's most recent order.   Express the query in SQL, and use `sql` to execute it. You will need to use information from multiple tables to generate your answer. Use the view(s) or DataFrame(s) created in Question 0.

In [16]:
def cust_most_recent_order_sql(custkey):
    query = """select c.Name, 
                      o.OrderDate, 
                      o.TotalPrice 
                      from customer as c 
                      inner join 
                      orders as o 
                      on c.CustKey = o.CustKey where c.CustKey = {0}
                      order by OrderDate Desc
                      limit 1
                      """.format(custkey)
    return spark.sql(query)


In [17]:
# Your tests here
cust_most_recent_order_sql(1).show()


+------------------+-------------------+----------+
|              Name|          OrderDate|TotalPrice|
+------------------+-------------------+----------+
|Customer#000000001|1997-03-04 00:00:00| 268835.44|
+------------------+-------------------+----------+
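
For readers new to SQL, the join, sort, and limit pattern used for "most recent order" can be tried on a tiny table outside Spark. This is a sketch using Python's built-in `sqlite3` module, not Spark SQL. All rows are made up for illustration, and the `?` placeholder is how `sqlite3` binds a value into a query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table customer (CustKey integer, Name text);
    create table orders (OrderKey integer, CustKey integer,
                         OrderDate text, TotalPrice real);
    -- Made-up rows for illustration only.
    insert into customer values (1, 'Customer A'), (2, 'Customer B');
    insert into orders values
        (10, 1, '1995-06-01', 100.0),
        (11, 1, '1997-03-04', 250.0),
        (12, 2, '1998-01-01', 75.0);
""")

# Join on CustKey, keep one customer, sort newest first, take the first row.
latest = conn.execute(
    """select c.Name, o.OrderDate, o.TotalPrice
       from customer as c inner join orders as o on c.CustKey = o.CustKey
       where c.CustKey = ?
       order by o.OrderDate desc
       limit 1""",
    (1,),
).fetchone()
conn.close()

print(latest)  # ('Customer A', '1997-03-04', 250.0)
```

Sorting by date in descending order and keeping one row is what turns "all orders of this customer" into "the most recent order".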



---
#### Question 4 (3/25 marks)
In the cell below, answer the same query you answered in Question 3, but this time do not use the `sql` method. Use the view(s) or DataFrame(s) created in Question 0.

In [18]:
def cust_most_recent_order_dtf(custkey):
    order_q = orders.filter("CustKey="+str(custkey)).select("CustKey","OrderDate", "TotalPrice").orderBy("OrderDate", ascending=False).limit(1)
    cust_q = customer.filter("CustKey="+str(custkey)).select("CustKey", "Name")
    return cust_q.join(order_q, on = "CustKey").select("Name", "OrderDate", "TotalPrice")
    

In [19]:
# Your tests here
cust_most_recent_order_dtf(1).show(1)


+------------------+-------------------+----------+
|              Name|          OrderDate|TotalPrice|
+------------------+-------------------+----------+
|Customer#000000001|1997-03-04 00:00:00| 268835.44|
+------------------+-------------------+----------+



---
#### Question 5 (5/25 marks)
In the cell below, implement `distinct_supplied_parts(nname)` that will return the number of distinct parts supplied by suppliers that are located in the given nation. The function must output an **integer** (not a DataFrame).

You may answer this question with or without using `sql` - whichever you prefer. Use the view(s) or DataFrame(s) created in Question 0.

In [20]:
def distinct_supplied_parts(nname):
    # Nation -> its suppliers -> the parts they supply, then count distinct part keys.
    return (
        nation.filter("Name = '{0}'".format(nname))
        .select("NationKey")
        .join(supplier, on="NationKey")
        .join(partsupp, on="SuppKey")
        .select("PartKey")
        .distinct()
        .count()
    )


In [21]:
# Your tests here
distinct_supplied_parts("CANADA")


2799

---
#### Question 6 (5/25 marks)
In the cell below, implement `count_suppliers_brand_per_nation(bname)` that takes a brand name as input, like those that appear in the Parts table.  Given the brand,
report, for each nation, a count of that nation's suppliers of parts having that brand, ordered by nation name.   Your output should be a table (i.e. DataFrame) of nations and their supplier counts. Each supplier should be counted at most once in a nation's total, even if that supplier produces multiple parts with the given brand.

You may answer this question with or without using `sql` - whichever you prefer. Use the view(s) or DataFrame(s) created in Question 0.

In [23]:
def count_suppliers_brand_per_nation(bname):
    # Your solution to Question 6 here
    q = """ select  n.name, count(distinct ps.suppkey) as count from nation as n inner join
            supplier as s on n.nationkey = s.nationkey
            inner join partsupp as ps on s.suppkey = ps.suppkey
            inner join part as p on ps.partkey = p.partkey
            where p.brand = '{0}' group by n.name order by n.name""".format(bname)
    return spark.sql(q)


In [24]:
# Your tests here
count_suppliers_brand_per_nation("Brand#14").show()

+----------+-----+
|      name|count|
+----------+-----+
|   ALGERIA|   34|
| ARGENTINA|   38|
|    BRAZIL|   41|
|    CANADA|   35|
|     CHINA|   51|
|     EGYPT|   39|
|  ETHIOPIA|   32|
|    FRANCE|   35|
|   GERMANY|   49|
|     INDIA|   45|
| INDONESIA|   45|
|      IRAN|   39|
|      IRAQ|   40|
|     JAPAN|   40|
|    JORDAN|   28|
|     KENYA|   35|
|   MOROCCO|   40|
|MOZAMBIQUE|   32|
|      PERU|   37|
|   ROMANIA|   32|
+----------+-----+
only showing top 20 rows
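
The requirement that each supplier be counted at most once is what `count(distinct ...)` handles. The following sketch shows the difference on a made-up three-row table, again using Python's built-in `sqlite3` module as a stand-in for Spark SQL; the rows and variable names are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table partsupp (PartKey integer, SuppKey integer);
    -- Made-up rows: supplier 1 supplies two parts, supplier 2 supplies one.
    insert into partsupp values (100, 1), (101, 1), (100, 2);
""")

# count(SuppKey) counts every row; count(distinct SuppKey) counts each supplier once.
rows_counted, suppliers_counted = conn.execute(
    "select count(SuppKey), count(distinct SuppKey) from partsupp"
).fetchone()
conn.close()

print(rows_counted)       # 3
print(suppliers_counted)  # 2
```

Without `distinct`, a supplier that offers several matching parts would be counted once per part.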



---
#### Question 7 (5/25 marks)
In the cell below, write code for the function `order_number_per_customer_nation(nname)` that takes a nation name and returns, for each year, the total number of orders placed by customers from that nation, ordered by descending number of orders.

You may answer this question with or without using `sql` - whichever you prefer. Use the view(s) or DataFrame(s) created in Question 0.

In [27]:
def order_number_per_customer_nation(nname):
    q = """select  year(o.orderdate) as year , count(distinct o.orderkey) as count from orders as o
           inner join customer as c on o.custkey = c.custkey inner join 
           nation as n on n.nationkey = c.nationkey where
           n.Name = '{0}' group by year order by count desc""".format(nname)
    return spark.sql(q)


In [28]:
# Your tests here
order_number_per_customer_nation("CANADA").show()

+----+-----+
|year|count|
+----+-----+
|1992|  982|
|1996|  940|
|1995|  932|
|1997|  921|
|1994|  912|
|1993|  900|
|1998|  595|
+----+-----+
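
The shape of this query (extract the year, group, count, sort by descending count) can be sketched in plain Python. The dates below are made up for illustration, and `collections.Counter` stands in for the grouping that Spark performs.

```python
from collections import Counter
from datetime import date

# Made-up order dates for illustration only.
order_dates = [
    date(1996, 1, 2), date(1996, 12, 1), date(1993, 10, 14),
    date(1995, 10, 11), date(1996, 7, 30), date(1993, 3, 5),
]

# Group by the year of each date and count the orders in each group.
per_year = Counter(d.year for d in order_dates)

# most_common() returns (year, count) pairs sorted by descending count.
ranked = per_year.most_common()

print(ranked)  # [(1996, 3), (1993, 2), (1995, 1)]
```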



---
Don't forget to save your notebook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.

**Make sure that your notebook runs properly on the Jupyter hub before submitting it.**