In [1]:
- title: The Virtues of Programming Your ETL Pipeline Using a Fluent API
- author: Jesus Caro
- date: 2024-01-10
- category: python
- tags: docker, vscode, github, codespaces, development containers, poetry, venv
- Subcells: [3,None]


In [2]:
from pyspark.sql import SparkSession
from fablr.dataframes import Fablr
from datetime import datetime as dt
from fablr.sample_assets.artists import artists as artists_list
spark = SparkSession.builder.getOrCreate()
fablr = Fablr()
fablr.set_seed(123)


In [9]:
### Create dataframes
date_format = '%Y-%m-%d'
users_dict = {
  'user_id': {'provider':'random_int', 'kwargs':{"min":0, "max": 2e5}},
  'first_name': {'provider': 'first_name', 'kwargs': {}},
  'last_name': {'provider': 'last_name', 'kwargs': {}},
  'email': {'provider': 'email', 'kwargs': {}},
  'last_login': {'provider': 'date_between_dates',
                 'kwargs': {'date_start': dt.strptime('2023-01-01', date_format), 'date_end': dt.strptime('2023-12-01', date_format)}},
  'subscription_tier' : {'provider': 'sample_list',
                         'kwargs':{'list':["Prime", "Standard"], 'unique': False}}
}
users = fablr.generate_dataframe(10000, users_dict, primary_keys=['user_id'])


events_dict = {
    'event_id': {'provider':'bothify', 'kwargs':{'text': 'eid_????-#####'}},
    'event_name': {'provider': 'sample_list', 'kwargs': {'list': artists_list}},
    'event_date': {'provider': 'date_between_dates',
                 'kwargs': {'date_start': dt.strptime('2019-01-01', date_format), 'date_end': dt.strptime('2023-12-01', date_format)}},
    'event_location': {'provider': 'city', 'kwargs': {}},
    'event_result': {'provider': 'sample_list', 'kwargs': {'list': ['succesfull']*91 + ['cancelled']*9}},
}
events = fablr.generate_dataframe(800, events_dict)

tickets_dict = {
    'ticket_no': {'provider':'random_int', 'unique': True, 'kwargs':{"min":0, "max": 1e10}},
    'event_id': {'provider':'sample_dataframe', 'kwargs': {'df': events, 'column': 'event_id'}},
    'user_id': {'provider':'sample_dataframe', 'kwargs': {'df': users, 'column': 'user_id'}},
    'total_charged': {'provider': 'random_float', 'kwargs': {'min': 20, 'max': 800}},
    'surcharge': {'provider': 'sample_list', 'kwargs': {'list': [5.99]*20 + [11.99]*15 + [0]*15}},
}

tickets = fablr.generate_dataframe(100000, tickets_dict)

users_df = spark.createDataFrame(users)
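# Create the DataFrame first, then cast: events_df must exist before it is referenced.
events_df = spark.createDataFrame(events)
events_df = events_df.withColumn('event_date', events_df['event_date'].cast('date'))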
tickets_df = spark.createDataFrame(tickets)


      user_id   first_name  last_name                       email  last_login  \
0       23356      Kristen      Davis   michaelholmes@example.org  2023-08-14   
1       49263         John   Campbell          qdixon@example.net  2023-01-22   
2      101846     Kimberly    Mcclain     christina51@example.com  2023-01-22   
3      155790      Shannon    Johnson       richard39@example.com  2023-04-02   
4       45003        Ricky     Harper     lanerebecca@example.org  2023-09-07   
...       ...          ...        ...                         ...         ...   
9995   195033    Gabrielle      Kline    leechristian@example.org  2023-02-01   
9996   115447  Christopher   Robinson    nicholashart@example.com  2023-09-20   
9997   195040     Jennifer     Miller   ramirezdaniel@example.com  2023-07-12   
9998   185801        Linda  Wilkinson     gibsonjason@example.net  2023-09-06   
9999    64472        Ariel      Lewis  keithschroeder@example.org  2023-06-03   

     subscription_tier  
0 



           event_id             event_name  event_date  event_location  \
0    eid_ykjP-68711              Bob Dylan  2020-02-19   Hooperchester   
1    eid_Todl-21745                   Muse  2023-05-04     New Carolyn   
2    eid_ZatJ-84380        The Stone Roses  2021-06-15    Douglasburgh   
3    eid_DuHa-46969              New Order  2019-10-23     Marquezberg   
4    eid_izHt-43743                  Queen  2021-06-18   Christinebury   
..              ...                    ...         ...             ...   
795  eid_JuOH-18681                   Blur  2021-06-15     Fisherhaven   
796  eid_DgHZ-43911             The Eagles  2021-06-21       Millstown   
797  eid_WcUT-41297                   KISS  2023-01-03  New Kellyshire   
798  eid_rfmg-04567           Led Zeppelin  2020-07-21   Fernandezberg   
799  eid_rUct-34589  Red Hot Chili Peppers  2020-09-07  East Geneburgh   

    event_result  
0      cancelled  
1      cancelled  
2     succesfull  
3     succesfull  
4      cancelled



### Introduction: The Fluent Interface Pattern

This week marked a new chapter in my career: I started a new job as a Data Engineer at Synaptiq, building ETL pipelines that feed into ML and AI models. Looking back at my (nearly) year-long time at Sequoia (A First American Startup), I can't help but feel grateful for the opportunity to have worked with such a talented team. I learned a lot about Data Engineering. One of the most undervalued things I learned in that role came in the form of a design preference from my team lead. He would often ask me to write additional transforms and apply them in pipelines using the **Fluent API**. I googled it, and there is some literature on the topic, but it's a bit technical and not really to the point. So I decided to write this post to explain what it is, and to show some benefits of this design through a simple illustration: a sample PySpark pipeline.





### What is a Fluent Interface?

A fluent interface is a design pattern that lets you chain methods (in our case, transformations) together in a way that is easy to read and understand. Leveraging this pattern can make your code more readable and easier to maintain. Those are not the only benefits: combined with "unit transformations" (small functions that each do one thing), a fluent interface can also make your code more testable.
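
To make the idea concrete before we get to Spark, here is a minimal sketch of method chaining in plain Python. The `Numbers` class and its methods are hypothetical names invented for this illustration; they are not part of any library. The key detail is that each method returns a new object, which is what makes the chain possible.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class Numbers:
    """Hypothetical immutable wrapper, used only to illustrate method chaining."""
    values: Tuple[int, ...]

    def keep(self, predicate: Callable[[int], bool]) -> "Numbers":
        # Return a new object instead of mutating, so calls can be chained.
        return Numbers(tuple(v for v in self.values if predicate(v)))

    def scale(self, factor: int) -> "Numbers":
        # Same idea: build and return a fresh Numbers.
        return Numbers(tuple(v * factor for v in self.values))

    def total(self) -> int:
        # Terminal step: leaves the chain and returns a plain int.
        return sum(self.values)


result = (
    Numbers((1, 2, 3, 4, 5, 6))
    .keep(lambda v: v % 2 == 0)   # keeps 2, 4, 6
    .scale(10)                    # 20, 40, 60
    .total()
)
print(result)  # 120
```

Each step reads top to bottom in the order it runs, which is the readability benefit the pattern is after.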

Let's look at an example. I have created a simple data model for a ticket reseller, with three tables:

- Users: The users table contains information about the users of the platform.
- Tickets: The tickets table contains information about the tickets that were purchased by each user.
- Events: The events table contains information about the events that the tickets are for.

An ERD for the data model is shown below:

![ERD](images/fluent_api/hot_tickets_erd.png)

In this example, we'll be building our ETL pipeline in PySpark. PySpark has a neat method called `.transform()` that applies a function to a dataframe. Because the method returns a new dataframe, you can chain transformations together. First, let's print out our dataframes to see what we're working with.
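
The mechanics of `.transform()` are simple enough to sketch without Spark. The `MiniFrame` class below is a hypothetical stand-in for a dataframe, written only to show the idea that `frame.transform(func)` hands the frame to `func` and returns the result. It is an illustration under that assumption, not PySpark's implementation.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]


class MiniFrame:
    """Hypothetical stand-in for a dataframe; only here to show the mechanics."""

    def __init__(self, rows: List[Row]):
        self.rows = list(rows)

    def transform(self, func: Callable[["MiniFrame"], "MiniFrame"]) -> "MiniFrame":
        # Hand the whole frame to func and return whatever func returns.
        return func(self)


def only_prime(frame: MiniFrame) -> MiniFrame:
    # Keep rows whose subscription tier is "Prime".
    return MiniFrame([r for r in frame.rows if r["subscription_tier"] == "Prime"])


def add_full_name(frame: MiniFrame) -> MiniFrame:
    # Add a derived column without touching the input frame.
    return MiniFrame(
        [{**r, "full_name": f'{r["first_name"]} {r["last_name"]}'} for r in frame.rows]
    )


sample_users = MiniFrame([
    {"first_name": "Joseph", "last_name": "Mack", "subscription_tier": "Standard"},
    {"first_name": "Melissa", "last_name": "Moore", "subscription_tier": "Prime"},
])

# Fluent style: reads in the order the steps run.
chained = sample_users.transform(only_prime).transform(add_full_name)
# Nested style: same result, but reads inside-out.
nested = add_full_name(only_prime(sample_users))

print(chained.rows == nested.rows)  # True
```

Both forms compute the same thing. The fluent version just keeps the steps in reading order, which matters more as the pipeline grows.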

In [None]:
print("Users dataframe")
users_df.show(3, False)
print("Tickets dataframe")
tickets_df.show(3, False)
print("Events dataframe")
events_df.show(3, False)

Users dataframe
+-------+----------+---------+--------------------+----------+-----------------+
|user_id|first_name|last_name|email               |last_login|subscription_tier|
+-------+----------+---------+--------------------+----------+-----------------+
|13726  |Joseph    |Mack     |daniel06@example.net|2023-08-07|Standard         |
|89338  |Melissa   |Moore    |ann32@example.org   |2023-06-19|Prime            |
|156657 |Nicholas  |Smith    |cgarcia@example.org |2023-09-17|Prime            |
+-------+----------+---------+--------------------+----------+-----------------+
only showing top 3 rows

Tickets dataframe
+----------+--------------+-------+-------------+---------+
|ticket_no |event_id      |user_id|total_charged|surcharge|
+----------+--------------+-------+-------------+---------+
|9957101488|eid_MPlU-54882|118225 |513.25       |11.99    |
|967991971 |eid_JvhR-30831|141011 |250.34       |0.0      |
|5662180548|eid_TqKz-24766|168873 |46.43        |0.0      |
+----------+--------------+-------+-------------+---------+


Our task is to build an ETL pipeline that will return the number of tickets sold for events between two dates, as well as the total revenue generated for those events, by subscription tier.

Let's start by listing the transformations, so that we can build our ETL pipeline in a fluent way.

- 1.) Filter events between two dates that were marked `succesfull` (the spelling used in the sample data) and not cancelled (cancelled events were refunded).
- 2.) Join events and tickets on `event_id` to grab the total charge and surcharge.
- 3.) Join the result with users on `user_id` to grab the subscription tiers.
- 4.) Group by event and subscription tier, and sum to get the total revenue and surcharge for a given subscription tier, such as `Standard`.

In [108]:
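from typing import Callable

from pyspark.sql import DataFrame
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

def filter_events(start_date, end_date, result='succesfull') -> Callable[[DataFrame], DataFrame]:
    # Returns a function of one DataFrame, which is the shape .transform() expects.
    # The default matches the spelling of the 'event_result' values in the sample data.
    def _df(df: DataFrame) -> DataFrame:
        return (
                df.filter(F.col('event_date')
                          .between(start_date, end_date))
                  .filter(F.col('event_result') == result)
        )
    return _df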

def agg_revenue(df: DataFrame) -> DataFrame:
    return df.groupBy(['event_id','event_name', 'subscription_tier']).agg({'total_charged': 'sum', 'surcharge': 'sum'})

In [111]:
events_df.transform(filter_events(start_date='2021-01-01', end_date='2021-12-31', result='succesfull'))

DataFrame[event_id: string, event_name: string, event_date: date, event_location: string, event_result: string]
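
Why does `filter_events` return an inner function instead of a dataframe? `.transform()` expects a function that takes a dataframe, so any extra parameters have to be supplied beforehand. A closure does exactly that. Here is a minimal plain-Python sketch of the same pattern, using lists of dicts in place of dataframes; `filter_between` and `sample_events` are hypothetical names used only for this illustration.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]
Rows = List[Row]


def filter_between(start_date: str, end_date: str) -> Callable[[Rows], Rows]:
    """Hypothetical plain-Python analogue of a parameterized transform."""
    def _inner(rows: Rows) -> Rows:
        # start_date and end_date are captured from the enclosing call.
        # ISO dates (YYYY-MM-DD) compare correctly as strings.
        return [r for r in rows if start_date <= r["event_date"] <= end_date]
    return _inner


sample_events = [
    {"event_name": "Bob Dylan", "event_date": "2020-02-19"},
    {"event_name": "Queen", "event_date": "2021-06-18"},
    {"event_name": "Muse", "event_date": "2023-05-04"},
]

# Calling the outer function only configures the step; nothing is filtered yet.
in_2021 = filter_between("2021-01-01", "2021-12-31")
print(callable(in_2021))  # True

# The configured step is a one-argument function, ready to be applied.
print([r["event_name"] for r in in_2021(sample_events)])  # ['Queen']
```

A side benefit is testability: a configured step like `in_2021` can be checked against a handful of hand-written rows, with no pipeline around it.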

In [None]:

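def get_revenue(
    events: DataFrame,
    tickets: DataFrame,
    users: DataFrame,
    start_date: str,
    end_date: str,
    event_result: str,
    subscription_tier: str) -> DataFrame:
    # One way to complete the pipeline, following the four steps listed earlier.
    # Each step returns a new DataFrame, so the steps chain in reading order.
    return (
        events.transform(filter_events(start_date, end_date, event_result))  # 1) filter events
              .join(tickets, on='event_id', how='inner')                     # 2) add charges
              .join(users, on='user_id', how='inner')                        # 3) add subscription tiers
              .filter(F.col('subscription_tier') == subscription_tier)
              .transform(agg_revenue)                                        # 4) aggregate revenue
    )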

In [58]:
def _df():
    return "hello"

In [59]:
_df

<function __main__._df()>