<a href="https://colab.research.google.com/github/vafter341ew/COLAB/blob/main/6_2_intro_to_pyspark_dataframes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Colab Prep

Execute the following code cells to whenever you open/restart the notebook in Google Colab.

In [None]:
!wget https://github.com/WSU-DataScience/dsci_325_module6_basic_data_management_in_python/raw/main/sample_data.zip

In [None]:
!unzip ./sample_data.zip

Archive:  ./sample_data.zip
replace __MACOSX/._sample_data? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

# Introduction to `pyspark.sql.DataFrame`s

Adapted from [Databrick's tutorial](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)

## Installing a `pyspark` Anaconda virtual environment

Use Anaconda Navigator to create a virtual environment with the following packages installed

#### `pyspark` Stuff
1. `openjdk` to install Java,
2. `pyspark` to install `spark` and `pyspark`, and
3. `findspark` to (possibly) deal with any issues finding `spark`.

#### The usual suspects - data management

1. `pandas`
2. `polars`
3. `pyarrow[all]`

#### The usual suspects - visualization and ML

1. `scikit-learn`
2. `seaborn`
3. `plotnine`

In [None]:
import pandas as pd

pd.__version__

In [None]:
import pyarrow

pyarrow.__version__

In [None]:
# import pyspark class Row from module sql
from pyspark.sql import SparkSession

## What is spark?

* Build for the Hadoop platform
* Replacement of MapReduce
* Second-generation optimization
    * Lazy
    * Optimized
    * Persistent data structures
* Written in scala

## Ok ... so what's Hadoop?

* Distributed computing platform
* [Used by lots of companies](https://wiki.apache.org/hadoop/PoweredBy)
* Becoming an industry standard


## What is `pyspark`?

* Python interface to spark
* Needs a spark session
    * `session` $\leftrightarrow$ spark


## Step 0 - Create a spark session

`pyspark` (Python) communicates with `spark` (JVM via Scala) through a session

In [None]:
spark = SparkSession.builder.appName('Ops').getOrCreate()

## Overview -  `pyspark.DataFrame`

* A `DataFrame` is a collection of `Row`s
* `Row`s can be distributed over many machines
* `spark`
    * Hides the messy details
    * Optimizes operations

## How to think about a `pyspark.DataFrame`

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_df.png?raw=1" width=600>

## Reading a `csv` file with `spark.read.csv`

#### `read.csv` is lazy

In [None]:
(heroes :=
 spark.read.csv('./sample_data/heroes_information.csv', header=True)
)

## Example - `filter` and `collect`

#### `filter` is lazy

In [None]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
)

#### `limit` is lazy

In [None]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
 .limit(5)
)

#### `take` is eager

In [None]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
 .take(5)
)

#### `collect` is eager

In [None]:
from pyspark.sql.functions import col

(heroes
 .filter(col('Eye color') == 'yellow')
#  .limit(5)
 .collect()
)

### Why is `pyspark` so slow?

* Optimized for
    * Distributed computation
    * Big data
* Not great for
    * Local work on
    * Small data

### Why is `pyspark` so fast?

* Distributed nature $\longrightarrow$ leverage multi-core CPU,
* Data model can optimize data access via predicate/projection/slice pushdown,
* Lazy evaluation allow optimized memory usages (e.g., for a grouped aggregation), and
* Arrow allows FAST implementation of `pandas` user defined functions (UDF).

See [this article](https://www.databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html) for details.

## `filter` and `collect` illustrated

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_filter_collect.gif?raw=1" width=600>

## Inspecting the column types

In [None]:
heroes.printSchema()

## Gathering results in `pyspark.sql`

* **Important fact** All `pyspark` queries end in a collection method
* **Why?** Data is (possibly) spread across many machines
* <font color = "red"> **Warning** This might be is *expensive*! Know how much data your are requesting! </font>

## Gathering methods

Here are the default methods for gathering the results.

* `collect` returns all values
* `take(n)` returns the first `n` values
* `sample(n)` returns `n` randomly selected values

**Note.** These are combersome, as they return a list of `Row`s :(

### Inspecting the content - `take`

In [None]:
heroes.take(5) # BAD!!!

## Inspecting the whole table - `collect`

In [None]:
heroes.collect() # BAD!!!1!

## Converting to `pandas` using `pyarrow`

If we have `pyarrow` installed, we can use the `toPandas` method to collect the data and convert to `pandas`

#### Use `limit` to collect the head.

In [None]:
heroes.limit(5).toPandas() # Good!

#### Use `sample` and `toPandas` to get a random sample.

In [None]:
(sample :=
 heroes
 .sample(fraction=0.01)
).toPandas()

#### Use `toPandas` to collect the whole table (careful...)

In [None]:
heroes.toPandas()

## Houston, we have a problem! (Did you notice?)

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_missing_values.png?raw=1" width=400>

### Specifying a `nullValue`

In [None]:
(heros :=
 spark.read.csv('./sample_data/heroes_information.csv', header=True, nullValue='-')
).limit(5).toPandas()

### Did you notice?

<img src="https://github.com/wsu-stat489/module5_intro_to_pyspark/blob/main/img/pyspark_default_types.png?raw=1" width=400>

Default type is a string

### Letting `spark` guess the types

Set `inferScheme=True`

In [None]:
(heros :=
 spark.read.csv('./sample_data/heroes_information.csv', header=True, inferSchema=True, nullValue='-')
)

## Checking the column types after `inferScheme`

In this case, `spark` guessed correctly

In [None]:
heros.printSchema()

## Inspecting the content - `limit(5).toPandas()`

In [None]:
heros.limit(5).toPandas()

## Explicit `schema` specification

Format is `add(name, type, nullable?)`

In [None]:
from pyspark.sql.types import StructType
from pyspark.sql.types import DoubleType, StringType, IntegerType

hero_schema = (StructType()
  .add('Id', IntegerType(), False)
  .add('name', StringType(), True)
  .add('Gender', StringType(), True)
  .add('Eye color', StringType(), True)
  .add('Race', StringType(), True)
  .add('Hair color', StringType(), True)
  .add('Height', DoubleType(), True)
  .add('Publisher', StringType(), True)
  .add('Skin color', StringType(), True)
  .add('Alignment', StringType(), True)
  .add('Weight', DoubleType(), True))

(heros :=
 spark.read.csv('./sample_data/heroes_information.csv', header=True, schema=hero_schema, nullValue='-')
).limit(5).toPandas()

## `pyspark.sql` queries are like `SQL` queries

#### Filter, group, and aggregate (categorical)

In [None]:
(heros
.where(col('Gender') == 'Male')
.groupby('Eye color')
.count()
.limit(5)
).toPandas()

#### Group by multiple and aggregate (categorical)

In [None]:
(heros
 .groupby('Eye color', 'Gender')
 .count()
 .limit(5)
).toPandas()

## <font color="red"> Exercise 4.2 </font>

First, define a `schema` and read in `./data/super_hero_powers.csv`, then perform `pyspark.sql` queries to answer each of the following questions.

1. How many heroes have both Super Strength and Super Speed?
2. How many heroes have names that start with the word *Black*
3. Are heroes with Agility more likely to have Stealth?
4. What fraction of all heroes that can fly also have Super Strength?
5. Consider heroes that have names that contain `"girl"`, `"boy"`, `"woman"`, or `"man"`.  Compute the following ratio

$$\frac{N(\text{boy or man})}{N(\text{girl or woman})}$$

**Hint:** You will need to use some combination of `where`, `group_by`, and `count` for each part.

In [None]:
%%bash

ls -alG sample_data | grep hero

In [None]:
!curl https://raw.githubusercontent.com/DSCI-326/DSCI326_module_6_lazy_operations/refs/heads/main/data/super_hero_powers.csv -0 ./sample_data/super_hero_powers.csv



In [None]:
(powers := spark.read.csv('./sample_data/super_hero_powers.csv', header=True, inferSchema=True))

In [None]:
from pyspark.sql.types import StructType
from pyspark.sql.types import BooleanType, StringType

# 1.
(powers
.where(col('Super Strength') == 'True')
.where(col('Super Speed') == 'True')
.count()
).toPandas()

In [None]:
#2.
(powers.name.contains('Black'))


In [None]:
#3.
(powers
.where(col('Agility') == 'True')
.where(col('Stealth') == 'True')
.count()
).toPandas()

In [None]:
#4.
(powers
.where(col('Flight') == 'True')
.where(col('Super Strength') == 'True')
.count()
).toPandas()

In [None]:
#5.
(powers.where(col('name').contains('girl') | col('name').contains('boy') | col('name').contains('woman') | col('name').contains('man'))
.count())

# Appendix

## Creating a `Row` of data

* Use the `Row` class
* Pass data using keywords
    * key == column name
    * value == cell value

In [None]:
from pyspark.sql import Row

department1 = Row(id='123456', name='Computer Science')
department1

## Unpacking a `Row` dictionary

* Data is in a row dictionary
* Unpack keywords using `**`

In [None]:
dept2_info = {'id':'789012', 'name':'Mechanical Engineering'}
department2 = Row(**dept2_info)
department2

## Unpacking a list of row dictionaries

In [None]:
dept_info = [{'id':123456, 'name':'Computer Science'},
             {'id':789012, 'name':'Mechanical Engineering'},
             {'id':345678, 'name':'Theater and Drama'},
             {'id':901234, 'name':'Indoor Recreation'}]

dept_rows = [Row(**r) for r in dept_info]
dept_rows

## Access `Row` content with column attributes

In [None]:
[dept.id for dept in dept_rows]

In [None]:
[dept.name for dept in dept_rows]

## Creating a `pyspark.DataFrame`

* A `DataFrame` is a collection of `Row`s
* Create with spark.createDataFrame
* Need to have a

In [None]:
df = spark.createDataFrame(dept_rows)
df

## Creating rows from list of data

## Creating a Row class

* Pass `Row` the columns names
* Creates a specialized `Row` class

In [None]:
Employee = Row("firstName", "lastName", "email", "salary")
Employee

## Creating a `Employee` instance

* Pass the data to `Employee` to make a row
* Order matters ... use the same order as names

In [None]:
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee1

## Unpacking a data list

* Suppose the data is in a list/tuple.
* Use sequence unpacking with `*`

In [None]:
empl2_info = ('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
empl2_info

In [None]:
employee2 = Employee(*empl2_info)
employee2

## Unpacking

In [None]:
# Create the Employees
Employee = Row("firstName", "lastName", "email", "salary")
employees = [('michael', 'armbrust', 'no-reply@berkeley.edu', 100000),
             ('xiangrui', 'meng', 'no-reply@stanford.edu', 120000),
             ('matei', None, 'no-reply@waterloo.edu', 140000),
             (None, 'wendell', 'no-reply@berkeley.edu', 160000)]
emp_rows = [Employee(*r) for r in employees]
emp_rows