In [2]:
# ! pip install pyspark
# ! pip install fugue

In [3]:
import pandas as pd
from typing import Dict

# Toy input for the examples below: three ids, each tagged with a letter code.
input_df = pd.DataFrame(
    {
        "id": [0, 1, 2],
        "value": ["A", "B", "C"],
    }
)
# Lookup table translating each letter code to a food name.
mapping = {
    "A": "Apple",
    "B": "Banana",
    "C": "Carrot",
}

def map_letter_to_food(df: pd.DataFrame, mapping: Dict) -> pd.DataFrame:
    """Return a copy of ``df`` with a ``food`` column looked up from ``value``.

    The input frame is not modified, so the function can be re-run on the same
    data (e.g. the module-level ``input_df``) without side effects.

    Args:
        df: Frame containing a ``value`` column.
        mapping: Lookup from ``value`` entries to food names. Entries missing
            from ``mapping`` become NaN (``Series.map`` semantics).

    Returns:
        A new DataFrame with every original column plus ``food`` appended
        (or replaced in place, column-order-wise, if it already existed).
    """
    # assign() builds a new frame instead of writing into the caller's df.
    return df.assign(food=df["value"].map(mapping))

In [4]:
from fugue import transform
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start it.
spark_session = SparkSession.builder.getOrCreate()

# Run the pandas-typed `map_letter_to_food` on the Spark engine via Fugue.
# `schema` declares the output: every input column (`*`) plus `food` as str.
df = transform(
    input_df,
    map_letter_to_food,
    schema="*, food:str",
    params={"mapping": mapping},
    engine=spark_session,
)
df.show()

+---+-----+------+
| id|value|  food|
+---+-----+------+
|  0|    A| Apple|
|  1|    B|Banana|
|  2|    C|Carrot|
+---+-----+------+



In [5]:
from typing import List, Dict, Any, Iterable

def map_letter_to_food2(df: List[Dict[str,Any]], mapping: Dict) -> Iterable[Dict[str,Any]]:
    """Yield each row dict extended with a ``food`` key looked up from ``value``.

    Rows are copied, not mutated, so the caller's dicts are left untouched.
    Values missing from ``mapping`` produce ``None``. This matches the
    missing-value result of the pandas variant instead of raising KeyError.

    NOTE(review): the type hints are kept verbatim; presumably Fugue reads
    them to decide the input/output row format, so don't change them.
    """
    for row in df:
        yield {**row, "food": mapping.get(row["value"])}

def map_letter_to_food3(df: List[List[Any]], mapping: Dict) -> List[List[Any]]:
    """Return new rows, each original row followed by its mapped food name.

    Assumes each row is ``[id, value, ...]``, so position 1 holds the letter
    code. TODO confirm against the caller's schema. Input rows are not
    modified, so repeated calls don't keep appending columns. Values missing
    from ``mapping`` produce ``None`` rather than raising KeyError.
    """
    return [[*row, mapping.get(row[1])] for row in df]

def map_letter_to_food4(df: List[List[Any]], mapping: Dict) -> pd.DataFrame:
    """Map ``[id, value]`` rows to a pandas frame with an added ``food`` column.

    Rows are copied, not mutated, so calling this twice on the same input
    still yields exactly three values per record (``id``, ``value``, ``food``).
    Values missing from ``mapping`` produce a missing ``food`` entry rather
    than raising KeyError.

    Returns:
        DataFrame with columns ``["id", "value", "food"]``.
    """
    records = [[*row, mapping.get(row[1])] for row in df]
    return pd.DataFrame.from_records(records, columns=["id", "value", "food"])

In [6]:
import pandas as pd
from pyspark.sql import SparkSession
from fugue.workflow import FugueWorkflow

# Small numeric frame used by both the workflow and FugueSQL examples.
data = pd.DataFrame(
    {
        "col1": [1, 2, 3],
        "col2": [2, 3, 4],
    }
)

def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with ``col3 = col1 + col2``.

    The input is not modified. This matters because the module-level
    ``data`` frame is passed straight into this function, and it is later
    queried again by the FugueSQL example.

    Args:
        df: Frame with numeric ``col1`` and ``col2`` columns.

    Returns:
        New DataFrame with the original columns plus ``col3``.
    """
    return df.assign(col3=df["col1"] + df["col2"])

# Reuse the active Spark session if there is one, otherwise start it.
spark_session = SparkSession.builder.getOrCreate()

# Describe the pipeline: load `data`, then apply `make_new_col`, declaring the
# output as all input columns plus an int `col3`.
dag = FugueWorkflow()
df = dag.df(data).transform(make_new_col, schema="*, col3:int")

# Execute the workflow on Spark, then display the transformed frame.
dag.run(spark_session)
df.result.show()

SparkDataFrame
col1:long|col2:long|col3:int
---------+---------+--------
1        |2        |3       
2        |3        |5       
3        |4        |7       
Total count: 3



In [9]:
from fugue import fsql

# FugueSQL version of the workflow above: take col1/col2 from the `data`
# frame, pipe them through `make_new_col`, and print the result.
fsql_query = """SELECT col1, col2 
          FROM data 
     TRANSFORM USING make_new_col SCHEMA *,col3:int 
         PRINT"""
fsql(fsql_query).run(spark_session)

SparkDataFrame
col1:long|col2:long|col3:int
---------+---------+--------
1        |2        |3       
2        |3        |5       
3        |4        |7       
Total count: 3



FugueWorkflowResult()