# Distributed Data Analysis with Dask  
*__[Dask](https://www.dask.org/)__ with the __MovieLens__ dataset*  

**Part 1: Getting Started, Load the MovieLens dataset**

### <font color='green'>__Support for Google Colab__  </font>  
    
Open this notebook in Colab using the following button:  
  
<a href="https://colab.research.google.com/github/shauryashaurya/learn-data-munging/blob/main/02-Pandas/02.01-Data-Wrangling-with-MovieLens-and-Pandas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>  

**Note**: The Dask Dashboard is not available on Google Colab unless you register with a service like Saturn Cloud or ngrok. These are both good approaches, but for folks running this on Colab I have not built support for them into the workshop. Your contributions / PRs would be very welcome.
  
<font color='green'>uncomment and execute the cell below to setup and run this notebook on Google Colab.</font>

### Graphviz or Cytoscape (ipycytoscape)

For some Dask exercises, we may want to visualize the task graph. 
To do so we'll need: [GraphViz](https://graphviz.org/) or [Cytoscape](https://cytoscape.org/download.html) and [ipycytoscape](https://ipycytoscape.readthedocs.io/en/latest/installing.html)

These come with ready installers. Install, restart Jupyter if needed. 

In [None]:
# # SETUP FOR COLAB: select all the lines below and uncomment (Ctrl+/ on Windows)

# # grab dask - in most cases it should already be available in colab
# ! python -m pip install --quiet --upgrade --no-cache-dir "dask[complete]"
# # Let's download and unzip the MovieLens 25M Dataset as well.
# ! mkdir -p ./../data
# ! wget -q https://files.grouplens.org/datasets/movielens/ml-25m.zip
# ! unzip ./ml-25m.zip -d ./../data/

# ! echo "DONE"

## Overview
Select, filter, join, groupby, pivot, and windows.  

Instead of toy examples and '10 minutes to xx' we load an actual dataset and ask meaningful questions about it.
  
We'll use the [MovieLens](https://grouplens.org/datasets/movielens/) dataset for these exercises.  
This dataset is non-trivial and should expand to about __1GB__ on your local disk.  

Download and unzip [MovieLens 25M Dataset](https://grouplens.org/datasets/movielens/25m/) for this analysis.

Either ensure the data is in the ```"../data/ml-25m"``` folder (relative to this notebook) or update the path to the data below.

**Citation**:  
*F. Maxwell Harper and Joseph A. Konstan.* 2015.  
The MovieLens Datasets: History and Context.  
ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. <https://doi.org/10.1145/2827872>  

## Approach

The idea is to tackle simple Dask use-cases first and move on to more complex ones.  

Starting with simply loading the data into a dask distributed dataframe, we then perform a data evaluation, some cleanup and finally analysis. We first ask questions based on individual data files, then move on to combining data from multiple files.

We are going to try and avoid the more mathematically involved parts of exploratory data analysis, e.g. statistical analysis of various features. The core focus is the ability to grok Dask functions and have fun while doing it.  

By the end you'll not only have an idea of Dask, but also of how we ask questions and analyze a chunk of data.  

_You may also end up with a watch-list to binge on your next weekend._ :)   

# Setup Dask, Pandas and Numpy

## Setup the Dask Cluster

### Installation

A local install is as simple as ```pip install "dask[complete]"```  
  
Unlike Spark, Dask is incredibly easy to set up - check out the [Dask Installation Docs](https://docs.dask.org/en/stable/install.html)  

In [1]:
# Step 1: numpy and pandas

import numpy as np
import pandas as pd

print("numpy version: ", np.__version__)
print("pandas version: ", pd.__version__)

numpy version:  1.26.0
pandas version:  2.1.1


In [2]:
# Step 2: import dask and related
import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

print("dask version: ", dask.__version__)

dask version:  2023.9.2


In [3]:
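# Step 3: Create a Dask Cluster and a Client
# (errors are left to surface here: silently ignoring them would leave
#  `cluster` or `client` undefined for the rest of the notebook)
cluster = LocalCluster()
# alternative, when you want to specify the dashboard address/port
# (8787 is the default dashboard port; 8786 is the default scheduler port)
# cluster = LocalCluster(dashboard_address="localhost:8787")

# make sure the cluster is shut down when it is closed
if not cluster.shutdown_on_close:
    cluster.shutdown_on_close = True

client = Client(cluster)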

In [4]:
# see cluster information
# cluster

In [5]:
# see client information
# client

# Client, Workers, and Scheduler - How Dask does it.

Dask is very similar to Spark.  
Both lazily construct directed acyclic graphs (DAGs) of tasks  
and split large datasets into smaller portions (partitions/chunks).

  

    
<img src="https://docs.dask.org/en/stable/_images/dask-overview.svg" width="80%" height="80%" />

<font size = '2px' color='green'>Image from docs.dask.org</font>

* The **client** is the machine that submits tasks to the **scheduler** (create Task Graphs).  
* The scheduler is responsible for managing the tasks and distributing them to the **workers** (execute Task Graphs).  
* The workers are responsible for executing the tasks.  
* The scheduler will then return the results to the client.

   
 

The client, scheduler, and workers communicate with each other using a message passing protocol. The client sends a message to the scheduler, which then sends a message to the worker. The worker executes the task and sends a message back to the scheduler, which then sends the results back to the client.
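The round trip described above can be tried on a tiny, throwaway cluster. This is only a sketch: the `square` function and the cluster settings (one in-process worker, dashboard disabled) are assumptions chosen to keep the example small, not the setup used in the rest of this notebook.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    """A trivial task for the worker to execute."""
    return x * x


# A minimal in-process cluster: one worker thread, no dashboard.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# The client hands the task to the scheduler, which assigns it to a worker.
future = client.submit(square, 12)

# result() blocks until the computed value is available on the client.
answer = future.result()
print(answer)  # 144

client.close()
cluster.close()
```

`client.submit()` returns a future immediately; the work happens on the worker, and only `result()` waits for it.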

Instead of just distributing data across nodes, Dask distributes the functions too. For example, you can imagine that each worker node in a Dask cluster that is working on a Dask DataFrame is running its own copy of Pandas, while the scheduler builds a task graph that lets these workers perform the computations in parallel. 

Dask structures (Array, DataFrame, and Bag) are *lazy* by default, meaning they don't evaluate until you explicitly ask for a result using ```compute()```.  
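Laziness is easiest to see with `dask.delayed`. The sketch below uses two made-up helper functions (`inc` and `add`) and the single-threaded `"synchronous"` scheduler so that it runs without a cluster; treat it as an illustration rather than part of the MovieLens workflow.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


# Nothing is executed here: each call only records a task in the graph.
total = add(inc(1), inc(2))
print(type(total).__name__)  # Delayed

# compute() walks the task graph and returns a concrete value.
result = total.compute(scheduler="synchronous")
print(result)  # 5
```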

* Dask Bag is like a Spark RDD - a general-purpose, parallel collection of arbitrary Python objects.  
* Dask DataFrames, each made up of many Pandas DataFrames (partitions) coordinated by the scheduler, are like Spark DataFrames.  
* Spark DataFrames have an advantage with optimizations from the Catalyst engine and Tungsten, but Dask DataFrames provide a more "Pandas-like" experience.

# Locate the data

In [6]:
datalocation = "../data/ml-25m/"

In [7]:
# specify file names
file_path_movies = datalocation + "movies.csv"
file_path_links = datalocation + "links.csv"
file_path_ratings = datalocation + "ratings.csv"
file_path_tags = datalocation + "tags.csv"
file_path_genome_tags = datalocation + "genome-tags.csv"
file_path_genome_scores = datalocation + "genome-scores.csv"

# Load the dataset(s)

From the ```README.txt``` file in the small MovieLens dataset:
The dataset files are written as [**comma-separated values**](http://en.wikipedia.org/wiki/Comma-separated_values) files with a **single header row**. Columns that contain commas (`,`) are **escaped using double-quotes (`"`)**. These files are encoded as **UTF-8**. If accented characters in movie titles or tag values (e.g. Misérables, Les (1995)) display incorrectly, make sure that any program reading the data, such as a text editor, terminal, or script, is configured for UTF-8.

So, we specify:
* Separator - ```,```
* Quote Character (escapes values that contain commas) - ```"```
* Encoding - ```UTF-8```

This combination of settings is often called the **dialect** of the CSV file.
Dialects vary from file to file, so they need our attention.
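To see why the quote character matters, here is a small sketch in plain Pandas. The two-row `sample_csv` string is hand-written for illustration (it is not read from the MovieLens files), and `sample` is a hypothetical name.

```python
import io

import pandas as pd

# A hand-written sample in the same dialect: comma-separated,
# with double quotes around a title that itself contains a comma.
sample_csv = (
    "movieId,title,genres\n"
    "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n"
    '2,"Misérables, Les (1995)",Drama\n'
)

sample = pd.read_csv(io.StringIO(sample_csv), sep=",", quotechar='"')

print(sample.shape)             # (2, 3)
print(sample.loc[1, "title"])   # Misérables, Les (1995)
```

Without the quotes, the comma inside the title would be read as a column separator and the row would have one field too many.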

In [8]:
# dask dataframes parallelize pandas dataframes
# so many of the idioms are similar
csv_separator = ","
csv_escapechar = '"'
csv_quotechar = csv_escapechar
csv_encoding = "utf-8"

## Movies

Let's specify the [```dtypes```](https://pandas.pydata.org/docs/user_guide/basics.html#dtypes) of each of the columns in the movies file. 

In [9]:
# schema, inferred from the README.txt file
movies_schema = {"movieId": "Int32", "title": "string", "genres": "string"}

Two of the columns are [strings of text](https://pandas.pydata.org/docs/user_guide/text.html#working-with-text-data). Pandas may treat those as ```object```, but we want to use the [```pandas.StringDtype```](https://pandas.pydata.org/docs/reference/api/pandas.StringDtype.html#pandas-stringdtype) here.
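A quick way to see the difference is to build the same column twice in plain Pandas. This is a sketch with hypothetical names (`as_default`, `as_string`, `years`); the default dtype that Pandas picks depends on the installed version, so no output is shown for it.

```python
import pandas as pd

titles = ["Toy Story (1995)", "Jumanji (1995)"]

as_default = pd.Series(titles)                  # dtype chosen by pandas
as_string = pd.Series(titles, dtype="string")   # explicit StringDtype

print(as_default.dtype)
print(as_string.dtype)

# The .str accessor works on the string column, e.g. to pull out the year.
years = as_string.str.extract(r"\((\d{4})\)$")[0]
print(years.tolist())  # ['1995', '1995']
```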

In [10]:
# we are using dd - dask.dataframe
movies = dd.read_csv(
    file_path_movies,
    dtype=movies_schema,
    sep=csv_separator,
    quotechar=csv_quotechar,
    encoding=csv_encoding,
)

In [79]:
# needs GraphViz or ipycytoscape
movies.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [29]:
# show the first 5 rows
# just like Pandas
movies.head(5)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


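Note that ```head()``` returns a regular Pandas DataFrame: Dask runs just enough of the task graph (by default it only looks at the first partition) to produce those rows, so no explicit ```compute()``` call is needed.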

In [30]:
# data types of each column
movies.dtypes

movieId              Int32
title      string[pyarrow]
genres     string[pyarrow]
dtype: object

In [50]:
# to keep track of column names and dtypes without loading any data,
# dask maintains an empty _meta object
movies._meta

Unnamed: 0,movieId,title,genres


From [Dask DataFrames Internal Design](https://docs.dask.org/en/stable/dataframe-design.html):

Many DataFrame operations rely on knowing the name and dtype of columns. To keep track of this information, all Dask DataFrame objects have a ```_meta``` attribute which contains an empty Pandas object with the same dtypes and names.

Internally, Dask DataFrame does its best to propagate this information through all operations, so most of the time a user shouldn’t have to worry about this. Usually this is done by evaluating the operation on a small sample of fake data, which can be found on the ```_meta_nonempty``` attribute.  

In [52]:
movies._meta_nonempty

Unnamed: 0,movieId,title,genres
0,1.0,a,a
1,,,


Just for practice, let's load the other datasets too...

## Links

In [31]:
# schema, inferred from the README.txt file
# load imdbId,tmdbId as strings because they are a part of a URL.
#   IMDB: http://www.imdb.com/title/imdbId/
#   TMDB: https://www.themoviedb.org/movie/tmdbId
links_schema = {"movieId": "Int32", "imdbId": "string", "tmdbId": "string"}
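Keeping the IDs as strings makes URL building a simple concatenation. The sketch below uses plain Pandas and the TMDB pattern from the comment above; `links_sample` and `tmdb_url` are hypothetical names, and the two `tmdbId` values are the ones from the first rows of `links.csv`.

```python
import pandas as pd

links_sample = pd.DataFrame(
    {
        "movieId": [1, 2],
        "tmdbId": pd.Series(["862", "8844"], dtype="string"),
    }
)

# String IDs can be glued onto the URL prefix directly.
links_sample["tmdb_url"] = (
    "https://www.themoviedb.org/movie/" + links_sample["tmdbId"]
)

print(links_sample["tmdb_url"].tolist())
```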

In [32]:
links = dd.read_csv(
    file_path_links,
    dtype=links_schema,
    sep=csv_separator,
    quotechar=csv_quotechar,
    encoding=csv_encoding,
)

In [80]:
links.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [33]:
links.head(5)

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862
1,2,113497,8844
2,3,113228,15602
3,4,114885,31357
4,5,113041,11862


In [34]:
links.dtypes

movieId              Int32
imdbId     string[pyarrow]
tmdbId     string[pyarrow]
dtype: object

In [53]:
links._meta

Unnamed: 0,movieId,imdbId,tmdbId


In [54]:
links._meta_nonempty

Unnamed: 0,movieId,imdbId,tmdbId
0,1.0,a,a
1,,,


## Ratings

Reading through the ```README``` file:  
Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).  
Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.  

Ooh! Ooh! We got our first [DateTime](https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html#pandas.DatetimeTZDtype)!
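As a quick sanity check of the epoch-seconds convention, a single raw value can be converted in plain Pandas. The value `1147880044` is the timestamp of the first row of `ratings.csv`; `ts` is just an illustrative name.

```python
import pandas as pd

# Seconds since 1970-01-01 00:00:00 UTC -> timezone-aware timestamp
ts = pd.to_datetime(1147880044, unit="s", utc=True)

print(ts)  # 2006-05-17 15:34:04+00:00
```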

In [35]:
# schema, inferred from the README.txt file
# read timestamps as integers then convert to dates later.

ratings_schema = {
    "userId": "Int32",
    "movieId": "Int32",
    "rating": "Float32",
    "timestamp": "Int64",
}
#

In [36]:
ratings = dd.read_csv(
    file_path_ratings,
    dtype=ratings_schema,
    sep=csv_separator,
    quotechar=csv_quotechar,
    encoding=csv_encoding,
)

In [37]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


In [38]:
ratings.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [43]:
# now let's add a datetime column that we derive from the raw timestamp
ratings["datetime"] = dd.to_datetime(ratings["timestamp"], unit="s", utc=True)

In [44]:
ratings.dtypes

userId                     Int32
movieId                    Int32
rating                   Float32
timestamp                  Int64
datetime     datetime64[ns, UTC]
date             string[pyarrow]
dtype: object

In [55]:
ratings._meta

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date


In [56]:
ratings._meta_nonempty

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date
0,1.0,1.0,1.0,1.0,1970-01-01 00:00:00+00:00,1970-01-01
1,,,,,1970-01-01 00:00:00+00:00,1970-01-01


In [45]:
ratings["date"] = ratings["datetime"].dt.date

In [46]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date
0,1,296,5.0,1147880044,2006-05-17 15:34:04+00:00,2006-05-17
1,1,306,3.5,1147868817,2006-05-17 12:26:57+00:00,2006-05-17
2,1,307,5.0,1147868828,2006-05-17 12:27:08+00:00,2006-05-17
3,1,665,5.0,1147878820,2006-05-17 15:13:40+00:00,2006-05-17
4,1,899,3.5,1147868510,2006-05-17 12:21:50+00:00,2006-05-17


Niiice!  
Wait, let's check the data types once.

In [47]:
ratings.dtypes

userId                     Int32
movieId                    Int32
rating                   Float32
timestamp                  Int64
datetime     datetime64[ns, UTC]
date             string[pyarrow]
dtype: object

We'd prefer if date was a datetime type as well.  
Let's prepare the date column again, wrapping it in ```dd.to_datetime()```


In [48]:
ratings["date"] = dd.to_datetime(ratings["datetime"].dt.date)

In [49]:
# check the data types again
ratings.dtypes

userId                     Int32
movieId                    Int32
rating                   Float32
timestamp                  Int64
datetime     datetime64[ns, UTC]
date              datetime64[ns]
dtype: object
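The same round trip can be reproduced in plain Pandas on two of the raw timestamps from `ratings.csv`. This is a sketch with hypothetical names (`stamps`, `as_objects`, `as_datetimes`); the exact datetime resolution printed may differ between Pandas versions.

```python
import pandas as pd

stamps = pd.Series(
    pd.to_datetime([1147880044, 1147868817], unit="s", utc=True)
)

# .dt.date yields plain Python date objects (an object column)...
as_objects = stamps.dt.date
# ...and to_datetime turns them back into a datetime64 column.
as_datetimes = pd.to_datetime(stamps.dt.date)

print(as_objects.dtype)
print(as_datetimes.dtype)
print(as_datetimes.dt.year.tolist())  # [2006, 2006]
```

Only the datetime64 version supports the `.dt` accessor, which is what the next cells rely on.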

Ah! Much better.  
Why, you say?  
We can now easily extract and manipulate dates this way,  
e.g.

In [57]:
# extract the day, month and year of each rating
ratings["day"] = ratings["date"].dt.day
ratings["month"] = ratings["date"].dt.month
ratings["year"] = ratings["date"].dt.year

In [58]:
ratings.dtypes

userId                     Int32
movieId                    Int32
rating                   Float32
timestamp                  Int64
datetime     datetime64[ns, UTC]
date              datetime64[ns]
day                        int32
month                      int32
year                       int32
dtype: object

In [59]:
ratings._meta

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date,day,month,year


In [61]:
ratings._meta_nonempty

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date,day,month,year
0,1.0,1.0,1.0,1.0,1970-01-01 00:00:00+00:00,1970-01-01,1,1,1
1,,,,,1970-01-01 00:00:00+00:00,1970-01-01,1,1,1


very clean!

In [82]:
ratings.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [62]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,datetime,date,day,month,year
0,1,296,5.0,1147880044,2006-05-17 15:34:04+00:00,2006-05-17,17,5,2006
1,1,306,3.5,1147868817,2006-05-17 12:26:57+00:00,2006-05-17,17,5,2006
2,1,307,5.0,1147868828,2006-05-17 12:27:08+00:00,2006-05-17,17,5,2006
3,1,665,5.0,1147878820,2006-05-17 15:13:40+00:00,2006-05-17,17,5,2006
4,1,899,3.5,1147868510,2006-05-17 12:21:50+00:00,2006-05-17,17,5,2006


Let's do this for the Tags data set too - just for practice.


## Tags

From ```README```:  
Tags are user-generated metadata about movies. Each tag is typically a single word or short phrase. The meaning, value, and purpose of a particular tag is determined by each user.  
  
Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.

In [83]:
# schema, inferred from the README.txt file
# read timestamps as integers then convert to dates later.
# userId,movieId,tag,timestamp
tags_schema = {
    "userId": "Int32",
    "movieId": "Int32",
    "tag": "string",
    "timestamp": "Int64",
}
#

In [84]:
tags = dd.read_csv(
    file_path_tags,
    dtype=tags_schema,
    sep=csv_separator,
    quotechar=csv_quotechar,
    encoding=csv_encoding,
)

In [85]:
tags.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [86]:
tags.head()

Unnamed: 0,userId,movieId,tag,timestamp
0,3,260,classic,1439472355
1,3,260,sci-fi,1439472256
2,4,1732,dark comedy,1573943598
3,4,1732,great dialogue,1573943604
4,4,7569,so bad it's good,1573943455


Just like before, let's add a more readable ```datetime``` column here.

In [87]:
tags["datetime"] = dd.to_datetime(tags["timestamp"], unit="s", utc=True)

In [88]:
tags.dtypes

userId                     Int32
movieId                    Int32
tag              string[pyarrow]
timestamp                  Int64
datetime     datetime64[ns, UTC]
dtype: object

In [89]:
# extract date into a new column
tags["date"] = dd.to_datetime(tags["datetime"].dt.date)

In [90]:
tags.dtypes

userId                     Int32
movieId                    Int32
tag              string[pyarrow]
timestamp                  Int64
datetime     datetime64[ns, UTC]
date              datetime64[ns]
dtype: object

In [91]:
tags.head(5)

Unnamed: 0,userId,movieId,tag,timestamp,datetime,date
0,3,260,classic,1439472355,2015-08-13 13:25:55+00:00,2015-08-13
1,3,260,sci-fi,1439472256,2015-08-13 13:24:16+00:00,2015-08-13
2,4,1732,dark comedy,1573943598,2019-11-16 22:33:18+00:00,2019-11-16
3,4,1732,great dialogue,1573943604,2019-11-16 22:33:24+00:00,2019-11-16
4,4,7569,so bad it's good,1573943455,2019-11-16 22:30:55+00:00,2019-11-16


umm... go nuts.  
Extract the day, month and year from date, because why not?

In [92]:
tags["day"] = tags["date"].dt.day
tags["month"] = tags["date"].dt.month
tags["year"] = tags["date"].dt.year

In [93]:
tags._meta

Unnamed: 0,userId,movieId,tag,timestamp,datetime,date,day,month,year


In [94]:
tags._meta_nonempty

Unnamed: 0,userId,movieId,tag,timestamp,datetime,date,day,month,year
0,1.0,1.0,a,1.0,1970-01-01 00:00:00+00:00,1970-01-01,1,1,1
1,,,,,1970-01-01 00:00:00+00:00,1970-01-01,1,1,1


In [95]:
tags.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [78]:
tags.head()

Unnamed: 0,userId,movieId,tag,timestamp,datetime,date,day,month,year
0,3,260,classic,1439472355,2015-08-13 13:25:55+00:00,2015-08-13,13,8,2015
1,3,260,sci-fi,1439472256,2015-08-13 13:24:16+00:00,2015-08-13,13,8,2015
2,4,1732,dark comedy,1573943598,2019-11-16 22:33:18+00:00,2019-11-16,16,11,2019
3,4,1732,great dialogue,1573943604,2019-11-16 22:33:24+00:00,2019-11-16,16,11,2019
4,4,7569,so bad it's good,1573943455,2019-11-16 22:30:55+00:00,2019-11-16,16,11,2019


# Wrap Up the cluster

In [96]:
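# wrap up like this
client.retire_workers()
# QQ - do we really need cluster.close() here?
# client.shutdown() stops the scheduler and the workers it is connected to,
# so a separate cluster.close() is usually not needed
# cluster.close()
client.shutdown()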
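An alternative way to make sure everything is cleaned up is to use the cluster and the client as context managers. This is a sketch on a throwaway in-process cluster (the settings are assumptions chosen to keep it small), not a replacement for the cell above.

```python
from dask.distributed import Client, LocalCluster

# Both objects are closed automatically when their `with` block ends,
# even if the code inside raises an exception.
with LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
) as cluster:
    with Client(cluster) as client:
        total = client.submit(sum, [1, 2, 3]).result()

print(total)  # 6
```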

# Insights

1. Client, Workers, and Scheduler
	* The client is the machine that submits tasks to the scheduler (create Task Graphs).
	* The scheduler is responsible for managing the tasks and distributing them to the workers (execute Task Graphs).
	* The workers are responsible for executing the tasks.
	* The scheduler will then return the results to the client.
1. ```.visualize()```, ```._meta``` and ```._meta_nonempty```

# Next

* Let's play with the MovieLens dataset some more.