# Introduction

### Multiprocessing dataset computing

Dask manages large datasets through several data structures, such as Dask DataFrame and Dask Bag, which we already covered in the slides. The following paragraphs are optional reading if you want more detail on how they work.

### Dask DataFrame
The ```dask.dataframe``` module implements a blocked parallel DataFrame object that mimics a large subset of the Pandas DataFrame API. One Dask DataFrame is composed of many Pandas DataFrames. Each Pandas DataFrame is a "*partition*" of the distributed DataFrame. An operation on a Dask DataFrame triggers the same Pandas operation on each constituent Pandas DataFrame, taking parallelism and memory constraints into account.

In light of this, Dask DataFrame is important for two main reasons:

+ It is familiar to Pandas users
+ The partitioning approach is important for efficient queries

It is easy to see that, from a Spark perspective, these DataFrames are basically a lighter and more Pythonic version of Spark DataFrames.\
Like Pandas, Dask DataFrame supports a large set of input data formats, such as:
+ CSV
+ Parquet
+ JSON
+ ORC
+ HDF5
+ ...

Even though these DataFrames are powerful, *Pandas is more mature and fully featured*. If your data fits in memory, then you should use Pandas. One of the main reasons is that not all of the Pandas API has been implemented in Dask yet.


### Dask Bag 
Alongside DataFrames, Dask offers another collection: the Bag. This data structure implements operations like `map`, `filter`, `groupby`, and aggregations on collections of Python objects. It does this in parallel and with a small memory footprint using Python iterators. It is similar to a parallel version of itertools, but it can also be considered a lighter and more Pythonic version of the *Spark RDD*.

Dask Bags are often used to do simple preprocessing on log files, JSON records, or other user-defined Python objects.\
Basically, Bags are a good substitute for DataFrames when the data are unstructured or only partially structured.
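As a rough illustration of the `map` and `filter` style described above, the following sketch parses a few JSON lines with a Bag. The records and field names are invented for the example, and the threaded scheduler is chosen only to keep the sketch lightweight.

```python
import json
import dask.bag as db

# Toy JSON-lines records (hypothetical fields, for illustration only).
raw_lines = [
    '{"user": "alice", "status": 200}',
    '{"user": "bob", "status": 500}',
    '{"user": "alice", "status": 404}',
    '{"user": "carol", "status": 500}',
]

# Build a Bag, parse every line, and keep only the server errors.
bag = db.from_sequence(raw_lines, npartitions=2)
records = bag.map(json.loads)
server_errors = records.filter(lambda record: record["status"] >= 500)

# Nothing has run so far; compute() triggers the work.
error_users = server_errors.pluck("user").compute(scheduler="threads")
status_counts = dict(
    records.pluck("status").frequencies().compute(scheduler="threads")
)

print(error_users, status_counts)
```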

In our daily use cases, Dask Bags are a good tool to manage, extract, and work with structured or unstructured text files, where hundreds of gigabytes of data may need to be processed. Some examples are:
+ WBQA traces
+ JSONL datasets
+ WBQA logs

# Hands-on: a real use case


This hands-on session is designed to be really close to our daily use cases. To this end, we prepared some data from Wikipedia dumps. The data must be downloaded from [URL](https://drive.corp.amazon.com/personal/lauivano/Workshop-dask-data). Once you unzip the downloaded file, you will find two data folders:
+ data_full: for those using an EC2 instance. It contains 100 Parquet files for a total of 1.2 GB
+ data_small: for those using their own laptop. It contains 50 Parquet files for a total of ~300 MB

Each folder contains a set of files, each file contains a collection of documents, and each document is composed of three fields:
+ id: UUID
+ title: text
+ content: text

Given these files, we want to:
+ count the number of words in each document
+ get the mean and the standard deviation of the number of words across all documents
+ count the number of sentences for each document
+ get the document with the largest number of sentences
+ count how many times the word "amazon" occurs in those documents
+ take the top 10 documents that contain the largest number of "amazon" occurrences

### Step 1: Create a cluster
To instantiate a cluster on our machine, we use the `LocalCluster` object, as previously seen in the slides.



In [1]:
from dask.distributed import Client, LocalCluster
import dask

In [None]:
WORKERS_NUMBER = 4

In [None]:
cluster = LocalCluster(name="workshop_cluster", n_workers=WORKERS_NUMBER)
client = Client(cluster)
client

**HINT**: The client object is now our entry point to the cluster. However, DataFrames and Bags do not need to use this object directly, because creating a `Client` registers it as the default scheduler.

### Step 2: Read the provided files
In this step we read the files through the `dask.dataframe` module.

In [None]:
import dask.dataframe as dd

In [None]:
# If you have enough computing resources (e.g. EC2), use the 'data_full' folder instead of 'data_small'
df = dd.read_parquet('data_small').persist()

Now let's see what happens if we try to look into the dataset.

In [None]:
df

As you can see, the columns seem to be empty. This is because Dask is *lazy*.\
When we work in a distributed environment, we have to keep in mind that computations are triggered only when they are necessary. Let's see what happens if we call the ```head``` method of the DataFrame.

In [None]:
df.head()

In this case, the computation took longer than before. This is because the `head` method triggered the reading of the dataset.

### Step 3: Count the number of words of each document

In this step we use the `NLTK` library to tokenize the text. After that, we count how many words each document contains.

#### Step 3a - Document tokenization

In [None]:
from nltk.tokenize import word_tokenize, sent_tokenize

def tokenization_function(text):
    return word_tokenize(text)

In [None]:
df = df.assign(doc_words = df.text.apply(tokenization_function, meta=list))

In [None]:
df.head()

Let's take a look at the computational graph and see what happened.

In [None]:
# skip this cell if you have not installed the graphviz package
# double-click on the image below to zoom
df.visualize(rankdir="LR", filename='3a-graph.png')

This computational graph represents operations executed in parallel for the dataset.

However, doing this lazily is not always convenient: each time you need the `doc_words` field, Dask has to recompute it unless the result has been materialized.

A natural thought would be to call the `compute` method to execute the operation and reuse the results without recomputing them.\
That is not the best option: `compute` gathers the whole result into the local process as a Pandas object, which may not fit in memory and takes the data off the cluster.\
We can use the `persist` method instead. It keeps the dataset, together with its intermediate results and new fields, in the workers' memory, which avoids recomputation. Moreover, Dask keeps results that are still referenced in memory (such as the dataset persisted in Step 2), so even without calling `persist` again some data are reused rather than recomputed.\
Let's compare the running time of the `head` method before and after `persist`.



In [None]:
%%time

df.head()

In [None]:
df = df.persist()

In [None]:
%%time
df.head()

#### Step 3b - Document word number count
After tokenization we can compute, for each document, the number of words it contains.

In [None]:
def word_counter(text_words):
    return len(text_words)


In [None]:

df = df.assign(doc_words_number=df.doc_words.apply(word_counter, meta=int))

In [None]:
df.head()

In [None]:
df = df.persist()

### Step 4: Get the mean and the stdev of the number of words

Now we are ready to compute the mean and the standard deviation of the number of words.


In [None]:
%%time
mean = df.doc_words_number.mean().compute()
stdev = df.doc_words_number.std().compute()
print(f'Mean of number of words is {round(mean, 2)} +/- {round(stdev,2)}')

This step performs two computations that are quite similar and share some partial results.\
Is there a way to optimize these computations, as seen in the presentation?\
Yes: we can merge them so that Dask generates a single graph and reuses the common partial results.

In [None]:
%%time
mean, stdev = dask.compute(df.doc_words_number.mean(), df.doc_words_number.std())
print(f'Mean of number of words is {round(mean, 2)} +/- {round(stdev,2)}')

### Step 5: Count the number of sentences of each document

The goal of this step is to use the NLTK library to split the text into sentences and to count how many sentences each document contains.

#### Step 5a - Document sentence tokenization

In [None]:
from nltk.tokenize import sent_tokenize
def sentence_tokenization_function(text):
    return sent_tokenize(text)

In [None]:
df = df.assign(doc_sentences = df.text.apply(..., meta=list)) #complete the function

Now, let's persist the dataset in the cluster memory.

In [None]:
df = df.persist()

In [None]:
df.head()

#### Step 5b - Document sentences number count

Ok, now we are ready to count the number of sentences per document:

In [None]:
def sentences_counter(text_sentences):
    return len(text_sentences)

In [None]:
df = df.assign(doc_sentences_number= ... ) #complete the cell

In [None]:
df = df.persist()

In [None]:
df.head()

### Step 6: Get the document with the largest number of sentences
Having finished the previous step, we can extract the document with the largest number of sentences in our dataset.

In [None]:
df.loc[df.doc_sentences_number == df.doc_sentences_number.max()].compute()

### Step 7: Count how many times the word "amazon" occurs in those documents

Let's move on and try to count the occurrences of the word "*amazon*" for each document.

In [None]:
def search_for_amazon_word(text):
    return text.lower().count('amazon')
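Note that `str.count` counts substrings, so a word such as "Amazonian" is counted as well. If only whole-word matches are wanted, one possible variant (an assumption about the intent, not part of the original exercise) uses a regular expression with word boundaries:

```python
import re


def count_substring(text, word="amazon"):
    # Same logic as the cell above: every substring match counts.
    return text.lower().count(word)


def count_whole_word(text, word="amazon"):
    # \b restricts matches to whole words; re.IGNORECASE replaces lower().
    pattern = r"\b" + re.escape(word) + r"\b"
    return len(re.findall(pattern, text, flags=re.IGNORECASE))


sample = "Amazon sells books. The Amazonian forest is not amazon.com."
print(count_substring(sample), count_whole_word(sample))
```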

In [None]:
df = df.assign(occurencies_of_amazon= ... )#complete the cell

In [None]:
df = df.persist()

### Step 8: Take the top 10 documents that contain the largest number of "amazon" occurrences

First, let's filter the data in order to keep only the documents that contain at least one occurrence of our target word.

Then we can sort the dataset by the ```occurencies_of_amazon``` column and extract the top 10 documents.

In [None]:
text_that_contains_word_amazon = df.loc[df.occurencies_of_amazon != 0].reset_index(drop=True)


In [None]:
text_that_contains_word_amazon = text_that_contains_word_amazon.sort_values('occurencies_of_amazon').reset_index(drop=True)
text_that_contains_word_amazon.tail(10)[::-1]

Great! We finished everything we planned in an easy way!
Now, remember to close the cluster!

In [None]:
client.close()
cluster.close()
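If you prefer not to rely on remembering the explicit `close` calls, both `LocalCluster` and `Client` can be used as context managers. This is a minimal sketch: the worker count, the in-process workers (`processes=False`), and the disabled dashboard are assumptions chosen to keep the example small.

```python
from dask.distributed import Client, LocalCluster

# The with-block closes the client and the cluster automatically,
# even if an exception is raised inside it.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,          # run workers as threads in this process
    dashboard_address=None,   # do not start the diagnostics dashboard
) as cluster:
    with Client(cluster) as client:
        n_workers = len(cluster.workers)
        total = client.submit(sum, [1, 2, 3]).result()

print(n_workers, total, client.status)
```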

Thanks for joining this session!