# Spark DataFrames

- Enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing
- Inspired by data frames in R and Python (Pandas)
- Designed from the ground up to support modern big data and data science applications
- Extension to the existing RDD API

## References
- [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [Introduction to DataFrames - Python](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)
- [PySpark Cheat Sheet: Spark DataFrames in Python](https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet)

### DataFrames are:
- The preferred abstraction in Spark
- Distributed collections of data organized into named columns (the strongly typed variant, the Dataset API, is only available in Scala and Java)
- Built on Resilient Distributed Datasets (RDD)
- Immutable once constructed

### With DataFrames you can:
- Track lineage information to efficiently recompute lost data
- Enable operations on collections of elements in parallel

### You construct DataFrames
- by parallelizing existing collections (e.g., Pandas DataFrames) 
- by transforming an existing DataFrame
- from files in HDFS or any other storage system (e.g., Parquet)

### Features
- Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
- Support for a wide array of data formats and storage systems
- Seamless integration with all big data tooling and infrastructure via Spark
- APIs for Python, Java, Scala, and R

### DataFrames versus RDDs
- Nice API for new users familiar with data frames in other programming languages.
- For existing Spark users, the API will make Spark easier to program than using RDDs
- For both sets of users, DataFrames will improve performance through intelligent optimizations and code-generation

## PySpark Shell

**Run the Spark shell:**

~~~ bash
pyspark
~~~

Output similar to the following will be displayed, followed by a `>>>` REPL prompt:

~~~
Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2018-09-18 17:13:13 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.
>>>
~~~

Read the data into a DataFrame:

~~~ py
df = sqlContext.read.csv("/tmp/irmar.csv", sep=';', header=True)
~~~

~~~
>>> df.show()
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|_c0|                name|       phone|office|organization|position|  hdr|    team1|   team2|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
|  0|      Alphonse Paul |+33223235223|   214|          R1|     DOC|False|      EDP|      NA|
|  1|        Ammari Zied |+33223235811|   209|          R1|      MC| True|      EDP|      NA|
.
.
.
| 18|    Bernier Joachim |+33223237558|   214|          R1|     DOC|False|   ANANUM|      NA|
| 19|   Berthelot Pierre |+33223236043|   601|          R1|      PE| True|       GA|      NA|
+---+--------------------+------------+------+------------+--------+-----+---------+--------+
only showing top 20 rows
~~~

## Transformations, Actions, Laziness

Like RDDs, DataFrames are lazy. Transformations contribute to the query plan, but they don't execute anything.
Actions cause the execution of the query.
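
To make the distinction concrete, here is a toy plain-Python model of lazy evaluation. It is only an analogy and not how Spark is implemented: the class `LazyTable`, the helper `is_r2`, and the three sample rows are all hypothetical. Transformations only append a step to a recorded plan and return a new object; nothing is evaluated until an action such as `collect` or `count` is called.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: rows are dicts, operations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows          # source data, never modified
        self._plan = tuple(plan)   # recorded transformations

    # Transformations: return a new object with a longer plan.
    def filter(self, predicate):
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def select(self, *columns):
        return LazyTable(self._rows, self._plan + (("select", columns),))

    # Actions: run the recorded plan and return a concrete result.
    def collect(self):
        rows = self._rows
        for op, arg in self._plan:
            if op == "filter":
                rows = [r for r in rows if arg(r)]
            elif op == "select":
                rows = [{c: r[c] for c in arg} for r in rows]
        return list(rows)

    def count(self):
        return len(self.collect())


calls = []  # records every time the predicate is actually evaluated


def is_r2(row):
    calls.append(row["name"])
    return row["organization"] == "R2"


# Hypothetical sample rows, for illustration only.
people = LazyTable([
    {"name": "A", "organization": "R1"},
    {"name": "B", "organization": "R2"},
    {"name": "C", "organization": "R2"},
])

query = people.filter(is_r2).select("name")  # only builds the plan
evaluated_before_action = len(calls)          # the predicate has not run yet
result = query.collect()                      # the action triggers execution
print(evaluated_before_action, result)
```

Building `query` never calls the predicate; only `collect()` does. `people` itself is left untouched by `filter` and `select`, which mirrors the immutability of DataFrames.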

### Transformation examples
- filter
- select
- drop
- intersect 
- join

### Action examples
- count 
- collect 
- show 
- head
- take

## Creating a DataFrame in Python

In [1]:
import sys, subprocess
import os

os.environ["PYSPARK_PYTHON"] = sys.executable

In [2]:
from pyspark import SparkContext, SparkConf, SQLContext
# The following three lines are not necessary
# in the pyspark shell
conf = SparkConf().setAppName("people").setMaster("local[*]") 
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

In [3]:
df = sqlContext.read.json("data/people.json") # get a dataframe from json file

df.show(24)

+----------+---------+-----------+
| firstname| lastname|      login|
+----------+---------+-----------+
|     Simon|     Uzel|     uzel_s|
|   Perrine|   Moreau|   moreau_p|
|     Elise|    Negri|    negri_e|
|   Camille|   Cochet|   cochet_c|
|   Nolwenn| Giguelay| giguelay_n|
|     Youen|    Meyer|    meyer_y|
|    Emilie|  Lacoste|  lacoste_e|
|       Pia|  LeBihan|  lebihan_p|
|      Yann|    Evain|    evain_y|
|   Camille|    Guyon|    guyon_c|
|  Mathilde|  LeMener|  lemener_m|
|    Gildas| LeGuilly| liguilly_g|
|    Pierre| Gardelle| gardelle_p|
|Christophe|Boulineau|boulineau_c|
|      Omar| Aitichou| aitichou_o|
|     Lijun|      Chi|      chi_l|
|    Jiawei|      Liu|      lin_j|
|     Irvin|Keraudren|keraudren_i|
|     Bryan|    Jacob|    jacob_b|
|   Raphael| Guillerm| guillerm_r|
|     Bruno|Queguiner|queguiner_b|
|   Yingshi|     Zeng|     zeng_y|
+----------+---------+-----------+



## Schema Inference

In this exercise, let's explore schema inference. We're going to use a file called `irmar.csv`. The data is structured, but it has no self-describing schema, and it's not JSON, so Spark can't infer the schema automatically. Let's create an RDD and look at the first few rows of the file.

In [4]:
rdd = sc.textFile("data/irmar.csv")
for line in rdd.take(10):
  print(line)

Alphonse Paul;+33223235223;214;R1;DOC;False;EDP;NA
Ammari Zied;+33223235811;209;R1;MC;True;EDP;NA
André Simon;+33223237555;301;R1;DOC;False;THEO-ERG;NA
Angst Jurgen;+33223236519;320;R1;MC;False;PROC-STOC;NA
Bailleul Ismaël;+33223236369;302;R1;MC;True;THEO-ERG;NA
Baker Mark;+33223236028;835;R1;PR;True;GAN;NA
Balac Stephane;+33223236274;110;R1;MC;False;ANANUM;NA
Bauer Max;+33223236675;734;R1;MC;False;GAN;NA
Bavard Juliette;+33223236724;331;CNRS;CR;False;GAN;THEO-ERG
Beauchard Karine;+33223236164;235;R1;PR;True;ANANUM;NA


## Hands-on Exercises

You can look at the [DataFrames API documentation](http://spark.apache.org/docs/2.3.1/api/python/index.html).

Let's take a look at the file `data/irmar.csv`. Each line contains the same information about a person:

* name
* phone
* office
* organization
* position 
* hdr
* team1
* team2

In [5]:
from collections import namedtuple

rdd = sc.textFile("data/irmar.csv")

Person = namedtuple('Person', ['name', 'phone', 'office', 'organization', 
                               'position', 'hdr', 'team1', 'team2'])
def str_to_bool(s):
    if s == 'True': return True
    return False

def map_to_person(line):
    cols = line.split(";")
    return Person(name         = cols[0],
                  phone        = cols[1],
                  office       = cols[2],
                  organization = cols[3],
                  position     = cols[4], 
                  hdr          = str_to_bool(cols[5]),
                  team1        = cols[6],
                  team2        = cols[7])
    
people_rdd = rdd.map(map_to_person)
df = people_rdd.toDF()

In [6]:
df.show()

+--------------------+------------+------+------------+--------+-----+---------+--------+
|                name|       phone|office|organization|position|  hdr|    team1|   team2|
+--------------------+------------+------+------------+--------+-----+---------+--------+
|       Alphonse Paul|+33223235223|   214|          R1|     DOC|false|      EDP|      NA|
|         Ammari Zied|+33223235811|   209|          R1|      MC| true|      EDP|      NA|
|         André Simon|+33223237555|   301|          R1|     DOC|false| THEO-ERG|      NA|
|        Angst Jurgen|+33223236519|   320|          R1|      MC|false|PROC-STOC|      NA|
|     Bailleul Ismaël|+33223236369|   302|          R1|      MC| true| THEO-ERG|      NA|
|          Baker Mark|+33223236028|   835|          R1|      PR| true|      GAN|      NA|
|      Balac Stephane|+33223236274|   110|          R1|      MC|false|   ANANUM|      NA|
|           Bauer Max|+33223236675|   734|          R1|      MC|false|      GAN|      NA|
|     Bava

### Schema

In [7]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- office: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- position: string (nullable = true)
 |-- hdr: boolean (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
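
Only `hdr` shows up with a non-string type, because it is the only field that `map_to_person` converts before building the row. As a hedged sketch of how another column could be typed, the hypothetical helper `to_int_or_none` below converts a field such as `office` to an integer and treats the `NA` placeholder as missing. If `map_to_person` used it, `toDF()` would most likely report `office` as `long` instead of `string`, but that is an assumption to check against your Spark version.

```python
def to_int_or_none(text):
    """Return int(text), or None when the field is 'NA' or otherwise not numeric."""
    try:
        return int(text)
    except ValueError:
        return None


# One raw line copied from the file listing earlier in this document.
line = "Alphonse Paul;+33223235223;214;R1;DOC;False;EDP;NA"
cols = line.split(";")

office = to_int_or_none(cols[2])    # numeric office number
missing = to_int_or_none("NA")      # placeholder value becomes None
print(office, missing)
```

Returning `None` rather than raising keeps rows whose office is recorded as `NA` instead of failing the whole job on the first bad value.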



### display

In [8]:
display(df)

DataFrame[name: string, phone: string, office: string, organization: string, position: string, hdr: boolean, team1: string, team2: string]

### select

In [9]:
df.select(df["name"], df["position"], df["organization"])

DataFrame[name: string, position: string, organization: string]

In [10]:
df.select(df["name"], df["position"], df["organization"]).show()

+--------------------+--------+------------+
|                name|position|organization|
+--------------------+--------+------------+
|       Alphonse Paul|     DOC|          R1|
|         Ammari Zied|      MC|          R1|
|         André Simon|     DOC|          R1|
|        Angst Jurgen|      MC|          R1|
|     Bailleul Ismaël|      MC|          R1|
|          Baker Mark|      PR|          R1|
|      Balac Stephane|      MC|          R1|
|           Bauer Max|      MC|          R1|
|     Bavard Juliette|      CR|        CNRS|
|    Beauchard Karine|      PR|          R1|
|        Bekka Bachir|      PR|          R1|
|         Bekka Karim|      MC|          R1|
|      Belgacem Maher|     DOC|         EXT|
|    Bellis Alexandre|     DOC|          R1|
|     Belmiloudi Aziz|      MC|        INSA|
|     Ben Elouefi Rim|     DOC|         EXT|
|   Benasseni Jacques|      PR|          R2|
|Bennani-Dosse Moh...|      MC|          R2|
|     Bernier Joachim|     DOC|          R1|
|    Berth

### filter

In [11]:
df.filter(df["organization"] == "R2").show()

+--------------------+------------+------+------------+--------+-----+---------+-----+
|                name|       phone|office|organization|position|  hdr|    team1|team2|
+--------------------+------------+------+------------+--------+-----+---------+-----+
|   Benasseni Jacques|+33299141822|    NA|          R2|      PR| true|     STAT|   NA|
|Bennani-Dosse Moh...|+33299141796|    NA|          R2|      MC|false|     STAT|   NA|
|Cornillon Pierre-...|+33299141819|    NA|          R2|      MC|false|     STAT|   NA|
|     Fromont Magalie|+33299053264|    NA|          R2|      PR| true|     STAT|   NA|
|Giacofci Joyce Ma...|+33299141800|    NA|          R2|      MC|false|     STAT|   NA|
|Klutchnikoff Nicolas|+33299141819|    NA|          R2|      MC|false|     STAT|   NA|
|     Le Guevel Ronan|+33299141800|    NA|          R2|      MC|false|PROC-STOC| STAT|
|           Mom Alain|+33299141808|    NA|          R2|      MC|false|     STAT|   NA|
|        Morvan Marie|+33223236670|    NA| 

### filter + select

In [12]:
df2 = df.filter(df["organization"] == "R2").select(df['name'],df['team1'])

In [13]:
df2.show()

+--------------------+---------+
|                name|    team1|
+--------------------+---------+
|   Benasseni Jacques|     STAT|
|Bennani-Dosse Moh...|     STAT|
|Cornillon Pierre-...|     STAT|
|     Fromont Magalie|     STAT|
|Giacofci Joyce Ma...|     STAT|
|Klutchnikoff Nicolas|     STAT|
|     Le Guevel Ronan|PROC-STOC|
|           Mom Alain|     STAT|
|        Morvan Marie|     STAT|
|     Pelletier Bruno|     STAT|
|    Rouviere Laurent|     STAT|
+--------------------+---------+



### orderBy

In [14]:
(df.filter(df["organization"] == "R2")
   .select(df["name"],df["position"])
   .orderBy("position")).show()

+--------------------+--------+
|                name|position|
+--------------------+--------+
|        Morvan Marie|     DOC|
|Cornillon Pierre-...|      MC|
|Bennani-Dosse Moh...|      MC|
|Giacofci Joyce Ma...|      MC|
|           Mom Alain|      MC|
|Klutchnikoff Nicolas|      MC|
|    Rouviere Laurent|      MC|
|     Le Guevel Ronan|      MC|
|   Benasseni Jacques|      PR|
|     Fromont Magalie|      PR|
|     Pelletier Bruno|      PR|
+--------------------+--------+
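
In the output above, rows that share a position (for example the `MC` rows) come back in no particular order, because `position` is the only sort key. The plain-Python sketch below illustrates the idea of adding a second key to break ties; it uses a handful of (name, position) pairs copied from that output. In PySpark the analogous call would presumably be `.orderBy("position", "name")`.

```python
# (name, position) pairs copied from the orderBy output above.
rows = [
    ("Morvan Marie", "DOC"),
    ("Mom Alain", "MC"),
    ("Klutchnikoff Nicolas", "MC"),
    ("Le Guevel Ronan", "MC"),
    ("Fromont Magalie", "PR"),
    ("Benasseni Jacques", "PR"),
]

# Sort by position first, then by name to break ties deterministically.
ordered = sorted(rows, key=lambda row: (row[1], row[0]))

for name, position in ordered:
    print(f"{position:>3}  {name}")
```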



### groupBy

In [15]:
df.groupby(df["hdr"])

<pyspark.sql.group.GroupedData at 0x7f863a6d9610>

In [16]:
df.groupby(df["hdr"]).count().show()

+-----+-----+
|  hdr|count|
+-----+-----+
| true|  103|
|false|  141|
+-----+-----+



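WARNING: Don't confuse `GroupedData.count()` with `DataFrame.count()`. `GroupedData.count()` is not an action: it is a transformation that returns a new DataFrame with one row per group. `DataFrame.count()` is an action: it executes the query and returns a number.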
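
The difference in return shape can be sketched in plain Python. This is an analogy only: it ignores laziness and uses `collections.Counter` rather than Spark. The five (name, hdr) pairs are copied from the first rows of the `df.show()` output earlier in this document.

```python
from collections import Counter

# (name, hdr) pairs copied from the first rows shown earlier.
rows = [
    ("Alphonse Paul", False),
    ("Ammari Zied", True),
    ("André Simon", False),
    ("Angst Jurgen", False),
    ("Bailleul Ismaël", True),
]

# Like groupby("hdr").count(): one entry per group, i.e. a small table.
per_group = Counter(hdr for _, hdr in rows)

# Like DataFrame.count(): a single number for the whole table.
total = len(rows)

print(dict(per_group), total)
```

The per-group result still has to be displayed or collected (with `show()` in Spark), whereas the total is already the final value.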

In [17]:
df.filter(df["hdr"]).count()

103

In [18]:
df.filter(df['hdr']).select("name").show()

+--------------------+
|                name|
+--------------------+
|         Ammari Zied|
|     Bailleul Ismaël|
|          Baker Mark|
|    Beauchard Karine|
|        Bekka Bachir|
|     Belmiloudi Aziz|
|   Benasseni Jacques|
|    Berthelot Pierre|
|       Bourqui David|
|Breton Jean-Chris...|
|         Briane Marc|
|        Cadre Benoît|
|       Caloz Gabriel|
|        Cantat Serge|
|       Caruso Xavier|
|   Castella Francois|
|       Causeur David|
|   Cerveau Dominique|
|   Chartier Philippe|
|   Chauvet Guillaume|
+--------------------+
only showing top 20 rows



In [19]:
df.groupBy(df["organization"]).count().show()

+------------+-----+
|organization|count|
+------------+-----+
|         ENS|    3|
|        CNRS|   19|
|        INSA|   19|
|          R2|   11|
|       INRIA|    9|
|        AGRO|    5|
|         EXT|    2|
|          R1|  176|
+------------+-----+



### Exercises

- How many teachers (PR + MC) are from INSA?
- How many MC are in the STAT team?
- How many MC and CR hold an HDR?
- What is the ratio of student supervision (DOC / HDR)?
- List the number of people in each organization.
- List the number of HDR holders in each team.
- Which team has the most HDR holders?
- List the number of DOC students in each organization.
- Which team has the most DOC students?
- List the people from CNRS who are neither CR nor DR.

In [20]:
sc.stop()