# Schema Best Practices (10 mins)

[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)

When using Pandas, it's easy to rely on schema inference. In a distributed setting, this is a bad practice because it can be expensive or inaccurate. Fugue enforces a schema requirement, but it helps guarantee consistency when local code is brought to the distributed setting.

In this section we answer:

* Why is schema important for distributed computing?
* What are the ways to define schema in Fugue?


## Inference can be Expensive

Take a look at the following operation. If we don't supply the output schema of the operation, Dask will execute it for one partition first to infer the schema. This can easily double the execution time of expensive operations.

In [15]:
import pandas as pd
import dask.dataframe as dd
from time import sleep

pdf = pd.DataFrame([[0],[1],[2],[3],[4],[5],[6],[7]], columns=["a"])
ddf = dd.from_pandas(pdf, npartitions=2)

In [16]:
def add_col(df):
    if df["a"].iloc[0] == 1:
        sleep(5)
    return df.assign(b=1)

The warning message below already tells us that it has to infer the schema. Note that the `sleep()` above is 5 seconds but execution time is 10 seconds. Why is that?

In [17]:
%%time
ddf.groupby("a").apply(add_col).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 51.3 ms, sys: 8.89 ms, total: 60.2 ms
Wall time: 10 s


Unnamed: 0,a,b
1,1,1
0,0,1
2,2,1
3,3,1
4,4,1
5,5,1
6,6,1
7,7,1


But if we supply the schema, it actually completes in the expected 5 seconds.

In [18]:
%%time
# Now we add the meta argument
ddf.groupby("a").apply(add_col, meta={'a': 'int', 'b': 'int'}).compute()

CPU times: user 29.4 ms, sys: 4.11 ms, total: 33.5 ms
Wall time: 5.03 s


Unnamed: 0,a,b
1,1,1
4,4,1
5,5,1
6,6,1
7,7,1
0,0,1
2,2,1
3,3,1


This is because if schema is not supplied, Dask executes the operation for one partition to see the expected schema. It fills the data with 1's and "foo" for string data, and then runs everything. It then assumes that schema. 

Computational efficiency is one reason Fugue requires schema, but defining schema also helps prevent silent errors, like floats being truncated as int.

## Best Practice: Parquet over CSV

On a side note, schema is one of the reasons Parquet files are heavy preferred over CSV. CSV files don't hold schema information, which is why Pandas practitioners tend to rely on schema inference.

It can hold schema information and supports predicate pushdown to loading data can be significantly more efficient. Additionally, parquet files are also around 1/5th the size of CSV files.

## Defining Schema in Fugue

There are three ways to define schema in Fugue. They are all equivalent and it's ok to even just stick to one method. This is in order of least invasive to most invasive.

1. Defining during runtime
2. Schema hint
3. Decorator

In [19]:
df = pd.DataFrame({"a": [2,3,4]})

### Defining During Runtime

This is the least invasive, and is what we've already seen before. None of the code has to even be touched by Fugue. The schema is supplied during runtime and translated by Fugue.

In [20]:
from fugue import transform

def add_new_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(new=[1,2,3])

transform(df, add_new_col, schema="*, new:int")

Unnamed: 0,a,new
0,2,1
1,3,2
2,4,3


### Schema Hint

But if a function is very deterministic in output schema and used repeatedly, we can add a comment on top that contains the schema. This will be read during runtime as well. If a schema hint is supplied, we don't need to specify it during the runtime.

This is actually a good practice because it helps with the maintenance of the code. If you use to move away from Fugue, it just remains as a helpful comment.

In [21]:
# schema: *, new:int
def add_new_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(new=[1,2,3])

transform(df, add_new_col)

Unnamed: 0,a,new
0,2,1
1,3,2
2,4,3


### Decorator Approach

The last approach is using a decorator. It's still non-invasive because the understand function and still be used normally. The only situation that would require this is having a very long schema, which in practice, is does not play well with distributed computing.

In [22]:
from fugue import transformer

@transformer(schema="*, new:int")
def add_new_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(new=[1,2,3])

transform(df, add_new_col)

Unnamed: 0,a,new
0,2,1
1,3,2
2,4,3


## Schema Operations

Fugue does not invent schema, it uses pyarrow schema. But Fugue creates a special syntax to represent schema. Each column is represented as : , separated by commas.

The Fugue project has a utility called triad, which contains the Schema class. In practice, you will just need to interact with the string representation.

In [23]:
df = pd.DataFrame({"a": [1,2,3], "b": [1,2,3], "c": [1,2,3]})

### Add a column

When using the `transform()`, the `*` in a schema expression means all existing columns. From there we can add new columns by adding `",column_name:type"`.

In [24]:
def add_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(new_col=df["a"] + 1)

transform(
    df=df, 
    using=add_col, 
    schema="*,new_col:int"
    )

Unnamed: 0,a,b,c,new_col
0,1,1,1,2
1,2,2,2,3
2,3,3,3,4


### Dropping columns

To drop a column, use -col without ",".

In [25]:
def drop_col(df: pd.DataFrame) -> pd.DataFrame:
    return df.drop("b", axis=1)

transform(
    df=df, 
    using=drop_col,
    schema="*-b"
    )

Unnamed: 0,a,c
0,1,1
1,2,2
2,3,3


## Exercise

Below is an example of a scikit-learn pipeline. Let's treat this as someone else's code that was handed to us to scale to Spark. Scikit-learn pipelines chain together and use the `fit-transform` interface.

In the cell below we create the transformers and mock some initial data.

In [26]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from typing import Iterable, List, Any, Dict
from fugue import transform

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class DropColumn(BaseEstimator, TransformerMixin):
    def __init__(self, column):
        self.column = column
        
    def fit(self, X, y = None):
        return self

    def transform(self, X, y = None):
        return X.drop(self.column, axis=1)

class IncreasePrice(BaseEstimator, TransformerMixin):
    def __init__(self, increases:Dict):
        self.increases = increases
        self.color = None
        
    def fit(self, X, y = None):
        # assumes this is already partitioned by color
        self.color = X.iloc[0]["color"]
        return self

    def transform(self, X, y = None):
        return X.assign(price=X['price'] + self.increases[self.color])


design = ["old", "current", "new"]
colors = ["red", "blue", "green", "yellow"]
size = ["S", "M", "L", "XL"]

# treat this dataframe like shirts
data = pd.DataFrame({"color": np.random.choice(colors, 10000),
                     "size": np.random.choice(size, 10000),
                     "design": np.random.choice(design, 10000),
                     "price": np.random.randint(50,100, 10000),
                     "units": np.random.randint(100,500, 10000)})
data.head()

Unnamed: 0,color,size,design,price,units
0,green,L,old,64,156
1,blue,M,current,70,365
2,green,M,new,86,148
3,red,S,new,65,100
4,yellow,M,new,96,366


Next we create a pipeline object and apply the pipeline for one color. This will also show us what the new schema is.

In [27]:
price_increase = {"red": 10, "blue": 0, "green": 5, "yellow": 10}
pipeline = Pipeline([('increase_price', IncreasePrice(price_increase)), 
                      ('drop', DropColumn('design')), 
                      ('pass',None)], 
                      verbose = True)

def apply_pipeline(df: pd.DataFrame, pipeline: Pipeline) -> pd.DataFrame:
    # Guess units based on price design
    y = df['units']
    X = df.drop('units', axis=1)
    pipeline.fit(X, y)
    res = pipeline.transform(X)
    res = res.assign(y=y)
    return res

apply_pipeline(data.loc[data["color"] = "red"], pipeline).head()

[Pipeline] .... (step 1 of 3) Processing increase_price, total=   0.0s
[Pipeline] .............. (step 2 of 3) Processing drop, total=   0.0s
[Pipeline] .............. (step 3 of 3) Processing pass, total=   0.0s


Unnamed: 0,color,size,price,y
0,green,L,69,156
1,blue,M,75,365
2,green,M,91,148
3,red,S,70,100
4,yellow,M,101,366


Now we fill in the schema to get the following code to work:

```python
out = transform(df, 
                using=..., 
                schema="...",
                params={...},
                partition={...})
```

## Scaling to Spark

In the next section, we will dive in more to the `partition` argument. This will make the `apply_pipeline` run for each color of t-shirt.

The assumption is that nothing in the Pipeline relies on global aggregations such as global median or max. Every partition should be independent.

In [None]:
# Run on Spark
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data)

# change to make it so they create the transform
out = transform(sdf, 
                ..., 
                ...,
                ...,
                ...,
                engine=spark)
out.show(10)

+-----+----+-----+---+
|color|size|price|  y|
+-----+----+-----+---+
| blue|   M|   70|365|
| blue|   S|   91|188|
| blue|   S|   57|451|
| blue|   M|   72|275|
| blue|   M|   78|307|
| blue|  XL|   89|410|
| blue|   L|   94|334|
| blue|   M|   77|194|
| blue|   M|   77|156|
| blue|   L|   99|276|
+-----+----+-----+---+
only showing top 10 rows



[Pipeline] .... (step 1 of 3) Processing increase_price, total=   0.0s
[Pipeline] .............. (step 2 of 3) Processing drop, total=   0.0s
[Pipeline] .............. (step 3 of 3) Processing pass, total=   0.0s
