# ***demo***  - putting together a "real world" workflow with Prefect
Let's look at how we can leverage the Prefect engine alongside a few of today's bleeding-edge technologies to build a rudimentary end-to-end data pipeline.

## what does orchestrating a data pipeline look like?
If we break it down at a high level, we need a process that can:

- trigger an **extract** of raw data from our sources and drop it in our warehouse

- **transform** that raw data somehow into clean, reporting-ready tables

- provide visibility into any failures that occur in this process

... and that process should ideally be easy to **manage**, **scale**, and **understand**! (this is where an orchestrator like Prefect comes in)
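Stripped of any tooling, those three responsibilities can be sketched in plain Python. This is only an illustration of the idea, not how Prefect works internally: `run_pipeline`, `extract` and `transform` are hypothetical names, and "visibility" here is just a log line plus a returned status string.

```python
import logging
from typing import Callable, List, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")


def run_pipeline(steps: List[Tuple[str, Callable[[], None]]]) -> str:
    """Run steps in order; stop at the first failure and report which step broke."""
    for name, step in steps:
        try:
            step()
            log.info("step %s succeeded", name)
        except Exception:
            # visibility: log the failing step with its traceback, skip downstream steps
            log.exception("step %s failed", name)
            return f"failed at {name}"
    return "succeeded"


# hypothetical stand-ins for the real extract and transform work
def extract() -> None:
    print("copying raw data into the warehouse...")


def transform() -> None:
    print("building reporting-ready tables...")


status = run_pipeline([("extract", extract), ("transform", transform)])
print(status)
```

Everything beyond this (retries, scheduling, a UI showing which run failed and why) is what we would otherwise hand-roll, and is what an orchestrator takes off our plate.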

## ⚙️ what's in our *stack*?

<center>
    <img src="imgs/logos.png" width=900/>
</center>

<br>

<hr>


#### <img src="imgs/marvin.png" width=33/>  &emsp; [**prefect**](https://www.prefect.io) &emsp; our **workflow orchestrator**

[Marvin](https://www.prefect.io/blog/introducing-marvins-challenge/), the resident blue rubber duck at Prefect, helps us turn vanilla Python code into production-ready workflows.

<br>

<center>
    <img src="imgs/prefect-orca.svg" width=500/>
</center>


#### <img src="imgs/airbyte.png" width=40/>  &ensp; [**airbyte**](https://www.airbyte.com) &ensp; our **data integration** tool ([try it yourself](https://docs.airbyte.com/quickstart/deploy-airbyte))


[Octavia](https://airbyte.com/blog/how-we-chose-our-logo-and-mascot), a good friend of Marvin, helps us move batch data from A → B.

<center>
    <img src="imgs/airbyte-connectors.png" width=600/>
</center>

<br>

<div style="background-color: #0d4a73;border-radius: 10px;padding: 20px;">

💡 **note**:

Make sure your Airbyte instance is reachable over the network from wherever your flow runs.

For example, I'm running Airbyte on an [AWS EC2 instance](https://aws.amazon.com/pm/ec2) and forwarding its port 8000 over SSH to my `localhost:8000` with the following command:

<code>$ ssh demo-airbyte -L 8000:localhost:8000 </code>

</div>
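Before running the flow, it can help to confirm the tunnel is actually up. A minimal sketch using only the standard library: `is_port_open` is a hypothetical helper, and checking `localhost:8000` assumes the same port-forward as the `ssh` command above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # connection refused, timed out, or host unreachable
        return False


print("airbyte reachable:", is_port_open("localhost", 8000))
```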

#### <img src="imgs/dbt-logo.png" width=30/>  &emsp; [**dbt (data build tool)**](https://www.getdbt.com/) &ensp; our **transformation** layer

dbt takes raw data sources that live in a data warehouse and transforms them into reporting-ready data sources.


<center>
    <img src="imgs/dbt-schematic.png" width=1000/>
</center>
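To make "transform" concrete: a dbt model is, at its core, a `SELECT` statement that dbt materializes as a table or view in the warehouse. The sketch below mimics that idea with Python's built-in `sqlite3` standing in for Snowflake; the `raw_orders` table, its columns and the `stg_orders` view are hypothetical, and this is not how dbt itself executes models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the warehouse

# "raw" data as a loader might land it: everything as text, messy casing, a duplicate row
conn.execute("CREATE TABLE raw_orders (id TEXT, customer TEXT, amount TEXT)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [("1", " Alice ", "10.50"), ("2", "BOB", "7"), ("2", "BOB", "7")],
)

# the "model": a SELECT that cleans and types the raw rows, materialized as a view
conn.execute("""
    CREATE VIEW stg_orders AS
    SELECT DISTINCT
        CAST(id AS INTEGER)   AS order_id,
        LOWER(TRIM(customer)) AS customer_name,
        CAST(amount AS REAL)  AS amount
    FROM raw_orders
""")

rows = conn.execute("SELECT * FROM stg_orders ORDER BY order_id").fetchall()
print(rows)  # [(1, 'alice', 10.5), (2, 'bob', 7.0)]
```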



#### <img src="imgs/snowflake-logo.webp" width=33/> &ensp; [**Snowflake**](https://signup.snowflake.com) &ensp; our **data warehouse** ([try it yourself](https://signup.snowflake.com))

Our data warehouse is where our data lives:

<center>
    <img src="imgs/snowflake-schematic.png" width=1000/>
</center>

... using a process like the flow below, we fill our warehouse with raw data and use dbt to run SQL against it, sculpting it into clean, meaningful tables.

# let's go! my basic data pipeline

In Python, it always starts with imports...

```python
from prefect import flow, task

import json
import subprocess
import time

import requests
```

We're going to be making some API calls and dealing in JSON, so let's define some request headers:

```python
headers = {
    'Accept': 'application/json',
    'Content-Type': 'application/json'
}
```
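For reference, here is the shape of one of these JSON calls without `requests`, using only `urllib.request`. It is a sketch: `build_sync_request` is a hypothetical helper, and it only builds the request object (nothing is sent), so it runs without an Airbyte instance. The endpoint path matches the one used in the task below.

```python
import json
import urllib.request

HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json",
}


def build_sync_request(connection_id: str, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build (but do not send) a POST asking Airbyte to sync one connection."""
    body = json.dumps({"connectionId": connection_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/connections/sync",
        data=body,
        headers=HEADERS,
        method="POST",
    )


req = build_sync_request("my-connection-id")
print(req.get_method(), req.full_url)
print(req.data)
# to actually send it: urllib.request.urlopen(req)
```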

Let's write a `task` that can trigger a sync of our Airbyte connection and wait for it to finish:

```python
# running on EC2, port forwarded to local via EIP
@task(name='trigger airbyte sync')
def trigger_airbyte_sync(connectionId: str) -> bool:
    # ask Airbyte to start a sync for this connection
    response = requests.post(
        url='http://localhost:8000/api/v1/connections/sync',
        headers=headers,
        data=json.dumps({"connectionId": connectionId})
    )
    # raise_for_status() raises requests.HTTPError on a 4xx/5xx response
    response.raise_for_status()

    job_id = response.json()["job"]["id"]

    # poll the sync job until it reaches a terminal state
    while True:
        job_response = requests.post(
            url='http://localhost:8000/api/v1/jobs/get',
            headers=headers,
            data=json.dumps({"id": job_id})
        )
        job_response.raise_for_status()

        job_status = job_response.json()["job"]["status"]
        print(job_status)

        if job_status == 'succeeded':
            return True
        if job_status in ('failed', 'cancelled'):
            # fail the task instead of polling forever
            raise RuntimeError(f'Airbyte job {job_id} ended with status {job_status!r}')

        time.sleep(5)
```
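The core of that task is a poll-until-done loop. Below is a minimal, Airbyte-agnostic sketch of the pattern with a timeout added, so a stuck job cannot block the flow forever. `wait_for_job` and the `get_status` callable are hypothetical, and the status strings are assumptions modeled on the values Airbyte's job API reports (e.g. `'succeeded'`, `'failed'`, `'cancelled'`); in real use, `get_status` would wrap the `/api/v1/jobs/get` call.

```python
import time
from typing import Callable

TERMINAL_FAILURES = {"failed", "cancelled"}  # assumed failure states


def wait_for_job(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Poll get_status() until the job succeeds; raise if it fails or takes too long."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == "succeeded":
            return status
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"job ended with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout} seconds")
        time.sleep(poll_interval)


# simulate a job that reports 'running' twice, then 'succeeded'
fake_statuses = iter(["running", "running", "succeeded"])
print(wait_for_job(lambda: next(fake_statuses), poll_interval=0.01))
```

Passing the status lookup in as a callable keeps the loop testable without a live Airbyte server.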

... and now a `task` that will run our dbt transformation:

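```python
import shlex

# dbt running locally, connected to my snowflake instance
@task(name='trigger dbt job')
def trigger_dbt_job(dbt_command: str) -> None:
    # shlex.split keeps quoted arguments together, e.g. --vars '{"key": "value"}'
    # check=True raises CalledProcessError if dbt exits non-zero, so a failed run fails the task
    subprocess.run(shlex.split(dbt_command), check=True)
```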

<div style="background-color: #0d4a73;border-radius: 10px;padding: 20px;">

💡 **note**:

`subprocess` is a standard-library module for running shell commands from Python. `subprocess.run` takes the command as a `List[str]`, like `["cmd", "-arg"]`, which is why the task splits `dbt_command` into a list first.

</div>
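Two details of `subprocess.run` matter when it backs a pipeline step. First, `str.split` breaks quoted arguments apart, while the standard-library `shlex.split` keeps them together. Second, by default `subprocess.run` does not raise when the command exits non-zero, so a failed dbt run would still look like a successful task; passing `check=True` raises `subprocess.CalledProcessError` instead. The sketch below demonstrates both, using a tiny Python one-liner as a stand-in for a failing dbt run; the `--vars` value is a made-up example.

```python
import shlex
import subprocess
import sys

command = """dbt run --vars '{"env": "dev"}'"""
print(command.split())       # the JSON argument is split into two broken pieces
print(shlex.split(command))  # ['dbt', 'run', '--vars', '{"env": "dev"}']

# stand-in for a failing dbt run: a Python process that exits with status 1
failing = [sys.executable, "-c", "import sys; sys.exit(1)"]

result = subprocess.run(failing)  # no exception; the failure is only in returncode
print(result.returncode)          # 1

try:
    subprocess.run(failing, check=True)  # raises on a non-zero exit
except subprocess.CalledProcessError as err:
    print("command failed with exit code", err.returncode)
```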


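Finally, a `flow` that chains the two tasks, running dbt only after the sync succeeds:

```python
@flow(name='My Basic Data Pipeline')
def my_flow(airbyte_connection_id: str, dbt_command: str) -> None:
    # calling a task inside a flow returns its result directly (Prefect 2.x and later)
    synced = trigger_airbyte_sync(airbyte_connection_id)

    # only transform once the raw data has landed
    if synced:
        trigger_dbt_job(dbt_command)
```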

```python
airbyte_connection_id = 'e1b2078f-882a-4f50-9942-cfe34b2d825b'
dbt_command = 'dbt run --project-dir my_dbt_project'

my_flow(
    airbyte_connection_id,
    dbt_command
)
```
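Hardcoding the connection ID and dbt command works for a demo. A common alternative is to read them from environment variables, so the same flow can run against different connections without code changes. A minimal sketch; the variable names `AIRBYTE_CONNECTION_ID` and `DBT_COMMAND` are hypothetical, and the fallback values are the ones hardcoded above.

```python
import os


def load_pipeline_params() -> dict:
    """Collect flow parameters from the environment, falling back to the demo values."""
    return {
        "airbyte_connection_id": os.environ.get(
            "AIRBYTE_CONNECTION_ID", "e1b2078f-882a-4f50-9942-cfe34b2d825b"
        ),
        "dbt_command": os.environ.get(
            "DBT_COMMAND", "dbt run --project-dir my_dbt_project"
        ),
    }


params = load_pipeline_params()
print(params)
# the keys match my_flow's parameter names, so it could be called as my_flow(**params)
```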

Let's go look at what happened to our data in Snowflake!