# DataFrame Column Expressions

**Data Source**
* One hour of Pagecounts from the English Wikimedia projects captured August 5, 2016, at 12:00 PM UTC.
* Size on Disk: ~23 MB
* Type: Compressed Parquet File
* More Info: <a href="https://dumps.wikimedia.org/other/pagecounts-raw" target="_blank">Page view statistics for Wikimedia projects</a>

**Technical Accomplishments:**
* Continue exploring the `DataFrame` set of APIs.
* Continue to work with the `Column` class and introduce the `Row` class.
* Introduce the transformations...
  * `orderBy(..)`
  * `sort(..)`
  * `filter(..)`
  * `where(..)`
* Introduce the actions...
  * `collect()`
  * `take(n)`
  * `first()`
  * `head()`

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [3]:
%run "./Includes/Classroom-Setup"

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) The Data Source

We will be using the same data source as our previous notebook.

As such, we can go ahead and start by creating our initial `DataFrame`.

In [5]:
(source, sasEntity, sasToken) = getAzureDataSource()
spark.conf.set(sasEntity, sasToken)

parquetFile = source + "/wikipedia/pagecounts/staging_parquet_en_only_clean/"

In [6]:
pagecountsEnAllDF = (spark  # Our SparkSession & Entry Point
  .read                     # Our DataFrameReader
  .parquet(parquetFile)     # Returns an instance of DataFrame
  .cache()                  # cache the data
)
print(pagecountsEnAllDF)

Let's look at the data once more...

In [8]:
from pyspark.sql.functions import col  # import only what this notebook uses

sortedDescDF = (pagecountsEnAllDF
  .orderBy( col("requests").desc() )
)  
sortedDescDF.show(10, False)

In looking at the data, we can see multiple Wikipedia projects.

What if we want to look at only the main Wikipedia project, **en**?

For that, we will need to filter out some records.

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) filter(..) & where(..)

If you look at the API docs, `filter(..)` and `where(..)` are described like this:
> Filters rows using the given condition.

Both `filter(..)` and `where(..)` return a new dataset containing only those records for which the specified condition is true.
* Like `distinct()` and `dropDuplicates()`, `filter(..)` and `where(..)` are aliases for each other.
  * `filter(..)` appeals to functional programmers.
  * `where(..)` appeals to developers with an SQL background.
* Like `orderBy(..)`, each of these methods has two variants:
  * `filter(Column)`
  * `filter(String)`
  * `where(Column)`
  * `where(String)`
* Unlike `orderBy(String)`, which requires a column name, `filter(String)` and `where(String)` both expect an SQL expression.

Let's start by looking at the variant using an SQL expression:

### filter(..) & where(..) w/SQL Expression

In [13]:
whereDF = (sortedDescDF
  .where( "project = 'en'" )
)
whereDF.show(10, False)

Now that we are only looking at the main Wikipedia articles, we get a better picture of the most popular articles on Wikipedia.

Next, let's take a look at the second variant that takes a `Column` object as its first parameter:

### filter(..) & where(..) w/Column

In [16]:
filteredDF = (sortedDescDF
  .filter( col("project") == "en")
)
filteredDF.show(10, False)

### A Scala Issue...

With Python, this is pretty straightforward.

But in Scala... notice anything unusual in that last command?

**Question:** In almost every programming language, what is a single equals sign (=) used for?

**Question:** What are two equal signs (==) used for?

**Question:**
* Considering that transformations are lazy...
* And the `==` operator is evaluated immediately and returns a `Boolean`...
* And `filter(..)` and `where(..)` require us to pass a `Column` object...
* What would be wrong with `$"project" == "en"`?

Try it...

In [19]:
%scala

$"project" == "en"

Compare that to the following call...

In [21]:
%scala

$"project" === "en"

Let's take a look at the Scaladoc for the `Column` class:

| "Operator" | Function |
|:----------:| -------- |
| === | Equality test |
| !== | Deprecated inequality test |
| =!= | Inequality test |
| <=> | Null safe equality test |
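
The reason `col("project") == "en"` works in Python is that the `Column` class overloads `==` to return another `Column` describing the comparison, rather than a boolean. Scala's `==` always returns a `Boolean`, hence the separate `===` operator. The sketch below is a toy model of that idea, not Spark's implementation; `ToyColumn` and its `evaluate` method are hypothetical names invented for illustration.

```python
class ToyColumn:
    """Hypothetical stand-in for a column expression (not Spark's Column)."""

    def __init__(self, name, predicate=None):
        self.name = name
        # predicate is None for a plain column reference,
        # or a function row -> bool for a comparison expression.
        self.predicate = predicate

    def __eq__(self, other):
        # Build a new expression object instead of comparing right away.
        return ToyColumn(
            f"({self.name} = {other!r})",
            lambda row: row[self.name] == other,
        )

    def __ne__(self, other):
        return ToyColumn(
            f"({self.name} != {other!r})",
            lambda row: row[self.name] != other,
        )

    def evaluate(self, row):
        # Deferred evaluation: runs only when a row is supplied.
        return self.predicate(row)


# The comparison yields an expression object, not True/False.
condition = ToyColumn("project") == "en"

# Made-up sample rows.
rows = [
    {"project": "en", "article": "Main_Page"},
    {"project": "en.b", "article": "Cookbook"},
]
matching = [row for row in rows if condition.evaluate(row)]

print(type(condition).__name__)
print(condition.name)
print([row["article"] for row in matching])
```

Because the comparison is only described here and evaluated later, a lazy engine can inspect and optimize it before any data is read.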

### The Solution...

With that behind us, we can clearly **see** the top ten most requested articles.

But what if we need to **programmatically** extract the value of the most requested article's name and its number of requests?

That is to say, how do we get the first record, and from there...
* the value of the second column, **article**, as a string...
* the value of the third column, **requests**, as an integer...

Before we proceed, let's drop the `bytes_served` column and filter out **Main_Page**, the placeholder article **-**, and anything starting with **Special:**; they're just noise to us.

In [25]:
articlesDF = (filteredDF
  .drop("bytes_served")
  .filter( col("article") != "Main_Page")
  .filter( col("article") != "-")
  .filter( ~col("article").startswith("Special:") )  # ~ negates a Column condition
)
articlesDF.show(10, False)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) first() & head()

If you look at the API docs, both `first()` and `head()` are described like this:
> Returns the first row.

Just as `distinct()` and `dropDuplicates()` are aliases for each other, so are `first()` and `head()`.

However, unlike `distinct()` and `dropDuplicates()`, which are **transformations**, `first()` and `head()` are **actions**.

Once all processing is done, these methods return the object backing the first record.

In the case of `DataFrames` (both Scala and Python) that object is a `Row`.

In the case of `Datasets` (the strongly typed version of `DataFrames` in Scala and Java), the object may be a `Row`, a `String`, a `Customer`, a `PendingApplication` or any number of custom objects.
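
The transformation/action split can be pictured with plain Python generators: building the pipeline does no work, and asking for the first element pulls only as much data as is needed. This is an analogy only, with made-up names (`read_records`, `keep_en`) and made-up sample data; Spark's execution model is far more involved.

```python
processed = []  # records which source rows were actually touched


def read_records():
    # Hypothetical sample data standing in for the pagecounts file.
    data = [
        ("en.b", "Cookbook", 3),
        ("en", "Main_Page", 50),
        ("en", "Apache_Spark", 20),
    ]
    for record in data:
        processed.append(record[1])
        yield record


def keep_en(records):
    # "Transformation": describes the work but does not run it yet.
    return (r for r in records if r[0] == "en")


pipeline = keep_en(read_records())
touched_before = list(processed)      # snapshot taken before any action runs
print(touched_before)

first_record = next(pipeline, None)   # "Action": triggers evaluation
print(first_record)
print(processed)                      # only the rows needed for one result
```

The third sample row is never read, because the action was satisfied after the second.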

Focusing strictly on the `DataFrame` API for now, let's take a look at a call to `first()`:

In [27]:
firstRow = articlesDF.first()

print(firstRow)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) The Row Class

Now that we have a reference to the object backing the first row (or any row), we can use it to extract the data for each column.

Before we do, let's take a look at the API docs for the `Row` class.

At the heart of it, we are simply going to ask for the value in column N. In Scala that is `Row.get(i)`; in Python we index the row by position or by column name, as in `row[i]` or `row['name']`.

Because Python is dynamically typed, the declared return type is of little consequence there.

However, Scala is going to return an object of type `Any`. In Java, this would be an object of type `Object`.

In Scala, when the data type matters (for example, when performing mathematical operations on the value), we need to call one of the typed methods instead:
* `getAs[T](i):T`
* `getDate(i):Date`
* `getString(i):String`
* `getInt(i):Int`
* `getLong(i):Long`
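
In PySpark the same lookups are done without typed getters: a `Row` can be read by position, by column name, or as an attribute. The sketch below imitates that behavior with a small tuple subclass so that it runs without a Spark session; `ToyRow` is a hypothetical stand-in, not the real class, and the sample values are made up.

```python
class ToyRow(tuple):
    """Hypothetical stand-in imitating how a PySpark Row can be read."""

    def __new__(cls, **fields):
        row = super().__new__(cls, fields.values())
        row._names = list(fields)
        return row

    def __getitem__(self, key):
        # Accept a column name as well as a position.
        if isinstance(key, str):
            key = self._names.index(key)
        return super().__getitem__(key)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self[self._names.index(name)]
        except ValueError:
            raise AttributeError(name)


first_row = ToyRow(project="en", article="Apache_Spark", requests=20)

print(first_row[1])               # by position
print(first_row["requests"])      # by column name
print(first_row.article)          # as an attribute
print(first_row["requests"] + 1)  # the value keeps its Python type (int)
```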

We can now put it all together to get the number of requests for the most requested article:

In [29]:
article = firstRow['article']
total = firstRow['requests']

print("Most Requested Article: \"{0}\" with {1:,} requests".format( article, total ))

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) collect()

If you look at the API docs, `collect()` is described like this:
> Returns an array that contains all of Rows in this Dataset.

`collect()` returns a collection of the specific type backing each record of the `DataFrame`.
* In the case of Python, this is always the `Row` object.
* In the case of Scala, this is also a `Row` object.
* If the `DataFrame` was converted to a `Dataset` the backing object would be the user-specified object.

Building on our last example, let's take the top 10 records and print them out.

In [31]:
rows = (articlesDF
  .limit(10)           # We only want the first 10 records.
  .collect()           # The action returning all records in the DataFrame
)

# rows is a Python list of Row objects. Now in the driver,
# we can just loop over the list and print 'em out.

listItems = ""
for row in rows:
  article = row['article']
  total = row['requests']
  listItems += "    <li><b>{}</b> {:0,d} requests</li>\n".format(article, total)
  
html = """
<body>
  <h1>Top 10 Articles</h1>
  <ol>
    %s
  </ol>
</body>
""" % (listItems.strip())

print(html)

# UNCOMMENT FOR A PRETTIER PRESENTATION
# displayHTML(html)
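
Article titles come from raw request logs, so they may contain characters such as `<` or `&` that would be misread as markup. A slightly more defensive variant of the loop above, sketched under the assumption that each row can be indexed by column name (as `Row` objects and plain dicts both can), escapes the title and joins the items instead of concatenating in a loop. The helper name `render_top_articles` and the sample rows are made up for illustration.

```python
from html import escape


def render_top_articles(rows, title="Top 10 Articles"):
    """Build the same HTML list, escaping each article title."""
    items = "\n".join(
        "    <li><b>{}</b> {:,d} requests</li>".format(
            escape(row["article"]), row["requests"]
        )
        for row in rows
    )
    return "<body>\n  <h1>{}</h1>\n  <ol>\n{}\n  </ol>\n</body>".format(
        escape(title), items
    )


# Made-up sample rows; in the notebook these would come from collect().
sample_rows = [
    {"article": "Apache_Spark", "requests": 1234},
    {"article": "AT&T", "requests": 56},
]
html_doc = render_top_articles(sample_rows)
print(html_doc)
```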

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) take(n)

If you look at the API docs, `take(n)` is described like this:
> Returns the first n rows in the Dataset.

`take(n)` returns a collection of the first N records of the specific type backing each record of the `DataFrame`.
* In the case of Python, this is always the `Row` object.
* In the case of Scala, this is also a `Row` object.
* If the `DataFrame` was converted to a `Dataset` the backing object would be the user-specified object.

In short, it's the same basic function as `collect()` except you specify as the first parameter the number of records to return.

In [33]:
rows = articlesDF.take(10)

# rows is a Python list of Row objects. Now in the driver,
# we can just loop over the list and print 'em out.

listItems = ""
for row in rows:
  article = row['article']
  total = row['requests']
  listItems += "    <li><b>{}</b> {:0,d} requests</li>\n".format(article, total)
  
html = """
<body>
  <h1>Top 10 Articles</h1>
  <ol>
    %s
  </ol>
</body>
""" % (listItems.strip())

print(html)

# UNCOMMENT FOR A PRETTIER PRESENTATION
# displayHTML(html)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) DataFrame vs Dataset

We've been alluding to `Datasets` off and on. 

The following example demonstrates how to convert a `DataFrame` to a `Dataset` and, compared with the previous example, helps illustrate the relationship between the two.

***Note:*** *As a reminder, `Datasets` are a Java and Scala concept and bring to those languages the type safety that is lost with `DataFrame`, or rather, `Dataset[Row]`. Python and R have no such concept because they are dynamically typed.*

Before we demonstrate this, let's review all our transformations:

In [36]:
%scala
val (source, sasEntity, sasToken) = getAzureDataSource()
spark.conf.set(sasEntity, sasToken)

val parquetFile = source + "/wikipedia/pagecounts/staging_parquet_en_only_clean/"

In [37]:
%scala

val articlesDF = spark                          // Our SparkSession & Entry Point
  .read                                         // Our DataFrameReader
  .parquet(parquetFile)                         // Creates a DataFrame from a parquet file
  .filter( $"project" === "en")                 // Include only the "en" project
  .filter($"article" =!= "Main_Page")           // Exclude the Wikipedia Main Page
  .filter($"article" =!= "-")                   // Exclude some "weird" article
  .filter( ! $"article".startsWith("Special:")) // Exclude all the "special" articles
  .drop("bytes_served")                         // We just don't need this column
  .orderBy( $"requests".desc )                  // Sort by requests descending

Notice above that `articlesDF` is a `Dataset` of type `Row`.

Next, create the case class `WikiReq`. 

A little later we can convert this `DataFrame` to a `Dataset` of type `WikiReq`:

In [39]:
%scala

// The field names and data types of the case class must match the schema being converted.
case class WikiReq (project:String, article:String, requests:Int)

articlesDF.printSchema

Instead of the `Row` object, we can now back each record with our new `WikiReq` class.

And we can see the conversion from `DataFrames` to `Datasets` here:

In [41]:
%scala

val articlesDS = articlesDF.as[WikiReq]

Make note of the data type: **org.apache.spark.sql.Dataset[WikiReq]**

Compare that to a `DataFrame`: **org.apache.spark.sql.Dataset[Row]**

Now when we ask for the first 10, we won't get an array of `Row` objects but instead an array of `WikiReq` objects:

In [43]:
%scala
val wikiReqs = articlesDS.take(10)

// wikiReqs is an Array of WikiReqs. Now in the driver, 
// we can just loop over the array and print 'em out.

var listItems = ""
for (wikiReq <- wikiReqs) {
  // Notice how we don't really need temp variables?
  // Or more specifically, we don't need to cast.
  listItems += "    <li><b>%s</b> %,d requests</li>%n".format(wikiReq.article, wikiReq.requests)
}

var html = s"""
<body>
  <h1>Top 10 Articles</h1>
  <ol>
    ${listItems.trim()}
  </ol>
</body>
"""

println(html)
println("-"*80)

// UNCOMMENT FOR A PRETTIER PRESENTATION
// displayHTML(html)
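
Python has no `Dataset[T]`, but once records have been brought back to the driver, a similar convenience can be had by copying each record into a small typed class. The following is a rough driver-side analogue only, not a Spark feature: it provides named, annotated fields but none of the compile-time checking that Scala gets. The `WikiReq` dataclass mirrors the case class above, and the sample records are made up; with real `Row` objects one would pass `row.asDict()`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WikiReq:
    project: str
    article: str
    requests: int


# Made-up records standing in for [row.asDict() for row in articlesDF.take(10)].
records = [
    {"project": "en", "article": "Apache_Spark", "requests": 1234},
    {"project": "en", "article": "Scala_(programming_language)", "requests": 987},
]

wiki_reqs = [WikiReq(**record) for record in records]

for wiki_req in wiki_reqs:
    # Fields are read by attribute, as in the Scala loop above.
    print("{} {:,d} requests".format(wiki_req.article, wiki_req.requests))
```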