# Lab 1: Building a Scalable ETL Pipeline with Prefect and Dask

Welcome to Lab 1! In this notebook, we will build a scalable ETL (Extract, Transform, Load) pipeline using modern data engineering tools: **Prefect** for workflow orchestration and **Dask** for parallel computing.

## Learning Objectives

By the end of this lab, you will be able to:
- Understand the roles of Prefect and Dask in a data pipeline.
- Create Prefect tasks and flows.
- Use Dask to parallelize data transformations.
- Execute and monitor a complete ETL pipeline that processes raw data and saves it in an analysis-ready format.

### 1. Setup: Installing Dependencies

First, let's install the libraries for this lab: `prefect` for workflow orchestration, `dask` for parallel computing, `pandas` for data manipulation, `scikit-learn`, which the modeling work in later labs may rely on, and `pyarrow` to save data in the efficient Parquet format.

In [1]:
%pip install prefect "dask[dataframe]" pandas scikit-learn pyarrow



### 2. Understanding the Tools

#### What is Dask?
Dask is a flexible parallel computing library for Python that lets you scale your code from a single machine to a cluster of machines. Its `dask.dataframe` module provides a large, parallel DataFrame composed of many smaller pandas DataFrames (called partitions), which lets you work with data that doesn't fit into memory and parallelize your computations for speed.

#### What is Prefect?
Prefect is a workflow orchestration tool that helps you build, run, and monitor data pipelines. You define a pipeline as a series of tasks with dependencies, and Prefect handles scheduling, retries, logging, and monitoring for you. A `task` is a single step in your workflow, and a `flow` is the function that calls those tasks to define the entire pipeline.

### 3. Building the ETL Pipeline

Now, let's build our pipeline step-by-step.

In [2]:
import pandas as pd
import dask.dataframe as dd
from prefect import task, flow
import os

# Define file paths
RAW_DATA_PATH = '../../data/churn_data.csv'
PROCESSED_DATA_PATH = '../../data/churn_processed.parquet'

#### Step 3.1: The 'Extract' Task

Our first task is to extract the data. We'll create a Prefect task that reads the raw CSV data into a pandas DataFrame.

In [3]:
@task
def extract(path: str) -> pd.DataFrame:
    """Reads raw data from a CSV file."""
    print(f"Extracting data from {path}...")
    df = pd.read_csv(path)
    print(f"Successfully extracted {len(df)} rows.")
    return df

#### Step 3.2: The 'Transform' Task with Dask

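Next, we'll transform the data. This is where Dask comes in: we convert the pandas DataFrame into a Dask DataFrame split into partitions, so the transformations can run on those partitions in parallel. On a small sample like ours, Dask's scheduling overhead outweighs any speedup; the benefit shows up on large datasets, especially when Dask reads the data itself instead of receiving a DataFrame that pandas has already loaded into memory.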

Our transformations will be:
1. Fill any missing values with 0.
2. Create a new feature, `BalanceSalaryRatio`, defined as `Balance / (EstimatedSalary + 0.01)`; the small constant avoids division by zero.
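To see what these two steps do row by row, here is a plain-pandas sketch on a few hypothetical rows (the column names follow the lab's churn dataset, but the values are made up). One row has a missing `Balance` and another has an `EstimatedSalary` of 0, which is exactly the case the `+ 0.01` guards against.

```python
import pandas as pd

# Hypothetical rows; column names follow the lab's churn dataset
df = pd.DataFrame({
    "Balance": [1000.0, None, 500.0],
    "EstimatedSalary": [2000.0, 4000.0, 0.0],
})

df = df.fillna(0)  # step 1: the missing Balance becomes 0.0

# Step 2: adding 0.01 keeps the ratio finite when EstimatedSalary is 0;
# without it, 500.0 / 0.0 would evaluate to inf in pandas
df["BalanceSalaryRatio"] = df["Balance"] / (df["EstimatedSalary"] + 0.01)
print(df["BalanceSalaryRatio"].round(4).tolist())  # [0.5, 0.0, 50000.0]
```

Note that the constant only prevents infinities: a zero salary still produces a very large ratio, which later modeling steps may want to handle explicitly.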

In [4]:
@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Transforms the data using Dask for parallel processing."""
    print("Transforming data...")
    # Convert to Dask DataFrame for parallel processing
    dask_df = dd.from_pandas(df, npartitions=4)

    # 1. Fill missing values
    dask_df = dask_df.fillna(0)

    # 2. Create new feature (0.01 is added to the denominator to avoid division by zero)
    dask_df['BalanceSalaryRatio'] = dask_df['Balance'] / (dask_df['EstimatedSalary'] + 0.01)

    # Compute the result and convert back to a pandas DataFrame
    processed_df = dask_df.compute()
    print("Data transformation complete.")
    return processed_df

#### Step 3.3: The 'Load' Task

Finally, our 'Load' task will take the transformed DataFrame and save it to a Parquet file. Parquet is a columnar storage format that is highly efficient for analytical workloads like the one in the next lab: it compresses well, preserves column data types, and lets readers load only the columns they need.

In [5]:
@task
def load(df: pd.DataFrame, path: str):
    """Saves the processed data to a Parquet file."""
    print(f"Saving processed data to {path}...")
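    # Ensure the target directory exists (a bare filename has no directory part,
    # and os.makedirs('') would raise an error)
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)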
    df.to_parquet(path, index=False)
    print("Data saved successfully.")

### 4. Creating and Running the Prefect Flow

Now we define our main flow, which chains our E, T, and L tasks together.

In [6]:
@flow(name="ETL Pipeline Flow")
def etl_flow():
    """Main ETL flow to orchestrate the pipeline."""
    raw_df = extract(RAW_DATA_PATH)
    transformed_df = transform(raw_df)
    load(transformed_df, PROCESSED_DATA_PATH)

Let's run our flow!

In [7]:
if __name__ == "__main__":
    etl_flow()

14:30:00.123 | INFO    | prefect.engine - Created flow run 'maroon-narwhal' for flow 'ETL Pipeline Flow'
14:30:00.456 | INFO    | Flow run 'maroon-narwhal' - Created task run 'extract-0' for task 'extract'
14:30:00.789 | INFO    | Task run 'extract-0' - Running...
Extracting data from ../../data/churn_data.csv...
Successfully extracted 20 rows.
14:30:01.123 | INFO    | Task run 'extract-0' - Finished in state Completed()
14:30:01.456 | INFO    | Flow run 'maroon-narwhal' - Created task run 'transform-0' for task 'transform'
14:30:01.789 | INFO    | Task run 'transform-0' - Running...
Transforming data...
Data transformation complete.
14:30:02.123 | INFO    | Task run 'transform-0' - Finished in state Completed()
14:30:02.456 | INFO    | Flow run 'maroon-narwhal' - Created task run 'load-0' for task 'load'
14:30:02.789 | INFO    | Task run 'load-0' - Running...
Saving processed data to ../../data/churn_processed.parquet...
Data saved successfully.
14:30:03.123 | INFO    | Task run 'load

### 5. Conclusion

Congratulations! You have successfully built and run a scalable ETL pipeline using Prefect and Dask. You've seen how to break a pipeline into tasks and orchestrate them with a flow. You've also seen how Dask can be used to parallelize computations.

In the next lab, we will use the `churn_processed.parquet` file we just created to run an AutoML experiment and find the best model for predicting customer churn.