# Spark 2 Concepts

* SparkContext vs. SparkSession vs. SQLContext
* RDD vs. DataFrame vs. Dataset

__Learning Objectives__

* SparkContext vs. SparkSession - how to access these variables in pyspark / jupyter
* Understanding RDDs vs. DataFrames vs. Datasets
* Working with RDDs and DataFrames
* Interoperability between RDD and DataFrames
* Understanding the SQLContext
* Accessing RDDs in DataFrames
* Understanding Spark DataFrame vs. Python Pandas DataFrame

## SparkContext vs. SparkSession

__SparkContext__
	
* Available from Spark 1.x
* Familiar code entry point to Spark: `sc = SparkContext(...)` in Python (`new SparkContext(...)` in Scala / Java)
* One SparkContext per application
* Create RDDs, accumulators... 
* Run jobs

__SparkSession__

* In Spark 2.x, SparkContext is wrapped in SparkSession
* Entry point to SparkSQL
* Merges SQLContext and HiveContext
* Can have multiple SparkSession objects

In [1]:
# SparkContext is initialized and available as "sc" in pyspark
sc

In [2]:
# SparkSession is initialized and available as "spark" in pyspark
spark

## Understanding RDDs vs. DataFrames vs. Datasets

__RDDs__ = Lower-level API

* Resilient Distributed Dataset
* Primary abstraction since initial version of Spark
* Transformations (map, filter…) and Actions on Data (collect, count, reduce…)
* Present in Scala, Java and Python (SparkR does not expose a public RDD API)
* From a developer standpoint - deal with unstructured data, complex data types, manage low level details, fine tune performance

__DataFrames__ = Untyped Higher-level API

* Added to Spark in 1.3
* A DataFrame is built on top of RDDs and is a dataset organized into named columns
* Conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood
* Present in Scala, Java, Python and R
* From a developer standpoint - deal with structured or semi-structured data, think in SQL, no type safety at compile-time, leverage Spark optimizations for performance

__Datasets__ = Typed Higher-level API

* Added to Spark in 1.6
* Extension of DataFrames: compile-time type safety, OOP interface
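* Columns map to the fields of a class (e.g. a Scala case class) and are accessed as typed members rather than by column-name strings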
* Present in Scala and Java, but not in Python and R
* From a developer standpoint - deal with structured or semi-structured data, functional APIs, offers the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine

__Performance__ = RDD < Dataset < DataFrame

__Starting Spark 2.0, APIs for Datasets and DataFrames have merged__

* A Dataset of generic Row objects in Scala / Java is what is usually called a DataFrame
* DataFrame = Type alias for Dataset[Row] in Scala (Dataset<Row> in Java)

## Working with RDDs and DataFrames

In [3]:
# Create RDD containing some mixed data
mixed_data = sc.parallelize([1, "Tirthal", "Patel", 37, 58])
mixed_data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184

In [4]:
# Perform action on RDD, e.g. view all the elements from the RDD 
mixed_data.collect()

[1, 'Tirthal', 'Patel', 37, 58]

__Let's try to convert mixed data RDD to DataFrame using rdd.toDF()__

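Why does this raise `TypeError: Can not infer schema for type`?

Each element of this RDD is a bare value (an int or a string) rather than a record such as a Row, tuple or list, and the elements are of different types. Spark therefore cannot infer a schema, and the RDD cannot be converted to a DataFrame.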

In [5]:
df = mixed_data.toDF()

TypeError: Can not infer schema for type: <class 'int'>

## Interoperability between RDD and DataFrames

In [6]:
# Create RDD with structured list
usersRDD = sc.parallelize([[1, "Tirthal", "Patel", 37, 58], [2, "Ian", "Patel", 8, 20]])
usersRDD

ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:184

In [7]:
# Perform action on RDD, e.g. view all the elements from the RDD 
usersRDD.collect()

[[1, 'Tirthal', 'Patel', 37, 58], [2, 'Ian', 'Patel', 8, 20]]

In [8]:
# Perform action on RDD, e.g. get first record from the RDD
usersRDD.first()

[1, 'Tirthal', 'Patel', 37, 58]

__Let's try to convert structured data RDD to DataFrame using rdd.toDF()__

In [9]:
usersDF = usersRDD.toDF()
usersDF

DataFrame[_1: bigint, _2: string, _3: string, _4: bigint, _5: bigint]

In [11]:
usersDF.show()

# See column names have been automatically generated and assigned

+---+-------+-----+---+---+
| _1|     _2|   _3| _4| _5|
+---+-------+-----+---+---+
|  1|Tirthal|Patel| 37| 58|
|  2|    Ian|Patel|  8| 20|
+---+-------+-----+---+---+



__How to specify names for each field instead of auto-generated?__

Structured data with a schema can be converted to a DataFrame

In [12]:
from pyspark.sql import Row

usersData = sc.parallelize([Row(id=1, fname="Tirthal", lname="Patel",age=37, weight=58.00),
                            Row(id=2, fname="Ian", lname="Patel",age=8, weight=20.00)])
usersData

ParallelCollectionRDD[19] at parallelize at PythonRDD.scala:184

In [13]:
usersData.collect()

# See RDD is made up of Row objects - each element is Row

[Row(age=37, fname='Tirthal', id=1, lname='Patel', weight=58.0),
 Row(age=8, fname='Ian', id=2, lname='Patel', weight=20.0)]

In [14]:
usersDataFrame = usersData.toDF()
usersDataFrame.show()

+---+-------+---+-----+------+
|age|  fname| id|lname|weight|
+---+-------+---+-----+------+
| 37|Tirthal|  1|Patel|  58.0|
|  8|    Ian|  2|Patel|  20.0|
+---+-------+---+-----+------+



__Does it support complex data types?__

In [15]:
from datetime import datetime
complicated_data = sc.parallelize([Row(col_float=12.22, 
                                   col_time=datetime(2018, 6, 8, 11, 2, 8), 
                                   col_row=Row(x=25, y=100), 
                                   col_dict={ "K1" : 100, "K2" : 200},
                                   col_list=[10, 20, 30]
                                 ),                                                                      
                                 Row(col_float=80.25, 
                                   col_time=datetime(2017, 5, 9, 18, 10, 45), 
                                   col_row=Row(x=45, y=600), 
                                   col_dict={ "K1" : 200, "K2" : 300},
                                   col_list=[50, 60, 70]
                                 )])

complicated_data_df = complicated_data.toDF()
complicated_data_df.show()

+--------------------+---------+------------+---------+-------------------+
|            col_dict|col_float|    col_list|  col_row|           col_time|
+--------------------+---------+------------+---------+-------------------+
|[K1 -> 100, K2 ->...|    12.22|[10, 20, 30]|[25, 100]|2018-06-08 11:02:08|
|[K1 -> 200, K2 ->...|    80.25|[50, 60, 70]|[45, 600]|2017-05-09 18:10:45|
+--------------------+---------+------------+---------+-------------------+



## Understanding the SQLContext

__SQLContext__

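* The entry point into all functionality in Spark SQL in Spark 1.x; still available in Spark 2.x for backward compatibility, where SparkSession is the preferred entry point
* A wrapper around SparkContext that enables running SQL queries on Spark data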

In [16]:
# Explicit import, in case SQLContext is not already in scope
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

sqlContext

<pyspark.sql.context.SQLContext at 0x76d8f60>

In [17]:
# Generate single id column DataFrame
ids_df = sqlContext.range(3)
ids_df

DataFrame[id: bigint]

In [18]:
ids_df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+



__Create DataFrame for any complex data__

In [19]:
shapes_list = [(1, 'Square', 'red'), (2, 'Circle', 'blue'), (3, 'Rectangle', 'black')]

In [20]:
# Create DataFrame with auto-generated column names
sqlContext.createDataFrame(shapes_list).show()

+---+---------+-----+
| _1|       _2|   _3|
+---+---------+-----+
|  1|   Square|  red|
|  2|   Circle| blue|
|  3|Rectangle|black|
+---+---------+-----+



In [21]:
# Create DataFrame with custom column names
sqlContext.createDataFrame(shapes_list, ['ID', 'SHAPE', 'COLOR']).show()

+---+---------+-----+
| ID|    SHAPE|COLOR|
+---+---------+-----+
|  1|   Square|  red|
|  2|   Circle| blue|
|  3|Rectangle|black|
+---+---------+-----+



__Use SQLContext to generate DataFrames from RDD__

In [22]:
# Create RDD of Row objects without column specification
shapes_rows_RDD = sc.parallelize([Row(1, 'Square', 'red'), Row(2, 'Circle', 'blue'), Row(3, 'Rectangle', 'black')])

# Setup column names for Row objects
shapes_column_names =  Row('ID', 'SHAPE', 'COLOR')

# Use map operation to perform transformation on every element in the RDD (i.e. assign column names to the data)
shapes_RDD = shapes_rows_RDD.map(lambda r : shapes_column_names(*r))
shapes_RDD

PythonRDD[65] at RDD at PythonRDD.scala:49

In [23]:
shapes_RDD.collect()

# See field names are assigned to row objects

[Row(ID=1, SHAPE='Square', COLOR='red'),
 Row(ID=2, SHAPE='Circle', COLOR='blue'),
 Row(ID=3, SHAPE='Rectangle', COLOR='black')]

In [24]:
# Create DataFrame from RDD using SQLContext
shapes_DF = sqlContext.createDataFrame(shapes_RDD)
shapes_DF

DataFrame[ID: bigint, SHAPE: string, COLOR: string]

In [25]:
shapes_DF.show()

# See this inferred the data types for each column

+---+---------+-----+
| ID|    SHAPE|COLOR|
+---+---------+-----+
|  1|   Square|  red|
|  2|   Circle| blue|
|  3|Rectangle|black|
+---+---------+-----+



## Accessing RDDs in DataFrames

__Access particular data from the complex data frame using matrix (row / column index) notation__

In [26]:
complicated_data_df.take(2)

[Row(col_dict={'K1': 100, 'K2': 200}, col_float=12.22, col_list=[10, 20, 30], col_row=Row(x=25, y=100), col_time=datetime.datetime(2018, 6, 8, 11, 2, 8)),
 Row(col_dict={'K1': 200, 'K2': 300}, col_float=80.25, col_list=[50, 60, 70], col_row=Row(x=45, y=600), col_time=datetime.datetime(2017, 5, 9, 18, 10, 45))]

In [27]:
# Get list of 1st row and 3rd column
row_first_col_list_value = complicated_data_df.collect()[0][2]
row_first_col_list_value

[10, 20, 30]

In [28]:
row_first_col_list_value.append(40)
row_first_col_list_value

[10, 20, 30, 40]

In [29]:
# The original cell remains unchanged, because the DataFrame returned a copy of the list
complicated_data_df.first()

Row(col_dict={'K1': 100, 'K2': 200}, col_float=12.22, col_list=[10, 20, 30], col_row=Row(x=25, y=100), col_time=datetime.datetime(2018, 6, 8, 11, 2, 8))

__Extract specific columns by converting the DataFrame to RDD__

In [30]:
complicated_data_df.rdd\
                   .map(lambda x : (x.col_time, x.col_row))\
                   .collect()

[(datetime.datetime(2018, 6, 8, 11, 2, 8), Row(x=25, y=100)),
 (datetime.datetime(2017, 5, 9, 18, 10, 45), Row(x=45, y=600))]

__Extract specific columns using select method of DataFrame instead of rdd__

In [31]:
complicated_data_df.select('col_time', 'col_row').show()

+-------------------+---------+
|           col_time|  col_row|
+-------------------+---------+
|2018-06-08 11:02:08|[25, 100]|
|2017-05-09 18:10:45|[45, 600]|
+-------------------+---------+



__How to calculate some value on every record of DataFrame?__

A PySpark DataFrame has no `map` operation of its own; use `withColumn` to compute a new column from existing ones instead.

In [32]:
complicated_data_df.select('col_float')\
                   .withColumn("col_twofold_float", complicated_data_df.col_float * 2)\
                   .show()

+---------+-----------------+
|col_float|col_twofold_float|
+---------+-----------------+
|    12.22|            24.44|
|    80.25|            160.5|
+---------+-----------------+



__How to rename column name within DataFrame?__

In [33]:
complicated_data_df.withColumnRenamed("col_time", "col_datetime").show()

+--------------------+---------+------------+---------+-------------------+
|            col_dict|col_float|    col_list|  col_row|       col_datetime|
+--------------------+---------+------------+---------+-------------------+
|[K1 -> 100, K2 ->...|    12.22|[10, 20, 30]|[25, 100]|2018-06-08 11:02:08|
|[K1 -> 200, K2 ->...|    80.25|[50, 60, 70]|[45, 600]|2017-05-09 18:10:45|
+--------------------+---------+------------+---------+-------------------+



In [34]:
# Original DataFrame is NOT changed, rather new DataFrame is created and returned (see above)
complicated_data_df.show()

+--------------------+---------+------------+---------+-------------------+
|            col_dict|col_float|    col_list|  col_row|           col_time|
+--------------------+---------+------------+---------+-------------------+
|[K1 -> 100, K2 ->...|    12.22|[10, 20, 30]|[25, 100]|2018-06-08 11:02:08|
|[K1 -> 200, K2 ->...|    80.25|[50, 60, 70]|[45, 600]|2017-05-09 18:10:45|
+--------------------+---------+------------+---------+-------------------+



In [35]:
# How to get specific column data with alias column name
complicated_data_df.select(complicated_data_df.col_time.alias("Date & Time")).show()

+-------------------+
|        Date & Time|
+-------------------+
|2018-06-08 11:02:08|
|2017-05-09 18:10:45|
+-------------------+



## Spark DataFrame vs. Python Pandas DataFrame

* __Spark DataFrame__ = Distributed across machines as per Spark architecture
* __Pandas DataFrame__ = In memory on single machine

__Convert from Spark DataFrame to Pandas DataFrame__

In [36]:
import pandas

df_pandas = complicated_data_df.toPandas()
df_pandas

| | col_dict | col_float | col_list | col_row | col_time |
|---|---|---|---|---|---|
| 0 | {'K1': 100, 'K2': 200} | 12.22 | [10, 20, 30] | (25, 100) | 2018-06-08 11:02:08 |
| 1 | {'K1': 200, 'K2': 300} | 80.25 | [50, 60, 70] | (45, 600) | 2017-05-09 18:10:45 |


__Convert from Pandas DataFrame to Spark DataFrame__

In [None]:
# show() returns None, so keep the DataFrame and display it in a separate step
df_spark = sqlContext.createDataFrame(df_pandas)
df_spark.show()

## Spark DataFrame vs. Spark Dataset

* __Spark DataFrame__ = Untyped Dataset Operations.
* __Spark Dataset__ = Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.

__Creating DataFrames and Performing untyped dataset operations__ (example code in Scala)

```
val df = spark.read.json("examples/src/main/resources/people.json")
    
// Print the schema in a tree format
df.printSchema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)
    
// Displays the content of the DataFrame to stdout    
df.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    
// Select only the "name" column
df.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

// Count people by age
df.groupBy("age").count().show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
```

__Creating Datasets__ (example code in Scala)

```
// Needed for toDS() and .as[Person] outside spark-shell
import spark.implicits._

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
```

### Disclaimer

I produced the above notes and code for my own learning while attending a [pluralsight course](https://app.pluralsight.com/library/courses/spark-2-getting-started/table-of-contents).