In [None]:
import time
import numpy as np
import redis

Let's allocate a medium-sized (512 MB) message.

# Byte counts for one megabyte (2**20) and one gigabyte (2**30).
mb = 1 << 20
gb = 1 << 30

# A 512 MB zero-filled payload, plus a memoryview over the same bytes object.
message_size = 512 * mb
msg = bytes(message_size)
msg_buf = memoryview(msg)

In [None]:
from functools import partial

In [None]:
def time_redis_send(c, msg):
    """Time a single ``c.set("a", msg)`` call.

    Args:
        c: Client object exposing ``set(key, value)`` (e.g. a redis-py client).
        msg: Payload handed unchanged to ``c.set``.

    Returns:
        float: Elapsed seconds spent inside ``c.set``.
    """
    # perf_counter is monotonic and high resolution, so the measured interval
    # cannot be skewed (or turn negative) by a wall-clock adjustment.
    start = time.perf_counter()
    c.set("a", msg)
    return time.perf_counter() - start

def time_redis_get(c):
    """Time a single ``c.get("a")`` call; the fetched value is discarded.

    Args:
        c: Client object exposing ``get(key)`` (e.g. a redis-py client).

    Returns:
        float: Elapsed seconds spent inside ``c.get``.
    """
    # perf_counter is monotonic and high resolution, so the measured interval
    # cannot be skewed (or turn negative) by a wall-clock adjustment.
    start = time.perf_counter()
    c.get("a")
    return time.perf_counter() - start

def _throughput_gb_per_s(num_bytes, seconds):
    """Return ``num_bytes / seconds`` in GB/s (1 GB = 2**30 bytes); ``inf`` if no time elapsed."""
    if seconds <= 0:
        return np.inf
    return (num_bytes / 2**30) / seconds


def redis_profile_send(send, recv, message_size=100, nmsg=5):
    """Measure and print the throughput of a send callable and an optional receive callable.

    Despite the name, nothing here is redis-specific: ``send`` and ``recv``
    are arbitrary callables.

    Args:
        send: Callable invoked as ``send(msg)`` with a zero-filled ``bytes``
            payload of ``message_size`` megabytes.
        recv: Callable invoked as ``recv()`` after each send, or a falsy value
            (e.g. ``None``) to skip the receive measurement.
        message_size: Payload size in units of 2**20 bytes. Must be an
            integer, because it is used to size a ``bytes`` object.
        nmsg: Number of send (and receive) rounds.

    Returns:
        None. Progress and the mean throughputs are printed. When ``recv`` is
        falsy the receive throughput is reported as ``inf``.
    """
    mb = 2**20
    message_size *= mb
    msg = b"\0" * message_size

    sperf = []
    rperf = []
    print(f"Sending {nmsg} messages")
    print(f"Message size = {message_size/mb} MB")
    for _ in range(nmsg):
        # perf_counter is monotonic, unlike the wall clock.
        start = time.perf_counter()
        send(msg)
        sperf.append(_throughput_gb_per_s(message_size, time.perf_counter() - start))
        print("Message sent...", end='\r')

        if recv:
            start = time.perf_counter()
            recv()
            rperf.append(_throughput_gb_per_s(message_size, time.perf_counter() - start))
            print("Message sent...Message received")
        else:
            # Nothing was received: just terminate the progress line instead
            # of claiming a receive happened.
            print()

    # Avoid numpy's "mean of empty slice" warning when nmsg == 0.
    send_perf = np.mean(sperf) if sperf else np.nan
    if recv:
        recv_perf = np.mean(rperf) if rperf else np.nan
    else:
        recv_perf = np.inf
    print("Average performance")
    print(f"send throughput: {send_perf} GB/s, recv throughput: {recv_perf} GB/s")

In [None]:
# Client is built with no arguments, so redis-py's default connection settings apply.
c = redis.StrictRedis()

# Each round writes the payload under key "A" and then reads it back.
redis_send = partial(c.set, "A")
redis_recv = partial(c.get, "A")
redis_profile_send(redis_send, redis_recv)

In [None]:
from subprocess import Popen
from pyarrow import plasma
from contextlib import contextmanager, closing

@contextmanager
def setup_plasma(buf_size=5*2**30, store_memory=12000000000, socket_path='/tmp/plasma'):
    """Start a ``plasma_store`` subprocess and yield a freshly created buffer in it.

    The store process is killed and reaped when the ``with`` block exits,
    including when the block (or the setup itself) raises.

    Args:
        buf_size: Size in bytes of the object created in the store.
        store_memory: Value passed to ``plasma_store -m``.
        socket_path: Socket path passed to ``plasma_store -s`` and used to
            connect the client.

    Yields:
        The object returned by ``plasma_client.create(plasma_id, buf_size)``.
    """
    print("Starting plasma store process")
    plasma_proc = Popen(['plasma_store', '-m', str(store_memory), '-s', socket_path])
    try:
        print("Connecting to store")
        plasma_client = plasma.connect(socket_path, "", 0)

        # NOTE(review): Plasma object IDs are normally 20 bytes long; confirm
        # that the installed pyarrow accepts this 1-byte ID.
        plasma_id = plasma.ObjectID(b"1")
        print(f"Creating buffer {buf_size/2**30} GB")
        plasma_buffer = plasma_client.create(plasma_id, buf_size)
        print("Done Setting up")
        yield plasma_buffer
    finally:
        # Always tear the store down, otherwise a failure inside the `with`
        # body leaves an orphaned plasma_store holding its memory.
        print("Killing Plasma Process")
        plasma_proc.kill()
        # Reap the child so it does not linger as a zombie.
        plasma_proc.wait()

def set_plasma(plasma_buffer, msg):
    """Copy ``msg`` into the beginning of ``plasma_buffer``.

    Args:
        plasma_buffer: Writable object supporting the buffer protocol.
        msg: Bytes-like payload; it is reinterpreted in ``plasma_buffer``'s
            element format before being copied.

    Raises:
        ValueError: If ``msg`` is larger than ``plasma_buffer``.
    """
    # Use the argument, not a module-level `buffer` that may or may not exist.
    plasma_view = memoryview(plasma_buffer)
    msg_view = memoryview(msg).cast(plasma_view.format)
    if msg_view.nbytes > plasma_view.nbytes:
        raise ValueError(
            f"message of {msg_view.nbytes} bytes does not fit in a "
            f"{plasma_view.nbytes}-byte buffer"
        )
    # Slice by element count so the copy is also correct when the buffer's
    # item size is larger than one byte.
    plasma_view[:len(msg_view)] = msg_view

# Send-only benchmark: copy a 4000 MB payload into the plasma buffer on each round.
with setup_plasma() as buffer:
    plasma_send = partial(set_plasma, buffer)
    redis_profile_send(plasma_send, None, message_size=4000)
