In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=a299db90907fdd6d01606117d672f52c63d02448d9dc447cd98d8bd4c187bb5b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import numpy as np
import time
import multiprocessing
import psutil  # For system monitoring

In [None]:


# Simulate large-scale dataset
data_size = 1000000
data = np.random.random(data_size)

# Function to simulate processing task
def process_data(data_chunk):
    # Perform some computation on the data
    result = np.mean(data_chunk)
    return result

# Function to dynamically allocate resources based on system metrics
def dynamic_resource_allocation():
    try:
      while True:
          # Get current system metrics
          cpu_percent = psutil.cpu_percent()  # CPU utilization percentage
          available_memory = psutil.virtual_memory().available  # Available memory in bytes

          # Define resource allocation based on system metrics
          if cpu_percent < 80 and available_memory > 2 * data_size * 8:  # Example thresholds
              num_processes = multiprocessing.cpu_count()  # Use all available cores
          else:
              num_processes = multiprocessing.cpu_count() // 2  # Use half of the available cores

          # Split data into chunks for parallel processing
          data_chunks = np.array_split(data, num_processes)

          # Process data chunks in parallel
          start_time = time.time()
          with multiprocessing.Pool(num_processes) as pool:
              results = pool.map(process_data, data_chunks)
          parallel_time = time.time() - start_time

          print(f"Parallel processing with {num_processes} processes took {parallel_time} seconds")

          # Sleep for a while before checking system metrics again
          time.sleep(10)  # Adjust as needed
    except KeyboardInterrupt:
        print("Program interrupted. Exiting gracefully...")
        # Additional cleanup if needed

# Main function
def main():
    dynamic_resource_allocation()

if __name__ == "__main__":
    main()


Parallel processing with 2 processes took 0.09510135650634766 seconds
Program interrupted. Exiting gracefully...


In [None]:
import time
from pyspark import SparkContext

# Initialize Spark context
sc = SparkContext("local", "ParallelAlgorithms")

# Define process_data function (for demonstration purposes)
def process_data(data):
    return data * 2

# Define your data (or pass it as a parameter to the functions)
data = sc.parallelize(range(1000000))  # Example data

# 1. MapReduce
def map_reduce_example(data):
    # Map phase: Process each element in parallel
    mapped_data = data.map(process_data)

    # Reduce phase: Aggregate results
    reduced_result = mapped_data.reduce(lambda x, y: x + y)

    return reduced_result

# 2. Parallel Sorting
def parallel_sorting_example(data):
    sorted_data = data.sortBy(lambda x: x)
    return sorted_data.collect()

# 3. Parallel Search
def parallel_search_example(data, target):
    # Perform parallel search
    result = data.filter(lambda x: x == target).collect()
    return result

# 4. Parallel Join
def parallel_join_example(data1, data2):
    # Example datasets
    data1 = sc.parallelize([(1, 'A'), (2, 'B'), (3, 'C')])
    data2 = sc.parallelize([(1, 'X'), (2, 'Y'), (4, 'Z')])

    # Perform parallel join
    joined_data = data1.join(data2)
    return joined_data.collect()

# 5. Parallel Aggregation
def parallel_aggregation_example(data):
    aggregated_result = data.reduce(lambda x, y: x + y)
    return aggregated_result

# 6. Parallel Graph Algorithms (e.g., Parallel BFS)
def parallel_bfs_example(graph, start_node):
    # Perform parallel BFS
    visited = []
    queue = [start_node]

    while queue:
        node = queue.pop(0)
        if node not in visited:
            visited.append(node)
            neighbors = graph[node]
            queue.extend(neighbors)

    return visited

# 7. Parallel Machine Learning Algorithms (e.g., Parallel k-means)
def parallel_kmeans_example(data, k):
    # Perform parallel k-means clustering
    # (Note: This is a simplified example)
    centroids = data.takeSample(False, k)
    # Iterate until convergence
    # Implement k-means clustering algorithm here
    return centroids

# Example usage
print("MapReduce Example:", map_reduce_example(data))
print("Parallel Sorting Example:", parallel_sorting_example(data))
print("Parallel Search Example:", parallel_search_example(data, 42))
print("Parallel Join Example:", parallel_join_example(data, data))
print("Parallel Aggregation Example:", parallel_aggregation_example(data))
print("Parallel BFS Example:", parallel_bfs_example({1: [2, 3], 2: [4, 5], 3: [], 4: [], 5: []}, 1))
print("Parallel k-means Example:", parallel_kmeans_example(data, 3))

# Stop Spark context
sc.stop()


MapReduce Example: 999999000000


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



Parallel Search Example: [42]
Parallel Join Example: [(2, ('B', 'Y')), (1, ('A', 'X'))]
Parallel Aggregation Example: 499999500000
Parallel BFS Example: [1, 2, 3, 4, 5]
Parallel k-means Example: [179643, 200926, 302507]


In [9]:
import multiprocessing
import time

# Example data for demonstration
data = list(range(10))

# 1. Parallel Map
def parallel_map(function, data):
    with multiprocessing.Pool() as pool:
        results = pool.map(function, data)
    return results

# 2. Parallel Reduce
def parallel_reduce(function, data):
    with multiprocessing.Pool() as pool:
        results = pool.map(function, data)
    return sum(results)

# 3. Parallel Task Queue
def worker(task_queue, result_queue):
    while True:
        task = task_queue.get()
        if task is None:
            break
        result = process_task(task)
        result_queue.put(result)

def parallel_task_queue(tasks):
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # Start worker processes
    num_processes = multiprocessing.cpu_count()
    processes = [multiprocessing.Process(target=worker, args=(task_queue, result_queue)) for _ in range(num_processes)]
    for process in processes:
        process.start()

    # Distribute tasks to worker processes
    for task in tasks:
        task_queue.put(task)

    # Stop worker processes
    for _ in range(num_processes):
        task_queue.put(None)
    for process in processes:
        process.join()

    # Retrieve results from result queue
    results = []
    while not result_queue.empty():
        result = result_queue.get()
        results.append(result)

    return results

# Example function for processing data
def process_data(data):
    # Simulate some processing time
    time.sleep(1)
    return data * 2

# Example function for reducing data
def reduce_data(data):
    return sum(data)

if __name__ == "__main__":
    # Example usage
    print("Parallel Map Example:", parallel_map(process_data, data))
    print("Parallel Reduce Example:", parallel_reduce(process_data, data))
    print("Parallel Task Queue Example:", parallel_task_queue(data))


Parallel Map Example: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Parallel Reduce Example: 90


Process Process-25:
Process Process-26:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "<ipython-input-9-dab7ecb335f5>", line 25, in worker
    result = process_task(task)
NameError: name 'process_task' is not defined
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-9-dab7ecb335f5>", line 25, in worker
    result = process_task(task)
NameError: name 'process_task' is not defined


Parallel Task Queue Example: []


MPI


In [8]:
pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.6.tar.gz (2.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.6-cp310-cp310-linux_x86_64.whl size=2746313 sha256=6a8f11a6e8e1aec39cec9fb83f1ba62be8dde7cf8fe76b08e61a0950482f6d05
  Stored in directory: /root/.cache/pip/wheels/4c/ca/89/8fc1fb1c620afca13bb41c630b1f948bbf446e0aaa4b762e10
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.6


In [18]:
from mpi4py import MPI

def distribute_data(data, size):

    data_size = 1000000
    data = np.random.random(data_size)
    data_chunks = []
    chunk_size = len(data) // size
    remainder = len(data) % size
    start_idx = 0

    for i in range(size):
        end_idx = start_idx + chunk_size + (1 if i < remainder else 0)
        data_chunks.append(data[start_idx:end_idx])
        start_idx = end_idx

    return data_chunks

if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

# Master-Slave Architecture
    if rank == 0:  # Master process
        # Allocate memory for data and results arrays
        cluster_array = ...
        assignment_results = ...

        # Read data from disk or memory
        data = ...

        size_buffer = bytearray(4)  # Assuming an integer requires 4 bytes
        size_buffer[:4] = size.to_bytes(4, byteorder='big')  # Convert integer to bytes and store in buffer

        # Broadcast size to all processes
        comm.Bcast(size_buffer, root=0)

        # Distribute data across processors
        data_chunks = distribute_data(data, size)
        for i in range(1, size):
            comm.Send(data_chunks[i], dest=i)

        # Receive results from slaves
        for i in range(1, size):
            assignment_results[i] = comm.Recv(source=i)

        # Analyze results and perform further computations

    else:  # Slave processes
        # Receive processor number from master
        processor_number = comm.Bcast(None, root=0)

        # Receive data chunk from master
        data_chunk = comm.Recv(source=0)

        # Perform k-means assignment for assigned data chunk
        local_assignments = kmeans_assignment(data_chunk)

        # Send results back to master
        comm.Send(local_assignments, dest=0)