<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Aggregations and JOINs
Apache Spark&trade; and Databricks&reg; allow you to create on-the-fly data lakes.

## In this lesson you:
* Use basic aggregations.
* Correlate two data sets with a join.

## Audience
* Primary Audience: Data Engineers and Data Scientists
* Secondary Audience: Data Analysts

## Prerequisites
* Web browser: Chrome or Firefox
* Lesson: <a href="$./02-Querying-Files">Querying Files with DataFrames</a>

### Getting Started

Run the following cell to configure our "classroom."

In [4]:
%run "./Includes/Classroom-Setup"

<iframe  
src="//fast.wistia.net/embed/iframe/659uzjn3f5?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/659uzjn3f5?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

## Basic Aggregations

Using <a href="https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions" target="_blank">built-in Spark functions</a>, you can aggregate data in various ways.

The cells below load the `people` data set into `peopleDF` and compute the average of all salaries.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> `avg()` returns a floating-point (double) value, even for integer columns.

<iframe  
src="//fast.wistia.net/embed/iframe/44ui5pgc61?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/44ui5pgc61?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

In [8]:
%fs ls "dbfs:/mnt/training/dataframes/"

| path | name | size (bytes) |
|---|---|---|
| dbfs:/mnt/training/dataframes/country_ip_ranges.json | country_ip_ranges.json | 37814003 |
| dbfs:/mnt/training/dataframes/people-10m-partitioned.csv/ | people-10m-partitioned.csv/ | 0 |
| dbfs:/mnt/training/dataframes/people-10m.avro/ | people-10m.avro/ | 0 |
| dbfs:/mnt/training/dataframes/people-10m.csv/ | people-10m.csv/ | 0 |
| dbfs:/mnt/training/dataframes/people-10m.orc/ | people-10m.orc/ | 0 |
| dbfs:/mnt/training/dataframes/people-10m.parquet/ | people-10m.parquet/ | 0 |
| dbfs:/mnt/training/dataframes/people-alt.csv | people-alt.csv | 37029285 |
| dbfs:/mnt/training/dataframes/people-with-dups.csv | people-with-dups.csv | 7020771 |
| dbfs:/mnt/training/dataframes/people-with-dups.txt | people-with-dups.txt | 5483873 |
| dbfs:/mnt/training/dataframes/people-with-header-100k.txt | people-with-header-100k.txt | 5881970 |


In [9]:
peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

In [10]:
from pyspark.sql.functions import avg
avgSalaryDF = peopleDF.select(avg("salary").alias("averageSalary"))

avgSalaryDF.show()

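Round that value to the nearest whole number using the `round()` function. The result is still a floating-point value; cast it (for example, with `.cast("int")`) if you need an integer type. See
<a href="https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$" class="text-info">the documentation for <tt>round()</tt></a>
for more details.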

In [12]:
from pyspark.sql.functions import round  # Note: this shadows Python's built-in round() in this notebook
roundedAvgSalaryDF = avgSalaryDF.select(round("averageSalary").alias("roundedAverageSalary"))

roundedAvgSalaryDF.show()
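Spark's `round()` uses HALF_UP rounding, so ties such as 2.5 go away from zero, while Python's built-in `round()` rounds ties to the nearest even number (Spark's `bround()` function uses that HALF_EVEN mode instead). Because the import above shadows the built-in, the two are easy to confuse. The pure-Python sketch below models HALF_UP with the standard `decimal` module; `spark_style_round` is a hypothetical helper, not a Spark API, and it reaches Python's own `round()` through `builtins`.

```python
import builtins
from decimal import Decimal, ROUND_HALF_UP


def spark_style_round(value: float, scale: int = 0) -> float:
    """Hypothetical helper: round half away from zero, like Spark's HALF_UP round()."""
    quantum = Decimal(1).scaleb(-scale)  # scale=0 -> 1, scale=2 -> 0.01
    # Convert through str() so the decimal digits match the float as printed.
    return float(Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_UP))


for x in [2.5, 3.5, -2.5]:
    # builtins.round is Python's round-half-to-even, hidden by the pyspark import above.
    print(x, spark_style_round(x), builtins.round(x))
# 2.5 3.0 2
# 3.5 4.0 4
# -2.5 -3.0 -2
```

For an average salary the difference only shows up on an exact .5 tie, but it matters whenever you compare Spark results against values computed in plain Python.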

In addition to the average salary, what are the maximum and minimum salaries?

In [14]:
from pyspark.sql.functions import min, max  # Note: these shadow Python's built-in min() and max()
salaryDF = peopleDF.select(
  max("salary").alias("max"),
  min("salary").alias("min"),
  round(avg("salary")).alias("averageSalary")
)

salaryDF.show()
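Because all three aggregates appear in one `select()`, Spark can typically compute them in a single aggregation rather than scanning the data three times. Conceptually, each partition produces partial results (for `avg`, a running sum and row count), and the partials are merged into the final answer. The pure-Python model below illustrates that idea under simplifying assumptions; the partitions and salary values are made up, and this is not Spark's actual aggregation code.

```python
from functools import reduce
from typing import List, Tuple

# Partial aggregation buffer for one partition: (min, max, sum, row count).
Buffer = Tuple[float, float, float, int]


def partial_aggregate(salaries: List[float]) -> Buffer:
    """One pass over a partition updates all four buffers together."""
    lo, hi, total, n_rows = float("inf"), float("-inf"), 0.0, 0
    for s in salaries:
        # Conditional expressions avoid min()/max(), which the pyspark import above shadows.
        lo = s if s < lo else lo
        hi = s if s > hi else hi
        total += s
        n_rows += 1
    return lo, hi, total, n_rows


def merge(a: Buffer, b: Buffer) -> Buffer:
    """Combine two partial buffers; the average is derived only at the very end."""
    return (a[0] if a[0] < b[0] else b[0],
            a[1] if a[1] > b[1] else b[1],
            a[2] + b[2],
            a[3] + b[3])


# Hypothetical salaries split across three partitions.
partitions = [[52000.0, -4000.0], [71000.0], [30000.0, 61000.0]]
lo, hi, total, n_rows = reduce(merge, (partial_aggregate(p) for p in partitions))
print(hi, lo, total / n_rows)  # 71000.0 -4000.0 42000.0
```

Keeping a sum and a count, rather than per-partition averages, is what makes the merge step correct when partitions hold different numbers of rows.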

## Joining Two Data Sets

Correlate the data in two data sets using a DataFrame join. 

The `people` data set has 10 million names in it. 

> How many of the first names appear in Social Security data files? 

To find out, use the first-name popularity data set published by the United States Social Security Administration (SSA).

For every year from 1880 to 2014, `dbfs:/mnt/training/ssn/names-1880-2016.parquet/` lists the first names of people born in that year, their gender, and the total number of people given that name. 

By joining the `people` data set with `names-1880-2016`, weed out the names that aren't represented in the Social Security data.

(In a real application, you might use a join like this to filter out bad data.)
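An inner join keeps only rows whose key appears on both sides. When both sides are already de-duplicated, the number of joined rows equals the size of the intersection of the two sets of names. A pure-Python model with made-up names (including one bogus entry, `Xqzt`, standing in for bad data):

```python
# Hypothetical distinct first names from each data set.
people_names = {"Caren", "Emma", "Xqzt", "Olivia"}
ssa_names = {"Emma", "Olivia", "Caren", "Ava"}

# Inner join on distinct keys: keep a name only if it also appears on the SSA side.
joined = sorted(name for name in people_names if name in ssa_names)
print(joined, len(joined))  # ['Caren', 'Emma', 'Olivia'] 3
```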

<iframe  
src="//fast.wistia.net/embed/iframe/h8r3flam1s?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/h8r3flam1s?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

Start by looking at the layout of the Social Security data set. It is partitioned by year: each year is stored in its own `year=YYYY` directory.

In [18]:
%fs ls dbfs:/mnt/training/ssn/names-1880-2016.parquet/

| path | name | size (bytes) |
|---|---|---|
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/_SUCCESS | _SUCCESS | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1880/ | year=1880/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1881/ | year=1881/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1882/ | year=1882/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1883/ | year=1883/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1884/ | year=1884/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1885/ | year=1885/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1886/ | year=1886/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1887/ | year=1887/ | 0 |
| dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1888/ | year=1888/ | 0 |

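Directory names such as `year=1880` follow Hive-style partitioning. When Spark reads the top-level `names-1880-2016.parquet/` directory, partition discovery turns each `key=value` path segment into a column, which is why `year` shows up as a column even though it is stored only in the path (Spark also infers a column type, such as an integer for `year`). The helper below is a hypothetical pure-Python sketch of just the path-parsing step, not Spark's implementation.

```python
from typing import Dict


def partition_values(path: str) -> Dict[str, str]:
    """Hypothetical helper: extract Hive-style key=value segments from a path."""
    values = {}
    for segment in path.rstrip("/").split("/"):
        # Only segments shaped like key=value encode partition columns.
        key, sep, value = segment.partition("=")
        if sep and key:
            values[key] = value
    return values


print(partition_values("dbfs:/mnt/training/ssn/names-1880-2016.parquet/year=1880/"))
# {'year': '1880'}
```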

Load the whole directory into a DataFrame and look at the data.

In [20]:
ssaDF = spark.read.parquet("/mnt/training/ssn/names-1880-2016.parquet/")

display(ssaDF)

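| firstName | gender | total | year |
|---|---|---|---|
| Emma | F | 18806 | 2008 |
| Isabella | F | 18609 | 2008 |
| Emily | F | 17426 | 2008 |
| Olivia | F | 17076 | 2008 |
| Ava | F | 17033 | 2008 |
| Madison | F | 17024 | 2008 |
| Sophia | F | 16078 | 2008 |
| Abigail | F | 15076 | 2008 |
| Elizabeth | F | 11995 | 2008 |
| Chloe | F | 11822 | 2008 |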


Next, count the distinct first names in each DataFrame to get a sense of how many names each one contains.

DataFrames have a `distinct` method just for this purpose.
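De-duplicating before the join matters because an inner join pairs every matching row on the left with every matching row on the right. The Social Security data has one row per name, gender, and year, so a single name can appear many times, and joining the raw rows would multiply matches and inflate the count. A pure-Python nested-loop join over made-up rows shows the effect:

```python
# Hypothetical rows: "Emma" appears three times on the SSA side, "Ava" twice.
people_rows = ["Emma", "Ava"]
ssa_rows = ["Emma", "Emma", "Emma", "Ava", "Ava"]


def inner_join_count(left, right):
    """Nested-loop inner join: count every (left, right) pair with equal keys."""
    return sum(1 for left_key in left for right_key in right if left_key == right_key)


print(inner_join_count(people_rows, ssa_rows))            # 5 (raw rows)
print(inner_join_count(set(people_rows), set(ssa_rows)))  # 2 (after de-duplicating)
```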

In [22]:
peopleDistinctNamesDF = peopleDF.select("firstName").distinct()

In [23]:
peopleDistinctNamesDF.count()

In preparation for the join, let's rename the `firstName` column to `ssaFirstName` in the Social Security DataFrame.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Question to ponder: why would we want to do this?

In [25]:
# Keep only the first-name column, rename it, and drop duplicate names.
ssaDistinctNamesDF = ssaDF.select("firstName").withColumnRenamed("firstName", "ssaFirstName").distinct()

Count the distinct names in the Social Security DataFrame.

In [27]:
ssaDistinctNamesDF.count()

Now join the two DataFrames.

In [29]:
from pyspark.sql.functions import col
joinedDF = peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName"))

How many first names appear in both data sets?

In [31]:
joinedDF.count()

## Exercise 1

In the tables above, some of the salaries in the `peopleDF` DataFrame are negative. 

These salaries represent bad data. 

Your job is to convert all the negative salaries to positive ones, sort the people by salary, and keep the first 20 records.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** See the Apache Spark documentation, <a href="https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank">built-in functions</a>.

### Step 1
Create a DataFrame, `peopleWithFixedSalariesDF`, in which all negative salaries have been converted to positive numbers.

In [34]:
peopleDF.show()

In [35]:
# TODO

from pyspark.sql.functions import abs  # Note: this shadows Python's built-in abs()
# Replace the salary column with its absolute value.
peopleWithFixedSalariesDF = peopleDF.withColumn("salary", abs(peopleDF.salary))

In [36]:
# TEST - Run this cell to test your solution.

belowZero = peopleWithFixedSalariesDF.filter(peopleWithFixedSalariesDF["salary"] < 0).count()
dbTest("DF-L3-belowZero", 0, belowZero)

print("Tests passed!")

In [37]:
peopleWithFixedSalariesDF.show()

### Step 2

Starting with the `peopleWithFixedSalariesDF` DataFrame, create another DataFrame called `peopleWithFixedSalariesSortedDF` where:
0. The records are sorted by the column `salary` in ascending order.
0. The data set is then reduced to the first 20 records.
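The order of the two steps matters. Sorting first and then limiting yields the 20 lowest salaries in the whole data set; limiting first would keep whichever rows happen to come first and only sort those. A pure-Python model with made-up salaries, taking 3 records instead of 20 and applying the absolute-value fix from Step 1:

```python
# Hypothetical salaries, including negative "bad data" values.
salaries = [48000, -7, 91000, 5, -27000, 9, 15]

# Step 1: make every salary non-negative (a conditional avoids the shadowed abs()).
fixed = [-s if s < 0 else s for s in salaries]

sort_then_limit = sorted(fixed)[:3]  # lowest three salaries overall
limit_then_sort = sorted(fixed[:3])  # first three rows, sorted among themselves
print(sort_then_limit, limit_then_sort)  # [5, 7, 9] [7, 48000, 91000]
```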

In [39]:
# TODO
peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.sort("salary").limit(20)

In [40]:
# TEST - Run this cell to test your solution.

resultsDF = peopleWithFixedSalariesSortedDF.select("salary")
dbTest("DF-L3-count", 20, resultsDF.count())

print("Tests passed!")

In [41]:
# TEST - Run this cell to test your solution.

from pyspark.sql import Row

results = resultsDF.collect()

dbTest("DF-L3-fixedSalaries-0", Row(salary=2), results[0])
dbTest("DF-L3-fixedSalaries-1", Row(salary=3), results[1])
dbTest("DF-L3-fixedSalaries-2", Row(salary=4), results[2])

dbTest("DF-L3-fixedSalaries-10", Row(salary=19), results[10])
dbTest("DF-L3-fixedSalaries-11", Row(salary=19), results[11])
dbTest("DF-L3-fixedSalaries-12", Row(salary=20), results[12])

dbTest("DF-L3-fixedSalaries-17", Row(salary=28), results[17])
dbTest("DF-L3-fixedSalaries-18", Row(salary=30), results[18]) 
dbTest("DF-L3-fixedSalaries-19", Row(salary=31), results[19]) 

print("Tests passed!")

## Exercise 2

As a refinement, assume all salaries under $20,000 represent bad rows and filter them out.

Additionally, categorize each person's salary into $10K groups.

### Step 1
Starting with the `peopleWithFixedSalariesDF` DataFrame, create a DataFrame called `peopleWithFixedSalaries20KDF` where:
0. The data set excludes all records where salaries are below $20K.
0. The data set includes a new column called `salary10k`, containing the salary divided by 10,000 and rounded to the nearest whole number. For example:
  * A salary of 23,000 should report a value of 2.
  * A salary of 57,400 should report a value of 6.
  * A salary of 1,231,375 should report a value of 123.
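The three examples are consistent with dividing by 10,000 and rounding to the nearest whole number: 2.3 becomes 2, 5.74 becomes 6, and 123.1375 becomes 123. A pure-Python check of that rule, using half-up rounding to match Spark's `round()`; `salary_bucket` is a hypothetical helper name:

```python
from decimal import Decimal, ROUND_HALF_UP


def salary_bucket(salary: int) -> int:
    """Hypothetical helper: salary in units of 10,000, rounded half-up."""
    # Decimal division is exact here, so a tie such as 2.5 is not disturbed by float error.
    return int((Decimal(salary) / 10000).quantize(Decimal(1), rounding=ROUND_HALF_UP))


examples = {23000: 2, 57400: 6, 1231375: 123}
for salary, expected in examples.items():
    print(salary, salary_bucket(salary), expected)
```

With the 20,000 filter applied first, the smallest possible bucket is 2, and a salary of exactly 25,000 lands in bucket 3 under half-up rounding.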

In [44]:
from pyspark.sql.types import DoubleType

In [45]:
# TODO
peopleWithFixedSalaries20KDF = (peopleWithFixedSalariesDF
  .filter(peopleWithFixedSalariesDF.salary >= 20000)
  .withColumn("salary10k", round((peopleWithFixedSalariesDF.salary / 10000).cast(DoubleType()), 0)))

In [46]:
peopleWithFixedSalaries20KDF.show()

In [47]:
# TEST - Run this cell to test your solution.

below20K = peopleWithFixedSalaries20KDF.filter("salary < 20000").count()
 
dbTest("DF-L3-count-salaries", 0, below20K)  

print("Tests passed!")

In [48]:
# TEST - Run this cell to test your solution.

from pyspark.sql.functions import count
results = (peopleWithFixedSalaries20KDF 
  .select("salary10k") 
  .groupBy("salary10k") 
  .agg(count("*").alias("total")) 
  .orderBy("salary10k") 
  .limit(5) 
  .collect()
)

dbTest("DF-L3-countSalaries-0", Row(salary10k=2.0, total=43792), results[0])
dbTest("DF-L3-countSalaries-1", Row(salary10k=3.0, total=212630), results[1])
dbTest("DF-L3-countSalaries-2", Row(salary10k=4.0, total=536536), results[2])
dbTest("DF-L3-countSalaries-3", Row(salary10k=5.0, total=1055261), results[3])
dbTest("DF-L3-countSalaries-4", Row(salary10k=6.0, total=1623248), results[4])

print("Tests passed!")

## Exercise 3

Using the `peopleDF` DataFrame, count the number of females named Caren who were born before March 1980.

### Step 1

Starting with `peopleDF`, create a DataFrame called `carensDF` where:
0. The result set has a single record.
0. The data set has a single column named `total`.
0. The result counts only 
  * Females (`gender`)
  * First Name is "Caren" (`firstName`)
  * Born before March 1980 (`birthDate`)
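When combining several conditions in `filter()`, wrap each comparison in parentheses. PySpark overloads `&` on columns to mean logical AND, but in Python `&` binds more tightly than `==`, so an unparenthesized `peopleDF.gender == 'F' & peopleDF.firstName == 'Caren'` is parsed as the chained comparison `peopleDF.gender == ('F' & peopleDF.firstName) == 'Caren'`, which fails. Plain integers show the same precedence rule:

```python
a, b = 1, 2

# Without parentheses, & binds first: a == (1 & b) == 2, i.e. 1 == 0 == 2.
unparenthesized = a == 1 & b == 2
# With parentheses, each comparison is evaluated before the results are combined.
parenthesized = (a == 1) & (b == 2)

print(unparenthesized, parenthesized)  # False True
```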

In [51]:
peopleDF.show()

In [52]:
# TODO
from pyspark.sql.functions import count
carensDF = peopleDF.filter((peopleDF.gender == 'F') & (peopleDF.firstName == 'Caren') & (peopleDF.birthDate < '1980-03-01')).agg(count("*").alias("total"))

In [53]:
# TEST - Run this cell to test your solution.

rows = carensDF.collect()

dbTest("DF-L3-carens-len", 1, len(rows))
dbTest("DF-L3-carens-total", Row(total=750), rows[0])

print("Tests passed!")

## Review Questions
**Q:** What is the DataFrame equivalent of the SQL statement `SELECT count(*) AS total`?  
**A:** `.agg(count("*").alias("total"))`

**Q:** What is the DataFrame equivalent of the SQL statement `SELECT firstName FROM PeopleDistinctNames INNER JOIN SSADistinctNames ON firstName = ssaFirstName`?  
**A:** `peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName")).select("firstName")`

## Next Steps

* Do the [Challenge Exercise]($./Optional/03-Joins-Aggregations).
* Start the next lesson, [Accessing Data]($./04-Accessing-Data).

## Additional Topics & Resources

* <a href="http://spark.apache.org/docs/latest/sql-programming-guide.html" target="_blank">Spark SQL, DataFrames and Datasets Guide</a>
* <a href="https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html" target="_blank">Cost-based Optimizer in Apache Spark 2.2</a>

&copy; 2018 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>