In [51]:
import time
from time import perf_counter

# Data processing.
import json
import numpy as np

import pandas as pd

# Q6: For loading multiple CSVs into a single (pandas) dataframe.
import glob
import os

# Plotting.
import matplotlib.pyplot as plt
%matplotlib notebook
import seaborn as sns
sns.set_style('whitegrid')

# "Vanilla" python parallelism.
import multiprocessing

# Scalable data analytics: dask.
import dask
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import graphviz

# Unused: scalable data analytics using Spark.
#from pyspark.sql import SparkSession

# For garbage-collecting large pandas dataframes after use.
import gc

# Ignore warnings.
import warnings
warnings.simplefilter("ignore")

In [15]:
from collections import defaultdict

In [16]:
# Lazily build a demo timeseries with one row per second. The active range
# below spans 31 days, i.e. about 2.7M rows.
# A larger variant (used in class) is start='2008-01-01', end='2018-03-30'.
dd_df = dd.demo.make_timeseries(
    start='2018-01-01',
    end='2018-02-01',
    freq='1s',
    partition_freq='24h',
    dtypes={'x': float, 'y': float, 'id': int},
)

# Materialise the lazy dask dataframe so the same data is available to pandas.
pd_df = dd_df.compute()

In [49]:
def scale(cores):
    """Resize the cluster to `cores` workers, capped at this machine's CPU count.

    Relies on the notebook-level `cluster` and `client` objects. After
    requesting the new size it sleeps for one second and then prints whatever
    `client.ncores()` reports.
    """
    target = min(cores, multiprocessing.cpu_count())

    print('Scale cluster up again: ')
    cluster.scale(target)

    # Fixed pause only; nothing here verifies that the resize has finished.
    print('\nWait a bit for scaling to take effect...')
    time.sleep(1)

    print('\nCluster workers:')
    worker_cores = client.ncores()
    print(worker_cores)

In [18]:
# Local cluster of single-threaded worker processes, plus a client bound to it.
n_workers = 8
cluster = LocalCluster(
    n_workers=n_workers,
    threads_per_worker=1,
    processes=True,
    ip=None,
)
client = Client(cluster)
# Last expression of the cell, so the notebook renders the client summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:33867  Dashboard: http://127.0.0.1:42547/status,Cluster  Workers: 8  Cores: 8  Memory: 10.46 GB


In [59]:
def get_q2_results(cores=(1, 2, 4, 6, 8), window=60 * 5):
    """Time a rolling mean of column `y` in dask and in pandas per cluster size.

    For every entry in `cores` the cluster is resized with `scale`, then the
    same rolling mean is timed once on `dd_df` (dask, including `.compute()`)
    and once on `pd_df` (pandas).

    Parameters
    ----------
    cores : iterable of int
        Worker counts to benchmark, in order. Defaults to the counts this
        function previously hard-coded.
    window : int
        Rolling window length in rows. Defaults to 300 (60 * 5).

    Returns
    -------
    tuple
        `(dd_times, dd_result, pd_times, pd_result)`. The two `*_times` lists
        hold one elapsed time in milliseconds per entry of `cores`; the two
        results come from the last iteration and are None if `cores` is empty.
    """
    dd_times = []
    pd_times = []
    # Pre-bind so an empty `cores` returns cleanly instead of raising
    # UnboundLocalError at the return statement.
    dd_result = None
    pd_result = None

    for n_cores in cores:
        scale(n_cores)

        # Dask: the timing includes graph execution via .compute().
        start_dd = perf_counter()
        dd_result = dd_df.y.rolling(window=window).mean().compute()
        dd_times.append((perf_counter() - start_dd) * 1000)

        # Pandas: re-timed every round so each dask measurement has a
        # baseline taken under the same conditions.
        start_pd = perf_counter()
        pd_result = pd_df.y.rolling(window=window).mean()
        pd_times.append((perf_counter() - start_pd) * 1000)

    return dd_times, dd_result, pd_times, pd_result

# Run the Q2 benchmark; the tuple is (dd_times, dd_result, pd_times, pd_result).
q2_results = get_q2_results()

# Plot
# Must list the same worker counts that get_q2_results benchmarks by default.
cores = [1, 2, 4, 6, 8]
width = 0.4

# Use an explicit figure/axes so re-running this cell (or plotting later in
# the notebook) never draws on top of an already-open interactive figure.
fig, ax = plt.subplots()
ax.bar([c + width / 2 for c in cores], q2_results[0], width, label="dask times")
ax.bar([c - width / 2 for c in cores], q2_results[2], width, label="pandas times")
ax.set_xticks(cores)
ax.set_ylabel("Time (ms)")
ax.set_xlabel("Cores")
ax.set_title("Q2: rolling mean, dask vs. pandas")
ax.legend()
plt.show()


In [39]:
## Q3
# Peek at the first 20 entries of the archive index; each line of index.jsonl
# is decoded with json.loads.
mybinder_url = 'https://archive.analytics.mybinder.org/'
(
    db.read_text(mybinder_url + "index.jsonl")
      .map(json.loads)
      .take(20)
)

({'name': 'events-2018-11-03.jsonl', 'date': '2018-11-03', 'count': '7057'},
 {'name': 'events-2018-11-04.jsonl', 'date': '2018-11-04', 'count': '7489'},
 {'name': 'events-2018-11-05.jsonl', 'date': '2018-11-05', 'count': '13590'},
 {'name': 'events-2018-11-06.jsonl', 'date': '2018-11-06', 'count': '13920'},
 {'name': 'events-2018-11-07.jsonl', 'date': '2018-11-07', 'count': '12766'},
 {'name': 'events-2018-11-08.jsonl', 'date': '2018-11-08', 'count': '14105'},
 {'name': 'events-2018-11-09.jsonl', 'date': '2018-11-09', 'count': '11843'},
 {'name': 'events-2018-11-10.jsonl', 'date': '2018-11-10', 'count': '7047'},
 {'name': 'events-2018-11-11.jsonl', 'date': '2018-11-11', 'count': '6940'},
 {'name': 'events-2018-11-12.jsonl', 'date': '2018-11-12', 'count': '16322'},
 {'name': 'events-2018-11-13.jsonl', 'date': '2018-11-13', 'count': '16530'},
 {'name': 'events-2018-11-14.jsonl', 'date': '2018-11-14', 'count': '14099'},
 {'name': 'events-2018-11-15.jsonl', 'date': '2018-11-15', 'count': 

In [38]:
def get_q3_results(base_url='https://archive.analytics.mybinder.org/',
                   year="2019", month="08"):
    """Count archive events per provider for one month.

    Reads `index.jsonl` under `base_url`, keeps the files whose name matches
    `events-<year>-<month>-*`, and tallies the `provider` field of every
    record in those files.

    The tally is expressed as a single dask bag pipeline (`frequencies()`), so
    the per-record work runs on the cluster workers. Iterating a bag in a
    Python loop would instead pull every record back to the client and count
    there, one file at a time.

    Parameters
    ----------
    base_url : str
        Root URL of the archive, including the trailing slash.
    year, month : str
        Zero-padded year and month to select, compared as strings against the
        dash-separated file name.

    Returns
    -------
    collections.defaultdict
        Maps provider -> number of events; missing keys read as 0. Empty when
        no file in the index matches.

    Raises
    ------
    KeyError
        Presumably, if an index entry lacks `name` or an event record lacks
        `provider` (no default is supplied to `pluck`) -- TODO confirm.
    """
    def in_target_month(name):
        # 'events-2019-08-01.jsonl'.split('-') -> ['events', '2019', '08', ...]
        parts = name.split('-')
        return len(parts) > 2 and parts[1] == year and parts[2] == month

    names = (db.read_text(base_url + 'index.jsonl')
               .map(json.loads)
               .pluck('name')
               .filter(in_target_month)
               .compute())

    results = defaultdict(int)
    if not names:
        # Nothing matched; avoid handing read_text an empty list.
        return results

    urls = [base_url + name for name in names]
    provider_counts = (db.read_text(urls)
                         .map(json.loads)
                         .pluck('provider')
                         .frequencies()
                         .compute())
    # frequencies() yields (provider, count) pairs.
    results.update(provider_counts)
    return results


## Q3
# Average wall-clock time of get_q3_results over several runs per cluster size.
q3_times = []
cores = [1, 2, 4]
repetitions = 5
for num_cores in cores:
    tot_time = 0
    scale(num_cores)
    for rep in range(repetitions):
        start = perf_counter()
        # Keep the return value: the ranking below needs the provider counts
        # (discarding it left `results` undefined).
        results = get_q3_results()
        tot_time += perf_counter() - start
    q3_times.append((tot_time / repetitions) * 1000)

# Plot timing results on a fresh figure so they do not land on whichever
# figure is currently active.
fig, ax = plt.subplots()
ax.bar(cores, q3_times)
ax.set_xticks(cores)
ax.set_ylabel("Time (ms)")
ax.set_xlabel("Cores")
ax.set_title("Q3: mean time per run")
plt.show()

# Rank providers by event count, most frequent first.
results_list = sorted(results.items(), key=lambda item: item[1], reverse=True)

results_list