# Cloud Workflows

Cloud and High-Performance Computing (HPC) workflows can handle complex computational tasks. However, they differ significantly in their architecture, scalability, and use cases. Cloud workflows leverage cloud computing resources to execute tasks across distributed environments by utilizing virtual machines, containers, and serverless architectures. The cloud approach provides flexibility, scalability, and cost-efficiency because resources can be dynamically allocated and scaled based on demand. Cloud workflows provide easy access to shared resources and data over the internet, which makes them well-suited for collaborative tasks shared by geographically dispersed teams. Additionally, cloud platforms integrate with a wide range of services and tools, facilitating seamless workflow automation, data analysis, and machine learning.

In contrast, HPC workflows are designed to maximize computational performance through tightly coupled, high-performance hardware configurations, such as supercomputers or computing clusters. HPC systems are optimized for executing large-scale, computationally intensive tasks with high efficiency and speed. This makes them ideal for scientific simulations, complex modeling, and data-intensive research. Unlike cloud workflows, which prioritize flexibility and ease of access, HPC workflows are built on raw computational power and low-latency communication between processing nodes. This requires specialized hardware and software configurations, as well as expertise in parallel programming and performance optimization. Although HPC workflows can deliver optimized performance for specific types of workloads, they can lack the elasticity and cost-effectiveness of cloud-based solutions. The choice between cloud and HPC workflows depends on the specific requirements of the computational tasks, including factors such as scalability, performance, cost, and ease of collaboration.

Here is a comparison of cloud workflows and HPC workflows.

**Cloud Computing**

Cloud computing provides on-demand access to computing resources over the internet, enabling scalability, flexibility, and cost efficiency. Key features include:

- Elasticity: Resources can be scaled up or down based on demand.
- Pay-as-you-go pricing: Users pay only for the resources they consume.
- Managed services: Cloud providers handle maintenance, security, and updates.
- Broad accessibility: Services are available remotely via the internet.

Common cloud computing use cases include web hosting, SaaS applications, and general-purpose workloads.

**High-Performance Computing (HPC)**

HPC focuses on solving complex computational problems by leveraging powerful hardware and parallel processing. Key characteristics include:

- High-speed processing: Uses specialized hardware like NVIDIA GPUs (e.g., A100, H100) for accelerated performance.
- Low-latency networking: Optimized interconnects minimize communication delays.
- Batch processing: Large jobs are often executed in scheduled batches.
- Specialized workloads: Designed for simulations, AI training, and scientific research.

## Data Gravity and Choosing Between Cloud and HPC

Data gravity is the concept that as data accumulates, it attracts additional data, services, and applications. In cloud computing, the larger and more valuable a dataset becomes, the more it attracts related data and computational processes. Data gravity spotlights the relationship between data locality and efficient data processing. Moving large datasets across networks is time-consuming and costly. By placing computational resources close to the data, performance improves, latency is reduced, and the cost of data egress from cloud storage decreases. Large datasets influence architectural choices in cloud computing, and as a corollary, the volume of data changes scientific workflows.
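
To make the cost of moving data concrete, the following sketch estimates how long a transfer takes at a sustained bandwidth. The function name `transfer_hours`, the 100 TB dataset size, and the link speeds are illustrative assumptions rather than measurements, and real transfers are usually slower because of protocol overhead and shared links.

```python
def transfer_hours(size_tb: float, bandwidth_gbps: float) -> float:
    """Estimate the hours needed to move `size_tb` terabytes over a link.

    Assumes decimal units (1 TB = 10**12 bytes, 1 Gbit/s = 10**9 bits/s)
    and a fully utilized link, so the result is a lower bound.
    """
    size_bits = size_tb * 1e12 * 8
    seconds = size_bits / (bandwidth_gbps * 1e9)
    return seconds / 3600


# Hypothetical 100 TB archive over two illustrative link speeds
for gbps in (1, 10):
    print(f"{gbps:>2} Gbit/s: {transfer_hours(100, gbps):.1f} hours")
```

Under these assumptions the 1 Gbit/s transfer takes more than nine days, which is why running the computation next to the data is often the better choice.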

The default choice between cloud and HPC is to run wherever you have resources. But if you have a choice, run where your data is. If the data is located on premises and on NFS, use HPC. If you’re processing data that is hosted on AWS or another cloud provider, then run on the cloud.

## Methods and Tools for Cloud Workflows

Parallelization, or concurrent programming, in scientific computing is the technique of dividing computational tasks into smaller subtasks that can be executed simultaneously across multiple processing units. It is an efficient way to work with data in the cloud. There are several ways to implement parallelization, ranging from pure Python solutions to [Dask](https://www.dask.org/), a distributed task scheduler. 

### Parallel Processing and Concurrency with Python

Python supports parallel and concurrent programming through the `multiprocessing` and `threading` modules in its standard library. Each module serves a distinct class of tasks. The `multiprocessing` module enables parallelism by using multiple processors or cores to execute several processes simultaneously. This is advantageous for CPU-bound tasks, where the workload can be distributed across processes to reduce execution time. In contrast, the `threading` module is designed for operations that involve waiting for external events, such as data I/O or network responses. Multithreading runs multiple threads concurrently within the same process, making it efficient for tasks that wait frequently. However, due to Python's [Global Interpreter Lock (GIL)](https://wiki.python.org/moin/GlobalInterpreterLock), threads are not truly parallel but rather interleaved, which limits performance gains for CPU-bound tasks. Together, these modules cover both kinds of workload: parallel execution for computation and concurrent execution for waiting.

| Feature           | Concurrency                                       | Parallelism                                       |
|------------------|---------------------------------------------------|--------------------------------------------------|
| **Definition**    | Managing multiple tasks at once (interleaving)    | Executing multiple tasks simultaneously          |
| **Execution**     | Tasks appear to run at the same time              | Tasks actually run at the same time              |
| **CPU Usage**     | Can be done on a single core (via context switching) | Requires multiple cores or processors        |
| **Purpose**       | Maximize responsiveness, resource efficiency      | Maximize performance, throughput                 |
| **Example**       | Switching between tasks while waiting for I/O     | Performing computations on multiple cores        |
| **Analogy**       | One cook managing several dishes sequentially     | Multiple cooks preparing multiple dishes at once |
| **Implementation**| Threads, coroutines, async/await                  | Multiprocessing, GPU computation, distributed systems |
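
The concurrency column can be illustrated with a small `asyncio` sketch that borrows the cook analogy from the table. The dish names and wait times are made up for illustration, and `asyncio.sleep` stands in for any operation that waits on I/O.

```python
import asyncio
import time

completion_order = []


async def cook(dish: str, seconds: float) -> str:
    # Awaiting the sleep hands control back to the event loop,
    # so other dishes make progress while this one "simmers".
    await asyncio.sleep(seconds)
    completion_order.append(dish)
    return dish


async def main() -> list:
    # gather() runs the coroutines concurrently on a single thread
    return await asyncio.gather(
        cook("soup", 0.3), cook("rice", 0.2), cook("salad", 0.1)
    )


start = time.perf_counter()
served = asyncio.run(main())
elapsed = time.perf_counter() - start

print(served)            # results follow the order passed to gather()
print(completion_order)  # dishes finish shortest wait first
print(f"elapsed: {elapsed:.2f} s (the individual waits add up to 0.6 s)")
```

One thread handles all three dishes, yet the total time is close to the longest single wait because the waits overlap. This is concurrency without parallelism.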

#### Multiprocessing

Multiprocessing runs multiple processes across CPU cores, and the processes do not share memory with one another. Each process runs in its own memory space and can contain many threads. In Python, each process has its own instance of the Python interpreter executing instructions. Multiprocessing can be faster for CPU-bound tasks and is more robust, because a crash in one process doesn't affect the others. However, it has higher overhead, slower startup, and more complex data sharing between processes.

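The example below counts the lines in several text files with a process pool, and the diagram that follows illustrates how the work is distributed.

```python
import multiprocessing


def count_lines(file_name):
    """Return the number of lines in a text file."""
    with open(file_name, 'r') as file:
        return len(file.readlines())


if __name__ == "__main__":
    # The guard is needed on platforms that start workers with "spawn"
    # (Windows, macOS), where each worker re-imports this module.
    files = ["file1.txt", "file2.txt", "file3.txt"]

    # Pool() starts one worker process per CPU core by default
    with multiprocessing.Pool() as pool:
        # map() preserves input order: line_counts[i] belongs to files[i]
        line_counts = pool.map(count_lines, files)

    print(line_counts)
```

```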
                             +-------------------------+
                             |     Main Program        |
                             +-------------------------+
                                        |
                                        v
                             +-------------------------+
                             |  multiprocessing.Pool() |
                             +-------------------------+
                                        |
              ---------------------------------------------------
              |                         |                         |
              v                         v                         v
     +----------------+       +----------------+       +----------------+
     | Worker Process |       | Worker Process |       | Worker Process |
     |   count_lines  |       |   count_lines  |       |   count_lines  |
     | "file1.txt"    |       | "file2.txt"    |       | "file3.txt"    |
     +----------------+       +----------------+       +----------------+
              |                         |                         |
              v                         v                         v
      +---------------+       +---------------+       +---------------+
      | Line Count: X |       | Line Count: Y |       | Line Count: Z |
      +---------------+       +---------------+       +---------------+
              \                         |                         /
               \                        |                        /
                \_______________________|_______________________/
                                        v
                          +-----------------------------+
                          | line_counts = [X, Y, Z]     |
                          +-----------------------------+

```

#### Multithreading 

A thread is the smallest unit of execution within a process. Multithreading is a technique in which a process spawns multiple threads to handle different tasks. The threads take turns running, which gives the illusion that they run in parallel, but they actually run concurrently. In Python, the Global Interpreter Lock (GIL) prevents the threads from running simultaneously.

- Concurrency within a single process: Threads run within a single process and share the same memory space.
- Global Interpreter Lock (GIL): Python's GIL allows only one thread to execute at a time, even on multi-core systems, which limits true parallelism.
- I/O-bound tasks: Threads are best suited for tasks involving I/O operations (such as network requests or file access), where they spend most of their time waiting for external resources. The GIL is released during these waiting periods, allowing other threads to run.
- Pros: Lower overhead, faster startup time, and simpler data sharing between threads.
- Cons: Limited parallelism due to the GIL, so CPU-bound tasks gain little and can even run slower.
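
The points about shared memory and overlapping waits can be seen in a small standard-library sketch. Here `time.sleep` stands in for a network request, and the server names and the `fake_request` helper are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# All threads live in one process, so they can write to the same dict.
results = {}
results_lock = threading.Lock()


def fake_request(name: str, delay: float = 0.2) -> None:
    time.sleep(delay)  # the GIL is released while the thread waits
    with results_lock:  # guard the shared dict against concurrent writes
        results[name] = f"response from {name}"


names = [f"server-{i}" for i in range(5)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    # list() forces completion and re-raises any worker exception
    list(executor.map(fake_request, names))
elapsed = time.perf_counter() - start

print(f"{len(results)} responses in {elapsed:.2f} s (sequential would be about 1.0 s)")
```

Because the five waits overlap, the elapsed time stays close to a single delay. Replacing the sleep with a CPU-heavy loop would remove most of that benefit, since the GIL would serialize the threads.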


The example below uses a thread pool to download files from an S3 bucket and records any failed downloads so they can be retried later.

```python
import csv
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

import boto3
import tqdm

AWS_BUCKET = "my-bucket"
OUTPUT_DIR = "downloads"


def download_one_file(bucket: str, output: str, client: boto3.client, s3_file: str):
    """
    Download a single file from S3
    Args:
        bucket (str): S3 bucket where images are hosted
        output (str): Dir to store the images
        client (boto3.client): S3 client
        s3_file (str): S3 object name
    """
    client.download_file(
        Bucket=bucket, Key=s3_file, Filename=os.path.join(output, s3_file)
    )


# Placeholder list: replace with the S3 object keys to download
files_to_download = ["file_1", "file_2", ..., "file_n"]
# Make sure the output directory exists before any thread writes to it
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Creating only one session and one client
session = boto3.Session()
client = session.client("s3")
# The client is shared between threads
func = partial(download_one_file, AWS_BUCKET, OUTPUT_DIR, client)

# List for storing possible failed downloads to retry later
failed_downloads = []

with tqdm.tqdm(desc="Downloading images from S3", total=len(files_to_download)) as pbar:
    with ThreadPoolExecutor(max_workers=32) as executor:
        # Map each future to its file so a failure can be traced back to it
        futures = {
            executor.submit(func, file_to_download): file_to_download
            for file_to_download in files_to_download
        }
        for future in as_completed(futures):
            if future.exception():
                failed_downloads.append(futures[future])
            pbar.update(1)

if len(failed_downloads) > 0:
    print("Some downloads have failed. Saving ids to csv")
    with open(
        os.path.join(OUTPUT_DIR, "failed_downloads.csv"), "w", newline=""
    ) as csvfile:
        wr = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
        wr.writerow(failed_downloads)
```

#### Choosing Multiprocessing vs Multithreading

If your application involves a lot of I/O operations and waiting (e.g., fetching data from a web server or database), multithreading is often the better choice. If your application is CPU-intensive and performs heavy computations, multiprocessing is generally preferred because it uses multiple CPU cores. In essence, multithreading hides latency by overlapping waits, while multiprocessing speeds up CPU-intensive tasks through parallel execution.
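
One way to encode this rule of thumb is to pick the pool type from a flag, since `concurrent.futures` gives thread and process pools the same interface. The helper `run_tasks` and its `io_bound` parameter are hypothetical names used only for this sketch.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def run_tasks(func, items, io_bound: bool, max_workers: int = 4) -> list:
    """Apply `func` to every item using threads (I/O-bound) or processes (CPU-bound)."""
    pool_type = ThreadPoolExecutor if io_bound else ProcessPoolExecutor
    with pool_type(max_workers=max_workers) as pool:
        # map() returns results in input order for both pool types
        return list(pool.map(func, items))


# Thread-pool path. With io_bound=False, `func` must be picklable and a script
# should call run_tasks under an `if __name__ == "__main__":` guard.
lengths = run_tasks(len, ["cloud", "hpc", "dask"], io_bound=True)
print(lengths)
```

Keeping the choice in one place makes it easy to switch strategies once profiling shows whether a workload spends its time waiting or computing.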

### Workflow Management

Workflow management is essential for organizing and optimizing the sequence of tasks within any process, ensuring efficiency and effectiveness in achieving desired outcomes. At its core, workflow management involves designing, executing, and monitoring a series of tasks or activities that transform inputs into outputs. A key concept in this domain is the use of directed acyclic graphs (DAGs), which provide a visual and structural framework for representing workflows. In a DAG, tasks are depicted as nodes, and the directed edges between these nodes define the dependencies and flow of tasks, ensuring that each task is executed only after its prerequisites are completed. This acyclic property prevents circular dependencies, guaranteeing that workflows progress in a logical and orderly manner. By leveraging DAGs, workflow management systems can efficiently schedule tasks, allocate resources, and handle dependencies, thereby streamlining complex processes and enhancing overall productivity. The clarity and structure provided by DAGs make them invaluable for modeling workflows in various fields, from data processing and scientific computing to business process management.
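
The acyclic property can be checked mechanically. The sketch below uses `graphlib` from the Python standard library (Python 3.9 or later) to order a small workflow and to show that a circular dependency is rejected. The task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks it depends on
valid = {"clean": {"download"}, "analyze": {"clean"}, "report": {"analyze"}}

# static_order() yields every task after all of its prerequisites
order = list(TopologicalSorter(valid).static_order())
print(order)

# Making "download" depend on "report" closes a loop in the graph
circular = dict(valid, download={"report"})
try:
    list(TopologicalSorter(circular).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print("cycle detected:", has_cycle)
```

A workflow manager performs a similar check before scheduling anything, because a cycle means no task in the loop could ever start.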

#### Directed Acyclic Graphs (DAG)

In this section, you're introduced to **workflow parallelization** using Directed Acyclic Graphs (DAGs), a foundational concept in scalable computing with libraries like `dask`. DAGs allow us to break down complex, multi-step workflows into discrete, interdependent tasks — making it easier to identify which processes can be executed in parallel rather than sequentially.

You'll start by visualizing the parallelization potential in a typical waveform processing pipeline. This example mimics a simplified seismic analysis using data from multiple stations, and demonstrates how task-level parallelism is encoded using a DAG.

A DAG represents tasks (nodes) and their dependencies (edges) in a one-way structure — no loops, no cycles. This allows us to:
- Visually understand task order and dependencies.
- Identify opportunities for parallel execution.
- Optimize computation by minimizing idle time.

In the context of the `trad_vs_cloud-SAGE.ipynb` notebook, this DAG represents a pipeline where:
- Waveforms from multiple stations are processed.
- Shared preprocessing steps (like inventory loading) feed into independent station-specific tasks.
- Results eventually converge into common postprocessing steps.

```python
import matplotlib.pyplot as plt
import networkx as nx

# Create a directed graph
G = nx.DiGraph()

# Divide the workflow into different tasks
Task1 = 'Inventory'
# Suppose we are accessing waveforms for three stations
Task2_1, Task2_2, Task2_3 = 'station 1', 'station 2', 'station 3'
Task3 = 'Correction'
Task4 = 'Picking'
Task5 = 'Trimming'
Task6 = 'Frequency'

# Add nodes for tasks
G.add_nodes_from([Task1, Task2_1, Task2_2, Task2_3, Task3, Task4, Task5, Task6])

# Add edges to show dependencies
G.add_edges_from([(Task1, Task2_1), (Task1, Task2_2), (Task1, Task2_3),
                  (Task2_1, Task3), (Task2_2, Task3), (Task2_3, Task3),
                  (Task3, Task4), (Task4, Task5), (Task5, Task6)])

# Annotate each node with its stage
layers = {
    Task1:   0,
    Task2_1: 1,
    Task2_2: 1,
    Task2_3: 1,
    Task3:   2,
    Task4:   3,
    Task5:   4,
    Task6:   5,
}

for node, layer in layers.items():
    G.nodes[node]['layer'] = layer

# Place nodes in columns by stage so parallel tasks line up vertically
plt.figure(figsize=(12, 4))
pos = nx.multipartite_layout(G, subset_key='layer', align='vertical')
nx.draw(G, pos,
        with_labels=True,
        node_size=3000,
        node_color="skyblue",
        font_size=10,
        font_weight="bold",
        arrowsize=20)
plt.title("Dask Task Graph (linear multipartite layout)")
plt.axis('off')
plt.show()
```

This example serves as a realistic analog of seismic workflows in cloud environments, where tasks such as waveform retrieval and preprocessing for multiple stations can be executed independently — a classic case of what's known as an **embarrassingly parallel** problem. In such problems, individual tasks require little to no communication with one another, making them ideal candidates for concurrent execution across distributed computing resources. In the context of seismology, each station's data can be fetched, corrected, and processed in isolation before being merged downstream for higher-level analysis. By leveraging cloud-based parallelization frameworks like Dask, we can significantly reduce total computation time through intelligent task scheduling and resource allocation.
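
The stages in which tasks become runnable can also be derived from the dependencies instead of being assigned by hand. The sketch below mirrors the edges of the graph above with the standard-library `graphlib` module (Python 3.9 or later). It only computes the grouping and does not run any seismic processing.

```python
from graphlib import TopologicalSorter

stations = ["station 1", "station 2", "station 3"]

# Map each task to the tasks it depends on
dependencies = {station: {"Inventory"} for station in stations}
dependencies.update({
    "Correction": set(stations),
    "Picking": {"Correction"},
    "Trimming": {"Picking"},
    "Frequency": {"Trimming"},
})

sorter = TopologicalSorter(dependencies)
sorter.prepare()

stages = []
while sorter.is_active():
    # get_ready() returns every task whose prerequisites are finished
    ready = sorted(sorter.get_ready())
    stages.append(ready)
    # Mark the whole stage as done so that its dependents become ready
    sorter.done(*ready)

for number, stage in enumerate(stages):
    print(number, stage)
```

Any stage that holds more than one task, here the three station tasks, is a candidate for parallel execution.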

#### Dask Intro

Dask is a Python parallel computing library designed to scale from a single laptop to a cluster of machines. It provides several key advantages that enhance computational efficiency and productivity:

- Scalability: Dask can scale your computations from a local machine to a distributed cluster with minimal code changes, allowing you to handle larger-than-memory datasets and complex computations.
- Parallel Computing: By leveraging multi-core processors and distributed computing, Dask speeds up data processing tasks, making it well suited to large-scale scientific computations.
- Integration with Existing Tools: Dask works with popular Python libraries such as NumPy, Pandas, and Scikit-Learn. This compatibility allows users to keep familiar APIs while benefiting from Dask's parallel computing capabilities.
- Lazy Evaluation: Dask builds a task graph first and computes results only when they are requested, which lets it run only the necessary computations and avoid redundant operations.
- Flexible Data Structures: Dask provides high-level abstractions like Dask Arrays, DataFrames, and Bags that mimic NumPy arrays, Pandas DataFrames, and Python lists, respectively. These abstractions make it easier to work with large datasets in a distributed environment.
- Fault Tolerance: Dask's distributed scheduler includes mechanisms for fault tolerance, such as rerunning tasks when a worker fails, so that computations can continue. This robustness is important for long-running scientific computations.
- Ease of Use: With its intuitive API and comprehensive documentation, Dask is accessible to both beginners and experienced users. Its design philosophy emphasizes simplicity, making it a practical choice for scientific computing.
- Community and Ecosystem: Dask benefits from an active and growing community of users and developers, which provides continuous improvements, support, and resources for troubleshooting and learning.
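
Lazy evaluation and task-level parallelism can be sketched with `dask.delayed`. The station gains, the waveform samples, and the helper functions below are hypothetical stand-ins for real seismic processing, and the sketch assumes that Dask is installed.

```python
from dask import delayed

executed = []  # records which stations were actually fetched

GAINS = {"station 1": 2.0, "station 2": 0.5, "station 3": 1.0}


def fetch_waveform(station):
    executed.append(station)
    return [1.0, -2.0, 3.0]  # made-up samples, identical for every station


def correct(waveform, gain):
    return [sample / gain for sample in waveform]


def peak_amplitude(waveform):
    return max(abs(sample) for sample in waveform)


# Build the task graph; nothing runs yet
peaks = []
for station, gain in GAINS.items():
    raw = delayed(fetch_waveform)(station)
    corrected = delayed(correct)(raw, gain)
    peaks.append(delayed(peak_amplitude)(corrected))
largest = delayed(max)(peaks)

built_without_running = (executed == [])

# compute() executes the graph; the per-station branches are independent
result = largest.compute()
print(built_without_running, result)
```

The three station branches share no data until the final `max`, so the scheduler is free to run them in parallel.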

In summary, Dask is a powerful and versatile tool that addresses many of the challenges associated with scientific computing. Its ability to scale, integrate with existing tools, and optimize computations makes it an essential library for researchers and data scientists working with large datasets and complex computational tasks.