## Spark DataFrame and SQL example

Spark provides methods for efficiently processing structured data such as `.csv` files and unstructured data such as raw text files like the one we used for doing word counts.

Code written using the DataFrame and SQL syntax in Python is just as efficient as code written in Scala/Java. This is a big difference when compared to Hadoop streaming where using Python code has performance impacts. In fact, DataFrame and SQL code expressed in Python are only "wrappers" around the Spark frame functions that have been implemented in Java. The actual processing is not exceuted in Python.

This example is based on one found in: [Spark: The Definitive Guide](http://shop.oreilly.com/product/0636920034957.do).
Download [this](https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/2015-summary.csv) file as example data. If the file can't be found anymore. We expect a `.csv` file with formated like:

```
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Ireland,344
Egypt,United States,15
...
```

Summary:

- Structured data should be processed with the DataFrame or SQL syntax. We will show both of these ways here.
- Unstructured data should be procesed with the RDD syntax. We will show this in another example.

### Replace your file location here!

In [61]:
sample_location = "./sample.csv"

### Import Spark libraries
Import the necessary Spark libraries. The entry point is always the `SparkSession` instance. If you run the `pyspark` shell then this session instance will already have been created for you. It's stored in the `spark` variable.

In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

Get the current Spark session or create a new one.

In [63]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL/DataFrame basic example") \
    .getOrCreate()
    
spark.conf.set("spark.sql.shuffle.partitions", "5")

Read the data. This creates a DataFrame. Replace `<your-file-location>` with the path where you stored the file.

In [64]:
data = spark \
    .read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv(sample_location)
    
# If you are really interested...
# type(data)
# dir(data)

### Querying the data using the DataFrame way


In [65]:
data.take(5)

[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Croatia', count=1),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344),
 Row(DEST_COUNTRY_NAME=u'Egypt', ORIGIN_COUNTRY_NAME=u'United States', count=15),
 Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'India', count=62)]

In [66]:
data.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [67]:
data.sort("count").explain()

== Physical Plan ==
*Sort [count#376 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#376 ASC NULLS FIRST, 5)
   +- *FileScan csv [DEST_COUNTRY_NAME#374,ORIGIN_COUNTRY_NAME#375,count#376] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/dkoch/Teaching/hadoop-training-exercises/spark/sample.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


You can reference the column directly or by `data["count"]`.

In [68]:
data.select(max(data['count'])).show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



Query to return the Top-5 destinations.

In [69]:
data.groupBy("DEST_COUNTRY_NAME") \
    .sum("count") \
    .withColumnRenamed("sum(count)", "destination_total") \
    .sort(desc("destination_total")) \
    .limit(5).show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



### The SQL way
First create a view (table) representation of the data

In [70]:
data.createOrReplaceTempView("data_view")

A first SQL query - simply return the maximum value of the `count` column.

In [71]:
spark.sql("SELECT max(count) from data_view").take(1)

[Row(max(count)=370002)]

Execute the same query as above - only expressed in SQL syntax.

In [72]:
spark.sql("""
    SELECT
        DEST_COUNTRY_NAME, sum(count) AS total
    FROM data_view
    GROUP BY
        DEST_COUNTRY_NAME
    ORDER BY total DESC
    LIMIT 5""").show()

+-----------------+------+
|DEST_COUNTRY_NAME| total|
+-----------------+------+
|    United States|411352|
|           Canada|  8399|
|           Mexico|  7140|
|   United Kingdom|  2025|
|            Japan|  1548|
+-----------------+------+

