# Introduction to Data Pipelines

In this session, we will explore the fundamentals of building **data pipelines** in Python using **PySpark**, with a focus on **reproducibility**, **reliability**, and **efficient** workflow orchestration.


## What is a data pipeline?

A data pipeline is a series of processes that automate the movement and transformation of data from one or more sources to a final destination, where it can be used for analysis. A data pipeline ensures the reliable, reproducible, and efficient flow of data through different stages, while avoiding redundant processing and supporting scalability.

## Why data pipelines?

Raw data often comes from multiple sources, which can be messy and inconsistent. Data pipelines are crucial in this context because they automate and standardize the process of Extracting, Transforming, and Loading data, making it ready for analysis or other downstream purposes.

Consider the case where you obtain data from an organization or government for a specific year. You clean the data and produce some analysis. The next year new data arrives, similar but with minor modifications. You should be able to reuse the previous year's work and make consistent decisions about how the data is treated. These are the scenarios where data pipelines are most useful: they automate the extraction, cleaning, transformation, and export of data, enabling more **efficient**, **reliable**, and **repeatable** data workflows.

In addition, a pipeline maintains a clear **lineage** of data, which makes transparent what changes were made to the raw data and which transformations were applied to obtain the final dataset. In Databricks, it is possible to configure controls so that the lineage of each column in a table can be traced schematically.




## Do we already use data pipelines? Or are they completely new?

In the code snippet below, we obtain subnational population data by reading an Excel file directly from a census.gov URL. We then manipulate this data with pandas to get it into the form we require, retaining only the relevant columns. Finally, we *save* this data. This code is taken from [here](subnational_population.ipynb).

In [None]:
import pandas as pd

database_name = "data_pipelines_tutorial"
URL = 'https://www2.census.gov/programs-surveys/international-programs/tables/time-series/prh/kenya.xlsx'

df_raw = pd.read_excel(URL, sheet_name='2000 - 2040', skiprows=2, header=None)
# Read correct header
header = df_raw.iloc[1]
df_raw.columns = header
df_raw = df_raw.drop([0,1,2])

# Extract Total population columns for admin-level-1 regions
df_pop_wide = df_raw[df_raw.ADM_LEVEL==1][['CNTRY_NAME', 'ADM1_NAME']+[x for x in header if 'BTOTL' in x]]
# Reshape from wide (one column per year) to long (one row per region and year)
df_pop = pd.melt(df_pop_wide, id_vars=['CNTRY_NAME', 'ADM1_NAME'], var_name='year', value_name='population')
# Keep the first run of digits in the original column name as the year
df_pop['year'] = df_pop['year'].str.extract(r'(\d+)', expand=False).astype(int)
df_pop.columns = ['country_name', 'adm1_name', 'year', 'population']

# Standardize the country and admin-1 names and enforce integer types
df_pop['country_name'] = df_pop['country_name'].str.title()
df_pop['adm1_name'] = df_pop['adm1_name'].str.replace(r'[-/]+', ' ', regex=True).str.title()
df_pop = df_pop.astype({'year': 'int', 'population': 'int'})
df_pop = df_pop.sort_values(['adm1_name', 'year'], ignore_index=True)

# Save to subnational population table (`spark` is the SparkSession provided by Databricks)
spark.createDataFrame(df_pop).write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.kenya_subnational_population")
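
To see what the reshaping step does without downloading the file, here is a plain-Python sketch of `pd.melt` followed by `str.extract(r'(\d+)')`. The wide column names (`BTOTL_2020`, `BTOTL_2021`) and all numbers are hypothetical, chosen only so that each column contains `BTOTL` and a year; they are not the real sheet's values.

```python
import re

# Hypothetical wide rows: one row per admin-1 region, one column per year
wide_rows = [
    {"CNTRY_NAME": "KENYA", "ADM1_NAME": "NAROK", "BTOTL_2020": 1000, "BTOTL_2021": 1100},
    {"CNTRY_NAME": "KENYA", "ADM1_NAME": "KERICHO", "BTOTL_2020": 900, "BTOTL_2021": 950},
]
ID_VARS = ("CNTRY_NAME", "ADM1_NAME")


def melt_population(rows):
    """Turn one-column-per-year rows into one row per (region, year)."""
    long_rows = []
    for row in rows:
        for column, value in row.items():
            if column in ID_VARS or "BTOTL" not in column:
                continue
            # Like str.extract(r'(\d+)'): the first run of digits becomes the year
            year = int(re.search(r"(\d+)", column).group(1))
            long_rows.append({
                "country_name": row["CNTRY_NAME"].title(),
                "adm1_name": row["ADM1_NAME"].title(),
                "year": year,
                "population": int(value),
            })
    # Same ordering as sort_values(['adm1_name', 'year'])
    return sorted(long_rows, key=lambda r: (r["adm1_name"], r["year"]))


pop_rows = melt_population(wide_rows)
```

Each of the two wide rows becomes two long rows, so `pop_rows` holds four records ordered by region and then year.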

## What are the building blocks of a Data Pipeline?

A data pipeline is a structured framework that enables the flow of data from source to destination, encompassing several key processes. Specific implementations vary, but the fundamental components of a data pipeline can be abstracted as follows:

**Data Ingestion:**

Collect data from various sources, such as databases, APIs, or files. This initial step involves loading raw data into the system, for example reading data from an Excel or CSV file, or pulling data from an API. In this tutorial, we ingest data from CSV and Excel files.

**ETL (Extract, Transform, Load):**

_Extract:_
Extract raw data that needs to be processed, often involving multiple formats and structures.

_Transform:_
This involves cleaning, filtering, enriching, and preparing the data for analysis. Typically, this can also include creating new columns or aggregating data.
Transformation ensures that the data is useful and reliable.

_Load:_
After transformation, the cleaned data is saved into a destination storage system (like a database or a data lake) for visualization or further processing.
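
The three steps can be sketched as three small functions chained together. This is a plain-Python illustration rather than the PySpark code used later in the tutorial; the CSV text, the column names and the `warehouse` dictionary standing in for a destination table are all hypothetical.

```python
import csv
import io

# Hypothetical raw source: a small CSV export with inconsistent formatting
RAW_CSV = """Year,County,Executed
2019-20, narok ,100.5
2020-21,KERICHO,
2020-21,kericho,250
"""


def extract(text):
    """Extract: read the raw CSV into a list of dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: drop rows without an amount, clean names, derive a numeric year."""
    cleaned = []
    for row in rows:
        if not row["Executed"].strip():
            continue  # filter out incomplete line items
        cleaned.append({
            "year": int("20" + row["Year"][-2:]),  # '2019-20' -> 2020
            "county": row["County"].strip().title(),
            "executed": float(row["Executed"]),
        })
    return cleaned


def load(rows, warehouse, table_name):
    """Load: overwrite the destination table (here just a dictionary entry)."""
    warehouse[table_name] = rows
    return warehouse


warehouse = load(transform(extract(RAW_CSV)), {}, "expenditure_clean")
```

Because each step is a separate function, next year's file can go through exactly the same `extract`, `transform` and `load` calls, which is the kind of reuse described earlier.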

When compute resources are robust, the order generally followed is **ELT**. Here the data is loaded in a relatively raw form and the transformations happen after loading the data into the compute infrastructure. This lets one take advantage of the large compute resources and also store intermediate forms of the data. In this tutorial we will explore this approach with a **medallion**-style transformation, where the data is treated in different _layers_, each corresponding to a cleaner, more transformed version, generally called *bronze*, *silver* and *gold*.

We will explore this in greater detail and practice constructing a pipeline in the medallion architecture [below](#Medallion-Architecture).

**Orchestration:**

This stage manages the execution of the pipeline tasks and dependencies. Orchestration ensures that each task runs in the correct order and handles dependencies, enabling automation and scheduling of the entire pipeline process.

Think of each unit of code that performs a specific task, or a collection of related tasks, as a **Task**. Tasks can then be linked to define the order in which they are evaluated and the dependencies between them. This linking of tasks into an ordered graph (a DAG, or directed acyclic graph) allows complex evaluations to be handled in an orderly manner, as well as on a schedule. The example below shows one such orchestration view for a specific project. Green indicates that a task ran without errors and red indicates an error, which makes debugging and maintenance easier.
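
A minimal sketch of the task-graph idea using Python's standard-library `graphlib` module: each task lists the tasks it depends on, and a topological sort yields an order in which every task runs after its dependencies. The task names and edges are assumptions that follow the data flow described in this tutorial, the task bodies are placeholders, and this is not how Databricks Workflows is implemented.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on (assumed edges).
DEPENDENCIES = {
    "bronze": set(),
    "population": set(),
    "silver": {"bronze"},
    "gold": {"silver"},
    "aggregation": {"gold", "population"},
}


def run_pipeline(dependencies, tasks):
    """Run every task after all of its dependencies; return the execution order."""
    order = list(TopologicalSorter(dependencies).static_order())
    for name in order:
        tasks[name]()
    return order


executed = []
# Placeholder task bodies that only record that they ran
tasks = {name: (lambda n=name: executed.append(n)) for name in DEPENDENCIES}
order = run_pipeline(DEPENDENCIES, tasks)
```

The exact order may vary between independent tasks (for example `bronze` and `population`), but every task always appears after the tasks it depends on.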

In this tutorial we will use the Workflows feature in **Databricks** for orchestration, although it is also possible to orchestrate the steps directly in Python code. The implementation for our sample data pipeline is shown [below](#Orchestration).

**Monitoring and Logging:**

Implementing monitoring mechanisms to log the pipeline execution, detect failures, and ensure data quality throughout the process.
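
A minimal plain-Python sketch of task-level logging combined with a simple data-quality check, using only the standard `logging` module. The helper names `run_logged` and `check_no_nulls` and the sample rows are hypothetical; in Databricks, Workflows already records the status of each task run, so treat this as an illustration of the idea rather than a recommended setup.

```python
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pipeline")


def run_logged(name, task, run_log):
    """Run one task, log its outcome and duration, and record a status entry."""
    start = time.perf_counter()
    logger.info("starting %s", name)
    try:
        result = task()
    except Exception:
        logger.exception("task %s failed", name)
        run_log.append({"task": name, "status": "failed"})
        raise
    logger.info("finished %s in %.2fs", name, time.perf_counter() - start)
    run_log.append({"task": name, "status": "succeeded"})
    return result


def check_no_nulls(rows, column):
    """Data-quality check: fail loudly if a required column has missing values."""
    missing = sum(1 for row in rows if row.get(column) is None)
    if missing:
        raise ValueError(f"{missing} rows have no value for {column!r}")
    return len(rows)


run_log = []
rows = [{"year": 2020, "executed": 10.0}, {"year": 2021, "executed": None}]
run_logged("check_year", lambda: check_no_nulls(rows, "year"), run_log)
try:
    run_logged("check_executed", lambda: check_no_nulls(rows, "executed"), run_log)
except ValueError:
    pass  # the failure has already been logged and recorded
```

The failure is re-raised after logging so that an orchestrator would still see the task as failed, rather than the error being silently swallowed.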

![](assets/pipeline-workflow.png)


The goal of the session is to help you understand how to create basic data pipelines to handle typical data processing tasks and how to modularize these steps to improve maintainability and scalability.

We will follow a structured approach, starting with data ingestion, then cleaning and transforming the data, and then exporting the processed data and aggregating it to a useful table. Finally, we will use the processed data to produce a plot, to illustrate a typical workflow. 

## Sample Data (Kenya public expenditure)

For this tutorial we will use the publicly available Kenya BOOST expenditure dataset, available [here](https://datacatalog.worldbank.org/search/dataset/0038086). This data contains spending information at the central and local levels and continues to be updated, curated and disseminated yearly by the Government of Kenya.


We will construct a data pipeline that ingests this data, makes certain modifications, enriches it by adding relevant columns, and finally saves the resulting tables, which are then used to produce plots that further summarize this expenditure data.

We will use some domain-specific understanding of this data, but the process and logic behind the modifications are not domain dependent and illustrate the general principles of data pipeline construction. The transformations performed here are typical of those encountered when working with raw data.

| Unnamed: 0 | Year | Class | Vote Groups | National Government Votes & Counties | Sub Votes | Head/Dept | SubHead/SubDept | County Government Ministries | National/County (Geo1) | Counties (Geo2) | ... | SOF2 | SOF3 | SOF4 | Sector | Programme | Sub-programme | Initial Budget (Printed Estimate) | Final Budget (Approved Estimate) | Final Expenditure (Total Payment Comm.) | Unnamed: 35 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 39293 | 2019-20 | 0 Recurrent Expenditure | 1 National Government Ministries, Departments ... | 118 Ministry of East African Affairs, Commerce... | 1185 State Department For Social Protection | 11850012 Cash Transfers | 1185001202 Cash Transfers - Field Services | | 02 Counties | 4610 Narok County | ... | 00 Domestic Resources | 00001 Consolidated Fund | 00001001 Exchequer ( GOK ) | 09 Social Protection, Culture and Recreation | 0909000000 National Social Safety Net | 0909010000 Social Assistance to Vulnerable Groups | 0 | 88326.0 | 0.0 | |
| 505287 | 2021-22 | 0 Recurrent Expenditure | 1 National Government Ministries, Departments ... | 110 Ministry of Environm,ent, Water and Natura... | 1109 Ministry of Water & Sanitation and Irriga... | 11090022 Land Reclamation Services | 1109002201 Land Reclamation Services - HQ | | 01 National | 0000 Nation-Wide | ... | 00 Domestic Resources | 00001 Consolidated Fund | 00001001 Exchequer ( GOK ) | 10 Environment Protection, Water And Natural R... | 1014000000 Irrigation and Land Reclamation | 1014020000 Land Reclamation | 22156 | 11078.0 | 0.0 | |
| 395201 | 2021-22 | 0 Recurrent Expenditure | 1 National Government Ministries, Departments ... | 107 The National Treasury | 1071 The National Treasury | 10710019 District Treasuries Services | 1071001901 Headquarters | | 02 Counties | 4660 Kajiado County | ... | 00 Domestic Resources | 00001 Consolidated Fund | 00001001 Exchequer ( GOK ) | 07 Public Administration And International Rel... | 0718000000 Public Financial Management | 0718040000 Accounting Services | 0 | 633.0 | 633.0 | |
| 192100 | 2020-21 | 0 Recurrent Expenditure | 1 National Government Ministries, Departments ... | 107 The National Treasury | 1071 The National Treasury | 10710019 District Treasuries Services | 1071001901 Headquarters | | 02 Counties | 4710 Kericho County | ... | 00 Domestic Resources | 00001 Consolidated Fund | 00001001 Exchequer ( GOK ) | 07 Public Administration And International Rel... | 0718000000 Public Financial Management | 0718040000 Accounting Services | 0 | 11681.0 | 11681.0 | |
| 248888 | 2020-21 | 0 Recurrent Expenditure | 1 National Government Ministries, Departments ... | 108 Ministry of Health | 1081 Ministry of Health | 10810090 Kenya Expanded Programme Immunization | 1081009001 Headquarters | | 01 National | 0000 Nation-Wide | ... | 00 Domestic Resources | 00001 Consolidated Fund | 00001001 Exchequer ( GOK ) | 04 Health | 0401000000 Preventive, Promotive & Reproductiv... | 0401030000 Reproductive Maternal Neo-natal Chi... | 900000 | 900000.0 | 460000.0 | |


Note that the data essentially consists of expenditure line items, tagged with columns representing an institutional hierarchy, a geographic hierarchy and a program hierarchy, and finally includes the budgeted, revised and executed amounts.

## PySpark
PySpark is the Python interface for [Apache Spark](https://spark.apache.org/), a distributed computing framework used for processing large datasets. PySpark allows users to perform transformations at scale using the familiar Python syntax, while Spark handles the underlying distributed computation efficiently.

In these code snippets, the `spark` variable is a pre-configured SparkSession, which is the entry point for interacting with Spark and is available by default in Databricks. It enables users to load data, run queries, and perform transformations across a cluster of machines, making it easy to process large datasets in parallel. This SparkSession also manages connections to the underlying cluster resources.

We can read data from the data lake (for tables we have permission to access) as follows:

In [None]:
spark_df = spark.table('prd_corpdata.dm_operation_gold.project')
df = spark_df.toPandas()
df

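In the above example, we read the data (which you are already familiar with from the previous session) into a **Spark DataFrame**. If we prefer to work with pandas, we can then convert the Spark DataFrame into a **pandas DataFrame** with the `.toPandas()` method. Note that `.toPandas()` collects the entire dataset onto a single machine, so it is only suitable when the data fits in that machine's memory.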

So, accessing and reading data from the data lake only takes a line of code.

# Medallion Architecture

In this tutorial we follow the medallion architecture, in which a series of data layers denotes the quality of the data stored in the data lake. This architecture ensures consistency and isolation, as data passes through multiple layers of validation and transformation before being stored in its final form. The terms bronze (raw), silver (validated), and gold (enriched) refer to the quality of the data at each layer.

The flow diagram below shows the flow of data through these layers, and we will illustrate this model using the Kenya BOOST data. 

The datasets at each stage are written to the data lake as tables in the **Delta format**. The Delta format provides scalability, consistency during concurrent writes, data versioning, and schema enforcement.
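
To make the layering concrete, here is a plain-Python sketch in which each layer is a separate stored table and each stage reads only from the layer before it. The dictionary standing in for the data lake, the column names and the rules are hypothetical simplifications of the bronze, silver and gold scripts described below.

```python
# A dictionary stands in for the data lake; each key is a table name
lake = {}


def bronze(raw_rows):
    """Bronze: store the raw rows as-is, only standardizing column names."""
    lake["bronze"] = [{k.strip().replace(" ", "_"): v for k, v in row.items()} for row in raw_rows]


def silver():
    """Silver: read bronze, filter out an unwanted class and derive a numeric year."""
    lake["silver"] = [
        {**row, "year": int("20" + row["Year"][-2:])}
        for row in lake["bronze"]
        if row["Class"] != "2 Revenue"
    ]


def gold():
    """Gold: read silver and keep only the columns needed for analysis."""
    lake["gold"] = [{"year": row["year"], "executed": row["Executed"]} for row in lake["silver"]]


raw = [
    {"Year": "2019-20", "Class": "0 Recurrent Expenditure", "Executed": 5.0},
    {"Year": "2019-20", "Class": "2 Revenue", "Executed": 7.0},
]
bronze(raw)
silver()
gold()
```

Because bronze keeps every raw row, silver and gold can be rebuilt from it if a cleaning rule changes, without re-ingesting the source.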


![Medallion Flowchart](assets/medallion_flow.png)


## Bronze

The code snippet below demonstrates the bronze stage of a data pipeline following the medallion architecture. 

The full script is saved as bronze.py in this project folder. We will not run the following snippet; it only illustrates how the code works.



Raw microdata is ingested from CSV files, cleaned by standardizing column names, and then stored in the bronze table using the optimized Delta format. Additionally, a database is created in Databricks SQL to manage the tables effectively. This step prepares the raw data for further processing in the pipeline.

In [0]:
# CSV_READ_OPTIONS, Data_DIR and database_name are defined elsewhere in the full bronze.py script
bronze_df = (spark.read
             .format("csv")
             .options(**CSV_READ_OPTIONS)
             .option("inferSchema", "true")
             .load(Data_DIR))

# Clean column names by replacing spaces and special characters
for old_col_name in bronze_df.columns:
    new_col_name = old_col_name.replace(" ", "_").replace("(", "").replace(")", "").replace(",", "")
    bronze_df = bronze_df.withColumnRenamed(old_col_name, new_col_name)


# Create the database in Databricks SQL
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Save to bronze table (Databricks Delta format for optimization)
bronze_df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.kenya_bronze")
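
The renaming loop above can be checked in isolation. This plain-Python function (its name is ours, not from bronze.py) applies the same chain of `str.replace` calls to a few of the raw column headers shown in the sample data above.

```python
def clean_column_name(name):
    """Same replacements as the bronze loop: spaces -> '_', then drop '(', ')' and ','."""
    return name.replace(" ", "_").replace("(", "").replace(")", "").replace(",", "")


raw_columns = [
    "Initial Budget (Printed Estimate)",
    "Final Budget (Approved Estimate)",
    "Final Expenditure (Total Payment Comm.)",
    "Vote Groups",
]
cleaned = [clean_column_name(c) for c in raw_columns]
```

The period in `Comm.` survives the cleaning, which is why the gold stage refers to that column with backticks: without them, Spark would interpret the dot as access to a nested field.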


## Silver

In the [silver](silver.ipynb) stage, we read the data produced in the bronze stage and transform and refine the data. 

In this stage, we append columns constructed from existing columns in the bronze table using the *.withColumn()* method. We construct columns representing where money is spent (geo1) and who was responsible for the spending (admin0 and admin1), along with other columns such as the functional category of the spending (func and func_sub). We also filter the data using the *.filter()* method to retain only the data related to financial execution.

We use a helper function to check whether a line item should be classified as 'Recreation and Culture' or 'Housing' by checking for the presence of certain key terms in the *Programme_pro2* column. The actual conditions for classifying the line items are domain-dependent, but the transformations used here are typical of those encountered in general workflows.

Finally, we write the transformed and enriched dataset to a Delta table called kenya_silver in the data lake.
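
The tutorial's helper for the 'Recreation and Culture' and 'Housing' checks is not shown, so here is a hypothetical plain-Python sketch of that kind of keyword test. The key terms in `CATEGORY_TERMS` are placeholders invented for illustration, not the actual rules used for the Kenya data.

```python
# Hypothetical key terms: the real helper and its term lists are not shown in this tutorial
CATEGORY_TERMS = {
    "Recreation and Culture": ("sport", "culture", "heritage"),
    "Housing": ("housing",),
}


def classify_programme(programme, default=None):
    """Return the first category whose key terms appear in the programme text."""
    text = programme.lower()
    for category, terms in CATEGORY_TERMS.items():
        if any(term in text for term in terms):
            return category
    return default
```

In PySpark, a similar test could be written inside a `when` condition with `Column.contains` or `Column.rlike` on the *Programme_pro2* column.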



In [0]:
from pyspark.sql.functions import col, concat, lit, regexp_replace, substring, when

silver_df = (spark.read.table(f"{database_name}.kenya_bronze")
    # Keep only the rows we need: drop the revenue and funds & deposits classes using the Class column
    .filter(~col('Class').isin('2 Revenue', '4 Funds & Deposits (BTL)'))

    #... Other filter conditions follow

    # Define a new column, admin2, from the National_Government_Votes_&_Counties_adm2 column:
    # regexp_replace strips the leading numeric code (digits and whitespace) from each entry
    .withColumn('admin2', regexp_replace(col("National_Government_Votes_&_Counties_adm2"), '^[0-9\\s]*', ''))

    #... Multiple other .withColumn calls follow that define further new columns

    # Build a four-digit year from the last two characters of Year (e.g. '2019-20' -> 2020) and cast it to int.
    # Spark column names are case-insensitive by default, so this replaces the original Year column.
    .withColumn('year', concat(lit('20'), substring(col('Year'), -2, 2)).cast('int'))

    # Construct func_sub with a more complex set of rules; the first matching condition wins
    .withColumn('func_sub',
                when((col('Sector_prog1').startswith('06') &
                     (col('National_Government_Votes_&_Counties_adm2').startswith('102') |
                      col('National_Government_Votes_&_Counties_adm2').startswith('210') |
                      col('National_Government_Votes_&_Counties_adm2').startswith('215'))),
                     "public safety")
                .when(col('Sector_prog1').startswith('06'), "judiciary")
                .when(col('Programme_pro2').startswith('0401'), 'primary and secondary health')
                .when(col('Programme_pro2').startswith('0402'), 'tertiary and quaternary health')
                .when((col('Sector_prog1').startswith('05') &
                      col('Programme_pro2').startswith('0501')), 'primary education')
                .otherwise('General public services'))
    #... Other modifications follow to construct the remaining new columns
)  # closes the chain started at spark.read

# Save to silver table
silver_df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.kenya_silver")
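
Two of the rules above can be mirrored in plain Python to check their behaviour on single values. These functions are illustrative re-implementations with hypothetical names; they are not part of silver.py.

```python
def func_sub(sector, programme, vote):
    """Plain-Python mirror of the when/otherwise chain: the first matching rule wins."""
    if sector.startswith("06") and vote.startswith(("102", "210", "215")):
        return "public safety"
    if sector.startswith("06"):
        return "judiciary"
    if programme.startswith("0401"):
        return "primary and secondary health"
    if programme.startswith("0402"):
        return "tertiary and quaternary health"
    if sector.startswith("05") and programme.startswith("0501"):
        return "primary education"
    return "General public services"


def fiscal_year_to_int(fiscal_year):
    """Mirror of concat(lit('20'), substring(col('Year'), -2, 2)): '2019-20' -> 2020."""
    return int("20" + fiscal_year[-2:])
```

The order matters: because the public safety rule is tested before the broader judiciary rule, a sector-06 line item from vote 102, 210 or 215 is never labelled judiciary. Spark's chained `when` calls behave the same way, like a SQL `CASE WHEN` expression.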

## Gold

In the gold stage of the pipeline (gold.py in the project folder), we finalize the dataset for analysis. The focus here is on filtering, selecting key columns, and ensuring the data is structured for consumption.

This stage refines the data into a ready-to-use form. We retain columns deemed necessary for the analysis, as the remaining columns are available in the silver table if required later.

In [0]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DoubleType

gold_df = (spark.read.table(f"{database_name}.kenya_silver")
    .filter(col('year') != 2015) # Exclude 2015 data
    .withColumn('country_name', lit('Kenya'))
    .select('country_name',
            'year',
            # Cast first, then alias, so the output column name is explicit
            col('Initial_Budget_Printed_Estimate').cast(DoubleType()).alias('approved'),
            col('Final_Budget_Approved_Estimate').alias('revised'),
            # Backticks are needed because this column name contains a dot
            col('`Final_Expenditure_Total_Payment_Comm.`').alias('executed'),
            'admin0',
            'admin1',
            'admin2',
            'geo1',
            'is_foreign',
            'func',  # needed by the aggregation step, which groups by func
            'func_sub'
           )
)

# Save to gold table
gold_df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.kenya_gold")
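
As a plain-Python sketch of the gold step, the function below filters out one year, adds a constant `country_name`, and keeps a renamed subset of columns. The mapping covers only some of the selected columns, the sample rows are made up, and for simplicity all three amount columns are cast to `float` (the code above casts only `approved` explicitly).

```python
# Silver -> gold column mapping taken from the gold select above (subset only)
GOLD_COLUMNS = {
    "year": "year",
    "Initial_Budget_Printed_Estimate": "approved",
    "Final_Budget_Approved_Estimate": "revised",
    "Final_Expenditure_Total_Payment_Comm.": "executed",
    "geo1": "geo1",
    "func_sub": "func_sub",
}
AMOUNT_COLUMNS = {"approved", "revised", "executed"}


def to_gold(silver_rows, excluded_year=2015, country="Kenya"):
    """Filter out one year, add a constant country column, then rename and cast."""
    gold_rows = []
    for row in silver_rows:
        if row["year"] == excluded_year:
            continue
        out = {"country_name": country}
        for source, target in GOLD_COLUMNS.items():
            value = row[source]
            # Cast amounts to float, similar to .cast(DoubleType()); keep missing values as None
            if target in AMOUNT_COLUMNS and value is not None:
                value = float(value)
            out[target] = value
        gold_rows.append(out)
    return gold_rows


# Hypothetical silver rows; "Class" stands in for the columns the gold table drops
silver_rows = [
    {"year": 2015, "Initial_Budget_Printed_Estimate": 1, "Final_Budget_Approved_Estimate": 1,
     "Final_Expenditure_Total_Payment_Comm.": 1, "geo1": "Narok", "func_sub": "judiciary",
     "Class": "0 Recurrent Expenditure"},
    {"year": 2020, "Initial_Budget_Printed_Estimate": 900000, "Final_Budget_Approved_Estimate": 900000.0,
     "Final_Expenditure_Total_Payment_Comm.": 460000.0, "geo1": "Nation-Wide",
     "func_sub": "primary and secondary health", "Class": "0 Recurrent Expenditure"},
]
gold_rows = to_gold(silver_rows)
```

Dropping columns here is safe because, as noted above, they remain available in the silver table.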


## Aggregation and appending additional columns

In the above steps we cleaned the original BOOST Kenya source data through multiple stages. In addition, we might need external data sources to enrich this data. In our example, we want to add information about per capita spending.

In this case, we obtain the subnational population from an external source (the census.gov Excel file) and then aggregate our kenya_gold data to get a table containing spending by *geo1* region, function and year. Since we referenced this very dataset [above](#Do-we-already-use-data-pipelines?-Or-are-they-completely-new?), we will not go over the code again, but it can be accessed [here](subnational_population.ipynb). We then append a new column called per_capita_spending to this aggregated table, and finally save the table so that these descriptive figures are readily available.

### Aggregation
To build the aggregation table, we read the kenya_gold and kenya_subnational_population tables. We aggregate the kenya_gold table by the year, geo1 and func columns and join the population column to the aggregated table.

We compute per_capita_spending and append this column to the aggregated table above. 

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

pop_df = spark.read.table(f"{database_name}.kenya_subnational_population").alias("pop_df")

# Total executed spending per country, year, region (geo1) and function
agg_df = (spark.read.table(f"{database_name}.kenya_gold")
    .groupBy("country_name", "year", "geo1", "func")
    .agg(spark_sum("executed").alias("expenditure"))
).alias("agg_df")

# Attach the population of each region and year (inner join: unmatched regions are dropped)
agg_with_pop_df = (agg_df
    .join(pop_df,
          (agg_df.country_name == pop_df.country_name) &
          (agg_df.geo1 == pop_df.adm1_name) &
          (agg_df.year == pop_df.year),
          "inner")
    .select("agg_df.country_name", "agg_df.year", "agg_df.geo1", "agg_df.func", "agg_df.expenditure", "pop_df.population")
)

# Spending per person
agg_with_pop_df = agg_with_pop_df.withColumn(
    'per_capita_spending',
    col('expenditure') / col('population')
)

agg_with_pop_df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.kenya_func_geo1_agg")
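
The aggregation and join can be sketched in plain Python to make the row-level logic visible: sum `executed` per (country, year, geo1, func) key, look up the population for (country, year, region), and divide. The sample rows and values are hypothetical.

```python
from collections import defaultdict

# Hypothetical gold rows and population rows
gold_rows = [
    {"country_name": "Kenya", "year": 2021, "geo1": "Narok", "func": "Health", "executed": 300.0},
    {"country_name": "Kenya", "year": 2021, "geo1": "Narok", "func": "Health", "executed": 200.0},
    {"country_name": "Kenya", "year": 2021, "geo1": "Kericho", "func": "Health", "executed": 90.0},
    {"country_name": "Kenya", "year": 2021, "geo1": "Nation-Wide", "func": "Health", "executed": 1000.0},
]
pop_rows = [
    {"country_name": "Kenya", "adm1_name": "Narok", "year": 2021, "population": 100},
    {"country_name": "Kenya", "adm1_name": "Kericho", "year": 2021, "population": 30},
]


def aggregate_per_capita(gold_rows, pop_rows):
    # Equivalent of groupBy(country_name, year, geo1, func).agg(sum(executed))
    totals = defaultdict(float)
    for row in gold_rows:
        totals[(row["country_name"], row["year"], row["geo1"], row["func"])] += row["executed"]
    # Lookup table for the inner join on country, year and geo1 == adm1_name
    population = {(p["country_name"], p["year"], p["adm1_name"]): p["population"] for p in pop_rows}
    result = []
    for (country, year, geo1, func), expenditure in sorted(totals.items()):
        pop = population.get((country, year, geo1))
        if pop is None:
            continue  # inner join: keys without a population match are dropped
        result.append({"country_name": country, "year": year, "geo1": geo1, "func": func,
                       "expenditure": expenditure, "population": pop,
                       "per_capita_spending": expenditure / pop})
    return result


agg = aggregate_per_capita(gold_rows, pop_rows)
```

Because the join is an inner join, any region whose name does not match between the two sources is silently dropped, as `Nation-Wide` is here; this is why it helps that the population script standardizes `adm1_name` before saving.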


# Orchestration

Orchestration refers to coordinating and automating the various stages of the pipeline (such as data extraction, transformation, and loading) to ensure smooth, sequential execution. This is equivalent to *connecting the individual jobs* so that they run in an organized manner, keeping track of the dependencies between them. For our PySpark scripts, this means managing the order in which they run.

Orchestration tools help **schedule** and manage these stages, ensuring that dependencies between tasks are handled correctly and that the pipeline runs efficiently. This ensures data moves through the pipeline in an automated manner, handling errors and retries where necessary.
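
A minimal sketch of the retry idea in plain Python: re-run a failing task a fixed number of times before giving up. Orchestrators such as Databricks Workflows let you configure retries per task, so you would not normally write this yourself; `run_with_retries` and the flaky task are hypothetical.

```python
import time


def run_with_retries(task, max_attempts=3, delay_seconds=0.0):
    """Call task(); on failure wait and try again, re-raising after the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(delay_seconds)


calls = {"count": 0}


def flaky_ingest():
    """Hypothetical task that fails twice (e.g. a network hiccup) and then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("source temporarily unavailable")
    return "ingested"


status = run_with_retries(flaky_ingest, max_attempts=3)
```

Retries suit transient failures such as a temporarily unavailable source; a genuine bug in a transformation will fail on every attempt and still surface as an error.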

We will use the built-in Databricks *Workflows* for managing these dependencies and scheduling.

![Kenya BOOST workflow](assets/kenya_workflow.png)


The above flow diagram of the jobs and their dependencies is built by adding each job and then specifying the jobs it depends on.

Note that for the silver.py script to run, bronze.py must run first, since silver.py reads the bronze table. This shows the dependence of silver.py on bronze.py. Similarly, we can derive all the dependencies shown in the flow diagram above. Together they form a directed acyclic graph (DAG), which by definition contains no circular dependencies.
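
The acyclic requirement can be checked with Python's standard `graphlib` module, whose `TopologicalSorter.prepare()` raises `CycleError` when dependencies loop back on themselves. The dependency dictionaries below are hypothetical examples based on the bronze, silver and gold scripts.

```python
from graphlib import CycleError, TopologicalSorter


def is_valid_dag(dependencies):
    """Return True if the task dependencies can be ordered, False if they contain a cycle."""
    try:
        TopologicalSorter(dependencies).prepare()
    except CycleError:
        return False
    return True


valid = {"silver": {"bronze"}, "gold": {"silver"}}
# Hypothetical misconfiguration: making bronze depend on gold closes a loop
circular = {"silver": {"bronze"}, "gold": {"silver"}, "bronze": {"gold"}}
```

With `circular`, no task could ever start, because each one waits on another; rejecting such graphs up front is what lets an orchestrator guarantee a valid run order.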

The above diagram shows the successful (manual) run of the workflow -- the success of each job indicated by the green line at the top edge of each box. This also makes debugging easier since we can identify the failure of a job within this workflow.


This saves the user from having to run these scripts **manually**. In addition, the workflow can be triggered so that the entire collection runs in the correct order: manually (running the whole workflow rather than individual jobs), on a schedule at specific intervals, or when, for instance, a new file is added to a specific folder. This makes the **automation** of this workflow extremely flexible.
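
Databricks Workflows offers file-arrival triggers out of the box; the plain-Python sketch below only illustrates the underlying idea of polling a folder and reacting to files not seen before. The folder, the file name and the `new_files` helper are hypothetical.

```python
import os
import tempfile


def new_files(folder, seen):
    """Return files in folder not seen before, and remember them for the next poll."""
    current = set(os.listdir(folder))
    fresh = sorted(current - seen)
    seen.update(fresh)
    return fresh


with tempfile.TemporaryDirectory() as landing:
    seen = set()
    first_poll = new_files(landing, seen)  # nothing has arrived yet
    with open(os.path.join(landing, "boost_2023.csv"), "w") as fh:
        fh.write("Year,Class\n")
    second_poll = new_files(landing, seen)  # the new file would trigger a pipeline run
    third_poll = new_files(landing, seen)  # already seen, so no new trigger
```

Remembering which files have been seen is what keeps the same file from triggering the pipeline twice.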

### References

1. [Databricks reference for data pipelines](https://docs.databricks.com/en/getting-started/data-pipeline-get-started.html)
2. [PySpark basics](https://docs.databricks.com/en/pyspark/basics.html)
3. [General notes on orchestration](https://www.datacamp.com/blog/introduction-to-data-orchestration-process-and-benefits)
