# Fugue demo 

## What is Fugue?

Fugue is an open-source abstraction layer that was created to provide a seamless transition from a single machine to a distributed compute setting. 

With Fugue, users can code their logic in native Python, Pandas, or SQL, and Fugue will adapt it for execution in a Spark or Dask distributed engine.

## Fugue benefits

* Cross-framework code
* Rapid iterations
* Friendly interface for Spark
* Easily testable code

## Fugue drawbacks

* Adds a new dependency to your app
* Distributed engines become a black box
* Harder to debug
* Easily testable code

## A simple Fugue example

In Fugue, the construction of a workflow and the construction of an ExecutionEngine can be fully decoupled.

A Fugue workflow is static: it is just a description of your logic flow, and it is independent of execution. Once you have defined the workflow, you can choose an ExecutionEngine for the end-to-end execution.

### Generate some test data

In [None]:
from typing import List, Dict, Any

In [None]:
import random

import pandas as pd

# Seed the global random generator so the generated test data is reproducible.
random.seed(1234)

def generate_hotels(num: int, iso2_codes: List[str]) -> pd.DataFrame:
    """Build a table of ``num`` randomly generated hotels.

    Args:
        num: number of hotels (rows) to generate.
        iso2_codes: pool of ISO-2 country codes each hotel's country is drawn from.

    Returns:
        A DataFrame with columns ``cluster_id`` (``"c_0"`` .. ``"c_{num-1}"``),
        ``iso2_country`` (picked with ``random.choice`` from ``iso2_codes``) and
        ``stars`` (``random.randint(1, 5)``, i.e. 1 to 5 inclusive).
    """
    ids = [f"c_{idx}" for idx in range(num)]
    # All countries are drawn before all star ratings; keeping this order keeps the
    # sequence of calls on the global ``random`` generator (and the seeded output) stable.
    countries = [random.choice(iso2_codes) for _ in range(num)]
    ratings = [random.randint(1, 5) for _ in range(num)]
    return pd.DataFrame({"cluster_id": ids, "iso2_country": countries, "stars": ratings})

In [None]:
# ISO-2 country code -> country name. The test-data generator draws codes from the
# keys, and the find_country_name* functions look the names up here.
iso2_code_map = {"ES": "Spain", "IT": "Italy", "FR": "France", "DE": "Germany", "AT": "Austria"}

In [None]:
# Generate `num` random hotels using the country codes in iso2_code_map and
# display all of them sorted by country and stars.
num = 40
hotels_df = generate_hotels(num, list(iso2_code_map.keys()))
hotels_df.sort_values(by=["iso2_country", "stars"]).head(num)

### Define the business logic

In [None]:
#schema: cluster_id: str, iso2_country: str, stars: int
def filter_by_countries_py(data: List[Dict[str, Any]], countries: List[str]) -> List[Dict[str, Any]]:
    """Keep only the rows whose ``iso2_country`` is one of ``countries``.

    The ``#schema`` comment directly above the def is Fugue's output schema hint,
    so it has to stay attached to this function.
    """
    def _wanted(record: Dict[str, Any]) -> bool:
        return record["iso2_country"] in countries

    return list(filter(_wanted, data))

# schema: cluster_id: str, iso2_country: str, stars: int, country: str
def find_country_name_py(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the rows with an extra ``country`` field holding the country name.

    The name is looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. New dicts are built instead of
    writing into the input rows, so the caller's data is never modified.
    """
    return [{**row, "country": iso2_code_map.get(row["iso2_country"])} for row in data]

### And the workflow

In [None]:
from fugue import FugueWorkflow

countries = ["AT", "DE"]

# This only builds the workflow description; it is executed by dag.run() in the
# next cell.
dag = FugueWorkflow()
# Pass a copy of the test data so the workflow cannot modify hotels_df itself.
df = dag.df(hotels_df.copy())
filtered_df = df.process(filter_by_countries_py, params={"countries": countries})
enriched_df = filtered_df.process(find_country_name_py)
enriched_df.show(num)

In [None]:
# Execute the workflow described in the previous cell (no engine was passed to
# FugueWorkflow() there, unlike the Spark cells further down).
dag.run()

In [None]:
from fugue import FugueWorkflow

countries = ["AT", "DE"]

# Same workflow as above, written as a `with` block: there is no explicit
# dag.run() call, the workflow is run when the block exits.
with FugueWorkflow() as dag:
    df = dag.df(hotels_df.copy())
    filtered_df = df.process(filter_by_countries_py, params={"countries": countries})
    enriched_df = filtered_df.process(find_country_name_py)
    enriched_df.show(num)

### Now execute the workflow in a Spark cluster

In [None]:
import os

import pyspark.sql
from pyspark.sql import functions as F, types as T

# Packed conda environment shipped to the YARN containers; the part after "#"
# ("cluster_venv") is the directory name the archive is extracted under.
ENV_PATH = "hdfs:///user/javiers/fugue_demo/support/fugue_conda_env.tar.gz#cluster_venv"

# Use the Python interpreter from the extracted environment.
os.environ["PYSPARK_PYTHON"] = "./cluster_venv/bin/python"

# Spark session on YARN: 2 single-core executors, Kryo serialization and Hive support.
spark = pyspark.sql.SparkSession \
    .builder.master('yarn') \
    .appName('demo') \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.hive.metastore.version", "1.2.1") \
    .config("spark.sql.hive.metastore.jars", "/opt/hive-1.2-jars/*") \
    .config("spark.executor.instances", '2') \
    .config("spark.executor.cores", '1') \
    .config("spark.yarn.dist.archives", ENV_PATH) \
    .enableHiveSupport() \
    .getOrCreate()

# NOTE: Arrow-based data transfer can be enabled by adding
# .config("spark.sql.execution.arrow.pyspark.enabled", "true") to the builder above.

# Only show Spark log messages at WARN level and above.
spark.sparkContext.setLogLevel("WARN")

In [None]:
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

countries = ["AT", "DE"]

# The business logic is unchanged; passing a SparkExecutionEngine built on the
# session above makes the same workflow run on Spark.
engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:
    df = dag.df(hotels_df.copy())
    filtered_df = df.process(filter_by_countries_py, params={"countries": countries})
    enriched_df = filtered_df.process(find_country_name_py)
    enriched_df.show(num)

### An alternative implementation using pandas

In [None]:
def filter_by_countries(df: pd.DataFrame, countries: List[str]) -> pd.DataFrame:
    """Return only the hotels whose ``iso2_country`` is in ``countries``.

    The kept rows retain their original index labels.
    """
    mask = df["iso2_country"].isin(countries)
    return df.loc[mask]

def find_country_name(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with a ``country`` column holding the country name.

    Names are looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. ``assign`` returns a new frame, so
    the input is not modified. Writing a column in place into a boolean-filtered
    slice (such as the output of ``filter_by_countries``) can trigger pandas'
    ``SettingWithCopyWarning``.
    """
    return df.assign(country=df.iso2_country.apply(lambda code: iso2_code_map.get(code)))

In [None]:
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

countries = ["AT", "DE"]

# Same workflow on Spark, now using the pandas-based implementations of the logic.
engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:
    df = dag.df(hotels_df.copy())
    filtered_df = df.process(filter_by_countries, params={"countries": countries})
    enriched_df = filtered_df.process(find_country_name)
    enriched_df.show(num)

## Main syntax options

### Method 1: Native approach

In [None]:
from fugue import FugueWorkflow


def find_country_name_py(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the rows with an extra ``country`` field holding the country name.

    The name is looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. New dicts are built instead of
    writing into the input rows, so the caller's data is never modified.
    This version carries no schema hint; the schema is passed to ``process``.
    """
    return [{**row, "country": iso2_code_map.get(row["iso2_country"])} for row in data]


with FugueWorkflow() as dag:
    df = dag.df(hotels_df.copy())
    # find_country_name_py has no schema hint here, so the output schema is
    # passed explicitly to process().
    enriched_df = df.process(
        find_country_name_py, schema="cluster_id: str, iso2_country: str, stars: int, country: str"
    )
    enriched_df.show()

### Method 2: Schema Hint

In [None]:
from fugue import FugueWorkflow


# schema: cluster_id: str, iso2_country: str, stars: int, country: str
def find_country_name_py(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the rows with an extra ``country`` field holding the country name.

    The name is looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. New dicts are built instead of
    writing into the input rows, so the caller's data is never modified.
    """
    return [{**row, "country": iso2_code_map.get(row["iso2_country"])} for row in data]


with FugueWorkflow() as dag:
    df = dag.df(hotels_df.copy())
    # No schema argument: it is taken from the "# schema:" comment placed
    # directly above find_country_name_py.
    enriched_df = df.process(find_country_name_py)
    enriched_df.show()

### Method 3: Decorator

In [None]:
from fugue import processor


@processor("cluster_id: str, iso2_country: str, stars: int, country: str")
def find_country_name_py(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the rows with an extra ``country`` field holding the country name.

    The name is looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. New dicts are built instead of
    writing into the input rows, so the caller's data is never modified.
    The output schema is declared by the ``@processor`` decorator.
    """
    return [{**row, "country": iso2_code_map.get(row["iso2_country"])} for row in data]


with FugueWorkflow() as dag:
    df = dag.df(hotels_df.copy())
    # No schema argument: it comes from the @processor decorator on
    # find_country_name_py.
    enriched_df = df.process(find_country_name_py)
    enriched_df.show()

## A more complex workflow using built-in operators

In [None]:
# Hotel input data on HDFS and the column schema used to load it.
hotels_path = "hdfs:///user/javiers/fugue_demo/data/hotels.csv"
hotels_schema = "cluster_id: str, iso2_country: str, stars: int, trust_score: int"

# Average price per hotel; joined to the hotels on cluster_id.
prices_path = "hdfs:///user/javiers/fugue_demo/data/prices.csv"
prices_schema = "cluster_id: str, avg_price: int"

# Destination for the Austrian/German hotels with their prices.
output_path = "hdfs:///user/javiers/fugue_demo/output/at_de_hotels_with_prices.csv"

In [None]:
def filter_by_countries(df: pd.DataFrame, countries: List[str]) -> pd.DataFrame:
    """Keep the hotels located in one of the given ISO-2 ``countries``.

    Rows keep their original index labels.
    """
    in_selection = df.iso2_country.isin(countries)
    return df[in_selection]


def find_country_name(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with a ``country`` column holding the country name.

    Names are looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. ``assign`` returns a new frame, so
    the input (often the filtered slice produced by ``filter_by_countries``) is
    not modified.
    """
    return df.assign(country=df.iso2_country.apply(lambda x: iso2_code_map.get(x)))

In [None]:
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine


engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:

    # Load the hotels CSV (which has a header) with the explicit column schema.
    my_hotels_df = dag.load(hotels_path, header=True, columns=hotels_schema)

    # Keep Austrian and German hotels and add the country name.
    my_hotels_df = my_hotels_df.process(filter_by_countries, params={"countries": ["AT", "DE"]})
    my_hotels_df = my_hotels_df.process(find_country_name)

    prices_df = dag.load(prices_path, header=True, columns=prices_schema)

    # Left outer join: hotels without a price row are kept.
    my_hotels_with_price_df = my_hotels_df \
        .left_outer_join(prices_df, on=["cluster_id"])

    # Rename the key to hotel_id, drop the country code and persist the result.
    final_df = my_hotels_with_price_df \
        .rename({"cluster_id": "hotel_id"}) \
        .drop(["iso2_country"]) \
        .persist()

    # Save only these columns as CSV with a header, overwriting earlier output.
    final_df[["hotel_id", "country", "avg_price"]] \
        .save(output_path, mode="overwrite", header=True)

In [None]:
# Read back the CSV written above. Its first column was renamed to hotel_id in the
# workflow. When an explicit schema is given, Spark takes the column names from the
# schema rather than the header, so the schema must say hotel_id (not cluster_id).
spark.read.csv(output_path, header=True,
               schema="`hotel_id` STRING, `country` STRING, `avg_price` INT"
              ).show(20)

Note that the `load` method only supports the CSV, JSON and Parquet formats!

## Extensions

There are only three types of nodes in a Fugue workflow. They are also called driver-side extensions.

<img src="../images/nodes.svg" width="500">

* **Creator**: no input, outputs a single dataframe
* **Processor**: one or more input dataframes, outputs a single dataframe
* **Outputter**: one or more input dataframes, no output

All these nodes/extensions work on the whole dataset and they are **ExecutionEngine aware**.

There are two special types of Processors: **Transformer** and **CoTransformer**. They are special because they are **NOT ExecutionEngine aware**, and they work at the partition level.

<img src="../images/transformers.svg" width="300">

* **Transformer**: single dataframe in, single dataframe out
* **CoTransformer**: one or multiple dataframes in, single dataframe out

The inputs and outputs for the extensions have to be Fugue DataFrames.

We orchestrate the extensions to generate a Fugue Workflow.

### Creator

In [None]:
from datetime import date
from dateutil.rrule import rrule, DAILY
from typing import List, Any

from fugue import FugueWorkflow


#schema: a:str
def create_dates(start: date, end: date) -> List[List[Any]]:
    """Creator: one single-column row per day from ``start`` to ``end``.

    ``rrule``'s ``until`` bound is inclusive, so ``end`` itself is included. Each
    row holds the day formatted as ``YYYY-MM-DD``. The ``#schema`` comment above
    declares that column to Fugue as ``a:str``.
    """
    fmt = "%Y-%m-%d"
    occurrences = rrule(DAILY, dtstart=start, until=end)
    return [[occurrence.strftime(fmt)] for occurrence in occurrences]


with FugueWorkflow() as dag:
    # Daily dates from 2021-09-01 up to and including 2021-09-15.
    start_date = date(2021, 9, 1)
    end_date = date(2021, 9, 15)

    # The creator has no input dataframe; its arguments are passed via params.
    df = dag.create(create_dates, params={"start": start_date, "end": end_date})
    df.show(20)

#### Accessing the distributed engine API

In [None]:
from fugue import ExecutionEngine, DataFrame
from fugue_spark import SparkExecutionEngine, SparkDataFrame


def read_from_hive(e: ExecutionEngine, num_rows: int) -> DataFrame:
    """Creator: read up to ``num_rows`` rows of ``hotel4x.hotel`` from Hive.

    Args:
        e: the execution engine injected by Fugue. It must be a
            SparkExecutionEngine because the query runs on its Spark session.
        num_rows: maximum number of rows to return (SQL ``LIMIT``).

    Returns:
        The ``cluster_id`` and ``iso2_country`` columns as a Fugue ``SparkDataFrame``.

    Raises:
        TypeError: if ``e`` is not a SparkExecutionEngine.
        ValueError: if ``num_rows`` cannot be converted to an int.
    """
    # Explicit check instead of ``assert`` so it is not stripped under ``python -O``.
    if not isinstance(e, SparkExecutionEngine):
        raise TypeError(
            f"read_from_hive requires a SparkExecutionEngine, got {type(e).__name__}"
        )

    # Coerce to int before interpolating into the SQL text, so only a number can
    # end up in the LIMIT clause.
    limit = int(num_rows)
    query = f"SELECT cluster_id, iso2_country FROM hotel4x.hotel LIMIT {limit}"

    sdf = e.spark_session.sql(query)
    return SparkDataFrame(sdf)  # has to return a Fugue SparkDataFrame


def read_orc(e: ExecutionEngine) -> DataFrame:
    """Creator: read the first 5 rows of the hotels ORC file.

    Args:
        e: the execution engine injected by Fugue. It must be a
            SparkExecutionEngine because the file is read through its Spark session.

    Returns:
        The rows as a Fugue ``SparkDataFrame``.

    Raises:
        TypeError: if ``e`` is not a SparkExecutionEngine.
    """
    # Explicit check instead of ``assert`` so it is not stripped under ``python -O``.
    if not isinstance(e, SparkExecutionEngine):
        raise TypeError(
            f"read_orc requires a SparkExecutionEngine, got {type(e).__name__}"
        )

    # Relative path; presumably resolved against the user's home directory on the
    # cluster's default filesystem (HDFS). Confirm if the layout changes.
    path = "fugue_demo/data/hotels.orc"

    sdf = e.spark_session.read.orc(path).limit(5)
    return SparkDataFrame(sdf)  # has to return a Fugue SparkDataFrame


# Both creators need the Spark engine because they use its Spark session directly.
engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:
    dag.create(read_from_hive, params={"num_rows":5}).show()
    dag.create(read_orc).show()

### Processor

In [None]:
from typing import Iterable, Dict, Any, List
from fugue import DataFrames, DataFrame
import pandas as pd


def concat(dfs: DataFrames) -> pd.DataFrame:
    """Processor: stack all input dataframes vertically into one pandas frame.

    Each input is converted with ``as_pandas()`` in the order given, and the
    concatenated result gets a fresh 0..n-1 index.
    """
    stacked = pd.concat([frame.as_pandas() for frame in dfs.values()])
    return stacked.reset_index(drop=True)

with FugueWorkflow() as dag:
    # Three small dataframes sharing the same schema.
    df1 = dag.df([[0,1]], "one:int, two:int")
    df2 = dag.df([[1,1],[1,2]], "one:int, two:int")
    df3 = dag.df([[2,1], [2,2], [2,3]], "one:int, two:int")

    # A processor can take several input dataframes; concat stacks them.
    df = dag.process(df1, df2, df3, using=concat)
    df.show()

### Outputter

In [None]:
from fugue import DataFrame
import pandas as pd

def save_unique(df1: DataFrame, df2: DataFrame, path: str) -> None:
    """Outputter: write the distinct rows of ``df1`` and ``df2`` to a CSV file.

    Args:
        df1: first input dataframe; converted with ``as_pandas()``.
        df2: second input dataframe; converted with ``as_pandas()``.
        path: destination CSV path. It is written with a header row and without
            the pandas index.
    """
    combined = pd.concat([df1.as_pandas(), df2.as_pandas()]).reset_index(drop=True)
    # to_csv returns None when given a path, so its result is not assigned.
    combined.drop_duplicates().to_csv(path, header=True, index=False)

# Local output file for this example; it is printed in the next cell.
path = "../output/outputter_example.csv"

with FugueWorkflow() as dag:
    df1 = dag.df([[1,1], [2,3]], "one:int, two:int")
    df2 = dag.df([[1,1],[1,2]], "one:int, two:int")

    # An outputter consumes dataframes and produces no output dataframe; the
    # target path is forwarded to save_unique through params.
    dag.output(df1, df2, using=save_unique, params={"path": path})

In [None]:
!cat ../output/outputter_example.csv

### Transformer

A Transformer represents the unit of logic that executes on logical partitions of the input dataframe. Partitioning is not a concern of the Transformer; it should be specified in a previous step.

Transformers and Co-Transformers require users to be explicit about the output schema.

To make this easier, `*` can represent the input dataframe schema, so `*, b:int` means the output will have an additional column `b`. This special [syntax](https://triad.readthedocs.io/en/latest/api/triad.collections.html#triad.collections.schema.Schema.transform) can only be used with transformers.

In [None]:
#schema: *-cluster_id, +c: int
def aggregate_by_country_and_stars(df: pd.DataFrame) -> pd.DataFrame:
    """Transformer: count hotels per (iso2_country, stars) pair.

    The per-group count of ``cluster_id`` is returned as column ``c``. The rows
    are sorted by country and stars, with a fresh index. The ``#schema`` comment
    above tells Fugue that the output is the input schema minus ``cluster_id``
    plus ``c: int``.
    """
    group_cols = ["iso2_country", "stars"]
    counts = (
        df.groupby(group_cols)
        .count()
        .rename(columns={"cluster_id": "c"})
        .reset_index()
    )
    return counts.sort_values(group_cols)

In [None]:
from fugue import FugueWorkflow


with FugueWorkflow() as dag:
    df = dag.df(hotels_df.copy())
    # No engine and no partitioning: presumably the whole dataframe reaches the
    # transformer as a single partition here.
    df = df.transform(aggregate_by_country_and_stars)
    df.show(num)

In [None]:
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine


engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:
    df = dag.df(hotels_df.copy())
    # No explicit partitioning: the transformer runs on whatever partitions Spark
    # uses, so the same (country, stars) pair may appear in several output rows.
    # Compare with the partitioned version in the next cell.
    df = df.transform(aggregate_by_country_and_stars)
    df.show(num)

In [None]:
from fugue import FugueWorkflow
from fugue_spark import SparkExecutionEngine

engine = SparkExecutionEngine(spark)

with FugueWorkflow(engine) as dag:
    df = dag.df(hotels_df.copy())
    # Partitioning by country sends all rows of a country to the same transformer
    # call, so each (country, stars) group is counted in one place.
    df = df.partition(by=["iso2_country"]).transform(aggregate_by_country_and_stars)
    df.show(num)

### Co-transformer

In [None]:
from datetime import datetime, timedelta


cities = ["Munich", "Cluj", "Madrid"]
start_date = datetime(2021, 5, 1)


def generate_temp(num: int, base_temp: int = 0) -> pd.DataFrame:
    """Generate ``num`` random temperature readings.

    Args:
        num: number of rows; row ``i`` is dated ``start_date`` plus ``i`` days.
        base_temp: centre temperature. Each reading is ``base_temp`` plus a
            random integer offset in ``[-5, 5]``. Defaults to 0. The original
            signature ``base_temp: 0`` made 0 an annotation, not a default.

    Returns:
        A DataFrame with columns ``city`` (drawn from ``cities``), ``date``
        (``YYYY-MM-DD`` strings) and ``temp``.
    """
    # All cities are drawn before all temperatures, which keeps the order of calls
    # on the global ``random`` generator (and therefore seeded output) unchanged.
    data = {
        "city": [random.choice(cities) for _ in range(num)],
        "date": [(start_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(num)],
        "temp": [base_temp + random.randint(-5, 5) for _ in range(num)],
    }
    return pd.DataFrame(data)


# schema: city: str, avg_min: float, avg_max: float
def calculate_avg_temps(min_temp_df: pd.DataFrame, max_temp_df: pd.DataFrame) -> pd.DataFrame:
    """Co-transformer: average the minimum and maximum temperatures per city.

    Args:
        min_temp_df: readings with at least ``city`` and ``temp`` columns.
        max_temp_df: readings with at least ``city`` and ``temp`` columns.

    Returns:
        One row per city present in both inputs, with columns ``city``,
        ``avg_min`` and ``avg_max``.
    """
    # Average only ``temp``. The inputs also carry a string ``date`` column, and
    # ``groupby(...).mean()`` over every column raises TypeError on non-numeric
    # data in pandas >= 2.0 (older versions silently dropped it).
    avg_min_temp_df = (
        min_temp_df.groupby("city", as_index=False)["temp"].mean()
        .rename(columns={"temp": "avg_min"})
    )
    avg_max_temp_df = (
        max_temp_df.groupby("city", as_index=False)["temp"].mean()
        .rename(columns={"temp": "avg_max"})
    )

    return avg_min_temp_df.merge(avg_max_temp_df, on="city")

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    # Two independently generated datasets, centred on 4 (minimums) and 15 (maximums).
    min_temp_df = dag.create(generate_temp, params={"num": 25, "base_temp": 4})
    max_temp_df = dag.create(generate_temp, params={"num": 25, "base_temp": 15})

    # Both dataframes have to be equally partitioned
    # zip pairs both dataframes partitioned by city (inner: only cities present in
    # both), and calculate_avg_temps is applied to each pair of partitions.
    final_df = min_temp_df.zip(
        max_temp_df, 
        how="inner", 
        partition={"by": ["city"]}
    ).transform(calculate_avg_temps)

    final_df.show()

## FugueSQL

In [None]:
from fugue_sql import FugueSQLWorkflow

def find_country_name(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with a ``country`` column holding the country name.

    Names are looked up in the global ``iso2_code_map`` by ``iso2_country``;
    codes missing from the map get ``None``. ``assign`` returns a new frame, so
    the input partition is not modified in place.
    """
    return df.assign(country=df.iso2_country.apply(lambda code: iso2_code_map.get(code)))


with FugueSQLWorkflow() as dag:
    df = dag.df(hotels_df.copy())

    # Values referenced by the {{...}} placeholders in the query below.
    country_code = "AT"
    # NOTE(review): the next cell reads ../output/at_hotels_from_sql.csv; presumably
    # that resolves to this same absolute path. Confirm.
    output_path = "/usr/local/trustyou/home/javiers/sandbox/fugue/output/at_hotels_from_sql.csv"

    # Select Austrian hotels, add the country name with a transformer, print the
    # result and save it as a single CSV file.
    dag("""
        SELECT * FROM df WHERE iso2_country='{{country_code}}'
        TRANSFORM USING find_country_name() SCHEMA *, country: str
        PRINT
        SAVE OVERWRITE SINGLE CSV "{{output_path}}"
    """)

In [None]:
!cat ../output/at_hotels_from_sql.csv

### FugueSQL notebook extension

In [None]:
from fugue_notebook import setup
# Set up the FugueSQL notebook extension that provides the %%fsql cell magic used below.
setup()

In [None]:
# Values referenced by the {{...}} placeholders in the %%fsql cell below.
country_code = "DE"
output_path = "/usr/local/trustyou/home/javiers/sandbox/fugue/output/de_hotels_from_sql.csv"

# The %%fsql query selects FROM df, so expose a copy of the test data under that name.
df = hotels_df.copy()

In [None]:
%%fsql

SELECT * FROM df WHERE iso2_country='{{country_code}}'
TRANSFORM USING find_country_name() SCHEMA *, country: str
PRINT
SAVE OVERWRITE SINGLE CSV "{{output_path}}"

In [None]:
!cat ../output/de_hotels_from_sql.csv

## Resources

### Official resources
* [Documentation](https://fugue.readthedocs.io/en/latest/)
* [Tutorials](https://fugue-project.github.io/tutorials/index.html)
* [Code repository](http://github.com/fugue-project/fugue)

### Articles
* https://jameskle.com/writes/fugue
* https://databricks.com/session_na20/fugue-unifying-spark-and-non-spark-ecosystems-for-big-data-analytics
