## Big Data Day 2 Afternoon Assignment

In this assignment, we will learn about machine learning with Dask. We will use the market basket dataset loaded below and cluster our data.

In [1]:
!pip install "distributed>=2.4.0" --quiet
!pip install "dask[complete]" -U --quiet
!pip install -U ipykernel --quiet
!pip install dask-ml -U --quiet

In [2]:
import dask.dataframe as dd
import numpy as np
import pandas as pd

In [9]:
market_basket = dd.read_csv('./Mall_Customers.csv')

In [10]:
market_basket.head()

|   | CustomerID | Gender | Age | Annual Income (k$) | Spending Score (1-100) |
|---|---|---|---|---|---|
| 0 | 1 | Male | 19 | 15 | 39 |
| 1 | 2 | Male | 21 | 15 | 81 |
| 2 | 3 | Female | 20 | 16 | 6 |
| 3 | 4 | Female | 23 | 16 | 77 |
| 4 | 5 | Female | 31 | 17 | 40 |


To cluster our data, we must first examine and process it. The first step is to check for missing data. Do this in the cell below. If there is missing data, drop all rows containing missing data.

In [11]:
market_basket.isna().mean().compute()

CustomerID                0.0
Gender                    0.0
Age                       0.0
Annual Income (k$)        0.0
Spending Score (1-100)    0.0
dtype: float64
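
The check above finds no missing values, so nothing needs to be dropped here. For reference, the following is a minimal sketch of the drop step on a small, hypothetical pandas frame (`toy` and its values are made up for illustration). Dask DataFrames expose the same `isna` and `dropna` methods, but evaluate them lazily until `.compute()` is called.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data with one missing value in each column.
toy = pd.DataFrame({
    "Age": [19, np.nan, 20],
    "Gender": ["Male", "Female", None],
})

# Share of missing values per column, the same check used above.
missing_share = toy.isna().mean()

# Drop every row that contains at least one missing value.
cleaned = toy.dropna()

print(missing_share)
print(cleaned)
```

On the assignment data, the equivalent call would be `market_basket = market_basket.dropna()`.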

Next, we will get rid of the customer ID column. Drop the column and assign the resulting dataframe to a new variable.

In [12]:
market_basket = market_basket.drop(columns=['CustomerID'])

We will now create dummy (one-hot) variables from the `Gender` column. Dask's `get_dummies` only works on categorical columns, so we call `categorize()` first.

In [13]:
dummified = dd.get_dummies(market_basket.categorize()).compute()

In [14]:
dummified

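|   | Age | Annual Income (k$) | Spending Score (1-100) | Gender_Male | Gender_Female |
|---|---|---|---|---|---|
| 0 | 19 | 15 | 39 | 1 | 0 |
| 1 | 21 | 15 | 81 | 1 | 0 |
| 2 | 20 | 16 | 6 | 0 | 1 |
| 3 | 23 | 16 | 77 | 0 | 1 |
| 4 | 31 | 17 | 40 | 0 | 1 |
| ... | ... | ... | ... | ... | ... |
| 195 | 35 | 120 | 79 | 0 | 1 |
| 196 | 45 | 126 | 28 | 0 | 1 |
| 197 | 32 | 126 | 74 | 1 | 0 |
| 198 | 32 | 137 | 18 | 1 | 0 |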


Because the columns are on different scales, we will rescale every column with min-max scaling. We will write our own min-max scaling function, since scikit-learn's `MinMaxScaler` returns NumPy arrays rather than Dask collections. Min-max scaling needs each column's minimum and maximum: subtract the minimum from each observation, then divide by the difference between the maximum and the minimum. Complete the function below.

In [15]:
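def dask_min_max(x):
    # Input: a numeric DataFrame or column (Dask or pandas)
    min_array = x.min(axis=0)  # per-column minimum
    max_array = x.max(axis=0)  # per-column maximum
    # Note: a constant column has max == min, so this division yields NaN
    scaled = (x - min_array) / (max_array - min_array)
    # Output: the same data rescaled so each column spans [0, 1]
    return scaled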
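
As a worked check of the formula, the scaled output for the first customer (age 19 becoming 0.019231, which is 1/52) is consistent with a minimum age of 18 and a range of 52. The sketch below reproduces that step on a small, hypothetical pandas frame and adds a guard for constant columns. The name `min_max_safe` and the toy data are assumptions made for illustration and are not part of the assignment.

```python
import pandas as pd


def min_max_safe(frame: pd.DataFrame) -> pd.DataFrame:
    """Min-max scale each column; constant columns map to 0.0 instead of NaN."""
    col_min = frame.min(axis=0)
    col_range = frame.max(axis=0) - col_min
    # Replace a zero range with 1 so the division gives 0.0 rather than NaN.
    safe_range = col_range.where(col_range != 0, 1)
    return (frame - col_min) / safe_range


# Hypothetical toy data: ages 18 and 70 give the range of 52 inferred above.
toy = pd.DataFrame({"Age": [18, 19, 70], "Constant": [5, 5, 5]})
scaled_toy = min_max_safe(toy)
print(scaled_toy)
```

Whether a constant column should map to 0.0, be dropped, or be left as NaN is a design choice. Mapping it to 0.0 keeps the frame free of NaN values, which matters because k-means cannot handle missing entries.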

Transform all columns using the scaler you wrote above.

In [16]:
scl = dask_min_max(dummified)
scl

|   | Age | Annual Income (k$) | Spending Score (1-100) | Gender_Male | Gender_Female |
|---|---|---|---|---|---|
| 0 | 0.019231 | 0.000000 | 0.387755 | 1.0 | 0.0 |
| 1 | 0.057692 | 0.000000 | 0.816327 | 1.0 | 0.0 |
| 2 | 0.038462 | 0.008197 | 0.051020 | 0.0 | 1.0 |
| 3 | 0.096154 | 0.008197 | 0.775510 | 0.0 | 1.0 |
| 4 | 0.250000 | 0.016393 | 0.397959 | 0.0 | 1.0 |
| ... | ... | ... | ... | ... | ... |
| 195 | 0.326923 | 0.860656 | 0.795918 | 0.0 | 1.0 |
| 196 | 0.519231 | 0.909836 | 0.275510 | 0.0 | 1.0 |
| 197 | 0.269231 | 0.909836 | 0.744898 | 1.0 | 0.0 |
| 198 | 0.269231 | 1.000000 | 0.173469 | 1.0 | 0.0 |


Import the dask clustering function and cluster the data. Use 3 clusters and print the centroids.

In [19]:
from dask.distributed import Client, progress
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

Client: Scheduler: inproc://192.168.1.2/8868/1, Dashboard: http://192.168.1.2:8787/status
Cluster: Workers: 1, Cores: 4, Memory: 2.00 GB


In [21]:
from dask_ml import cluster

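km = cluster.KMeans(n_clusters=3)
# fit() is enough here: the distances returned by fit_transform() were unused
km.fit(scl)
# One row per cluster, one column per scaled feature
km.cluster_centers_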

array([[0.57902098, 0.35916542, 0.34471243, 0.        , 1.        ],
       [0.41936189, 0.38710879, 0.48480983, 1.        , 0.        ],
       [0.20074224, 0.36612022, 0.68045113, 0.        , 1.        ]])
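
To read these centroids, it helps to recall that k-means assigns each observation to the centroid with the smallest Euclidean distance. The sketch below illustrates that assignment rule with plain NumPy, using the centroids printed above and the first four rows of the scaled table. It is an illustration of the rule under those copied values, not a replacement for the fitted estimator.

```python
import numpy as np

# Centroids copied from the output above (columns: Age, Annual Income,
# Spending Score, Gender_Male, Gender_Female).
centroids = np.array([
    [0.57902098, 0.35916542, 0.34471243, 0.0, 1.0],
    [0.41936189, 0.38710879, 0.48480983, 1.0, 0.0],
    [0.20074224, 0.36612022, 0.68045113, 0.0, 1.0],
])

# First four scaled rows, copied from the scaled table earlier.
rows = np.array([
    [0.019231, 0.000000, 0.387755, 1.0, 0.0],
    [0.057692, 0.000000, 0.816327, 1.0, 0.0],
    [0.038462, 0.008197, 0.051020, 0.0, 1.0],
    [0.096154, 0.008197, 0.775510, 0.0, 1.0],
])

# Squared Euclidean distance from every row to every centroid, shape (4, 3).
sq_dist = ((rows[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)

# Index of the nearest centroid for each row.
labels = sq_dist.argmin(axis=1)
print(labels)
```

The two male customers land on centroid 1, the only centroid with `Gender_Male` equal to 1. Because the two dummy columns take only the values 0 and 1 after scaling, a gender mismatch alone adds 2 to the squared distance, so gender largely determines cluster membership in this solution.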