## Exploring Data with Dataframes and Spark SQL
In this exercise, you will explore data using the Spark Dataframe API and Spark SQL.
https://microsoftlearning.github.io/databricks-intro/

### Load Data Using an Explicit Schema
Now you can load the data into a dataframe. If the structure of the data is known ahead of time, you can explicitly specify the schema for the dataframe.

Modify the code below to reflect your Azure blob storage account name, and then click the &#9658; button at the top right of the cell to run it.

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

path="/FileStore/tables/"

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])


flights = spark.read.csv(path+"raw_flight_data-eb9f8.csv", schema=flightSchema, header=True)
flights.show()

### Infer a Data Schema
If the structure of the data source is unknown, you can have Spark automatically infer the schema.

In this case, you will load data about airports without knowing the schema.

Modify the code below to reflect your Azure blob storage account name, and then run the cell.

In [5]:
airports = spark.read.csv(path+"airports.csv", header=True, inferSchema=True)
airports.show()

### Use Dataframe Methods
Spark DataFrames provide functions that you can use to extract and manipulate data. For example, you can use the **select** function to return a new dataframe containing columns selected from an existing dataframe.

In [7]:
cities = airports.select("city", "name")
cities.show()

### Combine Operations
You can combine functions in a single statement to perform multiple operations on a dataframe. In this case, you will use the **join** function to combine the **flights** and **airports** dataframes, and then use the **groupBy** and **count** functions to return the number of flights from each airport.

In [9]:
flightsByOrigin = flights.join(airports, flights.OriginAirportID == airports.airport_id).groupBy("city").count()
flightsByOrigin.show()

### Count the Rows in a Dataframe
Now that you're familiar with working with dataframes, a key task when building predictive solutions is to explore the data, determing statistics that will help you understand the data before building predictive models. For example, how many rows of flight data do you actually have?

In [11]:
flights.count()

### Determine the Presence of Duplicates
The data you have to work with won't always be perfect - often you'll want to *clean* the data; for example to detect and remove duplicates that might affect your model. You can use the **dropDuplicates** function to create a new dataframe with the duplicates removed, enabling you to determine how many rows are duplicates of other rows.

In [13]:
flights.count() - flights.dropDuplicates().count()

### Identify Missing Values
As well as determining if duplicates exist in your data, you should detect missing values, and either remove rows containing missing data or replace the missing values with a suitable relacement. The **dropna** function creates a dataframe with any rows containing missing data removed - you can specify a subset of columns, and whether the row should be removed in *any* or *all* values are missing. You can then use this new dataframe to determine how many rows contain missing values.

In [15]:
flights.count() - flights.dropDuplicates().dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()

### Clean the Data
Now that you've identified that there are duplicates and missing values, you can clean the data by removing the duplicates and replacing the missing values. The **fillna** function replaces missing values with a specified replacement value. In this case, you'll remove all duplicate rows and replace missing **ArrDelay** and **DepDelay** values with **0**.

In [17]:
data=flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"])
data.count()

## Explore the Data
Now that you've cleaned the data, you can start to explore it and perform some basic analysis. Let's start by examining the lateness of a flight. The dataset includes the **ArrDelay** field, which tells you how many minutes behind schedule a flight arrived. However, if a flight is only a few minutes behind schedule, you might not consider it *late*. Let's make our definition of lateness such that flights that arrive within 25 minutes of their scheduled arrival time are considered on-time, but any flights that are more than 25 minutes behind schedule are classified as *late*. We'll add a column to indicate this classification:

In [19]:
data = data.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID","DestAirportID",
                   "DepDelay", "ArrDelay", ((col("ArrDelay") > 25).cast("Int").alias("Late")))
data.show()

### Explore Summary Statistics and Data Distribution
Predictive modeling is based on statistics and probability, so we should take a look at the summary statistics for the columns in our data. The **describe** function returns a dataframe containing the **count**, **mean**, **standard deviation**, **minimum**, and **maximum** values for each numeric column.

In [21]:
data.describe().show()

The *DayofMonth* is a value between 1 and 31, and the mean is around halfway between these values; which seems about right. The same is true for the *DayofWeek* which is a value between 1 and 7. *Carrier* is a string, so there are no numeric statistics; and we can ignore the statistics for the airport IDs - they're just unique identifiers for the airports, not actually numeric values. The departure and arrival delays range between 63 or 94 minutes ahead of schedule, and over 1,800 minutes behind schedule. The means are much closer to zero than this, and the standard deviation is quite large; so there's quite a bit of variance in the delays. The *Late* indicator is a 1 or a 0, but the mean is very close to 0; which implies that there significantly fewer late flights than non-late flights.

Let's verify that assumption by creating a table and using the **Spark SQL** API to run a SQL statement that counts the number of late and non-late flights:

In [23]:
data.createOrReplaceTempView("flightData")
spark.sql("SELECT Late, COUNT(*) AS Count FROM flightData GROUP BY Late").show()

Yes, it looks like there are significantly more non-late flights than late ones - we can see this more clearly with a visualization, so let's use the inline **%sql** magic to query the table and bring back some results we can display as a chart:

In [25]:
%sql
SELECT * FROM flightData

DayofMonth,DayOfWeek,Carrier,OriginAirportID,DestAirportID,DepDelay,ArrDelay,Late
19,5,DL,12889,13487,7,16,0
18,4,DL,12892,13487,5,-4,0
18,4,DL,14635,11193,-7,-12,0
18,4,DL,10397,11298,-3,44,1
18,4,DL,10397,12451,1,-7,0
19,5,DL,13487,11433,-3,-21,0
19,5,DL,14771,13487,340,325,1
19,5,DL,10397,11298,20,20,0
19,5,DL,12953,13487,4,-28,0
19,5,DL,14869,11278,-4,-24,0


The query returns a table of data containing the first 1000 rows, which should be a big enough sample for us to explore. To see the distribution of *Late* classes (1 for late, 0 for on-time), in the visualization drop-down list under the table above, click **Bar**. Then click **Plot Options** and configure the visualization like this:
- **Keys**: Late
- **Series Groupings**: *none*
- **Values**: &lt;id&gt;
- **Aggregation**: Count
- **Display type**: Bar chart
- **Grouped**: Selected

You should be able to see that the sample includes significantly more on-time flights than late ones. This indicates that the dataset is *imbalanced*; which might adversely affect the accuracy of any machine learning model we train from this data.

Additionally, you observed earlier that there are some extremely high **DepDelay** and **ArrDelay** values that might be skewing the distribution of the data disproportionately because of a few *outliers*. Let's visualize the distribution of these columns to explore this. Change the **Plot Options** settings as follows:
- **Keys**: *none*
- **Series Groupings**: *none*
- **Values**: DepDelay
- **Aggregation**: Count
- **Display Type**: Histogram plot
- **Number of bins**: 20

You can drag the handle at the bottom right of the visualization to resize it. Note that the data is skewed such that most flights have a **DepDelay** value within 100 or so minutes of 0. However, there are a few flights with extremely high delays. Another way to view this distribution is a *box plot*. Change the **Plot Options** as follows:
- **Keys**: *none*
- **Series Groupings**: *none*
- **Values**: DepDelay
- **Aggregation**: Count
- **Display Type**: Box plot

The box plot consists of a box with a line indicating the median departure delay, and *whiskers* extending from the box to show the first and fourth quartiles of the data, with statistical *outliers* shown as small circles. This confirms the extremely skewed distribution of **DepDelay** values seen in the histogram (and if you care to check, you'll find that the **ArrDelay** column has a similar distribution).

Let's address the outliers and imbalanced classes in our data by removing rows with extreme delay values, and *undersampling* the more common on-time flights:

In [27]:
from pyspark.sql.functions import rand

# Remove outliers - let's make the cut-off 150 minutes.
data = data.filter("DepDelay < 150 AND ArrDelay < 150")

# Separate the late and on-time flights
pos = data.filter("Late = 1")
neg = data.filter("Late = 0")

# undersample the most prevalent class to get a roughly even distribution
posCount = pos.count()
negCount = neg.count()
if posCount > negCount:
  pos = pos.sample(True, negCount/(negCount + posCount))
else:
  neg = neg.sample(True, posCount/(negCount + posCount))
  
# shuffle into random order (so a sample of the first 1000 has a mix of classes)
data = neg.union(pos).orderBy(rand())

# Replace the temporary table so we can query and visualize the balanced dataset
data.createOrReplaceTempView("flightData")

# Show the statistics
data.describe().show()

Now the maximums for the **DepDelay** and **ArrDelay** are clipped at under 150, and the mean value for the binary *Late* class is nearer 0.5; indicating a more or less even number of each class. We removed some data to accomplish this balancing act, but there are still a substantial number of rows for us to train a machine learning model with, and now the data is more balanced. Let's visualize the data again to confirm this:

In [29]:
%sql
SELECT * FROM flightData

DayofMonth,DayOfWeek,Carrier,OriginAirportID,DestAirportID,DepDelay,ArrDelay,Late
25,2,DL,14869,11278,24,22,0
6,6,WN,11259,10140,42,35,1
13,2,DL,14100,11433,31,31,1
31,6,DL,10397,15304,-4,-8,0
23,5,9E,11433,12478,-1,0,0
9,3,US,11057,13930,66,56,1
25,3,EV,13198,12264,-6,-16,0
26,4,WN,14107,11066,39,30,1
10,3,B6,10721,14843,127,115,1
17,3,UA,14771,14107,-3,-14,0


Display the data as a bar chart to compare the distribution of the **Late** classes as you did previously. There should now be a more or less even number of each class. Then visualize the **DepDelay** field as a histogram and as a box plot to verify that the distribution, while still skewed, has fewer outliers.

### Explore Relationships in the Data
Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another.

A common way to start exploring relationships is to create visualizations that compare two or more data values. For example, modify the **Plot Options** of the chart above to compare the arrival delays for each carrier:
- **Keys**: Carrier
- **Series Groupings**: *none*
- **Values**: ArrDelay
- **Aggregation**: Count
- **Display Type**: Box plot

You may need to resize the plot to see the data clearly, but it should show that the median delay, and the distribution of delays varies by carrier; with some carriers having a higher median delay than others. The same is true for other features, such as the day of the week and the destination airport. You may already suspect that there's likely to be a relationship between delarture delay and arrival delay, so let's examine that next. Change the **Plot Options** as follows:
- **Keys**: None
- **Series Groupings**: *none*
- **Values**: ArrDelay, DepDelay
- **Aggregation**: Count
- **Display Type**: Scatter plot
- **Show LOESS**: Selected

The scatter plot shows the departure delay and corresponding arrival delay for each flight as a point in a two dimensional space. Note that the points form a diagonal line, which indicates a strong linear relationship between departure delay and arrival delay. This linear relationship shows a *correlation* between these two values, which we can measure statistically. The **corr** function calculates a correlation value between -1 and 1, indicating the strength of correlation between two fields. A strong positive correlation (near 1) indicates that high values for one column are often found with high values for the other, which a strong negative correlation (near -1) indicates that *low* values for one column are often found with *high* values for the other. A correlation near 0 indicates little apparent relationship between the fields.

In [32]:
resultValue = data.corr("DepDelay", "ArrDelay")

In this notebook we've cleaned the flight data, and explored it to identify some potential relationships between features of the flights and their lateness.

In [34]:
dbutils.notebook.exit(str(resultValue))

0.8969856340493185