# Introduction to `pyspark.sql.DataFrame`s

Adapted from [Databricks' tutorial](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)

## Installing a `pyspark` Anaconda virtual environment

Use Anaconda Navigator to create a virtual environment with the following packages installed:

#### `pyspark` Stuff
1. `openjdk` to install Java,
2. `pyspark` to install `spark` and `pyspark`, and
3. `findspark` to (possibly) deal with any issues finding `spark`.

#### The usual suspects - data management

1. `pandas`
2. `polars`
3. `pyarrow[all]`

#### The usual suspects - visualization and ML

1. `scikit-learn`
2. `seaborn`
3. `plotnine`
4. `altair`

In [4]:
# import SparkSession (and Row, which the appendix uses) from pyspark.sql
from pyspark.sql import Row, SparkSession

## What is spark?

* Built for the Hadoop platform
* Replacement of MapReduce
* Second-generation optimization
    * Lazy
    * Optimized
    * Persistent data structures
* Written in Scala
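
The "lazy" bullet above has a rough analogy in plain Python: a generator pipeline describes work without doing it until something asks for results. The sketch below is only an analogy (the names `read_rows` and `log` are made up for illustration); Spark's optimizer does far more than this.

```python
log = []  # records when rows are actually produced

def read_rows():
    """Pretend data source: yields rows one at a time and logs each read."""
    for i in range(1_000):
        log.append(i)
        yield {'id': i, 'even': i % 2 == 0}

# Building the pipeline does no work yet (lazy, like `filter`).
pipeline = (row for row in read_rows() if row['even'])
reads_before = len(log)

# Asking for results triggers only as much work as needed (like `take(3)`).
first_three = [next(pipeline) for _ in range(3)]
reads_after = len(log)

print(reads_before, reads_after, [r['id'] for r in first_three])
```

Only five of the thousand rows are ever read, because the pipeline stops as soon as three matches have been found.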

## Ok ... so what's Hadoop?

* Distributed computing platform
* [Used by lots of companies](https://wiki.apache.org/hadoop/PoweredBy)
* Becoming an industry standard


## What is `pyspark`?

* Python interface to spark
* Needs a spark session
    * `session` $\leftrightarrow$ spark


## Step 0 - Create a spark session

`pyspark` (Python) communicates with `spark` (JVM via Scala) through a session

In [5]:
spark = SparkSession.builder.appName('Ops').getOrCreate()

## Overview - `pyspark.sql.DataFrame`

* A `DataFrame` is a collection of `Row`s
* `Row`s can be distributed over many machines
* `spark`
    * Hides the messy details
    * Optimizes operations

## How to think about a `pyspark.sql.DataFrame`

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_df.png?raw=1" width=600>

## Reading a `csv` file with `spark.read.csv`

#### `read.csv` is lazy

In [6]:
(heroes := 
 spark.read.csv('./data/heroes_information.csv', header=True)
)

DataFrame[ID: string, name: string, Gender: string, Eye color: string, Race: string, Hair color: string, Height: string, Publisher: string, Skin color: string, Alignment: string, Weight: string]

## Example - `filter` and `collect`

#### `filter` is lazy

In [60]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
)

DataFrame[ID: string, name: string, Gender: string, Eye color: string, Race: string, Hair color: string, Height: string, Publisher: string, Skin color: string, Alignment: string, Weight: string]

#### `limit` is lazy

In [61]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
 .limit(5)
)

DataFrame[ID: string, name: string, Gender: string, Eye color: string, Race: string, Hair color: string, Height: string, Publisher: string, Skin color: string, Alignment: string, Weight: string]

#### `take` is eager

In [62]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
 .take(5)
)

[Row(ID='0', name='A-Bomb', Gender='Male', Eye color='yellow', Race='Human', Hair color='No Hair', Height='203', Publisher='Marvel Comics', Skin color='-', Alignment='good', Weight='441'),
 Row(ID='24', name='Angel Dust', Gender='Female', Eye color='yellow', Race='Mutant', Hair color='Black', Height='165', Publisher='Marvel Comics', Skin color='-', Alignment='good', Weight='57'),
 Row(ID='31', name='Anti-Monitor', Gender='Male', Eye color='yellow', Race='God / Eternal', Hair color='No Hair', Height='61', Publisher='DC Comics', Skin color='-', Alignment='bad', Weight='-99'),
 Row(ID='56', name='Azazel', Gender='Male', Eye color='yellow', Race='Neyaphem', Hair color='Black', Height='183', Publisher='Marvel Comics', Skin color='red', Alignment='bad', Weight='67'),
 Row(ID='207', name='Darth Vader', Gender='Male', Eye color='yellow', Race='Cyborg', Hair color='No Hair', Height='198', Publisher='George Lucas', Skin color='-', Alignment='bad', Weight='135')]

#### `collect` is eager

In [65]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
#  .limit(5)
 .collect()
)

[Row(ID='0', name='A-Bomb', Gender='Male', Eye color='yellow', Race='Human', Hair color='No Hair', Height='203', Publisher='Marvel Comics', Skin color='-', Alignment='good', Weight='441'),
 Row(ID='24', name='Angel Dust', Gender='Female', Eye color='yellow', Race='Mutant', Hair color='Black', Height='165', Publisher='Marvel Comics', Skin color='-', Alignment='good', Weight='57'),
 Row(ID='31', name='Anti-Monitor', Gender='Male', Eye color='yellow', Race='God / Eternal', Hair color='No Hair', Height='61', Publisher='DC Comics', Skin color='-', Alignment='bad', Weight='-99'),
 Row(ID='56', name='Azazel', Gender='Male', Eye color='yellow', Race='Neyaphem', Hair color='Black', Height='183', Publisher='Marvel Comics', Skin color='red', Alignment='bad', Weight='67'),
 Row(ID='207', name='Darth Vader', Gender='Male', Eye color='yellow', Race='Cyborg', Hair color='No Hair', Height='198', Publisher='George Lucas', Skin color='-', Alignment='bad', Weight='135'),
 Row(ID='209', name='Data', Gende

### Why is `pyspark` so slow?

* Optimized for 
    * Distributed computation
    * Big data 
* Not great for
    * Local work on small data

### Why is `pyspark` so fast?

* Distributed nature $\longrightarrow$ leverage multi-core CPU,
* Data model can optimize data access via predicate/projection/slice pushdown,
* Lazy evaluation allows optimized memory usage (e.g., for a grouped aggregation), and
* Arrow allows FAST implementation of `pandas` user defined functions (UDF).

See [this article](https://www.databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html) for details.

## `filter` and `collect` illustrated

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_filter_collect.gif?raw=1" width=600>

## Inspecting the column types

In [66]:
heros.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Eye color: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Hair color: string (nullable = true)
 |-- Height: double (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Skin color: string (nullable = true)
 |-- Alignment: string (nullable = true)
 |-- Weight: double (nullable = true)



## Gathering results in `pyspark.sql`

* **Important fact** All `pyspark` queries end in a collection method
* **Why?** Data is (possibly) spread across many machines
* <font color = "red"> **Warning** This might be *expensive*! Know how much data you are requesting! </font>

## Gathering methods

Here are the default methods for gathering the results.

* `collect` returns all values
* `take(n)` returns the first `n` values 
* `sample(fraction=p)` lazily keeps each row with probability `p`; gather the result with `collect` or `toPandas`

**Note.** `collect` and `take` are cumbersome, as they return a list of `Row`s :(

### Inspecting the content - `take`

In [67]:
heros.take(5) # BAD!!!

[Row(Id=0, name='A-Bomb', Gender='Male', Eye color='yellow', Race='Human', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color=None, Alignment='good', Weight=441.0),
 Row(Id=1, name='Abe Sapien', Gender='Male', Eye color='blue', Race='Icthyo Sapien', Hair color='No Hair', Height=191.0, Publisher='Dark Horse Comics', Skin color='blue', Alignment='good', Weight=65.0),
 Row(Id=2, name='Abin Sur', Gender='Male', Eye color='blue', Race='Ungaran', Hair color='No Hair', Height=185.0, Publisher='DC Comics', Skin color='red', Alignment='good', Weight=90.0),
 Row(Id=3, name='Abomination', Gender='Male', Eye color='green', Race='Human / Radiation', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color=None, Alignment='bad', Weight=441.0),
 Row(Id=4, name='Abraxas', Gender='Male', Eye color='blue', Race='Cosmic Entity', Hair color='Black', Height=-99.0, Publisher='Marvel Comics', Skin color=None, Alignment='bad', Weight=-99.0)]

## Inspecting the whole table - `collect`

In [68]:
heros.collect() # BAD!!!1!

[Row(Id=0, name='A-Bomb', Gender='Male', Eye color='yellow', Race='Human', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color=None, Alignment='good', Weight=441.0),
 Row(Id=1, name='Abe Sapien', Gender='Male', Eye color='blue', Race='Icthyo Sapien', Hair color='No Hair', Height=191.0, Publisher='Dark Horse Comics', Skin color='blue', Alignment='good', Weight=65.0),
 Row(Id=2, name='Abin Sur', Gender='Male', Eye color='blue', Race='Ungaran', Hair color='No Hair', Height=185.0, Publisher='DC Comics', Skin color='red', Alignment='good', Weight=90.0),
 Row(Id=3, name='Abomination', Gender='Male', Eye color='green', Race='Human / Radiation', Hair color='No Hair', Height=203.0, Publisher='Marvel Comics', Skin color=None, Alignment='bad', Weight=441.0),
 Row(Id=4, name='Abraxas', Gender='Male', Eye color='blue', Race='Cosmic Entity', Hair color='Black', Height=-99.0, Publisher='Marvel Comics', Skin color=None, Alignment='bad', Weight=-99.0),
 Row(Id=5, name='Absorbing M

## Converting to `pandas` using `pyarrow`

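The `toPandas` method collects the data and converts it to a `pandas` `DataFrame`. If we have `pyarrow` installed and Arrow enabled (`spark.sql.execution.arrow.pyspark.enabled`), the conversion is typically much faster.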

#### Use `limit` to collect the head.

In [70]:
heroes.limit(5).toPandas() # Good!

Unnamed: 0,ID,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203,Marvel Comics,-,good,441
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191,Dark Horse Comics,blue,good,65
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185,DC Comics,red,good,90
3,3,Abomination,Male,green,Human / Radiation,No Hair,203,Marvel Comics,-,bad,441
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99,Marvel Comics,-,bad,-99


#### Use `sample` and `toPandas` to get a random sample.

In [71]:
(sample := 
 heros
 .sample(fraction=0.01)
).toPandas()

Unnamed: 0,Id,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,126,Boba Fett,Male,brown,Human / Clone,Black,183.0,George Lucas,,bad,-99.0
1,175,Chuck Norris,Male,,,,178.0,,,good,-99.0
2,199,Daphne Powell,Female,,,,-99.0,ABC Studios,,good,-99.0
3,304,Guy Gardner,Male,blue,Human-Vuldarian,Red,188.0,DC Comics,,good,95.0
4,317,Hawkwoman II,Female,,,,-99.0,DC Comics,,good,-99.0
5,443,Mephisto,Male,white,,Black,198.0,Marvel Comics,,bad,140.0
6,460,Mister Sinister,Male,red,Human / Altered,Black,196.0,Marvel Comics,,bad,128.0
7,469,Monica Dawson,Female,,,,-99.0,NBC - Heroes,,good,-99.0
8,478,Multiple Man,Male,blue,,Brown,180.0,Marvel Comics,,good,70.0
9,505,Oracle,Female,blue,Human,Red,178.0,DC Comics,,good,59.0


#### Use `toPandas` to collect the whole table (careful...)

In [73]:
heroes.toPandas()

Unnamed: 0,ID,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203,Marvel Comics,-,good,441
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191,Dark Horse Comics,blue,good,65
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185,DC Comics,red,good,90
3,3,Abomination,Male,green,Human / Radiation,No Hair,203,Marvel Comics,-,bad,441
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99,Marvel Comics,-,bad,-99
...,...,...,...,...,...,...,...,...,...,...,...
729,729,Yellowjacket II,Female,blue,Human,Strawberry Blond,165,Marvel Comics,-,good,52
730,730,Ymir,Male,white,Frost Giant,No Hair,304.8,Marvel Comics,white,good,-99
731,731,Yoda,Male,brown,Yoda's species,White,66,George Lucas,green,good,17
732,732,Zatanna,Female,blue,Human,Black,170,DC Comics,-,good,57


## Houston, we have a problem! (Did you notice?)

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_missing_values.png?raw=1" width=400>

### Specifying a `nullValue`

In [74]:
(heros := 
 spark.read.csv('./data/heroes_information.csv', header=True, nullValue='-')
).limit(5).toPandas()

Unnamed: 0,ID,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203,Marvel Comics,,good,441
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191,Dark Horse Comics,blue,good,65
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185,DC Comics,red,good,90
3,3,Abomination,Male,green,Human / Radiation,No Hair,203,Marvel Comics,,bad,441
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99,Marvel Comics,,bad,-99


### Did you notice?

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_default_types.png?raw=1" width=400>

Default type is a string

### Letting `spark` guess the types

Set `inferSchema=True`

In [75]:
(heros := 
 spark.read.csv('./data/heroes_information.csv', header=True, inferSchema=True, nullValue='-')
)

DataFrame[ID: int, name: string, Gender: string, Eye color: string, Race: string, Hair color: string, Height: double, Publisher: string, Skin color: string, Alignment: string, Weight: int]

## Checking the column types after `inferSchema`

In this case, `spark` guessed correctly

In [76]:
heros.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Eye color: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Hair color: string (nullable = true)
 |-- Height: double (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- Skin color: string (nullable = true)
 |-- Alignment: string (nullable = true)
 |-- Weight: integer (nullable = true)



## Inspecting the content - `limit(5).toPandas()`

In [77]:
heros.limit(5).toPandas()

Unnamed: 0,ID,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,,good,441
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,,bad,441
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99.0,Marvel Comics,,bad,-99


## Explicit `schema` specification

Format is `add(name, type, nullable?)`

In [81]:
from pyspark.sql.types import StructType
from pyspark.sql.types import DoubleType, StringType, IntegerType

hero_schema = (StructType()
  .add('Id', IntegerType(), False)
  .add('name', StringType(), True)
  .add('Gender', StringType(), True)
  .add('Eye color', StringType(), True)
  .add('Race', StringType(), True)
  .add('Hair color', StringType(), True)
  .add('Height', DoubleType(), True)
  .add('Publisher', StringType(), True)
  .add('Skin color', StringType(), True)
  .add('Alignment', StringType(), True)
  .add('Weight', DoubleType(), True))

(heros := 
 spark.read.csv('./data/heroes_information.csv', header=True, schema=hero_schema, nullValue='-')
).limit(5).toPandas()

Unnamed: 0,Id,name,Gender,Eye color,Race,Hair color,Height,Publisher,Skin color,Alignment,Weight
0,0,A-Bomb,Male,yellow,Human,No Hair,203.0,Marvel Comics,,good,441.0
1,1,Abe Sapien,Male,blue,Icthyo Sapien,No Hair,191.0,Dark Horse Comics,blue,good,65.0
2,2,Abin Sur,Male,blue,Ungaran,No Hair,185.0,DC Comics,red,good,90.0
3,3,Abomination,Male,green,Human / Radiation,No Hair,203.0,Marvel Comics,,bad,441.0
4,4,Abraxas,Male,blue,Cosmic Entity,Black,-99.0,Marvel Comics,,bad,-99.0


## `pyspark.sql` queries are like `SQL` queries

#### Filter, group, and aggregate (categorical)

In [82]:
(heros
.where(col('Gender') == 'Male')
.groupby('Eye color')
.count()
.limit(5)
).toPandas()

Unnamed: 0,Eye color,count
0,grey,6
1,green,30
2,yellow,16
3,bown,1
4,,121


#### Group by multiple and aggregate (categorical)

In [83]:
(heros
 .groupby('Eye color', 'Gender')
 .count()
 .limit(5)
).toPandas()

Unnamed: 0,Eye color,Gender,count
0,yellow (without irises),,1
1,green,Male,30
2,violet,Female,2
3,hazel,Female,3
4,blue,Male,143


## <font color="red"> Exercise 4.2 </font>

First, define a `schema` and read in `./data/super_hero_powers.csv`, then perform `pyspark.sql` queries to answer each of the following questions.

1. How many heroes have both Super Strength and Super Speed?
2. How many heroes have names that start with the word *Black*?
3. Are heroes with Agility more likely to have Stealth?
4. What fraction of all heroes that can fly also have Super Strength?
5. Consider heroes that have names that contain `"girl"`, `"boy"`, `"woman"`, or `"man"`.  Compute the following ratio

$$\frac{N(\text{boy or man})}{N(\text{girl or woman})}$$

**Hint:** You will need to use some combination of `where`, `groupby`, and `count` for each part.

In [49]:
%%bash

ls -alG data | grep hero

-rwx------@  1 bn8210wy  staff      46276 Nov  4 08:04 heroes_information.csv
-rw-------@  1 bn8210wy  staff     672305 Oct 18  2022 super_hero_powers.csv


In [50]:
from pyspark.sql.types import StructType
from pyspark.sql.types import BooleanType, StringType 

# Your code here

In [31]:
# Your code here

# Appendix

## Creating a `Row` of data

* Use the `Row` class
* Pass data using keywords
    * key == column name
    * value == cell value

In [62]:
department1 = Row(id='123456', name='Computer Science')
department1

Row(id='123456', name='Computer Science')

## Unpacking a `Row` dictionary

* Data is in a row dictionary
* Unpack keywords using `**`

In [63]:
dept2_info = {'id':'789012', 'name':'Mechanical Engineering'}
department2 = Row(**dept2_info)
department2

Row(id='789012', name='Mechanical Engineering')

## Unpacking a list of row dictionaries

In [64]:
dept_info = [{'id':123456, 'name':'Computer Science'},
             {'id':789012, 'name':'Mechanical Engineering'},
             {'id':345678, 'name':'Theater and Drama'},
             {'id':901234, 'name':'Indoor Recreation'}]

dept_rows = [Row(**r) for r in dept_info]
dept_rows

[Row(id=123456, name='Computer Science'),
 Row(id=789012, name='Mechanical Engineering'),
 Row(id=345678, name='Theater and Drama'),
 Row(id=901234, name='Indoor Recreation')]

## Access `Row` content with column attributes

In [65]:
[dept.id for dept in dept_rows]

[123456, 789012, 345678, 901234]

In [66]:
[dept.name for dept in dept_rows]

['Computer Science',
 'Mechanical Engineering',
 'Theater and Drama',
 'Indoor Recreation']

## Creating a `pyspark.sql.DataFrame`

* A `DataFrame` is a collection of `Row`s
* Create with `spark.createDataFrame`
* Need to have an active spark session

In [67]:
df = spark.createDataFrame(dept_rows)
df

ConnectionRefusedError: [Errno 61] Connection refused

## Creating rows from list of data

## Creating a Row class

* Pass `Row` the column names
* Creates a specialized `Row` class

In [68]:
Employee = Row("firstName", "lastName", "email", "salary")
Employee

<Row('firstName', 'lastName', 'email', 'salary')>

## Creating an `Employee` instance

* Pass the data to `Employee` to make a row
* Order matters ... use the same order as names

In [42]:
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee1

Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000)

## Unpacking a data list

* Suppose the data is in a list/tuple.
* Use sequence unpacking with `*`

In [43]:
empl2_info = ('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
empl2_info

('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)

In [44]:
employee2 = Employee(*empl2_info)
employee2

Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000)

## Unpacking a list of data tuples

In [45]:
# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employees = [('michael', 'armbrust', 'no-reply@berkeley.edu', 100000),
             ('xiangrui', 'meng', 'no-reply@stanford.edu', 120000),
             ('matei', None, 'no-reply@waterloo.edu', 140000),
             (None, 'wendell', 'no-reply@berkeley.edu', 160000)]
emp_rows = [Employee(*r) for r in employees]
emp_rows

[Row(firstName='michael', lastName='armbrust', email='no-reply@berkeley.edu', salary=100000),
 Row(firstName='xiangrui', lastName='meng', email='no-reply@stanford.edu', salary=120000),
 Row(firstName='matei', lastName=None, email='no-reply@waterloo.edu', salary=140000),
 Row(firstName=None, lastName='wendell', email='no-reply@berkeley.edu', salary=160000)]