# Tutorial 04: Data Movement

This tutorial introduces how to move data between devices. 
- Data Movement - `clone_here` and `copy`, 
- Prefetched Data Movement - via Parla Managed Arrays (PArrays)


In [45]:
from typing import Callable, Optional
from time import perf_counter, sleep
import os

def set_numpy_threads(threads: int = 1):
    """
    Numpy can be configured to use multiple threads for linear algebra operations.
    The backend used by numpy can vary by installation.
    This function attempts to set the number of threads for the most common backends.
    MUST BE CALLED BEFORE IMPORTING NUMPY.

    Args:
        threads (int, optional): The number of threads to use. Defaults to 1.
    """

    os.environ["NUMEXPR_NUM_THREADS"] = str(threads)
    os.environ["OMP_NUM_THREADS"] = str(threads)
    os.environ["OPENBLAS_NUM_THREADS"] = str(threads)
    os.environ["VECLIB_MAXIMUM_THREADS"] = str(threads)
    os.environ["MKL_NUM_THREADS"] = str(threads)

    try:
        # Controlling the MKL backend can use mkl and mkl-service modules if installed.
        # preferred method for controlling the MKL backend.
        import mkl

        mkl.set_num_threads(threads)
    except ImportError:
        pass
    
set_numpy_threads(1)
import numpy as np 
import cupy as cp 

# Handle for Parla runtime
from parla import Parla

# Spawning task API
from parla.tasks import (
    spawn,
    TaskSpace,
    get_current_context,
    get_current_task,
)

# Device Architectures for placement
from parla.devices import cpu, gpu


def run(function: Callable[[], Optional[TaskSpace]]):
    assert callable(function), "The function argument must be callable."

    # Start the Parla runtime.
    with Parla():
        # Create a top-level task to kick off the computation
        @spawn(placement=cpu, vcus=0)
        async def top_level_task():
            # Note that unless function returns a TaskSpace, this will NOT be blocking.
            # If you want to wait on the completion of the tasks launched by function, you must return a TaskSpace that contains their terminal tasks.
            await function()

    # Runtime exists at the end of the context_manager
    # All tasks are guaranteed to be complete at this point

In [46]:
from parla.array import clone_here, copy

In [48]:
def clone_here_copy_example_wrapper():
    import numpy as np
    import cupy as cp
    M = 5
    N = 5
    A = np.random.rand(M)
    B = cp.arange(N)
    
    def clone_here_copy_example():
        T = TaskSpace("T")
        
        @spawn(placement=[cpu if np.random.rand() < 0.5 else gpu])
        def task():
            print(get_current_task(), " running on ", get_current_context())
            C = clone_here(A)
            print("C is a", "Numpy Array" if isinstance(C, np.ndarray) else f"Cupy Array on GPU[{C.device}]", flush=True)
            
            # Writing back to B from any source
            # TODO: FIX!
            # copy(B, C[:N])
            
        return T
    
    run(clone_here_copy_example)
    
    print("B = ", B, flush=True)
        
clone_here_copy_example_wrapper()

Task(global_1)  running on  GPUEnvironment(GPU:0)
C is a Cupy Array on GPU[<CUDA Device 0>]
B =  [0 1 2 3 4]


In [50]:
from parla.array import asarray as parla_asarray

In [57]:


async def parray_example():

    A = np.random.rand(5)
    A = parla_asarray(A)

    @spawn(placement=[cpu if np.random.rand() < 0.5 else gpu], input=[A])
    def task():
        print(get_current_task(), " running on ", get_current_context())
        print(
            "A is a",
            "Numpy Array"
            if isinstance(A.array, np.ndarray)
            else f"Cupy Array on GPU[{A.array.device}]",
            flush=True,
        )
        A.print_overview()


run(parray_example)

Running data movement task: b'global_1.dm.0', NA 0
Task(global_1)  running on  GPUEnvironment(GPU:0)
A is a Cupy Array on GPU[<CUDA Device 0>]
---Overview of PArray
ID: 133866630519392, Name: NA, Parent_ID: None, Slice: None, Bytes: 40, Owner: GPU 0
At GPU 0: state: SHARED
At CPU: state: SHARED
---End of Overview


In [82]:
async def parray_example():
    T = TaskSpace("T")

    A = np.ones(5)
    A = parla_asarray(A)

    @spawn(T[0], placement=[cpu if np.random.rand() < 0.5 else gpu], input=[A])
    def task():
        print(get_current_task(), " running on ", get_current_context())
        print(
            "A is a",
            "Numpy Array"
            if isinstance(A.array, np.ndarray)
            else f"Cupy Array on GPU[{A.array.device}]",
            flush=True,
        )
        A.print_overview()
        print(A.array)

    @spawn(T[1], [T[0]], placement=[cpu if np.random.rand() < 0.5 else gpu], inout=[A])
    def task():
        print(get_current_task(), " running on ", get_current_context())
        print(
            "A is a",
            "Numpy Array"
            if isinstance(A.array, np.ndarray)
            else f"Cupy Array on GPU[{A.array.device}]",
            flush=True,
        )
        A.print_overview()
        A[:] = A + 1

    @spawn(T[2], [T[0], T[1]], placement=[cpu if np.random.rand() < 0.5 else gpu], inout=[A])
    def task():
        print(get_current_task(), " running on ", get_current_context())
        print(
            "A is a",
            "Numpy Array"
            if isinstance(A.array, np.ndarray)
            else f"Cupy Array on GPU[{A.array.device}]",
            flush=True,
        )
        A.print_overview()
        print(A.array)


run(parray_example)

Running data movement task: b'T_0.dm.0', NA 0
Running data movement task: b'T_1.dm.0', NA 2
Task(T_0)  running on  GPUEnvironment(GPU:0)
Exception in Task  Task(T_0) :  'NoneType' object has no attribute 'device' Traceback (most recent call last):
  File "tasks.pyx", line 472, in parla.cython.tasks.Task.run
  File "tasks.pyx", line 661, in parla.cython.tasks.ComputeTask._execute_task
  File "scheduler.pyx", line 494, in parla.cython.scheduler._task_callback
  File "/tmp/ipykernel_311020/3928032198.py", line 14, in task
    else f"Cupy Array on GPU[{A.array.device}]",
                              ^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'device'

Exception in Worker Thread  <WorkerThread(Thread-651, started 133865918154304)> :  'NoneType' object has no attribute 'device' Traceback (most recent call last):
  File "scheduler.pyx", line 196, in parla.cython.scheduler.WorkerThread.run
  File "scheduler.pyx", line 273, in parla.cython.scheduler.WorkerThread.run
parl

Exception in thread Thread-651:
Traceback (most recent call last):
  File "scheduler.pyx", line 196, in parla.cython.scheduler.WorkerThread.run
  File "scheduler.pyx", line 273, in parla.cython.scheduler.WorkerThread.run
parla.cython.scheduler.TaskBodyException: 'NoneType' object has no attribute 'device'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/wlruys/software/miniforge3/envs/py312/lib/python3.12/threading.py", line 1052, in _bootstrap_inner
    self.run()
  File "scheduler.pyx", line 327, in parla.cython.scheduler.WorkerThread.run
parla.cython.scheduler.WorkerThreadException: Unhandled Exception in Task: T_0
