In [1]:
import dask
from dask_kubernetes import KubeCluster
import numpy as np
import pyarrow

In [2]:
# Specify a remote deployment using a load blanacer if we are running the NB outside of the cluster
#dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})

In [3]:
cluster = KubeCluster.from_yaml('worker-spec.yaml', namespace='dask') # deploy_mode='remote')

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.42.4.201:38419
distributed.scheduler - INFO -   dashboard at:                     :8787


In [4]:
cluster.adapt(minimum=1, maximum=10)

distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=10


<distributed.deploy.adaptive.Adaptive at 0x7f4b1da910>

In [5]:
# Example usage
from dask.distributed import Client
import dask.array as da

# Connect Dask to the cluster
client = Client(cluster)
client # the repr gives us useful links

distributed.scheduler - INFO - Receive client connection: Client-f5704627-1df8-11eb-8072-6211e88f4455
distributed.core - INFO - Starting established connection


0,1
Client  Scheduler: tcp://10.42.4.201:38419  Dashboard: http://10.42.4.201:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [6]:
client.scheduler_comm.comm.handshake_info()

{'compression': 'lz4', 'python': (3, 8, 6), 'pickle-protocol': 5}

In [7]:
# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0|

distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.4.208:38881', name: 0, memory: 0, processing: 64>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.4.208:38881
distributed.core - INFO - Starting established connection


1.0


So now we know the cluster is doing ok :)

Configure dask to talk to our local MinIO

In [8]:
# The anon false wasted so much time
minio_storage_options = {
#    "anon": "false",
    "key": "YOURACCESSKEY",
    "secret": "YOURSECRETKEY",
    "client_kwargs": {
        "endpoint_url": "http://minio-1602984784.minio.svc.cluster.local:9000",
        "region_name": 'us-east-1'
    },
    "config_kwargs": {"s3": {"signature_version": 's3v4'}},
}

#tag::minio_storage_options[]
minio_storage_options = {
    "key": "YOURACCESSKEY",
    "secret": "YOURSECRETKEY",
    "client_kwargs": {
        "endpoint_url": "http://minio-1602984784.minio.svc.cluster.local:9000",
        "region_name": 'us-east-1'
    },
    "config_kwargs": {"s3": {"signature_version": 's3v4'}},
}
#end::minio_storage_options[]

Download the GH archive data

In [9]:
import datetime
import dask.dataframe as dd

In [10]:
current_date=datetime.datetime(2020,10,1, 1)

In [12]:
#tag::make_file_list[]
gh_archive_files=[]
while current_date < datetime.datetime.now() -  datetime.timedelta(days=1):
    current_date = current_date + datetime.timedelta(hours=1)
    datestring = f'{current_date.year}-{current_date.month:02}-{current_date.day:02}-{current_date.hour}'
    gh_url = f'http://data.githubarchive.org/{datestring}.json.gz'
    gh_archive_files.append(gh_url)
#end::make_file_list[]

In [13]:
gh_archive_files[0]

'http://data.githubarchive.org/2020-10-01-2.json.gz'

In [14]:
#tag::load_data[]
df = dd.read_json(gh_archive_files, compression='gzip')
df.columns
#end::load_data[]

In [15]:
len(df)

92084

In [16]:
# What kind of file systems are supported?
#tag::known_fs[]
from fsspec.registry import known_implementations
known_implementations
#end::known_fs[]
#tag::known_fs_result[]

{'file': {'class': 'fsspec.implementations.local.LocalFileSystem'},
 'memory': {'class': 'fsspec.implementations.memory.MemoryFileSystem'},
 'dropbox': {'class': 'dropboxdrivefs.DropboxDriveFileSystem',
  'err': 'DropboxFileSystem requires "dropboxdrivefs","requests" and "dropbox" to be installed'},
 'http': {'class': 'fsspec.implementations.http.HTTPFileSystem',
  'err': 'HTTPFileSystem requires "requests" and "aiohttp" to be installed'},
 'https': {'class': 'fsspec.implementations.http.HTTPFileSystem',
  'err': 'HTTPFileSystem requires "requests" and "aiohttp" to be installed'},
 'zip': {'class': 'fsspec.implementations.zip.ZipFileSystem'},
 'gcs': {'class': 'gcsfs.GCSFileSystem',
  'err': 'Please install gcsfs to access Google Storage'},
 'gs': {'class': 'gcsfs.GCSFileSystem',
  'err': 'Please install gcsfs to access Google Storage'},
 'gdrive': {'class': 'gdrivefs.GoogleDriveFileSystem',
  'err': 'Please install gdrivefs for access to Google Drive'},
 'sftp': {'class': 'fsspec.impl

In [None]:
#end::known_fs_result[]

What does our data look like?

In [17]:
df.columns

Index(['id', 'type', 'actor', 'repo', 'payload', 'public', 'created_at',
       'org'],
      dtype='object')

In [18]:
h = df.head()
h

Unnamed: 0,id,type,actor,repo,payload,public,created_at,org
0,13697091615,PushEvent,"{'id': 53307937, 'login': 'randomperson190', '...","{'id': 256356140, 'name': 'randomperson190/Con...","{'push_id': 5779108204, 'size': 1, 'distinct_s...",True,2020-10-01 02:00:00+00:00,
1,13697091617,PushEvent,"{'id': 14303400, 'login': 'fortyeightbits', 'd...","{'id': 296996950, 'name': 'fortyeightbits/Bobb...","{'push_id': 5779108210, 'size': 1, 'distinct_s...",True,2020-10-01 02:00:00+00:00,
2,13697091625,PushEvent,"{'id': 50973899, 'login': 'tielm1997', 'displa...","{'id': 300105421, 'name': 'tielm1997/easy_note...","{'push_id': 5779108215, 'size': 1, 'distinct_s...",True,2020-10-01 02:00:00+00:00,
3,13697091628,PushEvent,"{'id': 70508524, 'login': 'mildredcleveland134...","{'id': 299801944, 'name': 'mildredcleveland134...","{'push_id': 5779108219, 'size': 1, 'distinct_s...",True,2020-10-01 02:00:00+00:00,
4,13697091631,PushEvent,"{'id': 33073314, 'login': 'sprucelabs-ci', 'di...","{'id': 250079471, 'name': 'sprucelabsai/spruce...","{'push_id': 5779108213, 'size': 1, 'distinct_s...",True,2020-10-01 02:00:00+00:00,"{'id': 31485022, 'login': 'sprucelabsai', 'gra..."


In [19]:
import pandas as pd
j = pd.io.json.json_normalize(h.repo)
j

  j = pd.io.json.json_normalize(h.repo)


Unnamed: 0,id,name,url
0,256356140,randomperson190/ControlDeOrganicos,https://api.github.com/repos/randomperson190/C...
1,296996950,fortyeightbits/BobbinApp,https://api.github.com/repos/fortyeightbits/Bo...
2,300105421,tielm1997/easy_notes_app,https://api.github.com/repos/tielm1997/easy_no...
3,299801944,mildredcleveland134/helloworld,https://api.github.com/repos/mildredcleveland1...
4,250079471,sprucelabsai/spruce-schema,https://api.github.com/repos/sprucelabsai/spru...


In [20]:
j.name

0    randomperson190/ControlDeOrganicos
1              fortyeightbits/BobbinApp
2              tielm1997/easy_notes_app
3        mildredcleveland134/helloworld
4            sprucelabsai/spruce-schema
Name: name, dtype: object

Since we want to partition on the repo name, we need to extract that to it's own column

In [21]:
data_bag = df.to_bag()

In [22]:
# The records returned by the bag are tuples not named tuples, so use the df columns to look up the tuple index
cols = df.columns

In [23]:
def parse_record(record):
    r = {
        "repo": pd.io.json.json_normalize(record[cols.get_loc("repo")]),
        "repo_name": record[cols.get_loc("repo")]["name"],
        "type": record[cols.get_loc("type")],
        "id": record[cols.get_loc("id")],
        "created_at": record[cols.get_loc("created_at")],
        "payload": pd.io.json.json_normalize(record[cols.get_loc("payload")])}
    return r

#tag::cleanup[]
def clean_record(record):
    r = {
        "repo": record[cols.get_loc("repo")],
        "repo_name": record[cols.get_loc("repo")]["name"],
        "type": record[cols.get_loc("type")],
        "id": record[cols.get_loc("id")],
        "created_at": record[cols.get_loc("created_at")],
        "payload": record[cols.get_loc("payload")]}
    return r

cleaned_up_bag = data_bag.map(clean_record)
res = cleaned_up_bag.to_dataframe()
#end::cleanup[]
parsed_bag = data_bag.map(parse_record)

In [24]:
cleaned_up_bag.take(1)

({'repo': {'id': 256356140,
   'name': 'randomperson190/ControlDeOrganicos',
   'url': 'https://api.github.com/repos/randomperson190/ControlDeOrganicos'},
  'repo_name': 'randomperson190/ControlDeOrganicos',
  'type': 'PushEvent',
  'id': 13697091615,
  'created_at': Timestamp('2020-10-01 02:00:00+0000', tz='UTC'),
  'payload': {'push_id': 5779108204,
   'size': 1,
   'distinct_size': 1,
   'ref': 'refs/heads/master',
   'head': '75aa54756c61184d73064fdfc03a90ab9bc005b6',
   'before': '7ee325335c81d127a1e63bd483c3d846fbd479f0',
   'commits': [{'sha': '75aa54756c61184d73064fdfc03a90ab9bc005b6',
     'author': {'name': 'randomperson190',
      'email': '2c19ed4ffbb8866b68a86026e8e0f9eb4284310f@users.noreply.github.com'},
     'message': 'Updated',
     'distinct': True,
     'url': 'https://api.github.com/repos/randomperson190/ControlDeOrganicos/commits/75aa54756c61184d73064fdfc03a90ab9bc005b6'}]}},)

In [25]:
res = cleaned_up_bag.to_dataframe()
parsed_res = parsed_bag.to_dataframe()
h = res.head()
h_parsed = parsed_res.head()

In [26]:
h

Unnamed: 0,repo,repo_name,type,id,created_at,payload
0,"{'id': 256356140, 'name': 'randomperson190/Con...",randomperson190/ControlDeOrganicos,PushEvent,13697091615,2020-10-01 02:00:00+00:00,"{'push_id': 5779108204, 'size': 1, 'distinct_s..."
1,"{'id': 296996950, 'name': 'fortyeightbits/Bobb...",fortyeightbits/BobbinApp,PushEvent,13697091617,2020-10-01 02:00:00+00:00,"{'push_id': 5779108210, 'size': 1, 'distinct_s..."
2,"{'id': 300105421, 'name': 'tielm1997/easy_note...",tielm1997/easy_notes_app,PushEvent,13697091625,2020-10-01 02:00:00+00:00,"{'push_id': 5779108215, 'size': 1, 'distinct_s..."
3,"{'id': 299801944, 'name': 'mildredcleveland134...",mildredcleveland134/helloworld,PushEvent,13697091628,2020-10-01 02:00:00+00:00,"{'push_id': 5779108219, 'size': 1, 'distinct_s..."
4,"{'id': 250079471, 'name': 'sprucelabsai/spruce...",sprucelabsai/spruce-schema,PushEvent,13697091631,2020-10-01 02:00:00+00:00,"{'push_id': 5779108213, 'size': 1, 'distinct_s..."


In [27]:
type(h.iloc[0]["repo"])

dict

In [28]:
type(h_parsed.iloc[0]["repo"])

pandas.core.frame.DataFrame

In [29]:
#to_csv brings it back locally, lets try parquet. csv doesn't handle nesting so well so use original json inside
df.to_csv("s3://dask-test/boop-test-csv", storage_options=minio_storage_options)

distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.42.4.208:38881', name: 0, memory: 0, processing: 1>
distributed.core - INFO - Removing comms to tcp://10.42.4.208:38881
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.4.208:34231', name: 0, memory: 0, processing: 1>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.4.208:34231
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.2.226:33757', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.2.226:33757
distributed.core - INFO - Starting established connection


['dask-test/boop-test-csv/0.part']

In [30]:
# This will probably still bring everything back to the client? I'm guessing though.
res.to_parquet("s3://dask-test/boop-test-pq", compression="gzip", storage_options=minio_storage_options, engine="pyarrow")

distributed.scheduler - INFO - Retire worker names (0,)
distributed.scheduler - INFO - Retire workers {<Worker 'tcp://10.42.4.208:34231', name: 0, memory: 0, processing: 0>}
distributed.scheduler - INFO - Closing worker tcp://10.42.4.208:34231
distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.42.4.208:34231', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://10.42.4.208:34231
distributed.deploy.adaptive - INFO - Retiring workers [0]


In [31]:
parsed_res.to_parquet("s3://dask-test/boop-test-pq-p-nested", compression="gzip", storage_options=minio_storage_options, engine="fastparquet")

distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.42.2.226:33757', name: 1, memory: 0, processing: 1>
distributed.core - INFO - Removing comms to tcp://10.42.2.226:33757
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.2.226:41689', name: 1, memory: 0, processing: 1>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.2.226:41689
distributed.core - INFO - Starting established connection


ValueError: Error converting column "repo" to bytes using encoding UTF8. Original error: bad argument type for built-in operation

In [None]:
#tag::write[]
res.to_parquet("s3://dask-test/boop-test-partioned",
              partition_on=["type", "repo_name"], # Based on " there will be no global groupby." I think this is the value we want.
              compression="gzip",
              storage_options=minio_storage_options, engine="pyarrow")
#end::write[]

distributed.scheduler - INFO - Remove worker <Worker 'tcp://10.42.2.226:41689', name: 1, memory: 0, processing: 1>
distributed.core - INFO - Removing comms to tcp://10.42.2.226:41689
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.42.2.226:45549', name: 1, memory: 0, processing: 1>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.42.2.226:45549
distributed.core - INFO - Starting established connection
