In [6]:
from pathlib import Path
import numpy as np
from PIL import Image
import dask.dataframe as dd
from dask.distributed import Client
from dask.distributed import get_worker
from fabric import Connection, Config

### Connecting the client

In [7]:
# Connect to the already-running Dask scheduler at this address.
client = Client("tcp://128.110.217.91:8786")
# Bare expression so the notebook renders the client/cluster summary.
client

0,1
Connection method: Direct,
Dashboard: http://128.110.217.91:8787/status,

0,1
Comm: tcp://128.110.217.91:8786,Workers: 2
Dashboard: http://128.110.217.91:8787/status,Total threads: 32
Started: 1 minute ago,Total memory: 125.38 GiB

0,1
Comm: tcp://128.110.217.112:38567,Total threads: 16
Dashboard: http://128.110.217.112:46397/status,Memory: 62.69 GiB
Nanny: tcp://128.110.217.112:37321,
Local directory: /users/prsridha/dask-worker-space/worker-_bbf_qic,Local directory: /users/prsridha/dask-worker-space/worker-_bbf_qic
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 4.0%,Last seen: Just now
Memory usage: 100.07 MiB,Spilled bytes: 0 B
Read bytes: 2.76 kiB,Write bytes: 1.17 kiB

0,1
Comm: tcp://128.110.217.73:33701,Total threads: 16
Dashboard: http://128.110.217.73:39685/status,Memory: 62.69 GiB
Nanny: tcp://128.110.217.73:33303,
Local directory: /users/prsridha/dask-worker-space/worker-6vkge3pz,Local directory: /users/prsridha/dask-worker-space/worker-6vkge3pz
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 4.0%,Last seen: Just now
Memory usage: 100.74 MiB,Spilled bytes: 0 B
Read bytes: 1.80 kiB,Write bytes: 0.95 kiB


In [8]:
# We will be automating this through REST API calls.
# Each worker will initialize a dask-worker instance on itself;
# the scheduler can then obtain the newly created dask-worker's IP
# through a REST GET call.

def initialize_workers(client):
    """Build a mapping from sequential worker IDs to worker addresses.

    The addresses are the keys of ``client.scheduler_info()['workers']``,
    numbered from 0 in the order that mapping yields them.

    Args:
        client: object exposing ``scheduler_info()`` whose result has a
            ``'workers'`` entry that iterates over worker addresses.

    Returns:
        dict mapping ``int`` worker ID to the address converted with ``str``.
    """
    worker_addresses = client.scheduler_info()['workers']
    return {
        worker_id: str(address)
        for worker_id, address in enumerate(worker_addresses)
    }
# Worker-ID -> worker-address mapping for the cluster the client is connected to.
workers = initialize_workers(client)

In [23]:
# Hand-written mapping from node index to dask worker address.
# NOTE(review): not referenced by any later cell visible here, and the ports
# differ from those in the client summary output — confirm it is current.
node_to_dask_worker = {
    0: "tcp://128.110.217.73:37225",
    1: "tcp://128.110.217.112:40287"
}

### Reading the dataset

In [10]:
# Metadata CSV read with two named columns: image path and integer label.
DATA_URL = "metadata.csv"
feature_names = ["image_path", "label"]
# Use the builtin ``str``: the ``np.str`` alias was deprecated in NumPy 1.20
# and removed in NumPy 1.24, where accessing it raises AttributeError.
dtypes = {'image_path': str, 'label': np.int16}
df0 = dd.read_csv(DATA_URL, names=feature_names, dtype=dtypes)
# Keep only a 0.1% random sample of the rows for this experiment.
df = df0.sample(frac=0.001)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  dtypes = {'image_path': np.str, 'label': np.int16}


In [11]:
# Peek at the first rows of the sampled frame.
df.head()

Unnamed: 0,image_path,label
33731,data/trainingSet/trainingSet/7/img_36135.jpg,7
21289,data/trainingSet/trainingSet/4/img_3790.jpg,4
20349,data/trainingSet/trainingSet/4/img_1831.jpg,4
28533,data/trainingSet/trainingSet/6/img_33670.jpg,6
24025,data/trainingSet/trainingSet/5/img_33007.jpg,5


### Shuffling the data

In [12]:
# Sampling with frac=1 keeps every row but returns them in random order.
# NOTE(review): dask presumably samples each partition independently, so rows
# would only be shuffled within their partition — confirm this is sufficient.
shuffled_df = df.sample(frac=1)
shuffled_df.head()

Unnamed: 0,image_path,label
4298,data/trainingSet/trainingSet/1/img_27897.jpg,1
9878,data/trainingSet/trainingSet/2/img_27434.jpg,2
22162,data/trainingSet/trainingSet/5/img_34013.jpg,5
19343,data/trainingSet/trainingSet/4/img_36635.jpg,4
16383,data/trainingSet/trainingSet/3/img_13297.jpg,3


In [13]:
# One partition per known worker, so each worker can be given its own shard.
sharded_df = shuffled_df.repartition(npartitions=len(workers))

### Pulling each dask partition (multi-media columns) into workers

In [14]:
# This will also be done through the REST API
# The scheduler will pass the paths of the images to each worker.
# Each worker will then pull its list of images from the scheduler/remote S3

# SSH connection used by pull() to fetch files from the remote host.
# NOTE(review): user, host and key path are hard-coded here; consider reading
# them from configuration or the environment before sharing this notebook.
user = "prsridha"
host = "ms0921.utah.cloudlab.us"
pem_path = "./cloudlab.pem"
connect_kwargs = {"key_filename":pem_path}
conn = Connection(host, user=user, connect_kwargs=connect_kwargs)

def pull(from_path, to_path):
    """Fetch ``from_path`` through the module-level ``conn`` into ``to_path``.

    The local directory part of ``to_path`` (everything before its last "/")
    is created first if it does not exist; the transfer result is then printed.

    Args:
        from_path: path passed to ``conn.get`` as the source.
        to_path: "/"-separated local destination path.
    """
    # Everything before the final "/" is the local directory to create.
    local_dir, _, _ = to_path.rpartition("/")
    Path(local_dir).mkdir(parents=True, exist_ok=True)
    transfer = conn.get(from_path, to_path)
    print("Pulled from {0.remote} to {0.local}".format(transfer))

In [16]:
# Select the first two shards individually.
# NOTE(review): assumes sharded_df has at least two partitions (i.e. two workers).
part0 = sharded_df.partitions[0]
part1 = sharded_df.partitions[1]

In [17]:
# Materialise the image paths of each shard on the client.
images_list0 = list(part0.image_path.compute())
images_list1 = list(part1.image_path.compute())

# Write one newline-separated path list per node.
for list_file, image_paths in (
    ("node0_list.txt", images_list0),
    ("node1_list.txt", images_list1),
):
    with open(list_file, "w") as f:
        f.write("\n".join(image_paths))

### Define the preprocessing routine and call map_partitions on each dask partition

In [18]:
def preprocess_row(row):
    """Return the row's image path and label joined as ``"<path>;<label>"``.

    Args:
        row: mapping-like object indexable by ``"image_path"`` and ``"label"``.

    Returns:
        str: both values converted with ``str`` and separated by ``";"``.
    """
    fields = (row["image_path"], row["label"])
    return ";".join(str(field) for field in fields)

In [19]:
def preprocess_col(path, base_dir="/users/prsridha/"):
    """Load one image and return the sum of its pixel values.

    Args:
        path: image path relative to ``base_dir``.
        base_dir: prefix prepended verbatim to ``path`` (plain string
            concatenation, so it should end with a path separator). Defaults
            to the directory this notebook originally hard-coded.

    Returns:
        The result of ``sum`` over the values returned by ``im.getdata()``.
    """
    new_path = base_dir + path

    # The context manager closes the underlying file as soon as the pixels
    # have been read; without it the handle stays open until the image object
    # is garbage-collected, which can exhaust file descriptors when this is
    # applied to many rows.
    with Image.open(str(new_path)) as im:
        # NOTE(review): summing assumes one number per pixel (single-band
        # images); multi-band images yield tuples — confirm against the dataset.
        pixels = list(im.getdata())
    return sum(pixels)

In [20]:
# Build the lazy per-shard computation: preprocess_col is applied to every
# image path (cast to str) of the shard. ``meta`` declares the name and dtype
# of the resulting series.
processed_col_part0 = part0.image_path.map_partitions(lambda x: x.apply(lambda y: preprocess_col(str(y))), meta=('processed_col_part0', int))
processed_col_part1 = part1.image_path.map_partitions(lambda x: x.apply(lambda y: preprocess_col(str(y))), meta=('processed_col_part1', int))

### Compute partitions on their respective workers

In [26]:
# Run each shard's preprocessing restricted to one worker: part0 on
# workers[1] and part1 on workers[0].
# Only the second call is the cell's last expression, so only its result is
# displayed; the result of the first call is not stored.
processed_col_part0.compute(workers=workers[1])
processed_col_part1.compute(workers=workers[0])

7825     15733
21289    26416
34377    22767
9323     41658
5083     12202
28533    31922
2523     44622
4963     26622
11854    33994
8394     18847
14276    25136
27627    36019
33731    26381
31769    20826
38792    21332
17884    33401
23106    35784
19207    21204
8368     21760
11087    29936
4124     42718
Name: processed_col_part1, dtype: int64

In [28]:
# The next step is to save the new_cols as
# tf records/PyTorch tensors.
# In the actual implementation, compute will not be called here