# Stacking and Unstacking data in `pyspark`

## Welcome back to developer's corner

While `pyspark` doesn't provide explicit methods for these two actions, we will be able to accomplish each operation using a combination of methods.  In this lecture, we will 

1. Build up expressions for stacking and unstacking a `pyspark df` 
2. Create helper functions for each operation to automate these processes.

#### `pyspark` setup

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').config('spark.driver.host', 'localhost').getOrCreate()

In [2]:
from more_pyspark import to_pandas
salesk = spark.read.csv("./data/auto_sales.csv", header=True, inferSchema=True)
salesk.collect() >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15.0,12
1,Bob,19,12,17.0,20
2,Doug,20,13,,20
3,Yolanda,19,8,32.0,15
4,Xerxes,12,23,18.0,9


## Stacking data in `pyspark`

Two step process

1. Combine the columns to be stacked into an array
2. Explode the array

[Source](https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-pandas/)

#### A helper function that combines multiple column entries into a column array

In [3]:
from pyspark.sql.functions import array, explode, struct, lit, col
make_array = lambda var_name, val_name, cols: (array(*(struct(lit(c).alias(var_name), 
                                                              col(c).alias(val_name))
                                                       for c in cols)))

sales_cols = ['Compact','Sedan', 'SUV', 'Truck']
make_array('car_type', 'qtr_sales', sales_cols)

Column<'array(struct(Compact AS `car_type`, Compact AS `qtr_sales`), struct(Sedan AS `car_type`, Sedan AS `qtr_sales`), struct(SUV AS `car_type`, SUV AS `qtr_sales`), struct(Truck AS `car_type`, Truck AS `qtr_sales`))'>

#### Testing `make_array`

In [4]:
from pyspark.sql.functions import array, explode, struct, lit, col, collect_list
(salesk
 .withColumn('sales', 
             make_array('car_type', 
                        'qtr_sales', 
                        sales_cols))
 .collect()) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales
0,Ann,22,18,15.0,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck..."
1,Bob,19,12,17.0,20,"[(Compact, 19), (Sedan, 12), (SUV, 17), (Truck..."
2,Doug,20,13,,20,"[(Compact, 20), (Sedan, 13), (SUV, None), (Tru..."
3,Yolanda,19,8,32.0,15,"[(Compact, 19), (Sedan, 8), (SUV, 32), (Truck,..."
4,Xerxes,12,23,18.0,9,"[(Compact, 12), (Sedan, 23), (SUV, 18), (Truck..."


#### `explode` the contents to the array column to stack entries

In [5]:
t = (salesk
    .withColumn('sales', 
                make_array('car_type', 
                           'qtr_sales', 
                           sales_cols))
    .withColumn("vars_and_vals", explode(col('sales'))))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales,vars_and_vals
0,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(Compact, 22)"
1,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(Sedan, 18)"
2,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(SUV, 15)"


#### Checking out the schema of the result

Note that the exploded entries are named `'car_type'` and `'qtr_sales'`

In [6]:
t.printSchema()

root
 |-- Salesperson: string (nullable = true)
 |-- Compact: integer (nullable = true)
 |-- Sedan: integer (nullable = true)
 |-- SUV: integer (nullable = true)
 |-- Truck: integer (nullable = true)
 |-- sales: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- car_type: string (nullable = false)
 |    |    |-- qtr_sales: integer (nullable = true)
 |-- vars_and_vals: struct (nullable = false)
 |    |-- car_type: string (nullable = false)
 |    |-- qtr_sales: integer (nullable = true)



#### Testing extraction of tuple entries

In [7]:
t = (t
 .withColumn('types', t['vars_and_vals'].getItem("car_type"))
 .withColumn('sales', t['vars_and_vals'].getItem("qtr_sales")))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales,vars_and_vals,types
0,Ann,22,18,15,12,22,"(Compact, 22)",Compact
1,Ann,22,18,15,12,18,"(Sedan, 18)",Sedan
2,Ann,22,18,15,12,15,"(SUV, 15)",SUV


#### Dropping extra columns


In [8]:
t = (t
 .drop(*sales_cols)
 .drop('vars_and_vals'))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,sales,types
0,Ann,22,Compact
1,Ann,18,Sedan
2,Ann,15,SUV


####  Putting it all together with generic names

In [9]:
cols_to_stack = sales_cols
var_lbl = 'car_type'
val_lbl = 'qtr_sales'
(salesk
 .withColumn('var_val_array', 
             make_array(var_lbl, 
                        val_lbl, 
                        cols_to_stack))
 .withColumn("vars_and_vals", explode(col('var_val_array')))
 .withColumn(var_lbl, col("vars_and_vals").getItem(var_lbl))
 .withColumn(val_lbl, col("vars_and_vals").getItem(val_lbl))
 .drop(*(cols_to_stack + ['var_val_array', "vars_and_vals"]))
 .collect()) >> to_pandas

Unnamed: 0,Salesperson,car_type,qtr_sales
0,Ann,Compact,22.0
1,Ann,Sedan,18.0
2,Ann,SUV,15.0
3,Ann,Truck,12.0
4,Bob,Compact,19.0
5,Bob,Sedan,12.0
6,Bob,SUV,17.0
7,Bob,Truck,20.0
8,Doug,Compact,20.0
9,Doug,Sedan,13.0


#### Building a `gather` functions

In [67]:
from composable import pipeable

cols_to_stack = sales_cols
var_lbl = 'car_type'
val_lbl = 'qtr_sales'

@pipeable
def gather(var_lbl, val_lbl, cols_to_stack, df):
    make_array = lambda var_name, val_name, cols: (array(*(struct(lit(c).alias(var_name), 
                                                                  col(c).alias(val_name))
                                                           for c in cols)))
    return (df
            .withColumn('var_val_array', 
                        make_array(var_lbl, 
                                   val_lbl, 
                                   cols_to_stack))
            .withColumn("vars_and_vals", explode(col('var_val_array')))
            .withColumn(var_lbl, col("vars_and_vals").getItem(var_lbl))
            .withColumn(val_lbl, col("vars_and_vals").getItem(val_lbl))
            .drop(*(cols_to_stack + ['var_val_array', "vars_and_vals"])))
sales_stackedk = salesk >> gather('car_type', 'qtr_sales', sales_cols)
sales_stackedk.collect() >> to_pandas

Unnamed: 0,Salesperson,car_type,qtr_sales
0,Ann,Compact,22.0
1,Ann,Sedan,18.0
2,Ann,SUV,15.0
3,Ann,Truck,12.0
4,Bob,Compact,19.0
5,Bob,Sedan,12.0
6,Bob,SUV,17.0
7,Bob,Truck,20.0
8,Doug,Compact,20.0
9,Doug,Sedan,13.0


## Unstacking columns in `pyspark` with `group_by` and `pivot`

In [68]:
(sales_stackedk
 .groupBy('Salesperson')
 .pivot('car_type')
 .sum('qtr_sales')
 .collect()) >> to_pandas

Unnamed: 0,Salesperson,Compact,SUV,Sedan,Truck
0,Yolanda,19,32.0,8,15
1,Doug,20,,13,20
2,Xerxes,12,18.0,23,9
3,Ann,22,15.0,18,12
4,Bob,19,17.0,12,20


#### Creating a `spread` helper function

In [69]:
from pyspark.sql.functions import array, explode, struct, lit, col

@pipeable
def spread(val_col, var_col, group_by_col, df):
    return  (df
             .groupBy(group_by_col)
             .pivot(val_col)
             .sum(var_col))

sales_unstackedk = sales_stackedk >> spread('car_type', 'qtr_sales', 'Salesperson')
sales_unstackedk.take(5) >> to_pandas

Unnamed: 0,Salesperson,Compact,SUV,Sedan,Truck
0,Yolanda,19,32.0,8,15
1,Doug,20,,13,20
2,Xerxes,12,18.0,23,9
3,Ann,22,15.0,18,12
4,Bob,19,17.0,12,20


## <font color="red"> Exercise 3 </font>
    
**Task:** Load the `Rochester_temps_2019.csv` data, contains data weather data for Rochester, MN. that is available at the [DNR website](https://www.dnr.state.mn.us/climate/historical/lcd.html?loc=rst). Note that `SM` and `AV` stand for *sum* and *average*, respectively.

1. Identify that problem with the current format.
2. Use `gather` and `spread` to fix the issue.

In [70]:
min_max = spark.read.csv("./data/Rochester_temps_2019.csv", header=True, inferSchema=True)
min_max.take(5) >> to_pandas

Unnamed: 0,ELEMENT,JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC
0,MEAN DAILY MAXIMUM,19.7,20.3,33.9,54.6,63.5,76.0,81.7,76.7,74.7,52.1,34.6,29.2
1,HIGHEST DAILY MAXIMUM,44.0,41.0,61.0,83.0,86.0,87.0,90.0,85.0,89.0,71.0,47.0,42.0
2,MEAN DAILY MINIMUM,4.4,2.4,17.9,35.1,44.3,56.5,62.6,56.7,55.5,35.6,21.5,13.9
3,LOWEST DAILY MINIMUM,-30.0,-15.0,-17.0,24.0,33.0,43.0,52.0,48.0,46.0,19.0,0.0,-8.0
4,AVERAGE DRY BULB,12.1,11.3,25.9,44.8,53.9,66.3,72.2,66.7,65.1,43.9,28.1,21.5


> The current format breaks the rule of tidy data. The months should all be in one column and the variables under elements should be individual columns.

In [71]:
# Your code here.
months_cols = ['JAN','FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
months_stacked = (min_max
                  >> gather('month', 'measure', months_cols))
months_stacked.collect() >> to_pandas

Unnamed: 0,ELEMENT,month,measure
0,MEAN DAILY MAXIMUM,JAN,19.7
1,MEAN DAILY MAXIMUM,FEB,20.3
2,MEAN DAILY MAXIMUM,MAR,33.9
3,MEAN DAILY MAXIMUM,APR,54.6
4,MEAN DAILY MAXIMUM,MAY,63.5
...,...,...,...
127,NUM DAYS W MINIMUM <= 0°,AUG,0.0
128,NUM DAYS W MINIMUM <= 0°,SEP,0.0
129,NUM DAYS W MINIMUM <= 0°,OCT,0.0
130,NUM DAYS W MINIMUM <= 0°,NOV,1.0


In [75]:
spread_element = months_stacked >> spread('ELEMENT', 'measure', 'month')
spread_element.take(5) >> to_pandas



Unnamed: 0,month,NUM DAYS W MINIMUM <= 0°,AVERAGE DRY BULB,HIGHEST DAILY MAXIMUM,LOWEST DAILY MINIMUM,MEAN DAILY MAXIMUM,MEAN DAILY MINIMUM,MEAN DEW POINT,MEAN WET BULB,NUM DAYS W MAXIMUM <= 32°,NUM DAYS W MINIMUM <= 32°,NUM DAYS W MAXIMUM >= 90°
0,APR,0.0,44.8,83.0,24.0,54.6,35.1,,,0.0,11.0,0.0
1,OCT,0.0,43.9,71.0,19.0,52.1,35.6,,,1.0,14.0,0.0
2,NOV,1.0,28.1,47.0,0.0,34.6,21.5,22.6,26.4,12.0,29.0,0.0
3,FEB,15.0,11.3,41.0,-15.0,20.3,2.4,,,24.0,27.0,0.0
4,SEP,0.0,65.1,89.0,46.0,74.7,55.5,57.5,60.6,0.0,0.0,0.0
