# From concurrent.futures to Dataframes

In this notebook we analyze real data on a cluster of computers.  We start with the `concurrent.futures` interface and then transition to parallel dataframes.  This gives us experience with real data and some intuition about what happens when we use big dataframes such as those provided by Spark or Dask dataframe.

To begin, we look at the [New York City Taxi Cab dataset](http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml).  This includes every ride made in the city of New York in the year 2015.  This data is stored in the Parquet format, which we can read with the [fastparquet](http://fastparquet.readthedocs.io/en/latest/) and [gcsfs](http://gcsfs.readthedocs.io/en/latest/) Python libraries.

In [None]:
from gcsfs import GCSFileSystem
gcs = GCSFileSystem(token='cloud')
gcs

In [None]:
import fastparquet

pf = fastparquet.ParquetFile('anaconda-public-data/nyc-taxi/2015.parquet', open_with=gcs.open)
pf

## Reading a subset 

Normally we would call the `pf.to_pandas()` method to read this data into memory as a Pandas dataframe.  However, in this case that would be unwise because the data is too large to fit comfortably in RAM (please do not try this; you will likely kill your notebook session).

Fortunately Parquet files are split into row groups, each of which does fit nicely into memory.  The following function will read a single row group for us from our Parquet file.

In [None]:
from fastparquet.api import _pre_allocate
from fastparquet.core import read_row_group_file

columns = ['tpep_pickup_datetime', 'passenger_count', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'tip_amount', 'total_amount']

def read_row_group(rg):
    fn = pf.row_group_filename(rg)
    categories = {}
    index = None
    cs = {}
    dt = pf.dtypes
    schema = pf.schema

    df, views = _pre_allocate(rg.num_rows, columns, categories, index, cs, dt)
    read_row_group_file(fn, rg, columns, categories, schema, cs,
                        open=gcs.open, assign=views)

    return df

In [None]:
read_row_group(pf.row_groups[0])

The result of this function call is one Pandas dataframe with a few million rows.  There are several such row groups.

In [None]:
len(pf.row_groups)

### Remote execution with concurrent.futures

While we don't have enough memory to handle all of this data locally, we can ask the machines in our cluster to do this work for us.  We connect to the cluster with Dask below and use the `concurrent.futures` interface to call this same function remotely.

In [None]:
from dask.distributed import Client, progress
client = Client('schedulers:9000')
client

In [None]:
future = client.submit(read_row_group, pf.row_groups[0])
future

If you watch [Dask's diagnostic dashboard](../../../9002/status) you will see this function run and stay in memory on one of the remote workers.

The Pandas dataframe now lives on that machine.  We can submit computations to run on that remote dataframe by submitting new tasks onto our future.

In [None]:
len_future = client.submit(len, future)
len_future

This too runs remotely.  By calling submit on futures we can chain computations without ever bringing the data back to our local machine.
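
For contrast, the standard-library executors do not resolve futures that are passed as arguments, so a dependent task has to call `.result()` itself.  The sketch below runs locally with a `ThreadPoolExecutor`.  The names `load_chunk` and `length_of` are hypothetical stand-ins (for `read_row_group` and `len`).  It only illustrates what `client.submit(len, future)` saves us from writing, and it is not a description of how Dask works internally.

```python
from concurrent.futures import ThreadPoolExecutor

def load_chunk(n):
    # Hypothetical stand-in for read_row_group: returns n fake rows.
    return list(range(n))

def length_of(fut):
    # The stdlib executor passes the Future object through unchanged,
    # so the dependent task must wait on it explicitly.
    return len(fut.result())

# Two workers, so the dependent task can wait without blocking the first.
with ThreadPoolExecutor(max_workers=2) as pool:
    chunk_future = pool.submit(load_chunk, 1000)
    len_future = pool.submit(length_of, chunk_future)
    chunk_length = len_future.result()
```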

However, if we do want to bring data back, we can do so with the `.result()` method like before.  

In [None]:
len_future.result()

With a `ThreadPoolExecutor`, calling `.result()` did two things:

1.  Wait for the computation to finish
2.  Return the finished value

Now calling `.result()` does *three* things:

1.  Wait for the computation to finish
2.  **Communicate** the data from the worker to our local machine
3.  Return the finished value

This extra step of communication can be *expensive*, so we prefer not to call `.result()` unless we really have to.  For example, gathering the full dataframe from the worker back to our local machine might take a while.

In [None]:
%time local_df = future.result()

This extra cost of communication is something that we should be aware of.
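
One rough way to gauge that cost is to compare how many bytes must travel in each case.  The sketch below is an illustration only.  It uses `pickle` from the standard library as a stand-in for whatever serialization the cluster actually uses, and a plain list of floats (`fares`, a made-up name with made-up values) as a stand-in for one remote partition.

```python
import pickle

# Made-up stand-in for one partition living on a worker.
fares = [float(i % 50) for i in range(100_000)]

# Option 1: ship the whole partition, then reduce locally.
bytes_for_partition = len(pickle.dumps(fares))

# Option 2: reduce on the worker, then ship only the answer.
bytes_for_total = len(pickle.dumps(sum(fares)))

# How many times more data option 1 moves than option 2.
ratio = bytes_for_partition / bytes_for_total
```

The exact numbers depend on the serializer, but the gap between moving a partition and moving a single reduced value is what makes `len_future.result()` cheap and `future.result()` slow.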

### Exercise

Now that we have some real data, let's compute some things about New York.

1.  How many passengers rode in cab rides in 2015 total?
2.  How many rides had more than two passengers?
3.  What was the average number of passengers over all rides?
4.  (hard) How many rides were there holding one passenger, two passengers, three passengers, etc.?

First, use `client.submit` or `client.map` and the `read_row_group` function on each of the row groups to create a list of futures of Pandas dataframes in remote memory.

In [None]:
# Use map or submit with the `read_row_group` function 
# on each of the row groups to get a list of futures of Pandas dataframes

futures = ...

# How much memory do these take up across the cluster?
# (this is shown on the diagnostic dashboard)

In [None]:
# How many passengers rode in cab rides in 2015 total?
# (answer provided for this question)

def f(df):
    return df.passenger_count.sum()

counts = client.map(f, futures)
total = client.submit(sum, counts)
total.result()
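
The same two-step pattern (one partial result per partition, then one combining step) can be tried locally before running it on the cluster.  The sketch below assumes nothing about the taxi data: `partitions` is a made-up list of small lists standing in for the remote dataframes, and a `ThreadPoolExecutor` stands in for the Dask client.

```python
from concurrent.futures import ThreadPoolExecutor

# Made-up passenger counts, split into three "row groups".
partitions = [[1, 2, 1, 5], [3, 1, 1], [2, 2, 6, 1, 1]]

def partial_sum(part):
    # Runs once per partition, like f(df) in the cell above.
    return sum(part)

with ThreadPoolExecutor(max_workers=3) as pool:
    # One small number comes back per partition.
    partial_sums = list(pool.map(partial_sum, partitions))

# Combining step: add up the per-partition results.
total_passengers = sum(partial_sums)
```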

In [None]:
# How many rides were there that had more than two passengers?



In [None]:
# What was the average number of passengers over all rides?



In [None]:
# (hard) How many rides were there for each passenger count?



In [None]:
%load solutions/nyc-futures.py

### Up Next

The exercises that we have just done mirror how projects like Spark Dataframes and Dask dataframes work, and the algorithms that we've built are very similar to the algorithms contained within those projects.  Because these tricks have already been implemented, we can use those projects to get the same results with much less code.

It's useful to remember that "big" dataframes are just collections of smaller in-memory dataframes on which we run normal functions.
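
To make that concrete, here is a toy sketch of a partitioned column.  It is not how Dask or Spark are implemented.  `PartitionedColumn` and its methods are hypothetical names, and plain Python lists with made-up values stand in for the in-memory Pandas pieces.

```python
from collections import Counter

class PartitionedColumn:
    """Toy 'big' column: a list of small in-memory lists."""

    def __init__(self, partitions):
        self.partitions = partitions

    def sum(self):
        # Reduce each partition, then combine the partial results.
        return sum(sum(part) for part in self.partitions)

    def count(self):
        return sum(len(part) for part in self.partitions)

    def mean(self):
        # Per-partition means cannot simply be averaged when sizes
        # differ; combine the total sum and total count instead.
        return self.sum() / self.count()

    def value_counts(self):
        # Per-partition counters add together cleanly.
        combined = Counter()
        for part in self.partitions:
            combined += Counter(part)
        return dict(combined)

col = PartitionedColumn([[1, 2, 1, 5], [3, 1, 1], [2, 2, 6, 1, 1]])
```

Each method runs an ordinary function on every small piece and then merges the partial results, which is the same shape as the futures-based exercises above.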

In [None]:
import dask.dataframe as dd
df = dd.read_parquet('gcs://anaconda-public-data/nyc-taxi/2015.parquet',
                     columns=columns,
                     storage_options={'token': 'cloud'}).persist()
progress(df)

In [None]:
df.passenger_count.sum().compute()

In [None]:
df.passenger_count.mean().compute()

In [None]:
df.passenger_count.value_counts().compute()