<center> <h1>Spark SQL</h1> </center>

Like the RDD APIs, the DataFrame APIs are classified into two
buckets: transformations and actions. The evaluation semantics are identical to those of RDDs:
transformations are lazily evaluated, and actions are eagerly evaluated.
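
A minimal sketch of the lazy/eager distinction. It assumes a local SparkSession built in the snippet so that it is self-contained (in the notebook, `spark` already exists); the names `numbers`, `evens`, and `evenCount` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("lazy-eval-sketch").getOrCreate()
import spark.implicits._

val numbers = spark.range(1, 6).toDF("n")   // values 1 through 5

// Transformation: only extends the logical plan, no job runs yet
val evens = numbers.filter($"n" % 2 === 0)

// Action: triggers execution and returns a result to the driver
val evenCount = evens.count()
```

Nothing is computed until `count()` is called; the `filter` line only describes what should happen.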

## Creating DataFrames

There are many ways to create a DataFrame; one common thing among them is the need to provide a schema, either implicitly or explicitly.

### Creating DataFrames from RDDs

In [1]:
import scala.util.Random

val rdd = spark.sparkContext.parallelize(1 to 10).map(x => (x, Random.nextInt(100)* x))
val kvDF = rdd.toDF("key","value")

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.80:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1586532218165)
SparkSession available as 'spark'


import scala.util.Random
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:26
kvDF: org.apache.spark.sql.DataFrame = [key: int, value: int]


Printing the Schema and Showing the Data of a DataFrame

In [2]:
kvDF.printSchema

root
 |-- key: integer (nullable = false)
 |-- value: integer (nullable = false)



In [3]:
kvDF.show()

+---+-----+
|key|value|
+---+-----+
|  1|   60|
|  2|    8|
|  3|  105|
|  4|  160|
|  5|  335|
|  6|  168|
|  7|  371|
|  8|  544|
|  9|  441|
| 10|  280|
+---+-----+



Creating a DataFrame from an RDD with a Schema Created Programmatically

In [4]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val peopleRDD = sc.parallelize(Array(Row(1L, "John Doe", 30L), Row(2L, "Mary Jane", 25L)))

val schema = StructType(Array(
    StructField("id", LongType, true),
    StructField("name", StringType, true),
    StructField("age", LongType, true)
    ))

val peopleDF = spark.createDataFrame(peopleRDD, schema)

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
peopleRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[6] at parallelize at <console>:29
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true), StructField(name,StringType,true), StructField(age,LongType,true))
peopleDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 1 more field]


**Each StructField object has three pieces of information: name, type, and whether the value is nullable.**
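
As a small illustration, the three pieces of information can be read back from a StructField directly. The field below mirrors the id column of the schema above; the variable names are made up for this sketch.

```scala
import org.apache.spark.sql.types._

val idField = StructField("id", LongType, nullable = true)

// The three pieces of information carried by a StructField
val fieldName: String = idField.name
val fieldType: DataType = idField.dataType
val isNullable: Boolean = idField.nullable

println(s"$fieldName: ${fieldType.typeName} (nullable = $isNullable)")
```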

In [5]:
peopleDF.printSchema

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [6]:
peopleDF.show()

+---+---------+---+
| id|     name|age|
+---+---------+---+
|  1| John Doe| 30|
|  2|Mary Jane| 25|
+---+---------+---+



The type of each column in a DataFrame is mapped to an internal Spark type, which can be a simple scalar type or a complex type. Table 4-1 describes the available internal Spark data types and associated Scala types.
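
As a rough illustration of scalar versus complex types, the sketch below builds a schema that mixes both. The column names (name, tags, address, city) are invented for this example and do not come from the chapter's data.

```scala
import org.apache.spark.sql.types._

val personSchema = StructType(Array(
  StructField("name", StringType, true),               // simple scalar type
  StructField("tags", ArrayType(StringType), true),    // complex type: array of strings
  StructField("address", StructType(Array(             // complex type: nested struct
    StructField("city", StringType, true))), true)))

// Print the schema as a tree, similar to what printSchema shows for a DataFrame
println(personSchema.treeString)
```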

### Creating DataFrames from a Range of Numbers

Using the SparkSession.range Function to Create a DataFrame

In [7]:
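// Note: show returns Unit, so df1 holds Unit rather than a DataFrame (see "df1: Unit = ()" in the output)
val df1 = spark.range(5).toDF("num").show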

+---+
|num|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



df1: Unit = ()


In [8]:
spark.range(5,10).toDF("num").show

+---+
|num|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [9]:
spark.range(5,15,2).toDF("num").show

+---+
|num|
+---+
|  5|
|  7|
|  9|
| 11|
| 13|
+---+



Converting a Collection of Tuples to a DataFrame Using Spark’s toDF Implicit

In [10]:
val movies = Seq(("Damon, Matt", "The Bourne Ultimatum", 2007L),
                 ("Damon, Matt", "Good Will Hunting", 1997L))

val moviesDF = movies.toDF("actor", "title", "year")

moviesDF.printSchema

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = false)



movies: Seq[(String, String, Long)] = List((Damon, Matt,The Bourne Ultimatum,2007), (Damon, Matt,Good Will Hunting,1997))
moviesDF: org.apache.spark.sql.DataFrame = [actor: string, title: string ... 1 more field]


In [11]:
moviesDF.show()

+-----------+--------------------+----+
|      actor|               title|year|
+-----------+--------------------+----+
|Damon, Matt|The Bourne Ultimatum|2007|
|Damon, Matt|   Good Will Hunting|1997|
+-----------+--------------------+----+



### Creating DataFrames from Data Sources

The two main classes in Spark SQL for reading and writing data are DataFrameReader and DataFrameWriter, respectively. This section will cover the details of working with the APIs in the DataFrameReader class and the various available options when reading data from a specific data source.

An instance of the DataFrameReader class is available as the read variable of the
SparkSession class.

In [12]:
spark.read

res8: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@787d1764


The common pattern for interacting with DataFrameReader is described below.

In [None]:
spark.read.format(...).option("key", "value").schema(...).load()

**Table 4-2 describes the three main pieces of information that are used when reading data: format, option, and schema.**
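
A concrete instance of this pattern might look like the sketch below. It is only an illustration: the temporary CSV file, its contents, and the names `csvPath`, `peopleSchema`, and `people` are assumptions made so the snippet can run on its own, and the local SparkSession stands in for the `spark` variable the notebook already provides.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("reader-pattern-sketch").getOrCreate()

// Write a tiny CSV file to a temporary location (hypothetical sample data)
val csvPath = Files.createTempFile("people", ".csv")
Files.write(csvPath, "id,name\n1,Ann\n2,Bob\n".getBytes("UTF-8"))

val peopleSchema = StructType(Array(
  StructField("id", LongType, true),
  StructField("name", StringType, true)))

// format + option + schema + load, in that order
val people = spark.read.format("csv").option("header", "true").schema(peopleSchema).load(csvPath.toString)

people.show()
```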

#### Specifying the Data Source Format

In [None]:
spark.read.json("<path>")
spark.read.format("json")

spark.read.parquet("<path>")
spark.read.format("parquet")

spark.read.jdbc
spark.read.format("jdbc")

spark.read.orc("<path>")
spark.read.format("orc")

spark.read.csv("<path>")
spark.read.format("csv")

spark.read.text("<path>")
spark.read.format("text")

// custom data source – fully qualified package name
spark.read.format("org.example.mysource")

### Creating DataFrames by Reading Text Files

Reading the README.md File As a Text File from a Spark Shell

In [None]:
val textFile = spark.read.text("README.md")

// show 5 lines and don't truncate
textFile.show(5, false)

### Creating DataFrames by Reading CSV Files

The CSV parser in Spark is designed to be flexible such that it can parse a text file using a user-provided delimiter. The comma delimiter just happens to be the default one. This means you can use the CSV format to read tab-separated value text files or other text files with an arbitrary delimiter.

See Table 4-4. CSV Common Options

When both the header and inferSchema options are set to true, you don't need to specify a schema: column names come from the header row, and column types are inferred from the data. Otherwise, you need to define a schema by hand or create it programmatically and pass it to the schema function. If the inferSchema option is false and no schema is provided, Spark treats every column as the string type.

Reading CSV Files with Various Options

In [14]:
val movies = spark.read.option("header","true").csv("./beginning-apache-spark-2-master/chapter4/data/movies/movies.csv")

movies: org.apache.spark.sql.DataFrame = [actor: string, title: string ... 1 more field]


In [16]:
movies.printSchema

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)



In [18]:
// now try to infer the schema
val movies2 = spark.read.option("header","true")
.option("inferSchema","true")
.csv("./beginning-apache-spark-2-master/chapter4/data/movies/movies.csv")

movies2: org.apache.spark.sql.DataFrame = [actor: string, title: string ... 1 more field]


In [19]:
movies2.printSchema

root
 |-- actor: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [20]:
// now try to manually provide a schema
import org.apache.spark.sql.types._

val movieSchema = StructType(Array(
    StructField("actor_name", StringType, true),
    StructField("movie_title", StringType, true),
    StructField("produced_year", LongType, true)))

val movies3 = spark.read.option("header","true")
.schema(movieSchema)
.csv("./beginning-apache-spark-2-master/chapter4/data/movies/movies.csv")

movies3.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



import org.apache.spark.sql.types._
movieSchema: org.apache.spark.sql.types.StructType = StructType(StructField(actor_name,StringType,true), StructField(movie_title,StringType,true), StructField(produced_year,LongType,true))
movies3: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [22]:
movies3.show(20,false)

+-----------------+---------------------------+-------------+
|actor_name       |movie_title                |produced_year|
+-----------------+---------------------------+-------------+
|McClure, Marc (I)|Freaky Friday              |2003         |
|McClure, Marc (I)|Coach Carter               |2005         |
|McClure, Marc (I)|Superman II                |1980         |
|McClure, Marc (I)|Apollo 13                  |1995         |
|McClure, Marc (I)|Superman                   |1978         |
|McClure, Marc (I)|Back to the Future         |1985         |
|McClure, Marc (I)|Back to the Future Part III|1990         |
|Cooper, Chris (I)|Me, Myself & Irene         |2000         |
|Cooper, Chris (I)|October Sky                |1999         |
|Cooper, Chris (I)|Capote                     |2005         |
|Cooper, Chris (I)|The Bourne Supremacy       |2004         |
|Cooper, Chris (I)|The Patriot                |2000         |
|Cooper, Chris (I)|The Town                   |2010         |
|Cooper,

Reading a TSV File with the CSV Format

In [23]:
val movies4 = spark.read.option("header", "true")
.option("sep", "\t")
.schema(movieSchema)
.csv("./beginning-apache-spark-2-master/chapter4/data/movies/movies.tsv")

movies4: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [24]:
movies4.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



### Creating DataFrames by Reading JSON Files

Table 4-5 describes the common options for the JSON format.

Various Examples of Reading a JSON File

In [26]:
val movies5 = spark.read.json("./beginning-apache-spark-2-master/chapter4/data/movies/movies.json") 
movies5.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



movies5: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [27]:
// specify a schema to override Spark's inferred schema.
// produced_year is specified as integer type

import org.apache.spark.sql.types._

val movieSchema2 = StructType(Array(
    StructField("actor_name", StringType, true),
    StructField("movie_title", StringType, true),
    StructField("produced_year", IntegerType, true)))

val movies6 = spark.read.option("inferSchema","true")
.schema(movieSchema2)
.json("./beginning-apache-spark-2-master/chapter4/data/movies/movies.json")

movies6.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)



import org.apache.spark.sql.types._
movieSchema2: org.apache.spark.sql.types.StructType = StructType(StructField(actor_name,StringType,true), StructField(movie_title,StringType,true), StructField(produced_year,IntegerType,true))
movies6: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


Parsing Error and How to Tell Spark to Fail Fast

In [28]:
// set data type for actor_name as BooleanType
import org.apache.spark.sql.types._

val badMovieSchema = StructType(Array(
    StructField("actor_name", BooleanType, true),
    StructField("movie_title", StringType, true),
    StructField("produced_year", IntegerType, true)))

val movies7 = spark.read.schema(badMovieSchema)
.json("./beginning-apache-spark-2-master/chapter4/data/movies/movies.json")

movies7.printSchema

root
 |-- actor_name: boolean (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)



import org.apache.spark.sql.types._
badMovieSchema: org.apache.spark.sql.types.StructType = StructType(StructField(actor_name,BooleanType,true), StructField(movie_title,StringType,true), StructField(produced_year,IntegerType,true))
movies7: org.apache.spark.sql.DataFrame = [actor_name: boolean, movie_title: string ... 1 more field]


In [29]:
movies7.show()

+----------+-----------+-------------+
|actor_name|movie_title|produced_year|
+----------+-----------+-------------+
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
|      null|       null|         null|
+----------+-----------+-------------+
only showing top 20 rows



In [30]:
// tell Spark to fail fast when facing a parsing error

val movies8 = spark.read.option("mode","failFast")
.schema(badMovieSchema)
.json("./beginning-apache-spark-2-master/chapter4/data/movies/movies.json")

movies8.printSchema

root
 |-- actor_name: boolean (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: integer (nullable = true)



movies8: org.apache.spark.sql.DataFrame = [actor_name: boolean, movie_title: string ... 1 more field]


In [31]:
// Spark will throw a SparkException when executing an action

movies8.show(5)

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 28, localhost, executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.

### Creating DataFrames by Reading Parquet Files

Parquet is one of the most popular open source columnar storage formats in the Hadoop ecosystem; it was created at Twitter in collaboration with Cloudera. It is popular because it is a self-describing data format and stores data in a highly compact structure by leveraging compression and encoding. The columnar storage format is designed to work well with data analytics workloads where only a small subset of the columns is used during the analysis. **Parquet stores the values of each column contiguously in column chunks within a file; therefore, columns that are not needed in an analysis don't have to be read.** It is also quite flexible when it comes to supporting complex data types with a nested structure.

Spark works extremely well with the Parquet file format; in fact, Parquet is the default file format for reading and writing data in Spark. Since **Parquet files are self-contained, meaning the schema is stored inside the Parquet data file**, it is easy to work with Parquet in Spark.
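
To see the self-describing property in action, the sketch below writes a one-row DataFrame to Parquet and reads it back without supplying a schema. The temporary directory, the sample row, and the names `sample`, `parquetDir`, and `restored` are assumptions for illustration; the local SparkSession stands in for the notebook's `spark`.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

val sample = Seq(("Damon, Matt", "Good Will Hunting", 1997L)).toDF("actor_name", "movie_title", "produced_year")

// Write to a fresh temporary directory (the path is illustrative)
val parquetDir = Files.createTempDirectory("parquet-sketch").resolve("movies").toString
sample.write.parquet(parquetDir)

// No schema is supplied on read; column names and types come from the file itself
val restored = spark.read.parquet(parquetDir)
restored.printSchema()
```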

Reading a Parquet File in Spark

In [33]:
// Parquet is the default format, so we don't need to specify the format when reading

val movies9 = spark.read.load("./beginning-apache-spark-2-master/chapter4/data/movies/movies.parquet")

movies9.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



movies9: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [34]:
// If we want to be more explicit, we can specify the path to the parquet function

val movies10 = spark.read.parquet("./beginning-apache-spark-2-master/chapter4/data/movies/movies.parquet")

movies10.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



movies10: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


### Creating DataFrames by Reading ORC Files

Optimized Row Columnar (ORC) is another popular open source self-describing columnar storage format in the Hadoop ecosystem. It was created by Hortonworks, in collaboration with Facebook, as part of an initiative to massively speed up Hive. It is quite similar to Parquet in terms of efficiency and speed and was designed for analytics workloads. Working with ORC files is just as easy as working with Parquet files.

Reading an ORC File in Spark

In [35]:
val movies11 = spark.read.orc("./beginning-apache-spark-2-master/chapter4/data/movies/movies.orc")

movies11.printSchema

root
 |-- actor_name: string (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



movies11: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [36]:
movies11.show(5, false)

+-----------------+------------------+-------------+
|actor_name       |movie_title       |produced_year|
+-----------------+------------------+-------------+
|McClure, Marc (I)|Coach Carter      |2005         |
|McClure, Marc (I)|Superman II       |1980         |
|McClure, Marc (I)|Apollo 13         |1995         |
|McClure, Marc (I)|Superman          |1978         |
|McClure, Marc (I)|Back to the Future|1985         |
+-----------------+------------------+-------------+
only showing top 5 rows



### Creating DataFrames from JDBC

JDBC is a standard application API for reading data from and writing data to a relational database management system. Spark has support for JDBC data sources, which means you can use Spark to read data from and write data to any of the existing RDBMSs such as MySQL, PostgreSQL, Oracle, SQLite, and so on. There are a few important pieces of information you need to provide when working with a JDBC data source: a JDBC driver for your RDBMS, a connection URL, authentication information, and a table name.
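
The pieces of information listed above map onto reader options roughly as in the sketch below. Every value is a placeholder: the PostgreSQL URL, database, table name, and credentials are assumptions, and `readJdbcTable` is a hypothetical helper, not part of Spark. The matching JDBC driver jar would also need to be on the classpath, and calling `load()` contacts the database to resolve the table's schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// All values below are placeholders, not a real database
val jdbcOptions = Map(
  "driver"   -> "org.postgresql.Driver",                        // JDBC driver class for the RDBMS
  "url"      -> "jdbc:postgresql://localhost:5432/exampledb",   // connection URL
  "dbtable"  -> "movies",                                       // table name
  "user"     -> "example_user",                                 // authentication information
  "password" -> "example_password")

// Hypothetical helper: builds a DataFrame backed by the table named in the options
def readJdbcTable(spark: SparkSession, options: Map[String, String]): DataFrame =
  spark.read.format("jdbc").options(options).load()
```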

See Listing 4.18, 4.19

See Table 4-6

See Listing 4.20

## Working with Structured Operations

Unlike the RDD operations, the structured operations are designed to be more relational, meaning these operations mirror the kind of expressions you can do with SQL, such as projection, filtering, transforming, joining, and so on. 

**Similar to RDD operations, the structured operations are divided into two categories: transformation and action.** The semantics of the structured transformations and actions are identical to the ones in RDDs. In other words, **structured transformations are lazily evaluated, and structured actions are eagerly evaluated.**

Structured operations are sometimes described as a *domain-specific language* (DSL) for distributed data manipulation. A DSL is a computer language specialized for a particular application domain. In this case, the application domain is distributed data manipulation.

**See Table 4-7. Commonly Used DataFrame Structured Transformations**

## Working with Columns

See Table 4-8. Different Ways of Referring to a Column

Different Ways of Referring to a Column

In [1]:
import org.apache.spark.sql.functions._

val kvDF = Seq((1,2),(2,3)).toDF("key","value")

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.80:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1586643159662)
SparkSession available as 'spark'


import org.apache.spark.sql.functions._
kvDF: org.apache.spark.sql.DataFrame = [key: int, value: int]


In [2]:
// to display column names in a DataFrame, we can call the columns function
kvDF.columns

res0: Array[String] = Array(key, value)


In [None]:
kvDF.select("key")

kvDF.select(col("key"))

kvDF.select(column("key"))

kvDF.select($"key")

kvDF.select('key)

In [5]:
// using the col function of DataFrame

kvDF.select(kvDF.col("key"))
kvDF.select('key, 'key > 1).show


+---+---------+
|key|(key > 1)|
+---+---------+
|  1|    false|
|  2|     true|
+---+---------+



## Working with Structured Transformations

In [7]:
val movies = spark.read.parquet("./beginning-apache-spark-2-master/chapter4/data/movies/movies.parquet")

movies: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


### select(columns)

Two Variations of the select Transformation

In [8]:
movies.select("movie_title","produced_year").show(5)

+------------------+-------------+
|       movie_title|produced_year|
+------------------+-------------+
|      Coach Carter|         2005|
|       Superman II|         1980|
|         Apollo 13|         1995|
|          Superman|         1978|
|Back to the Future|         1985|
+------------------+-------------+
only showing top 5 rows



In [9]:
// using a column expression to transform year to decade

movies.select('movie_title,('produced_year - ('produced_year % 10)).
as("produced_decade")).show(5)

+------------------+---------------+
|       movie_title|produced_decade|
+------------------+---------------+
|      Coach Carter|           2000|
|       Superman II|           1980|
|         Apollo 13|           1990|
|          Superman|           1970|
|Back to the Future|           1980|
+------------------+---------------+
only showing top 5 rows



### selectExpr(expressions)

This transformation is a variant of the select transformation. The one big difference is that it accepts one or more SQL expressions, rather than columns. However, both are essentially performing the same projection task.
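
A small sketch of that equivalence: the same projection is written once with Column expressions and once with SQL expression strings. The two-row `sampleMovies` DataFrame is made up for this example, and the local SparkSession stands in for the notebook's `spark`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("select-expr-sketch").getOrCreate()
import spark.implicits._

val sampleMovies = Seq(("Coach Carter", 2005L), ("Superman", 1978L)).toDF("movie_title", "produced_year")

// Projection with Column expressions
val viaSelect = sampleMovies.select($"movie_title", ($"produced_year" - ($"produced_year" % 10)).as("decade"))

// Same projection written as SQL expression strings
val viaSelectExpr = sampleMovies.selectExpr("movie_title", "(produced_year - (produced_year % 10)) as decade")

// Both variants yield the same rows
val sameRows = viaSelect.collect().toSeq == viaSelectExpr.collect().toSeq
```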

Adding the decade Column to the movies DataFrame Using a SQL Expression

In [10]:
movies.selectExpr("*","(produced_year - (produced_year % 10)) as decade").show(5)

+-----------------+------------------+-------------+------+
|       actor_name|       movie_title|produced_year|decade|
+-----------------+------------------+-------------+------+
|McClure, Marc (I)|      Coach Carter|         2005|  2000|
|McClure, Marc (I)|       Superman II|         1980|  1980|
|McClure, Marc (I)|         Apollo 13|         1995|  1990|
|McClure, Marc (I)|          Superman|         1978|  1970|
|McClure, Marc (I)|Back to the Future|         1985|  1980|
+-----------------+------------------+-------------+------+
only showing top 5 rows



Using a SQL Expression and Built-in Functions

In [11]:
movies.selectExpr("count(distinct(movie_title)) as movies","count(distinct(actor_name)) as actors").show

+------+------+
|movies|actors|
+------+------+
|  1409|  6527|
+------+------+



### filter(condition), where(condition)

Filter Rows with Logical Comparison Functions in the Column Class

In [15]:
movies.filter(col("produced_year") < 2000).show(10)

+-------------------+--------------------+-------------+
|         actor_name|         movie_title|produced_year|
+-------------------+--------------------+-------------+
|  McClure, Marc (I)|         Superman II|         1980|
|  McClure, Marc (I)|           Apollo 13|         1995|
|  McClure, Marc (I)|            Superman|         1978|
|  McClure, Marc (I)|  Back to the Future|         1985|
|  McClure, Marc (I)|Back to the Futur...|         1990|
|  Cooper, Chris (I)|         October Sky|         1999|
|  Cooper, Chris (I)|      A Time to Kill|         1996|
|  Cooper, Chris (I)|     American Beauty|         1999|
|  Cooper, Chris (I)| The Horse Whisperer|         1998|
|Knight, Shirley (I)|  As Good as It Gets|         1997|
+-------------------+--------------------+-------------+
only showing top 10 rows



In [None]:
movies.filter('produced_year < 2000)
movies.where('produced_year > 2000)

movies.filter('produced_year >= 2000)
movies.where('produced_year >= 2000)

**Note: Equality comparison requires three equal signs (===).**

In [16]:
movies.filter('produced_year === 2000).show(5)

+-----------------+--------------------+-------------+
|       actor_name|         movie_title|produced_year|
+-----------------+--------------------+-------------+
|Cooper, Chris (I)|  Me, Myself & Irene|         2000|
|Cooper, Chris (I)|         The Patriot|         2000|
|  Jolie, Angelina|Gone in Sixty Sec...|         2000|
|   Yip, Françoise|      Romeo Must Die|         2000|
|   Danner, Blythe|    Meet the Parents|         2000|
+-----------------+--------------------+-------------+
only showing top 5 rows



In [17]:
movies.filter(col("produced_year") === 2000).show(5)

+-----------------+--------------------+-------------+
|       actor_name|         movie_title|produced_year|
+-----------------+--------------------+-------------+
|Cooper, Chris (I)|  Me, Myself & Irene|         2000|
|Cooper, Chris (I)|         The Patriot|         2000|
|  Jolie, Angelina|Gone in Sixty Sec...|         2000|
|   Yip, Françoise|      Romeo Must Die|         2000|
|   Danner, Blythe|    Meet the Parents|         2000|
+-----------------+--------------------+-------------+
only showing top 5 rows



**Note: Inequality comparison uses an interesting-looking operator: =!=**

In [18]:
movies.select("movie_title", "produced_year").filter('produced_year =!= 2000).show(10)

+--------------------+-------------+
|         movie_title|produced_year|
+--------------------+-------------+
|        Coach Carter|         2005|
|         Superman II|         1980|
|           Apollo 13|         1995|
|            Superman|         1978|
|  Back to the Future|         1985|
|Back to the Futur...|         1990|
|         October Sky|         1999|
|              Capote|         2005|
|The Bourne Supremacy|         2004|
|            The Town|         2010|
+--------------------+-------------+
only showing top 10 rows



In [19]:
// Another way
movies.select("movie_title", "produced_year").filter(col("produced_year") =!= 2000).show(10)

+--------------------+-------------+
|         movie_title|produced_year|
+--------------------+-------------+
|        Coach Carter|         2005|
|         Superman II|         1980|
|           Apollo 13|         1995|
|            Superman|         1978|
|  Back to the Future|         1985|
|Back to the Futur...|         1990|
|         October Sky|         1999|
|              Capote|         2005|
|The Bourne Supremacy|         2004|
|            The Town|         2010|
+--------------------+-------------+
only showing top 10 rows



To combine two or more comparison expressions, use the OR (||) and AND (&&) operators of the Column class.

In [23]:
movies.filter('produced_year > 2000 && length('movie_title) < 5).show(5)

+---------------+-----------+-------------+
|     actor_name|movie_title|produced_year|
+---------------+-----------+-------------+
|Jolie, Angelina|       Salt|         2010|
| Cueto, Esteban|        xXx|         2002|
|  Butters, Mike|        Saw|         2004|
| Franko, Victor|         21|         2008|
|  Ogbonna, Chuk|       Salt|         2010|
+---------------+-----------+-------------+
only showing top 5 rows



The other way of accomplishing the same result is to call the filter function twice.

In [24]:
movies.filter('produced_year > 2000).filter(length('movie_title) < 5).show(5)

+---------------+-----------+-------------+
|     actor_name|movie_title|produced_year|
+---------------+-----------+-------------+
|Jolie, Angelina|       Salt|         2010|
| Cueto, Esteban|        xXx|         2002|
|  Butters, Mike|        Saw|         2004|
| Franko, Victor|         21|         2008|
|  Ogbonna, Chuk|       Salt|         2010|
+---------------+-----------+-------------+
only showing top 5 rows
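
The cells above combine conditions with AND. As a complementary sketch, the snippet below shows OR next to AND on a tiny made-up DataFrame; `sampleMovies`, `oldOrShort`, and `newAndShort` are illustrative names, and the local SparkSession stands in for the notebook's `spark`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.length

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("filter-or-sketch").getOrCreate()
import spark.implicits._

val sampleMovies = Seq(("Salt", 2010L), ("Superman", 1978L), ("Coach Carter", 2005L)).toDF("movie_title", "produced_year")

// OR: keep rows that satisfy at least one of the two conditions
val oldOrShort = sampleMovies.filter($"produced_year" < 1980 || length($"movie_title") < 5)

// AND: keep rows that satisfy both conditions
val newAndShort = sampleMovies.filter($"produced_year" > 2000 && length($"movie_title") < 5)

oldOrShort.show()
newAndShort.show()
```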



### distinct, dropDuplicates

These two transformations have identical behavior. However, dropDuplicates allows you to control which columns should be used in deduplication logic. If none is specified, the deduplication logic will use all the columns in the DataFrame.
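
The difference can be sketched on a three-row DataFrame in which one row is an exact duplicate. The sample rows and the names `distinctRows` and `distinctTitles` are assumptions for illustration; the local SparkSession stands in for the notebook's `spark`.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session, created here only to keep the snippet self-contained
val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
import spark.implicits._

val sampleMovies = Seq(
  ("Efremov, Mikhail", "12", 2007L),
  ("Stoyanov, Yuriy", "12", 2007L),
  ("Efremov, Mikhail", "12", 2007L)   // exact duplicate of the first row
).toDF("actor_name", "movie_title", "produced_year")

// distinct compares whole rows, so only the exact duplicate is removed
val distinctRows = sampleMovies.distinct.count()

// dropDuplicates with a column name compares only that column
val distinctTitles = sampleMovies.dropDuplicates("movie_title").count()
```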

In [25]:
movies.select("movie_title").distinct.selectExpr("count(movie_title) as movies").show

+------+
|movies|
+------+
|  1409|
+------+



### sort(columns), orderBy(columns)

Both of these transformations have the same semantics. The orderBy name is closer to relational terminology, mirroring SQL's ORDER BY clause. By default, sorting is in ascending order, and it is easy to change it to descending.

In [28]:
val movieTitles = movies.dropDuplicates("movie_title")
.selectExpr("movie_title", "length(movie_title) as title_length", "produced_year")

movieTitles.show(5, false)

+-----------------------------------+------------+-------------+
|movie_title                        |title_length|produced_year|
+-----------------------------------+------------+-------------+
|Poseidon                           |8           |2006         |
|Ironiya sudby. Prodolzhenie        |27          |2007         |
|The Last Airbender                 |18          |2010         |
|Failure to Launch                  |17          |2006         |
|Le fabuleux destin d'Amélie Poulain|35          |2001         |
+-----------------------------------+------------+-------------+
only showing top 5 rows



movieTitles: org.apache.spark.sql.DataFrame = [movie_title: string, title_length: int ... 1 more field]


In [29]:
movieTitles.sort('title_length).show(5)

+-----------+------------+-------------+
|movie_title|title_length|produced_year|
+-----------+------------+-------------+
|         RV|           2|         2006|
|         12|           2|         2007|
|         Up|           2|         2009|
|         X2|           2|         2003|
|         21|           2|         2008|
+-----------+------------+-------------+
only showing top 5 rows



Sorting in descending order

In [31]:
movieTitles.sort('title_length.desc).show(5, false)

+-----------------------------------------------------------------------------------+------------+-------------+
|movie_title                                                                        |title_length|produced_year|
+-----------------------------------------------------------------------------------+------------+-------------+
|Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|83          |2006         |
|The Chronicles of Narnia: The Lion, the Witch and the Wardrobe                     |62          |2005         |
|Hannah Montana & Miley Cyrus: Best of Both Worlds Concert                          |57          |2008         |
|The Chronicles of Narnia: The Voyage of the Dawn Treader                           |56          |2010         |
|Istoriya pro Richarda, milorda i prekrasnuyu Zhar-ptitsu                           |56          |1997         |
+-----------------------------------------------------------------------------------+------------+-------------+

Sorting by two columns in different orders

In [32]:
movieTitles.orderBy('title_length.desc, 'produced_year).show(6, false)

+-----------------------------------------------------------------------------------+------------+-------------+
|movie_title                                                                        |title_length|produced_year|
+-----------------------------------------------------------------------------------+------------+-------------+
|Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|83          |2006         |
|The Chronicles of Narnia: The Lion, the Witch and the Wardrobe                     |62          |2005         |
|Hannah Montana & Miley Cyrus: Best of Both Worlds Concert                          |57          |2008         |
|Istoriya pro Richarda, milorda i prekrasnuyu Zhar-ptitsu                           |56          |1997         |
|The Chronicles of Narnia: The Voyage of the Dawn Treader                           |56          |2010         |
|Pirates of the Caribbean: The Curse of the Black Pearl                             |54         

### limit(n)

This transformation returns a new DataFrame by taking the first n rows. This transformation is commonly used after the sorting is done to figure out the top n or bottom n rows based on the sorting order.

In [45]:
val actorNameDF = movies.select("actor_name").distinct.selectExpr("*", "length(actor_name) as length")

actorNameDF: org.apache.spark.sql.DataFrame = [actor_name: string, length: int]


In [46]:
actorNameDF.sort(col("length").desc).limit(10).show

+--------------------+------+
|          actor_name|length|
+--------------------+------+
|Badalamenti II, P...|    28|
|Driscoll, Timothy...|    28|
|Phillips, Christo...|    27|
|Marshall-Fricker,...|    27|
|Shepard, Maridean...|    27|
|Martino, Nicholas...|    27|
|Pahlavi, Shah Moh...|    27|
|Lawrence, Mark Ch...|    26|
|Van de Kamp Bucha...|    26|
|Lough Haggquist, ...|    26|
+--------------------+------+



In [47]:
// Another way
actorNameDF.orderBy('length.desc).limit(10).show

+--------------------+------+
|          actor_name|length|
+--------------------+------+
|Badalamenti II, P...|    28|
|Driscoll, Timothy...|    28|
|Phillips, Christo...|    27|
|Marshall-Fricker,...|    27|
|Shepard, Maridean...|    27|
|Martino, Nicholas...|    27|
|Pahlavi, Shah Moh...|    27|
|Lawrence, Mark Ch...|    26|
|Van de Kamp Bucha...|    26|
|Lough Haggquist, ...|    26|
+--------------------+------+
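
The examples above show the top n case. As a minimal sketch of the bottom n case, the sort order can simply be flipped before calling limit. The names in `namesDF` are made up for illustration and stand in for `actorNameDF`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length}

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("limit-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for actorNameDF: a few made-up names and their lengths.
val namesDF = Seq("Al", "Bea", "Carl", "Diana", "Edward").toDF("actor_name")
  .withColumn("length", length(col("actor_name")))

// Bottom 2 by length: sort ascending, then keep only the first two rows.
val shortestDF = namesDF.orderBy(col("length").asc).limit(2)
shortestDF.show()
```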



### union(otherDataFrame)

You learned earlier that DataFrames are immutable. To add more rows to an existing DataFrame, or to combine the rows of two DataFrames, use the union transformation. **This transformation resolves columns by position, so both DataFrames must have the same number of columns, in the same order and with compatible types. Column names are not compared; use unionByName when columns should be matched by name instead.**

**Example:** We want to add a missing actor to the movie titled "12".

In [48]:
val shortNameMovieDF = movies.where('movie_title === "12")
shortNameMovieDF.show

+--------------------+-----------+-------------+
|          actor_name|movie_title|produced_year|
+--------------------+-----------+-------------+
|    Efremov, Mikhail|         12|         2007|
|     Stoyanov, Yuriy|         12|         2007|
|     Gazarov, Sergey|         12|         2007|
|Verzhbitskiy, Viktor|         12|         2007|
+--------------------+-----------+-------------+



shortNameMovieDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [actor_name: string, movie_title: string ... 1 more field]


Create a DataFrame with one row

In [52]:
import org.apache.spark.sql.Row

val actor = Seq(Row("Brychta, Edita", "12", 2007L))
val actorRDD = sc.parallelize(actor)
val actorDF = spark.createDataFrame(actorRDD, shortNameMovieDF.schema)

import org.apache.spark.sql.Row
actor: Seq[org.apache.spark.sql.Row] = List([Brychta, Edita,12,2007])
actorRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[138] at parallelize at <console>:34
actorDF: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [53]:
actorDF.show

+--------------+-----------+-------------+
|    actor_name|movie_title|produced_year|
+--------------+-----------+-------------+
|Brychta, Edita|         12|         2007|
+--------------+-----------+-------------+



In [54]:
val completeDF = shortNameMovieDF.union(actorDF)
completeDF.show

+--------------------+-----------+-------------+
|          actor_name|movie_title|produced_year|
+--------------------+-----------+-------------+
|    Efremov, Mikhail|         12|         2007|
|     Stoyanov, Yuriy|         12|         2007|
|     Gazarov, Sergey|         12|         2007|
|Verzhbitskiy, Viktor|         12|         2007|
|      Brychta, Edita|         12|         2007|
+--------------------+-----------+-------------+



completeDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [actor_name: string, movie_title: string ... 1 more field]
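
Column order matters for union. The following sketch, which assumes Spark 2.3 or later for `unionByName`, contrasts the two on a pair of tiny DataFrames whose columns are deliberately listed in a different order. The names `leftDF` and `rightDF` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("union-sketch").getOrCreate()
import spark.implicits._

val leftDF = Seq(("Efremov, Mikhail", "12", 2007L))
  .toDF("actor_name", "movie_title", "produced_year")

// Same columns as leftDF, but the first two are swapped.
val rightDF = Seq(("12", "Brychta, Edita", 2007L))
  .toDF("movie_title", "actor_name", "produced_year")

// union matches columns by position: the title "12" lands in actor_name.
val byPositionDF = leftDF.union(rightDF)
byPositionDF.show()

// unionByName matches columns by name, so the row is aligned correctly.
val byNameDF = leftDF.unionByName(rightDF)
byNameDF.show()
```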


### withColumn(colName, column)

This transformation adds a new column to a DataFrame. It takes two parameters: a column name and a value in the form of a column expression. You can achieve much the same result with the selectExpr transformation. With withColumn, however, if the given column name matches an existing one, that column is replaced with the given column expression.

Adding a new column based on a certain column expression

In [57]:
movies.withColumn("decade", ('produced_year - ('produced_year % 10))).show(5, false)

+-----------------+------------------+-------------+------+
|actor_name       |movie_title       |produced_year|decade|
+-----------------+------------------+-------------+------+
|McClure, Marc (I)|Coach Carter      |2005         |2000  |
|McClure, Marc (I)|Superman II       |1980         |1980  |
|McClure, Marc (I)|Apollo 13         |1995         |1990  |
|McClure, Marc (I)|Superman          |1978         |1970  |
|McClure, Marc (I)|Back to the Future|1985         |1980  |
+-----------------+------------------+-------------+------+
only showing top 5 rows



Another way

In [58]:
movies.withColumn("decade", (col("produced_year") - (col("produced_year") % 10))).show(5, false)

+-----------------+------------------+-------------+------+
|actor_name       |movie_title       |produced_year|decade|
+-----------------+------------------+-------------+------+
|McClure, Marc (I)|Coach Carter      |2005         |2000  |
|McClure, Marc (I)|Superman II       |1980         |1980  |
|McClure, Marc (I)|Apollo 13         |1995         |1990  |
|McClure, Marc (I)|Superman          |1978         |1970  |
|McClure, Marc (I)|Back to the Future|1985         |1980  |
+-----------------+------------------+-------------+------+
only showing top 5 rows



Now replace the produced_year with new values

In [59]:
movies.withColumn("produced_year", ('produced_year - ('produced_year % 10))).show(5, false)

+-----------------+------------------+-------------+
|actor_name       |movie_title       |produced_year|
+-----------------+------------------+-------------+
|McClure, Marc (I)|Coach Carter      |2000         |
|McClure, Marc (I)|Superman II       |1980         |
|McClure, Marc (I)|Apollo 13         |1990         |
|McClure, Marc (I)|Superman          |1970         |
|McClure, Marc (I)|Back to the Future|1980         |
+-----------------+------------------+-------------+
only showing top 5 rows
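
For comparison, here is a minimal sketch of the selectExpr route mentioned at the start of this section. `fewMoviesDF` is a two-row stand-in for `movies`, built from rows shown in the output above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("withColumn-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L),
  ("McClure, Marc (I)", "Apollo 13", 1995L)
).toDF("actor_name", "movie_title", "produced_year")

// Adding the decade column with a column expression...
val viaWithColumnDF = fewMoviesDF
  .withColumn("decade", col("produced_year") - (col("produced_year") % 10))

// ...and with an equivalent SQL expression string.
val viaSelectExprDF = fewMoviesDF
  .selectExpr("*", "produced_year - (produced_year % 10) as decade")

viaSelectExprDF.show(false)
```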



### withColumnRenamed(existingColName, newColName)

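This transformation renames a single existing column. If the existing column name is not in the schema, the DataFrame is returned unchanged. See page 123 of the PDF for an in-depth treatment of this topic.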

Using the withColumnRenamed Transformation to Rename Some of the Column Names

In [60]:
movies.withColumnRenamed("actor_name", "actor")
.withColumnRenamed("movie_title", "title")
.withColumnRenamed("produced_year", "year").show(5)

+-----------------+------------------+----+
|            actor|             title|year|
+-----------------+------------------+----+
|McClure, Marc (I)|      Coach Carter|2005|
|McClure, Marc (I)|       Superman II|1980|
|McClure, Marc (I)|         Apollo 13|1995|
|McClure, Marc (I)|          Superman|1978|
|McClure, Marc (I)|Back to the Future|1985|
+-----------------+------------------+----+
only showing top 5 rows
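
A short sketch of what happens when the column to rename does not exist: the call is a no-op rather than an error. `fewMoviesDF` is a small stand-in for `movies`, and `no_such_column` is a deliberately missing name.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("rename-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L)
).toDF("actor_name", "movie_title", "produced_year")

// Renaming an existing column changes the schema.
val renamedDF = fewMoviesDF.withColumnRenamed("produced_year", "year")
renamedDF.printSchema()

// Renaming a column that is not in the schema leaves it untouched.
val unchangedDF = fewMoviesDF.withColumnRenamed("no_such_column", "year")
unchangedDF.printSchema()
```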



### drop(columnName1, columnName2)

This transformation drops the specified columns from the DataFrame. You can specify one or
more column names to drop; only the ones that exist in the schema are dropped, and the
ones that don’t are silently ignored. You can also drop columns with the select
transformation by projecting only the columns that you want to keep. However, when a
DataFrame has 100 columns and you want to drop only a few, drop is more convenient
than select.

Dropping Two Columns: One Exists and the Other One Doesn’t

In [61]:
movies.drop("actor_name", "me").printSchema

root
 |-- movie_title: string (nullable = true)
 |-- produced_year: long (nullable = true)



As you can see from the previous example, the second column, me, doesn’t exist in
the schema, so the drop transformation simply ignores it.
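
As a sketch of the select-based alternative mentioned above, the columns to keep can be computed by filtering the column list. `fewMoviesDF` is a small stand-in for `movies`; `toDrop` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("drop-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L)
).toDF("actor_name", "movie_title", "produced_year")

// "me" is not in the schema, so drop ignores it.
val viaDropDF = fewMoviesDF.drop("actor_name", "me")

// The same result with select: keep every column that is not in toDrop.
val toDrop = Set("actor_name", "me")
val keptColumns = fewMoviesDF.columns.filterNot(toDrop.contains).map(col)
val viaSelectDF = fewMoviesDF.select(keptColumns: _*)

viaSelectDF.printSchema()
```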

### sample(fraction), sample(fraction, seed), sample(withReplacement, fraction, seed)

This transformation returns a randomly selected set of rows from the DataFrame. The
number of returned rows is approximately the specified fraction of the total row count;
the fraction is a value between 0 and 1. The seed initializes the random number
generator that decides which rows to include in the result. If no seed is specified,
a randomly generated value is used. The withReplacement option determines whether a
selected row is placed back into the selection pool. In other words, when
withReplacement is true, a given row can be selected more than once.

In [62]:
// sample with no replacement and a fraction
movies.sample(false, 0.0003).show(3)

+-----------------+--------------------+-------------+
|       actor_name|         movie_title|produced_year|
+-----------------+--------------------+-------------+
|List, Peyton (II)|The Sorcerer's Ap...|         2010|
|Kehoe, Michael G.|       Jerry Maguire|         1996|
|   Hardrict, Cory|   Never Been Kissed|         1999|
+-----------------+--------------------+-------------+
only showing top 3 rows



In [63]:
// sample with replacement, a fraction and a seed
movies.sample(true, 0.0003, 123456).show(3)

+--------------------+--------------+-------------+
|          actor_name|   movie_title|produced_year|
+--------------------+--------------+-------------+
|Panzarella, Russ (V)|Public Enemies|         2009|
|        Reed, Tanoai|     Daredevil|         2003|
|        Moyo, Masasa|  Spider-Man 3|         2007|
+--------------------+--------------+-------------+
only showing top 3 rows
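
The effect of the seed can be sketched on a synthetic DataFrame. The example assumes the input data and its partitioning are identical between the two calls, which holds here because both samples come from the same `spark.range` DataFrame; `numbersDF` is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("sample-sketch").getOrCreate()

// Synthetic data: ids 0 to 9999.
val numbersDF = spark.range(0, 10000).toDF("id")

// Two draws without replacement, same fraction and same seed.
val firstDrawDF = numbersDF.sample(false, 0.1, 42L)
val secondDrawDF = numbersDF.sample(false, 0.1, 42L)

// The row count is close to, but usually not exactly, 10% of 10000.
println(s"rows in first draw: ${firstDrawDF.count()}")

// With the same seed, both draws pick the same rows.
val firstIds = firstDrawDF.collect().map(_.getLong(0)).toSet
val secondIds = secondDrawDF.collect().map(_.getLong(0)).toSet
println(s"same rows selected: ${firstIds == secondIds}")
```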



### randomSplit(weights)

This transformation is commonly used when preparing data to train
machine learning models. Unlike the previous transformations, this one returns an
array of one or more DataFrames. The number of DataFrames it returns equals the number of
weights you specify. If the provided weights don’t add up to 1, they are
normalized so that they do.

In [64]:
val smallerMovieDFs = movies.randomSplit(Array(0.6, 0.3, 0.1))

smallerMovieDFs: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([actor_name: string, movie_title: string ... 1 more field], [actor_name: string, movie_title: string ... 1 more field], [actor_name: string, movie_title: string ... 1 more field])


In [66]:
smallerMovieDFs(0).count

res47: Long = 18903


In [67]:
smallerMovieDFs(1).count

res48: Long = 9328
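
A minimal sketch of weight normalization, using synthetic data instead of `movies`. The weights 8.0 and 2.0 are normalized to 0.8 and 0.2; the seed value and the names `trainDF` and `holdoutDF` are arbitrary choices for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("randomSplit-sketch").getOrCreate()

// Synthetic data: ids 0 to 999.
val numbersDF = spark.range(0, 1000).toDF("id")

// Weights that do not sum to 1 are normalized (here to 0.8 and 0.2).
// Passing a seed makes the split repeatable.
val Array(trainDF, holdoutDF) = numbersDF.randomSplit(Array(8.0, 2.0), seed = 42L)

// The sizes are approximate, but every row lands in exactly one split.
println(s"train: ${trainDF.count()}, holdout: ${holdoutDF.count()}")
```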


## Working with Missing or Bad Data

Spark provides a dedicated class called DataFrameNaFunctions to help deal
with missing or bad data. An instance of DataFrameNaFunctions is available
as the na member of the DataFrame class. There are three common ways
of dealing with missing or bad data. The first way is to drop the rows that have missing
values in one or more columns. The second way is to fill those missing values with
user-provided values. The third way is to replace the bad data with something that you
know how to deal with.

Dropping Rows with Missing Data

In [68]:
// first create a DataFrame with missing values in one or more columns
import org.apache.spark.sql.Row

val badMovies = Seq(Row(null, null, null),
                    Row(null, null, 2018L),
                    Row("John Doe", "Awesome Movie", null),
                    Row(null, "Awesome Movie", 2018L),
                    Row("Mary Jane", null, 2018L))

val badMoviesRDD = spark.sparkContext.parallelize(badMovies)

val badMoviesDF = spark.createDataFrame(badMoviesRDD, movies.schema)

badMoviesDF.show

+----------+-------------+-------------+
|actor_name|  movie_title|produced_year|
+----------+-------------+-------------+
|      null|         null|         null|
|      null|         null|         2018|
|  John Doe|Awesome Movie|         null|
|      null|Awesome Movie|         2018|
| Mary Jane|         null|         2018|
+----------+-------------+-------------+



import org.apache.spark.sql.Row
badMovies: Seq[org.apache.spark.sql.Row] = List([null,null,null], [null,null,2018], [John Doe,Awesome Movie,null], [null,Awesome Movie,2018], [Mary Jane,null,2018])
badMoviesRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[191] at parallelize at <console>:40
badMoviesDF: org.apache.spark.sql.DataFrame = [actor_name: string, movie_title: string ... 1 more field]


In [69]:
// dropping rows that have missing data in any column
// both of the lines below will achieve the same purpose

badMoviesDF.na.drop().show
badMoviesDF.na.drop("any").show

+----------+-----------+-------------+
|actor_name|movie_title|produced_year|
+----------+-----------+-------------+
+----------+-----------+-------------+

+----------+-----------+-------------+
|actor_name|movie_title|produced_year|
+----------+-----------+-------------+
+----------+-----------+-------------+



In [70]:
// drop rows that have missing data in every single column
badMoviesDF.na.drop("all").show

+----------+-------------+-------------+
|actor_name|  movie_title|produced_year|
+----------+-------------+-------------+
|      null|         null|         2018|
|  John Doe|Awesome Movie|         null|
|      null|Awesome Movie|         2018|
| Mary Jane|         null|         2018|
+----------+-------------+-------------+



In [71]:
// drops rows when column actor_name has missing data
badMoviesDF.na.drop(Array("actor_name")).show

+----------+-------------+-------------+
|actor_name|  movie_title|produced_year|
+----------+-------------+-------------+
|  John Doe|Awesome Movie|         null|
| Mary Jane|         null|         2018|
+----------+-------------+-------------+
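
The cells above cover dropping rows. The other two approaches, filling and replacing, can be sketched with `na.fill` and `na.replace`. The rows, the placeholder values ("unknown", 0, "N/A"), and the name `gappyDF` are made up for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("na-sketch").getOrCreate()

val gappySchema = StructType(Seq(
  StructField("actor_name", StringType, nullable = true),
  StructField("movie_title", StringType, nullable = true),
  StructField("produced_year", LongType, nullable = true)))

val gappyRows = Seq(
  Row(null, "Awesome Movie", 2018L),
  Row("John Doe", "Awesome Movie", null),
  Row("Mary Jane", "N/A", 2018L))

val gappyDF = spark.createDataFrame(spark.sparkContext.parallelize(gappyRows), gappySchema)

// Fill: supply a default value per column for missing entries.
val filledDF = gappyDF.na.fill(Map("actor_name" -> "unknown", "produced_year" -> 0L))
filledDF.show()

// Replace: map a known bad value in one column to something easier to handle.
val replacedDF = filledDF.na.replace("movie_title", Map("N/A" -> "unknown title"))
replacedDF.show()
```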



### describe(columnNames)

Sometimes it is useful to have a general sense of the basic statistics of the data you
are working with. The basic statistics this transformation can compute for string and
numeric columns are count, mean, standard deviation, minimum, and maximum.
You can pick and choose which string or numeric columns to compute the statistics for.

In [72]:
movies.describe("produced_year").show()

+-------+------------------+
|summary|     produced_year|
+-------+------------------+
|  count|             31392|
|   mean|2002.7964449541284|
| stddev| 6.377236851493877|
|    min|              1961|
|    max|              2012|
+-------+------------------+
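
describe also accepts several columns at once, including string columns. The sketch below uses `fewMoviesDF`, a two-row stand-in for `movies`; the result has one row per statistic and one column per requested input column.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("describe-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L),
  ("McClure, Marc (I)", "Apollo 13", 1995L)
).toDF("actor_name", "movie_title", "produced_year")

// One string column and one numeric column in the same call.
val statsDF = fewMoviesDF.describe("movie_title", "produced_year")
statsDF.show()
```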



## Working with Structured Actions

See Table 4-9. Commonly Used Structured Actions
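
As a brief, non-exhaustive sketch, a few frequently used actions are shown below; the table referenced above remains the authoritative list. Each call triggers evaluation immediately. `fewMoviesDF` is a two-row stand-in for `movies`.

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("actions-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L),
  ("McClure, Marc (I)", "Apollo 13", 1995L)
).toDF("actor_name", "movie_title", "produced_year")

val rowCount: Long = fewMoviesDF.count()      // number of rows
val firstRow: Row = fewMoviesDF.first()       // first row only
val firstOne: Array[Row] = fewMoviesDF.take(1) // first n rows, returned to the driver
val allRows: Array[Row] = fewMoviesDF.collect() // every row; only safe for small results

fewMoviesDF.show(1, false)                    // print rows without truncating values
```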

## Datasets

See page 130 of the PDF.

## Writing Data Out to Storage Systems

Common Interaction Pattern with DataFrameWriter

In [None]:
movies.write.format(...).mode(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save(path)

In [None]:
// write data out in CSV format, but using '#' as the delimiter
movies.write.format("csv").option("sep", "#").save("/tmp/output/csv")

// write data out using the overwrite save mode
movies.write.format("csv").mode("overwrite").option("sep", "#").save("/tmp/output/csv")
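
The save mode decides what happens when the output path already exists. The sketch below writes to a temporary directory rather than `/tmp/output/csv` so it can be rerun safely; `fewMoviesDF` and `outDir` are illustrative names.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("save-mode-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L),
  ("McClure, Marc (I)", "Apollo 13", 1995L)
).toDF("actor_name", "movie_title", "produced_year")

// A path that does not exist yet, inside a fresh temporary directory.
val outDir = Files.createTempDirectory("movies-csv").resolve("out").toString

// Default mode: the write fails if the path already exists.
fewMoviesDF.write.format("csv").option("sep", "#").save(outDir)

// overwrite: existing output is replaced (2 rows on disk).
fewMoviesDF.write.format("csv").mode("overwrite").option("sep", "#").save(outDir)

// append: new files are added next to the existing ones (4 rows on disk).
fewMoviesDF.write.format("csv").mode("append").option("sep", "#").save(outDir)

val readBackDF = spark.read.format("csv").option("sep", "#").load(outDir)
println(s"rows on disk: ${readBackDF.count()}")
```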

Reducing the Number of Partitions in a DataFrame to 1

In [None]:
val singlePartitionDF = movies.coalesce(1)
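
A sketch of why this matters when writing: each partition is written by its own task, so a single partition produces a single part file. The DataFrame is repartitioned first purely to make the effect visible; `outDir` is a temporary, illustrative path.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("coalesce-sketch").getOrCreate()

// Synthetic data spread over 4 partitions.
val spreadDF = spark.range(0, 100).toDF("id").repartition(4)
println(s"partitions before: ${spreadDF.rdd.getNumPartitions}")

// Collapse to one partition without a full shuffle.
val singlePartitionDF = spreadDF.coalesce(1)
println(s"partitions after: ${singlePartitionDF.rdd.getNumPartitions}")

// Writing one partition yields one part file in the output directory.
val outDir = Files.createTempDirectory("single-file").resolve("out").toString
singlePartitionDF.write.format("csv").save(outDir)

val partFiles = new File(outDir).listFiles.filter(_.getName.startsWith("part-"))
println(s"part files written: ${partFiles.length}")
```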

**Partitions:** The idea of writing data out using partitioning and bucketing is borrowed from
the Apache Hive user community. As a general rule of thumb, the partition by column
should have low cardinality. In the movies DataFrame, the produced_year column is
a good candidate for the partition by column. Let’s say you are going to write out the
movies DataFrame with partitioning by the produced_year column. DataFrameWriter
will write out all the movies with the same produced_year into a single directory. The
number of directories in the output folder will correspond to the number of years in the
movies DataFrame.

Writing the movies DataFrame Using the Parquet Format, Partitioned by the produced_year Column

In [81]:
movies.write.partitionBy("produced_year").save("/tmp/output/movies")

// the /tmp/output/movies directory will contain the following subdirectories produced_year=1961 to produced_year=2012
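
The resulting directory layout, and how a filter on the partition column reads it back, can be sketched on a small example. It assumes the default data source is Parquet (the `spark.sql.sources.default` setting has not been changed) and writes to a temporary directory; `fewMoviesDF` and `outDir` are illustrative names.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Reuses the notebook's session when one exists; otherwise starts a local one.
val spark = SparkSession.builder().master("local[*]").appName("partitionBy-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for the movies DataFrame.
val fewMoviesDF = Seq(
  ("McClure, Marc (I)", "Superman", 1978L),
  ("McClure, Marc (I)", "Apollo 13", 1995L)
).toDF("actor_name", "movie_title", "produced_year")

val outDir = Files.createTempDirectory("movies-parquet").resolve("movies").toString

// No format given, so the default (Parquet) is used.
fewMoviesDF.write.partitionBy("produced_year").save(outDir)

// One subdirectory per distinct produced_year value.
val subdirs = new File(outDir).listFiles.filter(_.isDirectory).map(_.getName).sorted
subdirs.foreach(println)

// Filtering on the partition column only needs the matching subdirectory.
val from1995DF = spark.read.load(outDir).where(col("produced_year") === 1995)
from1995DF.show(false)
```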