# Describe a DataFrame

Data processing in Microsoft Fabric is done by defining DataFrames that read and process the data.

## ➡️ Getting Started

Run the following cell to configure our notebook.

In [None]:
%run Utilities

## ➡️ The Data Source

* In this notebook, we will use a set of Parquet files called **Yellow taxi trip records** (4 x ~55 MB files from March-June 2023).
* We will explore the data and develop an understanding of it as we progress.
* You can read more about this dataset here: <a href="https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page" target="_blank">TLC Trip Record Data</a>.

In [None]:
%%sh
ls -l /lakehouse/default/Files/taxidata/*.parquet

## ➡️ Create a DataFrame
* We can read the Parquet files into a `DataFrame`.
* We'll start with the object **spark**, an instance of `SparkSession` and the entry point to Spark applications since Spark 2.0.
* From there we can access the `read` object which gives us an instance of `DataFrameReader`.

In [None]:
parquetDir = "Files/taxidata/yellow*.parquet"

In [None]:
tripdataDF = (spark         # Our SparkSession & Entry Point
  .read                     # Our DataFrameReader
  .parquet(parquetDir)      # Returns an instance of DataFrame
)
print(tripdataDF)           # Python hack to see the data type

## ➡️ count()

If you look at the API docs, `count()` is described like this:
> Returns the number of rows in the Dataset.

`count()` will trigger a job to process the request and return a value.

We can now count all records in our `DataFrame` like this:

In [None]:
total = tripdataDF.count()

print("Record Count: {0:,}".format( total ))

That tells us that there are around 13.5 million rows in the `DataFrame`.

Before we take a closer look at the contents of the DataFrame, let us introduce a technique that speeds up processing.

## ➡️ cache() & persist()

The ability to cache data is one technique for achieving better performance with Apache Spark. 

This is because every action requires Spark to read the data from its source (the lakehouse), whereas caching keeps that data in the memory of the executors for "instant" access.

`cache()` is simply shorthand for `persist()` with the default storage level.

In [None]:
(tripdataDF
  .cache()         # Mark the DataFrame as cached
  .count()         # Materialize the cache
) 

If you re-run that command, it should take significantly less time.

In [None]:
tripdataDF.count()

## ➡️ Performance considerations of Caching Data

When caching data, you are placing it on the workers of the cluster.

Caching consumes resources. Before moving a notebook into production, verify that you are using the cache appropriately.

As a quick side note, you can remove a cache by calling the `DataFrame`'s `unpersist()` method, but it is not necessary.

## ➡️ Our Data

Let's continue by taking a look at the type of data we have. 

We can do this with the `printSchema()` command:

In [None]:
tripdataDF.printSchema()

## ➡️ show(..)

What we want to look for next is a function that will allow us to print the data to the console.

In the API docs for `DataFrame`/`Dataset` find the docs for the `show(..)` command(s).

In the case of Python, we have one method with two optional parameters.<br/>
In the case of Scala, we have several overloaded methods.<br/>

In either case, the `show(..)` method effectively has two optional parameters:
* **n**: The number of records to print to the console, the default being 20.
* **truncate**: If true, columns wider than 20 characters will be truncated, where the default is true.

Let's take a look at the data in our `DataFrame` with the `show()` command:

In [None]:
tripdataDF.show(5, True)

**Note:** The function `show(..)` is an **action** which triggers a job.

## ➡️ display(..)

The `show(..)` command is part of the core Spark API and simply prints the results to the console.

Our notebooks have a slightly more elegant alternative.

Instead of calling `show(..)` on an existing `DataFrame` we can instead pass our `DataFrame` to the `display(..)` command:

In [None]:
display(tripdataDF)

## ➡️ show(..) vs display(..)
* `show(..)` is part of core Spark - `display(..)` is specific to our notebooks.
* `show(..)` is ugly - `display(..)` is pretty.
* `show(..)` has parameters for truncating both columns and rows - `display(..)` does not.
* `show(..)` is a function of the `DataFrame`/`Dataset` class - `display(..)` works with a number of different objects.
* `display(..)` is more powerful - with it, you can...
  * Download the results as CSV
  * Render line charts, bar charts and other graphs, maps and more.
  * See up to 1000 records at a time.
  
For the most part, the difference between the two is going to come down to preference.

Like `DataFrame.show(..)`, `display(..)` is an **action** which triggers a job.

## ➡️ limit(..)

Both `show(..)` and `display(..)` are **actions** that trigger jobs (though in slightly different ways).

If you recall, `show(..)` has a parameter to control how many records are printed, but `display(..)` does not.

We can address that difference with our first transformation, `limit(..)`.

If you look at the API docs, `limit(..)` is described like this:
> Returns a new Dataset by taking the first n rows...

`show(..)`, like many actions, does not return anything. 

On the other hand, transformations like `limit(..)` return a **new** `DataFrame`:

In [None]:
limitedDF = tripdataDF.limit(5) # "limit" the number of records to the first 5

limitedDF # Python hack to force printing of the data type

## ➡️ Nothing Happened
* Notice how "nothing" happened - that is, no job was triggered.
* This is because we are simply defining the second step in our transformations.
  0. Read in the Parquet files (represented by **tripdataDF**).
  0. Limit those records to just the first 5 (represented by **limitedDF**).
* It's not until we induce an action that a job is triggered and the data is processed.

We can induce a job by calling either the `show(..)` or the `display(..)` actions:

In [None]:
limitedDF.show(100, False) #show up to 100 records and don't truncate the columns

In [None]:
display(limitedDF) # defaults to the first 1000 records

## ➡️ select(..)

Let's say, for the sake of argument, that we don't want to look at all the data:

In [None]:
tripdataDF.printSchema()

In [None]:
selectDF = (tripdataDF
    .select("VendorID", "tpep_pickup_datetime", "passenger_count", "total_amount")
)

selectDF.printSchema()

Again, notice how the call to `select(..)` does not trigger a job.

That's because `select(..)` is a transformation. It's just one more step in a long list of transformations.

Let's go ahead and invoke the action `show(..)` and take a look at the result.

In [None]:
# And lastly, show the first five records, which should contain only the four selected columns.

selectDF.show(5, False)

The `select(..)` command is one of the most powerful and most commonly used transformations. 

We will see plenty of other examples of its usage as we progress.

If you look at the API docs, `select(..)` is described like this:
> Returns a new Dataset by computing the given Column expression for each element.

The "Column expression" referred to there is where the true power of this operation shows up. Again, we will go deeper on these later.

Just like `limit(..)`, `select(..)` 
* does not trigger a job
* returns a new `DataFrame`
* simply defines the next transformation in a sequence of transformations.

## ➡️ drop(..)

As a quick side note, you will quickly discover there are a lot of ways to accomplish the same task.

Take the transformation `drop(..)` for example - instead of selecting everything we wanted, `drop(..)` allows us to specify the columns we don't want.

If you look at the API docs, `drop(..)` is described like this:
> Returns a new Dataset with a column dropped.

Here we use it to narrow the result of the last exercise further by dropping a column we no longer want:

In [None]:
# Transform the data by dropping one column, leaving three columns
droppedDF = (selectDF
  .drop("passenger_count") 
)

# Now let's take a look at what the schema looks like
droppedDF.printSchema()

Again, `drop(..)` is just one more transformation - that is, no job is triggered.

In [None]:
# And lastly, show the first five records which should exclude the passenger_count column.
droppedDF.show(5, False)

## ➡️ distinct() & dropDuplicates()

These two transformations do the same thing. In fact, they are aliases for one another.
* You can see this by looking at the source code for these two methods
* ```def distinct(): Dataset[T] = dropDuplicates()```
* See <a href="https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala" target="_blank">Dataset.scala</a>

The difference between them has everything to do with the programmer and their perspective.
* The name **distinct** will resonate with developers, analysts and DB admins with a background in SQL.
* The name **dropDuplicates** will resonate with developers that have a background or experience in functional programming.

As you become more familiar with the various APIs, you will see this pattern reassert itself.

The designers of the API are trying to make the API as approachable as possible for multiple target audiences.

If you look at the API docs, both `distinct(..)` and `dropDuplicates(..)` are described like this:
> Returns a new Dataset that contains only the unique rows from this Dataset....

With this transformation, we can now tackle our first business question: which distinct vendors (`VendorID` values) appear in the data?

The transformation `distinct()` is applied to the row as a whole.

To get the distinct list of `VendorID` values, we need to reduce the number of columns to just the one column, **VendorID**.

We can do this with the `select(..)` transformation and then we can introduce the `distinct()` transformation.

In [None]:
distinctDF = (tripdataDF            # Our original DataFrame from spark.read.parquet(..)
  .select("VendorID")               # Drop all columns except the "VendorID" column
  .distinct()                       # Reduce the set of all records to just the distinct values.
)

Just to reinforce, we have three steps:

0. Read the data (now represented by `tripdataDF`)
0. Select just the one column
0. Reduce the records to a distinct set

No job is triggered until we perform an action like `show(..)`:

In [None]:
distinctDF.show(10, False)               

## ➡️ dropDuplicates(columns...)

The method `dropDuplicates(..)` has a second variant that accepts one or more columns.
* Unlike `distinct()` or the no-argument `dropDuplicates()`, the comparison is not performed across the entire record.
* Rows are considered duplicates based only on the specified columns.
* This allows us to keep all the original columns in our `DataFrame`.

## ➡️ DataFrames vs SQL & Temporary Views

The `DataFrame` API is built on top of the Spark SQL engine.

As such, we can "convert" a `DataFrame` into a temporary view (or table) and then use it in "standard" SQL.

Let's start by creating a temporary view from a previous `DataFrame`.

In [None]:
tripdataDF.createOrReplaceTempView("tripdata")

Now that we have a temporary view (or table) we can start expressing our queries and transformations in SQL:

In [None]:
%%sql

SELECT *
FROM tripdata LIMIT 10

And we can just as easily express the distinct list of `VendorID` values in SQL and, just because we can, sort that list:

In [None]:
%%sql

SELECT DISTINCT VendorID
FROM tripdata
ORDER BY VendorID
LIMIT 10

And converting from SQL back to a `DataFrame` is just as easy:

In [None]:
tableDF = spark.sql("SELECT DISTINCT VendorID FROM tripdata")
display(tableDF)