Step 1: download and convert dataset from CSV to Parquet

In [1]:
!pip install pyarrow

Collecting pyarrow
  Using cached pyarrow-8.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (27.2 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-8.0.0


In [2]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [3]:
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster
import dask
from dask.distributed import get_task_stream

In [4]:
import time

In [5]:
# Record library versions so benchmark numbers can be tied to a specific stack.
for label, module in (('pandas', pd), ('numpy', np), ('dask', dask)):
    print(f'{label} version: {module.__version__}')

pandas version: 1.4.1
numpy version: 1.22.2
dask version: 2022.02.0


In [6]:
distributed_version = dask.distributed.__version__
print(f'dask distributed version: {distributed_version}')

dask distributed version: 2022.02.0


https://docs.databricks.com/_static/notebooks/koalas-benchmark-distributed-execution.html?_ga=2.216403934.95291449.1648935555-599276868.1645477063

Read in parquet to dask

In [None]:
# !conda install -c conda-forge s3fs

# Set up the test bench

In [7]:
def benchmark(f, df, benchmarks, task_name, **kwargs):
    """Benchmark the given function against the given DataFrame.

    Runs ``f(df, **kwargs)`` once while recording the Dask task stream, then
    stores the elapsed time and the task-stream records under
    ``benchmarks[task_name]`` as ``{"raw_duration": ..., "history": ...}``.
    The task-stream plot is saved to ``task-stream-<task_name>.html`` so each
    task keeps its own plot instead of overwriting a shared file.

    Parameters
    ----------
    f: function to benchmark; called as ``f(df, **kwargs)``
    df: data frame passed to ``f``
    benchmarks: container for benchmark results, keyed by task name
    task_name: key under which this run's results are stored
    **kwargs: extra keyword arguments forwarded to ``f``

    Returns
    -------
    Duration (in seconds) of the given operation
    """
    with get_task_stream(plot='save', filename=f"task-stream-{task_name}.html") as ts:
        # perf_counter is monotonic, so system clock adjustments cannot skew the duration.
        start_time = time.perf_counter()
        f(df, **kwargs)
        raw_duration = time.perf_counter() - start_time
    # The task stream is collected when the context manager exits, so read
    # ts.data after the `with` block rather than relying on list aliasing.
    benchmarks[task_name] = {
        'raw_duration': raw_duration,
        'history': ts.data,
    }
    print(f"{task_name} took: {raw_duration} seconds")
    return raw_duration

In [8]:
import collections
# Results store: task name -> dict of metrics. `benchmark` fills "raw_duration"
# and "history"; the history-analysis cell adds "time_per_worker_and_action"
# and "nbytes_per_worker". defaultdict(dict) creates an empty entry on first access.
dask_benchmarks = collections.defaultdict(dict)
# benchmarks = {"task1" : {"stat1": val, "stat2": val}}

# Define benchmark tasks

In [9]:
# Registry of benchmark task functions; each task cell below appends to it.
all_tasks = list()

In [10]:
# sum, then means = simple mapreduce
def read_to_basic_ETL(df=None, path="./tmp/", storage_options=None):
    """Read the parquet data and return the mean of ``fare_amount + tip_amount``.

    ``df`` is ignored: the data is re-read from ``path`` on every call
    (presumably so the read is part of the timed work — TODO confirm). It is
    kept so this task matches the ``f(df, **kwargs)`` convention of ``benchmark``.
    Pass ``path``/``storage_options`` through ``benchmark``'s kwargs to read the
    same source as the other tasks.

    Parameters
    ----------
    df: ignored (kept for the shared task signature)
    path: parquet location to read; defaults to the local ``./tmp/`` copy
    storage_options: fsspec options for ``path``; ``None`` means the original
        ``{"anon": True, "use_ssl": True}``

    Returns
    -------
    The computed mean of ``fare_amount + tip_amount``.
    """
    if storage_options is None:
        # Fresh dict per call so callers never share (or mutate) a default.
        storage_options = {"anon": True, 'use_ssl': True}
    rides = dd.read_parquet(path, storage_options=storage_options)
    return (rides.fare_amount + rides.tip_amount).mean().compute()
all_tasks.append(read_to_basic_ETL)

In [11]:
# counts of values seen = simple map, groupby, reduce
def count_values(df):
    """Count how often each distinct ``fare_amount`` occurs and return the computed result."""
    fare_counts = df.fare_amount.value_counts()
    return fare_counts.compute()
all_tasks.append(count_values)

In [12]:
# cpu heavy arithmetic : mapreduce
def complicated_arithmetic_operation(df):
    """CPU-heavy element-wise trigonometry over pickup/dropoff coordinates.

    The expression has the shape of the haversine formula, but longitudes are
    bound to ``theta`` and latitudes to ``phi``, so the cosine factors take
    longitudes. NOTE(review): the result is therefore not a true great-circle
    distance; presumably it only needs to be CPU-heavy for the benchmark —
    confirm before using the values themselves.

    Returns
    -------
    The computed Series with one value per row of ``df``.
    """
    theta_1 = df.pickup_longitude
    phi_1 = df.pickup_latitude
    theta_2 = df.dropoff_longitude
    phi_2 = df.dropoff_latitude
    # `*np.pi/180` converts to radians, so the inputs are assumed to be in degrees.
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    # .compute() materializes the full per-row result on the client.
    return ret.compute()
all_tasks.append(complicated_arithmetic_operation)

In [13]:
def groupby_statistics(df):
    """Per ``passenger_count``, compute mean and std of ``total_amount`` and ``tip_amount``.

    Returns
    -------
    The computed aggregate frame, one row per passenger count.
    """
    return df.groupby(by='passenger_count').agg(
      {
        'total_amount': ['mean', 'std'], 
        'tip_amount': ['mean', 'std']
      }
    ).compute()
# Register this task itself so the benchmark loop actually runs it.
all_tasks.append(groupby_statistics)

In [14]:
# join two datasets
def join_data(df, other=None):
    """Merge ``df`` with ``other`` on their indexes and compute the result.

    Parameters
    ----------
    df: left data frame
    other: right data frame. Required: no second dataset is loaded in this
        notebook, so pass one explicitly, e.g.
        ``benchmark(join_data, df, dask_benchmarks, "join_data", other=other_ddf)``.

    Returns
    -------
    The computed index-aligned merge of ``df`` and ``other``.

    Raises
    ------
    ValueError: if ``other`` is not given.
    """
    if other is None:
        raise ValueError("join_data needs a second data frame: pass other=...")
    return dd.merge(df, other, left_index=True, right_index=True).compute()
# Intentionally not added to all_tasks: the generic benchmark loop only passes
# `df`, and this task cannot run without `other`.

# Configure the cluster's MinIO connection

In [15]:
import getpass

In [16]:
import os

#tag::minio_storage_options[]
# straight from holden's example
# fsspec/s3fs options for the MinIO endpoint. Credentials are prompted for
# (getpass does not echo them), so they are not stored in the notebook source.
minio_storage_options = {
    "key": getpass.getpass("MinIO access key: "),
    "secret": getpass.getpass("MinIO secret key: "),
    "client_kwargs": {
        # "endpoint_url": "http://minio-1602984784.minio.svc.cluster.local:9000",
        # "endpoint_url": "http://minio.pigscanfly.ca:9000",
        # Override with the MINIO_ENDPOINT env var when running against another cluster.
        "endpoint_url": os.environ.get("MINIO_ENDPOINT", "http://10.43.207.163:9000"),
        "region_name": 'us-east-1'
    },
    "config_kwargs": {"s3": {"signature_version": 's3v4'}},
}
#end::minio_storage_options[]

 ····
 ···············································


# Pull in configs and launch the Dask distributed client

In [7]:
from dask.distributed import get_task_stream
from dask.distributed import Client
from dask_kubernetes import KubeCluster

In [11]:
# NOTE: this attempt failed with HTTP 403 (service account "mkimmins" not found in
# the "dask" namespace); the cluster is built from pod templates further down instead.
cluster = KubeCluster.from_yaml('./worker-spec.yaml', namespace='dask') # deploy_mode='remote')

Creating scheduler pod on cluster. This may take some time.


ApiException: (403)
Reason: Forbidden
HTTP response headers: <CIMultiDictProxy('Audit-Id': '9740237c-94a5-4456-90d4-c8902d4072e2', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'f4b0b858-1f7f-4889-bc0e-2f88e34c5df3', 'X-Kubernetes-Pf-Prioritylevel-Uid': '89a653fb-1121-41a3-aced-f4d46e6ab8d6', 'Date': 'Sun, 08 May 2022 21:13:18 GMT', 'Content-Length': '303')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"dask-jovyan-4b1e4b37-b\" is forbidden: error looking up service account dask/mkimmins: serviceaccount \"mkimmins\" not found","reason":"Forbidden","details":{"name":"dask-jovyan-4b1e4b37-b","kind":"pods"},"code":403}



In [12]:
#tag::make_dask_k8s_client[]
import dask
from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec


def _holdenk_dask_pod(memory):
    """Pod spec on holdenk/dask:latest with 1 CPU, `memory` as request and limit, always re-pulled."""
    return make_pod_spec(
        image='holdenk/dask:latest',
        memory_limit=memory,
        memory_request=memory,
        cpu_limit=1,
        cpu_request=1,
        extra_container_config={"imagePullPolicy": "Always"},
    )


worker_template = _holdenk_dask_pod('8G')
scheduler_template = _holdenk_dask_pod('4G')


In [None]:
# Launch the cluster from the worker/scheduler pod templates defined above, then
# connect a client to it.
# NOTE(review): no cluster.scale(...)/adapt(...) call is active here, so the number
# of workers depends on KubeCluster's defaults — confirm workers actually start.
cluster = KubeCluster(pod_template = worker_template, scheduler_pod_template = scheduler_template)
# cluster.adapt(minimum=1)    # or create and destroy workers dynamically based on workload
from dask.distributed import Client
client = Client(cluster)
#end::make_dask_k8s_client[]

Creating scheduler pod on cluster. This may take some time.


In [None]:
# # Connect Dask to the cluster
# client = Client(cluster)
# client # the repr gives us useful links

In [None]:
# Inspect the handshake info of the client's comm to the scheduler.
# NOTE(review): this walks internal attributes (scheduler_comm.comm); presumably
# not a stable public API — verify against the installed distributed version.
client.scheduler_comm.comm.handshake_info()

# run the tasks

In [None]:
# Load the benchmark dataset lazily from the MinIO bucket configured above.
df = dd.read_parquet(
    's3://mika/dask_sandbox/',
    engine="pyarrow",
    storage_options=minio_storage_options,
)

In [None]:
# Peek at the first five rows to check the schema and connectivity.
df.head(n=5)

In [None]:
# df = dd.read_parquet(
#     "./tmp/", 
#     storage_options={"anon": True, 'use_ssl': True})

In [None]:
# for task in all_tasks:
#     benchmark(task, df=df, benchmarks = dask_benchmarks, task_name = task.__name__)

In [6]:
# from scheduler_profilers import pyspy_on_scheduler, viztrace_scheduler


In [8]:
# with (
#     pyspy_on_scheduler("pyspy2.json"),
#     # ^ Saves a speedscope profile to `pyspy.json` locally
#     viztrace_scheduler(
#         "viztracer.json", trace_sparse="distributed.Scheduler.update_graph_hlg"
#     ),
#     # ^ Saves a Chrome trace to `viztracer.json` locally
# ):
#     df.fare_amount.value_counts().compute()

2022-04-22 19:15:47,523 - distributed.core - ERROR - update_graph_hlg() missing 1 required positional argument: 'self'
Traceback (most recent call last):
  File "/Users/mk/projects/distributed/distributed/core.py", line 625, in handle_stream
    handler(**merge(extra, msg))
TypeError: update_graph_hlg() missing 1 required positional argument: 'self'
2022-04-22 19:15:47,527 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/Users/mk/projects/distributed/distributed/core.py", line 559, in handle_comm
    result = await result
  File "/Users/mk/projects/distributed/distributed/scheduler.py", line 5498, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/mk/projects/distributed/distributed/core.py", line 625, in handle_stream
    handler(**merge(extra, msg))
TypeError: update_graph_hlg() missing 1 required positional argument: 'self'
2022-04-22 19:15:47,529 - tornado.application - E

Saving report to /var/folders/jn/54vtqbx5231b0w09shgckwv80000gn/T/tmpz5a1_29yviztracer..json ...Loading finish                                        

Dumping trace data to json, total entries: 2, estimated json file size: 240.0B
Report saved.


CancelledError: ('value-counts-agg-6077da6b9ab31c4748e89eb4ef5c072d', 0)

In [None]:
import sys

# Show which interpreter and Python version back this kernel.
for value in (sys.executable, sys.version, sys.version_info):
    print(value)

In [None]:
# Quick smoke test: run a single value_counts on the cluster before the benchmark loop.
df.fare_amount.value_counts().compute()

In [None]:
# Run every registered task once; key each duration by task name so the
# displayed result shows which timing belongs to which task.
task_durations = {}
for task in all_tasks:
    task_durations[task.__name__] = benchmark(task, df=df, benchmarks=dask_benchmarks, task_name=task.__name__)
task_durations

# History grokking

In [None]:
# add the analyzed dataframes

# Fields kept from each task-stream record.
TASK_STREAM_COLUMNS = ['worker', 'status', 'nbytes', 'thread', 'type', 'typename', 'metadata', 'startstops', 'key']


def _startstop_field(field):
    """Return a function that extracts ``field`` from one exploded ``startstops`` entry.

    Exploding an empty list yields NaN rather than a dict; such rows map to None
    instead of raising ``TypeError``.
    """
    return lambda entry: entry[field] if isinstance(entry, dict) else None


def summarize_task_stream(history):
    """Summarize the task-stream records captured by ``benchmark``.

    Parameters
    ----------
    history: list of task-stream record dicts (``benchmarks[name]["history"]``)

    Returns
    -------
    (time_per_worker_and_action, nbytes_per_worker): computed frames with the
    summed action durations per (worker, action) and the summed nbytes per worker.
    """
    hx_ddf = dd.from_pandas(pd.DataFrame(history, columns=TASK_STREAM_COLUMNS), npartitions=1)
    # One row per startstops entry, so each action can be timed separately.
    exploded_df = hx_ddf.explode("startstops")
    exploded_df['action'] = exploded_df['startstops'].apply(_startstop_field('action'), meta=("action", str))
    exploded_df['start'] = exploded_df['startstops'].apply(_startstop_field('start'), meta=("start", np.float64))
    exploded_df['end'] = exploded_df['startstops'].apply(_startstop_field('stop'), meta=("end", np.float64))
    exploded_df['action_duration'] = exploded_df['end'] - exploded_df['start']
    time_per_worker_and_action = (
        exploded_df[['worker', 'action', 'action_duration']]
        .groupby(['worker', 'action'])
        .agg("sum")
    )
    nbytes_per_worker = hx_ddf[['worker', 'nbytes']].groupby(["worker"]).agg("sum")
    # One dask.compute call evaluates both summaries together and shares their
    # common upstream tasks instead of running them twice.
    return dask.compute(time_per_worker_and_action, nbytes_per_worker)


for output_values in dask_benchmarks.values():
    time_summary, nbytes_summary = summarize_task_stream(output_values.get("history"))
    output_values["time_per_worker_and_action"] = time_summary
    output_values["nbytes_per_worker"] = nbytes_summary

In [None]:
# access the analyzed dataframes like so:
from IPython.display import display

# Only a cell's last expression is rendered automatically, so display both
# summaries explicitly instead of silently discarding the first one.
display(dask_benchmarks['read_to_basic_ETL']["time_per_worker_and_action"])
display(dask_benchmarks['read_to_basic_ETL']["nbytes_per_worker"])

# try to do something fancy with the history

In [None]:
# Raw task-stream records that `benchmark` captured for the read_to_basic_ETL run
# (raises AttributeError if that task has not been benchmarked yet).
hx = dask_benchmarks.get("read_to_basic_ETL").get("history")

use dask to do dask :D

In [None]:
import pandas as pd
# Show full cell contents (e.g. the nested startstops lists) instead of truncating them.
pd.options.display.max_colwidth = None

In [None]:
# One row per task-stream record, restricted to the fields analysed below.
hx_df = pd.DataFrame(
    data=hx,
    columns=['worker', 'status', 'nbytes', 'thread', 'type', 'typename', 'metadata', 'startstops', 'key'],
)

In [None]:
# Wrap the pandas frame as a single-partition Dask DataFrame so the analysis below uses Dask.
hx_ddf = dd.from_pandas(hx_df, npartitions=1)

In [None]:
# Each record's "startstops" is a nested list of per-action dicts. It is unnested
# only for the per-action analysis; everything else uses the nested records as-is.
hx_ddf.head(20)

In [None]:
# Unnest "startstops": one row per entry, with the record's other columns repeated,
# so each action's fields can be extracted into their own columns.
exploded_df = hx_ddf.explode("startstops")

In [None]:
# Pull each exploded `startstops` dict's fields into flat columns. `meta` declares
# each result's name and dtype for Dask up front.
for column, field, meta in (
    ("action", "action", ("action", str)),
    ("start", "start", ("start", np.float64)),
    ("end", "stop", ("stop", np.float64)),
):
    exploded_df[column] = exploded_df['startstops'].apply(lambda x, field=field: x[field], meta=meta)
exploded_df['action_duration'] = exploded_df['end'] - exploded_df['start']

In [None]:
# Keep only the columns needed for the per-worker, per-action time totals.
exploded_df_only_agg_fields = exploded_df[['worker', 'action', 'action_duration']]

In [None]:
# Total action duration per (worker, action) pair (lazy until computed or previewed).
time_per_worker_and_action = exploded_df_only_agg_fields.groupby(['worker','action']).agg("sum")

This is the final result for `time_per_worker_and_action`.

In [None]:
# Preview up to 20 rows of the per-(worker, action) totals.
time_per_worker_and_action.head(20)

In [None]:
# now get nbytes per worker

In [None]:
# Total nbytes per worker, summed over all task-stream records (not the exploded rows).
nbytes_per_worker = hx_ddf[['worker', 'nbytes']].groupby(["worker"]).agg("sum")

In [None]:
# Preview up to 20 rows of the per-worker byte totals.
nbytes_per_worker.head(20)