# Пример очереди в разработческой жизни

![](src/1.png)

## Семафоры

### Сначала чуток про lock

![](src/11.png)

![](src/12.png)

![](src/13.png)

### Пример конкуррентного доступа к ресурсу (counter)

In [5]:
from threading import Thread
from time import sleep


counter = 0

def increase(by):
    global counter

    local_counter = counter
    local_counter += by

    sleep(0.1)

    counter = local_counter
    print(f'counter={counter}')

In [6]:
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

t1.start()
t2.start()

t1.join()
t2.join()

print(f'The final counter is {counter}')

# если поток 2 закончит раньше, то счетчик будет некорректно изменен

counter=20counter=10

The final counter is 10


### Использование lock для решения проблемы выше

In [9]:
from threading import Thread, Lock
from time import sleep


counter = 0


def increase(by, lock):
    global counter

    lock.acquire()

    local_counter = counter
    local_counter += by

    sleep(0.1)

    counter = local_counter
    print(f'counter={counter}')

    lock.release()
    
#     with lock:
#         ...


lock = Lock()

In [10]:
t1 = Thread(target=increase, args=(10, lock))
t2 = Thread(target=increase, args=(20, lock))

# start the threads
t1.start()
t2.start()


# wait for the threads to complete
t1.join()
t2.join()


print(f'The final counter is {counter}')

counter=10
counter=30
The final counter is 30


### Теперь уже про сам семафор

![](src/10.png)

In [13]:
import threading, random, time

class ActivePool:
    start = time.time()

    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            tm = time.time() - self.start
            print(f'Время: {round(tm, 3)} Running: {self.active}')

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            tm = time.time() - self.start
            print(f'Время: {round(tm, 3)} Running: {self.active}')


def worker(sem, pool):
    with sem:
        th_name = threading.current_thread().name
        print(f'{th_name} ожидает присоединения к пулу')
        pool.makeActive(th_name)
        time.sleep(1.5)
        pool.makeInactive(th_name)

In [14]:
sem = threading.Semaphore(2)

pool = ActivePool()
for i in range(4):
    t = threading.Thread(
        target=worker,
        args=(sem, pool),
    )
    t.start()

Thread-21 (worker) ожидает присоединения к пулуThread-22 (worker) ожидает присоединения к пулу
Время: 0.41 Running: ['Thread-22 (worker)']

Время: 0.41 Running: ['Thread-22 (worker)', 'Thread-21 (worker)']
Время: 1.918 Running: ['Thread-22 (worker)']
Время: 1.919 Running: []
Thread-23 (worker) ожидает присоединения к пулу
Время: 1.919 Running: ['Thread-23 (worker)']
Thread-24 (worker) ожидает присоединения к пулу
Время: 1.919 Running: ['Thread-23 (worker)', 'Thread-24 (worker)']
Время: 3.422 Running: ['Thread-24 (worker)']
Время: 3.424 Running: []


### Скачивание файлов с ограничением семафора

In [15]:
import threading
import urllib.request

MAX_CONCURRENT_DOWNLOADS = 3
semaphore = threading.Semaphore(MAX_CONCURRENT_DOWNLOADS)

def download(url):
    with semaphore:
        print(f"Downloading {url}...")
        
        response = urllib.request.urlopen(url)
        data = response.read()
        
        print(f"Finished downloading {url}")

        return data

        

def main():
    urls = [
        'https://www.ietf.org/rfc/rfc791.txt',
        'https://www.ietf.org/rfc/rfc792.txt',
        'https://www.ietf.org/rfc/rfc793.txt',
        'https://www.ietf.org/rfc/rfc794.txt',
        'https://www.ietf.org/rfc/rfc795.txt',
    ]

    
    threads = []
    for url in urls:
        thread = threading.Thread(target=download, args=(url,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

In [16]:
main()

Downloading https://www.ietf.org/rfc/rfc791.txt...
Downloading https://www.ietf.org/rfc/rfc792.txt...
Downloading https://www.ietf.org/rfc/rfc793.txt...
Finished downloading https://www.ietf.org/rfc/rfc793.txt
Downloading https://www.ietf.org/rfc/rfc794.txt...
Finished downloading https://www.ietf.org/rfc/rfc791.txt
Downloading https://www.ietf.org/rfc/rfc795.txt...
Finished downloading https://www.ietf.org/rfc/rfc794.txt
Finished downloading https://www.ietf.org/rfc/rfc795.txt
Finished downloading https://www.ietf.org/rfc/rfc792.txt


## Реализация стека через очередь

![](src/2.png)

### Реализация, когда операция вставки будет более затратной, чем изъятие

In [17]:
from _collections import deque
 
 
class Stack:
    def __init__(self):
        self.q1 = deque()
        self.q2 = deque()

    def push(self, x):
        self.q2.append(x)
        while (self.q1):
            self.q2.append(self.q1.popleft())
        self.q1, self.q2 = self.q2, self.q1

    def pop(self):
        if self.q1:
            self.q1.popleft()

    def top(self):
        if (self.q1):
            return self.q1[0]
        return None

    def size(self):
        return len(self.q1)

In [18]:
s = Stack()
s.push(1)
s.push(2)
s.push(3)

print("current size: ", s.size())
print(s.top())
s.pop()
print(s.top())
s.pop()
print(s.top())

print("current size: ", s.size())

current size:  3
3
2
1
current size:  1


### Реализация, когда операция изъятия будет более затратной, чем вставка

In [23]:
class Stack:
    def __init__(self):
        self.q1 = deque()
        self.q2 = deque()
 
    def push(self, x):
        self.q1.append(x)
 
    def pop(self):
        if (not self.q1):
            return
        while(len(self.q1) != 1):
            self.q2.append(self.q1.popleft())

        self.q1, self.q2 = self.q2, self.q1
 
    def top(self):
        if (not self.q1):
            return
        while(len(self.q1) != 1):
            self.q2.append(self.q1.popleft())

        top = self.q1[0]
        self.q2.append(self.q1.popleft())

        self.q1, self.q2 = self.q2, self.q1
 
        return top
 
    def size(self):
        return len(self.q1)

In [24]:
s = Stack()
s.push(1)
s.push(2)
s.push(3)

print("current size: ", s.size())
print(s.top())
s.pop()
print(s.top())
s.pop()
print(s.top())

print("current size: ", s.size())

current size:  3
3
2
1
current size:  3


### С помощью одной очереди

In [25]:
class Stack:
    def __init__(self):
        self.q = deque()

    def push(self, data):
        s = len(self.q)
        self.q.append(data)
        for i in range(s):
            self.q.append(self.q.popleft())

    def pop(self):
        if (not self.q):
            print("No elements")
        else:
            self.q.popleft()
 
    # Returns top of stack
    def top(self):
        if (not self.q):
            return
        return self.q[0]
 
    def size(self):
        return len(self.q)

In [26]:
s = Stack()
s.push(1)
s.push(2)
s.push(3)

print("current size: ", s.size())
print(s.top())
s.pop()
print(s.top())
s.pop()
print(s.top())

print("current size: ", s.size())

current size:  3
3
2
1
current size:  1


## Буфферизация сообщений

## RabbitMQ

### Строение изнутри (уже видно очереди)
![](src/3.png)


### Круги ада, через которые происходит соединение
![](src/4.jpeg)

### Взаимодействие с очередями крупным планом
![](src/5.jpeg)


### Как сообщения уходят из очереди
![](src/6.jpeg)


### Про создание очередей
![](src/7.jpeg)

![](src/8.jpeg)


### Как это выглядит после создания
![](src/9.jpeg)

### Отправка сообщений в очередь

In [None]:
# !pip install pika

In [27]:
import pika

credentials = pika.PlainCredentials('guest', 'guest')

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port='5672',
    credentials=credentials
))
channel = connection.channel()

In [28]:
channel.queue_declare(queue='hello')

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=hello'])>"])>

In [30]:
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
# print(" [x] Sent 'Hello World!'")

In [31]:
connection.close()

### Получение сообщений

In [32]:
credentials = pika.PlainCredentials('guest', 'guest')

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port='5672',
    credentials=credentials
))
channel = connection.channel()

In [33]:
channel.queue_declare(queue='hello')

<METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=2', 'queue=hello'])>"])>

In [34]:
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

In [35]:
channel.basic_consume(queue='hello',auto_ack=True,on_message_callback=callback)

'ctag1.40a21db573a14a5db0bb069a8f18949f'

In [36]:
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello World!'
 [x] Received b'Hello World!'


KeyboardInterrupt: 