In [1]:
! pip install mpi4py

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mpi4py
  Downloading mpi4py-3.1.4.tar.gz (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m36.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.4-cp39-cp39-linux_x86_64.whl size=3380655 sha256=972eb29a3a7ac94a61750d6dff4d7509de46cf850c591184bd789a4192c3294a
  Stored in directory: /root/.cache/pip/wheels/db/81/9f/43a031fce121c845baca1c5d9a1468cad98208286aa2832de9
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.4


In [2]:
# Mount Google Drive so files under /content/drive (including the dataset
# CSV read by the MPI script below) are accessible from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Collective communication

In [3]:
%%writefile LB4CollectiveCommunication.py

from mpi4py import MPI
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np
import timeit
import warnings

# Silence every FutureWarning and RuntimeWarning for the whole script.
# NOTE(review): these blanket filters presumably hide pandas deprecation
# notices and NumPy "mean of empty slice" warnings — confirm they are still
# needed, since they can also mask genuine numerical problems.
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)

def create_clusters(centroids, num_clusters, data):
    """Assign every row of ``data`` to the index of its nearest centroid.

    Args:
        centroids: Current centroid coordinates, one centroid per row.
        num_clusters: Number of centroids; forwarded to ``closest_centroid``.
        data: Two-dimensional collection of points, one point per row.

    Returns:
        A NumPy array (default float dtype) with one entry per data point,
        holding the centroid index chosen by ``closest_centroid``.
    """
    point_count, _ = np.shape(data)
    assignments = np.empty(point_count)
    # Fill the pre-allocated array in one slice assignment.
    assignments[:] = [
        closest_centroid(data[row], centroids, num_clusters)
        for row in range(point_count)
    ]
    return assignments

def compute_means(cluster_idx, num_clusters, data):
    """Recompute each centroid as the mean of the points assigned to it.

    Args:
        cluster_idx: Per-point cluster assignment, aligned with the rows of
            ``data``.
        num_clusters: Number of clusters (rows in the result).
        data: Two-dimensional array of points, one point per row.

    Returns:
        Array of shape ``(num_clusters, data.shape[1])`` whose row ``k`` is
        the column-wise mean of the points labelled ``k``.
    """
    feature_count = data.shape[1]
    means = np.empty((num_clusters, feature_count))
    for label in range(num_clusters):
        member_mask = cluster_idx == label
        means[label] = np.mean(data[member_mask], axis=0)
    return means

def euclidean_distance(x1, x2):
    """Return the Euclidean (L2) distance between two points.

    Args:
        x1: First point, as a NumPy array (or anything supporting ``-``).
        x2: Second point, broadcast-compatible with ``x1``.

    Returns:
        Square root of the sum of squared coordinate differences.
    """
    offsets = x1 - x2
    return np.sqrt(np.sum(np.power(offsets, 2)))

def closest_centroid(data, centroids, num_clusters):
    """Return the index of the centroid nearest to a single point.

    Centroids whose distance is NaN (for example the mean of an empty
    cluster) are skipped: ``np.argmin`` would otherwise report the first NaN
    position as the minimum and pull every point into that cluster.

    Args:
        data: Coordinates of one point.
        centroids: Iterable of centroid coordinates.
        num_clusters: Unused; kept so existing callers keep working.

    Returns:
        Integer index into ``centroids`` of the nearest centroid. When every
        distance is NaN the result is whatever ``np.argmin`` reports, which
        matches the previous behaviour.

    Raises:
        ValueError: If ``centroids`` is empty.
    """
    distances = np.array(
        [euclidean_distance(data, centroid) for centroid in centroids]
    )
    if np.isnan(distances).all():
        # Nothing valid to compare; np.nanargmin would raise on an all-NaN
        # input, so defer to np.argmin (which also raises on empty input).
        return np.argmin(distances)
    return np.nanargmin(distances)

def initialize_random_centroids(num_clusters, data):
    """Pick ``num_clusters`` distinct rows of ``data`` as starting centroids.

    Args:
        num_clusters: How many centroids to draw.
        data: Two-dimensional array of points, one point per row.

    Returns:
        The selected rows of ``data``, drawn without replacement using
        NumPy's global random state.
    """
    row_count, _ = np.shape(data)
    chosen_rows = np.random.choice(row_count, size=num_clusters, replace=False)
    return data[chosen_rows]

def run_Kmeans(num_clusters, data, max_iterations=50):
    """Cluster ``data`` with k-means starting from random centroids.

    Each iteration assigns the points to their nearest centroid and then
    recomputes the centroids; the loop stops early once the centroids no
    longer move (within ``np.allclose`` tolerance).

    Args:
        num_clusters: Number of clusters to fit.
        data: Two-dimensional array of points, one point per row.
        max_iterations: Upper bound on assign/update rounds. With ``0`` the
            points are only assigned to the initial random centroids.

    Returns:
        Tuple ``(clusters, centroids)``: the per-point cluster assignment
        and the final centroid coordinates.
    """
    centroids = initialize_random_centroids(num_clusters, data)
    clusters = None
    for _ in range(max_iterations):
        clusters = create_clusters(centroids, num_clusters, data)
        previous_centroids = centroids
        centroids = compute_means(clusters, num_clusters, data)
        # equal_nan=True: a centroid that is NaN in both rounds has not
        # moved, so it must not prevent convergence from being detected.
        if np.allclose(previous_centroids, centroids, equal_nan=True):
            break
    if clusters is None:
        # No iteration ran (max_iterations <= 0); still return a valid
        # assignment instead of failing on an unbound local.
        clusters = create_clusters(centroids, num_clusters, data)
    return clusters, centroids

def score_within_cluster_dispersion(cluster, data_clusters):
    """Return one cluster's size-weighted total variance.

    Args:
        cluster: Key identifying the cluster inside ``data_clusters``.
        data_clusters: Mapping from cluster key to that cluster's
            observations (rows are observations, columns are features).

    Returns:
        Number of observations in the cluster multiplied by the sum of the
        per-feature population variances (``ddof=0``).
    """
    members = data_clusters[cluster]
    per_feature_variance = np.var(members, ddof=0, axis=0)
    return members.shape[0] * np.sum(per_feature_variance)

def calculate_index(data, label_target='XCoord'):
    """Compute the variance-ratio (Calinski-Harabasz) index of a labelled frame.

    The rows of ``data`` are grouped by the values in ``label_target``; every
    remaining column is treated as a feature. The score is

        (BGSS / (k - 1)) / (WGSS / (n - k))

    where ``k`` is the number of groups, ``n`` the number of rows, BGSS the
    size-weighted squared distance of each group centroid from the overall
    mean, and WGSS the summed squared deviation of the rows from their own
    group centroid.

    Args:
        data: pandas DataFrame holding the label column and numeric feature
            columns. It is not modified.
        label_target: Name of the column whose values identify the groups.
            Defaults to ``'XCoord'``.

    Returns:
        The index as a float. ``nan`` when there are fewer than two groups
        or when every row forms its own group, because the ratio is
        undefined there; ``inf`` when the groups have no internal spread.
    """
    # One groupby pass replaces per-cluster boolean filtering and the
    # row-by-row DataFrame.append (removed in pandas 2.0).
    grouped = data.groupby(by=label_target)
    num_clusters = grouped.ngroups
    num_observations = len(data)

    if num_clusters < 2 or num_observations <= num_clusters:
        return float('nan')

    cluster_sizes = grouped.size()
    data_centroids = grouped.mean()          # rows: clusters, columns: features
    data_barycenter = data.drop(columns=label_target).mean()

    # Between-group dispersion: n_k * ||centroid_k - barycenter||^2, summed.
    squared_offsets = (data_centroids - data_barycenter).pow(2).sum(axis=1)
    score_between_group_dispersion = (cluster_sizes * squared_offsets).sum()

    # Within-group dispersion: n_k * (sum of population variances), summed.
    per_cluster_variance = grouped.var(ddof=0).sum(axis=1)
    score_pooled_within_cluster_dispersion = (
        cluster_sizes * per_cluster_variance
    ).sum()

    BGSS_red = score_between_group_dispersion / (num_clusters - 1)
    WGSS_red = score_pooled_within_cluster_dispersion / (num_observations - num_clusters)

    # A zero within-group dispersion legitimately yields inf; keep it quiet.
    with np.errstate(divide='ignore', invalid='ignore'):
        return float(np.float64(BGSS_red) / np.float64(WGSS_red))

def main():
    """Run k-means on every worker rank and report the best run on rank 0.

    Rank 0 loads the coordinate columns of the sales CSV, keeps the first
    1000 complete rows and broadcasts them. Every other rank draws a random
    500-row sample, min-max scales it, runs 4-cluster k-means and scores the
    sample with ``calculate_index``. Rank 0 gathers the scores and centroids
    and prints the highest-scoring worker's rank, score and centroids. Every
    rank prints its own wall-clock running time.
    """
    start_timer = timeit.default_timer()

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    max_rows = 1000      # rows broadcast from rank 0
    sample_rows = 500    # rows each worker clusters
    num_clusters = 4

    coordinates = None
    if rank == 0:
        # Only the two coordinate columns are used, so skip parsing the rest
        # (this also avoids the positional-axis drop removed in pandas 2.0).
        data = pd.read_csv('/content/drive/MyDrive/Datasets/brooklyn_sales_map.csv',
                           usecols=['XCoord', 'YCoord'], low_memory=False)
        # A missing value in either column would turn into NaN distances.
        coordinates = data.dropna(subset=['XCoord', 'YCoord']).head(max_rows)

    coordinates = comm.bcast(coordinates, root=0)

    mind = 0
    centroids = None
    if rank != 0:
        scaler = MinMaxScaler()
        shuffled = np.random.permutation(coordinates.shape[0])
        sample = coordinates.iloc[shuffled[0:sample_rows], :].copy()
        scaled_sample = scaler.fit_transform(sample)
        _, centroids = run_Kmeans(num_clusters, scaled_sample)
        # NOTE(review): the score groups the unscaled sample by its XCoord
        # column; the cluster labels returned by run_Kmeans are not used.
        # Confirm this is the intended quality measure.
        mind = calculate_index(sample)

    scores = comm.gather(mind, root=0)
    all_centroids = comm.gather(centroids, root=0)

    if rank == 0:
        max_val = 0
        max_index = 0
        # Workers are ranks 1..size-1; rank 0 only contributes placeholders.
        for worker in range(1, size):
            if scores[worker] > max_val:
                max_index = worker
                max_val = scores[worker]

        print('Max index: ', max_index)
        print('Max value: ', max_val)
        print('Centroids: ', all_centroids[max_index])

    elapsed = timeit.default_timer() - start_timer
    print('Running time: {:2.4f} sec'.format(elapsed))

if __name__ == '__main__':
    # Run only when executed as a script (e.g. under mpirun), not on import.
    main()

Writing LB4CollectiveCommunication.py


In [4]:
# Launch the script written above on 8 MPI processes: rank 0 loads and
# gathers, every other rank runs k-means. --oversubscribe presumably lets
# Open MPI start more ranks than there are cores; --allow-run-as-root is
# needed because the Colab shell runs as root.
! mpirun -n 8 --allow-run-as-root --oversubscribe python LB4CollectiveCommunication.py

Running time: 87.6052 sec
Running time: 86.6113 sec
Running time: 89.4801 sec
Running time: 88.5526 sec
Running time: 89.4020 sec
Running time: 90.8041 sec
Running time: 91.1600 sec
Max index:  6
Max value:  202.32017322826792
Centroids:  [[0.9749057  0.86584511]
 [0.97373962 0.94081565]
 [0.         0.        ]
 [0.97329076 0.75612901]]
Running time: 91.2745 sec
