In [None]:
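# Increase compatibility with Databricks when running under plain Jupyter
from IPython.display import display as idisplay, HTML
# Mimic Databricks' displayHTML() by rendering raw HTML through IPython
displayHTML = lambda x: idisplay(HTML(x))
# Stub out Databricks' display() so cells that call it do not fail
def display(*args, **kargs): pass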

# First Notebook with Spark

Just a quick little story.  A week or so ago (early April 2018) I had an interview with a terrific HR representative.  While I never heard back from the company, she suggested that I get Spark training because it is a much-needed skill.  So I went exploring and found a [Udemy course by Jose Portilla](https://www.udemy.com/scala-and-spark-for-big-data-and-machine-learning/learn/v4/overview) (recommended) and a [Whizlabs course](https://www.whizlabs.com/spark-developer-certification/) (not taken yet) that would put me on the right path.  I saw a note about Databricks and skipped to the end of Portilla's course, where he introduces the service; I think he should have put it up front.  I love notebooks: that is how I learn and keep track of my notes!

## Data Frames in Scala and Python
This is my first notebook on the site.  I'm still exploring the capabilities of [Databricks notebooks](https://docs.databricks.com/user-guide/notebooks/index.html#mix-languages),
but it is awesome that I can use a notebook in the cloud.

This notebook follows the Scala DataFrames introduction: https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-scala.html  
mixed (see below) with its Python counterpart: https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#creating-dataframes-with-python

Quick notes for making notebooks:
* The magic code "%md" turns a cell into a markdown cell.  
* "ctrl + alt + p" inserts a cell above the current cell.  
* "ctrl + alt + n" inserts a cell below the current cell.  
* Another feature that I have never seen (well-supported) in any other notebook is the ability to switch languages: https://docs.databricks.com/user-guide/notebooks/index.html#mix-languages

Use the shortcuts link to pop up all the navigation, run, and edit commands.

Lastly, I note that this page: https://www.dezyre.com/article/top-apache-spark-certifications-to-choose-from-in-2018/348 seems to be the best for outlining how to prepare for certification and how the certification tests differ.

In [None]:
%scala
// Create the case classes for our domain
case class Department(id: String, name: String)
case class Employee(firstName: String, lastName: String, email: String, salary: Int)
case class DepartmentWithEmployees(department: Department, employees: Seq[Employee])

// Create the Departments
val department1 = new Department("123456", "Computer Science")
val department2 = new Department("789012", "Mechanical Engineering")
val department3 = new Department("345678", "Theater and Drama")
val department4 = new Department("901234", "Indoor Recreation")

// Create the Employees
val employee1 = new Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000)
val employee2 = new Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
val employee3 = new Employee("matei", null, "no-reply@waterloo.edu", 140000)
val employee4 = new Employee(null, "wendell", "no-reply@princeton.edu", 160000)

// Create the DepartmentWithEmployees instances from Departments and Employees
val departmentWithEmployees1 = new DepartmentWithEmployees(department1, Seq(employee1, employee2))
val departmentWithEmployees2 = new DepartmentWithEmployees(department2, Seq(employee3, employee4))
val departmentWithEmployees3 = new DepartmentWithEmployees(department3, Seq(employee1, employee4))
val departmentWithEmployees4 = new DepartmentWithEmployees(department4, Seq(employee2, employee3))

In [None]:
%python
# Import the Row class from the pyspark.sql module
from pyspark.sql import Row

# Create Example Data - Departments and Employees

# Create the Departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Create the DepartmentWithEmployees instances from Departments and Employees
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)
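
The `Row("firstName", "lastName", "email", "salary")` call above builds a reusable record factory with fixed field names. As a rough plain-Python analogy (not Spark code), `collections.namedtuple` behaves in a similar way: values are filled in by position and read back by name. `EmployeeRecord` is a hypothetical name used only for this sketch.

```python
from collections import namedtuple

# Hypothetical stand-in for Row("firstName", "lastName", "email", "salary")
EmployeeRecord = namedtuple(
    "EmployeeRecord", ["firstName", "lastName", "email", "salary"]
)

# Values are matched to field names by position, as with the Row factory
employee1 = EmployeeRecord("michael", "armbrust", "no-reply@berkeley.edu", 100000)
employee3 = EmployeeRecord("matei", None, "no-reply@waterloo.edu", 140000)

print(employee1.email)     # access by field name
print(employee3.lastName)  # missing values are stored as None
```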

Create the first DataFrame from a List of the Case Classes

In [None]:
%scala
val departmentsWithEmployeesSeq1 = Seq(departmentWithEmployees1, departmentWithEmployees2)
// Can't do a display of a Seq.  println doesn't do much with Seq.
val df1 = departmentsWithEmployeesSeq1.toDF()
// But display works awesome on data frames!
display(df1)

In [None]:
%python
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = sqlContext.createDataFrame(departmentsWithEmployeesSeq1)

display(df1)

Create a 2nd DataFrame from a List of Case Classes.

In [None]:
%scala
val departmentsWithEmployeesSeq2 = Seq(departmentWithEmployees3, departmentWithEmployees4)
val df2 = departmentsWithEmployeesSeq2.toDF()
display(df2)

In [None]:
%python
departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = sqlContext.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)

## Working with DataFrames
Union 2 DataFrames.

In [None]:
%scala
val unionDF = df1.unionAll(df2)
display(unionDF)

In [None]:
%python
unionDF = df1.unionAll(df2)
display(unionDF)

Write the DataFrame to a [Parquet](http://parquet.apache.org/documentation/latest/) file.  [Parquet](https://tech.blue-yonder.com/efficient-dataframe-storage-with-apache-parquet/) is a columnar file format that stores data efficiently.

In [None]:
%scala
// Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example-scala.parquet", true)
unionDF.write.parquet("/tmp/databricks-df-example-scala.parquet")

In [None]:
%python
# Remove the file if it exists
dbutils.fs.rm("/tmp/databricks-df-example-python.parquet", True)
unionDF.write.parquet("/tmp/databricks-df-example-python.parquet")

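Read the data frame from a Parquet file.  For giggles, I attempted to read the file written by one language from the other, but got a conversion error.  When I wrote and read from the same language, things worked.  Parquet is designed to be language-neutral, so my guess is a schema mismatch rather than the file format itself: PySpark infers Python integers as `long`, while the Scala case class declares `salary` as `Int`.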

In [None]:
%scala
val parquetDF = sqlContext.read.parquet("/tmp/databricks-df-example-scala.parquet")

display(parquetDF)

In [None]:
%python
parquetDF = sqlContext.read.parquet("/tmp/databricks-df-example-python.parquet")
display(parquetDF)

"Explode" the data frame.

In [None]:
%scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.explode

// Note that the original code presented in the Scala notebook did not appear to work properly,
// so we adopted the code using Python as an example and figured out how to select out what we really want
val inter = parquetDF.select("employees")
display(inter)
// Dataset.explode (deprecated since Spark 2.0) turns each employees array into one row per employee
val explodeDF = inter.explode($"employees") {
  case Row(employee: Seq[Row]) => employee.map { employee =>
    val firstName = employee(0).asInstanceOf[String]
    val lastName = employee(1).asInstanceOf[String]
    val email = employee(2).asInstanceOf[String]
    val salary = employee(3).asInstanceOf[Int]
    Employee(firstName, lastName, email, salary)
  }
}.select("firstName", "lastName", "email", "salary")
display(explodeDF)

In [None]:
%python
from pyspark.sql.functions import explode

df = parquetDF.select(explode("employees").alias("e"))
explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

display(explodeDF)
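
To make the effect of "explode" concrete, here is a plain-Python sketch (not Spark code) of the idea: every element of an array column becomes its own output row. The helper `explode_rows` and the trimmed-down sample records are hypothetical and exist only for this illustration.

```python
# Plain-Python model of explode(): one output row per array element.
departments = [
    {"department": "Computer Science",
     "employees": [{"firstName": "michael", "salary": 100000},
                   {"firstName": "xiangrui", "salary": 120000}]},
    {"department": "Mechanical Engineering",
     "employees": [{"firstName": "matei", "salary": 140000}]},
]


def explode_rows(rows, array_field):
    """Yield each element of row[array_field] as its own row."""
    for row in rows:
        for element in row[array_field]:
            yield element


exploded = list(explode_rows(departments, "employees"))
print(len(exploded))                       # number of employee rows
print([e["firstName"] for e in exploded])  # one entry per employee
```

Two department rows holding three employees in total come out as three employee rows, which is the same reshaping the Spark cells above perform on `parquetDF`.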

Use ``filter()`` to return only the rows that match the given predicate.

In [None]:
%scala
val filterDF = explodeDF
  .filter($"firstName" === "xiangrui" || $"firstName" === "michael")
  .sort($"lastName".asc)
display(filterDF)

In [None]:
%python
from pyspark.sql.functions import col, asc

# Use `|` instead of `or`
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(filterDF)
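
The "use `|` instead of `or`" comment deserves a short explanation. Python lets a class overload `|`, but `or` always asks its left operand for a plain truth value, which an unevaluated column expression cannot give. The sketch below uses a hypothetical `Expr` class (a toy stand-in, not `pyspark.sql.Column`) to show the mechanics.

```python
class Expr:
    """Hypothetical, minimal stand-in for a column expression."""

    def __init__(self, text):
        self.text = text

    def __eq__(self, other):
        # Comparison builds a new expression instead of returning True/False
        return Expr(f"({self.text} = {other!r})")

    def __or__(self, other):
        # `|` is overloadable, so it can combine two expressions
        return Expr(f"({self.text} OR {other.text})")

    def __bool__(self):
        # `or` / `and` need a truth value, which an unevaluated expression lacks
        raise ValueError("cannot convert an expression to bool; use '|' or '&'")


first_name = Expr("firstName")
combined = (first_name == "xiangrui") | (first_name == "michael")
print(combined.text)

try:
    (first_name == "xiangrui") or (first_name == "michael")
except ValueError as err:
    print("or failed:", err)
```

The parentheses around each comparison matter as well: in Python `|` binds more tightly than `==`, so leaving them out changes what gets combined.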

The ``where()`` clause is equivalent to ``filter()``.

In [None]:
%scala
val whereDF = explodeDF.where(($"firstName" === "xiangrui") || ($"firstName" === "michael")).sort($"lastName".asc)
display(whereDF)

In [None]:
%python
whereDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(whereDF)

Replace ``null`` values with ``--`` using the DataFrame NA functions (``na.fill()`` in Scala, ``fillna()`` in Python).

In [None]:
%scala
val naFunctions = explodeDF.na
val nonNullDF = naFunctions.fill("--")
display(nonNullDF)

In [None]:
%python
nonNullDF = explodeDF.fillna("--")
display(nonNullDF)
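
One detail worth knowing: when the replacement value is a string, the fill only applies to string columns, so a missing number would stay missing. The plain-Python sketch below (not Spark code) models that rule. The helper `fill_string_nulls` and the row with a missing salary are hypothetical, made up for this illustration.

```python
def fill_string_nulls(row, replacement, string_fields):
    """Return a copy of row with None replaced in the listed string fields only."""
    return {
        key: replacement if (value is None and key in string_fields) else value
        for key, value in row.items()
    }


rows = [
    {"firstName": "matei", "lastName": None, "salary": 140000},
    # Hypothetical row with a missing salary, to show numbers are left alone
    {"firstName": None, "lastName": "wendell", "salary": None},
]
string_fields = {"firstName", "lastName"}

filled = [fill_string_nulls(r, "--", string_fields) for r in rows]
for row in filled:
    print(row)
```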

Retrieve only rows with missing firstName or lastName.

In [None]:
%scala
val filterNonNullDF = nonNullDF.filter($"firstName" === "--" || $"lastName" === "--").sort($"email".asc)
display(filterNonNullDF)

In [None]:
%python
filterNonNullDF = nonNullDF.filter(col("firstName").like("--") | col("lastName").like("--")).sort("email")
display(filterNonNullDF)

Example aggregations using ``agg()`` and ``countDistinct()``.

In [None]:
%scala
import org.apache.spark.sql.functions._

// Find the distinct (firstName, lastName) combinations
val countDistinctDF = nonNullDF.select($"firstName", $"lastName")
  .groupBy($"firstName", $"lastName")
  .agg(countDistinct($"firstName") as "distinct_first_names")
display(countDistinctDF)

In [None]:
%python
from pyspark.sql.functions import countDistinct

countDistinctDF = nonNullDF.select("firstName", "lastName")\
  .groupBy("firstName", "lastName")\
  .agg(countDistinct("firstName"))

display(countDistinctDF)
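
A note on reading the result: because `firstName` is also one of the grouping keys, each group can only ever contain one distinct `firstName`, so the count comes out as 1 for every row. The plain-Python sketch below (not Spark code) reproduces the grouping with the eight name pairs that the explode step produces from the example data, assuming nulls have already been replaced with `--`.

```python
from collections import defaultdict

# (firstName, lastName) pairs after explode and null replacement
names = [
    ("michael", "armbrust"), ("xiangrui", "meng"),
    ("matei", "--"), ("--", "wendell"),
    ("michael", "armbrust"), ("--", "wendell"),
    ("xiangrui", "meng"), ("matei", "--"),
]

# Group by (firstName, lastName), then collect distinct firstName values per group
distinct_first_names = defaultdict(set)
for first_name, last_name in names:
    distinct_first_names[(first_name, last_name)].add(first_name)

counts = {group: len(values) for group, values in distinct_first_names.items()}
for group, count in counts.items():
    print(group, count)
```

To count distinct first names across the whole table instead, the grouping would have to be dropped or changed to a different key.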

Compare the DataFrame and SQL Query Physical Plans (Hint: They should be the same.)

In [None]:
%scala
countDistinctDF.explain()

In [None]:
%python
countDistinctDF.explain()

In [None]:
%scala
// Register the DataFrame as a temp view so that we can query it using SQL
// (createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0)
nonNullDF.createOrReplaceTempView("databricks_df_example")

// Perform the same query as the DataFrame above and return ``explain``
sqlContext.sql("""
SELECT firstName, lastName, count(distinct firstName) as distinct_first_names
FROM databricks_df_example
GROUP BY firstName, lastName
""").explain

In [None]:
%python
# Register the DataFrame as a temp view so that we can query it using SQL.
# Use nonNullDF (not explodeDF) so the SQL query matches the DataFrame query above.
nonNullDF.createOrReplaceTempView("databricks_df_example")

# Perform the same query as the DataFrame above and return ``explain``
countDistinctDF_sql = sqlContext.sql("SELECT firstName, lastName, count(distinct firstName) as distinct_first_names FROM databricks_df_example GROUP BY firstName, lastName")

countDistinctDF_sql.explain()

Sum up all the salaries.

In [None]:
%scala
// Sum up all the salaries
val salarySumDF = nonNullDF.agg("salary" -> "sum")
display(salarySumDF)

In [None]:
%python
salarySumDF = explodeDF.agg({"salary" : "sum"})
display(salarySumDF)

In [None]:
%python
type(explodeDF.salary)

Print the summary statistics for the ``salary`` column.

In [None]:
%scala
nonNullDF.describe("salary").show()

In [None]:
%python
nonNullDF.describe("salary").show()
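
As a sanity check on the sum and `describe()` cells above, the same figures can be worked out by hand. The sketch below is plain Python (not Spark code) and assumes the eight exploded rows from the example data, where each employee appears in two departments. It uses the sample standard deviation, which as far as I know is what `describe()` reports.

```python
import statistics

# Salaries of the eight exploded employee rows
salaries = [100000, 120000, 140000, 160000, 100000, 160000, 120000, 140000]

summary = {
    "count": len(salaries),
    "sum": sum(salaries),
    "mean": statistics.mean(salaries),
    "stddev": statistics.stdev(salaries),  # sample standard deviation
    "min": min(salaries),
    "max": max(salaries),
}
for name, value in summary.items():
    print(name, value)
```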

## Scala: A Utility for Flattening
If your data has several levels of nesting, here is a helper function to flatten your DataFrame to make it easier to work with.

In [None]:
%scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
    df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
  }
}

In [None]:
%scala
val veryNestedDF = Seq(("1", (2, (3, 4)))).toDF()
display(veryNestedDF)

In [None]:
%scala
display(veryNestedDF.flattenSchema)
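
For readers who think in Python, the recursion in the Scala helper can be mirrored on nested dictionaries. This is a plain-Python sketch of the same idea (walk into structs, join field names with dots), not a PySpark implementation. `flatten_record` is a hypothetical helper name.

```python
def flatten_record(record, path=()):
    """Recursively flatten nested dicts into one dict keyed by dotted paths."""
    flat = {}
    for name, value in record.items():
        full_path = path + (name,)
        if isinstance(value, dict):
            # Nested struct: recurse and carry the path prefix along
            flat.update(flatten_record(value, full_path))
        else:
            # Leaf value: join the path pieces with dots, e.g. "_2._2._1"
            flat[".".join(full_path)] = value
    return flat


# Same shape as Seq(("1", (2, (3, 4)))): tuples become structs with fields _1, _2
very_nested = {"_1": "1", "_2": {"_1": 2, "_2": {"_1": 3, "_2": 4}}}
print(flatten_record(very_nested))
```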

## Python: An example using Pandas & Matplotlib Integration

In [None]:
%python
import pandas as pd
import matplotlib.pyplot as plt
plt.clf()
pdDF = nonNullDF.toPandas()
pdDF.plot(x='firstName', y='salary', kind='bar', rot=45)
display()

Cleanup: Remove the Parquet Files

In [None]:
%scala
dbutils.fs.rm("/tmp/databricks-df-example-scala.parquet", true)

In [None]:
%python
dbutils.fs.rm("/tmp/databricks-df-example-python.parquet", True)