In [2]:
import multiprocessing as mp
import socket
import time

In [4]:
mp.get_all_start_methods(), mp.get_start_method()

(['spawn', 'fork', 'forkserver'], 'spawn')

In [6]:
def print_data(data):
    print("print_data", data)

data = {1: 11}

proc = mp.Process(target=print_data, args=(data,))
proc.start()
proc.join()

Traceback (most recent call last):
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=89, pipe_handle=93)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/Library/Frameworks/Python.framework/Versions/3.14/lib/python3.14/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode = _main(fd, parent_sentinel)
  File [35m"/Library/Frameworks/Python.framework/Versions/3.14/lib/python3.14/multiprocessing/spawn.py"[0m, line [35m132[0m, in [35m_main[0m
    self = reduction.pickle.load(from_parent)
[1;35mAttributeError[0m: [35mmodule '__main__' has no attribute 'print_data'[0m


In [16]:
import os

class User:
    def __init__(self, val):
        self.val = val

    def __repr__(self):
        return f"User[{self.val}]"


def print_data(data):
    print("print_data", data, f"{os.getpid()=}, {os.getppid()=}")

data = {1: 11, "234": User(99)}


proc_ctx = mp.get_context("fork")

print("current pid", os.getpid())

proc = proc_ctx.Process(target=print_data, args=(data,))
proc.start()
proc.join()

print("finish")

current pid 80640
print_data {1: 11, '234': User[99]} os.getpid()=29709, os.getppid()=80640
finish


In [21]:
class User:
    def __init__(self, val):
        self.val = val

    def __repr__(self):
        return f"User[{self.val}]"


def update_data(data):
    print("update data", data)

    data[22] = "202"
    data[("qw", "77")] = "qwerty"
    data["234"] = User(99)
    
    print("updated data", data)


data = {22: "22 init", 33: 99}
print("data before:", data)

proc = proc_ctx.Process(target=update_data, args=(data,))
proc.start()
proc.join()

print("data after:", data)
print("finish")

data before: {22: '22 init', 33: 99}
update data {22: '22 init', 33: 99}
updated data {22: '202', 33: 99, ('qw', '77'): 'qwerty', '234': User[99]}
data after: {22: '22 init', 33: 99}
finish


In [25]:
class User:
    def __init__(self, val):
        self.val = val

    def __repr__(self):
        return f"User[{self.val}]"


def update_data(data):
    print("update data", data)

    data[22] = "202"
    data[("qw", "77")] = "qwerty"
    data["234"] = User(99)
    
    print("updated data", data)


with proc_ctx.Manager() as manager:
    data = manager.dict()
    data[22] = "22 init"
    data[33] = 99
    print("data before:", data)
    
    proc = proc_ctx.Process(target=update_data, args=(data,))
    proc.start()
    proc.join()

    print("data after:", data)
    data_bkp = dict(data)

print("finish")

data before: {22: '22 init', 33: 99}
update data {22: '22 init', 33: 99}
updated data {22: '202', 33: 99, ('qw', '77'): 'qwerty', '234': User[99]}
data after: {22: '202', 33: 99, ('qw', '77'): 'qwerty', '234': User[99]}
finish


In [29]:
data_bkp["234"].val

99

In [11]:
def server():
    print("start server")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.bind(("localhost", 15000))
        sock.listen(5)

        i = 5
        while i > 0:
            i -= 1

            client_sock, client_addr = sock.accept()
            print("server: conn from", client_addr, client_sock)
    
            data = client_sock.recv(1024)
            data = data.decode()
            print(f"server: recv {data=}")
    
            client_sock.sendall(data.upper().encode())
            print("server: finished client", client_addr)

    print("server: stop")


def client(data):
    print("start client")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(("localhost", 15000))
        
        sock.sendall(data.encode())
        print("client: recv data = ", sock.recv(1024).decode())

    print("client: stop")

In [3]:
proc_ctx = mp.get_context("fork")

In [12]:
proc_server = proc_ctx.Process(target=server)
proc_server.start()

start server
server: conn from ('127.0.0.1', 53883) <socket.socket fd=36, family=2, type=1, proto=0, laddr=('127.0.0.1', 15000), raddr=('127.0.0.1', 53883)>
server: recv data='Планета Земля'
server: finished client ('127.0.0.1', 53883)
server: conn from ('127.0.0.1', 53918) <socket.socket fd=48, family=2, type=1, proto=0, laddr=('127.0.0.1', 15000), raddr=('127.0.0.1', 53918)>
server: recv data='Планета Марс'
server: finished client ('127.0.0.1', 53918)
server: conn from ('127.0.0.1', 53958) <socket.socket fd=36, family=2, type=1, proto=0, laddr=('127.0.0.1', 15000), raddr=('127.0.0.1', 53958)>
server: recv data='Море'
server: finished client ('127.0.0.1', 53958)
server: conn from ('127.0.0.1', 53988) <socket.socket fd=48, family=2, type=1, proto=0, laddr=('127.0.0.1', 15000), raddr=('127.0.0.1', 53988)>
server: recv data='Солнце'
server: finished client ('127.0.0.1', 53988)
server: conn from ('127.0.0.1', 54022) <socket.socket fd=36, family=2, type=1, proto=0, laddr=('127.0.0.1', 1500

In [13]:
proc_server.is_alive()

True

In [14]:
proc_client = proc_ctx.Process(target=client, args=("Планета Земля",))
proc_client.start()

start client
client: recv data =  ПЛАНЕТА ЗЕМЛЯ
client: stop


In [15]:
proc_client = proc_ctx.Process(target=client, args=("Планета Марс",))
proc_client.start()

start client
client: recv data =  ПЛАНЕТА МАРС
client: stop


In [16]:
proc_client = proc_ctx.Process(target=client, args=("Море",))
proc_client.start()

start client
client: recv data =  МОРЕ
client: stop


In [17]:
proc_client = proc_ctx.Process(target=client, args=("Солнце",))
proc_client.start()

start client
client: recv data =  СОЛНЦЕ
client: stop


In [18]:
proc_client = proc_ctx.Process(target=client, args=("Солнце 123",))
proc_client.start()

start client
client: recv data =  СОЛНЦЕ 123
client: stop


In [19]:
proc_client = proc_ctx.Process(target=client, args=("Солнце 123",))
proc_client.start()

start client


Process ForkProcess-11:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.14/lib/python3.14/multiprocessing/process.py", line 320, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/Library/Frameworks/Python.framework/Versions/3.14/lib/python3.14/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/j3/m947cq0j0fx8t0rrdzw5kmb00000gp/T/ipykernel_42456/763150226.py", line 29, in client
    sock.connect(("localhost", 15000))
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
ConnectionRefusedError: [Errno 61] Connection refused


In [23]:
import aiohttp

In [24]:
import asyncio
import time

In [25]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:49:52
hello
world
finished at 19:49:55
TT 3.001940965652466


In [26]:
main()

<coroutine object main at 0x10a7da500>

In [27]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    say1 = say_after(1, 'hello')
    await say_after(2, 'world')
    await say1
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:51:47
world
hello
finished at 19:51:50
TT 3.002268075942993


In [28]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    say1 = asyncio.create_task(say_after(1, 'hello'))
    await say_after(2, 'world')
    await say1
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:54:12
hello
world
finished at 19:54:14
TT 2.001584053039551


In [29]:
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world'))
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:55:45
hello
world
finished at 19:55:47
TT 2.0021049976348877


In [30]:
async def say_after(delay, what):
    #await asyncio.sleep(delay)
    time.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world'))
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:56:29
hello
world
finished at 19:56:32
TT 3.006443977355957


In [32]:
async def say_after(delay, what):
    #await asyncio.sleep(delay)
    await asyncio.to_thread(time.sleep, delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world'))
    print(f"finished at {time.strftime('%X')}")


async def run():
    t1 = time.time()
    await main()
    t2 = time.time()
    print("TT", t2 - t1)


await run()

started at 19:58:38
hello
world
finished at 19:58:40
TT 2.0043439865112305


In [40]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url, session):
    await asyncio.sleep(1)
    # async with session.get(url) as resp:
    #     return await resp.text()


async def fetch_batch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(url, session) for url in urls]

        for task in tasks:
            await task


async def run():
    t1 = time.time()
    await fetch_batch_urls(URLS)
    t2 = time.time()
    print("TT", t2 - t1)


await run()

TT 20.023504972457886


In [41]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20


async def fetch_url(url, session):
    await asyncio.sleep(1)
    # async with session.get(url) as resp:
    #     return await resp.text()


async def fetch_batch_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(url, session) for url in urls]

        await asyncio.gather(*tasks)


async def run():
    t1 = time.time()
    await fetch_batch_urls(URLS)
    t2 = time.time()
    print("TT", t2 - t1)


await run()

TT 1.001899003982544


In [45]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20
WORKERS = 5


async def fetch_url(url, session):
    await asyncio.sleep(1)
    # async with session.get(url) as resp:
    #     return await resp.text()


async def fetch_worker(que, session, worker):
    print(f"start {worker=}")
    while True:
        url = await que.get()
        if url is None:
            await que.put(url)
            break

        try:
            await fetch_url(url, session)
        except Exception:
            pass

    print(f"finish {worker=}")


async def run():
    t1 = time.time()

    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)
    await que.put(None)

    async with aiohttp.ClientSession() as session:
        workers = [
            asyncio.create_task(
                fetch_worker(que, session, f"worker_{i}")
            )
            for i in range(WORKERS)
        ]
        for worker in workers:
            await worker

    t2 = time.time()
    print("TT", t2 - t1)


await run()

start worker='worker_0'
start worker='worker_1'
start worker='worker_2'
start worker='worker_3'
start worker='worker_4'
finish worker='worker_0'
finish worker='worker_1'
finish worker='worker_2'
finish worker='worker_3'
finish worker='worker_4'
TT 4.005888938903809


In [46]:
URL = "https://docs.python.org/3/whatsnew/3.14.html"
URLS = [URL] * 20
WORKERS = 5


async def fetch_url(url, session):
    await asyncio.sleep(1)
    # async with session.get(url) as resp:
    #     return await resp.text()


async def fetch_worker(que, session, worker):
    print(f"start {worker=}")
    while True:
        url = await que.get()

        try:
            await fetch_url(url, session)
        except Exception:
            pass
        finally:
            que.task_done()

    print(f"finish {worker=}")


async def run():
    t1 = time.time()

    que = asyncio.Queue()
    for url in URLS:
        await que.put(url)

    async with aiohttp.ClientSession() as session:
        workers = [
            asyncio.create_task(
                fetch_worker(que, session, f"worker_{i}")
            )
            for i in range(WORKERS)
        ]
        await que.join()

        for worker in workers:
            worker.cancel()

    t2 = time.time()
    print("TT", t2 - t1)


await run()

start worker='worker_0'
start worker='worker_1'
start worker='worker_2'
start worker='worker_3'
start worker='worker_4'
TT 4.005237102508545
