<img style="float: left" src="images/spark.png" />
<img style="float: right" src="images/surfsara.png" />
<hr style="clear: both" />

## Spark Structured API

In the previous notebook you have seen how distributed processing is done using RDDs. In this notebook we will look at the Structured API. We will see how you can use DataFrames and SQL to do common data processing operations. By the end you should have a feeling for the strengths and weaknesses of these different approaches.

The first difference is our Spark _entrypoint_. For RDDs this was the `SparkContext` (usually named `sc`). For DataFrames we will use a `SparkSession`, which is more powerful and easier to use. By convention we name our SparkSession `spark`.

In [None]:
# Create a SparkSession, the 'DataFrame version' of the SparkContext
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## DataFrames from Python collections

Just like we have seen with `sc.parallelize`, we can create a DataFrame from an existing Python collection. In addition to the collection itself we also describe the structure of the data by naming the columns. We could specify the data types of the columns as well, but in this case we let Spark infer them automatically.

As an example we will look at the inventory of a hypothetical mobile phone shop. We create a DataFrame from a list of tuples. 

In [None]:
phoneStock = [
    ('iPhone 6', 'Apple', 6, 549.00),
    ('iPhone 6s', 'Apple', 5, 585.00),
    ('iPhone 7', 'Apple', 11, 739.00),
    ('Pixel', 'Google', 8, 859.00),
    ('Pixel XL', 'Google', 2, 959.00),
    ('Galaxy S7', 'Samsung', 10, 539.00),
    ('Galaxy S6', 'Samsung', 5, 414.00),
    ('Galaxy A5', 'Samsung', 7, 297.00),
    ('Galaxy Note 7', 'Samsung', 0, 841.00)
]

columns = ['model', 'brand', 'stock', 'unit_price']

phoneDF = spark.createDataFrame(phoneStock, columns)

print('the type of phoneStock: ' + str(type(phoneStock)))
print('the type of phoneDF: ' + str(type(phoneDF)))

(It is also possible to create a Spark DataFrame from a [pandas](http://pandas.pydata.org/) DataFrame.)

### DataFrame inspection

When developing an application it is very useful to check the structure and content of the DataFrames you are creating. We can view the contents of the DataFrame records using the [show](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show) method. (This is similar to the RDD `take` method.)

In [None]:
# TODO: Replace <FILL IN> with appropriate code

phoneDF.<FILL IN>

There are two methods to look at the structure of a DataFrame: `describe` and `printSchema`. `describe` returns a new DataFrame with statistics about numerical columns. `printSchema` is especially useful with complicated nested structures.

In [None]:
phoneDF.describe().show()

In [None]:
phoneDF.printSchema()

## Data extraction

Now that we have our data in a DataFrame, we want to use it to manipulate the data. Let's start by selecting subsets of the data: specific columns and/or rows.

### Selecting columns

Often we are not interested in all the columns of our data. DataFrames make it very easy to select only a subset by using the `select` method. Realise that we are not modifying the original DataFrame, but creating a new one.

In [None]:
# Select only the model column
modelDF = phoneDF.select('model')
modelDF.show()

In [None]:
# Select both the brand and model columns
bmDF = phoneDF.select('brand', 'model')
bmDF.show()

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Select the model and stock columns
msDF = <FILL IN>
msDF.show()

#### Note: Column specifications

In the previous examples we have used strings to specify the columns we want to select. This is one valid format, but there are a few others that can be used as well. Sometimes the more verbose ones are required because the shorter versions are ambiguous for Spark's parser. For example, all these are equivalent:

```
bmDF = phoneDF.select('brand', 'model')
bmDF = phoneDF.select(['brand', 'model'])
bmDF = phoneDF.select(phoneDF['brand'], phoneDF['model'])
```

### Filtering rows

We can filter specific rows by using the DataFrame `filter` method. The column specifications are the same as with the select method, but note we need to add the DataFrame name in this case.

In [None]:
# Select rows with phones from Google
googleDF = phoneDF.filter(phoneDF['brand'] == 'Google')

googleDF.show()

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Select the rows with unit_price less than 550.00

cheapDF = <FILL IN>
cheapDF.show()

## Aggregating data

An important part of data processing is the ability to combine multiple records, like we did with `reduceByKey`. In the DataFrame API this is a two-step process:

First you group the data using the `groupBy` method. `groupBy` can operate on one or multiple columns. It does not actually perform the grouping yet, but returns a `GroupedData` object.

In [None]:
groupedDF = phoneDF.groupBy('brand')
print(type(groupedDF))

After the data is grouped we can apply one of the standard aggregation functions to it. They are listed in the [GroupedData](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) API documentation and include `min`, `max`, `mean`, `sum` and `count`. We can apply an aggregation to all numeric columns or to a subset of them.

In [None]:
# Minimum for all numeric columns
minDF = groupedDF.min()

minDF.show()

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Maximum for just the unit_price column
maxDF = <FILL IN>

maxDF.show()

Finally, we can combine different aggregations per column using the `agg` method on a GroupedData instance.

In [None]:
sumDF = groupedDF.agg({'stock': 'sum', 'unit_price': 'mean'})

sumDF.show()

## Conversion to/from RDD

Sometimes you want to do data manipulations which would be very easy with RDD operations, but complicated with the DataFrame API. Fortunately you can convert between DataFrames and RDDs of type 'Row'. Going from DataFrame to RDD is quite simple. Going back from RDD to DataFrame is more difficult because you need to re-apply the schema.

In [None]:
phoneRDD = phoneDF.rdd
pluralRDD = phoneRDD.map(lambda r: r.brand + 's')
pluralRDD.collect()

## Reading structured files/sources

One of the advantages of DataFrames is the ability to read already structured data and automatically import the structure in Spark. Spark contains readers for a number of formats such as csv, json, parquet, orc, text and jdbc. There are also third-party readers/connectors for databases such as MongoDB and Cassandra.

Here we read some JSON-formatted tweets. As you can see, even a complicated schema is inferred automatically.

In [None]:
tweetDF = spark.read.format("json").load('../data/tweets.json')
tweetDF.printSchema()

### Nested data

Nested fields can be selected by using a dot-notation.

In [None]:
# Select the user-name and text:
tweetDF.select('user.name', 'text').show(10, False)

## SQL

The SQL API aims to be compatible with ANSI SQL:2003 and HiveQL. Its expressiveness is very similar to that of the DataFrame API. You can access the SQL API from the SparkSession by using `spark.sql`. Before you can query DataFrames using SQL you need to register them as temporary views, which SQL queries can then reference like tables.

In [None]:
# DataFrame version
resDF = phoneDF.filter(phoneDF['stock'] > 7).select('model')
resDF.show()

In [None]:
# SQL version

# Register the phoneDF DataFrame within SQL as a table with name 'phones'
phoneDF.createOrReplaceTempView('phones')

# Perform the SQL query on the 'phones' table
resDF = spark.sql('SELECT model FROM phones WHERE stock > 7')
resDF.show()

## Joining with other data sets

Often you want to combine multiple datasets by joining them on a shared column. In this example we create an extra table with information about the phone manufacturers.

In [None]:
companies = [
    ('Google', 'USA', 1998, 'Sundar Pichai'),
    ('Samsung', 'South Korea', 1938, 'Oh-Hyun Kwon'),
    ('Apple', 'USA', 1976, 'Tim Cook')
]

columns = ['company_name', 'hq_country', 'founding_year', 'ceo']

companyDF = spark.createDataFrame(companies, columns)
companyDF.show()

To join two DataFrames, we use the `join` method on one of the DataFrames. We pass it two arguments: (1) the other DataFrame, and (2) a join condition. An optional third argument, `how`, selects the join type and defaults to an inner join. Here we join the two DataFrames on the brand/company_name columns:

In [None]:
joinedDF = phoneDF.join(companyDF, phoneDF['brand'] == companyDF['company_name'])
joinedDF.show()

Here is an example of a more complicated query that combines multiple steps:

In [None]:
# All the models from USA companies with more than 7 items in stock
result = (phoneDF.join(companyDF, phoneDF['brand'] == companyDF['company_name'])
          .filter(companyDF['hq_country'] == 'USA')
          .filter(phoneDF['stock'] > 7)
          .select('model')
)

result.show()

## Bonus: word count in DataFrames

It is also possible to use DataFrames for less-structured data such as text. Here we show how you could do word count with DataFrames:

In [None]:
from pyspark.sql.functions import explode, split

(spark
    .read.text('../data/shakespeare.txt')
    # Split each line on runs of non-word characters and emit one row per word
    .select(explode(split("value", r"\W+")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

### User-defined functions

In the previous example we used the built-in `split` function. It is also possible to define and use a custom user-defined function, or UDF.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def my_tokenize(s):
    s = s.lower()
    words = s.split()
    return words

returnType = ArrayType(StringType())

tokenize_udf = udf(my_tokenize, returnType)

In [None]:
from pyspark.sql.functions import explode

(spark
    .read.text('../data/shakespeare.txt')
    # Same pipeline as before, with the custom tokenizer in place of split
    .select(explode(tokenize_udf("value")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False)
    .show()
)