# Working with data in Azure Databricks

**Technical Accomplishments:**
- viewing available tables
- loading table data in dataframes
- loading file/dbfs data in dataframes
- using Spark for simple queries
- using Spark to show the data and its structure
- using Spark for complex queries
- using Databricks' `display` for custom visualisations

## Attach notebook to your cluster
Before executing any cells in the notebook, you need to attach it to your cluster. Make sure that the cluster is running.

In the notebook's toolbar, select the drop down arrow next to Detached, and then select your cluster under Attach to.

## About Spark DataFrames

Spark DataFrames are distributed collections of data, organized into rows and columns, similar to traditional SQL tables.

A DataFrame can be operated on using relational transformations, through the Spark SQL API, which is available in Scala, Java, Python, and R.

We will use Python in our notebook. 

We often refer to DataFrame variables using `df`.

## Loading data into dataframes

#### View available data

To check the data available in our Databricks environment we can use the `%sql` magic and query our tables:

In [6]:
%sql

select * from nyc_taxi;

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
1.0,9.4,15,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,44.3
,14.75,13,4,1,,False,0.0,6.0,0.0,4.571929824561403,44.8
1.0,3.35,23,4,1,,False,0.0,1.0,0.0,4.384090909090913,18.96
1.0,3.33,18,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,16.3
1.0,0.47,17,6,1,,False,0.0,1.0,0.0,3.846428571428569,5.3
1.0,3.07,9,1,1,,False,0.0,6.0,0.0,0.1594594594594597,16.3
1.0,0.92,23,4,1,,False,0.0,1.0,0.0,-2.999107142857142,8.97
1.0,1.9,12,4,1,,False,0.0,1.0,0.0,4.384090909090913,11.8
1.0,0.77,0,1,1,,False,0.0,1.0,0.0,-5.393749999999998,7.3
,2.35,2,6,1,,False,0.0,24.0,254.0,10.943654822335034,14.16


#### Reading data from our tables

Using Spark, we can read data into dataframes. 

Spark has read/write support for a wide range of formats, including:
* CSV
* JSON
* Parquet
* ORC
* Avro
* Hive tables
* JDBC sources

We can read our data from the tables (since we already imported the initial csv as Databricks tables).

In [8]:
df = spark.sql("SELECT * FROM nyc_taxi")
display(df)

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
1.0,9.4,15,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,44.3
,14.75,13,4,1,,False,0.0,6.0,0.0,4.571929824561403,44.8
1.0,3.35,23,4,1,,False,0.0,1.0,0.0,4.384090909090913,18.96
1.0,3.33,18,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,16.3
1.0,0.47,17,6,1,,False,0.0,1.0,0.0,3.846428571428569,5.3
1.0,3.07,9,1,1,,False,0.0,6.0,0.0,0.1594594594594597,16.3
1.0,0.92,23,4,1,,False,0.0,1.0,0.0,-2.999107142857142,8.97
1.0,1.9,12,4,1,,False,0.0,1.0,0.0,4.384090909090913,11.8
1.0,0.77,0,1,1,,False,0.0,1.0,0.0,-5.393749999999998,7.3
,2.35,2,6,1,,False,0.0,24.0,254.0,10.943654822335034,14.16


#### Reading data from the DBFS

We can also read data from the original files we uploaded, or from any other file available in DBFS.

Thanks to DBFS mount points, the code is the same whether a file is stored locally in DBFS or in remote storage that has been mounted.

In [10]:
df = spark.read.csv('dbfs:/FileStore/tables/nyc_taxi.csv', header=True, inferSchema=True)
display(df)

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
1.0,9.4,15,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,44.3
,14.75,13,4,1,,False,0.0,6.0,0.0,4.571929824561403,44.8
1.0,3.35,23,4,1,,False,0.0,1.0,0.0,4.384090909090913,18.96
1.0,3.33,18,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,16.3
1.0,0.47,17,6,1,,False,0.0,1.0,0.0,3.846428571428569,5.3
1.0,3.07,9,1,1,,False,0.0,6.0,0.0,0.1594594594594597,16.3
1.0,0.92,23,4,1,,False,0.0,1.0,0.0,-2.999107142857142,8.97
1.0,1.9,12,4,1,,False,0.0,1.0,0.0,4.384090909090913,11.8
1.0,0.77,0,1,1,,False,0.0,1.0,0.0,-5.393749999999998,7.3
,2.35,2,6,1,,False,0.0,24.0,254.0,10.943654822335034,14.16
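As a rough illustration of what `header=True` and `inferSchema=True` ask for, the plain-Python sketch below treats the first line as column names and picks the narrowest type that fits every non-empty value in a column. This is a simplified assumption, not Spark's actual inference code: the helpers `infer_type` and `infer_schema` are hypothetical, only integer, double, boolean and string are considered, and empty fields are treated as nulls.

```python
import csv
import io

# A few lines in the format of nyc_taxi.csv (subset of columns for brevity)
SAMPLE = """passengerCount,tripDistance,hour_of_day,normalizeHolidayName,isPaidTimeOff
1.0,9.4,15,,False
,14.75,13,,False
1.0,3.35,23,,False
"""

def _fits(value, caster):
    """Return True if caster(value) succeeds."""
    try:
        caster(value)
        return True
    except ValueError:
        return False

def _parse_bool(value):
    """Accept 'true'/'false' in any letter case."""
    if value.lower() not in ("true", "false"):
        raise ValueError(value)
    return value.lower() == "true"

def infer_type(values):
    """Pick the narrowest type that fits every non-empty value (hypothetical helper)."""
    present = [v for v in values if v != ""]  # empty fields are treated as nulls
    if not present:
        return "string"  # an all-null column falls back to string
    for name, caster in (("integer", int), ("double", float), ("boolean", _parse_bool)):
        if all(_fits(v, caster) for v in present):
            return name
    return "string"

def infer_schema(text):
    """Use the first row as the header (like header=True) and infer one type per column."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    rows = list(reader)
    columns = zip(*rows)
    return dict(zip(header, (infer_type(list(column)) for column in columns)))

print(infer_schema(SAMPLE))
```

In the notebook itself, `df.printSchema()` shows the types Spark actually chose.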


#### DataFrame size

Use `count` to determine how many rows of data we have in a dataframe.

In [12]:
df.count()

#### DataFrame structure

To get information about the schema associated with our dataframe we can use `printSchema`:

In [14]:
df.printSchema()

### show(..) vs display(..)
* `show(..)` is part of core Spark; `display(..)` is specific to Databricks notebooks.
* `show(..)` has parameters for truncating both columns and rows; `display(..)` does not.
* `show(..)` is a method of the `DataFrame`/`Dataset` class; `display(..)` works with a number of different objects.
* `display(..)` is more powerful. With it, you can:
  * Download the results as CSV.
  * Render line charts, bar charts and other graphs, maps and more.
  * See up to 1000 records at a time.

For the most part, the choice between the two comes down to preference.

Remember that the `display` function is Databricks-specific. It is not available in standard Apache Spark code.
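`show(..)`'s column truncation follows a simple rule. The plain-Python sketch below reproduces the rule Spark's `show` applies to each cell when `truncate` is a positive number (in PySpark, the default `truncate=True` means 20 characters): longer values are cut and end in `...`. The helper `truncate_cell` is hypothetical; in a notebook you would simply call, for example, `df.show(5, truncate=False)` to print five rows without truncation.

```python
def truncate_cell(value, truncate=20):
    """Mimic how show(..) shortens a cell (hypothetical helper, plain Python)."""
    text = str(value)
    if truncate > 0 and len(text) > truncate:
        # Very small limits are cut without an ellipsis
        if truncate < 4:
            return text[:truncate]
        # Otherwise keep (truncate - 3) characters and append "..."
        return text[:truncate - 3] + "..."
    return text

print(truncate_cell("Martin Luther King, Jr. Day"))        # 27 characters, so it is shortened
print(truncate_cell("Washington's Birthday", truncate=0))  # 0 disables truncation
```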

## Querying dataframes

Once Spark has loaded the data, we can manipulate it using the Spark SQL API.

We can easily use the DataFrame DSL to perform joins, aggregations and filtering.
We can change the data structure, add or drop columns, or change column types.

We will use the Python function we defined earlier to convert temperatures from Celsius to Fahrenheit.

In [18]:
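def celsiusToFahrenheit(source_temp=None):
    # Return None for missing values so the function is also safe to use as a Spark UDF
    if source_temp is None:
        return None
    return (source_temp * (9.0 / 5.0)) + 32.0

celsiusToFahrenheit(27)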

We will wrap it in a UDF (user-defined function) so that it can be used with Spark's DataFrame API.

We will then use it to enrich our source data.

In [20]:
from pyspark.sql.functions import col, count, udf, when
from pyspark.sql.types import DoubleType

# Wrap the Python function as a UDF that returns a double
udfCelsiusToFahrenheit = udf(celsiusToFahrenheit, DoubleType())

# Skip rows without a temperature, cast it to double, then add the Fahrenheit value
display(df.filter(col('temperature').isNotNull()) \
  .withColumn("tempC", col("temperature").cast(DoubleType())) \
  .select(col("tempC"), udfCelsiusToFahrenheit(col("tempC")).alias("tempF")))
  

tempC,tempF
6.18571428571429,43.13428571428573
4.571929824561403,40.229473684210525
4.384090909090913,39.89136363636364
6.18571428571429,43.13428571428573
3.846428571428569,38.92357142857142
0.1594594594594597,32.28702702702703
-2.999107142857142,26.601607142857144
4.384090909090913,39.89136363636364
-5.393749999999998,22.291250000000005
10.943654822335034,51.69857868020306
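Conceptually, a Python UDF calls the wrapped function once per row and passes SQL nulls in as Python `None`. The plain-Python sketch below (no Spark required) applies the same conversion to a small column that contains a missing value. The `apply_udf` helper is hypothetical; it only illustrates why the function should tolerate `None` if it ever sees unfiltered data.

```python
def celsius_to_fahrenheit(source_temp):
    # Pass missing values through unchanged, as a null-safe UDF would
    if source_temp is None:
        return None
    return source_temp * (9.0 / 5.0) + 32.0

def apply_udf(func, column):
    """Call func once per value, as Spark does for each row of a Python UDF."""
    return [func(value) for value in column]

# Two temperatures from the output above, plus a missing value
temps_c = [6.18571428571429, None, -5.393749999999998]
temps_f = apply_udf(celsius_to_fahrenheit, temps_c)
print(temps_f)
```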


Spark also offers more complex operations:

* grouping, sorting, limits, count
* aggregations: agg, max, sum
* windowing: partitionBy, count over, max over
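The key difference between an aggregation and a window function is that `groupBy(..).agg(..)` returns one row per group, while a function computed `over` a window partition returns one row per input row. The plain-Python sketch below shows both on a few `(month_num, tripDistance)` pairs taken from the sample rows in this notebook. The PySpark equivalents in the comments assume `import pyspark.sql.functions as F` and the `Window` class from `pyspark.sql.window`.

```python
# (month_num, tripDistance) pairs from the sample rows in this notebook
trips = [(1, 9.4), (1, 14.75), (2, 27.3), (2, 25.9), (4, 29.3)]

# Aggregation: one output row per group.
# PySpark: df.groupBy("month_num").agg(F.max("tripDistance"))
max_per_month = {}
for month, distance in trips:
    max_per_month[month] = max(distance, max_per_month.get(month, distance))

# Window: every input row is kept and annotated with its partition's maximum.
# PySpark: df.withColumn("maxInMonth", F.max("tripDistance").over(Window.partitionBy("month_num")))
with_window = [(month, distance, max_per_month[month]) for month, distance in trips]

print(max_per_month)  # 3 groups
print(with_window)    # 5 rows, one per input row
```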

For example, we may want to add a row-number column to our source data. Window functions help with such queries:

In [22]:
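from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number

# Number the rows from the longest trip to the shortest.
# A window without partitionBy moves all rows into a single partition,
# which is fine for a small dataset but does not scale to large ones.
byDistance = Window.orderBy(desc('tripDistance'))
display(df.withColumn('rowno', row_number().over(byDistance)).orderBy('rowno'))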

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount,rowno
1.0,62.55,11,5,1,,False,0.0,24.0,102.0,7.808965517241381,339.38,1
1.0,29.3,4,3,4,,False,0.0,1.0,0.0,14.330357142857144,87.84,2
2.0,28.34,4,5,5,,False,0.0,6.0,13.0,15.919642857142868,75.0,3
1.0,28.1,14,5,4,,False,0.0,24.0,70.0,12.527343749999998,164.8,4
2.0,27.3,17,2,2,,False,0.0,6.0,213.0,9.820370370370377,99.35,5
1.0,26.89,23,2,6,,False,0.0,24.0,102.0,23.4363095238095,101.8,6
4.0,26.77,11,6,2,,False,0.0,6.0,5.0,10.577678571428564,121.26,7
2.0,25.9,14,2,2,,False,0.0,24.0,282.0,4.0848,83.3,8
1.0,25.65,11,5,5,,False,0.0,24.0,64.0,17.962280701754388,114.5,9
5.0,25.4,15,6,6,,False,0.0,1.0,0.0,23.709821428571416,116.8,10


#### Data cleaning

Before using the source data, we have to validate its contents. Let's check whether there are any duplicate rows:

In [24]:
df.count() - df.dropDuplicates().count()
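The expression above counts how many rows `dropDuplicates()` would remove: the total row count minus the number of distinct rows. The same arithmetic in plain Python, on a few hypothetical rows, is shown below. Tuples are used because, as with `dropDuplicates()` called without arguments, two rows count as duplicates only if every column matches.

```python
rows = [
    (1.0, 9.4, 15),
    (1.0, 3.35, 23),
    (1.0, 9.4, 15),  # exact copy of the first row
    (1.0, 9.4, 16),  # differs in one column, so it is not a duplicate
]
duplicates = len(rows) - len(set(rows))
print(duplicates)  # 1
```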

Some values might be missing, so we count the null values in each column.

In [26]:
display(df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]))

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
587,0,0,0,0,0,0,0,0,0,0,117
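The cell above builds one `count(when(col(c).isNull(), c))` expression per column: `when` yields a value only for null cells, and `count` ignores nulls, so each result is the number of missing values in that column. A plain-Python equivalent over a few hypothetical records, with `None` standing in for null, is:

```python
records = [
    {"passengerCount": 1.0, "tripDistance": 9.4, "totalAmount": 44.3},
    {"passengerCount": None, "tripDistance": 14.75, "totalAmount": 44.8},
    {"passengerCount": None, "tripDistance": 2.35, "totalAmount": None},
]

# For each column, count the records whose value is missing
null_counts = {
    column: sum(1 for record in records if record[column] is None)
    for column in records[0]
}
print(null_counts)
```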


Since `passengerCount` and `totalAmount` contain null values, we have to fix these rows.

We can either replace null values using `fillna` or drop such rows using `dropna`. Here we default a missing passenger count to 1 and drop the remaining incomplete rows.

In [28]:
# Default missing passenger counts to 1, then drop rows that still contain nulls
df = df.fillna({'passengerCount': 1.0}).dropna()
display(df)

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
1.0,9.4,15,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,44.3
1.0,14.75,13,4,1,,False,0.0,6.0,0.0,4.571929824561403,44.8
1.0,3.35,23,4,1,,False,0.0,1.0,0.0,4.384090909090913,18.96
1.0,3.33,18,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,16.3
1.0,0.47,17,6,1,,False,0.0,1.0,0.0,3.846428571428569,5.3
1.0,3.07,9,1,1,,False,0.0,6.0,0.0,0.1594594594594597,16.3
1.0,0.92,23,4,1,,False,0.0,1.0,0.0,-2.999107142857142,8.97
1.0,1.9,12,4,1,,False,0.0,1.0,0.0,4.384090909090913,11.8
1.0,0.77,0,1,1,,False,0.0,1.0,0.0,-5.393749999999998,7.3
1.0,2.35,2,6,1,,False,0.0,24.0,254.0,10.943654822335034,14.16
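The cleaning cell combines two steps: `fillna` with a dictionary replaces nulls only in the named columns, and `dropna` with its default `how='any'` then removes every row that still has a null in any column. In plain Python, over hypothetical records, the two steps amount to:

```python
records = [
    {"passengerCount": 1.0, "totalAmount": 44.3},
    {"passengerCount": None, "totalAmount": 44.8},
    {"passengerCount": None, "totalAmount": None},
]
defaults = {"passengerCount": 1.0}

# fillna: replace None only in the columns listed in `defaults`
filled = [
    {name: (defaults.get(name, value) if value is None else value) for name, value in record.items()}
    for record in records
]

# dropna (how="any"): keep only records with no remaining None values
cleaned = [record for record in filled if all(v is not None for v in record.values())]
print(cleaned)
```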


#### Explore Summary Statistics and Data Distribution
Predictive modeling is based on statistics and probability, so we should take a look at the summary statistics for the columns in our data. The **describe** function returns a dataframe containing the **count**, **mean**, **standard deviation**, **minimum**, and **maximum** values for each numeric and string column; for string columns such as `normalizeHolidayName`, the mean and standard deviation are left empty.

In [30]:
display(df.describe())


summary,passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,snowDepth,precipTime,precipDepth,temperature,totalAmount
count,11617.0,11617.0,11617.0,11617.0,11617.0,11617,11617.0,11617.0,11617.0,11617.0,11617.0
mean,1.32994749074632,2.86314539037617,13.634242919858828,3.2207971076870106,3.503055866402686,,1.594342123854158,12.02143410519067,191.4620814323836,10.318198223395576,14.724533872771849
stddev,0.9905854727655304,2.8995739877114945,6.668682319466743,1.9629105573867032,1.707677463883683,,7.084436666546873,10.157326735285835,1213.6354936137388,8.497340521033312,10.96651683941929
min,1.0,0.01,0.0,0.0,1.0,"Martin Luther King, Jr. Day",0.0,1.0,0.0,-13.379464285714295,3.3
max,6.0,62.55,23.0,6.0,6.0,Washington's Birthday,67.0909090909091,24.0,9999.0,26.52410714285713,339.38
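For a single numeric column, the five `describe()` statistics can be reproduced with Python's `statistics` module. Spark reports the sample standard deviation (divisor n - 1), which corresponds to `statistics.stdev` rather than `statistics.pstdev`. The values below are the first ten `tripDistance` values from the sample rows shown earlier, used purely as an example.

```python
import statistics

trip_distance = [9.4, 14.75, 3.35, 3.33, 0.47, 3.07, 0.92, 1.9, 0.77, 2.35]

summary = {
    "count": len(trip_distance),
    "mean": statistics.mean(trip_distance),
    "stddev": statistics.stdev(trip_distance),  # sample standard deviation, as in describe()
    "min": min(trip_distance),
    "max": max(trip_distance),
}
for name, value in summary.items():
    print(f"{name}: {value}")
```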


## Visualizing data

Azure Databricks has custom support for displaying data. 

The `display(..)` command can:
* Present up to 1000 records.
* Export data as CSV.
* Render a multitude of different graphs.
* Render geo-located data on a world map.

Let's take a look at our data using Databricks visualizations:
* Run the cell below.
* Click the second icon underneath the executed cell and choose `Bar`.
* Click the `Plot Options` button to configure the graph:
  * drag `tripDistance` into the `Keys` list
  * drag `totalAmount` into the `Values` list
  * choose `AVG` as the `Aggregation`
  * click `Apply`

In [33]:
dfClean = df.select(col("tripDistance"), col("totalAmount")).dropna()

display(dfClean)

tripDistance,totalAmount
9.4,44.3
14.75,44.8
3.35,18.96
3.33,16.3
0.47,5.3
3.07,16.3
0.92,8.97
1.9,11.8
0.77,7.3
2.35,14.16


Note that the average total amount rises roughly in step with the trip distance, which suggests a strong linear relationship between the two. Such a relationship is a correlation, which we can measure statistically.

The `corr` function calculates a correlation value between -1 and 1, indicating the strength of the correlation between two fields. A strong positive correlation (near 1) indicates that high values in one column are often found with high values in the other, while a strong negative correlation (near -1) indicates that low values in one column are often found with high values in the other. A correlation near 0 indicates little apparent relationship between the fields.

In [35]:
dfClean.corr('tripDistance', 'totalAmount')
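`DataFrame.corr` computes the Pearson correlation coefficient, which is currently the only method it supports. As a sketch of what that number measures, the hypothetical `pearson` helper below (not part of PySpark) computes it directly from its definition: the covariance of the two columns divided by the product of their standard deviations. The common 1/(n - 1) factors cancel, so plain sums are enough.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# First ten (tripDistance, totalAmount) rows of dfClean shown above
distance = [9.4, 14.75, 3.35, 3.33, 0.47, 3.07, 0.92, 1.9, 0.77, 2.35]
amount = [44.3, 44.8, 18.96, 16.3, 5.3, 16.3, 8.97, 11.8, 7.3, 14.16]
print(pearson(distance, amount))
```

On the full dataset the value returned by `dfClean.corr` will differ from this ten-row sample.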

Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another.

A common way to start exploring relationships is to create visualizations that compare two or more data values. For example, run the cell below and set its Plot Options to compare the average snow depth at each temperature, for each month:

* Keys: temperature
* Series Groupings: month_num
* Values: snowDepth
* Aggregation: avg
* Display Type: Line Chart

In [37]:
display(df)

passengerCount,tripDistance,hour_of_day,day_of_week,month_num,normalizeHolidayName,isPaidTimeOff,snowDepth,precipTime,precipDepth,temperature,totalAmount
1.0,9.4,15,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,44.3
1.0,14.75,13,4,1,,False,0.0,6.0,0.0,4.571929824561403,44.8
1.0,3.35,23,4,1,,False,0.0,1.0,0.0,4.384090909090913,18.96
1.0,3.33,18,2,1,,False,29.058823529411764,24.0,3.0,6.18571428571429,16.3
1.0,0.47,17,6,1,,False,0.0,1.0,0.0,3.846428571428569,5.3
1.0,3.07,9,1,1,,False,0.0,6.0,0.0,0.1594594594594597,16.3
1.0,0.92,23,4,1,,False,0.0,1.0,0.0,-2.999107142857142,8.97
1.0,1.9,12,4,1,,False,0.0,1.0,0.0,4.384090909090913,11.8
1.0,0.77,0,1,1,,False,0.0,1.0,0.0,-5.393749999999998,7.3
1.0,2.35,2,6,1,,False,0.0,24.0,254.0,10.943654822335034,14.16
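With *Series Groupings* set, the chart effectively computes one average per (key, series) pair: here, the mean `snowDepth` for each `temperature` within each `month_num`. A plain-Python sketch of that aggregation over hypothetical `(temperature, month_num, snowDepth)` tuples follows. In PySpark, the same numbers could come from `df.groupBy('temperature', 'month_num').avg('snowDepth')`.

```python
from collections import defaultdict

# Hypothetical (temperature, month_num, snowDepth) rows
rows = [
    (6.19, 1, 29.06),
    (6.19, 1, 27.0),
    (4.38, 1, 0.0),
    (4.38, 2, 3.0),
]

totals = defaultdict(float)
counts = defaultdict(int)
for temperature, month, snow in rows:
    key = (temperature, month)
    totals[key] += snow
    counts[key] += 1

# One average per (temperature, month) pair: the points plotted on each month's line
averages = {key: totals[key] / counts[key] for key in totals}
print(averages)
```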


The plot now shows the relationship between the month, the snow depth and the recorded temperature.