https://campus.datacamp.com/courses/parallel-programming-with-dask-in-python/lazy-evaluation-and-parallel-computing?ex=1

# chapter 1

In [None]:
#Dask delayed
from dask import delayed
def my_square_funtion(x):
    returnx**2
# reate delayed version of above funtion
delayed_square_funtion = delayed(my_square_funtion)

#use the delayed funtion with input 4
delayed_result = delayed_square_funtion(4)

print(delayed_result)

##### Delaying functions
You have been tasked with adding up your company's costs for the last 2 weeks. Because you know that you will want to run this same computation in the future with many more weeks, you think it might be a good idea to write it in a way that can be parallelized.

The arrays of costs of items for the last two weeks are available as costs_week_1 and costs_week_2, and numpy has been imported as np.

In [None]:
# Import the delayed function from Dask
from dask import delayed 

# Lazily calculate the sums of costs_week_1 and costs_week_2
sum1 = delayed(np.sum)(costs_week_1)
sum2 = delayed(np.sum)(costs_week_2)

# Add the two delayed sums
total = delayed(sum1 + sum2)

# Compute and print the final answer
print(total.compute())

#### Plotting the task graph
You are trying to analyze your company's spending records. Your manager wants to see what fraction of the total spending occurred in each month. But you are going to have to run it for many files, so it would be good to set up a lazy calculation so you can speed it up using threads or processes. To figure out which of these task scheduling methods might be better for this calculation, you would like to visualize the task graph.

The totals spent in two months are available for you as delayed objects as month_1_costs and month_2_costs. dask has also been imported for you.

In [None]:
# Add the two delayed month costs
total_costs = month_1_costs + month_2_costs

# Calculate the fraction of total cost from month 1
month_1_fraction = month_1_costs / total_costs

# Calculate the fraction of total cost from month 2
month_2_fraction = month_2_costs / total_costs

# Plot the joint task graph used to calculate the fractions
dask.visualize(month_1_fraction, month_2_fraction)

#### Analyzing songs on Spotify
You have a list of CSV files that you want to aggregate to investigate the Spotify music catalog. Importantly, you want to be able to do this quickly and to utilize all your available computing power to do it.

Each CSV file contains all the songs released in a given year, and each row gives information about an individual song.

dask and delayed() have been imported for you, and the list of filenames is available in your environment as filenames. pandas has been imported as pd.

In [None]:
n_songs_in_c, n_songs = 0, 0 

for file in filenames:
    # Load in the data
    df = delayed(pd.read_csv)(file)
    
    # Add to running totals
    n_songs_in_c += (df['key'] == 'C').sum()
    n_songs += df.shape[0]

# Efficiently compute total_n_songs_in_c and total_n_songs
total_n_songs_in_c, total_n_songs = dask.compute(n_songs_in_c, n_songs)

fraction_c = total_n_songs_in_c / total_n_songs
print(total_n_songs, fraction_c)

##### How danceable are songs these days?
It's time to dive deeper into the Spotify data to analyze some trends in music.

In each CSV file, the 'danceability' column contains the score between 0 and 1 of how danceable each song is. The score describes how suitable a track is for dancing based on a combination of musical elements, including tempo, rhythm stability, beat strength, and overall regularity. Do you think songs are getting better or worse to dance to?

dask and the delayed() function have been imported for you. pandas has been imported as pd, and matplotlib.pyplot has been imported as plt. The list of filenames is available in your environment as filenames, and the year of each file is stored in the years list.

In [None]:
danceabilities = []

for file in filenames:
	# Lazily load in the data
    df = delayed(pd.read_csv)(file)
    # Calculate the average danceability in the file of songs
    mean_danceability = df['danceability'].mean()
    danceabilities.append(mean_danceability)

# Compute all the mean danceabilities
danceability_list = dask.compute(danceabilities)[0]
# Plot the results
plt.plot(years, danceability_list)
plt.show()

#### Most popular songs
You have one more task on this Spotify data, which is to find the top 10 most popular songs across all available years. The algorithm you will need to use to compute this is to calculate the top 10 songs in each year, and then combine these and find the top 10 of the top 10s.

The following function, which finds the top 10 songs in a DataFrame, has been provided for you and is available in your environment.

def top_10_most_popular(df):
  return df.nlargest(n=10, columns='popularity')
dask and the delayed() function have been imported for you. pandas has been imported as pd. The list of filenames is available in your environment as filenames, and the year of each file is stored in the list years.

In [None]:
top_songs = []

for file in filenames:
    df = delayed(pd.read_csv)(file)
    # Find the top 10 most popular songs in this file
    df_top_10 = top_10_most_popular(df)
    top_songs.append(df_top_10)

# Compute the list of top 10s
top_songs_list = dask.compute(top_songs)[0]

# Concatenate them and find the best of the best
top_songs_df = pd.concat(top_songs_list)
df_all_time_top_10 = top_10_most_popular(top_songs_df)
print(df_all_time_top_10)

##### Loading and processing photos
Let's say you are training a machine learning model to read American sign language images and translate them into text. The first part of this is loading images, and you have a lot of them. Thankfully, Dask can help you load and process these images lazily.

In [None]:
# Import the image subpackage from dask.array
from dask.array import image

# Lazily load in all jpegs inside all subdirectories inside data/asl
image_array = image.imread('data/asl/*.jpeg')

# Load only the zeroth image into memory
zeroth_image = image_array[0].compute()

# Plot the image
plt.imshow(zeroth_image)
plt.show()

#### An image processing pipeline
Your colleague has written a preprocessing function to use on the American sign language images in order to boost your machine learning model's accuracy. This function will take a grayscale image and run Canny edge detection on it. Canny edge detection is commonly used in classical computer vision and highlights the edges of objects in an image. You want to apply it to all of the images in your dataset.

The function your colleague has written is available in your environment as compute_edges(), and it takes an image that has dimensions (1, h, w) where the height h and the width w can be any integers.

The Dask array of your images is available in the environment as image_array. This array has shape (N, h, w, 3) where N is the number of images, and there are 3 channels for red, blue, and green.

dask.array has been imported for you as da.

In [None]:
# Convert the color photos to grayscale
grayscale_images = image_array.mean(axis=-1)

# Apply the edge detection function
edge_images = grayscale_images.map_blocks(compute_edges)

# Select the zeroth image and compute its values
sample_image = edge_images[0].compute()

# Show the result
plt.imshow(sample_image, cmap='gray')
plt.show()

##### Creating Dask dataframes from CSVs
Previously, you analyzed the Spotify song data using loops and delayed functions. Now you know that you can accomplish the same thing more easily using a Dask DataFrame. Let's see how much easier the same tasks you did earlier are if you do them using these methods instead of loops. First, however, you will need to load the dataset into a Dask DataFrame.

In [None]:
# Import dask dataframe as dd
import dask.dataframe as dd

# Load in the DataFrame
df  = dd.read_csv("data/spotify/*.csv", blocksize="1MB")

# Convert the release_date column from string to datetime
df['release_date'] = dd.to_datetime(df['release_date'])

# Show 5 rows of the DataFrame
print(df.head())

#### Read Dask DataFrames from Parquet
In Chapter 1, you analyzed some Spotify data, which was split across multiple files to find the top hits of 2005-2020. You did this using the dask.delayed() function and a loop. Let's see how much easier this analysis becomes using Dask DataFrames.

dask.dataframe has been imported for you as dd.

In [None]:
# Read the spotify_parquet folder
df = dd.read_parquet("data/spotify_parquet")

# Find the 10 most popular songs
top_10_songs = df.nlargest(n=10, columns='popularity')

# Convert the delayed result to a pandas DataFrame
top_10_songs_df = top_10_songs.compute()

print(top_10_songs_df)

##### Summertime grooves
In chapter 1, you found that the average danceability of songs has been rising since 2005. Now, you are tasked with finding the average danceability of songs released in different months. Is there some part of the year where more danceable songs are released? If you were a musician who had written a new dance song, is there a best time of year to release that? Let's find out.

In [None]:
# Extract the months from the release_date column using its datetime accessor 
months = df['release_date'].dt.month

# Group the danceabilities by month
monthly_groupby = df['danceability'].groupby(months)

# Find the mean danceability by month
monthly_danceability = monthly_groupby.mean()

# Compute the result
monthly_danceability_result = monthly_danceability.compute()

monthly_danceability_result.plot()
plt.show()

##### Dask arrays from HDF5 datasets
You have been tasked with analyzing European rainfall over the last 40 years. The monthly average rainfall in a grid of locations over Europe has been provided for you in HDF5 format. Since this file is pretty large, you decide to load and process it using Dask.

h5py has been imported for you, and dask.array has been imported as da.

In [None]:
# Open the HDF5 dataset using h5py
hdf5_file = h5py.File('data/era_eu.hdf5')

# Load the file into a Dask array with a reasonable chunk size
precip = da.from_array(hdf5_file['/precip'], chunks=(12, 15, 15))

# Select only the months of January
january_rainfalls = precip[0::12]

# Calculate the mean rainfall in January for each location
january_mean_rainfall = january_rainfalls.mean(axis=0)

plt.imshow(january_mean_rainfall.compute())
plt.show()

##### Dask arrays from Zarr datasets
You are tasked with analyzing European temperatures, and are given the same dataset which was in era_eu.hdf but this time in Zarr format. Zarr is a modern, powerful dataset format for storing chunked data. It is particularly good for use on cloud computing services but is also great on your own computer.

dask.array has been imported for you as da.

In [None]:
# Load the temperature data from the Zarr dataset
temps = da.from_zarr('data/era_eu.zarr', component='temp')

# Print the Dask array of temperatures to see the chunk sizes
print(temps)

# Find the minimum of the mean monthly temperatures
all_time_low = temps.min()

# Compute the answer
all_time_low_value = all_time_low.min()

print(all_time_low_value, "Â°C")

##### Exploratory data analysis with xarray
Xarray makes working with multi-dimensional data easier, just like pandas makes working with tabular data easier. Best of all, Xarray can use Dask in the background to help you process the data quickly and efficiently.

You have been tasked with analyzing the European weather dataset further. Now that you know how to use Xarray, you will start by doing some exploratory data analysis.

xarray has been imported for you as xr.

In [None]:
# Open the ERA5 dataset
ds = xr.open_zarr("data/era_eu.zarr")

# Select the zeroth time in the DataSet
ds_sel = ds.isel(time=0)

fig, (ax1, ax2) = plt.subplots(1,2, figsize=(8, 3))

# Plot the zeroth temperature field on ax1
ds_sel['temp'].plot(ax=ax1)

# Plot the zeroth precipitation field on ax2
ds_sel['precip'].plot(ax=ax2)
plt.show()

##### Monthly mean temperatures
After seeing the analysis of the average temperatures, you simply need to see the rest of the months. This would be rather annoying to do using the method we used before as it involves a lot of complicated slicing in time to select the right months. Thankfully, with Xarray, it is a lot simpler.

The European weather dataset is available in your environment as the Xarray DataSet ds.

In [None]:
# Extract the months from the time coordinates
months = ds['time'].dt.month

# Select the temp DataArray and group by months
monthly_groupby = ds['temp'].groupby(months)

# Find the mean temp by month
monthly_mean_temps = monthly_groupby.mean()

# Compute the result
monthly_mean_temps_computed = monthly_mean_temps.compute()

monthly_mean_temps_computed.plot(col='month', col_wrap=4, add_colorbar=False)
plt.show()

##### Calculating the trend in European temperatures
You want to calculate the average European temperature from 1980 to present using the ERA5 dataset. This data is a Zarr dataset of monthly mean temperature and precipitation, on a grid of latitudes and longitudes. The Zarr file is chunked so that each subfile on the disk is an array of 15 latitudes, 15 longitudes, and 12 months.

xarray has been imported for you as xr.

In [None]:
# Open the ERA5 dataset
ds = xr.open_zarr('data/era_eu.zarr')

# Select the temperature dataset and take the latitude and longitude mean
temp_timeseries = ds['temp'].mean(dim=('lat', 'lon'))

# Calculate the 12 month rolling mean
temp_rolling_mean = temp_timeseries.rolling(time=12).mean()

# Plot the result
temp_rolling_mean.plot()
plt.show()

##### Creating a Dask bag
You have been tasked with analyzing some reviews left on TripAdvisor. Your colleague has provided the reviews as a list of strings. You want to use Dask to speed up your analysis of the data, so to start with, you need to load the data into a Dask bag.

In [None]:
# Import the Dask bag subpackage as db
import dask.bag as db

# Convert the list to a Dask bag
review_bag = db.from_sequence(reviews_list, npartitions=3)

# Print 1 element of the bag
print(review_bag.take(1))

##### Creating a bag from saved text
This time your colleague has saved the reviews to some text files. There are multiple files and multiple reviews in each file. Each review is on a separate line of the text file.

You want to load these into Dask lazily so you can use parallel processing to analyze them more quickly.

dask.bag has been imported for you as db.

In [None]:
# Load in all the .txt files inside data/tripadvisor_hotel_reviews
review_bag = db.read_text("data/tripadvisor_hotel_reviews/*.txt")

# Count the number of reviews in the bag
review_count = review_bag.count()

# Compute and print the answer
print(review_count.compute())

#### String operations
Now that you can load the text data into bags, it is time to actually do something with it. To detect how positive or negative the reviews are, you will start by counting some keywords.

The bag you created in the last exercise, review_bag, is available in your environment.

In [None]:
# Convert all of the reviews to lower case
lowercase_reviews = review_bag.str.lower()

# Count the number of times 'excellent' appears in each review
excellent_counts = lowercase_reviews.str.count('excellent')

# Print the first 10 counts of 'excellent'
print(excellent_counts.take(10))

#### Loading JSON data
You have been asked to analyze some data about politicians from different countries. This data is stored in JSON format. The first step you need to accomplish is to load it in and convert it from string data to dictionaries.

dask.bag has been imported for you as db.

In [None]:
# Import of the json package
import json

# Read all of the JSON files inside data/politicians
text_bag = db.read_text("data/politicians/*.json")

# Convert the JSON strings into dictionaries
dict_bag = text_bag.map(json.loads)

# Show an example dictionary
print(dict_bag.take(1))

#### Filtering Dask bags
The politician data you are working with comes from different sources, so it isn't very clean. Many of the dictionaries are missing keys that you may need to run your analysis. You will need to filter out the elements with important missing keys.

A function named has_birth_date() is available in the environment. It checks the input dictionary to see if it contains the key 'birth_date'. It returns True if the key is in the dictionary and False if not

In [None]:
# Print the number of elements in dict_bag
print(dict_bag.count().compute())

# Filter out records using the has_birth_date() function
filtered_bag = dict_bag.filter(has_birth_date)

# Print the number of elements in filtered_bag
print(filtered_bag.count().compute())

##### Chaining operations
Now that you have loaded and cleaned the data, you can begin analyzing it. Your first task is to look at the birth dates of the politicians. The birth dates are in string format like 'YYYY-MM-DD'. The first 4 characters in the string are the year.

The filtered Dask bag you created in the last exercise, filtered_bag, is available in your environment.

In [None]:
# Select the 'birth_date' from each dictionary in the bag
birth_date_bag = filtered_bag.pluck('birth_date')

# Extract the year as an integer from the birth_date strings
birth_year_bag = birth_date_bag.map(lambda x: int(x[:4]))

# Calculate the min, max and mean birth years
min_year = birth_year_bag.min()
max_year = birth_year_bag.max()
mean_year = birth_year_bag.mean()

# Compute the results efficiently and print them
print(dask.compute(min_year, max_year, mean_year))


##### Restructuring a dictionary
Now you want to clean up the politician data and move it into a Dask DataFrame. However, the politician data is nested, so you will need to process it some more before it fits into a DataFrame.

One particular piece of data you want to extract is buried a few layers inside the dictionary. This is a link to a website for each politician. The example below shows how it is stored inside the dictionary.

record = {
...
 'links': [{'note': '...',
            'url': '...'},],  # Stored here
...
}
The bag of politician data is available in your environment as dict_bag.

In [None]:
def extract_url(x):
    # Extract the url and assign it to the key 'url'
    x['url'] = x['links'][0]['url']
    return x
  
# Run the function on all elements in the bag.
dict_bag = dict_bag.map(extract_url)

print(dict_bag.take(1))

#### Converting to DataFrame
You want to make a DataFrame out of the politician JSON data. Now that you have de-nested the data, all you need to do is select the keys to keep as columns in the DataFrame.

The Dask bag you created in the last exercise is available in your environment as dict_bag.

In [None]:
def select_keys(dictionary, keys_to_keep):
  new_dict = {}
  # Loop through kept keys and add them to new dictionary
  for k in keys_to_keep:
    new_dict[k] = dictionary[k]
  return new_dict

# Use the select_keys to reduce to the 4 required keys
filtered_bag = dict_bag.map(select_keys, keys_to_keep=['gender','name', 'birth_date', 'url'])

# Convert the restructured bag to a DataFrame
df = filtered_bag.to_dataframe()

# Print the first few rows of the DataFrame
print(df.head())

##### Loading wav data
To work with any non-standard data using Dask bags, you will need to write a lot of functions yourself. For this task, you are analyzing audio data, and so you need a custom function to load it.

Some of the audio recordings failed, and the audio is silent in these. Regular audio data looks like a wave, where the amplitude goes to large positive and negative values. Therefore, to check if a recording is silent, you can check whether the audio clip has very small amplitudes overall.

The scipy.io.wavfile module has been imported into your environment as wavfile, and numpy has been imported as np.

In [None]:
def load_wav(filename):
    # Load in the audio data
    sampling_freq, audio = wavfile.read(filename)
    
    # Add the filename, audio data, and sampling frequency to the dictionary
    data_dict = {
        'filename': filename,
        'audio': audio, 
        'sample_frequency': sampling_freq
    }
    return data_dict

def not_silent(data_dict):
    # Check if the audio data is silent
    return np.mean(np.abs(data_dict['audio'])) > 100

#### Constructing custom Dask bags
A common use case for Dask bags is to convert some code you have already written to run in parallel. Depending on the code, sometimes it can be easier to construct lists of delayed objects and then convert them to a bag. Other times it will be easier to form a Dask bag early on in the code and map functions over it. Which of these options is easier will depend on your exact code, so it's important that you know how to use either method.

dask has been imported for you, and dask.bag has been imported as db. A list of file names strings is available in your environment as wavfiles.

In [None]:
# Convert the list of filenames into a Dask bag
filename_bag = db.from_sequence(wavfiles)

# Apply the load_wav() function to each element of the bag
loaded_audio_bag = filename_bag.map(load_wav)

In [None]:
delayed_loaded_audio = []

for wavfile in wavfiles:
    # Append the delayed loaded audio to the list
    delayed_loaded_audio.append(dask.delayed(load_wav)(wavfile))

# Convert the list to a Dask bag
loaded_audio_bag = db.from_delayed(delayed_loaded_audio)

#### Processing unstructured audio
You have a lot of .wav files to process, which could take a long time. Fortunately, the functions you just wrote can be used with Dask bags to run the analysis in parallel using all your available cores.

Here are descriptions of the not_silent() function you wrote, plus two extras you can use.

not_silent(audio_dict) - Takes an audio dictionary, and checks if the audio isn't silent. Returns True/False.
peak_frequency(audio_dict) - Takes a dictionary of audio data, analyzes it to find the peak frequency of the audio, and adds it to the dictionary.
delete_dictionary_entry(dict, key_to_drop) - Deletes a given key from the input dictionary.
The audio data loaded_audio_bag is available in your environment.

In [None]:
# Filter out blank audio files
filtered_audio_bag = loaded_audio_bag.filter(not_silent)

# Apply the peak_frequency function to all audio files
audio_and_freq_bag = filtered_audio_bag.map(peak_frequency)

# Use the delete_dictionary_entry function to drop the audio
final_bag = audio_and_freq_bag.map(delete_dictionary_entry, key_to_drop='audio')

# Convert to a DataFrame and run the computation
df = final_bag.to_dataframe().compute()
print(df)

#### Clusters and clients
Depending on your computer hardware and the calculation you are trying to complete, it may be faster to run it using a mixture of threads and processes. To do this, you need to set up a local cluster.

There are two ways to set up a local cluster that Dask will use. The first way is to create the local cluster and pass it to a client. This is very similar to how you would set up a client to run across a cluster of computers! The second way is to use the client directly and allow it to create the local cluster itself. This is a shortcut that works for local clusters, but not for the other types of cluster.

In this exercise, you will create clients using both methods.

Be careful when creating the cluster and clients. If you configure them incorrectly, your session may time out.

In [None]:
# Import Client and LocalCluster
from dask.distributed import Client, LocalCluster

# Create a thread-based local cluster
cluster = LocalCluster(
	processes=False, 
    n_workers=4,
    threads_per_worker=1
)

# Create a client
client = Client(cluster)

In [None]:
from dask.distributed import Client

# Create a client without creating cluster first
client = Client(
	processes=False, 
    n_workers=4,
    threads_per_worker=1
)

#### Using Dask to train a linear model
Dask can be used to train machine learning models on datasets that are too big to fit in memory, and allows you to distribute the data loading, preprocessing, and training across multiple threads, processes, and even across multiple computers.

You have been tasked with training a machine learning model which will predict the popularity of songs in the Spotify dataset you used in previous chapters. The data has already been loaded as lazy Dask DataFrames. The input variables are available as dask_X and contain a few numeric columns, such as the song's tempo and danceability. The target values are available as dask_y and are the popularity score of each song.

In [None]:
# Import the SGDRegressor and the Incremental wrapper
from sklearn.linear_model import SGDRegressor
from dask_ml.wrappers import Incremental

# Create a SGDRegressor model
model = SGDRegressor()

# Wrap the model so that it works with Dask
dask_model = Incremental(model, scoring='neg_mean_squared_error')

# Fit the wrapped model
dask_model.fit(dask_X, dask_y)

#### Making lazy predictions
The model you trained last time was good, but it could be better if you passed through the training data a few more times. Also, it is a shame to see a good model go to waste, so you should use this one to make some predictions on a separate dataset from the one you train on.

An unfitted version of the model you created in the last exercise is available in your environment as dask_model. Dask DataFrames of training data are available as dask_X and dask_y.

In [None]:
# Loop over the training data 5 times
for i in range(5):
	dask_model.partial_fit(dask_X, dask_y)

# Use your model to make predictions
y_pred_delayed = dask_model.predict(dask_X)

# Compute the predictions
y_pred_computed = y_pred_delayed.compute()

print(y_pred_computed)

#### Lazily transforming training data
Preprocessing your input variables is a vital step in machine learning and will often improve the accuracy of the model you create. In the last couple of exercises, the Spotify data was preprocessed for you, but it is important that you know how to do it yourself.

In this exercise, you will use the StandardScaler() scaler object, which transforms columns of an array so that they have a mean of zero and standard deviation of one.

The Dask DataFrame of Spotify songs is available in your environment as dask_df. It contains both the target popularity scores and the input variables which you used to predict these scores.

In [None]:
# Import the StandardScaler class
from dask_ml.preprocessing import StandardScaler

X = dask_df[['duration_ms', 'explicit', 'danceability', 'acousticness', 'instrumentalness', 'tempo']]

# Select the target variable
y = dask_df['popularity']

# Create a StandardScaler object and fit it on X
scaler = StandardScaler()
scaler.fit(X)

# Transform X
X = scaler.transform(X)
print(X)

#### Lazy train-test split
You have transformed the X variables. Now you need to finish your data prep by transforming the y variables and splitting your data into train and test sets.

The variables X and y, which you created in the last exercise, are available in your environment.

In [None]:
# Import the train_test_split function
from dask_ml.model_selection import train_test_split

# Rescale the target values
y = y / 100

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True, test_size=0.2)

print(X_train)