In [1]:
import rpy2.robjects as ro

In [2]:
from multiprocessing.managers import SharedMemoryManager
from brmspy.session.transport import ShmPool



if 'shm' not in globals():
    mgr = SharedMemoryManager()
    mgr.start()
    shm = ShmPool(mgr)

In [3]:
print("adr", mgr.address)

adr /var/folders/_1/qqn_3y451wscsjn69121kp_w0000gn/T/pymp-e_x_jehh/listener-0uo6vvxs


In [4]:
from typing import Any, cast
import numpy as np
import rpy2.robjects as ro
from rpy2.rinterface_lib.sexp import SexpVector
from rpy2.rinterface import SexpVectorWithNumpyInterface


def _get_vector_types(obj: Any) -> tuple[None | str, None | int]:
    if not isinstance(obj, SexpVectorWithNumpyInterface):
        return None, None

    dtypestr = obj._NP_TYPESTR
    itemsize = obj._R_SIZEOF_ELT

    if not dtypestr or not itemsize:
        return None, None

    return dtypestr, itemsize

def _rmatrix_to_numpy(obj: ro.Matrix,
                 shm: ShmPool | None = None):
    if len(obj.dim) != 2:
        raise Exception("Matrix with dims != 2. Unimplemented conversion")

    nrow, ncol = obj.dim

    # No shm, fall back to regular numpy
    if shm is None:
        return np.array(obj)
    
    dtypestr, itemsize = _get_vector_types(obj)
    print("dtypestr", dtypestr, itemsize)
    
    if not dtypestr or not itemsize:
        return np.array(obj)
    dtype = np.dtype(dtypestr)

    assert isinstance(obj, SexpVectorWithNumpyInterface) and isinstance(obj, SexpVector) # type fix
    if hasattr(obj, 'memoryview'):
        print("optimized!")
        src = cast(Any, obj).memoryview()
    else:
        return np.array(obj)
    
    expected_bytes = nrow * ncol * itemsize

    # Raw buffer view over R's underlying data (column-major)
    if src.nbytes != expected_bytes:
        raise RuntimeError(
            f"R matrix bytes={src.nbytes}, expected={expected_bytes}"
        )

    # Allocate shm once
    block = shm.alloc(expected_bytes)
    assert block.shm.buf

    # Single bulk copy: R → shm, no intermediate ndarray
    src_bytes = src.cast("B")
    block.shm.buf[:expected_bytes] = src_bytes

    # Wrap shm buffer as a numpy array, matching R's column-major layout
    arr = np.ndarray(
        shape=(nrow, ncol),
        dtype=dtype,
        buffer=block.shm.buf,
        order="F",  # same memory layout as R
    )

    return arr




In [5]:
import pandas as pd

def _rdf_to_pandas(df: ro.DataFrame, shm: ShmPool | None) -> pd.DataFrame:
    nrow = df.nrow
    #sexp_list: SexpVector = cast(SexpVector, df.__sexp__)         # this is a ListSexpVector of columns
    sexp_list = df
    colnames = list(df.colnames)

    if shm is None:
        # baseline fallback: let pandas convert normally
        print("no shm")
        return pd.DataFrame(
            data=np.array(df),
            index=df.rownames,
            columns=colnames,
        )

    columns: dict[str, np.ndarray] = {}

    for j, name in enumerate(colnames):
        col_sexp = sexp_list[j]  # low-level R vector for column j

        dtypestr, itemsize = _get_vector_types(col_sexp)
        if not dtypestr or not itemsize or not hasattr(col_sexp, 'memoryview'):
            # unsupported / weird column -> fall back to regular conversion
            col_py = ro.conversion.get_conversion().rpy2py(col_sexp)
            columns[name] = np.asarray(col_py)
            continue

        dtype = np.dtype(dtypestr)
        length = len(col_sexp)
        expected_bytes = length * itemsize

        # R → memoryview over this column
        src = cast(Any, col_sexp).memoryview()
        if src.nbytes != expected_bytes:
            raise RuntimeError(
                f"Column {name}: bytes={src.nbytes}, expected={expected_bytes}"
            )

        # shm block for this column
        block = shm.alloc(expected_bytes)
        assert block.shm.buf

        # single bulk copy R → shm
        src_bytes = src.cast("B")
        block.shm.buf[:expected_bytes] = src_bytes

        # shm-backed 1D numpy array, order irrelevant for 1D
        arr = np.ndarray(
            shape=(length,),
            dtype=dtype,
            buffer=block.shm.buf,
        )

        columns[name] = arr

    # pandas DataFrame over shm-backed columns
    return pd.DataFrame(
        data=columns,
        index=df.rownames,
    )

In [6]:
r_df = cast(ro.DataFrame, ro.r("""
as.data.frame(
  matrix(rnorm(8192L * 256L), nrow = 8192L, ncol = 256L)
)
"""))

In [7]:
import pandas as pd

In [8]:
_rdf_to_pandas(r_df, shm=shm)

Unnamed: 0,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,...,V247,V248,V249,V250,V251,V252,V253,V254,V255,V256
1,1.298048,-0.736436,-0.419567,0.180617,-0.653178,-0.081526,0.283151,-1.761420,-0.288986,2.405579,...,-1.418254,-0.699973,-1.361541,1.600579,0.452347,-1.067924,1.773163,0.740209,0.030115,-1.644407
2,0.015429,-0.243888,-2.425763,-0.354490,-2.100701,1.726358,-0.402972,0.248224,-0.618138,0.194784,...,-1.257760,-0.242491,-1.027403,0.027532,-1.210805,-1.481721,1.678513,1.171743,-0.680985,0.442252
3,-0.122057,0.780156,-1.907930,-0.667393,0.057294,1.543141,0.398303,1.302865,0.192921,1.476084,...,0.312416,-0.627529,-0.286933,2.131173,0.001487,0.106941,1.302346,0.949113,0.327126,-1.027965
4,0.591363,-0.108297,-0.946178,0.323740,1.033209,0.155030,-0.821305,-0.530579,0.757028,-1.254433,...,-0.754046,0.654189,-0.946687,0.599937,0.740304,0.159610,-0.426795,0.548803,-1.767500,-0.801445
5,0.074445,-0.925823,-0.298444,-1.218107,-0.101637,-2.203752,1.739921,-1.908283,0.463878,-0.377461,...,-1.140191,-0.166229,0.807262,0.644456,0.360629,-0.609128,0.108333,-0.011444,-0.280462,-0.096680
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8188,0.662164,0.677685,0.291314,2.384612,0.401014,0.950470,1.165750,-0.440806,1.257648,-0.035546,...,0.062003,-0.389184,1.841529,0.032324,0.183058,0.441180,-0.041187,0.204095,0.074507,-1.376908
8189,0.473994,-0.115991,-0.594708,-1.738757,-0.168861,0.561432,0.238050,1.009036,-0.460172,0.877910,...,-1.537673,0.390030,-0.160152,-0.227907,-0.295902,-3.130483,0.232074,-0.051284,1.049954,-0.427067
8190,0.464474,0.501962,0.986330,-0.179029,-0.190292,0.393374,0.701150,-0.598398,1.411626,-0.627554,...,-0.674681,-2.401252,0.814551,0.384828,0.432405,-1.276439,1.594302,-0.802426,-1.372584,-2.194541
8191,0.113200,0.456533,-0.645067,-0.342442,0.979564,0.063055,0.541286,0.383565,-0.792371,-0.316488,...,0.175943,0.477764,2.060170,-0.890163,0.605869,-0.583218,-0.011579,0.679388,0.862789,-0.525197


In [5]:
#%uv pip install -q memory-profiler
%load_ext memory_profiler

In [6]:
import gc
import time
import os
import sys
import psutil
import resource

_proc = psutil.Process(os.getpid())

def _snapshot():
    """Return (rss_bytes, peak_bytes)."""
    rss = _proc.memory_info().rss
    ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss: bytes on macOS, kilobytes on Linux
    if sys.platform == "darwin":
        peak_bytes = ru
    else:
        peak_bytes = ru * 1024
    return rss, peak_bytes

def measure_mem(fn, *args, steps: int = 1, **kwargs):
    """
    Run fn(*args, **kwargs) several times and measure:
      - per-run change in RSS (live memory)
      - per-run increase in peak RSS (max usage during call)

    Returns MB deltas.
    """
    rss_deltas = []
    peak_deltas = []

    for _ in range(steps):
        gc.collect()
        ro.r('gc()')
        before_rss, before_peak = _snapshot()

        result = fn(*args, **kwargs)

        time.sleep(0.05)
        after_rss, after_peak = _snapshot()


        del result
        time.sleep(0.05)

        rss_deltas.append((after_rss - before_rss) / 1024**2)
        peak_deltas.append((after_peak - before_peak) / 1024**2)
        gc.collect()
        ro.r('gc()')

    return {
        "rss_avg_mb":  sum(rss_deltas) / len(rss_deltas),
        "rss_min_mb":  min(rss_deltas),
        "rss_max_mb":  max(rss_deltas),
        "rss_raw_mb":  rss_deltas,
        "peak_avg_mb": sum(peak_deltas) / len(peak_deltas),
        "peak_min_mb": min(peak_deltas),
        "peak_max_mb": max(peak_deltas),
        "peak_raw_mb": peak_deltas,
    }


In [7]:
import rpy2.rinterface as ri


def do_raw():
    r_matrix = ri.evalr("""
    nrow <- 8192L
    ncol <- 8192L

    m <- matrix(0, nrow, ncol)
    m
    """)
    ar = np.array(r_matrix)
    return ar, r_matrix

def do_shm():
    r_matrix = cast(ro.Matrix, ro.r("""
    nrow <- 8192L
    ncol <- 8192L

    m <- matrix(0, nrow, ncol)
    m
    """))
    ar = _r2py_matrix(r_matrix, shm=shm)
    return ar, r_matrix

In [8]:
#%timeit -n 1 -r 1 do_raw()
%timeit -n 1 -r 1 do_shm()


118 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [9]:
print("raw", measure_mem(do_raw, steps=1)['rss_max_mb'])
print("shm", measure_mem(do_shm, steps=1)['rss_max_mb'])

raw 1024.046875
shm 512.0
