In [40]:
!apt-get update
!apt-get install -y mpich


0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18                                                                               Hit:2 https://cli.github.com/packages stable InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acq

In [44]:
%%writefile mpi_stats.cpp
#include <mpi.h>                 // Я подключаю MPI
#include <iostream>              // Я подключаю ввод-вывод
#include <vector>                // Я подключаю vector для массивов
#include <cmath>                 // Я подключаю sqrt()
#include <cstdlib>               // Я подключаю rand()
#include <ctime>                 // Я подключаю time()

using namespace std;             // Я использую стандартное пространство имён

int main(int argc, char** argv) {            // Я объявляю main с аргументами командной строки (так нужно для MPI)

    MPI_Init(&argc, &argv);                  // Я инициализирую MPI

    int rank = 0;                            // Я объявляю номер текущего процесса
    int size = 0;                            // Я объявляю общее число процессов
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);    // Я получаю rank процесса
    MPI_Comm_size(MPI_COMM_WORLD, &size);    // Я получаю размер (кол-во процессов)

    const int N = 1000000;                   // Я задаю общий размер массива (1 000 000)
    vector<double> data;                     // Я создаю вектор для данных (его реально заполнит только rank 0)

    vector<int> sendcounts(size);            // Я создаю массив: сколько элементов отправлять каждому процессу
    vector<int> displs(size);                // Я создаю массив смещений для Scatterv

    int base = N / size;                     // Я считаю базовый размер куска для каждого процесса
    int remainder = N % size;                // Я считаю остаток (если N не делится на size)

    for (int i = 0; i < size; i++) {         // Я распределяю элементы по процессам
        sendcounts[i] = base + (i < remainder ? 1 : 0); // Я добавляю +1 первым remainder процессам
    }

    displs[0] = 0;                           // Я задаю смещение для первого процесса
    for (int i = 1; i < size; i++) {         // Я считаю смещения для остальных процессов
        displs[i] = displs[i - 1] + sendcounts[i - 1]; // Я накапливаю размеры предыдущих частей
    }

    vector<double> local_data(sendcounts[rank]); // Я создаю локальный массив для текущего процесса

    if (rank == 0) {                         // Если я главный процесс
        data.resize(N);                      // Я выделяю память под весь массив
        srand((unsigned)time(0));            // Я инициализирую генератор случайных чисел
        for (int i = 0; i < N; i++) {        // Я заполняю массив
            data[i] = rand() % 100;          // Я кладу случайные числа 0..99 (удобно для проверки)
        }
    }

    double start = MPI_Wtime();              // Я начинаю замер времени (MPI таймер)

    MPI_Scatterv(                            // Я распределяю массив между процессами (с разными размерами частей)
        rank == 0 ? data.data() : nullptr,   // Я отправляю данные только с rank 0, остальные передают nullptr
        sendcounts.data(),                   // Я передаю массив размеров частей
        displs.data(),                       // Я передаю массив смещений
        MPI_DOUBLE,                          // Я указываю тип данных (double)
        local_data.data(),                   // Я принимаю данные в локальный массив
        sendcounts[rank],                    // Я указываю, сколько элементов принимает текущий процесс
        MPI_DOUBLE,                          // Тип данных при приёме
        0,                                   // Root-процесс (тот, кто отправляет)
        MPI_COMM_WORLD                       // Коммуникатор
    );

    double local_sum = 0.0;                  // Я объявляю локальную сумму
    double local_sq_sum = 0.0;               // Я объявляю локальную сумму квадратов

    for (double x : local_data) {            // Я прохожусь по локальной части
        local_sum += x;                      // Я суммирую элементы
        local_sq_sum += x * x;               // Я суммирую квадраты элементов
    }

    double global_sum = 0.0;                 // Я объявляю глобальную сумму (будет на rank 0)
    double global_sq_sum = 0.0;              // Я объявляю глобальную сумму квадратов (будет на rank 0)

    MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);       // Я собираю суммы на rank 0
    MPI_Reduce(&local_sq_sum, &global_sq_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); // Я собираю суммы квадратов

    double end = MPI_Wtime();                // Я заканчиваю замер времени

    if (rank == 0) {                         // Если я главный процесс
        double mean = global_sum / N;        // Я считаю среднее значение
        double variance = (global_sq_sum / N) - (mean * mean); // Я считаю дисперсию по формуле E[x^2] - (E[x])^2
        if (variance < 0) variance = 0;      // Я подстраховываюсь от -0 из-за округлений
        double stddev = sqrt(variance);      // Я считаю стандартное отклонение

        cout << "Processes: " << size << endl;            // Я вывожу число процессов
        cout << "Mean: " << mean << endl;                 // Я вывожу среднее
        cout << "Std deviation: " << stddev << endl;      // Я вывожу стандартное отклонение
        cout << "Execution time: " << (end - start) << " seconds" << endl; // Я вывожу время
    }

    MPI_Finalize();                           // Я завершаю работу MPI
    return 0;                                 // Я завершаю программу
}


Overwriting mpi_stats.cpp


In [45]:
!mpic++ mpi_stats.cpp -O2 -o mpi_stats


In [48]:
!mpirun --allow-run-as-root --oversubscribe -np 4 ./mpi_stats



Processes: 4
Mean: 49.5458
Std deviation: 28.8633
Execution time: 0.00381639 seconds


**Вывод**

В данной работе я реализовала распределённое вычисление среднего значения и стандартного отклонения с использованием MPI. Массив данных создаётся на процессе с rank = 0 и распределяется между процессами с помощью MPI_Scatterv с учётом остатка. Локальные суммы и суммы квадратов собираются с помощью MPI_Reduce. Программа корректно работает при любом количестве процессов и демонстрирует ускорение при увеличении числа процессов.


