# Creating `mutate` helper functions

Now that we have created the `select` and `filter_by` helpers for both `pyspark` and `sqlalchemy`, we turn to the other primary verb, `mutate`.

## Set up

Let's read in a data set with both `sqlalchemy` and `pyspark`.

#### `sqlalchemy`

In [1]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, func
from sqlalchemy.ext.automap import automap_base

engine = create_engine("sqlite:///databases/heroes_2_1.db")

Base = automap_base()
Base.prepare(engine, reflect=True)
Hero = Base.classes.heroes

#### `pyspark`

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

spark = SparkSession.builder.appName('Ops').getOrCreate()
df_spark = spark.read.csv('data/heroes_information.csv', inferSchema=True, header=True)

#### Functions from previous lecture

In [3]:
from functoolz import pipeable

@pipeable
def headk(df, num = 5): # Keywords must follow positional arguments
    return df.take(num)

@pipeable
def selectk(cols, df):
    return df.select(*cols)

## Comparing `dfply.mutate` to `df_spark.withColumn`

* `mutate`
    * Multiple new/changed expressions
    * Uses keywords, i.e. `new_col = col_expr`
* `withColumn` 
    * Only one new/changed column
    * Uses `withColumn('new_col', col_expr)`


## Relating a `mutate` to `withColumn` using keyword unpacking

<img src="./img/mutate_and_the_accumulator_pattern.gif" width=800>

## Relating a `mutate` to `withColumn` using keyword unpacking

<img src="./img/mutate_and_the_accumulator_pattern2.gif" width=800>
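
The keyword style relies on Python collecting keyword arguments into a dictionary that preserves call order. The sketch below illustrates that with plain Python; `collect_columns` is a hypothetical helper, and strings stand in for `Column` expressions so it runs without Spark.

```python
def collect_columns(**kwargs):
    """Hypothetical helper: return the (name, expression) pairs in call order."""
    return list(kwargs.items())

# Assumption: strings stand in for Column expressions.
pairs = collect_columns(height_in="Height * 0.3937", height_ft="height_in / 12")

# Each pair corresponds to one withColumn(name, expr) call.
calls = [f"withColumn({name!r}, {expr!r})" for name, expr in pairs]
print("\n".join(calls))
```

Because the pairs come back in the order they were written, a later expression can refer to a column introduced by an earlier keyword.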

## Step 1 - Play around with expression

#### Making some example expressions

In [4]:
# height_ft refers to height_in, so the dictionary order matters
kwargs = {'height_in': col('Height') * 0.3937,
          'height_ft': col('height_in') / 12}
kwargs

{'height_in': Column<b'(Height * 0.3937)'>,
 'height_ft': Column<b'(height_in / 12)'>}

#### Iterating over the names and exprs

In [5]:
for name, expr in kwargs.items():
    print(name, ', ', expr)

height_in ,  Column<b'(Height * 0.3937)'>
height_ft ,  Column<b'(height_in / 12)'>


#### Applying the accumulator pattern

Accumulate with

* **Initial Value:** original `df`
* **Update:** Apply the next `withColumn`

In [6]:
from more_pyspark import to_pandas
df_acc = df_spark
for name, expr in kwargs.items():
    df_acc = df_acc.withColumn(name, expr)
df_acc.take(3) >> to_pandas

| Unnamed: 0 | Id | name | Gender | Eye color | Race | Hair color | Height | Publisher | Skin color | Alignment | Weight | height_in | height_ft |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | A-Bomb | Male | yellow | Human | No Hair | 203.0 | Marvel Comics | - | good | 441.0 | 79.9211 | 6.660092 |
| 1 | 1 | Abe Sapien | Male | blue | Icthyo Sapien | No Hair | 191.0 | Dark Horse Comics | blue | good | 65.0 | 75.1967 | 6.266392 |
| 2 | 2 | Abin Sur | Male | blue | Ungaran | No Hair | 185.0 | DC Comics | red | good | 90.0 | 72.8345 | 6.069542 |


#### Important: We didn't change the original `df`

In [7]:
df_spark.take(3) >> to_pandas

| Unnamed: 0 | Id | name | Gender | Eye color | Race | Hair color | Height | Publisher | Skin color | Alignment | Weight |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | A-Bomb | Male | yellow | Human | No Hair | 203.0 | Marvel Comics | - | good | 441.0 |
| 1 | 1 | Abe Sapien | Male | blue | Icthyo Sapien | No Hair | 191.0 | Dark Horse Comics | blue | good | 65.0 |
| 2 | 2 | Abin Sur | Male | blue | Ungaran | No Hair | 185.0 | DC Comics | red | good | 90.0 |
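
The original stays untouched because `withColumn` returns a new DataFrame rather than modifying the one it is called on; the loop only rebinds the name `df_acc`. The toy sketch below mimics that behavior without Spark. `ToyFrame` is a hypothetical list-of-dicts stand-in, and plain functions of a row stand in for `Column` expressions.

```python
class ToyFrame:
    """Hypothetical, list-of-dicts stand-in for an immutable DataFrame."""

    def __init__(self, rows):
        # Copy each row so frames never share mutable state.
        self.rows = [dict(r) for r in rows]

    def withColumn(self, name, func):
        # Return a new ToyFrame; the rows of `self` are never modified.
        return ToyFrame({**row, name: func(row)} for row in self.rows)


original = ToyFrame([{"Height": 203.0}, {"Height": 191.0}])
exprs = {
    "height_in": lambda row: row["Height"] * 0.3937,
    "height_ft": lambda row: row["height_in"] / 12,  # needs height_in first
}

# Accumulator pattern: start from the original, apply one update per step.
acc = original
for name, func in exprs.items():
    acc = acc.withColumn(name, func)

print(sorted(acc.rows[0]))       # ['Height', 'height_ft', 'height_in']
print(sorted(original.rows[0]))  # ['Height']
```

Reversing the two entries of `exprs` would raise a `KeyError` in this toy version, which mirrors why `height_in` has to be added before `height_ft`.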


## Creating `mutatek`

In [8]:
from functoolz import pipeable

@pipeable
def mutatek(df, **kwargs):
    df_acc = df
    for name, expr in kwargs.items():
        df_acc = df_acc.withColumn(name, expr)
    return df_acc
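
The same accumulation can also be written with `functools.reduce`, which makes the initial value and the update step explicit. This is only an alternative sketch, not the version used in these notes. `mutate_reduce` and `RecordingFrame` are hypothetical names, and the stand-in class simply appends column names (a real `withColumn` replaces a column that already exists).

```python
from functools import reduce


def mutate_reduce(df, **kwargs):
    """Apply one withColumn call per keyword, left to right."""
    return reduce(lambda acc, item: acc.withColumn(*item), kwargs.items(), df)


class RecordingFrame:
    """Hypothetical stand-in for a DataFrame; it only records column names."""

    def __init__(self, columns=()):
        self.columns = tuple(columns)

    def withColumn(self, name, expr):
        # Simplification: always append, and return a new object.
        return RecordingFrame(self.columns + (name,))


start = RecordingFrame(["Height"])
result = mutate_reduce(start,
                       height_in="Height * 0.3937",
                       height_ft="height_in / 12")
print(result.columns)  # ('Height', 'height_in', 'height_ft')
print(start.columns)   # ('Height',)
```

The explicit loop in `mutatek` is arguably easier to read and to debug; the `reduce` form is shown only to connect the loop to the general accumulator idea.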

## Testing out `mutatek`

In [9]:
(df_spark
 >> mutatek(height_in = col('Height') * 0.3937,
            height_ft = col('height_in') / 12)
 >> headk(num=3)
 >> to_pandas)

| Unnamed: 0 | Id | name | Gender | Eye color | Race | Hair color | Height | Publisher | Skin color | Alignment | Weight | height_in | height_ft |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | A-Bomb | Male | yellow | Human | No Hair | 203.0 | Marvel Comics | - | good | 441.0 | 79.9211 | 6.660092 |
| 1 | 1 | Abe Sapien | Male | blue | Icthyo Sapien | No Hair | 191.0 | Dark Horse Comics | blue | good | 65.0 | 75.1967 | 6.266392 |
| 2 | 2 | Abin Sur | Male | blue | Ungaran | No Hair | 185.0 | DC Comics | red | good | 90.0 | 72.8345 | 6.069542 |


#### Functions from Lecture 2.7.2

In [10]:
from sqlalchemy import select as select_sql, column
from more_sqlalchemy import everything, pprint
import pandas as pd

@pipeable
def to_statement(table_class):
    return select_sql(everything(table_class))

@pipeable
def headq(stmt, num = 5):
    return stmt.limit(num)

@pipeable
def pprintq(stmt):
    pprint(stmt)
    return stmt

@pipeable
def to_pandasq(engine, stmt):
    return pd.read_sql_query(stmt, con = engine)

@pipeable
def selectq(columns, stmt):
    cols_to_keep = [c for c in stmt.columns if c.name in columns]
    return stmt.with_only_columns(cols_to_keep)


## <font color="red" > Exercise 3 - Construct `mutateq` </font>

* `stmt.column` can be used to add columns to an existing `stmt`
    * Explore it using `help`. How many additional arguments does it take? What are they?
* Must be `pipeable`, taking `stmt` as the piped argument.
* Use `**kwargs` to specify each new column
* As before, you will need to use the accumulator pattern
* You will need to attach the name to each expression using `label`
    * i.e. `expr.label(name)`

In [37]:
# Your function definition here

In [38]:
# Your test expression here

## Up Next

Stuff