# Stacking and Unstacking data in `pyspark`

## Welcome back to developer's corner

While `pyspark` has historically lacked explicit methods for these two actions (a built-in `unpivot`/`melt` only arrived in version 3.4), we can accomplish each operation by combining a few methods. In this lecture, we will:

1. Build up expressions for stacking and unstacking a `pyspark df` 
2. Create helper functions for each operation to automate these processes.

#### `pyspark` setup

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').getOrCreate()

In [2]:
from more_pyspark import to_pandas
salesk = spark.read.csv("./data/auto_sales.csv",  header=True, inferSchema=True)
salesk.collect() >> to_pandas


Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck
0,Ann,22,18,15,12
1,Bob,19,12,17,20
2,Yolanda,19,8,32,15
3,Xerxes,12,23,18,9


## Stacking data in `pyspark`

Stacking is a two-step process:

1. Combine the columns to be stacked into an array
2. Explode the array

[Source](https://www.mien.in/2018/03/25/reshaping-dataframe-using-pivot-and-melt-in-apache-spark-and-pandas/)
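
To make the two steps concrete, here is a minimal pure-Python sketch that mimics them on plain dictionaries, with no Spark session required. The helper names `make_pairs` and `explode_rows` are hypothetical stand-ins for `array(struct(...))` and `explode`; they only illustrate the reshaping logic and say nothing about how Spark executes it.

```python
# Illustrative only: plain-Python stand-ins for the two Spark steps.
rows = [
    {"Salesperson": "Ann", "Compact": 22, "Sedan": 18, "SUV": 15, "Truck": 12},
    {"Salesperson": "Bob", "Compact": 19, "Sedan": 12, "SUV": 17, "Truck": 20},
]
cols = ["Compact", "Sedan", "SUV", "Truck"]

def make_pairs(row, cols):
    """Step 1: combine the columns to be stacked into a list of (name, value) pairs."""
    return [(c, row[c]) for c in cols]

def explode_rows(rows, cols, var_name, val_name):
    """Step 2: emit one output row per (name, value) pair, keeping the other columns."""
    out = []
    for row in rows:
        keep = {k: v for k, v in row.items() if k not in cols}
        for name, value in make_pairs(row, cols):
            out.append({**keep, var_name: name, val_name: value})
    return out

stacked = explode_rows(rows, cols, "car_type", "qtr_sales")
for r in stacked[:4]:
    print(r)
```

Each input row becomes one output row per stacked column, which is the same shape the Spark version builds below.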

#### A helper function that combines multiple column entries into a column array

In [3]:
from pyspark.sql.functions import array, explode, struct, lit, col
sales_cols = ['Compact', 'Sedan', 'SUV', 'Truck']
make_array = lambda var_name, val_name, cols: (array(*(struct(lit(c).alias(var_name), 
                                                              col(c).alias(val_name))
                                                       for c in cols)))
make_array('car_type', 'qtr_sales', sales_cols)

Column<b'array(named_struct(car_type, Compact AS `car_type`, NamePlaceholder(), Compact AS `qtr_sales`), named_struct(car_type, Sedan AS `car_type`, NamePlaceholder(), Sedan AS `qtr_sales`), named_struct(car_type, SUV AS `car_type`, NamePlaceholder(), SUV AS `qtr_sales`), named_struct(car_type, Truck AS `car_type`, NamePlaceholder(), Truck AS `qtr_sales`))'>

#### Testing `make_array`

In [4]:
from pyspark.sql.functions import array, explode, struct, lit, col, collect_list
(salesk
 .withColumn('sales', 
             make_array('car_type', 
                        'qtr_sales', 
                        sales_cols))
 .collect()) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales
0,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck..."
1,Bob,19,12,17,20,"[(Compact, 19), (Sedan, 12), (SUV, 17), (Truck..."
2,Yolanda,19,8,32,15,"[(Compact, 19), (Sedan, 8), (SUV, 32), (Truck,..."
3,Xerxes,12,23,18,9,"[(Compact, 12), (Sedan, 23), (SUV, 18), (Truck..."


#### `explode` the contents of the array column to stack entries

In [5]:
t = (salesk
     .withColumn('sales', 
                 make_array('car_type', 
                            'qtr_sales', 
                            sales_cols))
     .withColumn("vars_and_vals", explode(col('sales'))))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales,vars_and_vals
0,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(Compact, 22)"
1,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(Sedan, 18)"
2,Ann,22,18,15,12,"[(Compact, 22), (Sedan, 18), (SUV, 15), (Truck...","(SUV, 15)"


#### Checking out the schema of the result

Note that the exploded entries are named `'car_type'` and `'qtr_sales'`

In [6]:
t.printSchema()

root
 |-- Salesperson: string (nullable = true)
 |-- Compact: integer (nullable = true)
 |-- Sedan: integer (nullable = true)
 |-- SUV: integer (nullable = true)
 |-- Truck: integer (nullable = true)
 |-- sales: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- car_type: string (nullable = false)
 |    |    |-- qtr_sales: integer (nullable = true)
 |-- vars_and_vals: struct (nullable = false)
 |    |-- car_type: string (nullable = false)
 |    |-- qtr_sales: integer (nullable = true)



#### Testing extraction of the struct fields

In [7]:
t = (t
 .withColumn('types', t['vars_and_vals'].getItem("car_type"))
 .withColumn('sales', t['vars_and_vals'].getItem("qtr_sales")))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,Compact,Sedan,SUV,Truck,sales,vars_and_vals,types
0,Ann,22,18,15,12,22,"(Compact, 22)",Compact
1,Ann,22,18,15,12,18,"(Sedan, 18)",Sedan
2,Ann,22,18,15,12,15,"(SUV, 15)",SUV


#### Dropping extra columns


In [8]:
t = (t
 .drop(*sales_cols)
 .drop('vars_and_vals'))
t.take(3) >> to_pandas

Unnamed: 0,Salesperson,sales,types
0,Ann,22,Compact
1,Ann,18,Sedan
2,Ann,15,SUV


####  Putting it all together with generic names

In [9]:
cols_to_stack = sales_cols
var_lbl = 'car_type'
val_lbl = 'qtr_sales'
(salesk
 .withColumn('var_val_array', 
             make_array(var_lbl, 
                        val_lbl, 
                        cols_to_stack))
 .withColumn("vars_and_vals", explode(col('var_val_array')))
 .withColumn(var_lbl, col("vars_and_vals").getItem(var_lbl))
 .withColumn(val_lbl, col("vars_and_vals").getItem(val_lbl))
 .drop(*(cols_to_stack + ['var_val_array', "vars_and_vals"]))
 .collect()) >> to_pandas

Unnamed: 0,Salesperson,car_type,qtr_sales
0,Ann,Compact,22
1,Ann,Sedan,18
2,Ann,SUV,15
3,Ann,Truck,12
4,Bob,Compact,19
5,Bob,Sedan,12
6,Bob,SUV,17
7,Bob,Truck,20
8,Yolanda,Compact,19
9,Yolanda,Sedan,8


#### Building a `gatherk` function

In [10]:
from functoolz import pipeable

cols_to_stack = sales_cols
var_lbl = 'car_type'
val_lbl = 'qtr_sales'

@pipeable
def gatherk(var_lbl, val_lbl, cols_to_stack, df):
    make_array = lambda var_name, val_name, cols: (array(*(struct(lit(c).alias(var_name), 
                                                                  col(c).alias(val_name))
                                                           for c in cols)))
    return (df
            .withColumn('var_val_array', 
                        make_array(var_lbl, 
                                   val_lbl, 
                                   cols_to_stack))
            .withColumn("vars_and_vals", explode(col('var_val_array')))
            .withColumn(var_lbl, col("vars_and_vals").getItem(var_lbl))
            .withColumn(val_lbl, col("vars_and_vals").getItem(val_lbl))
            .drop(*(cols_to_stack + ['var_val_array', "vars_and_vals"])))
sales_stackedk = salesk >> gatherk('car_type', 'qtr_sales', sales_cols)
sales_stackedk.take(5) >> to_pandas

Unnamed: 0,Salesperson,car_type,qtr_sales
0,Ann,Compact,22
1,Ann,Sedan,18
2,Ann,SUV,15
3,Ann,Truck,12
4,Bob,Compact,19
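
As a point of comparison, Spark SQL also has a `stack` generator function that can do the array-and-explode work in a single expression. The sketch below is an alternative, not the approach used in this lecture. `stack_expr` and `gather_with_stack` are hypothetical helper names, and the sketch assumes that `df` is a Spark DataFrame and that the stacked columns share a compatible type.

```python
def stack_expr(var_lbl, val_lbl, cols_to_stack):
    """Build a Spark SQL `stack` expression as a string (hypothetical helper)."""
    # Each stacked column contributes a literal label and a column reference.
    pairs = ", ".join(f"'{c}', `{c}`" for c in cols_to_stack)
    return f"stack({len(cols_to_stack)}, {pairs}) as (`{var_lbl}`, `{val_lbl}`)"

def gather_with_stack(df, var_lbl, val_lbl, cols_to_stack):
    """Stack `cols_to_stack` of a Spark DataFrame, keeping every other column."""
    keep = [f"`{c}`" for c in df.columns if c not in cols_to_stack]
    return df.selectExpr(*keep, stack_expr(var_lbl, val_lbl, cols_to_stack))

# The expression string can be inspected without a Spark session.
print(stack_expr('car_type', 'qtr_sales', ['Compact', 'Sedan', 'SUV', 'Truck']))
```

Under these assumptions, `gather_with_stack(salesk, 'car_type', 'qtr_sales', sales_cols)` should produce the same stacked layout as `gatherk`.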


## Unstacking columns in `pyspark` with `groupBy` and `pivot`

In [11]:
(sales_stackedk
 .groupBy('Salesperson')
 .pivot('car_type')
 .sum('qtr_sales')
 .take(5)) >> to_pandas

Unnamed: 0,Salesperson,Compact,SUV,Sedan,Truck
0,Yolanda,19,32,8,15
1,Xerxes,12,18,23,9
2,Ann,22,15,18,12
3,Bob,19,17,12,20
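
The chain above groups rows by `Salesperson`, turns each distinct `car_type` value into a column, and fills each cell with the sum of `qtr_sales`. An aggregate is required because several stacked rows could land in the same cell. Here is a plain-Python sketch of that logic; `pivot_sum` is a hypothetical helper for illustration and says nothing about how Spark executes the pivot.

```python
from collections import defaultdict

stacked = [
    {"Salesperson": "Ann", "car_type": "Compact", "qtr_sales": 22},
    {"Salesperson": "Ann", "car_type": "Sedan", "qtr_sales": 18},
    {"Salesperson": "Bob", "car_type": "Compact", "qtr_sales": 19},
    {"Salesperson": "Bob", "car_type": "Sedan", "qtr_sales": 12},
]

def pivot_sum(rows, group_col, pivot_col, value_col):
    """Group by `group_col`, spread `pivot_col` into columns, and sum `value_col`."""
    cells = defaultdict(lambda: defaultdict(int))
    for row in rows:
        cells[row[group_col]][row[pivot_col]] += row[value_col]
    # Spark fills a missing group/value combination with null;
    # this sketch simply omits the key.
    return [{group_col: key, **dict(sums)} for key, sums in cells.items()]

unstacked = pivot_sum(stacked, "Salesperson", "car_type", "qtr_sales")
for r in unstacked:
    print(r)
```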


#### Creating a `spreadk` helper function

In [12]:
from pyspark.sql.functions import array, explode, struct, lit, col

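@pipeable
def spreadk(var_col, val_col, group_by_col, df):
    # var_col holds the labels that become the new column names;
    # val_col holds the values that fill those columns.
    return  (df
             .groupBy(group_by_col)
             .pivot(var_col)
             .sum(val_col))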
sales_unstackedk = sales_stackedk >> spreadk('car_type', 'qtr_sales', 'Salesperson')
sales_unstackedk.take(5) >> to_pandas

Unnamed: 0,Salesperson,Compact,SUV,Sedan,Truck
0,Yolanda,19,32,8,15
1,Xerxes,12,18,23,9
2,Ann,22,15,18,12
3,Bob,19,17,12,20
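
The pivoted columns above come back as `Compact`, `SUV`, `Sedan`, `Truck` rather than in the original column order. `pivot` accepts an optional list of values, which fixes the order of the new columns and saves Spark the extra pass it otherwise needs to discover the distinct values. The sketch below assumes `df` is a Spark DataFrame; `spread_ordered` is a hypothetical variant of the helper, written as a plain function rather than a `pipeable` one.

```python
def spread_ordered(df, var_col, val_col, group_by_col, values):
    """Pivot `var_col` into columns, listing the pivot values explicitly.

    Assumes `df` is a Spark DataFrame. The new columns appear in the
    order given by `values`.
    """
    return (df
            .groupBy(group_by_col)
            .pivot(var_col, values)
            .sum(val_col))

# Hypothetical usage, restoring the column order of the original table:
# spread_ordered(sales_stackedk, 'car_type', 'qtr_sales', 'Salesperson',
#                ['Compact', 'Sedan', 'SUV', 'Truck'])
```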


## <font color="red"> Exercise 3 </font>
    
**Task:** Load the `rochester_mins_max_temp_2018.csv` data, which contains weather data for Rochester, MN, and is available at the [DNR website](https://www.dnr.state.mn.us/climate/historical/lcd.html?loc=rst). Note that `SM` and `AV` stand for *sum* and *average*, respectively.

1. Identify the problem with the current format.
2. Use `gather` and `spread` to fix the issue.

In [14]:
import pandas as pd
from dfply import *
min_max = pd.read_csv("./data/rochester_mins_max_temp_2018.csv")
min_max.head()

Unnamed: 0,Month,Stat,Max,Min
0,Jan,SM,709.0,196.0
1,Jan,AV,22.9,6.3
2,Feb,SM,675.0,184.0
3,Feb,AV,24.1,6.6
4,Mar,SM,1200.0,693.0


In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Ops').getOrCreate()

In [18]:
from more_pyspark import to_pandas
tempk = spark.read.csv("./data/rochester_mins_max_temp_2018.csv",  header=True, inferSchema=True)
tempk.collect() >> to_pandas


Unnamed: 0,Month,Stat,Max,Min
0,Jan,SM,709.0,196.0
1,Jan,AV,22.9,6.3
2,Feb,SM,675.0,184.0
3,Feb,AV,24.1,6.6
4,Mar,SM,1200.0,693.0
5,Mar,AV,38.7,22.4
6,Apr,SM,1336.0,735.0
7,Apr,AV,44.5,24.5
8,May,SM,2318.0,1658.0
9,May,AV,74.8,53.5


In [None]:
from pyspark.sql.functions import array, explode, struct, lit, col
stat_cols = ['Max', 'Min']
make_array = lambda var_name, val_name, cols: (array(*(struct(lit(c).alias(var_name), 
                                                              col(c).alias(val_name))
                                                       for c in cols)))
# 'temp_type' and 'temp' are illustrative placeholder labels for the temperature data
make_array('temp_type', 'temp', stat_cols)



> *The `Stat` column should be two separate columns for the max and min.*

## Up Next

Stuff