<p align="center"><img src="logo/spark.png" alt="Spark Logo" width="250"/></p>
# **Lab 2 - Part 4 - Spark SQL**
#### Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called `DataFrames` and can also act as a distributed SQL query engine. This lab shows how to work with Spark SQL.

### **Part 1: Create a DataFrame and Apply DataFrame Operations**

The entry point into all relational functionality in Spark is the `SQLContext` class, or one of its descendants. To create a basic `SQLContext`, all you need is a `SparkContext`. With a `SQLContext`, applications can create `DataFrames` from an existing RDD, from a Hive table, or from data sources. As an example, the following creates a `DataFrame` based on the content of a JSON file, located at `data/people/people.json`.

In [51]:
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json("data/people/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+





Let's try more functions on `DataFrame`. First print the schema in a tree format, using the `printSchema` method.

In [52]:
// TODO: Replace <FILL IN> with appropriate code
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)





Now, use the `select` method and print only the `name` column.

In [53]:
// TODO: Replace <FILL IN> with appropriate code
// Select only the "name" column
df.select(df("name")).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+





Use again the `select` method and select everybody, but increment the age by one.

In [54]:
// TODO: Replace <FILL IN> with appropriate code
// Select everybody, but increment the age by 1
df.select(df("name"),df("age") + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
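
The computed column above gets the generated name `(age + 1)`. As a minimal sketch, assuming `df` is still the DataFrame loaded from `data/people/people.json`, the column can be given a readable name with `as`. The name `age_plus_one` and the value `incremented` are hypothetical, chosen only for illustration.

```scala
// Assumes `df` is the DataFrame loaded from data/people/people.json above.
// `age_plus_one` is a hypothetical column name chosen for illustration.
val incremented = df.select(df("name"), (df("age") + 1).as("age_plus_one"))

// The schema now shows the alias instead of the generated "(age + 1)" name.
incremented.printSchema()
incremented.show()
```

Rows with a missing age still yield `null` in the new column, because arithmetic on `null` stays `null`.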





Next, select people older than 21 by using the `filter` method, and then count people by age. You can use `groupBy` for the second task.

In [55]:
// TODO: Replace <FILL IN> with appropriate code
// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+
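
Two details in the output above are worth a short sketch: the row with a missing age is dropped by the `age > 21` filter (a comparison with `null` is not true), while `groupBy` keeps a separate `null` group. The following assumes `df` is the people DataFrame from above; `adults` and `countsByAge` are hypothetical names.

```scala
// Assumes `df` is the people DataFrame loaded from data/people/people.json above.

// filter also accepts a SQL expression string instead of a Column.
val adults = df.filter("age > 21")

// Remove rows with a missing age before grouping, so no null group appears.
val countsByAge = df.filter(df("age").isNotNull).groupBy("age").count()

adults.show()
countsByAge.show()
```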





The `sql` function on a `SQLContext` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. Let's select everybody again with the help of the `sql` method.

In [56]:
// TODO: Replace <FILL IN> with appropriate code
df.registerTempTable("df")
sqlContext.sql("SELECT * FROM df").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
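
The earlier `filter` and `groupBy` steps can also be written as SQL against the registered table. This is a sketch under the assumption that `df` and `sqlContext` from the cells above are in scope; the alias `num_people` and the value names are hypothetical.

```scala
// Assumes `df` and `sqlContext` from the cells above.
// Re-registering the table is harmless and keeps this cell independent.
df.registerTempTable("df")

// Same result as df.filter(df("age") > 21)
val adultsSql = sqlContext.sql("SELECT name, age FROM df WHERE age > 21")

// Same result as df.groupBy("age").count(); `num_people` is a hypothetical alias.
val countsSql = sqlContext.sql("SELECT age, COUNT(*) AS num_people FROM df GROUP BY age")

adultsSql.show()
countsSql.show()
```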





### **Part 2: Interoperating with RDDs**

Spark SQL supports two different methods for converting existing RDDs into DataFrames:
 + (i) The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
 + (ii) The second method for creating `DataFrames` is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct `DataFrames` when the columns and their types are not known until runtime.

Let's first try inferring the schema using reflection. The Scala interface for Spark SQL can automatically convert an RDD of case class objects to a `DataFrame`. The case class defines the schema of the table: the names of its constructor arguments are read using reflection and become the column names, and their types become the column types. In the example below, each line of a text file is parsed into a `Person(name, age)` object before the RDD is converted.

In [57]:
// TODO: Replace <FILL IN> with appropriate code
case class Person(name: String, age: Int)

// Load a text file and convert each line to a Person.
val lines = sc.textFile("data/people/people.txt")
val parts = lines.map(l => l.split(","))
val people = parts.map(p => Person(p(0), p(1).trim.toInt))

// Infer the schema, and register the DataFrame as a table.
val schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

// SQL can be run over DataFrames that have been registered as a table. Complete the following query
// to return teenagers, i.e., age >= 13 and age <= 19.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

Name: Justin
Name: Justin
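
As an alternative sketch of the reflection-based approach, importing `sqlContext.implicits._` adds a `toDF()` method to RDDs of case classes. The case class `SamplePerson` and the two in-memory records are hypothetical and serve only as illustration; `sc` and `sqlContext` are assumed to come from the earlier cells.

```scala
// Assumes `sc` and `sqlContext` from the earlier cells are in scope.
import sqlContext.implicits._

// Hypothetical case class and records, used only for illustration.
case class SamplePerson(name: String, age: Int)
val samplePeople = sc.parallelize(Seq(SamplePerson("Alice", 15), SamplePerson("Bob", 42)))

// Column names and types are taken from the case class by reflection.
val sampleDF = samplePeople.toDF()
sampleDF.printSchema()

// The same teenager condition as above, written with Column expressions.
val sampleTeens = sampleDF.filter(sampleDF("age") >= 13 && sampleDF("age") <= 19)
sampleTeens.show()
```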




When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a `DataFrame` can be created programmatically in three steps:
* (i) Create an RDD of `Row`s from the original RDD.
* (ii) Create the schema, represented by a `StructType`, matching the structure of the `Row`s in the RDD created in step (i).
* (iii) Apply the schema to the RDD of `Row`s via the `createDataFrame` method provided by `SQLContext`.

In [58]:
// Just run this code
// Create an RDD
val people = sc.textFile("data/people/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)



Name: Michael
Name: Andy
Name: Justin
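
In the cell above every field is declared as `StringType`, so `age` is stored as text. The following sketch shows the same three steps with an integer `age` column, which allows numeric comparisons. It assumes `sc` and `sqlContext` from the earlier cells and the same `data/people/people.txt` file; `typedSchema`, `typedRows`, and `typedPeople` are hypothetical names.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Step (ii): a schema with one string column and one integer column.
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Step (i): parse each "name, age" line into a Row whose values match the schema types.
val typedRows = sc.textFile("data/people/people.txt")
  .map(_.split(","))
  .map(p => Row(p(0), p(1).trim.toInt))

// Step (iii): apply the schema to the RDD of Rows.
val typedPeople = sqlContext.createDataFrame(typedRows, typedSchema)
typedPeople.printSchema()

// Numeric filtering now works on the integer column.
typedPeople.filter(typedPeople("age") > 21).show()
```

The values placed in each `Row` have to match the declared types; a mismatch is typically reported only when the data is actually processed.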


### **Part 3: Data Sources**

Spark SQL supports operating on a variety of data sources through the `DataFrame` interface. A `DataFrame` can be operated on like a normal RDD and can also be registered as a temporary table. Registering a `DataFrame` as a table allows you to run SQL queries over its data. In the simplest form, the default data source (*parquet* unless otherwise configured by `spark.sql.sources.default`) will be used for all operations. Save operations can optionally take a `SaveMode` that specifies how to handle existing data, if present. It can take the values `error` (default), `append`, `overwrite`, and `ignore`.

In [59]:
// Just run this code
// Load data from a parquet file
val df = sqlContext.read.load("data/people/people.parquet")
df.select("name", "favorite_color").write.mode("overwrite").save("namesAndFavColors.parquet")

// Manually specify the data source type, e.g., json, parquet, jdbc.
val jdf = sqlContext.read.format("json").load("data/people/people.json")
jdf.select("name", "age").write.format("parquet").mode("overwrite").save("namesAndAges.parquet")
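
The save modes can also be passed as `SaveMode` enum values instead of strings. The sketch below, assuming `sqlContext` from the earlier cells, writes the same data three times to show how the modes differ. The output path `names_savemode_demo.parquet` and the value names are hypothetical.

```scala
import org.apache.spark.sql.SaveMode

// Assumes `sqlContext` from the earlier cells; the output path is hypothetical.
val names = sqlContext.read.json("data/people/people.json").select("name")
val outPath = "names_savemode_demo.parquet"

names.write.mode(SaveMode.Overwrite).parquet(outPath) // replace any existing data
names.write.mode(SaveMode.Append).parquet(outPath)    // add the rows to the existing data
names.write.mode(SaveMode.Ignore).parquet(outPath)    // do nothing, because data already exists

// Read the result back to see the combined effect of the three writes.
val reloaded = sqlContext.read.parquet(outPath)
println(reloaded.count())
```

With the default mode (`SaveMode.ErrorIfExists`), the second write would fail instead, because the path already contains data.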





*Parquet* is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. Let's load data programmatically.

In [60]:
// Just run this code
// schemaPeople is already a DataFrame (created in Part 2), so it can be written directly as Parquet.
schemaPeople.write.parquet("people.parquet")

// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Name: Justin




Spark SQL can automatically infer the schema of a JSON dataset and load it as a `DataFrame`. This conversion can be done using `SQLContext.read.json` on a JSON file. Note that the input is not a typical JSON file: each line must contain a separate, self-contained, valid JSON object. As a consequence, a regular multi-line JSON file will most often fail to load.

In [61]:
// Just run this code
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val people = sqlContext.read.json("data/people/people.json")

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)





[address: struct<city:string,state:string>, name: string]
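
The inferred `address` column is a struct, and its fields can be addressed with dot notation. This is a minimal sketch, assuming `sc` and `sqlContext` from the earlier cells; the names `nestedRDD`, `nested`, and the table name `nested_people` are hypothetical.

```scala
// Assumes `sc` and `sqlContext` from the earlier cells are in scope.
// One self-contained JSON object per string, as required by read.json.
val nestedRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val nested = sqlContext.read.json(nestedRDD)

// Nested struct fields are selected with dot notation.
val cities = nested.select("name", "address.city")
cities.show()

// The same works in SQL once the DataFrame is registered as a table.
nested.registerTempTable("nested_people")
val states = sqlContext.sql("SELECT name, address.state FROM nested_people")
states.show()
```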