# 09b. Demo analysis - remote part

## Overview

This notebook is intended to be executed on the cluster as a continuation of notebook

```
09a-Demo_analysis_-_local_part.ipynb
```.

## Import idact and load the cluster

We will use a wildcard import for convenience:

In [None]:
%matplotlib inline

from idact import *

In [None]:
load_environment()
cluster = show_cluster("test")
cluster

In [None]:
node = cluster.get_access_node()
node

In [None]:
node.connect()

## Pull the Dask deployment

Let's get the deployments we pushed from the local notebook:

In [None]:
deployments = cluster.pull_deployments()
deployments

There is the nodes deployment:

In [None]:
nodes = deployments.nodes[-1]
nodes

And Dask deployment:

In [None]:
dask_deployment = deployments.dask_deployments[-1]
dask_deployment

## Load the CSV data into a Dask DataFrame

Get a Dask client:

In [None]:
client = dask_deployment.get_client()
client

First, we will specify the path to the `taxi` dir we created:

In [None]:
import os
scratch = os.environ['SCRATCH']
data_root = os.path.realpath(os.path.join(scratch, 'taxi'))
data_root

In [None]:
csvs = os.path.join(data_root, '*.csv')
csvs

Make sure the data is there:

In [None]:
import glob
globbed = glob.glob(csvs)
globbed[:3], len(globbed)

Let's see what a sample file looks like:

In [None]:
with open(globbed[0]) as f:
    head = ''.join([next(f) for i in range(5)])
print(head)

We will load the files to a Dask dataframe. If you want to learn more about working with Dask, check out the links at the bottom of this notebook.

In [None]:
%%time
import dask.dataframe as dd
df = dd.read_csv(csvs,
                 parse_dates=['pickup_datetime', 'dropoff_datetime'],
                 dtype={
                     'vendor_id': str,
                     'store_and_fwd_flag': str,
                     'payment_type': str
                 },
                 error_bad_lines=False,
                 header=0,
                 names=['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount', 'tolls_amount', 'total_amount'])

In [None]:
df

Only a small fragment of each file was loaded. We can verify this by checking the memory usage on each node:

In [None]:
[node.resources.memory_usage for node in nodes]

For the files to be loaded completely, we need to `persist` the dataframe to RAM:

In [None]:
%%time
client.persist(df)

This operation is executed in the background.

Observe the Dask Dashboard to see the work Dask is performing.
You can see that, as tasks are being performed, `Bytes stored` count is steadily increasing.

This is also reflected in the node memory usage:

In [None]:
[node.resources.memory_usage for node in nodes]

And CPU usage:

In [None]:
[node.resources.cpu_usage for node in nodes]

You can also check the resource usage from the local notebook, especially when executing a blocking computation on this notebook.

When all tasks on the dashboard are completed, we can see all the data is now in memory, spread across the workers, which are now more or less idle.

In [None]:
[node.resources.memory_usage for node in nodes]

In [None]:
[node.resources.cpu_usage for node in nodes]

## Categorize the data

We will use Pandas categoricals for better performance later. We will categorize the string columns, which have relatively few values:

In [None]:
%%time
df.head()

In [None]:
%%time
df = df.categorize(columns=['vendor_id', 'store_and_fwd_flag', 'payment_type'])

The columns are being categorized in the background. Wait until all tasks are completed and proceeed.

In [None]:
df

We can see that the columns we selected are now categorized.

There is still some work to perform, which will happen anyway before saving to file, so let's `persist` right now:

In [None]:
%%time
client.persist(df)

Wait until the tasks on the Dashboard are completed.

Let's look at the memory usage after categorization:

In [None]:
[node.resources.memory_usage for node in nodes]

We're using more memory for now. We can also see the values did not change. Internally however, they are stored as numerical values:

In [None]:
%%time
df.head()

## Save the DataFrame to Apache Parquet

Create the directory for the Apache Parquet output:

In [None]:
from subprocess import run
parquet_path = os.path.join(data_root, 'taxi.parquet')
run(['mkdir', '-p', parquet_path])

Then, remove any files that might already be there:

In [None]:
parquet_to_glob = os.path.join(parquet_path, "*.parquet")
globbed_parquet = glob.glob(parquet_to_glob)
for f in globbed_parquet:
    os.remove(f)

Now, save the dataframe to Parquet:

In [None]:
%%time
df.to_parquet(parquet_path)

Let's look at the files we created:

In [None]:
globbed_parquet = glob.glob(parquet_to_glob)
globbed_parquet[:3], len(globbed_parquet)

We have one file for each partition.

## Restart the client and read from Parquet

Now, we will wipe the data stored in memory and try to load it from Parquet:

In [None]:
del df
client.restart()

We need to specify the categories manually:

In [None]:
%%time
df = dd.read_parquet(parquet_path, categories=['vendor_id', 'store_and_fwd_flag', 'payment_type'])

In [None]:
df

For the file to be loaded, we need to call `persist`, like before:

In [None]:
%%time
client.persist(df)

Look at the Dashboard. Loading should now take a fraction of time it initially took to load the CSV files.

We can also see that less memory is used than before:

In [None]:
[node.resources.memory_usage for node in nodes]

We can `categorize` again, which will make the categorical indices known:

In [None]:
%%time
df = df.categorize()

In [None]:
df

Let's take a look at the data again:

In [None]:
%%time
df.head()

## Simple analysis

We could be asking a lot of interesting questions about the taxi dataset, but let's keep this simple for now.
Suppose we're interested in tipping habits of people taking taxis.
Let's find out:

 - What the mean tip percentage is by year, month, and hour of the day.
 - What the highest tip amount in the years 2010-2014 was.
 - What the highest tip percentage in the years 2010-2014 was.

In [None]:
paid = df[df['fare_amount'] > 0]

with_tip_percentage = paid.assign(
    tip_percentage=paid['tip_amount'] / paid['fare_amount'])

mean_by_year = with_tip_percentage.groupby(
    with_tip_percentage['pickup_datetime'].dt.year)['tip_percentage'].mean()
mean_by_month = with_tip_percentage.groupby(
    with_tip_percentage['pickup_datetime'].dt.month)['tip_percentage'].mean()
mean_by_hour = with_tip_percentage.groupby(
    with_tip_percentage['pickup_datetime'].dt.hour)['tip_percentage'].mean()

highest_amount = with_tip_percentage['tip_amount'].max()
highest_percentage = with_tip_percentage['tip_percentage'].max()

highest_amount_values = with_tip_percentage[with_tip_percentage['tip_amount'] == highest_amount]
highest_percentage_values = with_tip_percentage[with_tip_percentage['tip_percentage'] == highest_percentage]

We will persist each result first, and then view them one by one:

In [None]:
mean_by_year = client.persist(mean_by_year)
mean_by_month = client.persist(mean_by_month)
mean_by_hour = client.persist(mean_by_hour)

highest_amount = client.persist(highest_amount)
highest_percentage = client.persist(highest_percentage)

highest_amount_values = client.persist(highest_amount_values)
highest_percentage_values = client.persist(highest_percentage_values)

Let's see how many fares there were in total:

In [None]:
paid.shape[0].compute()

862.7 million trips over 5 years, that's about 470 thousand yellow taxi trips each day.

In [None]:
mean_by_year.compute().plot()

The average tip over the years has increased by over 4 percentage points.

In [None]:
mean_by_month.compute().plot()

The average tip seems to increase as the year progresses, only to sharply drop after around the New Year.

In [None]:
mean_by_hour.compute().plot()

The average tip is relatively high in the late morning hours, then sharply drops around lunch.
After 3PM, it steadily increases to reach the peak in the evening, and then drops again in the early morning.

In [None]:
highest_amount.compute()

That's a tip of $938. How many were that generous?

In [None]:
highest_amount_values.shape[0].compute()

Let's take a look at that fare:

In [None]:
highest_amount_values_final = highest_amount_values.head(n=1, npartitions=-1)
highest_amount_values_final

There were two passengers, and the tip was 1646%. Let's find out where they went:

In [None]:
from IPython.core.display import display, HTML

def open_maps(pdf, i):
    p_lo = pdf['pickup_longitude'].iloc[i]
    p_la = pdf['pickup_latitude'].iloc[i]
    d_lo = pdf['dropoff_longitude'].iloc[i]
    d_la = pdf['dropoff_latitude'].iloc[i]
    display(HTML('<a href="https://www.google.com/maps/dir/{},{}/{},{}">link</a>'.format(
        p_la, p_lo, d_la, d_lo)))

In [None]:
open_maps(highest_amount_values_final, 0)

And what was the highest percentage?

In [None]:
highest_percentage.compute()

That's a tip of 120000%. How many were there?

In [None]:
highest_percentage_values.shape[0].compute()

In [None]:
highest_percentage_values_final = highest_percentage_values.head(n=1, npartitions=-1)
highest_percentage_values_final

There was one passenger. They paid a cent, tipped $120 and left the cab after 50 minutes. Interesting.

In [None]:
open_maps(highest_percentage_values_final, 0)

## Learn more about Dask

Here are a few helpful links:

 - [Dask Tutorial on GitHub](https://github.com/dask/dask-tutorial).
 - [Dask DataFrame Performance Tips](http://docs.dask.org/en/latest/dataframe-performance.html)
 - [Managing Memory](http://distributed.dask.org/en/latest/memory.html)
 - [Dask Distributed Documentation](http://distributed.dask.org/en/latest/quickstart.html).
 - [Dask Documentation](https://docs.dask.org/en/latest/).

## Continue with the local notebook

Perform the rest of instructions in the local notebook.