<img src="uva_seal.png">  

## Spark SQL and DataFrames

### University of Virginia
### DS 5559: Big Data Analytics
### Last Updated: Jan 7, 2021

---  

### Sources 

Learning Spark, Chapter 9: Spark SQL

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

Demonstration of several useful DataFrame operations:  
https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

### OBJECTIVES
- Introduce Spark SQL, the interface for working with structured and semistructured data
- Introduce DataFrames and show basic functionality
- Discuss SparkSession

### CONCEPTS AND FUNCTIONS
- Schema
- SQL
- Dataset and DataFrame
- Partition
- Parquet files

---  

**NOTES**

These lecture notes are a quick outline of Spark SQL and DataFrames.  
Spark SQL provides a lot of functionality and is under heavy development.   

### Schema

A database *schema* is the structure that represents the logical view of the entire database.  It defines how the data are organized and how the relations among them are associated.  It is implemented through tables, views, and integrity constraints.

### Schema in Spark

The schema in Spark defines the data structure. For each field, a 3-tuple is specified: `(column name, data type, nullable)`  

---  

**Example of schema with two Fields *author* and *pages*, which cannot contain null values**
```
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("author", StringType(), False), StructField("pages", IntegerType(), False)])
```
---  

It is possible to allow Spark to infer the schema of your data, but it's preferable to feed it the schema:

- avoids having Spark launch a separate job to read a large fraction of the data to infer schema
- early detection of errors if the data doesn't match the schema
- Spark inference may be incorrect. For example, it may think all numerical data are strings.

### Common Spark Data Types

- integer types, all `int` in Python:
  - ShortType
  - IntegerType
  - LongType
- floating-point types, both `float` in Python:
  - FloatType
  - DoubleType
- StringType
- BooleanType

### SQL in Ten Seconds (tongue in cheek)


SQL is a structured query language used to communicate with relational databases.  
Commands include CREATE, SELECT, UPDATE, ALTER, INSERT INTO, DROP, DELETE.  
This course will use SELECT.

### Spark SQL Capabilities:

- load data from various structured formats including JSON, Hive, Parquet  
- query data using SQL inside Spark or from external tools that connect to Spark (e.g., `Tableau`) 
- Spark SQL integrates SQL with Python/Java/Scala/R code, so you can do things like join RDDs and SQL tables.

### Dataset and DataFrame

- A Dataset is a distributed collection of data   
- A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map()`, `flatMap()`, `filter()`, etc.)  
- A DataFrame is a Dataset organized into named columns   

In practice, you will be thinking in terms of `DataFrames`, and not `Datasets`.  For users familiar with dataframes from R and Python, they are similar, yet with operations distinct to Spark.  As an example, adding a new column to a DataFrame is executed using `withColumn()`.  This may feel more formal compared to R and Python.  

Additionally, compared to dataframes in R and Python, the Spark DataFrame uses richer optimizations under the hood.  The structure makes use of distributed computing, in the same manner as RDDs.  

DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.  

The DataFrame API is available in Scala, Java, Python, and R. 

### DataFrames vs RDDs  

Now that we have two powerful objects that parallelize data, we have more flexibility, but this can lead to confusion.  When is it better to use DataFrames, and when is it better to use RDDs?  

Here are some recommendations:   

- In general, most work can be done with DataFrames  

- Use DataFrames when you want high-level expressions, SQL queries to explore the data, and columnar access.  For example, if you are thinking about the data by field names, you probably want the data in a DataFrame.

- For machine learning and building predictive models, DataFrames are recommended. You will be exploring the data by column, and building features from the columns of data.  
- RDDs can be useful to perform low-level transformations and actions on unstructured data. For example, filtering strings and performing other simple transformations on text is best done with RDDs.  In these cases, the analyst doesn't care about field names, and there is no need to impose schema on the data.  

- Use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.

### Creating a DataFrame

There are multiple ways to do this:
- use a reader such as `spark.read.csv()` to read data from files into DataFrames (most common)
- pass data to `createDataFrame()`
- conversion from RDD using `toDF()`

**Example 1: Create DataFrame from RDD using `toDF()`**

---  
```
# import modules 
from pyspark.sql import Row

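# Map the RDD to a DF
# (assumes `rdd` is an existing RDD whose records are indexable with 9 fields each,
#  and that a SparkSession is active so that toDF() is available)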

df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()
```
---  

**Example 2: Create DataFrame by passing data and schema to `createDataFrame()`**

In [1]:
# import context manager: SparkSession
from pyspark.sql import SparkSession

# import data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# set up the session
spark = SparkSession.builder.getOrCreate()

# create some data; list of tuples
data = [
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic regression models are neat")
]

# define schema; each field holds (name, data type, nullable)
# for large number of fields, best to automate schema construction
schema = StructType([StructField('id', IntegerType(), False), 
                     StructField('sentence', StringType(), False)])

# create df by passing data, schema
sentenceDataFrame = spark.createDataFrame(data, schema)

# print first few records
sentenceDataFrame.show()

# print data type
print(type(sentenceDataFrame))

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic regressi...|
+---+--------------------+

<class 'pyspark.sql.dataframe.DataFrame'>


**Example 3: Create a DataFrame from some JSON data**  
(For an example of JSON data see: http://json.org/example.html)


In [2]:
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Read data in json format
df = spark.read.json("people.json")

# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Creating an RDD from a DataFrame

This is very simple: `df.rdd`

Here we convert our df containing sentences:

In [3]:
sentence_rdd = sentenceDataFrame.rdd
print(sentence_rdd.take(2))
print(type(sentence_rdd))

[Row(id=0, sentence='Hi I heard about Spark'), Row(id=1, sentence='I wish Java could use case classes')]
<class 'pyspark.rdd.RDD'>


### SparkSession

The `SparkSession` is a unified conduit to all Spark operations and data.  It is a single entry point, often loosely called a *context*.  

Spark 1.x exposed several separate contexts, to the point of confusion.  
From the developers:  

*We have been getting a lot of questions about the relationship between SparkContext, SQLContext, and HiveContext in Spark 1.x. It was really strange to have “HiveContext” as an entry point when people want to use the DataFrame API. In Spark 2.0, we are introducing SparkSession, a new entry point that subsumes SQLContext and HiveContext. For backward compatibility, we keep the Hive and SQL Contexts.*  

For details:  
https://docs.databricks.com/spark/latest/gentle-introduction/sparksession.html

Here is an example of building a more elaborate SparkSession:  

**SparkSession Example Setup**

---  
```
from pyspark.sql import SparkSession

# parentheses (rather than backslashes) allow a comment after each line
spark = (
    SparkSession
    .builder
    .master("local[*]")                           # use all cores on local machine
    .appName("Python Spark SQL basic example")    # will see appName on cluster manager
    .config("spark.executor.memory", "20g")       # RAM per executor (worker)
    .config("spark.executor.cores", "5")          # cores available to EACH executor
    .config("spark.executor.instances", "17")     # total number of executors
    .config("spark.driver.memory", "1g")          # RAM for driver, generally lower need than a worker
    .getOrCreate()
)
    
# for details see:
# https://spark.apache.org/docs/latest/configuration.html
```
---  

### Setting up Cores, Executors, RAM 

NOTES  
- setting these configs is best codified in a function
- Spark sets configs by default, but unfortunately they're not always optimal

---  

<span style="color:red">**Example: Hardware consists of 6 nodes, each with 16 cores, 64GB RAM**</span>

RESOURCE OVERHEAD:  
$O1$. On each node, 1 core and 1 GB RAM are consumed by the OS and Hadoop daemons  
This leaves 15 available cores on each node  
$O2$. The resource manager (e.g., YARN) will require an overhead ~1GB RAM per node  
$O3$. One executor is required for the driver

**Number of cores**  
More cores means more concurrent processing, but an executor running more than 5 concurrent tasks generally doesn't perform well.  
Cap this at **spark.executor.cores = 5**.  

**Executor instances**  
We can set 15 cores_per_node / 5 cores_per_executor = 3 executors_per_node. 15 is due to $O1$.

Given 6 nodes and 3 executors per node, we can set 18 executors  
One of these executors is required for the driver $(O3)$    
Thus, we set **spark.executor.instances = 17**  

**Executor memory**  
Available RAM is 63GB per node $(O1)$. For 3 executors per node, this gives 63GB/3 = 21GB per executor  
Allowing ~1GB for the resource manager overhead $(O2)$, set **spark.executor.memory = 20g**

**NOTE:** If `spark.executor.cores` is not set, an executor uses all available cores on the worker in standalone mode, and 1 core on YARN (relying on the default is simpler, but not always optimal)  
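
Since these settings are best codified in a function, here is a minimal sketch of the heuristic worked through above. The function name `size_executors` and its parameter names are hypothetical, and the rounding down to whole gigabytes is an assumption; the arithmetic follows the notes (subtract $O1$, divide by executors per node, then subtract ~1GB for $O2$).

```python
def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_per_executor=5, os_cores=1, os_ram_gb=1,
                   manager_ram_gb=1, driver_executors=1):
    """Suggest Spark executor settings using the heuristic from these notes."""
    usable_cores = cores_per_node - os_cores                    # O1: cores left per node
    executors_per_node = usable_cores // cores_per_executor
    instances = nodes * executors_per_node - driver_executors   # O3: one slot for the driver
    ram_share = (ram_gb_per_node - os_ram_gb) // executors_per_node   # O1: RAM per executor
    memory_gb = ram_share - manager_ram_gb                      # O2: resource manager overhead
    return {
        "spark.executor.cores": str(cores_per_executor),
        "spark.executor.instances": str(instances),
        "spark.executor.memory": f"{memory_gb}g",
    }

# The example hardware: 6 nodes, each with 16 cores and 64GB RAM
settings = size_executors(nodes=6, cores_per_node=16, ram_gb_per_node=64)
print(settings)
```

Each key/value pair in the returned dictionary could then be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.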

---  

### Some Useful Operations

Next, we turn to the documentation to explore more DataFrame functionality including subsetting, filtering, aggregation.  
https://spark.apache.org/docs/latest/sql-programming-guide.html

There are several different ways to extract columns from a DataFrame, shown below with examples.

In [4]:
# Read data in json format
df = spark.read.json("people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Filtering

Notes: 
- `col()` returns a column expression for the given column name  
- `asc()` returns an ascending sort expression for a column; use `desc()` to sort descending  

Keep records where age > 21

In [5]:
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



Count the records where age > 21

In [6]:
df.filter(df['age'] > 21).count()

1

Keep records subject to filters on name, then sort

In [6]:
from pyspark.sql.functions import col, asc

df.filter((col("name") == "Andy") | (col("name") == "Michael")).sort(asc("name")).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|null|Michael|
+----+-------+



In [7]:
# alternatively using df.name instead of col("name")

df.filter((df.name == "Andy") | (df.name == "Michael")).sort(asc("name")).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|null|Michael|
+----+-------+



Fetch records with age *null*

In [None]:
df.filter(col("age").isNull()).show() 

Fetch records with age *not null*

In [None]:
df.filter(col("age").isNotNull()).show() 

### where() is equivalent to filter()

In [None]:
df.where((col("name") == "Andy") | (col("name") == "Michael")).sort(asc("name")).show()

### Impute missing with 0 (just for illustration; not a great idea for this data)

In [None]:
df.fillna(0).show()

### Summarize the age field

In [None]:
df.describe("age").show()

### Spark SQL Queries

To write SQL queries against DataFrames, first register as a `SQL temp view`, and then write the query.

**Example of SQL Query against DataFrame**

In [None]:
# register DataFrame as temp view with name "people"
df.createOrReplaceTempView("people")

# query the view
sqlDF = spark.sql("SELECT * FROM people WHERE name = 'Andy'")
sqlDF.show()

### Aggregate on columns

SQL functions can be loaded from this library: `pyspark.sql.functions`

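Group by the location column to compute the min, count, and avg (this assumes a DataFrame with `location`, `id`, and `date_diff` columns, unlike the `people.json` data above)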

---  
```
from pyspark.sql import functions as F

agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
```
---  

### Write DF to Parquet file, partitioning columns

```
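from pyspark.sql import functions as F

# assumes df has a date (or timestamp) column named end_date
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")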
```

### Infer the schema when reading in file

```
adult_df = spark.read.\
    format("csv").\
    option("header", "false").\
    option("inferSchema", "true").load("dbfs:/databricks-datasets/adult/adult.data")
adult_df.printSchema()
```

### Saving and Loading Data

#### Save / Load using Generic Functions

```
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
```

#### Save / Load using Manually Specified Formats

```
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
```

### Parquet Files

- Project was developed at Twitter and Cloudera, and later donated to the Apache Software Foundation (Apache)   
- Parquet is a columnar format that is supported by many other data processing systems  

- Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.


Key observation: It can be much more efficient to store data in terms of columns than rows.  
Values from the same column are stored contiguously, so a query that needs only a few columns can skip the rest.
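
The row-versus-column idea can be illustrated in plain Python. This is only a toy sketch of the two layouts using the `people.json` records, not how Parquet is actually implemented.

```python
# Row-oriented layout: each record keeps all of its fields together
rows = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]
# Computing a statistic on one field still walks every full record
ages_from_rows = [r["age"] for r in rows if r["age"] is not None]

# Column-oriented layout: each column is stored as its own contiguous list
columns = {
    "name": ["Michael", "Andy", "Justin"],
    "age": [None, 30, 19],
}
# Only the "age" column needs to be touched
ages_from_columns = [a for a in columns["age"] if a is not None]

mean_age = sum(ages_from_columns) / len(ages_from_columns)
print(mean_age)
```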


#### Save / Load Operations using Parquet Files


```
# read in data in JSON format. This will produce a DataFrame.
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# Loading parquet files produces a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
```

### Partition Discovery

Database tables can be partitioned to make querying more efficient.  
For example, the data can be split by gender and country, producing smaller tables.  
If the analyst is only interested in a single country, the query will run faster.


In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory.  

All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically. 

For example, a table partitioned by `gender` and `country` could be stored with this directory layout:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
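
The way partition column values are encoded in directory names can be sketched in plain Python. The helper `parse_partition_values` is hypothetical and only illustrates the `key=value` path convention; it is not Spark's own discovery code.

```python
def parse_partition_values(path):
    """Extract key=value partition columns from a directory path."""
    values = {}
    for part in path.split("/"):
        if "=" in part:
            key, value = part.split("=", 1)
            values[key] = value
    return values

example = parse_partition_values("path/to/table/gender=male/country=US/data.parquet")
print(example)
```

When Spark reads `path/to/table`, it performs a similar extraction and adds `gender` and `country` as columns of the resulting DataFrame.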


**TRY FOR YOURSELF (UNGRADED EXERCISES)**

Run the code below to create a DataFrame.  Then complete the tasks that follow.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .master("local[*]") \
        .getOrCreate()

sentenceData = spark.createDataFrame([ \
    (0, "the quick brown"), \
    (1, "fox jumped") \
], ["id", "raw"])

1) Print `sentenceData`

2) Print the record count

3) Print the row containing the word "fox".  Do this by registering the dataframe as a SQL temp view, and then query it using `spark.sql`.  Hint: the `LIKE` operator will be helpful in the query.

4) Print the row containing the word "fox".  Do this by using the functions `filter()` and `contains()`.  Note this method is probably simpler.

**Summary**  
You should now have a basic understanding of Spark SQL, DataFrames, and how to use some of the common transformations on DataFrames.  Additionally, you should have some sense of when DataFrames are preferred over RDDs, and vice versa.