# #2 Discovering Butterfree - Spark Functions and Window

Welcome to Discovering Butterfree tutorial series!

This is the second tutorial of this series: its goal is to cover spark functions and windows definition.

Before diving into the tutorial make sure you have a basic understanding of these main data concepts: features, feature sets and the "Feature Store Architecture", you can read more about this [here].

## Example:
Simulating the following scenario (the same from previous tutorial):

- We want to create a feature set with features about houses for rent (listings).

- We are interested in houses only for the **Kanto** region.

We have two sets of data:

- Table: `listing_events`. Table with data about events of house listings.
- File: `region.json`. Static file with data about the cities and regions.

Our desire is to have result dataset with the following schema:

* id: **int**;
* timestamp: **timestamp**;
* rent: **float**;
* rent__avg_over_2_events_row_windows: **float**;
* rent__stddev_pop_over_2_events_row_windows: **float**;
* rent_over_area: **float**;
* bedrooms: **int**;
* bathrooms: **int**;
* area: **float**;
* bedrooms_over_area: **float**;
* bathrooms_over_area: **float**;
* latitude: **double**;
* longitude: **double**;
* lat_lng__h3_hash__10: **string**;
* city: **string**;
* region: **string**.

Note that we're going to compute two aggregated features, rent average and standard deviation, considering the two last occurrences (or events). It'd also be possible to define time windows, instead of windows based on events.

For more information about H3 geohash click [here]().

The following code blocks will show how to generate this feature set using Butterfree library:



In [1]:
# setup spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import session

conf = SparkConf().set('spark.driver.host','127.0.0.1')
sc = SparkContext(conf=conf)
spark = session.SparkSession(sc)

In [2]:
# fix working dir
import pathlib
import os
path = os.path.join(pathlib.Path().absolute(), '../..')
os.chdir(path)

### Showing test data

In [3]:
listing_evengs_df = spark.read.json(f"{path}/examples/data/listing_events.json")
listing_evengs_df.createOrReplaceTempView("listing_events")  # creating listing_events view

region = spark.read.json(f"{path}/examples/data/region.json")

Listing events table:

In [4]:
listing_evengs_df.toPandas()

Unnamed: 0,area,bathrooms,bedrooms,id,region_id,rent,timestamp
0,50,1,1,1,1,1300,1588302000000
1,50,1,1,1,1,2000,1588647600000
2,100,1,2,2,2,1500,1588734000000
3,100,1,2,2,2,2500,1589252400000
4,150,2,2,3,3,3000,1589943600000
5,175,2,2,4,4,3200,1589943600000
6,250,3,3,5,5,3200,1590030000000
7,225,3,2,6,6,3200,1590116400000


Region table:

In [5]:
region.toPandas()

Unnamed: 0,city,id,lat,lng,region
0,Cerulean,1,73.44489,31.7503,Kanto
1,Veridian,2,-9.4351,-167.11772,Kanto
2,Cinnabar,3,29.73043,117.66164,Kanto
3,Pallet,4,-52.95717,-81.15251,Kanto
4,Violet,5,-47.35798,-178.77255,Johto
5,Olivine,6,51.7282,46.21958,Johto


### Extract

- For the extract part, we need the `Source` entity and the `FileReader` and `TableReader` for the data we have;
- We need to declare a query with the rule for joining the results of the readers too;
- As proposed in the problem we can filter the region dataset to get only **Kanto** region.

In [6]:
from butterfree.clients import SparkClient
from butterfree.extract import Source
from butterfree.extract.readers import FileReader, TableReader
from butterfree.extract.pre_processing import filter

readers = [
    TableReader(id="listing_events", table="listing_events",),
    FileReader(id="region", path=f"{path}/examples/data/region.json", format="json",).with_(
        transformer=filter, condition="region == 'Kanto'"
    ),
]

query = """
select
    listing_events.*,
    region.city,
    region.region,
    region.lat,
    region.lng,
    region.region as region_name
from
    listing_events
    join region
      on listing_events.region_id = region.id
"""

source = Source(readers=readers, query=query)

In [7]:
spark_client = SparkClient()
source_df = source.construct(spark_client)

And, finally, it's possible to see the results from building our souce dataset:

In [8]:
source_df.toPandas()

Unnamed: 0,area,bathrooms,bedrooms,id,region_id,rent,timestamp,city,region,lat,lng,region_name
0,50,1,1,1,1,1300,1588302000000,Cerulean,Kanto,73.44489,31.7503,Kanto
1,50,1,1,1,1,2000,1588647600000,Cerulean,Kanto,73.44489,31.7503,Kanto
2,100,1,2,2,2,1500,1588734000000,Veridian,Kanto,-9.4351,-167.11772,Kanto
3,100,1,2,2,2,2500,1589252400000,Veridian,Kanto,-9.4351,-167.11772,Kanto
4,150,2,2,3,3,3000,1589943600000,Cinnabar,Kanto,29.73043,117.66164,Kanto
5,175,2,2,4,4,3200,1589943600000,Pallet,Kanto,-52.95717,-81.15251,Kanto


### Transform
- At the transform part, a set of `Feature` objects is declared;
- An Instance of `FeatureSet` is used to hold the features;
- A `FeatureSet` can only be created when it is possible to define a unique tuple formed by key columns and a time reference. This is an **architectural requirement** for the data. So least one `KeyFeature` and one `TimestampFeature` is needed;
- Every `Feature` needs a unique name, a description, and a data-type definition;
- A `H3HashTransform` is used to convert specific locations to a h3 hash;
- A `CustomTransform` operator is used to illustrate how custom transform methods can be used within a `FeatureSet`;
- Finally, a `SparkFunctionTransform` is defined in order to compute mean and standard deviation for rent, considering the last two events (row window definition).

In [9]:
from pyspark.sql import functions as F

from butterfree.transform import FeatureSet
from butterfree.transform.features import Feature, KeyFeature, TimestampFeature
from butterfree.transform.transformations import (
    CustomTransform,
    SparkFunctionTransform,
)
from butterfree.transform.transformations import H3HashTransform
from butterfree.constants import DataType
from butterfree.transform.utils import Function

def divide(df, fs, column1, column2):
    name = fs.get_output_columns()[0]
    df = df.withColumn(name, F.col(column1) / F.col(column2))
    return df

keys = [
    KeyFeature(
        name="id",
        description="Unique identificator code for houses.",
        dtype=DataType.BIGINT,
    )
]

# from_ms = True because the data originally is not in a Timestamp format.
ts_feature = TimestampFeature(from_column="timestamp", from_ms=True)

features = [
    Feature(
        name="rent",
        description="Rent value by month described in the listing.",
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="rent",
        description="Rent value by month described in the listing.",
        dtype=DataType.FLOAT,
        transformation=SparkFunctionTransform(
            functions=[
                Function(F.avg, DataType.FLOAT), 
                Function(F.stddev_pop, DataType.FLOAT),
            ]
        ).with_window(
            partition_by="id",
            mode="row_windows",
            window_definition=["2 events"],
        ),        
    ),    
    Feature(
        name="rent_over_area",
        description="Rent value by month divided by the area of the house.",
                transformation=CustomTransform(
            transformer=divide, column1="rent", column2="area",
        ),
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="bedrooms",
        description="Number of bedrooms of the house.",
        dtype=DataType.INTEGER,
    ),
    Feature(
        name="bathrooms",
        description="Number of bathrooms of the house.",
        dtype=DataType.INTEGER,
    ),
    Feature(
        name="area",
        description="Area of the house, in squared meters.",
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="bedrooms_over_area",
        description="Number of bedrooms divided by the area.",
        transformation=CustomTransform(
            transformer=divide, column1="bedrooms", column2="area",
        ),
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="bathrooms_over_area",
        description="Number of bathrooms divided by the area.",
        transformation=CustomTransform(
            transformer=divide, column1="bathrooms", column2="area",
        ),
        dtype=DataType.FLOAT,
    ),
    Feature(
        name="latitude",
        description="House location latitude.",
        from_column="lat",  # arg from_column is needed when changing column name
        dtype=DataType.DOUBLE,
    ),
    Feature(
        name="longitude",
        description="House location longitude.",
        from_column="lng",
        dtype=DataType.DOUBLE,
    ),
    Feature(
        name="h3",
        description="H3 hash geohash.",
        transformation=H3HashTransform(
            h3_resolutions=[10], lat_column="latitude", lng_column="longitude",
        ),
        dtype=DataType.STRING,
    ),
    Feature(name="city", description="House location city.", dtype=DataType.STRING,),
    Feature(
        name="region",
        description="House location region.",
        from_column="region_name",
        dtype=DataType.STRING,
    ),
]

feature_set = FeatureSet(
    name="house_listings",
    entity="house",  # entity: to which "business context" this feature set belongs
    description="Features describring a house listing.",
    keys=keys,
    timestamp=ts_feature,
    features=features,
)

In [10]:
feature_set_df = feature_set.construct(source_df, spark_client)

  f"The column name {self.name} "
  f"The column name {self.name} "


The resulting dataset from the running the transformations defined within the `FeatureSet` are:

In [11]:
feature_set_df.toPandas()

Unnamed: 0,id,timestamp,rent,rent__avg_over_2_events_row_windows,rent__stddev_pop_over_2_events_row_windows,rent_over_area,bedrooms,bathrooms,area,bedrooms_over_area,bathrooms_over_area,latitude,longitude,lat_lng__h3_hash__10,city,region
0,1,2020-05-01,1300.0,1300.0,0.0,26.0,1,1,50.0,0.02,0.02,73.44489,31.7503,8a011c942b5ffff,Cerulean,Kanto
1,1,2020-05-05,2000.0,1650.0,350.0,40.0,1,1,50.0,0.02,0.02,73.44489,31.7503,8a011c942b5ffff,Cerulean,Kanto
2,3,2020-05-20,3000.0,3000.0,0.0,20.0,2,2,150.0,0.013333,0.013333,29.73043,117.66164,8a419174230ffff,Cinnabar,Kanto
3,2,2020-05-06,1500.0,1500.0,0.0,15.0,2,1,100.0,0.02,0.01,-9.4351,-167.11772,8a9a807200f7fff,Veridian,Kanto
4,2,2020-05-12,2500.0,2000.0,500.0,25.0,2,1,100.0,0.02,0.01,-9.4351,-167.11772,8a9a807200f7fff,Veridian,Kanto
5,4,2020-05-20,3200.0,3200.0,0.0,18.285714,2,2,175.0,0.011429,0.011429,-52.95717,-81.15251,8acf2ab9d74ffff,Pallet,Kanto


### Load

- For the load part we need `Writer` instances and a `Sink`.
- writers define where to load the data.
- The `Sink` gets the transformed data (feature set) and trigger the load to all the defined `writers`.
- `debug_mode` will create a temporary view instead of trying to write in a real data store.

In [12]:
from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)
from butterfree.load import Sink

writers = [HistoricalFeatureStoreWriter(debug_mode=True), OnlineFeatureStoreWriter(debug_mode=True)]
sink = Sink(writers=writers)

## Pipeline

- The `Pipeline` entity wraps all the other defined elements.
- `run` command will trigger the execution of the pipeline, end-to-end.

In [13]:
from butterfree.pipelines import FeatureSetPipeline

pipeline = FeatureSetPipeline(source=source, feature_set=feature_set, sink=sink)

In [14]:
result_df = pipeline.run()

  f"The column name {self.name} "
  f"The column name {self.name} "


### Showing the results

In [15]:
spark.table("historical_feature_store__house_listings").orderBy(
    "id", "timestamp"
).toPandas()

Unnamed: 0,id,timestamp,rent,rent__avg_over_2_events_row_windows,rent__stddev_pop_over_2_events_row_windows,rent_over_area,bedrooms,bathrooms,area,bedrooms_over_area,bathrooms_over_area,latitude,longitude,lat_lng__h3_hash__10,city,region,year,month,day
0,1,2020-05-01,1300.0,1300.0,0.0,26.0,1,1,50.0,0.02,0.02,73.44489,31.7503,8a011c942b5ffff,Cerulean,Kanto,2020,5,1
1,1,2020-05-05,2000.0,1650.0,350.0,40.0,1,1,50.0,0.02,0.02,73.44489,31.7503,8a011c942b5ffff,Cerulean,Kanto,2020,5,5
2,2,2020-05-06,1500.0,1500.0,0.0,15.0,2,1,100.0,0.02,0.01,-9.4351,-167.11772,8a9a807200f7fff,Veridian,Kanto,2020,5,6
3,2,2020-05-12,2500.0,2000.0,500.0,25.0,2,1,100.0,0.02,0.01,-9.4351,-167.11772,8a9a807200f7fff,Veridian,Kanto,2020,5,12
4,3,2020-05-20,3000.0,3000.0,0.0,20.0,2,2,150.0,0.013333,0.013333,29.73043,117.66164,8a419174230ffff,Cinnabar,Kanto,2020,5,20
5,4,2020-05-20,3200.0,3200.0,0.0,18.285714,2,2,175.0,0.011429,0.011429,-52.95717,-81.15251,8acf2ab9d74ffff,Pallet,Kanto,2020,5,20


In [16]:
spark.table("online_feature_store__house_listings").orderBy("id", "timestamp").toPandas()

Unnamed: 0,id,timestamp,rent,rent__avg_over_2_events_row_windows,rent__stddev_pop_over_2_events_row_windows,rent_over_area,bedrooms,bathrooms,area,bedrooms_over_area,bathrooms_over_area,latitude,longitude,lat_lng__h3_hash__10,city,region
0,1,2020-05-05,2000.0,1650.0,350.0,40.0,1,1,50.0,0.02,0.02,73.44489,31.7503,8a011c942b5ffff,Cerulean,Kanto
1,2,2020-05-12,2500.0,2000.0,500.0,25.0,2,1,100.0,0.02,0.01,-9.4351,-167.11772,8a9a807200f7fff,Veridian,Kanto
2,3,2020-05-20,3000.0,3000.0,0.0,20.0,2,2,150.0,0.013333,0.013333,29.73043,117.66164,8a419174230ffff,Cinnabar,Kanto
3,4,2020-05-20,3200.0,3200.0,0.0,18.285714,2,2,175.0,0.011429,0.011429,-52.95717,-81.15251,8acf2ab9d74ffff,Pallet,Kanto


- We can see that we were able to create all the desired features in an easy way
- The **historical feature set** holds all the data, and we can see that it is partitioned by year, month and day (columns added in the `HistoricalFeatureStoreWriter`)
- In the **online feature set** there is only the latest data for each id