# <font color='green'>Data Engineering Zoomcamp - Detailed Week 2 Notes</font>

## <font color='green'>Table of Contents</font>

<a href='#the_destination_1'>1) Introduction to Data lake</a> \
<a href='#the_destination_2'>2) Introduction to Data warehouse</a> \
<a href='#the_destination_3'>3) Difference between the two concepts</a> \
<a href='#the_destination_4'>4) Workflow Orchestration</a> \
<a href='#the_destination_5'>5) Introduction to Prefect</a> \
<a href='#the_destination_6'>6) ETL with GCP and Prefect</a> \
<a href='#the_destination_7'>7) GCP to BigQuery</a> \
<a href='#the_destination_8'>8) Creating Prefect Deployments</a> \
<a href='#the_destination_9'>9) Docker Storage with Infrastructure</a> \
<a href='#the_destination_10'>10) References</a>

# <font color='green'><a id='the_destination_1'>1) Introduction to Data lake</a></font>

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale, including semi-structured data such as JSON, XML, and text files. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions. It is designed to handle big data and provides a scalable and flexible architecture for data storage, processing and analysis. The ability to store unstructured data is one of the key benefits of a data lake, as it allows organizations to capture and store all types of data, regardless of format or structure, in a centralized repository. 

Data lakes are typically implemented as a scale-out architecture, where data is distributed across multiple storage nodes, enabling parallel processing and storage of large amounts of data. Popular data lake solutions include:

1) `Amazon S3`: A scalable and highly available data lake solution offered by Amazon Web Services (AWS)
2) `Microsoft Azure Data Lake Storage`: A cloud-based data lake solution offered by Microsoft Azure
3) `Google Cloud Storage`: A data lake solution offered by Google Cloud that provides scalable and flexible data storage and management
4) `Hadoop HDFS`: The original data lake solution that is part of the Apache Hadoop ecosystem
5) `Snowflake Data Warehouse`: A cloud-based data warehousing solution that includes a data lake component



### <font color='green'>Use Cases</font>

- Storing and archiving large amounts of data from various sources
- Centralizing data for easier access and management
- Analyzing and processing big data with flexible tools and technologies
- Supporting various data formats and structures, including semi-structured and unstructured data
- Enabling data democratization, allowing various teams and departments to access and analyze data with their own tools

### <font color='green'>Advantages</font>


- Cost-effective: reduces reliance on expensive data warehousing solutions for storing large amounts of raw data
- Scalability: storage and processing capacity can grow with the volume of data
- Flexibility: can handle various data formats and structures
- Integration: allows for the integration of data from various sources, including transactional systems, logs, social media, and other sources. This enables organizations to have a complete view of their data and makes it easier to perform cross-functional analysis
- Improved data access: allows easy and quick access to data by various stakeholders
- Processing options: supports a variety of processing options, including batch processing, real-time streaming, and interactive SQL. This allows organizations to choose the most appropriate processing option for their specific use case

### <font color='green'>Limitations</font>

The main challenge with a data lake architecture is that raw data is stored with no oversight of the contents. To make data usable, a data lake needs defined mechanisms to catalog and secure data. Without these elements, data cannot be found or trusted, resulting in a "data swamp." Meeting the needs of wider audiences requires data lakes to have governance, semantic consistency, and access controls.

- Data governance and management: Data lakes can store vast amounts of data, but without proper governance and management, it can be challenging to find, access, and use the data effectively.
- Data security: As data lakes store sensitive and valuable data, security and privacy of the data need to be properly managed and protected.
- Data quality: As data lakes allow for the storage of raw and unprocessed data, it can be challenging to ensure the quality of the data stored in the lake.
- Integration with other systems: Data lakes may need to integrate with other systems, such as data warehouses, to be effectively used for analysis and reporting purposes.


### <font color='green'>ELT vs ETL in a data lake</font>


- `ETL (Extract, Transform, Load)` refers to the traditional approach of extracting data from various sources, transforming the data into the required format, and loading the data into a data warehouse.
- `ELT (Extract, Load, Transform)` refers to the newer approach where data is extracted from various sources and loaded into a data lake first, and then transformed and processed for analysis.

In a data lake, ELT is a more efficient approach as it allows for the storage of raw and unprocessed data in its native format, reducing the time and effort required for data transformation. This enables faster and more flexible data processing and analysis. However, ELT also requires more sophisticated data processing and management capabilities to ensure data quality and reliability.
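
The load-first, transform-later idea can be sketched with nothing but the Python standard library. This is a minimal illustration, not a real data lake: a temporary directory stands in for the lake, and the function names (`land_raw`, `total_fare_by_vendor`) and the sample records are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def land_raw(lake_dir: Path, source: str, payloads: list) -> Path:
    """Extract + Load: write the payloads untouched, one JSON document per line."""
    target = lake_dir / "raw" / f"{source}.jsonl"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("\n".join(payloads) + "\n", encoding="utf-8")
    return target


def total_fare_by_vendor(raw_file: Path) -> dict:
    """Transform on read: a schema is applied only when the data is analyzed."""
    totals = {}
    for line in raw_file.read_text(encoding="utf-8").splitlines():
        record = json.loads(line)
        vendor = record.get("vendor", "unknown")
        totals[vendor] = totals.get(vendor, 0.0) + float(record.get("fare", 0))
    return totals


# Hypothetical records with inconsistent shapes; the lake accepts them as-is.
events = [
    '{"vendor": "A", "fare": 10.5}',
    '{"vendor": "B", "fare": "4.5", "tip": 1}',
    '{"vendor": "A", "fare": 2}',
]

with tempfile.TemporaryDirectory() as tmp:
    raw_path = land_raw(Path(tmp), "trips", events)
    print(total_fare_by_vendor(raw_path))  # {'A': 12.5, 'B': 4.5}
```

Note that the type clean-up (`"4.5"` to `4.5`) happens at read time, which is where the data quality burden of ELT shows up.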



![image-2.png](attachment:image-2.png)





Image - https://vitalflux.com/data-lake-design-principles-best-practices/


# <font color='green'><a id='the_destination_2'>2) Introduction to Data warehouse</a></font>

A data warehouse is a central repository of information that can be analyzed to make more informed decisions. Data flows into a data warehouse from transactional systems, relational databases, and other sources, typically on a regular cadence. Business analysts, data engineers, data scientists, and decision makers access the data through business intelligence (BI) tools, SQL clients, and other analytics applications.

Data and analytics have become indispensable to businesses to stay competitive. Business users rely on reports, dashboards, and analytics tools to extract insights from their data, monitor business performance, and support decision making. Data warehouses power these reports, dashboards, and analytics tools by storing data efficiently to minimize the input and output (I/O) of data and deliver query results quickly to hundreds and thousands of users concurrently.

Popular data warehouse solutions include:

`Amazon Redshift`: A fully managed, petabyte-scale data warehouse service provided by Amazon Web Services (AWS). Redshift uses columnar storage, parallel query execution, and advanced compression algorithms to provide fast query performance.

`Microsoft Azure Synapse Analytics (formerly SQL Data Warehouse)`: A cloud-based big data analytics service provided by Microsoft Azure. It combines the capabilities of data warehousing and big data analytics, allowing organizations to analyze data at scale.

`Google BigQuery`: A serverless, highly scalable, and cost-effective cloud data warehouse provided by Google Cloud Platform. BigQuery supports standard SQL queries and provides fast query performance through its columnar storage and massively parallel processing.

`Snowflake`: A cloud-based data warehousing solution that supports both structured and semi-structured data. Snowflake provides a SQL-based data warehousing solution that is fully managed, highly scalable, and offers fast query performance.

`Teradata`: A relational database management system for data warehousing and big data analytics provided by Teradata Corporation. Teradata provides a scalable, high-performance platform for storing and analyzing large amounts of structured data.

`Vertica (formerly HP Vertica)`: A columnar database management system for data warehousing and big data analytics. It was previously owned by Hewlett-Packard and is now part of OpenText. Vertica provides a high-performance platform for data warehousing and analytics, using columnar storage and parallel processing to provide fast query performance.

`PostgreSQL`: An open-source relational database management system that can also be used as a data warehouse. PostgreSQL provides a flexible, scalable platform for data warehousing, supporting SQL-based data warehousing and analytics workloads.

### <font color='green'>Use Cases</font>

- Business intelligence and decision making: Data warehouses provide a single source of truth for business intelligence and decision making, by integrating data from multiple sources and providing a consistent view of the data.
- Analytics: Data warehouses provide a platform for advanced analytics, such as data mining and predictive modeling, enabling organizations to gain insights into their data and make informed decisions.
- Reporting: Data warehouses provide a platform for generating reports and visualizations, enabling organizations to communicate their findings and insights effectively.

### <font color='green'>Advantages</font>

- Data integration: Data warehouses allow for the integration of data from multiple sources, providing a consistent and unified view of the data.
- Data quality: Data warehouses enforce data quality standards, ensuring that the data stored in the warehouse is accurate, complete, and consistent.
- Performance: Data warehouses are optimized for fast query performance, enabling organizations to perform complex analysis on large amounts of data.
- Scalability: Data warehouses can scale to accommodate growing amounts of data, allowing organizations to store and manage large amounts of data effectively.
- Informed decision making and historical data analysis
- Separation of analytics processing from transactional databases, which improves performance of both systems

### <font color='green'>Limitations</font>

- Cost: Data warehouses can be expensive to implement and maintain, as they require specialized hardware, software, and personnel.
- Maintenance: Data warehouses require ongoing maintenance and tuning to ensure optimal performance and data quality.
- Data governance: Data warehouses require proper data governance and management to ensure data privacy and security, and to meet regulatory requirements.
- Data processing: Data warehouses are designed for structured data, and may not be suitable for unstructured or semi-structured data.
- Flexibility: Data warehouses can be inflexible, as they enforce a specific data model and structure that may not be suitable for all data analysis and reporting needs.


### <font color='green'>ELT vs ETL in a data warehouse</font>

ELT (Extract, Load, Transform) and ETL (Extract, Transform, Load) are two approaches to moving data from disparate sources into a data warehouse for analysis and reporting.

In traditional ETL, data is extracted from source systems, transformed into a common format, and then loaded into a data warehouse. The transformation step is performed prior to loading the data into the warehouse, and can be time-consuming and resource-intensive.

In contrast, ELT involves extracting data from source systems and loading it into the data warehouse in its raw format. The transformation step is then performed within the data warehouse using specialized tools, such as SQL queries or data integration pipelines.

<b>Advantages of ELT over ETL include:</b>

- Scalability: ELT enables organizations to handle larger amounts of data by offloading the transformation step to the data warehouse, which is optimized for handling large amounts of data.
- Performance: ELT can be faster than ETL as the transformation step is performed within the data warehouse, which is optimized for performance.
- Flexibility: ELT provides greater flexibility in terms of the types of transformations that can be performed, as the transformation step can be performed using SQL or other specialized tools within the data warehouse.

<b>However, ELT can also have some disadvantages, such as:</b>

- Complexity: ELT can be more complex than ETL, as the transformation step is performed within the data warehouse, which requires specialized knowledge and skills.
- Cost: ELT can be more expensive than ETL as it requires specialized tools and resources within the data warehouse to perform the transformation step.

Ultimately, the choice between ELT and ETL will depend on the specific needs and requirements of the organization, including the amount and type of data, the desired level of scalability and performance, and the resources and expertise available.
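
The difference in where the transformation runs can be sketched with SQLite standing in for the warehouse. This is a toy comparison under stated assumptions: the table names, the sample rows, and the helper functions are all hypothetical, and a real warehouse would be BigQuery, Redshift, or similar.

```python
import sqlite3

# Hypothetical raw rows: (day, payment type, amount as text).
RAW_ROWS = [
    ("2021-01-01", "  card ", "12.50"),
    ("2021-01-01", "CASH", "7.00"),
    ("2021-01-02", "Card", "3.25"),
]


def clean(row):
    """Normalize one raw row in application code."""
    day, payment, amount = row
    return day, payment.strip().lower(), float(amount)


def run_etl(conn):
    """ETL: transform outside the warehouse, then load only the cleaned rows."""
    conn.execute("CREATE TABLE sales_etl (day TEXT, payment TEXT, amount REAL)")
    conn.executemany(
        "INSERT INTO sales_etl VALUES (?, ?, ?)", [clean(r) for r in RAW_ROWS]
    )


def run_elt(conn):
    """ELT: load the raw rows first, then transform with SQL inside the warehouse."""
    conn.execute("CREATE TABLE sales_raw (day TEXT, payment TEXT, amount TEXT)")
    conn.executemany("INSERT INTO sales_raw VALUES (?, ?, ?)", RAW_ROWS)
    conn.execute(
        """
        CREATE TABLE sales_elt AS
        SELECT day,
               lower(trim(payment)) AS payment,
               CAST(amount AS REAL) AS amount
        FROM sales_raw
        """
    )


def totals(conn, table):
    """Sum amounts per payment type; table is one of the names created above."""
    query = f"SELECT payment, SUM(amount) FROM {table} GROUP BY payment ORDER BY payment"
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
run_etl(conn)
run_elt(conn)
print(totals(conn, "sales_etl"))  # [('card', 15.75), ('cash', 7.0)]
print(totals(conn, "sales_elt"))  # [('card', 15.75), ('cash', 7.0)]
```

Both routes give the same totals. The ELT route additionally keeps `sales_raw`, so the transformation can be changed and re-run later without going back to the source system.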


![image.png](attachment:image.png)

Image - https://www.geeksforgeeks.org/top-15-popular-data-warehouse-tools/

# <font color='green'><a id='the_destination_3'>3) Difference between the two concepts</a></font>

A data warehouse is specially designed for data analytics, which involves reading large amounts of data to understand relationships and trends across the data. A database is used to capture and store data, such as recording details of a transaction. Data warehouses and Business Intelligence tools support reporting and analytics on historical data, while data lakes support newer use cases that leverage data for machine learning, predictions, and real-time analysis.

Unlike a data warehouse, a data lake is a centralized repository for all data, including structured, semi-structured, and unstructured. A data warehouse requires that the data be organized in a tabular format, which is where the schema comes into play. The tabular format is needed so that SQL can be used to query the data. But not all applications require data to be in tabular format. Some applications, like big data analytics, full text search, and machine learning, can access data even if it is ‘semi-structured’ or completely unstructured.

Typically, businesses use a combination of a database, a data lake, and a data warehouse to store and analyze data. As the volume and variety of data increases, it’s advantageous to follow one or more common patterns for working with data across your database, data lake, and data warehouse.

![image-5.png](attachment:image-5.png)


(Below) Land data in a database or data lake, prepare the data, move selected data into a data warehouse, then perform reporting.

![image.png](attachment:image.png)


(Below) Land data in a data warehouse, analyze the data, then share data to use with other analytics and machine learning services.

![image-2.png](attachment:image-2.png)




<b>Differences between the two:</b>


![image-4.png](attachment:image-4.png)

![image-3.png](attachment:image-3.png)

Image Sources -

https://aws.amazon.com/data-warehouse/ \
https://www.qubole.com/data-lakes-vs-data-warehouses-the-co-existence-argument


# <font color='green'><a id='the_destination_4'>4) Workflow Orchestration</a></font>

Workflow orchestration refers to the coordination and management of various data processing tasks in a defined sequence to achieve a specific goal. It involves scheduling, executing, monitoring, and managing the interdependent tasks in a data pipeline to ensure smooth data flow and efficient use of resources. The aim of workflow orchestration is to automate and streamline data processing, ensuring consistency and reliability, and making it easier to manage large and complex data processing operations.

Workflow orchestration was introduced in the context of data engineering as a solution to the growing complexity of data processing operations. As data processing operations became more complex, with an increasing number of tasks involved, it became increasingly difficult to manage and coordinate these tasks manually. Workflow orchestration provides a centralized and automated solution for coordinating these tasks, making it easier to manage and process large amounts of data.

Data orchestration solutions can power many processes, including but not limited to:

1) cleansing, organizing, and publishing data into a data warehouse
2) computing business metrics
3) applying rules to target and engage users through email campaigns
4) maintaining data infrastructure like database scrapes
5) running a TensorFlow task to train a machine learning model

There are several tools available for workflow orchestration in data engineering, some of the most popular ones include:

`Apache Airflow`: An open-source platform for programmatically authoring, scheduling, and monitoring workflows.

`Luigi`: An open-source Python module for building complex pipelines and workflows.

`AWS Glue`: A fully managed extract, transform, and load (ETL) service offered by Amazon Web Services (AWS).

`Prefect`: An open-source workflow orchestration tool that provides a simple and flexible interface for building and managing data pipelines.

`Apache Nifi`: An open-source data integration tool that provides a web-based interface for designing and managing data pipelines.

`Google Cloud Composer`: A managed workflow orchestration service offered by Google Cloud Platform (GCP).

`Microsoft Azure Data Factory`: A cloud-based data integration and workflow orchestration service offered by Microsoft Azure.

These are just some of the many tools available for workflow orchestration in data engineering. The choice of tool depends on several factors, including the specific requirements of the data processing operations, the available resources and budget, and the preferred programming language and technology stack.


### <font color='green'>Workflow Process</font>

To gain a deeper understanding of workflow orchestration process, it is important to consider the following:

<b>1) Workflow definitions</b>: Workflow orchestration involves defining the tasks involved in data processing, as well as the order in which they should be executed. Workflow definitions can be written using code, or using a graphical user interface (GUI) provided by the tool.

<b>2) Task dependencies</b>: Tasks in a workflow can have dependencies on one another, meaning that one task must be completed before another can start. Workflow orchestration tools provide a way to define and manage these dependencies, ensuring that tasks are executed in the correct order.

<b>3) Task execution</b>: Once the tasks and their dependencies have been defined, workflow orchestration tools can be used to execute the workflows. This involves scheduling the tasks to run at specific times, and monitoring the progress of the tasks as they are executed.

<b>4) Error handling</b>: Workflow orchestration tools provide mechanisms for handling errors that may occur during the execution of tasks. This can include retrying failed tasks, skipping tasks that cannot be executed, or providing notifications when errors occur.

<b>5) Monitoring and reporting</b>: Workflow orchestration tools provide monitoring and reporting capabilities to help you keep track of the status of your workflows. This can include visualizations of workflow execution, logs of task execution, and alerts for when workflows fail.

<b>6) Integration with other tools</b>: Workflow orchestration tools often integrate with other data processing and storage tools, such as databases and cloud storage services. This allows for seamless data flow between these tools, improving the efficiency of data processing operations.
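
The first five points can be sketched as a toy orchestrator using only the standard library (`graphlib` requires Python 3.9+). This is an illustration of the concepts, not how any of the tools listed earlier is implemented; `run_workflow` and the sample tasks are hypothetical.

```python
from graphlib import TopologicalSorter


def run_workflow(tasks, dependencies, max_retries=2):
    """Run callables in dependency order, retrying each failed task."""
    results, log = {}, []
    # static_order() yields every task after all of its upstream tasks.
    for name in TopologicalSorter(dependencies).static_order():
        upstream = {dep: results[dep] for dep in dependencies.get(name, ())}
        for attempt in range(1, max_retries + 2):
            try:
                results[name] = tasks[name](upstream)
                log.append((name, attempt, "success"))
                break
            except Exception as exc:
                log.append((name, attempt, f"failed: {exc}"))
                if attempt == max_retries + 1:
                    raise  # retries exhausted: fail the whole workflow
    return results, log


attempts = {"count": 0}


def flaky_extract(_upstream):
    """Fails on the first call to simulate a temporary outage."""
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ConnectionError("source unavailable")
    return [1, 2, 3]


# Workflow definition: the tasks, plus a mapping of task -> upstream tasks.
tasks = {
    "extract": flaky_extract,
    "transform": lambda up: [x * 10 for x in up["extract"]],
    "load": lambda up: len(up["transform"]),
}
dependencies = {"transform": {"extract"}, "load": {"transform"}}

results, log = run_workflow(tasks, dependencies)
for entry in log:
    print(entry)
```

The `log` list plays the role of monitoring here: it records one failed attempt for `extract`, followed by a successful run of each task in order.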


### <font color='green'>Advantages</font>

- Automation: It automates the manual tasks involved in data processing, reducing the risk of human error and improving efficiency.
- Scalability: It makes data processing operations easier to scale, so larger amounts of data can be handled.
- Monitoring: It provides centralized monitoring and management of data processing tasks, making it easier to keep track of what's happening in the pipeline.
- Flexibility: It allows for the easy modification and addition of new tasks to the data processing pipeline.
- Improved efficiency: It streamlines data processing, reducing the time and resources required to process data.

### <font color='green'>Limitations</font>

- Complexity: It can be difficult to set up and manage complex workflows, especially for large and complex data processing operations.
- Cost: The cost of implementing workflow orchestration can be high, especially for large data processing operations.
- Maintenance: It requires ongoing maintenance to ensure it continues to work effectively and efficiently.

# <font color='green'><a id='the_destination_5'>5) Introduction to Prefect</a></font>

The basic concepts of workflow orchestration, such as defining and executing tasks, handling dependencies between tasks, and monitoring the progress of workflows, are common across most tools. By learning one tool, you can gain a good understanding of these concepts, and the process of building and managing workflows. Here, we will be using Prefect to perform workflow orchestration. 

That being said, it's always a good idea to have exposure to multiple tools, as this can give you a deeper understanding of the field, and help you make informed decisions when choosing a tool for a specific use case. Now let's look at an overview of Prefect.

Prefect is an open-source, modern, and easy-to-use workflow automation and management platform designed for data engineers and scientists. It provides a simple way to automate complex workflows and manage data pipelines, making it easier to run and track your tasks, visualize your workflows, and share your results. Prefect helps you build, run, and monitor your workflows in a scalable and reliable manner.



### <font color='green'>Core Concepts in Prefect</font>

Before we jump into the topics from week 2, let's take a look at some of the core concepts in Prefect.

Prefect is a modern workflow management system that provides a simple and flexible way to automate and manage data pipelines. The core concepts in Prefect include:

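`Flows`: A flow is the container for workflow logic: a Python function decorated with `@flow` that calls tasks and other flows. The tasks represent individual steps in a process, and the data passed between them defines the order in which they are executed. Unlike Prefect 1 and many other orchestrators, Prefect 2 does not require a flow to be declared up front as a directed acyclic graph (DAG).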

`Tasks`: A task is the basic unit of work in Prefect. It encapsulates a single step in a process and can be run as a standalone unit or as part of a flow. Tasks can take inputs, generate outputs, and have dependencies on other tasks.

`Task Runners`: A task runner is the component in Prefect that executes the task runs submitted from a flow. Prefect 2 ships with runners such as `ConcurrentTaskRunner` and `SequentialTaskRunner` for running tasks on your own machine, and separate packages such as `prefect-dask` provide runners for running tasks in a distributed manner.

`Task Runs`: A task run is an instance of a task that has been executed. Each task run has a unique run ID and a status that indicates whether the task was successful or not.

`Task Results`: Task results are the outputs of tasks. When a task is run, it returns a result that Prefect tracks during the flow run and can optionally persist to storage. The results of tasks can be used as inputs for subsequent tasks, allowing you to build complex, multi-step processes.

`State`: Prefect tracks the state of each task run, including its inputs, outputs, and intermediate results. This information is used to manage the flow, handle failures, and allow for resuming flows from where they left off.

`Triggers`: A trigger is a mechanism that starts a flow. Triggers can be configured to start a flow based on a schedule, on demand, or when certain conditions are met.

`Scheduling`: Prefect provides scheduling capabilities that allow you to run flows on a schedule, such as daily or weekly. You can also trigger flows to run in response to events, such as the completion of another task.

`Caching`:  Prefect provides caching capabilities that allow you to save the results of tasks so that they can be reused in subsequent runs. This can significantly improve the performance of your flows, especially for tasks that take a long time to run.

`Automation`: Prefect provides a powerful automation engine that allows you to manage and run your flows in production, with features like automatic failure handling, monitoring, and logging.

`Integrations`: Prefect integrates with a wide range of data sources and tools, including databases and cloud platforms, through add-on packages such as `prefect-gcp` and `prefect-sqlalchemy`.

`Monitoring`: Prefect provides monitoring capabilities that allow you to monitor the progress and health of your flows. You can view the status of tasks and flows, view logs and results, and receive notifications when flows fail or complete.

These are the core concepts in Prefect. By understanding these concepts and using the Prefect API, you can create and manage complex data pipelines with ease.

<b>Python Decorators</b>

In Python, a decorator is a special kind of function or class that is used to modify the behavior of another function or class. A decorator takes in a function or class as an argument and returns a modified version of that function or class.

Decorators are typically defined using the `@` syntax in Python, and are applied to a function or class by placing the decorator immediately before the function or class definition.

Decorators can be used for a variety of purposes, including:

- Adding or modifying behavior of a function or class
- Wrapping a function or class to provide additional functionality, such as logging or error handling
- Modifying the input or output of a function or class

Overall, decorators are a powerful feature in Python that provide a flexible and reusable way to modify the behavior of functions and classes. They are commonly used in a variety of applications, including web development, testing, and automation.
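
As a minimal sketch of the mechanism, here is a plain-Python decorator that wraps a function to record and print each call. The names `log_calls` and `add` are made up for illustration; Prefect's own decorators do considerably more, but they are applied with the same `@` syntax.

```python
import functools


def log_calls(func):
    """Decorator that records and prints every call made to func."""

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        wrapper.calls.append((args, kwargs, result))
        print(f"{func.__name__}{args} -> {result}")
        return result

    wrapper.calls = []
    return wrapper


@log_calls  # equivalent to: add = log_calls(add)
def add(a, b):
    """Return the sum of a and b."""
    return a + b


add(2, 3)  # prints: add(2, 3) -> 5
```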

In Prefect, you can use decorators to add additional functionality to your tasks, such as changing the way that inputs are handled, changing the way that outputs are stored, or adding additional logic to control the execution of the task. You can also use the decorator to specify additional parameters for the task, such as the task's name, its retry logic, or its caching behavior.

<b>Common Arguments Used in Prefect Decorators</b>

Some of the common arguments used in a Prefect `@task` decorator are:

`name`: The name of the task. This is used for display purposes and for identifying the task in the flow.

`description`: An optional free-text description of the task. (The Prefect 2 `@task` decorator has no `automated` argument.)

`log_prints`: Controls whether printed statements within the task should be logged. By default, log_prints is set to False, which means that printed statements within the task will not be logged. If log_prints is set to True, then all printed statements within the task will be logged, allowing you to track the output of the task and debug issues more easily.

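`cache_key_fn`: Specifies a function that generates the cache key for a task. The cache key is used to identify whether the result of a task has been cached or not. If no function is given, the task's results are not cached. Prefect ships a ready-made function, `task_input_hash`, that builds the key from the task's inputs; it is usually combined with `cache_expiration` to limit how long a cached result stays valid.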

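`retries`: The maximum number of times a task should be retried if it fails. (Prefect 1 called this argument `max_retries`.)

`retry_delay_seconds`: The number of seconds to wait before retrying a task if it fails. (Prefect 1 called this argument `retry_delay`.)

`timeout_seconds`: The maximum runtime of the task in seconds; a task run that exceeds it is marked as failed.

`wait_for`: Not a decorator argument but a keyword passed when the task is called. It lists upstream task runs that must finish before this task runs. Prefect 2 otherwise infers the order from the data passed between tasks, so the Prefect 1 `trigger` and `upstream_tasks` arguments are not used.

`on_failure`: In newer Prefect 2 releases, a list of hook functions (not tasks) that are called when the task run fails. `on_completion` is the counterpart for completed runs. These hooks cover what Prefect 1 did with `state_handlers`; there is no `on_success` argument.

`version`: An optional version string for the task.

The Prefect 2 `@task` decorator has no `resources` argument; compute resources are configured on the infrastructure that runs the flow.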

These are some of the most common arguments used in a Prefect `@task` decorator, but there are many others that can be used to configure a task to meet specific needs. By using these arguments, you can customize the behavior of a task to fit your workflow requirements.

The other Prefect decorator, `@flow`, also accepts arguments.

The `@flow` decorator is used to define a flow, which is a collection of tasks that are connected together to form a workflow. In Prefect 2 it can take arguments such as `name`, `description`, `version`, `retries`, `retry_delay_seconds`, `timeout_seconds`, `task_runner`, and `log_prints`. Schedules, storage, and infrastructure are configured on a deployment rather than on the decorator.

Prefect 2 does not provide `@step`, `@retry`, `@timeout`, or `@group` decorators. Retries and timeouts are set through the `retries` and `timeout_seconds` arguments of `@task` and `@flow`.


### <font color='green'>Loading data into Postgres using Prefect</font>

Now that we have covered some of the basics of Prefect, let's take a look at the key steps from the DE Zoomcamp Week 2 - 2.2.2 Video -

<b>Step 1</b>

Create a new conda environment so that we can install all the relevant libraries without affecting the base environment.

First run `conda create -n de-zoomcamp python=3.9`, where de-zoomcamp is the environment name I chose. You can choose any name.

Then activate the environment by running `conda activate de-zoomcamp`

Since I was running the activate command in Git Bash, I got the following error:

![image.png](attachment:image.png)

To make it work, I ran `conda init bash` (since Git Bash is a bash shell) and restarted the terminal. Then I ran `conda activate de-zoomcamp`, which worked.
If the environment has been changed, you will see the following:

![image-2.png](attachment:image-2.png)

As you can see, the name changed from (base) to (de-zoomcamp)


<b>Step 2</b>

Create a requirements.txt file which contains all the relevant libraries which we will be using for this lesson and save it in your working directory. 

![image-3.png](attachment:image-3.png)

Then we need to run `pip install -r requirements.txt`, which installs all the required libraries in our environment.

You can verify that the libraries have been loaded by checking if their versions are the same as what we specified. As you can see below, the Prefect version is exactly the same as the one we specified.

![image-4.png](attachment:image-4.png)

<b>Step 3</b>

Now, we can transform the `ingest_data.py` script we created in Week 1 to turn it into tasks and flows. Just a quick reminder of what tasks and flows are in Prefect:

`Task`: A unit of work in a Prefect flow. It can perform a variety of operations, such as executing a Python function, running a shell command, or querying a database.

`Flow`: A collection of Tasks that defines a workflow.

Now we can use this concept of task and flow to break the ingest_data python script into multiple tasks and flows which would help us visualize our whole workflow better. We're simply going to split the existing `ingest_data.py` into multiple functions and add Python decorators to make a Prefect workflow.

1) Load the necessary libraries

![image-10.png](attachment:image-10.png)

2) Create an extract function which will help us extract data from the given url, where the url is an argument in the function. As you can see, we have added a @task decorator with a few essential arguments.

![image-5.png](attachment:image-5.png)


3) Next we create a transform function which will help us transform the data:

![image-7.png](attachment:image-7.png)

4) Next we create a load function which will help us load the data:

![image-8.png](attachment:image-8.png)

5) Finally, we create a main function which will help us run all of these functions

![image-9.png](attachment:image-9.png)


<b>Step 4</b>

Save this file as `ingest_data_flow.py` and run it from Git Bash using the command `python ingest_data_flow.py`

This should successfully load the data into Postgres. But since we only pass in the first chunk, it won't load the full dataset. As you can see below, since we set `log_prints` to True, the output of each `print` call shows up in the logs. The number of records is ~100,000 because only the first chunk is loaded. [185 records with a passenger count of 0 were removed. Check the transform function in the code above]

![image-11.png](attachment:image-11.png)

![image-12.png](attachment:image-12.png)



### <font color='green'>Parameterization and Subflows</font>

`Parameterization` is the process of defining a flow that takes input parameters, so that the same flow can be run multiple times with different inputs. This makes workflows easier to manage and reuse.

`Subflows` are flows that are called from within another flow. They can be used to encapsulate a complex part of a workflow or to define a component that is shared across multiple flows.

In Prefect, you create a subflow by defining a function with the @flow decorator and then calling that function from inside another flow (the parent flow). Each call creates a subflow run, which shows up in the logs and the UI as its own flow run linked to the parent.

Let's update our `ingest_data_flow.py` to make it more parameterized. Please find below the final updated code. It's pretty self-explanatory. We just added a subflow and called it in the main function.

![image-13.png](attachment:image-13.png)

![image-14.png](attachment:image-14.png)

When we run this, we can see that the subflow is working as intended. (Check line 3)

![image-15.png](attachment:image-15.png)
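The parameter fan-out that a parent flow performs can be sketched in plain Python. This is only an illustration of the idea: in Prefect, both `ingest_one_month` and `ingest_many` would carry the @flow decorator, so each call to the child would become a subflow run. The function names, the host, and the file layout are assumptions I made up for the example.

```python
def build_dataset_url(color, year, month):
    """Build the download URL for one monthly file (hypothetical host and layout)."""
    dataset_file = f"{color}_tripdata_{year}-{month:02d}"
    return f"https://example.com/trip-data/{color}/{dataset_file}.csv.gz"


def ingest_one_month(color, year, month):
    """Stand-in for the subflow: handles exactly one (color, year, month)."""
    url = build_dataset_url(color, year, month)
    return {"table": f"{color}_trips", "url": url}


def ingest_many(color="green", year=2021, months=(1, 2)):
    """Stand-in for the parent flow: fans out to one child call per month."""
    return [ingest_one_month(color, year, month) for month in months]


for run in ingest_many(months=(1, 2, 3)):
    print(run["table"], "<-", run["url"])
```

Because every input is a parameter with a default, the same code can be reused for another taxi color or year without editing the function bodies.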


### <font color='green'>Orion UI</font>

Orion UI is a user interface for Prefect, a popular open-source workflow management system for data engineering, data science, and machine learning. The Orion UI provides a graphical interface for managing and visualizing Prefect flows, tasks, and runs. With the Orion UI, users can easily monitor and manage the execution of Prefect flows, view the status of individual tasks, and explore the data and outputs of completed runs.

The Orion UI is designed to work seamlessly with Prefect and is built using modern web technologies. It provides an intuitive and powerful interface for Prefect users, making it easier to understand and work with complex data workflows. With its visual representation of flows and tasks, and its ability to track and visualize the data and outputs of runs, the Orion UI helps users to manage and monitor their workflows more effectively.

First we need to run `prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api`

This command is used to set the Prefect API URL in the Prefect configuration. The Prefect API URL is the endpoint that Prefect uses to communicate with the Prefect API, which is a REST API that provides programmatic access to Prefect flows, tasks, and runs.

By setting the Prefect API URL to http://127.0.0.1:4200/api, you are telling Prefect to use a local instance of the Prefect API running at http://127.0.0.1:4200/api. This is often used for local development or testing purposes, where you want to run the Prefect API on your local machine.

The prefect config set command is used to set configuration values in the Prefect configuration file, which is used to store configuration settings for Prefect. In this case, you are using it to set the value of the PREFECT_API_URL configuration setting.

![image-16.png](attachment:image-16.png)

Then we can run `prefect orion start` which would allow us to start the UI and access it from the URL we set

![image-17.png](attachment:image-17.png)


![image-18.png](attachment:image-18.png)

You can play around with the UI to get a better understanding of what each function does.


### <font color='green'>Blocks</font>

Blocks enable you to store configuration and provide an interface for interacting with external systems. With blocks, you are able to securely store credentials for authenticating with services like AWS, GitHub, Slack, or any other system you'd like to orchestrate with Prefect.

Blocks are the underlying components behind familiar Prefect concepts like deployments and storage. To learn more about creating and using blocks programmatically, see the Blocks documentation.

You can create, edit, and manage blocks in the Prefect UI and Prefect Cloud. On a Prefect Orion API server, blocks are created in the server's database. On Prefect Cloud, blocks are created on a workspace.

Select the Blocks page to see all blocks currently defined on your Prefect Orion API server or Prefect Cloud workspace.


In our `ingest_data_flow.py`, instead of hard-coding all of the input credentials (url, user, password, etc.), we can create a block which can store the credentials and can be called directly.

We can do it the following way:

<b>Step 1</b>: Go to Blocks and select SQLAlchemy Connector. Enter the relevant details:

![image-20.png](attachment:image-20.png)

![image-21.png](attachment:image-21.png)

Once you create the block, you should see something like this:

![image-22.png](attachment:image-22.png)


After the creation of the block, we can directly use it in our code in the following way. The code is once again self-explanatory

![image-23.png](attachment:image-23.png)

![image-25.png](attachment:image-25.png)

We can then run the python file and it will work as intended.

![image-26.png](attachment:image-26.png)

![image-27.png](attachment:image-27.png)

Notice the table name changed from `green_taxi_trips` to `green_trips`? Look through the code to figure out how that happened.
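To make it clearer what the block saves us from, here is a rough sketch of the hard-coded alternative: assembling a database connection URL by hand from the individual fields. The `ConnectionSettings` class is a hypothetical stand-in that I wrote for illustration and is not part of Prefect or SQLAlchemy; the credentials are placeholders.

```python
from dataclasses import dataclass
from urllib.parse import quote_plus


@dataclass(frozen=True)
class ConnectionSettings:
    """Hypothetical stand-in for the fields entered in the block form."""
    user: str
    password: str
    host: str
    port: int
    database: str
    driver: str = "postgresql+psycopg2"

    def url(self):
        # Percent-encode user and password so characters such as '@' or '/'
        # cannot be mistaken for URL delimiters.
        return (
            f"{self.driver}://{quote_plus(self.user)}:{quote_plus(self.password)}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    def masked_url(self):
        """Same URL with the password hidden, safe to print in logs."""
        return (
            f"{self.driver}://{quote_plus(self.user)}:***"
            f"@{self.host}:{self.port}/{self.database}"
        )


# Placeholder credentials for illustration only.
settings = ConnectionSettings(
    user="root", password="p@ss/word", host="localhost", port=5432, database="ny_taxi"
)
print(settings.masked_url())
```

With a block, these values live on the Orion server rather than in the script, so the script only needs the name of the block.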


# <font color='green'><a id='the_destination_6'>6) ETL with GCP and Prefect</a></font>

Video 2.2.3 is pretty self-explanatory, so I am not going to explain anything in this section. Watch the video, as it gives a clear step-by-step explanation of how to move data into the GCS data lake.

https://www.youtube.com/watch?v=W-rMz_2GwqQ&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=20

The video covers the following-  
- Creating a single main flow function which is going to call a number of task functions 
- The step by step coding of the python script to execute the tasks and flow
- Downloading the green taxi data from the web 
- Cleaning it up and saving it in parquet file format locally
- Creating blocks in Orion UI to store the GCP credentials
- Uploading the parquet data from local to Google Cloud Storage bucket

# <font color='green'><a id='the_destination_7'>7) GCP to BigQuery</a></font>

Video 2.2.4 is pretty self-explanatory, so I am not going to explain anything in this section either. Watch the video, as it gives a clear step-by-step explanation of how to move data from GCS to BigQuery.

https://www.youtube.com/watch?v=Cx5jt-V5sgE&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=21

The video covers the following-

- Creating a single main flow function which is going to call a number of task functions
- The step by step coding of the python script to execute the tasks and flow
- Downloading the green taxi data that we stored in GCS to Local
- Transforming the data to get rid of NAs
- Using GCP credentials to establish connection and moving the transformed data to Big Query 

# <font color='green'><a id='the_destination_8'>8) Creating Prefect Deployments</a></font>


Prefect Deployment refers to the process of deploying a Prefect workflow to a production environment so that it can be run on a schedule or triggered by external events. There are several ways to deploy a Prefect workflow, depending on the specific requirements and constraints of your use case.

Here are some of the common deployment methods for Prefect workflows:

`Prefect Cloud`: Prefect Cloud is a managed service provided by Prefect Technologies (the company behind Prefect) that allows you to run Prefect workflows in a scalable, secure, and managed environment. Prefect Cloud provides a web-based interface for managing workflows, as well as a REST API for programmatic access.

`Prefect Server`: Prefect Server is a self-hosted platform that provides the same functionality as Prefect Cloud, but it can be run on-premise or in your own cloud environment. Prefect Server provides a web-based interface for managing workflows, as well as a REST API for programmatic access.

`Prefect CLI`: The Prefect CLI can be used to run Prefect workflows from the command line. This is useful for testing workflows during development or for running workflows on a schedule using a task scheduler like cron or Windows Task Scheduler.

`Prefect API`: The Prefect API can be used to start and manage Prefect workflows programmatically. This is useful for integrating Prefect workflows into other systems or for triggering workflows based on external events.

In general, the deployment method you choose will depend on the specific requirements and constraints of your use case. For example, if you need to run Prefect workflows in a secure, managed environment, you may want to use Prefect Cloud or Prefect Server. If you need to run workflows on a schedule or integrate workflows into other systems, you may want to use the Prefect CLI or the Prefect API.

Regardless of the deployment method you choose, Prefect provides a flexible and scalable platform for running data workflows, making it easier to build, manage, and deploy workflows in production environments.


### <font color='green'>Using Prefect CLI and UI for deployments</font>


We can use the following command to create a deployment package -

`prefect deployment build ./parameterized_flow.py:etl_parent_flow -n "Parameterized ETL"`

This command is used to build a Prefect workflow deployment. The build command takes a Python file and a workflow object as arguments, in this case, the file is "parameterized_flow.py" and the workflow object is "etl_parent_flow". The -n or --name flag is used to specify a name for the deployment, in this case, "Parameterized ETL". 

Once the build command is executed, Prefect will create a deployment package that can be deployed to a Prefect Cloud instance or Prefect Server. This package contains all the necessary information to run the workflow, including the workflow definition, dependencies, and any parameters or configuration values that need to be set.

Once the deployment package has been created, you can deploy it to a Prefect Cloud instance or Prefect Server using the Prefect CLI or the Prefect API. This will make the workflow available for running and scheduling, allowing you to automate your ETL process and ensure that your data is up-to-date and accurate.


![image.png](attachment:image.png)

As you can see from the output, this command creates a YAML file that contains the definition of a Prefect deployment. It specifies the details of the workflow you are deploying, including the name, the version, and the environment.

The YAML file includes information about the Python file that contains the workflow code, the name of the workflow object within that file, the name of the deployment, and any configuration values that need to be set for the deployment to run successfully.

The YAML file also contains metadata about the deployment, including the date and time it was created, the version of Prefect being used, and any other information that is relevant to the deployment.

This YAML file can be used as the input to the Prefect CLI or API to deploy the workflow. Once the workflow is deployed, the YAML file can also be used to update the deployment, or to create a new deployment if you need to make changes to the workflow.

This is what it looks like:

![image-2.png](attachment:image-2.png)

![image-3.png](attachment:image-3.png)

Before deploying, we can edit the YAML file to add more details such as parameters, schedules, etc.

For now we'll just add the Parameters as you can see below on line 10

![image-4.png](attachment:image-4.png)


Then we can use the following command to apply the deployment 

`prefect deployment apply etl_parent_flow-deployment.yaml`

This command applies the deployment specified in the etl_parent_flow-deployment.yaml file. This means that the deployment described in the YAML file is registered with the Prefect API that the CLI is currently configured to use (the `PREFECT_API_URL` setting), which can be either Prefect Cloud or a self-hosted server such as our local Orion instance.

The prefect deployment apply command will create a new deployment or update an existing deployment, depending on the state of the target environment. If the deployment specified in the YAML file does not exist in the target environment, a new deployment will be created. If the deployment exists and the YAML file specifies different configuration values, the deployment will be updated with the new values.

Once the deployment is created or updated, the Prefect CLI or API can be used to manage the deployment, including monitoring its status, updating its configuration, or deleting the deployment.

![image-5.png](attachment:image-5.png)

If you navigate to the Orion UI, you can see that the deployment has been created -


![image-6.png](attachment:image-6.png)


You can then add different details or edit any existing parameters from the UI and then run the deployment 


![image-7.png](attachment:image-7.png)

You have 2 types of run - 

`Quick Run` - The simplest way to run a deployment. The flow run starts right away with the default parameter values defined in the deployment, and you are not asked for any further input.

`Custom Run` - Lets you override the parameter values for a single run and adjust other run settings before it starts, for example when the run should begin.

The choice between a "Custom Run" and a "Quick Run" depends on your specific use case. If you need to change parameters or other settings for a particular run, use a "Custom Run". If you just want to run the deployment with its defaults, a "Quick Run" is the quickest and simplest option.

<b>Core, Agents and Work Queues</b>

Before running the workflow, let's look at the concepts of Core, Agents and Work Queues

In Prefect, "Work Queues" and "Agents" are key components for executing workflows in a distributed manner.

`Prefect Core` here means the orchestration backend: the Orion API server when self-hosting, or Prefect Cloud. It stores deployments, schedules flow runs, and keeps track of the state and metadata of every flow run and task run. It does not execute your flow code itself; that is the job of the agents.

A `Work Queue` holds flow runs that are scheduled and waiting to be executed. It acts as a buffer between the backend and the agents: the backend places the flow runs of a deployment on that deployment's work queue, and agents pick them up from there.

An `Agent` is a lightweight process that polls one or more work queues, picks up the flow runs that are due, and starts them on the infrastructure defined in the deployment (a local process, a Docker container, and so on). Agents can run on a variety of platforms, including local machines, cloud infrastructure, or containers, and several agents can poll the same queue.



Agents and work queues allow Prefect to scale and distribute the execution of workflows, improving performance and reliability. This makes it possible to process large amounts of data or perform complex computations in a parallel and distributed manner, increasing the overall efficiency and reducing the time it takes to complete a workflow.


<b>Stages in a workflow</b>

Here's a general timeline of how a Prefect workflow is executed:

`Workflow Definition`: The first step is to define the workflow in Python as functions decorated with @flow and @task.

`Deployment`: The flow is registered with the backend as a deployment (`prefect deployment build` and `prefect deployment apply`), which records the flow's entrypoint, its parameters, an optional schedule, and the work queue it belongs to.

`Run Scheduling`: When the schedule fires, or when you trigger a run manually from the UI or CLI, the backend creates a flow run in the "Scheduled" state and places it on the deployment's work queue.

`Run Pulling`: An agent polling that work queue picks up the flow run and starts it on the deployment's infrastructure.

`Run Execution`: The flow run executes its tasks by running the code associated with them. State changes for the flow run and each task run ("Running", "Completed", "Failed") are reported back to the backend.

`Run Completion`: If the flow finishes without errors, the flow run ends in the "Completed" state. If the flow raises an error (for example because a task failed after exhausting its retries), the flow run ends in the "Failed" state.

`Repeat`: The agent keeps polling, so the process repeats for every new flow run that lands on the queue.

At each stage of the workflow execution, work queues and agents play an important role in ensuring efficient and reliable execution. The work queue holds flow runs that are ready to be executed and acts as a buffer between the backend and the agents. The agents pull flow runs from the work queue and start them, and the results are reported back to the backend. This separation lets the backend manage scheduling and state while the agents handle execution, which allows Prefect to scale and distribute the execution of workflows.
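The pull model described above can be mimicked with a toy in-memory queue. This is only a mental model and not how Prefect is implemented: a real work queue lives on the API server, the items on it are flow runs, and a real agent polls over HTTP. The `WorkQueue` class, `agent_poll_once`, and the `etl` function are all names I made up for the sketch.

```python
from collections import deque


class WorkQueue:
    """Toy in-memory queue; a real Prefect work queue lives on the API server."""

    def __init__(self, name):
        self.name = name
        self._pending = deque()

    def submit(self, run_name, fn, **parameters):
        self._pending.append((run_name, fn, parameters))

    def get_next(self):
        return self._pending.popleft() if self._pending else None


def agent_poll_once(work_queue, states):
    """Pull at most one run from the queue, execute it, and record its final state."""
    item = work_queue.get_next()
    if item is None:
        return False  # nothing to do on this poll
    run_name, fn, parameters = item
    states[run_name] = "Running"
    try:
        fn(**parameters)
        states[run_name] = "Completed"
    except Exception:
        states[run_name] = "Failed"
    return True


def etl(month):
    """Hypothetical workload: fails for an impossible month."""
    if month > 12:
        raise ValueError(f"invalid month: {month}")


queue = WorkQueue("default")
queue.submit("run-1", etl, month=1)
queue.submit("run-2", etl, month=13)

states = {}
while agent_poll_once(queue, states):
    pass
print(states)
# prints: {'run-1': 'Completed', 'run-2': 'Failed'}
```

Nothing runs at the moment a run is submitted. Work only happens when the agent polls, which is why a run can sit in the queue until an agent is started.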


<b>Our Example</b>

For our example, we will be using the default work queue


When a deployment is built without specifying a work queue, Prefect assigns it to the work queue named `default`. A flow run created from that deployment waits on the `default` queue until an agent polling that queue picks it up, which is why the agent below is started with `--work-queue "default"`. For larger setups you can create additional work queues and run separate agents for each of them.

Finally, we press `Quick run` and we can see that it is available in the work queue

![image-8.png](attachment:image-8.png)

We can then call the agent to pick up the workflow and execute it by using the following command - 

`prefect agent start --work-queue "default"`



# <font color='green'><a id='the_destination_9'>9) Docker Storage with Infrastructure</a></font>


<b>Creating a Docker Image</b>

1) Log in to Docker Hub. If you don't have a Docker account, create one.

2) Create a Dockerfile

![image-13.png](attachment:image-13.png)

3) Type the command `winpty docker build -t <hub-user>/<repo-name>[:<tag>] .`

For me, I used - `winpty docker build -t balajirvp/prefect:de-zoomcamp .`

![image-2.png](attachment:image-2.png)

4) Then push it into the hub so that it can be used 

`docker image push balajirvp/prefect:de-zoomcamp`

![image-3.png](attachment:image-3.png)

You can verify that the image was pushed in Docker Desktop

![image-4.png](attachment:image-4.png)

5) Create a Docker Block in Orion UI. Edit the below fields accordingly, give any name and create it. 

![image-5.png](attachment:image-5.png)

<b>Prefect deployment using Python</b>

1) Write the python code

![image-8.png](attachment:image-8.png)

2) Execute it

![image-7.png](attachment:image-7.png)

3) Check the Orion UI

![image-9.png](attachment:image-9.png)


4) Use `prefect profile ls` to check which profile is currently being used. Since we haven't created any profile, it will point to `default`

![image-10.png](attachment:image-10.png)

5) `prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"`

This command sets the API URL for the Prefect CLI and client. The API URL is the endpoint of the Prefect API, which is the interface for managing flows, deployments, and runs. Here it is set to http://127.0.0.1:4200/api, which points to localhost on port 4200. The Prefect CLI uses this setting to communicate with the Prefect API when you run commands such as `prefect deployment apply` or `prefect deployment run`.

If PREFECT_API_URL is not set, Prefect 2 does not look for a server at all: it falls back to a temporary (ephemeral) API that runs inside the current process. When running Prefect locally, you need to set PREFECT_API_URL to the local API endpoint so that the Prefect client communicates with your local Orion server.

![image-11.png](attachment:image-11.png)

In our context, it allows our Docker container to communicate with the Orion server.

6) Start an agent using `prefect agent start -q default`

![image-12.png](attachment:image-12.png)

7) Run the docker flow we created from CLI by running `prefect deployment run etl-parent-flow/docker-flow`

This should complete the workflow as you can see below and load the data into GCS

![image-14.png](attachment:image-14.png)
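A small note on the name `etl-parent-flow/docker-flow` used in step 7: the CLI expects `<flow-name>/<deployment-name>`. As far as I can tell, when no explicit name is passed to @flow, Prefect 2 derives the flow name from the function name by replacing underscores with hyphens, which is why `etl_parent_flow` shows up as `etl-parent-flow`. The helper functions below are my own illustration of that convention and are not part of Prefect.

```python
def default_flow_name(function_name):
    """Mirror the default naming: underscores in the function name become hyphens."""
    return function_name.replace("_", "-")


def deployment_identifier(function_name, deployment_name):
    """Build the '<flow-name>/<deployment-name>' string passed to the CLI."""
    return f"{default_flow_name(function_name)}/{deployment_name}"


print(deployment_identifier("etl_parent_flow", "docker-flow"))
# prints: etl-parent-flow/docker-flow
```

If the run command reports that the deployment cannot be found, comparing this string with the name shown in the Orion UI is a quick first check.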

# <font color='green'><a id='the_destination_10'>10) References</a></font>

https://aws.amazon.com/big-data/datalakes-and-analytics/what-is-a-data-lake/ \
https://aws.amazon.com/data-warehouse/