# Assignment 10: Parallel Computing

### Due 19 November 2025

### Introduction

This assignment is about parallel computing with Dask. You should use Python to implement the calculations. If possible, please submit your answers in PDF or HTML format. If you have any issues installing Dask via pip, please use the following command:

```bash
python -m pip install "dask[complete]" --use-deprecated=legacy-resolver
```

This command installs Dask together with all of its optional dependencies (`dask[complete]`), using pip's legacy dependency resolver.

You can also install Dask using conda (which the authors recommend):

```bash
conda install dask
```

If you encounter any issues, please check the Dask installation guide at <https://docs.dask.org/en/stable/install.html> and let us know.

1. Explain the concept of "overhead" in parallel computing with `joblib`. Why might running a very simple task (like adding 1 to a number) in parallel with `joblib` be slower than running it serially?

Your answer here: Overhead in parallel computing is the extra time and resources needed to manage and coordinate multiple processes or threads, on top of the useful computation itself. With `joblib`'s default `loky` backend, this includes starting the worker processes, serializing (pickling) the function and its arguments, sending them to the workers, and sending the results back to the main process.

When the task is very simple, such as adding 1 to a number, the computation itself takes almost no time, while dispatching it to a worker and collecting the result takes much longer. The overhead then dominates, so the total time of the parallel run (computation plus overhead) can exceed the time of the serial run, making it slower overall.
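The following is a minimal timing sketch of this effect. It uses a hypothetical `add_one` helper and runs the same 1,000 trivial additions two ways: serially, and through `joblib.Parallel` with two workers. Exact timings depend on the machine. The parallel run typically comes out slower, because the tasks still have to be dispatched to workers and their results sent back.

```python
import time

from joblib import Parallel, delayed

def add_one(x):
    # Hypothetical trivial task: far cheaper than dispatching it to a worker
    return x + 1

numbers = list(range(1_000))

start = time.perf_counter()
serial = [add_one(x) for x in numbers]
serial_time = time.perf_counter() - start

start = time.perf_counter()
# Includes worker start-up plus pickling the tasks (joblib sends them in batches)
parallel = Parallel(n_jobs=2)(delayed(add_one)(x) for x in numbers)
parallel_time = time.perf_counter() - start

print(f"serial:   {serial_time:.6f} s")
print(f"parallel: {parallel_time:.6f} s")
```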

2. Write a Python function `count_vowels(text)` that counts the vowels (a, e, i, o, u, case-insensitive) in a given string. Then, use the `Parallel` and `delayed` functions from the `joblib` library to apply your function to each sentence in parallel, using all available cores. The parallel call should return a list of integers, where each integer is the number of vowels in the corresponding sentence.

```python
sentences = [
    "Joblib makes parallel computing easy",
    "Dask scales Python code effectively",
    "Parallelism can speed up computations",
    "Always consider the overhead"
]
```

```python
## Your answer here
from joblib import Parallel, delayed

def count_vowels(text):
    vowels = "aeiouAEIOU"
    count = 0
    for ch in text:
        # if ch is a vowel, increase count
        if ch in vowels:
            count += 1
    return count

sentences = [
    "Joblib makes parallel computing easy",
    "Dask scales Python code effectively",
    "Parallelism can speed up computations",
    "Always consider the overhead",
]

# use all cores: n_jobs=-1
results = Parallel(n_jobs=-1)(
    delayed(count_vowels)(s) for s in sentences
)

print(results)  # list of vowel counts
```

```
[12, 10, 13, 10]
```
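A more compact variant is sketched below, assuming the same `sentences` list. It does three things:

* It counts vowels with a generator expression over `str.casefold()`.
* It prints `joblib.cpu_count()`, the number of cores `joblib` detects and that `n_jobs=-1` targets.
* It checks that the parallel results match a plain list comprehension.

`Parallel` returns results in input order, so the two lists line up element by element.

```python
from joblib import Parallel, cpu_count, delayed

VOWELS = set("aeiou")

def count_vowels(text):
    # casefold() lowercases the text, so "A" and "a" both count
    return sum(ch in VOWELS for ch in text.casefold())

sentences = [
    "Joblib makes parallel computing easy",
    "Dask scales Python code effectively",
    "Parallelism can speed up computations",
    "Always consider the overhead",
]

print("CPU cores seen by joblib:", cpu_count())

# n_jobs=-1 uses all of those cores; results come back in input order
counts = Parallel(n_jobs=-1)(delayed(count_vowels)(s) for s in sentences)

# Same answer as a plain sequential comprehension
assert counts == [count_vowels(s) for s in sentences]
print(counts)  # [12, 10, 13, 10]
```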


3. Write a function called `get_length` that takes a word as input and returns its length. Then, using the provided list `words`, do the following:

* Use a standard (sequential) for loop to calculate the length of each word by calling your function.
* Use the `joblib` library to calculate the length of each word in parallel, also calling your function. Use `Parallel` and `delayed` from `joblib` again.
* Compare the syntax of the sequential and parallel approaches. How do they differ when writing the loop?

```python
words = ["joblib", "parallel", "computing", "example"]
```

```python
## Your answer here
from joblib import Parallel, delayed

def get_length(word):
    return len(word)

words = ["joblib", "parallel", "computing", "example"]

lengths_seq = []
for w in words:
    lengths_seq.append(get_length(w))

print("Sequential:", lengths_seq)

lengths_par = Parallel(n_jobs=-1)(
    delayed(get_length)(w) for w in words
)

print("Parallel:", lengths_par)
```

```
Sequential: [6, 8, 9, 7]
Parallel: [6, 8, 9, 7]
```
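The syntax difference is easiest to see side by side:

* **Sequential version.** `get_length(w)` is called immediately, and the loop body stores each result.
* **`joblib` version.** The function is wrapped with `delayed`. This records the call instead of running it; in current `joblib` versions the recorded call is a `(function, args, kwargs)` tuple. A generator of these recorded calls is then passed to a `Parallel` object, which executes them and returns a list in input order.

The sketch below makes that intermediate step visible.

```python
from joblib import Parallel, delayed

def get_length(word):
    return len(word)

words = ["joblib", "parallel", "computing", "example"]

# Sequential: each call runs immediately inside the comprehension
lengths_seq = [get_length(w) for w in words]

# delayed() does not run the function; it only records what to call
task = delayed(get_length)("joblib")
print(task)  # (<function get_length ...>, ('joblib',), {})

# Parallel: the recorded calls are executed by the Parallel object
lengths_par = Parallel(n_jobs=-1)(delayed(get_length)(w) for w in words)

print(lengths_seq, lengths_par)  # [6, 8, 9, 7] [6, 8, 9, 7]
```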


4. Create a 10000x10000 Dask array `da_a` filled with random integers between 0 and 100, chunked into (500, 1000) blocks. Use `RandomState(350)` to make your code reproducible. Create a second Dask array `da_b` of the same shape and chunks, filled with ones. Compute `da_c = (da_a + da_b) * 2` and its mean value.

```python
## Your answer here
import dask.array as da

# random integers from 0 to 99 (high is exclusive), using a fixed RandomState
rs = da.random.RandomState(350)
da_a = rs.randint(
    low=0,
    high=100,
    size=(10000, 10000),  # shape required by the question
    chunks=(500, 1000),
)

# array of ones with same shape/chunks
da_b = da.ones_like(da_a)

# elementwise expression (lazy: nothing is computed yet)
da_c = (da_a + da_b) * 2

# mean is a Dask scalar, must be computed
mean_value = da_c.mean().compute()
mean_value
```
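The following gives a quick sanity check on the computed mean. `randint`'s `high` bound is exclusive, as in NumPy, so each entry of `da_a` is drawn uniformly from 0 to 99. By linearity of expectation, the mean of `da_c` should therefore land very close to 101. The same arithmetic gives the block grid implied by `chunks=(500, 1000)`.

$$
A_{ij} \sim \mathrm{Uniform}\{0, 1, \dots, 99\}, \qquad \mathbb{E}[A_{ij}] = \frac{0 + 99}{2} = 49.5
$$

$$
\mathbb{E}[C_{ij}] = \mathbb{E}\big[2(A_{ij} + 1)\big] = 2\big(\mathbb{E}[A_{ij}] + 1\big) = 2(49.5 + 1) = 101
$$

$$
\text{number of blocks} = \frac{10000}{500} \times \frac{10000}{1000} = 20 \times 10 = 200
$$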

5. What is the difference between `dask.dataframe.compute()` and `dask.dataframe.persist()`? When would you typically use `.persist()`?

Your answer here: Both methods trigger execution of the task graph, but they return different things.

`.compute()` runs the whole graph and returns a concrete, in-memory result. For a Dask DataFrame this is a pandas DataFrame, or a pandas Series or scalar for reductions. It is typically used at the end of a pipeline, when the final result is small enough to fit in local memory.

`.persist()` also runs the graph, but it keeps the computed partitions in memory and returns a Dask DataFrame backed by those stored results. With the distributed scheduler, the partitions live in the workers' memory and `.persist()` returns immediately while the computation continues in the background. Later operations start from the persisted partitions instead of recomputing everything from the raw data.

You would typically use `.persist()` on an intermediate DataFrame that fits in (cluster) memory and is reused by several downstream computations. A common case is a dataset that you load and clean once and then query many times.

6. In this question, you will compare the performance of a regular `for` loop and `dask` for a simple computation. First, create a function called `intensive_task` as follows:

```python
import numpy as np
import time
import dask

def intensive_task(n):
    loop_limit = 10_000_000 # How many iterations inside the function
    total = 0
    for i in range(loop_limit):
        total += i*i
    return total
```

Then, create a list called `inputs` with 6 values:

```python
inputs = [1, 2, 3, 4, 5, 6] 
```

Now, use the function `time.time()` to measure the time it takes to run the function `intensive_task` for each value in the list `inputs` using a regular `for` loop. Store the results in a list called `results`. Remember to create the `start_time` and `end_time` variables to measure the time taken for the computation. The result, which is the difference between `end_time` and `start_time`, should be printed.

Repeat the same task using `dask`. However, instead of using the `@dask.delayed` decorator, use the code below:

```python
tasks = [dask.delayed(intensive_task)(i) for i in inputs]
```

Then, use `dask.compute()` to compute the results. Again, measure the time taken for the computation and print the result. Which one is faster?

```python
## Your answer here
import numpy as np
import time
import dask

def intensive_task(n):
    loop_limit = 10_000_000
    total = 0
    for i in range(loop_limit):
        total += i * i
    return total

inputs = [1, 2, 3, 4, 5, 6]

start_time = time.time()

results_seq = []
for x in inputs:
    results_seq.append(intensive_task(x))

end_time = time.time()
print("Sequential time:", end_time - start_time)

tasks = [dask.delayed(intensive_task)(i) for i in inputs]

start_time = time.time()

results_dask = dask.compute(*tasks)

end_time = time.time()
print("Dask time:", end_time - start_time)
```

```
Sequential time: 2.594106674194336
Dask time: 2.3910107612609863
```
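In the run shown above, Dask was only slightly faster (about 2.39 s vs. 2.59 s). A likely reason is that `dask.compute` on delayed objects uses the threaded scheduler by default. A pure-Python loop holds the GIL, so the six tasks mostly run one at a time.

The sketch below times the same kind of task under three scheduler choices: `"synchronous"`, `"threads"`, and `"processes"`. The `"processes"` scheduler sidesteps the GIL by running tasks in separate processes, at the cost of process start-up and pickling overhead. Some choices here are assumptions rather than part of the assignment:

* The smaller `loop_limit` default and the `time_scheduler` helper are choices made to keep the run short.
* The `if __name__ == "__main__":` guard is needed because the process scheduler may start its workers with the `spawn` method.

Timings are machine-dependent, so no expected numbers are given.

```python
import time

import dask

def intensive_task(n, loop_limit=2_000_000):
    # Pure-Python loop: the thread running it holds the GIL throughout
    total = 0
    for i in range(loop_limit):
        total += i * i
    return total

def time_scheduler(scheduler, inputs):
    # Hypothetical helper: build fresh delayed tasks and time one dask.compute call
    tasks = [dask.delayed(intensive_task)(i) for i in inputs]
    start = time.perf_counter()
    results = dask.compute(*tasks, scheduler=scheduler)
    return results, time.perf_counter() - start

if __name__ == "__main__":
    # The guard keeps spawned worker processes from re-running this block
    inputs = [1, 2, 3, 4, 5, 6]
    for scheduler in ["synchronous", "threads", "processes"]:
        _, elapsed = time_scheduler(scheduler, inputs)
        print(f"{scheduler:>11}: {elapsed:.2f} s")
```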


7. In the same folder as this notebook, you will find a Parquet file named `data.parquet`. It is available here: <https://github.com/danilofreire/qtm350/blob/main/assignments/data.parquet>. This file contains student records with the following columns:

* `emory_id` (integer) 
* `student_name` (string)
* `major` (string)
* `gpa` (float)

Write Python code using `dask.dataframe` to read the `data.parquet` file, but only load the `major` and `gpa` columns. Then, print the first 5 rows of the resulting Dask DataFrame using the `.head()` method, and calculate the average GPA by major.

You will need a Parquet engine to read the file. If you don't have one installed, you can use `pyarrow`. You can install it using conda (or pip):

```bash
conda install pyarrow
```

```python
## Your answer here
import dask.dataframe as dd

# read only specific columns
ddf = dd.read_parquet("data.parquet", columns=["major", "gpa"])

# print first 5 rows
print(ddf.head())

# groupby major and compute mean gpa
avg_gpa_by_major = ddf.groupby("major")["gpa"].mean().compute()
print(avg_gpa_by_major)
```

```
       major   gpa
0    History  2.98
1  Chemistry  3.16
2  Chemistry  3.83
3        QTM  3.67
4    CompSci  3.33
major
Biology      3.044000
Chemistry    3.320000
CompSci      3.352857
Economics    3.285000
English      3.484000
History      3.098750
Physics      3.222857
QTM          2.957500
Name: gpa, dtype: float64
```


8. You have two CSV files in this directory:

* `students.csv`: Contains columns `student_id`, `student_name`. Available here: <https://github.com/danilofreire/qtm350/blob/main/assignments/students.csv>.
* `grades.csv`: Contains columns `student_id`, `course`, `grade`. Available here: <https://github.com/danilofreire/qtm350/blob/main/assignments/grades.csv>.

Write Python code using `dask.dataframe` to:

* Read `students.csv` into a Dask DataFrame called `ddf_students`.
* Read `grades.csv` into a Dask DataFrame called `ddf_grades`.
* Merge these two DataFrames together based on the common `student_id` column. An inner merge is recommended (only include students present in both files).
* From the merged DataFrame, select only the `student_name`, `course`, and `grade` columns. Save it as `ddf_final`.
* Compute and print the first 5 rows of this final merged DataFrame using `.head()`.

Good luck! 😃