
# Describe a DataFrame

Your data processing in Apache Spark is accomplished by defining DataFrames to read and process the data.

This notebook introduces how to read your data using Apache Spark DataFrames.

# Introduction

**Data Source**
* One hour of Pagecounts from the English Wikimedia projects captured August 5, 2016, at 12:00 PM UTC.
* Size on Disk: ~23 MB
* Type: Compressed Parquet File
* More Info: <a href="https://dumps.wikimedia.org/other/pagecounts-raw" target="_blank">Page view statistics for Wikimedia projects</a>

**Technical Accomplishments:**
* Develop familiarity with the `DataFrame` APIs
* Introduce the classes...
  * `SparkSession`
  * `DataFrame` (aka `Dataset[Row]`)
* Introduce the actions...
  * `count()`

## Getting Started

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Initialize Spark Session
spark = (SparkSession.builder
         .appName("Describe Dataframe")
         .getOrCreate())

In [0]:
spark.conf.get('spark.app.name')

In [0]:
display(spark.sparkContext.getConf().getAll())

## The Data Source

* In this notebook, we will be using a compressed Parquet "file" called **pagecounts** (~23 MB file from Wikipedia)
* We will explore the data and develop an understanding of it as we progress.

In [0]:
%run ../DatasetSourcePath

In [0]:
parquetDir = sourcePath + "/dataset/pagecounts/staging_parquet_en_only_clean/"

file_list = dbutils.fs.ls(parquetDir)
for f in file_list:
  print(f.name)

As we can see from the files listed above, this data is stored in <a href="https://parquet.apache.org" target="_blank">Parquet</a> files which can be read in a single command, the result of which will be a `DataFrame`.

## Create a DataFrame
* We can read the Parquet files into a `DataFrame`.
* We'll start with the object **spark**, an instance of `SparkSession` and the entry point to Spark applications since Spark 2.0.
* From there we can access the `read` property, which returns an instance of `DataFrameReader`.

In [0]:
parquetDir

In [0]:
pagecountsEnAllDF = (spark  # SparkSession & Entry Point
  .read                     # DataFrameReader
  .parquet(parquetDir)      # Returns an instance of DataFrame
)

print(pagecountsEnAllDF)    # Prints the DataFrame type and its columns; no job is triggered

In [0]:
# Equivalent: the same read written with backslash line continuations instead of parentheses
pagecountsEnAllDF = spark \
  .read \
  .parquet(parquetDir)

## count()

If you look at the API docs, `count()` is described like this:
> Returns the number of rows in the Dataset.

`count()` is an action: it triggers a job to process the request and returns a value.

We can now count all records in our `DataFrame` like this:

In [0]:
total = pagecountsEnAllDF.count()

print("Record Count: {0:,}".format( total ))

That tells us that there are around 2 million rows in the `DataFrame`.


# Use common DataFrame methods

We will now build upon that concept by introducing common DataFrame methods.

## Our Data

Let's continue by taking a look at the type of data we have. 

We can do this with the `printSchema()` command:

In [0]:
pagecountsEnAllDF.printSchema()

We should now be able to see that we have four columns of data:
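* **project** (*string*): The code of the Wikimedia project, made up of a language prefix and an optional project suffix. Examples of the naming scheme (this copy of the data contains only the English projects) include: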
  * **en**: The English version of Wikipedia.
  * **fr**: The French version of Wikipedia.
  * **en.d**: The English version of Wiktionary.
  * **fr.b**: The French version of Wikibooks.
  * **de.n**: The German version of Wikinews.
* **article** (*string*): The name of the article in the corresponding project. This will include values such as:
  * Apache_Spark
  * Matei_Zaharia
  * Kevin_Bacon
* **requests** (*integer*): The number of requests (clicks) the article has received in the hour this data represents.
* **bytes_served** (*long*): The total number of bytes delivered for the requested article.
  > **Note:** In our copy of the data, this value is zero for all records and consequently is of no value to us.

## Spark API

You have already seen one method available on the `DataFrame` class, namely `DataFrame.printSchema()`.

Let's take a look at the API to see what other operations we have available.

* <a href="https://spark.apache.org/docs/latest/" target="_blank">Spark API Documentation - Latest</a>

### Spark API (Python)

1. Open the Python API documentation (listed as **Spark Python API (Sphinx)** in older releases).
1. Look for the Spark SQL section of the API reference.
1. Look up the documentation for `pyspark.sql.DataFrame`.


# Use the Display function

There are different ways to view data in a DataFrame. This notebook covers these methods as well as transformations to further refine the data.


**Technical Accomplishments:**
* Introduce the transformations...
  * `limit(..)`
  * `select(..)`
  * `drop(..)`
  * `distinct()`
  * `dropDuplicates(..)`
* Introduce the actions...
  * `show(..)`

## show(..)

What we want to look for next is a function that will allow us to print the data to the console.

In the API docs for `DataFrame`/`Dataset`, find the docs for the `show(..)` command(s).

In the case of Python, we have one method with three optional parameters.<br/>
In the case of Scala, we have several overloaded methods.<br/>

In either case, the `show(..)` method effectively has three optional parameters:
* **n**: The number of records to print to the console, the default being 20.
* **truncate**: If true, strings longer than 20 characters will be truncated, where the default is true. Passing a number greater than one truncates strings to that length instead.
* **vertical**: If true, each record is printed vertically, one line per column value, where the default is false.

Let's take a look at the data in our `DataFrame` with the `show()` command:

In [0]:
pagecountsEnAllDF.show(20)

In the cell above, change the parameters of the show command to:
* print only the first five records
* disable truncation
* print only the first ten records and disable truncation

**Note:** The function `show(..)` is an **action** which triggers a job.

In [0]:
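pagecountsEnAllDF.show(20, 2)             # truncate=2 cuts every cell value to 2 characters

In [0]:
pagecountsEnAllDF.show(2, vertical=True)  # print each record vertically, one line per column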

## limit(..)

Both `show(..)` and `display(..)` are **actions** that trigger jobs (though in slightly different ways).

If you recall, `show(..)` has a parameter to control how many records are printed, but `display(..)` does not.

We can address that difference with our first transformation, `limit(..)`.

If you look at the API docs, `limit(..)` is described like this:
> Returns a new Dataset by taking the first n rows...

`show(..)`, like many actions, does not return anything. 

On the other hand, transformations like `limit(..)` return a **new** `DataFrame`:

In [0]:
limitedDF = pagecountsEnAllDF.limit(5) # "limit" the number of records to the first 5

limitedDF # Python hack to force printing of the data type

### Nothing Happened
* Notice how "nothing" happened - that is, no job was triggered.
* This is because we are simply defining the second step in our transformations.
  1. Read in the parquet file (represented by **pagecountsEnAllDF**).
  1. Limit those records to just the first 5 (represented by **limitedDF**).
* It's not until we induce an action that a job is triggered and the data is processed.

We can induce a job by calling an action such as `show(..)`:

In [0]:
limitedDF.show(100, False) # show up to 100 records and don't truncate the columns

> You can enable the `spark.sql.repl.eagerEval.enabled` configuration to evaluate PySpark DataFrames eagerly in notebooks, so that evaluating a DataFrame renders its rows instead of only its type.

In [0]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [0]:
limitedDF

In [0]:
display(pagecountsEnAllDF.limit(40))

## select(..)

Let's say, for the sake of argument, that we don't want to look at all the data:

In [0]:
pagecountsEnAllDF.columns

In [0]:
pagecountsEnAllDF.printSchema()

For example, it was asserted above that **bytes_served** had nothing but zeros in it and consequently is of no value to us.

If that is the case, we can disregard it by selecting only the three columns that we want:

In [0]:
# Transform the data by selecting only three columns
onlyThreeDF = (pagecountsEnAllDF
  .select("project", "article", "requests")
)

# Now let's take a look at what the schema looks like
onlyThreeDF.printSchema()

Again, notice how the call to `select(..)` does not trigger a job.

That's because `select(..)` is a transformation. It's just one more step in a long list of transformations.

Let's go ahead and invoke the action `show(..)` and take a look at the result.

In [0]:
# And lastly, show the first five records which should exclude the bytes_served column.
onlyThreeDF.show(5, False)

The `select(..)` command is one of the most powerful and most commonly used transformations. 

We will see plenty of other examples of its usage as we progress.

If you look at the API docs, `select(..)` is described like this:
> Returns a new Dataset by computing the given Column expression for each element.

The "Column expression" referred to there is where the true power of this operation shows up. Again, we will go deeper on these later.

Just like `limit(..)`, `select(..)` 
* does not trigger a job
* returns a new `DataFrame`
* simply defines the next transformation in a sequence of transformations.

## drop(..)

As a quick side note, you will soon discover there are many ways to accomplish the same task.

Take the transformation `drop(..)` for example - instead of selecting everything we wanted, `drop(..)` allows us to specify the columns we don't want.

If you look at the API docs, `drop(..)` is described like this:
> Returns a new Dataset with a column dropped.

We can produce the same result as the last exercise this way:

In [0]:
# Transform the data by dropping the bytes_served column
droppedDF = (pagecountsEnAllDF
  .drop("bytes_served")
)
# Now let's take a look at what the schema looks like
droppedDF.printSchema()

Again, `drop(..)` is just one more transformation - that is, no job is triggered.

In [0]:
# And lastly, show the first five records which should exclude the bytes_served column.
droppedDF.show(5, False)

## distinct() & dropDuplicates()

Called without arguments, these two transformations do the same thing. In fact, one is implemented in terms of the other.
* You can see this by looking at the Scala source code for these two methods:
* ```def distinct(): Dataset[T] = dropDuplicates()```

The difference between them has everything to do with the programmer and their perspective.
* The name **distinct** will resonate with developers, analysts and DB admins with a background in SQL.
* The name **dropDuplicates** will resonate with developers that have a background or experience in functional programming.

As you become more familiar with the various APIs, you will see this pattern reassert itself.

The designers of the API are trying to make the API as approachable as possible for multiple target audiences.

If you look at the API docs, both `distinct()` and `dropDuplicates(..)` are described like this:
> Returns a new Dataset that contains only the unique rows from this Dataset....

With this transformation, we can now tackle our first business question:

### How many different English Wikimedia projects saw traffic during that hour?


If you recall, our original `DataFrame` has this schema:

In [0]:
pagecountsEnAllDF.printSchema()

The transformation `distinct()` is applied to the row as a whole: data in the **project**, **article**, **requests** and **bytes_served** columns all affect this evaluation.

To get the distinct list of projects, and only projects, we need to reduce the number of columns to just the one column, **project**. 

We can do this with the `select(..)` transformation and then we can introduce the `distinct()` transformation.

In [0]:
distinctDF = (pagecountsEnAllDF
  .select("project")
  .distinct()
)

Just to reinforce, we have three steps:
1. Read the data (now represented by `pagecountsEnAllDF`)
1. Select just the one column
1. Reduce the records to a distinct set

No job is triggered until we perform an action like `show(..)`:

In [0]:
# There will not be more than 100 projects
distinctDF.show(100, False)               

You can count those if you like.

But, it would be easier to ask the `DataFrame` for the `count()`:

In [0]:
total = distinctDF.count()     
print("Distinct Projects: {0:,}".format( total ))

## dropDuplicates(columns...)

The method `dropDuplicates(..)` has a second variant that accepts one or more columns.
* Unlike `distinct()` or `dropDuplicates()` without arguments, duplicates are not determined across the entire record.
* Rows are compared only on the specified columns.
* This allows us to keep all the original columns in our `DataFrame`, although which of the duplicate rows survives is not guaranteed.

In [0]:
# Sample customer DataFrame
data = [
    (1, "Alice", "Smith", "NY", 34),
    (2, "Bob", "Johnson", "CA", 45),
    (1, "Alice", "Smith", "NY", 34),
    (3, "David", "Lee", "TX", 29)
]

columns = ["CustomerID", "FirstName", "LastName", "State", "Age"]
df = spark.createDataFrame(data, columns)

# Using distinct() (equivalent here to dropDuplicates() with no arguments)
distinctDF = df.distinct() # or distinctDF = df.dropDuplicates()

# Display the distinct DataFrame
display(distinctDF)

In [0]:
# Sample customer DataFrame
data = [
    (1, "Alice", "Smith", "NY", 34),
    (2, "Bob", "Johnson", "CA", 45),
    (1, "Alice", "Smith", "New York", 34),
    (3, "David", "Lee", "TX", 29)
]

columns = ["CustomerID", "FirstName", "LastName", "State", "Age"]
df = spark.createDataFrame(data, columns)

# Using dropDuplicates on a subset of columns (State is ignored in the comparison)
distinctDF = df.dropDuplicates(["CustomerID", "FirstName", "LastName", "Age"])

# Display the distinct DataFrame
display(distinctDF)

## Recap

Our code is spread out over many cells, which can make this a little hard to follow.

Let's take a look at the same code in a single cell.

In [0]:
pagecountsEnAllDF = (spark       # SparkSession & Entry Point
  .read                          # DataFrameReader
  .parquet(parquetDir)           # Returns an instance of DataFrame
)
distinctDF = (pagecountsEnAllDF  # Our original DataFrame from spark.read.parquet(..)
  .select("project")             # Drop all columns except the "project" column
  .distinct()                    # Keep only the distinct values of that column.
)
total = distinctDF.count()     
print("Distinct Projects: {0:,}".format( total ))

## DataFrames vs SQL & Temporary Views

The `DataFrame` API is built on top of the Spark SQL engine.

As such we can "convert" a `DataFrame` into a temporary view (or table) and then use it in "standard" SQL.

Let's start by creating a temporary view from a previous `DataFrame`.

In [0]:
pagecountsEnAllDF.createOrReplaceTempView("pagecounts")


Now that we have a temporary view (or table) we can start expressing our queries and transformations in SQL:

In [0]:
%sql
SELECT * FROM pagecounts

In [0]:
spark.sql("SELECT * FROM pagecounts").show(10)

In [0]:
spark.table("pagecounts").show(10)

And we can just as easily express the distinct list of projects in SQL and, just because we can, sort that list:

In [0]:
%sql
SELECT DISTINCT project FROM pagecounts ORDER BY project

And converting from SQL back to a `DataFrame` is just as easy:

In [0]:
tableDF = spark.sql("SELECT DISTINCT project FROM pagecounts ORDER BY project")
tableDF.show()

In [0]:
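%scala
// Temporary views belong to the SparkSession, which this notebook's Python and Scala cells share
import spark.implicits._  // provides the $"column" syntax (Databricks notebooks typically import this automatically)

val scalaDF = spark.sql("SELECT * FROM pagecounts")

// Filter for a specific project and count the number of records
val count = scalaDF.filter($"project" === "en").count()

println(s"Number of records for project 'en': $count")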

# Lab: Find Distinct Articles

In [0]:
# TODO
# Replace <<FILL_IN>> with your code. 

df = (spark                    # Our SparkSession & Entry Point
  .read                        # Our DataFrameReader
  <<FILL_IN>>                  # Read in the parquet files
  <<FILL_IN>>                  # Reduce the columns to just the one
  <<FILL_IN>>                  # Produce a unique set of values
)
totalArticles = df.<<FILL_IN>> # Identify the total number of records remaining.

print("Distinct Articles: {0:,}".format(totalArticles))

## Verify Your Work
Run the following cell to verify that your `DataFrame` was created properly.

In [0]:
expected = 1783138
assert totalArticles == expected, "Expected the total to be " + str(expected) + " but found " + str(totalArticles)
