In [None]:
# https://stat.columbia.edu/~rachel/datasets/
# https://stat.columbia.edu/~rachel/datasets/csv1.csv

In [None]:
import dask
import numpy as np
import numpy.typing as npt
from typing import *
import pandas as pd
import dask.dataframe as dd


In [None]:
url = "https://stat.columbia.edu/~rachel/datasets/nyt0.csv"
#tag::ex_load_1kb[]
df = dd.read_csv(url)

In [None]:
df.info

In [None]:
df.head()

In [None]:
df.dtypes

In [None]:
# mayby dropna

In [None]:
# df.compute()

In [None]:
# Split the dataframe by the 'adult' column

adult_df = df[df["Age"] >= 19]
minors_df = df[df["Age"] < 19]

In [None]:
adult_df.compute()

In [None]:
b_df = adult_df.categorize(columns=['Age'])

In [None]:
b_df.compute()

In [None]:
df2 = df.categorize(
    columns=["Age"],
    split_every=10
)
df2.compute()

In [None]:
df.categorize(
    x=df["Age"],
    bins=[0, 18, 24, 34, 44, 54, 64, np.inf]
)

Machine Learning with Dask

In [None]:
# !pip install scikeras>=0.1.8
# !pip install tensorflow>=2.3.0
# !pip install -U skorch
# !pip install torch
# !pip install torchvision
# !pip install pytorch-cpu #not sure if i need to fix this
!pip install s3fs

In [None]:
!pip install cloudpickle==2.1.0
!pip install dask==2022.05.0
!pip install distributed==2022.5.0
!pip install lz4==4.0.0
!pip install msgpack==1.0.3
!pip install toolz==0.11.2


setup cluster

In [None]:
import dask
# Dask multithreading is only suited for mostly non-Python code (like pandas, numpy, etc.)
#tag::threads[]
dask.config.set(scheduler='threads')
#end::threads[]
#tag::process[]
dask.config.set(scheduler='processes')
#end::process[]
#tag::dask_use_forkserver[]
dask.config.set({"multiprocessing.context": "forkserver", "scheduler": "processes"})
#end::dask_use_forkserver[]

In [None]:
!export

In [None]:
#tag::make_dask_k8s_client[]
import dask
from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec
# Use load balancer to make it externally available, for purely internal
# the default of "ClusterIP" is better.
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})
worker_template = make_pod_spec(image='holdenk/dask:latest',
                         memory_limit='8G', memory_request='8G',
                         cpu_limit=1, cpu_request=1)
scheduler_template = make_pod_spec(image='holdenk/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)
cluster = KubeCluster(pod_template = worker_template, scheduler_pod_template = scheduler_template)
cluster.adapt()    # or create and destroy workers dynamically based on workload
from dask.distributed import Client
client = Client(cluster)
#end::make_dask_k8s_client[]

In [None]:
client.close()

In [11]:
from dask.distributed import Client
# when working with clusters, specify cluster config, n_workers and worker_size
client = Client()

In [12]:
import pandas as pd
import glob
import toolz
import dask
import dask.array as da
import torch
from torchvision import transforms
from PIL import Image
import numpy as np
import torch.nn as nn
import torch.optim as optim # optimization algo (eg SGD, Adam)
import torch.nn.functional as F # non-linear activation fn (e.g. relu, softmin, softamx, logsigmoid)
from torchvision import datasets, transforms # convenience wrapper for datasets and model architectures, common image transformations
from torch.utils.data.sampler import SubsetRandomSampler #validation test split
import urllib.request
import zipfile

1. Extract: get fashion-mnist

In [15]:
# we use dask.delayed so that load and transform is lazily done in cluster
@dask.delayed
def transform(img):
    trn = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,),)
    ]) #normalize mean / std. given as tuples
    #we convert PIL image or numpy.ndarray [0,255] to torch.FloatTensor, (C,H,W) [0.0,1.0]
    return trn(img)

def transform_nonlazy(img):
    trn = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,),)
    ]) #normalize mean / std. given as tuples
    #we convert PIL image or numpy.ndarray [0,255] to torch.FloatTensor, (C,H,W) [0.0,1.0]
    return trn(img)

In [29]:
@dask.delayed
def load_fashionMNIST_trainset(transform):
    trainset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=True, transform = transform)
    return trainset

def load_fashionMNIST_trainset_nonlazy(transform):
    trainset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=True, transform = transform)
    return trainset

In [17]:
@dask.delayed
def load_fashionMNIST_testset(transform):
    testset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=False, transform = transform)
    return testset

In [None]:
# import s3fs

In [None]:
# fs = s3fs.S3FileSystem(anon=True)

In [None]:
# # https://github.com/zalandoresearch/fashion-mnist
# URL_BASE = "http://fashion-mnist.s3-website.eu-central-1.amazonaws.com"
# CHECKSUMS = {
#         "train-images-idx3-ubyte.gz": "3aede38d61863908ad78613f6a32ed271626dd12800ba2636569512369268a84",
#         "train-labels-idx1-ubyte.gz": "a04f17134ac03560a47e3764e11b92fc97de4d1bfaf8ba1a3aa29af54cc90845",
#         "t10k-images-idx3-ubyte.gz": "346e55b948d973a97e58d2351dde16a484bd415d4595297633bb08f03db6a073",
#         "t10k-labels-idx1-ubyte.gz": "67da17c76eaffca5446c3361aaab5c3cd6d1c2608764d35dfb1850b086bf8dd5",
#     }

# prefix = "train" #if split == train, else "t10k"
# images_file = f"{prefix}-images-idx3-ubyte.gz"
# labels_file = f"{prefix}-labels-idx1-ubyte.gz"
# # http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/

In [14]:

@dask.delayed
def load(path, fs=__builtins__):
    with fs.open(path, 'rb') as f:
        img = Image.open(f)
        return img

# @dask.delayed
# def load_csv(path):
#     import pandas as pd
#     return pd.read_csv(path, sep=',')

In [None]:
# objs = [load_csv(x) for x in glob.glob("fashion_mnist/dataset/fashion-mnist_train.csv")]

In [None]:
objs = load_fashionMNIST_trainset(transform)

In [None]:
x = dask.compute(objs)

In [None]:
images = x[0].data

In [None]:
labels = x[0].train_labels

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

fig = plt.figure(figsize=(15,5))
for idx in np.arange(20):
    ax = fig.add_subplot(4, 20/4, idx+1, xticks=[], yticks=[])
    ax. imshow(np.squeeze(images[idx]), cmap='gray')
    ax.set_title(labels[idx].item())
    fig.tight_layout()

In [2]:
trainset_len = 6000 #cheating a bit - we already know length
indices = list(range(trainset_len))
split = int(np.floor(0.2 * trainset_len))
batch_size = 64
num_workers = 4 #would get it from distributed client
train_sampler = SubsetRandomSampler(indices[:split])

# import torch.utils.data as td

train_loader = torch.utils.data.DataLoader(
    objs, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers
)
# train_loader = td.DataLoader(
#     objs, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers
# )

NameError: name 'objs' is not defined

In [None]:
# tensors = [transform(x) for x in objs]

In [None]:
# batches = [dask.delayed(torch.stack)(batch)
#            for batch in toolz.partition_all(10, tensors)]

In [None]:
# images are 28x28 2d tensors. flatten to 1d vector
# 28*28 = 784 


In [None]:
# define NN architecture in this class.
# Pytorch's nn.module that we inherit for the architecture definition
# we use 3 hidden layers, 1 output layer

class Classifier(nn.Module):
    import torch.nn as nn
    def __init__(self):
        nn.Module.__init__(self)
        self.fc1 = nn.Linear(784, 256) # linear transformation from 784 input -> 256 output for first hidden layer
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 10)
#         define 20% dropout
        self.dropout = nn.Dropout(0.2)
    
    def forward(self, x):
        x = x.view(x.shape[0], -1)
        x = self.dropout(F.relu(self.fc1(x))) # we use RELU activation function for each hidden layer
        x = self.dropout(F.relu(self.fc2(x)))
        x = self.dropout(F.relu(self.fc3(x)))
#         dont use dropout on output layer
        x = F.log_softmax(self.fc3(x), dim=1) #final output layer use softmax for log-probabilities
        return x

In [None]:
model = Classifier()
#loss fn
criterion = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

valid_loss_min = np.Inf #use high val to ensure weights are updated first time
epochs = 20 #short epoch for now
steps = 0
model.train() #model prep prior to training
train_losses, valid_losses = [],[]


In [None]:
@dask.delayed
def train(images, labels):
    optimizer.zero_grad()
    log_ps = model(images)
    loss = criterion(log_ps, labels)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()*images.size(0)

In [None]:
@dask.delayed
def fake_train(x):
    return (x)

In [37]:
# tensors_01 = client.submit(load_fashionMNIST_trainset(transform))
tensors = []
tensor = client.submit(load_fashionMNIST_trainset(transform))
tensors.append(tensor)

In [45]:
xx = client.gather(tensors)

In [52]:
xxx = xx[0].result()

In [54]:
xxx.compute()

Key:       apply-687810a0-1f94-4a8d-a038-78c95b18a69f
Function:  execute_task
args:      ((<function apply at 0x7f83feb93ca0>, Dataset FashionMNIST
    Number of datapoints: 60000
    Root location: /Users/mk/.pytorch/F_MNIST_data
    Split: Train
    StandardTransform
Transform: <function transform at 0x7f84049de160>, (<class 'tuple'>, []), (<class 'dict'>, [])))
kwargs:    {}
Exception: 'TypeError("\'FashionMNIST\' object is not callable")'



TypeError: 'FashionMNIST' object is not callable

In [48]:
batches = []
for b in toolz.partition_all(10, tensors):
    print(b.result())
#     batch = client.submit(torch.stack, b)
#     batches.append(batch)


Key:       stack-f5ad50c8f8d53abb4b73b81319ff5201
Function:  execute_task
args:      ((<built-in method stack of type object at 0x11f504770>, (Delayed('apply-687810a0-1f94-4a8d-a038-78c95b18a69f'),)))
kwargs:    {}
Exception: 'TypeError("stack(): argument \'tensors\' (position 1) must be tuple of Tensors, not Delayed")'



In [9]:
import pickle

whole_dataset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=False, transform = transform)
train_loader = torch.utils.data.DataLoader(
    whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers
)

for image, labels in train_loader:
    pass
# #     print(image)
# #     print(labels)

PicklingError: Can't pickle <function transform at 0x7f803fa2c040>: it's not the same object as __main__.transform

In [None]:
for e in range(epochs):
    valid_loss = 0
    running_loss = 0
    #train model
#     for images, labels in trainloader:
    for images, labels in objs:
        optimizer.zero_grad()
        log_ps = model(images)
        loss = criterion(log_ps, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()*images.size(0)
    
#     for images, labels in validloader:
#         log_ps = model(images)
#         loss = criterion(log_ps, labels)
#         valid_loss += loss.item()*images.size(0)

# running_loss =


In [None]:
# whole_trainset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=True, transform = transform)
# trainset_len = 6000 #cheating a bit - we already know length
# indices = list(range(trainset_len))
# split = int(np.floor(0.2 * trainset_len))
# batch_size = 64
# num_workers = 4 #would get it from distributed client
# sampler = SubsetRandomSampler(indices[:split])

# train_loader = torch.utils.data.DataLoader(
#     whole_trainset, sampler=sampler, batch_size=batch_size, num_workers=num_workers
# )

In [None]:
loader = load_fashionMNIST_trainset(transform)

In [None]:
loaded = dask.compute(loader)

In [None]:
# lazy load
# note we aren't using torch.utils.DataLoader
# whole_trainset = load_fashionMNIST_trainset(transform)
# whole_trainset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=True, transform = transform)

non-dask way

In [None]:
non_dask_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,),)
    ])
whole_trainset = datasets.FashionMNIST('~/.pytorch/F_MNIST_data', download=True, train=True, transform = non_dask_transform)

In [None]:
trainset_len = 6000 #cheating a bit - we already know length
indices = list(range(trainset_len))
split = int(np.floor(0.2 * trainset_len))
batch_size = 64
num_workers = 4 #would get it from distributed client
validation_sampler = SubsetRandomSampler(indices[:split])

In [None]:
train_loader = torch.utils.data.DataLoader(
    whole_trainset, sampler=validation_sampler, batch_size=batch_size, num_workers=num_workers
)

In [None]:
dataiter = iter(train_loader)

In [None]:
images, labels = dataiter.next()

In [None]:
labels

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

fig = plt.figure(figsize=(15,5))
for idx in np.arange(20):
    ax = fig.add_subplot(4, 20/4, idx+1, xticks=[], yticks=[])
    ax. imshow(np.squeeze(images[idx]), cmap='gray')
    ax.set_title(labels[idx].item())
    fig.tight_layout()