# Practice 3: Dataframes

## 1. Analyzing Transactions

*This problem is taken from [Data Algorithms: Recipes for Scaling Up with Hadoop and Spark](http://shop.oreilly.com/product/0636920033950.do) by Mahmoud Parsian.*

Consider a company such as Amazon, which has over 200 million users and can do hundreds of millions of transactions per day. The user data consists of users’ location information and the transaction data includes user identity information, but no direct information about a user’s location. We are interested in finding out which countries are our best consumers in terms of average order total cost.

### 1.1 Dataset
We have access to seven tables of a store's database as tab separated value files (`.tsv`):
- customers (`data/store/customers.tsv`)
- orders (`data/store/orders.tsv`)
- order details (`data/store/order_details.tsv`)
- products (`data/store/products.tsv`)
- categories (`data/store/categories.tsv`)
- shippers (`data/store/shippers.tsv`)
- suppliers (`data/store/suppliers.tsv`)

The tables can be viewed online: [SQL Tutorial Database](https://docs.google.com/spreadsheets/d/1_rn2PWl5qqw7ZuBuCm6D7FJrHepOlunjpCPbvitdxxQ/edit?usp=sharing).

### 1.2 Objective
We are interested in finding the average order cost per country.

### 1.3 Instructions

1. Identify the fields that you will need to find the answer; 
2. Initialize the Spark and Spark SQL contexts;
3. Read the datasets you need as dataframes;
4. Join the tables and keep only the fields you need;
5. Compute the total cost per order;
6. Compute the average order cost per country;
7. Display the countries that are spending the most.

### 1.4 Hints

- If you go through the dataset, you will see that the information required to fulfill the objective is scattered among the different tables.
- An order may include multiple products, you will need to aggregate the order details to compute the order total cost.
- Some of the methods required in this practice have not been covered so far, you will need to look at the [DataFrame's API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).
- If you are stuck, take a peek at the [Recap](#2.-Recap) section to get insights on what functions you should use.

In [1]:
import pyspark

try: 
    sc = pyspark.SparkContext()
except ValueError:
    pass

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [3]:
customers = sqlContext.read.format('com.databricks.spark.csv')\
                      .options(header='true', inferschema='true', delimiter='\t')\
                      .load('data/store/customers.tsv')

details = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header='true', inferschema='true', delimiter='\t')\
                    .load('data/store/order_details.tsv')

orders = sqlContext.read.format('com.databricks.spark.csv')\
                   .options(header='true', inferschema='true', delimiter='\t')\
                   .load('data/store/orders.tsv')

products = sqlContext.read.format('com.databricks.spark.csv')\
                     .options(header='true', inferschema='true', delimiter='\t')\
                     .load('data/store/products.tsv')

## Identify what fields you need

In [4]:
sub_orders = orders.select('OrderID', 'CustomerID')
sub_clients = customers.select('CustomerID', 'Country')
sub_details = details.select('OrderID', 'ProductID', 'Quantity')
sub_products = products.select('ProductID', 'Price')

## Join the tables

In [5]:
df = sub_orders.join(sub_clients, 'CustomerID')
df = df.join(sub_details, 'OrderID')
df = df.join(sub_products, 'ProductID')

## Aggregate the results

In [6]:
df2 = df.withColumn('Total', df.Quantity * df.Price)\
        .select('OrderID', 'Country', 'Total')\
        .groupBy('OrderID', 'Country')\
        .agg({'Total' : 'sum'})

## Aggregate by country

In [7]:
df3 = df2.groupBy('Country').agg({'sum(Total)' : 'avg'})

## Get the 5 most spending countries

In [8]:
df3.orderBy('avg(sum(Total))', ascending=False).show(5)

+-------+------------------+
|Country|   avg(sum(Total))|
+-------+------------------+
|Denmark|         4467.7125|
|Belgium|           4025.65|
|Austria| 3974.766153846154|
| Canada|3480.7055555555557|
|Ireland|           2565.17|
+-------+------------------+
only showing top 5 rows



## 2. Recap

In this notebook, we used and learned about the following parts of 
**[Python Spark SQL API](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html)**:
1. Import Spark SQL Python module: 
**[`import pyspark.sql`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)**
2. Create a Spark SQLContext:
**[`pyspark.sql.SQLContext()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext)**
1. Create an RDD from a CSV file:
**[`SQLContext.read.format('com.databricks.spark.csv')`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext.read)**
2. Select a subset of variables from a DataFrame : **[`DataFrame.select(*cols)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select)**
3. Join two DataFrames : **[`DataFrame.join(DataFrame)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join)**
4. Create a new DataFrame by adding a column to an existing one: **[`DataFrame.withColumn(colName, col)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn)**
5. Group the DataFrame using the specified columns: **[`DataFrame.groupBy(*cols)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy)**
6. Compute aggregates on GroupeData and return a new DataFrame: **[`GroupedData.agg(*exprs)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData.agg)**
7. Sort a DataFrame by the specified columns: **[`DataFrame.orderBy(*cols)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy)**
8. Print the first `n` rows of a DataFrame to the console: **[`DataFrame.show(n=20)`](https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show)**

## 3. References

* [Data Algorithms: Recipes for Scaling Up with Hadoop and Spark](http://shop.oreilly.com/product/0636920033950.do) by Mahmoud Parsian
* [W3C School - Store Dataset](http://www.w3schools.com/sql/trysql.asp?filename=trysql_select_all)