In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=e4f44577278faa13e70d5b2e6fb9e654e7bfb5af1e80640dc5badf07c999b02d
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [4]:
sample = 1000000

## [RandomData] Pyspark vs Python multiprocess

### Pyspark

In [2]:
import numpy as np
from pyspark.sql import SparkSession
import time
from tqdm import tqdm

In [5]:
def split_data_into_partitions(X, y, num_partitions=4):
    # Split the data into partitions
    data_partitions = []
    chunk_size = len(X) // num_partitions

    for i in range(num_partitions):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size
        X_partition = X[start_idx:end_idx]
        y_partition = y[start_idx:end_idx]
        data_partitions.append((X_partition, y_partition))

    return data_partitions

def map_function(data_partition, params):
    # Compute gradients on a data partition using current model parameters
    X, y = data_partition
    gradients = np.dot(X.T, np.dot(X, params.value) - y)
    return gradients

def reduce_function(intermediate_results, learning_rate):
    # Combine gradients and update model parameters
    total_gradients = np.sum(intermediate_results, axis=0)
    updated_params = learning_rate * total_gradients
    return updated_params

def main():
    # Create a SparkSession
    spark = SparkSession.builder.appName("UNLV").getOrCreate()

    # Generate sample data for linear regression
    X = np.random.rand(sample, 3)  # 100 samples with 3 features
    y = 3 * X[:, 0] + 2 * X[:, 1] + np.random.randn(sample)  # Linear relationship with random noise

    # Split the data into partitions
    data_partitions = split_data_into_partitions(X, y)

    # Broadcast the initial model parameters to all workers
    params = spark.sparkContext.broadcast(np.zeros(X.shape[1]))

    # Set learning rate and number of iterations
    learning_rate = 0.1
    num_iterations = 100

    start_time = time.time()
    for _ in tqdm(range(num_iterations)):
        # Parallelize the data partitions
        rdd = spark.sparkContext.parallelize(data_partitions)

        # Map step: compute gradients on each data partition in parallel
        intermediate_results = rdd.map(lambda x: map_function(x, params)).collect()

        # Reduce step: combine gradients and update model parameters
        params = spark.sparkContext.broadcast(reduce_function(intermediate_results, learning_rate))
    end_time = time.time()
    print(f"Time: {(end_time - start_time) // 60}min {(end_time - start_time) % 60:.3f}sec\n")

    # # Use the final parameters for prediction
    # X_test = np.random.rand(100, 3)  # New test data
    # y_pred = np.dot(X_test, params.value)
    # print("Predictions:", y_pred)

    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    main()

(1000000, 3)
(1000000,)
[0. 0. 0.]


100%|██████████| 100/100 [01:20<00:00,  1.24it/s]


Time: 1.0min 20.584sec



### Python-multiproess

In [7]:
import numpy as np
import multiprocessing
import time
from tqdm import tqdm

In [8]:
def split_data_into_partitions(X, y, num_partitions=4):
    # Split the data into partitions
    data_partitions = []
    chunk_size = len(X) // num_partitions

    for i in range(num_partitions):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size
        X_partition = X[start_idx:end_idx]
        y_partition = y[start_idx:end_idx]
        data_partitions.append((X_partition, y_partition))

    return data_partitions

def map_function(data_partition, params):
    # Compute gradients on a data partition using current model parameters
    X, y = data_partition
    gradients = np.dot(X.T, np.dot(X, params) - y)
    return gradients

def reduce_function(intermediate_results, learning_rate):
    # Combine gradients and update model parameters
    total_gradients = np.sum(intermediate_results, axis=0)
    updated_params = learning_rate * total_gradients
    return updated_params

def main():

    # Generate sample data for linear regression
    X = np.random.rand(sample, 3)
    y = 3 * X[:, 0] + 2 * X[:, 1] + np.random.randn(sample)  # Linear relationship with random noise

    # Split the data into partitions
    data_partitions = split_data_into_partitions(X, y)

    # Set initial model parameters
    params = np.zeros(X.shape[1])

    # Set learning rate and number of iterations
    learning_rate = 0.1
    num_iterations = 100

    start_time = time.time()

    for _ in tqdm(range(num_iterations)):
        # Create a multiprocessing Pool
        pool = multiprocessing.Pool()

        # Map step: compute gradients on each data partition in parallel
        intermediate_results = pool.starmap(map_function, [(data_partition, params) for data_partition in data_partitions])

        # Reduce step: combine gradients and update model parameters
        params = reduce_function(intermediate_results, learning_rate)

        # Close the multiprocessing Pool
        pool.close()
        pool.join()
    end_time = time.time()
    print(f"\nTime: {(end_time - start_time) // 60}min {(end_time - start_time) % 60:.3f}sec\n")

    # # Use the final parameters for prediction
    # X_test = np.random.rand(100, 3)  # New test data
    # y_pred = np.dot(X_test, params)
    # print("Predictionxs:", y_pred)


if __name__ == "__main__":
    main()

100%|██████████| 100/100 [00:17<00:00,  5.66it/s]


Time: 0.0min 17.678sec




