# Parallelization with Python (II):

## Demo for basic Dask.

### Online resources:

- https://tutorial.dask.org/

### Hungjui Yu – 20240223

***
## <font color='maroon'>What is Dask?</font>

- Dask is a parallel computing library that integrates seamlessly with popular Python libraries like NumPy, Pandas, and Scikit-Learn. It enables parallel and distributed computing on larger-than-memory datasets.

## <font color='maroon'>What are Dask Arrays?</font>

- Dask arrays provide parallelized and larger-than-memory computations on arrays. They closely resemble NumPy arrays but operate on larger datasets.
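
To make "closely resemble NumPy arrays" concrete, here is a minimal sketch. The array shape and chunk size are arbitrary illustration values, not recommendations.

```python
import numpy as np
import dask.array as da

# Illustrative sizes only: a 1000 x 1000 array split into 250 x 250 blocks.
x_np = np.arange(1_000_000, dtype=float).reshape(1000, 1000)
x_da = da.from_array(x_np, chunks=(250, 250))

# NumPy-style expressions build a lazy task graph instead of computing.
lazy_mean = (x_da + x_da.T).mean()
print(type(lazy_mean))   # still a Dask array; nothing has been computed
print(x_da.numblocks)    # (4, 4)

# .compute() runs the graph and returns a concrete value.
value = lazy_mean.compute()
print(np.isclose(value, (x_np + x_np.T).mean()))
```

The same expression works on both objects; only the Dask version needs the explicit `.compute()` step.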


***
## <font color='maroon'>Focus 1.</font> `dask` basics.

In [5]:
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])? y


In [6]:
import dask
import dask.array as da
from dask import delayed
import numpy as np
import time

In [7]:
from memory_profiler import profile
%load_ext memory_profiler

### Creating a Dask Array:

In [None]:
x = np.random.random((10000, 10000))

# x_dask_array = da.from_array(x, chunks='auto')
x_dask_array = da.from_array(x, chunks=(500, 500))


x_dask_array


### Calculate as original numpy array:

In [None]:
code_block = '''
def use_np_array():

    start = time.time()

    y = (x + x.T).mean()
    print(y)

    end = time.time()
    print(f'Finished in {round(end-start, 3)} seconds.')
    
use_np_array()
'''

%memit exec(code_block)


### Calculate as Dask Array:

In [None]:
code_block = '''
def use_dask_array():
    start = time.time()

    y_dask_array = (x_dask_array + x_dask_array.T).mean()
    result = y_dask_array.compute()
    print(result)

    end = time.time()
    print(f'Finished in {round(end-start, 3)} seconds.')
    
use_dask_array()
'''

%memit exec(code_block)


### Side notes:

- **Automatic chunk sizing:** When you set `chunks='auto'`, Dask picks chunk sizes from the array's shape and dtype so that each chunk stays near a target size in bytes. The target comes from the `array.chunk-size` configuration value (128 MiB by default), not from a measurement of the machine's free memory or core count.

- **Efficient Parallelization:** Dask parallelizes computations across multiple cores or across nodes in a cluster, one task per chunk. Chunk size controls the trade-off: many small chunks add scheduling overhead, while a few large chunks limit parallelism and raise the memory needed per task.

- **Ease of Use:** Using `chunks='auto'` simplifies the creation of Dask arrays because you don't need to specify chunk sizes by hand. Manually chosen chunk sizes can still perform better for a specific computation, so it is worth experimenting with different chunking strategies for your use case.
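
A quick way to compare chunking strategies is to inspect the block layout each one produces. This sketch uses a smaller array than the demo above so it runs quickly; the sizes are illustrative only.

```python
import numpy as np
import dask.array as da

# Smaller than the demo array, for speed.
x = np.random.random((2000, 2000))

auto = da.from_array(x, chunks='auto')
manual = da.from_array(x, chunks=(500, 500))

# .numblocks counts blocks per axis; .chunksize is the largest block shape.
print("auto  :", auto.numblocks, auto.chunksize)
print("manual:", manual.numblocks, manual.chunksize)

# Both layouts give the same answer; only the task graph differs.
sum_auto = auto.sum().compute()
sum_manual = manual.sum().compute()
print(np.isclose(sum_auto, sum_manual))
```

Timing the same computation under each layout (for example with `time.time()` as in the cells above) shows which one suits a given workload.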

***
## <font color='maroon'>Focus 2.</font> `dask.delayed`

## <font color='maroon'>What is Dask Delayed?</font>

- Dask Delayed is a Dask submodule that allows users to parallelize custom computations by delaying their execution until a later time.

- Instead of executing a function immediately, you create a delayed version of the function using `dask.delayed`. This delayed function represents the computation to be performed.

### Creating an expensive computation:

In [8]:
def expensive_computation(x):
    time.sleep(1)
    return x ** 2

numbers = [1, 2, 3, 4, 5]


### Calculate using for loop:

In [6]:
start = time.time()

eager_results = [expensive_computation(num) for num in numbers]

end = time.time()

print("Eager Results:", eager_results)
print(f'Finished in {round(end-start, 3)} seconds.')


Eager Results: [1, 4, 9, 16, 25]
Finished in 5.005 seconds.


### Calculate using `dask.delayed`:

In [7]:
lazy_results = [dask.delayed(expensive_computation)(num) for num in numbers]

start = time.time()

lazy_computed = dask.compute(*lazy_results)

end = time.time()

print("Lazy Computed Results:", lazy_computed)
print(f'Finished in {round(end-start, 3)} seconds.')

Lazy Computed Results: (1, 4, 9, 16, 25)
Finished in 1.224 seconds.


### Side notes:

#### Lazy Evaluation with `dask.delayed`:

- Tasks are represented as a graph, allowing for optimized execution.

- The computations are not executed until explicitly triggered with `dask.compute`.

- Enables potential parallelism and optimization, suitable for large-scale computations.
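
The sketch below illustrates these points with a small graph whose final task depends on several earlier ones. The function names `load` and `total` and the `calls` list are hypothetical, introduced only to show when the functions actually run.

```python
import dask
from dask import delayed

calls = []  # records each time a wrapped function really executes

@delayed
def load(i):
    calls.append(("load", i))
    return i

@delayed
def total(values):
    calls.append(("total", len(values)))
    return sum(values)

# Building the graph: four independent tasks feeding one final task.
parts = [load(i) for i in range(4)]
graph = total(parts)

print(len(calls))         # 0: nothing has run yet
result = graph.compute()  # triggers all five tasks
print(result)             # 6
print(len(calls))         # 5
```

The four `load` tasks have no dependencies on each other, so the scheduler is free to run them in parallel before `total`.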


### Python Example:

```python
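import dask
import dask.array as da
import pymap3d as pm3  # assumption: `pm3` refers to the pymap3d package

# `alt`, `lat`, and `lon` are 3D NumPy arrays loaded earlier (not shown here).
dask_alt = da.from_array(alt, chunks=(81, 500, 500))
dask_lat = da.from_array(lat, chunks=(81, 500, 500))
dask_lon = da.from_array(lon, chunks=(81, 500, 500))

## Function to convert to the 3D ECEF coordinates:

def convert_ecef_coords(lat_arr, lon_arr, alt_arr):
    ecef_x, ecef_y, ecef_z = pm3.geodetic2ecef(lat_arr, lon_arr, alt_arr, ell=None, deg=True)
    return ecef_x, ecef_y, ecef_z

## Apply function to Dask arrays:
# Note: Dask arrays passed to a delayed function are computed in full and
# arrive as NumPy arrays, so the conversion itself runs as one delayed task.
lazy_results = dask.delayed(convert_ecef_coords)(dask_lat, dask_lon, dask_alt)

# compute() returns the (x, y, z) tuple produced by the function.
ecef_x, ecef_y, ecef_z = lazy_results.compute()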

```
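
One caveat on the example above: when Dask arrays are passed into a `dask.delayed` function, Dask materializes each array and hands the function plain NumPy arrays, so the conversion is not split by chunk. As a possible alternative, the sketch below writes the standard WGS84 geodetic-to-ECEF formulas directly with NumPy functions, which Dask arrays evaluate lazily and chunk by chunk. This swaps in hand-written formulas rather than calling `pm3.geodetic2ecef`; the helper name `geodetic_to_ecef_lazy` and the input values are made up for illustration.

```python
import numpy as np
import dask
import dask.array as da

# WGS84 ellipsoid constants (semi-major axis in metres, flattening).
WGS84_A = 6378137.0
WGS84_F = 1.0 / 298.257223563
WGS84_E2 = WGS84_F * (2.0 - WGS84_F)  # first eccentricity squared

def geodetic_to_ecef_lazy(lat_deg, lon_deg, alt_m):
    """Hypothetical helper: geodetic (deg, deg, m) -> ECEF (m), element-wise."""
    lat = np.deg2rad(lat_deg)
    lon = np.deg2rad(lon_deg)
    # Prime-vertical radius of curvature.
    n = WGS84_A / np.sqrt(1.0 - WGS84_E2 * np.sin(lat) ** 2)
    x = (n + alt_m) * np.cos(lat) * np.cos(lon)
    y = (n + alt_m) * np.cos(lat) * np.sin(lon)
    z = (n * (1.0 - WGS84_E2) + alt_m) * np.sin(lat)
    return x, y, z

# Small made-up inputs; the notebook's real arrays are not reproduced here.
lat = da.from_array(np.array([0.0, 90.0, 45.0, -30.0]), chunks=2)
lon = da.from_array(np.array([0.0, 0.0, 120.0, 60.0]), chunks=2)
alt = da.from_array(np.array([0.0, 0.0, 1000.0, 500.0]), chunks=2)

x, y, z = geodetic_to_ecef_lazy(lat, lon, alt)  # still lazy Dask arrays
ecef_x, ecef_y, ecef_z = dask.compute(x, y, z)  # evaluate all three together
print(ecef_x, ecef_y, ecef_z)
```

Calling `dask.compute(x, y, z)` once lets the three outputs share intermediate results such as the radius `n` instead of recomputing them.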

***
## <font color='maroon'>Focus 3.</font> shared array computation.

### Calculate using for loop:

In [10]:
def numpy_shared_array_calculation(array_size):
    shared_array = np.zeros(array_size, dtype=float)

    for i in range(array_size):
        shared_array[i] = expensive_computation(i)
    
    return shared_array

In [11]:
array_size = 10

start_time = time.time()

numpy_result = numpy_shared_array_calculation(array_size)

numpy_execution_time = time.time() - start_time

print("NumPy Result:", numpy_result)
print("NumPy Execution Time:", numpy_execution_time)


NumPy Result: [ 0.  1.  4.  9. 16. 25. 36. 49. 64. 81.]
NumPy Execution Time: 10.010547637939453


### Calculate using dask:

In [12]:
def dask_shared_array_calculation(array_size):

    shared_array = da.arange(array_size, dtype=float)
    
    result = da.map_blocks(expensive_computation, shared_array, dtype=float)
    
    return result.compute()

In [14]:
array_size = 10

start_time = time.time()

dask_result = dask_shared_array_calculation(array_size)

dask_execution_time = time.time() - start_time

print("Dask Result:", dask_result)
print("Dask Execution Time:", dask_execution_time)


Dask Result: [ 0.  1.  4.  9. 16. 25. 36. 49. 64. 81.]
Dask Execution Time: 2.006019115447998


### Side notes:

#### `dask.array.map_blocks`:

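- It applies a function independently to each block (chunk) of a Dask array, facilitating parallel processing and distributed computing. The function receives a whole block as a NumPy array, not one element at a time.

- It creates a new Dask array representing the result of applying the specified function to each block of the original array. It operates lazily, meaning the computation is not performed until explicitly requested (e.g., by calling `compute()`).

- This function is particularly useful for element-wise operations where the computation can be parallelized across blocks of the array. It can be applied to large datasets that do not fit into memory and can be efficiently distributed across multiple cores or nodes in a cluster.

- In the example above, `da.arange(array_size, dtype=float)` with default chunking yields a single block for such a small array, so `expensive_computation` runs on all ten elements in one call. The speedup over the loop therefore comes from that vectorized call rather than from parallel tasks; pass `chunks=` to split the array into several blocks.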
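
To see what `map_blocks` actually passes to the function, this sketch records the shape of every block it receives. The helper `square_block` and the `seen_shapes` list are hypothetical names used only for this illustration.

```python
import numpy as np
import dask.array as da

seen_shapes = []  # shapes of the non-empty blocks the function receives

def square_block(block):
    # `block` is a whole NumPy chunk, not a single element.
    if block.size:  # ignore any empty array Dask may pass to infer metadata
        seen_shapes.append(block.shape)
    return block ** 2

# Ten elements split explicitly into two blocks of five.
arr = da.arange(10, chunks=5, dtype=float)
result = da.map_blocks(square_block, arr, dtype=float).compute()

print(arr.numblocks)  # (2,)
print(seen_shapes)    # one entry per block
print(result)
```

With explicit `chunks=5` the function runs once per block, and those two calls are the units Dask can schedule in parallel.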

***
## <font color='maroon'>Recap:</font>

### `dask`:

#### 1. `dask.array.from_array(..., chunks=...)`

#### 2. `dask.delayed()`

#### 3. `dask.compute()`

#### 4. `dask.array.map_blocks`

***
***
# <font color='teal'>**Supplement:**</font>
***
***

In [None]:
@profile
def use_np_array():

    start = time.time()

    y = (x + x.T).mean()
    print(y)

    end = time.time()
    print(f'Finished in {round(end-start, 3)} seconds.')
    
%mprun -f use_np_array use_np_array()


In [8]:
@delayed
def square(x):
    return x ** 2

# Not decorated: `a + b` on Delayed objects still builds a lazy task.
def add(a, b):
    return a + b

# Delayed computation
a = square(2)
b = square(3)
c = add(a, b)

# Compute the result
result_delayed = c.compute()
print(result_delayed)

13


In [None]:
def simple_computation(a, b):
    result = a + b
    return result

# Timing the computation without Dask Delayed
start_time = time.time()
result_without_delayed = simple_computation(2, 3)
elapsed_time_without_delayed = time.time() - start_time

print(f"Without Dask Delayed - Result: {result_without_delayed}, Elapsed Time: {elapsed_time_without_delayed:.4f} seconds")


In [None]:
# Using Dask delayed to parallelize a simple computation
@delayed
def simple_computation_delayed(a, b):
    result = a + b
    return result

# Timing the computation with Dask Delayed
start_time = time.time()
result_with_delayed = simple_computation_delayed(2, 3).compute()
elapsed_time_with_delayed = time.time() - start_time

print(f"With Dask Delayed - Result: {result_with_delayed}, Elapsed Time: {elapsed_time_with_delayed:.4f} seconds")


### Matrix Multiplication without Dask:

In [None]:
def matrix_multiply_np(size):
    A = np.random.random((size, size))
    B = np.random.random((size, size))
    start_time = time.time()
    result_np = np.dot(A, B)
    elapsed_time = time.time() - start_time
    return result_np, elapsed_time

result_np, time_np = matrix_multiply_np(10000)

print(f"Without Dask - Elapsed Time: {time_np:.4f} seconds")


### Matrix Multiplication with Dask:

In [None]:
def matrix_multiply_dask(size):
    A = da.random.random((size, size), chunks='auto')
    B = da.random.random((size, size), chunks='auto')
    start_time = time.time()
    result_dask = da.dot(A, B).compute()
    elapsed_time = time.time() - start_time
    return result_dask, elapsed_time

result_dask, time_dask = matrix_multiply_dask(10000)

print(f"With Dask - Elapsed Time: {time_dask:.4f} seconds")


In [None]:
# Importing Dask, NumPy, and other necessary libraries
import dask
import dask.array as da
import numpy as np
import time

# Function to perform matrix multiplication without Dask
def matrix_multiply_np(A, B):
    start_time = time.time()
    result_np = np.dot(A, B)
    elapsed_time = time.time() - start_time

    return result_np, elapsed_time

# Function to perform matrix multiplication with Dask
def matrix_multiply_dask(A, B, chunk_size):
    # Wrap the same NumPy inputs so both versions multiply identical matrices.
    A_dask = da.from_array(A, chunks=(chunk_size, chunk_size))
    B_dask = da.from_array(B, chunks=(chunk_size, chunk_size))

    start_time = time.time()
    result_dask = da.dot(A_dask, B_dask).compute()
    elapsed_time = time.time() - start_time

    return result_dask, elapsed_time

# Parameters
matrix_size = 1000
chunk_size = 500

# Shared inputs: NumPy and Dask generate random numbers differently, so
# seeding each with 42 separately would not produce the same matrices.
np.random.seed(42)
A = np.random.random((matrix_size, matrix_size))
B = np.random.random((matrix_size, matrix_size))

# Matrix multiplication without Dask
result_np, time_np = matrix_multiply_np(A, B)

# Matrix multiplication with Dask
result_dask, time_dask = matrix_multiply_dask(A, B, chunk_size)

# Display results and comparison
print(f"Matrix Size: {matrix_size}x{matrix_size}")
print("Without Dask:")
print(f"   Elapsed Time: {time_np:.4f} seconds")

print("\nWith Dask:")
print(f"   Elapsed Time: {time_dask:.4f} seconds")

# Check if results are close (within a tolerance) due to potential floating-point differences
if np.allclose(result_np, result_dask):
    print("\nThe results are close.")
else:
    print("\nThe results differ.")

