In [3]:
from multiprocessing import Process, shared_memory
import atomics
from threading import Thread


def fn(ai: atomics.INTEGRAL, n):
    """Increment the atomic integer ``ai`` once per iteration, ``n`` times.

    Args:
        ai: Object exposing an ``inc()`` method (an ``atomics`` integral).
        n: Number of increments to perform.
    """
    for _iteration in range(n):
        ai.inc()

# 4-byte atomic integer incremented concurrently by both threads.
a = atomics.atomic(width=4, atype=atomics.INT)
total = 10_000
# Each thread performs half of the increments; run both to completion.
t1 = Thread(target=fn, args=(a, total // 2))
t2 = Thread(target=fn, args=(a, total // 2))
t1.start()
t2.start()
t1.join()
t2.join()
# Show the final counter value next to the expected total.
print(f"a[{a.load()}] , total[{total}]")


a[10000] , total[10000]


In [15]:
import atomics
from multiprocessing import Process, shared_memory


def fn(shmem_name: str, width: int, n: int) -> None:
    """Attach to a shared memory block and atomically increment it ``n`` times.

    Args:
        shmem_name: Name of an existing shared memory block to attach to.
        width: Number of leading bytes of the block to view as the atomic int.
        n: Number of increments to perform.
    """
    shmem = shared_memory.SharedMemory(name=shmem_name)
    try:
        buf = shmem.buf[:width]
        try:
            with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
                for _ in range(n):
                    a.inc()
        finally:
            # Drop the memoryview slice before closing, even when the loop
            # raised, so close() is not blocked by an outstanding export.
            del buf
    finally:
        # Always detach from the block, including on error paths.
        shmem.close()



# setup
width = 4
shmem = shared_memory.SharedMemory(create=True, size=width)
try:
    buf = shmem.buf[:width]
    try:
        print(buf)
        total = 10_000
        # run processes to completion; each performs half of the increments
        p1 = Process(target=fn, args=(shmem.name, width, total // 2))
        p2 = Process(target=fn, args=(shmem.name, width, total // 2))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        # print results
        with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
            print(f"a[{a.load()}] == total[{total}]")
    finally:
        # Drop the memoryview slice before closing the block.
        del buf
finally:
    # cleanup: always detach and destroy the segment, even if a step above
    # failed, so the shared memory block is not leaked.
    shmem.close()
    shmem.unlink()

<memory at 0x742420a797c0>
a[10000] == total[10000]


In [14]:
import atomics 

# 1-byte signed atomic integer; the first repr printed below shows value=0.
a = atomics.atomic(width=1, atype=atomics.INT)
print(a)
# Increment 128 times. The recorded output shows the 1-byte signed value
# ending at -128 rather than 128, i.e. it wrapped around.
for _ in range(128):
    a.inc()
print(a)

# View a plain 2-byte bytearray through an atomic bytes view.
buf = bytearray(2)
with atomics.atomicview(buffer=buf, atype=atomics.BYTES) as a1:
    print(a1)  # AtomicBytesView(value=b'\x00\x00', width=2, readonly=False)

AtomicInt(value=0, width=1, readonly=False, signed=True)
AtomicInt(value=-128, width=1, readonly=False, signed=True)
AtomicBytesView(value=b'\x00\x00', width=2, readonly=False)


In [18]:
from multiprocessing.managers import SharedMemoryManager

# Create and start a SharedMemoryManager. Leaving the ``with`` block shuts the
# manager down even if a step inside raises.
with SharedMemoryManager() as manager:
    # Create a 1024-byte shared memory block owned by the manager.
    shm = manager.SharedMemory(size=1024)
    try:
        # Use the shared memory block.
        buffer = shm.buf
        print(buffer)
    finally:
        # Close this process's handle on the shared memory block.
        shm.close()

<memory at 0x74241b4a0040>


In [None]:
from multiprocessing.managers import SharedMemoryManager
from multiprocessing import Process

def worker(shm_name):
    """Attach to an existing shared memory block and print its first 10 bytes.

    Args:
        shm_name: Name of a shared memory block created by another process.
    """
    # Imported locally so the function does not rely on an import performed
    # by some other notebook cell.
    from multiprocessing import shared_memory

    # Attach to the existing shared memory block by name.
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        # Copy the bytes out of the block. A memoryview slice would print as
        # "<memory at ...>" and, while still alive, would make close() fail
        # with BufferError.
        data = bytes(shm.buf[:10])
        print(f"Worker read: {data}")
    finally:
        # Close this process's handle on the shared memory block.
        shm.close()


# Create and start a SharedMemoryManager. Leaving the ``with`` block shuts the
# manager down even if the worker step raises.
with SharedMemoryManager() as manager:
    # Create a 1024-byte shared memory block owned by the manager.
    shm = manager.SharedMemory(size=1024)
    try:
        # Start a new process that attaches to the block by name, and wait
        # for it to finish.
        p = Process(target=worker, args=(shm.name,))
        p.start()
        p.join()
    finally:
        # Close this process's handle on the shared memory block.
        shm.close()

In [27]:
import multiprocessing as mp
 
def job(q):
    """Put the sum of the integers 0..999 onto the queue ``q``."""
    q.put(sum(range(1000)))    #queue
 
if __name__=='__main__':
    # Two worker processes each put one result on the shared queue.
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    # Drain the queue BEFORE joining: a process that has put items on a
    # queue may not terminate until they are consumed, so joining first can
    # deadlock.
    res1 = q.get()
    res2 = q.get()
    p1.join()
    p2.join()
    print(res1)
    print(res2)
    print(res1+res2)

499500
499500
999000


In [35]:
class MyQueue:
    """Minimal first-in, first-out queue backed by a Python list."""

    def __init__(self) -> None:
        # Items are stored oldest-first; index 0 is the head of the queue.
        self.queue = []

    def append(self, x):
        """Add ``x`` to the tail of the queue."""
        self.queue.append(x)

    def popleft(self):
        """Remove and return the item at the head of the queue.

        Raises:
            IndexError: If the queue holds no items.
        """
        if not self.queue:
            raise IndexError("the queue is empty")
        return self.queue.pop(0)

    @property
    def size(self):
        """Number of items currently in the queue."""
        return len(self.queue)
    
# Exercise MyQueue: report the size after each append, then pop the head.
q1 = MyQueue()
for item in (99, 98):
    q1.append(item)
    print(q1.size)
# The first item appended is the first one returned.
print(q1.popleft())
print(q1.size)

1
2
99
1
