<a href="https://colab.research.google.com/github/vuduclyunitn/learning_python/blob/master/T%C3%ACm_hi%E1%BB%83u_v%E1%BB%81_Redis_Queue_(RQ).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dịch và bổ sung từ https://python-rq.org/

Redis Queue là một thư viện đơn giản viết bằng Python nhằm sắp hàng (queueing) các công việc (jobs) và xử lý ngầm chúng với các người làm (workers). Phía sau RQ là Redis, thư viện này được thiết kế để  tạo ra một rào chắn thấp dễ dàng cho việc đi vào. Nó có thể được tích hợp với ngăn xếp web dễ dangf.

RQ yêu cầu Redis phiên bản lớn hơn hoặc bằng 3.0.0

## Bắt đầu
Đầu tiên bạn cần chạy một máy chủ Redis, hoặc bạn có thể sử dụng một máy chủ có sẵn. Để đặt các công việc vào hàng đợi (queues), bạn chỉ cần định nghĩa các hàm.

In [0]:
# Cài đặt redis
!pip install rq

## Jobs

Một công việc (job) là một đối tượng trong Python, biểu diễn một hàm được gọi bất động bộ trong một tiến trình người làm, tiến trình chạy ngầm. Bất cứ hàm nào cũng có thể được gọi bất đồng bộ bằng cách đưa một tham chiếu tới hàm và các tham số của nó vào hàng đợi. Đó được gọi là *enqueueing*

## Đưa các công công việc vào hàng đợi

Để đưa các công việc vào các hàng đợi, trước tiên ta cần khai báo một hàm, ví dụ hàm dưới đây:


In [0]:
import requests

def count_words_at_url(url):
  resp = requests.get(url)
  return len(resp.text.split())

Bạn có thể thấy rằng hàm trên không có gì đặc biệt cả! Bất cứ hàm nào cũng có thể được cho vào một hàng đợi RQ.

Để thực hiện hàm này ở chế đồ ngầm, bạn thực hiện

In [0]:
from rq import Queue
from redis import Redis
from somewhere import count_words_at_url
import time

# Nói cho RQ biết bạn sử dụng kết Redis nào
redis_conn = Redis()

# Không có tham số nào truyền vào redis_conn nghĩa là sử dụng hàng đợi mặc định
q = Queue(connection=redis_conn)

# Trì hoãn thực thi hàm count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result)  # Kết quả là None

# Bây giờ ta cần chờ cho đến khi công việc được hoàn thành
time.sleep(2)
print(job.result)


Nếu bạn muốn cho một công việc vào một hàng đợi cụ thể, bạn chỉ cần chỉ định tên của nó

In [0]:
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')

Bạn chú ý vào ```Queue('low')``à, ta có thể sử dụng bất cứ tên hàng đợi nào cũng được, vì vậy bạn có thể phân tán công việc của mình tới hàng đợi mong muốn. Một mẫu tên chuẩn cho các hàng đợi là theo độ ưu tiên (ví dụ. ```high``` (cao), ```medium``` (trung bình), ```low```(thấp))


Ngoài ra, bạn có thể thêm vào một vài tùy chọn để tùy chỉnh cư xử của một công việc được cho vào hàng đợi. Mặc định, các tùy chọn được lấy từ biến lưu trữ tham số kwargs sẽ được gửi tới hàm job.


*   ```job_timeout``` Chỉ định thời gian chạy tối đa của công việc trước khi nó bị ngắt và được đánh dấu như là ```failed```. Đơn vị mặc định là giây và có thể là một số nguyên hay là một chuỗi văn bản biểu diễn số nguyên (ví dụ. ```2```, ```'2'```). Nó cũng có thể là một chuỗi văn bản với một kí tự chỉ định đơn vị thời gian như là giờ, phút, giây (ví dụ ```'1h'```, ```'3m'```, ```'5s'```)
*   ```result_ttl``` chỉ định kết quả của các công việc thành công sẽ được giữ lại bao lâu. Các công việc bị hết hạn sẽ được tự động bị xóa. Mặc định là 500 giây. 
*    ```ttl``` chỉ định thời gian thời gian chạy của công việc trong hàng đợi trước khi bị loại bỏ. Nếu bạn chỉ định giá trị ```-1``` thì công việc này sẽ chạy mãi mãi. 
*     ```failure_ttl``` chỉ định lượng thời gian các công việc thất bị được giữ lại (mặc định là 1 năm).
*    ```depends_on``` chỉ định một công việc khác phải hoàn thành trước khi công việc sẽ được cho vào hàng đợi.
*    ```job_id``` cho phép bạn chỉ định id của công việc.
*    ```at_front``` sẽ đặt công việc trước hàng đợi, thay vì phía sau
*    ```description``` thêm vào thông tin mô tả cho các công việc trong hàng đợi.
*     ```args``` và ```kwargs```: sử dụng các biến này để gửi các tham số và từ khóa (keyword) tới hàm công việc phía dưới. Điều này hữu dụng nếu hàm của bạn có các tên tham số xung ột với RQ. Ví dụ ```description``` hoặc ```ttl```.



Trong ví dụ trước, nếu bạn muốn truyền vào các tham số từ khóa ```description``` và ```ttl``` tới công việc của bạn mà không phải hàm hàng đợi ta làm như sau

In [0]:
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url,
         ttl=30, # biến ttl này được sử dụng bởi RQ
         args=('http://nvie.com',),
         kwargs={
             'description': 'Function description', # biến này được gửi tới count_words_at_url
             'ttl': 15 # Biến này được gửi tới count_words_at_url
         })

Trong các trường hợp tiến trình web không truy cập được vào source code chạy ở worker (ví dụ code base X gọi một hàm trì hoãn từ code base Y), bạn có thể gửi hàm này như là một tham chiếu chuỗi.

In [0]:
q = Queue('low', connection=redis_conn)
q.enqueue('my_package.module.my_func', 3, 4)

## Làm việc với các hàng đợi
Bên cạnh các công việc trong hoàng đợi, các hàng đợi cũng có các phương thức hữu ích

In [0]:
from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

# Lấy ra số lượng các công việc trong hàng đợi
print(len(q))

# Lấy ra các công việc 
queued_job_ids = q.job_ids # Lấy ra một danh sách các IDs của các công việc từ hàng đợi
queued_jobs = q.jobs # Lấy ra một danh sách các hiện thực công việc được cho vào hàng đợi
job = q.fetch_job('my_id') # Trả về công việc có ID là 'my_id'

# Làm rỗng một hàng đợi, điều này sẽ xoá hết các công việc trong hàng đợi
q.empty()

# Xóa một hàng đợi
q.delete(delete_jobs=True) # Truyền vào giá trị 'True' sẽ xóa hết tất cả các công việc trong hàng đợi
# hàng đợi hiện tại không còn sử dụng được nữa. Hàng đợi có thể được tạo lại bằng cách cho các công việc vào nó.

## Thiết kế 

Với RQ, bạn không phải thiết lập bất cứ hàng đợi nào trước, và bạn không phải chỉ định bất cứ kênh (channels), các trao đổi (exchanges), các luật định đường đi (routing rules) nào cả. Bạn có thể đặt các công việc vào bất cứ hàng đợi nào bạn muốn. Ngay cả khi bạn đưa một công việc vào một hàng đợi không tồn tại, hàng đợi này sẽ được tạo ngay.

RQ không sử dụng một một phương thức môi giới cao cấp nào để làm việc định hướng thông điệp cho bạn. Bạn có thể xem đó là một lợi thế hay là một sự tật nguyền, phụ thuộc vào vấn đề bạn đang giải.

Cuối cùng, RQ không khải là một giao thức có thể di động, bởi vì nó phụ thuộc vào [pickle] (http://docs.python.org/library/pickle.html) để sắp đặt các công việc, vì vậy nó là hệ thống chỉ có Python.

## Kết quả bị trì hoãn

Khi các công việc được cho vào hàng đợi, phương thức ```queue.enqueue()``` trả về một hiện thực ```Job```. Thứ này không có gì đặc biệt hơn một đối tượng proxy, có thể được sử dụng để kiểm tra kết quả của công việc thực sự.

Giả định công việc ta thực hiện trả về một giá trị, thuộc tính ```result``` của ```Job``` sẽ trả về giá trị ```None``` khi công việc chưa hoàn thành hoặc một giá trị khác ```None``` khi công việc hoàn thành.

## Decorator ```@job```

Nếu bạn quen với Celery, bạn có thể sự dụng decorator ```@task``` của nó. Bắt đầu từ phiên bản RQ 0.3, ta có một decorator tương tự

In [0]:
from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)


## Qua mặt các workers
Để kiểm tra, bạn có thể đặt các công việc vào hàng đợi mà không cần ủy thác thực thi thực sự cho một worker (bắt đầu từ phiên bản 0.3.1.). Để làm điều này, chỉ cần thiết lập tham số ```is_async=False``` vào phương thức xây dựng Queue.

In [0]:
from rq import Queue
from redis import Redis
redis_conn = Redis()
q = Queue('low', is_async=False, connection=redis_conn)
job = q.enqueue(abs, 8)
job.result

ModuleNotFoundError: ignored

Đoạn code phía trên chạy một không có một worker hoạt động nào và thực thi hàm ```abs(8)``` trong cùng một tiến trình. Tuy nhiên nếu bạn cần một kết nối hoạt động tới một hiện thực redis cho việc lưu các trạng thái liên quan tới thực thi công việc và hoàn thiện công việc.

## Các phụ thuộc công việ
Phiên bản RQ mới 0.4.0 hỗ trợ khả năng nối (chain) thực thi của nhiều công việc. Để thực thi một công việc phụ thuộc vào công việc khác, bạn sử dụng tham số ```depends_on```:


In [0]:
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)

NameError: ignored

Khả năng xử lý các phụ thuộc công việc cho phép bạn tách một công việc lớn thành nhiều công việc nhỏ hơn. Một công việc phụ thuộc vào công việc khác được cho vào hàng đợi chỉ khi các phụ thuộc của nó hoàn thiện thành công.

## Các thức cần xem xét về công việc

Về mặt kĩ thuật, bạn có thể đặt bất cứ lời gọi hàm nào vào một hàng đợi, nhưng điều đó không có nghĩa là đặt cái gì cũng được. Một vài thứ bạn cần xem xét trước khi đặt một công việc vào một hàng đợi:


*   Đảm bảo rằng ```_module``` của hàm có thể được import bởi worker. Cụ thể hơn có nghĩa là bạn không thể cho vào hàng đợi các hàm được khai báo trong module ```__main__```
*   Đảm bảo rằng worker và bộ sinh công việc (work generator) chia sẻ chung một bộ mã nguồn
*   Đảm bảo rằng lời gọi hàm không phụ thuộc vào bối cảnh của nó. các biến toàn cục không thích hợp ở đây, bất cứ trạng thái mà hàm phụ thuộc vào (ví dụ trạng thái hiện hành của người dùng hay yêu cầu truy cập web hiện hành) không có sẵn ở đó khi worker xử lý nó. Nếu bạn muốn hoàn thành công việc cho người dùng hiện hành, bạn nên chuyển người dùng đó sang một hiện thực rõ ràng và gửi một tham chiếu của đối tượng người dùng tới công việc như là một tham số.



## Các giới hạn 

Các workers chỉ chạy trên các hệ thống triển khai ```fork()```. Có nghĩa là ta không thể chạy các workers trên Windows mà không sử dụng [Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/about) và chạy trong một bash shell

# Các Workers

Một worker là một tiến trình Python được chạy ngầm và được dùng để chạy các coogn việc dài đòi hỏi bền bỉ, các công việc như vậy bạn không muốn chạy trong các tiến trình web

## Bắt đầu các Workers
Để bắt đầu công việc, bạn chỉ cần chạy lệnh để khởi động một worker từ thư mục gốc của project của bạn

In [0]:
! rq worker high default low

Các workers sẽ đọc các công việc từ các hàng đợi cho trước (ở đây thứ tự quan trọng) trong một vòng lặp không có điểm dừng, nó sẽ chờ cho một công việc mới đi tới khi tất cả các công việc được hoàn thành

Mỗi worker sẽ xử lý chỉ một công việc tại một thời điểm. Bên trong một worker, không có xử lý song song. Nếu bạn muốn thực hiện các công việc song song, chỉ cần đơn giản khởi động nhiều workers hơn. 

Bạn nên sử dụng các bộ quản lý tiến trình như [Supervisor](https://python-rq.org/patterns/supervisor/) hoặc là [systemd](https://python-rq.org/patterns/systemd/) khi chạy sản phẩm thực tế.

## Chế độ nổ (Burst)
Mặc định, các workers sẽ bắt đầu là việc ngay lập tức và sẽ khóa lại và chờ cho một công việc mới khi nó giải quyết xong các công việc hiện tại. Các workers cũng có thể được sử dụng trong chế độ nổ (burst mode) để hoàn thiện tất cả các công việc còn tồn tại hiện hành và thoát ra khi các hàng đợi cho trước rỗng.

In [0]:
! rq worker --burst high default low

Điều này có thể hữu dụng cho các công việc mang tính chất theo khối cần được xử lý định kì, hoặc chỉ cần nâng các công việc lên cao trong các thời điểm đỉnh điểm.

## Các tham số của Worker

Ngoài ```--burst```, ```rq worker``` còn chấp nhận các tham số sau:


*   ```--url``` hoặc ```-u```: URL mô tả các chi kết về kết nối của Redis (ví dụ ```rq worker --url redis://:secrets@example.com:1234/9```)
*   ```--path``` hoặc ```-P```: chỉ định nhiều đường dẫn được import (ví dụ ```rq worker --path foo --path bar```)
*    ```--config``` hoặc ```-c```: đường dẫn tới module chứa các thiết lập RQ.
*     ```--results-ttl```: các kết quả của công việc sẽ được giữ lại với số lượng giây (mặc định là 500)
*    ```worker-class``` hoặc ```-w```: Lớp Worker được sử dụng (ví dụ ```rq worker --worker-class 'foo.bar.MyWorker'```)
*    ```--job-class``` hoặc ```-j```: lớp công việc được sử dụng
*    ```--queue-class```: Lớp hàng đợi được sử dụng
*   ```--connection-class```: Lớp kết nối được sử dụng, mặc đích là ```redis.StrictRedis```
*    ```--log-format```: Định dạng của các logs cho worker, mặc định là ```%(asctime)s %(message)s```
*    ```-date-format```: Định dạng thời gian cho các logs, mặc định là ```'%H:%M:%S'```.
*    ```--disable-job-desc-logging```: Tắt chế độ log mô tả.
*    ```--max-jobs```: Số lượng tối đa các công việc được thực thi



## Bên trong worker có gì

### Dòng đời thực thi của Worker

Dòng đời của một worker bao gồm các chặng sau:


1.   Khởi động (Boot): Nạp môi trường Python
2.   Đăng ký khai sinh (Birth registration): worker đơn kí nó với hệ thống vì vậy hệ thống biết được worker.
3. Bắt đầu lắng nghe (Start listening): Một công việc được đưa ra từ bất cứ hàng đợi nào. Nếu tất cả các hàng đợi troogns, và worker đang chạy trong chế độ nổ, nó sẽ thoát. Ngược lại, nó chờ cho các công việc đi tới.
4. Chuẩn bị thực thi công việc: worker nói cho hệ thống rằng nó sẽ bắt đầu công việc thông qua việc thiết lập trang thái của nó thành ```busy``` và đăng kí công việc trong ```StartedJobRegistry```.
5. Đẻ ra một tiến trình con (Fork a child process): Một tiến trình con (tiến trình làm việc) được sinh ra để làm công việc thực sự trong một bối cảnh thất bại an toàn (fail-safe context).
6. Xử lý công việc. Chặng này xử lý công việc thực sự trong tiến trình con.
7. Doạn dẹp sau khi thực thi: Worker thiết lập trạng thái của nó sang ```idle``` và thiết lập thời gian hết hạn cho cả công việc và kết quả của nó dựa trên ```result_ttl```. Công việc cũng được loại bỏ từ ```StartedJobRegistry``` và thêm vào ```FinishedJobRegistry``` trong trường hợp thực thi thành công, hoặc ```FAiledJobRegistry``` trong trường hợp thất bại.
8. Lặp lại từ bước 3. 



### Các ghi chú về hiệu năng

Về căn bản thì ```rq worker``` chỉ là một đoạn mã thực hiện một vòng lặp lấy công việc về  - tạo ra một tiến trình thực hiện công việc đó - thực hiện công việc đó. Khi mà rất nhiều việc của bạn yêu cầu các thiết lập dài, hoặc chúng cùng phụ thuộc vào một tập các modules, bạn phải trả giá cho chi phí mỗi lần bạn chạy một công việc (bởi vì bạn đnag thực hiện việc import sau khi thời điểm sinh ra tiến trình). Điều này giúp RQ không làm lộ bộ nhớ nhưng nó chậm.

Một cách bạn có thể sử dụng để cải thiện hiệu năng cho các công việc kiểu này là nạp các modules cần thiết trước khi fork. Không có một cách nào để nói cho các RQ workers thực hiện điều này cho bạn, nhưng bạn có thể làm nó trước khi bắt đầu vào lặp công việc.

Để làm điều đó, cung cấp một đoạn script cho worker của bạ (thay vì sử dụng ```rq worker```). Ví dụ như đoạn mã dưới đây

In [0]:
#!/usr/bin/env python
import sys
from rq import Connection, Worker

# Nạp trước thư viện
import library_that_you_want_preloaded

# Cung cấp các tên hàng đợi để lắng nghe các tham số tơi đoạn mã

with Connection():
  qs = sys.argv[1:] or ['default']
  
  w = Worker(qs)
  w.work()

### Các tên của Worker

Các workers được đăng ký tới hệ thống dưới tên của chúng, các tên này được sinh ra ngẫu nhiên trong suốt quá trình khởi tạo. Để thay đổi tên mặc định, chỉ định tên khi bắt đầu một worker, hoặc sử dụng tùy chọn ```--name``` trong giao diện dòng lệnh

In [0]:
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue('queue_name')

# Bắt đầu một worker với một tên tự đặt
worker = Worker([queue], connection=redis, name='foo')

### Lấy thông tin về Worker

Các hiện thực ```Worker``` lưu thông tin trong khi thực thi của nó trong Redis. Dưới đây là cách lấy các thông tin này:

In [0]:
from redis import Redis
from rq import Queue, Worker

# Trả về tất cả các workers được đăng ký trong kết nối này
redis = Redis()
workers = Worker.all(connection=redis)

# Trả về tất cả các workers trong hàng đợi này (mới có trong phiên bản 0.10.0)
queue = Queue('queue_name')
workers = Worker.all(queue=queue)
worker = workers[0]
print(worker.name)

Bên cạnh ```worker.name```, worker còn có các thuộc tính sau:


*   ```hostname``` - chủ thể (host) nơi chạy worker
*   ```pid``` nhận dạng (ID) của tiến trình worker
*   ```queues``` - các hàng đợi mà worker đang lắng nghe cho các công việc
*   ```state``` - các trạng thái có thể của worker ```suspended``` (bị đình chỉ), ```started``` (đã bắt đầu), ```busy``` (bận rộn) và ```idle``` (nhàn rỗi)
*    ```current_job``` - công việc đang thực thi hiện hành của worker
*    ```last_hearbeat``` - thời điểm cuối mà worker này được nhìn thấy
*    ```birth_date``` - thời gian khởi tạo worker
*    ```successful_job_count```: số lượng công việc kết thúc thành công
*     ```failed_job_count``` - số lượng công việc xử lý bị thất bại 
*     ```total_working_time``` - lượng thời gian thực thi các công việc bằng giây


*Mới trong phiên bản 0.10.0*

Nếu bạn chỉ muốn biết số lượng các workers để theo dõi chúng, phương thức ```Worker.count()``` hiệu quả hơn


In [0]:
from redis import Redis
from rq import Worker

redis = Redis()

# Đếm số lượng các workers trong kết nối Redis này
workers = Worker.count(connection=redis)

# Đếm số lượng các workers cho một queue cụ thể
queue = Queue('queue_name', connection=redis)
workers = Worker.all(queue=queue)

## Các thống kê cho Worker
*Mới trong phiên bản 0.9.0*

Nếu bạn muốn kiểm tra mức độ sử dụng của các hàng đợi của bạn, các hiện thực ```Worker``` lưu một vài thông tin hữu dụng:


In [0]:
from rq.worker import Worker
worker = Worker.find_by_key('rq:worker:name')

worker.successful_job_count # Số lượng các công việc hành thành
worker.failed_job_count # Số lượng các công việc được xử lý nhưng thất bại
worker.total_working_time  # Lượng thời gian thực thi các công việc

## Đặt các tiêu đề cho tiến trình worker sẽ tốt hơn

Tiến trình worker sẽ có một tiêu đề tốt hơn (khi nó được hiển thị bởi các công cụ hệ thống như ps hoặc top) khi bạn cài đặt một thư viện thứ 3 ```secproctitle```

In [0]:
pip install setproctitle

## Dừng các Workers

Tại bất cứ thời điểm nào, nếu worker nhận được tín hiệu ```SIGINT``` (thông qua Ctrl+C) hoặc ```SIGTERM``` (thông qua ```kill```), worker sẽ chờ cho đến khi công việc hiện hành kết thống, dừng vòng lặp công việc và khai tử nó. 

Nếu trong quá trình khai tử, ```SIGINT``` hoặc ```SIGTERM``` lại được gửi tới, worker sẽ bắt buộc tiến trình con phải dừng (gửi cho tiến trình con ```SIGKILL```), nhưng sẽ cố gắng đăng kí khai tử cho chính nó.

## Sử dụng một file cấu hình

Nếu bạn muốn cấu hình ```rq worker``` thông qua một file cấu hình thay vì sử dụng các tham số ở cửa sổ dòng lệnh, bạn có thể làm điều đó thông qua việc tạo một file Python như sau, tạo một file ```settings.py```

In [0]:
REDIS_URL = 'redis://localhost:6379/1'

# Bạn cũng có thể chỉ định một cơ sở dữ liệu Redis
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Các hàng đợi lắng nghe
QUEUES = ['high', 'default', 'low']

# Nếu bạn đang sử dụng Sentry để thu thập các ngoại lệ thời gian chạy, bạn có thể sử dụng
# nó để cấu hình RQ cho nó trong chỉ một bước
# Đặt 'sync+' ở phía trước.
SENTRY_DSN = 'sync+http://public:secret@example.com/1'

# Nếu bạn muốn có một tên riêng cho worker
# NAME = 'worker-1024'

Ví dụ phía trên hiển thị tất cả các tùy chọn đang được hỗ trợ.

Chú ý rằng: Các thiết lập ```QUEUES``` và ```REDIS_PASSWORD``` mới có từ 0.3.3

Để chỉ định module ta dùng để đọc các thiết lập, sử dụng tùy chọn ```-c```

In [0]:
rq worker -c settings

## Các lớp Worker tùy chọn

Có những lúc bạn muốn tùy chỉnh cư sử của worker. Một vài nhu cầu như:


1.   Quản lý kết nối cơ sở dữ liệu trước khi chạy một công việc
2.   Sử dụng một mô hình thực thi công việc không yêu cầu ```os.fork```.
3.   Khả năng sử dụng các mô hình xử lý song song như là ```multiprocessing``` hoặc ```gevent```.

Bạn có thể sử dụng tùy chọn ```-w``` để chỉ định một lớp worker khác để dùng:



rq worker -w ''path.to.GeventWorker

## Công việc tùy chỉnh và các lớp hàng đợi tùy chỉnh

Bạn có thể nói worker sử dụng một lớp tùy chỉnh cho các công việc và các hàng đợi sử dụng các tùy chọn ```--job-class``` và/hoặc ```--queue-class```

In [0]:
rq worker --job-class 'custome.JobClass' --queue-class 'custom.QueueClass'

Đừng quen sử các các tên lớp giống như vậy khi cho các công việc vào hàng đợi. Ví dụ

In [0]:
from rq import Queue
from rq.job import Job

class CustomJob(Job):
  pass

class CustomeQueue(Queue):
  job_class = CustomJob
  
queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)

## Các lớp án tử hình tùy chỉnh (Custome DeathPenalty Classes)

Khi một công việc hết giờ chạy, worker sẽ cố gắng kết thúc nó sử dụng ```death_penalty_class``` (mặc định: ```UnixSignalDeathPenalty```). Điều này có thể được thay đổi nếu bạn muốn kết thúc các công việc theo cách riêng cho ứng dung của bạn hoặc theo một cách sạch sẽ hơn

Các lớp DeathPenalty được xây dựng với các tham số sau ```BaseDeathPenalty(timeout, JobTimeoutException, job_id=job.id)

## Các xử lý ngoại lệ tùy chỉnh

Nếu bạn cần xử lý các lỗi khác nhau cho các kiểu công việc khác nhau hay chỉ đơn giản muốn tùy chỉnh cư xử mặc định của RQ khi xử lý lỗi, chạy ```rq worker``` sử dụng tùy chọn ```--exception-handler```

In [0]:
rq worker --exception-handler 'path.to.my.ErrorHandler'

# Nhiều bộ xử lý ngoại lệ 
rq worker --exception=handler 'path.to.my.ErrorHandler' ---exception-handler 'another.ErrorHandler'


Nếu bạn muốn tắt/vô hiệu hóa xử lý ngoại lệ mặc định của RQ, sử dụng tùy chọn ```--disable-default-exception-handler```:

In [0]:
rq worker --exception-handler 'path.to.my.ErrorHandler' --disable-default-exception-handler.

# Kết quả

Đưa các công việc vào hàng đợi là các lời gọi hàm được thực thi trì hoãn. Có nghĩa là chúng ta đang giải một vấn đề, nhưng ta lấy về các kết quả.

## Xử lý các kết quả
Các hàm Python có thể trả về các giá trị, vì vậy các công việc cũng có thể có chúng. Nếu một công việc trả về một giá trị không phải là ```None```, worker sẽ ghi giá trị trả về đó trả lợi cho mã hã Redis của công việc trong thuộc tính ```result```. Giá trị hash Redis của công việc sẽ hết hạn sau 500 giây mặc định sau khi công việc được hoàn thành. 

Phía đưa job vào hàng đợi đưa trở lại một hiện thực ```Job``` như là kết quả của hành động đưa vào nhóm. Đối tượng ```Job``` là một đối tượng proxy gắn với nhận dạng của công việc, giúp cho việc lấy về các kết quả

**TTL của giá trị trả về** Các giá trị trả về được ghi ngược lại Redis với một dòng đời có giới hạn (thông qua một chìa khóa hết hạn Redis), điều này giúp chánh việc tăng lên của kích thức cơ sở dữ liệu Redis.

Từ phiên bản RQ 0.3.1, giá trị TTL của kết quả công việc có thể được chỉ định sử dụng tham số từ khóa ```result_ttl``` tới ```enqueue()``` và ```enqueue_call()```. Nó cũng có thể được sử dụng để tắt sự vô hiệu hóa song song đó. Bạn chịu trách nhiệu cho việc dọn dẹp các công việc của bạn, vì vậy cẩn thận khi sử dụng nó.

Bạn có thể làm điều sau

In [0]:
q.enqueue(foo) # Kết quả hết hạn sau 500 giây
q.enqueue(foo, result_ttl=86400) # Kết quả sẽ hết hạn sau một ngày
q.enqueue(foo, result_ttl=0) # Kết quả bị xóa ngay lập tức
q.enqueue(foo, result_ttl=-1) # Kết quả không bao giờ bị hết hạn

Ngoài ra, bạn có thể giữ các công việc đã được hoàn thành mà không trả về các giá trị, thứ mà mặc định bị xóa ngay lập tức

In [0]:
q.enqueue(func_without_rv, result_ttl=500) # công việc được giữ rõ ràng

## Giải quyết các ngoại lệ

Các công việc có thể thất bại và tung ra các ngoại lệ. RQ giải quyết vấn đề đó theo cách sau.

Hơn nữa, ta nên có thể thử lại các công việc thất bại. Điều đó nói chung cần một sự diễn dịch thủ công bởi vì không có một cách tự động hoặc đáng tin nào cho RQ đưa ra quyết định xem là có an toàn hay không để thử lại các công việc cụ thể. 

Khi một ngoại lệ bị văng ra trong một công việc, nó được bắt bởi worker, nối tiếp và được lưu dưới chìa khóa ```exc_info``` của mã hash Redis của công việc. Một tham chiếu tới công việc được đặt trong ```FAiledJobRegistry```. Mặc định các công việc thất bại sẽ được giữ trong một năm.

Công việc cũng có một vài thuộc tính hữu dụng có thể được sử dụng để hỗ trợ cho việc kiểm tra.


*   Thời gian tạo
*   thời gian ra khởi hàng đợi
*   thời gian vào hàng đợi
*   mô tả về lời gọi hàm
*  thông tin về ngoại lệ

Điều này cho phép ta xem xét và diễn dịch vấn đề một cách thủ công và có thể gửi lại công việc. 



## Giải quyết các ngắt

Khi các workers bị dừng lại theo một cách lịch sự (Ctrl+C hoặc ```kill```), RQ cố gắng hết sức để không làm mất bất cứ công việc nào. Công việc hiện tại được kết thúc sau đó worker sẽ dừng việc xử lý các công việc. Điều này đảm bảo các công việc luôn luôn có cơ hội như nhau để hoàn thành.

Tuy nhiên các workers có thể bị dừng bắt buộc thông qua ```kill -9```, điều này không cho các workers có một cơ hội để hoàn thành nốt công việc của ình hay đặt công việc vào hàng đợi ```failed```. Do đó, việc bắt buộc dừng một worker có thể dẫn tới các hư hại.

## Giải quyết với việc hết giờ thực hiện của Job

Mặc định các jobs nên thực thi nội trong 180 giây. Sau đó worker tắt tiến trình làm việc và đặt công việc vào hàng đợi ```failed```, ảm chỉ rằng hết giờ thực hiện công việc.

Nếu một công việc yêu cầu nhiều (hay ít) hơn thời gian để hoàn thành, thời gian dành cho nó có thể được mở rộng ra hay co lại thông qua việc chỉ định một tham số từ quá vào hàm enqueue(), như sau:

In [0]:
q = Queue()
q.enqueue(mytask, args=(foo,), kwargs={'bar': qux}, job_timeout=600)

Bạn cũng có thể thay đổi timeout mặc định của các công việc được đưa vào hàng đợi thông qua các hiện thực hàng đợi cụ thể, như đoạn code dưới đây

In [0]:
# Các công việc có độ ưu tiên cao nên thực thi trong 8 giây trong khi các công việ
# với độ ưu tiên thấp mất 10 phút
high = Queue('high', default_timeout=8)  # 8 giây
low = Queue('low', default_timeout=600)  # 10 phút

# Các công việc có thể thay đổi timeout mặc đinh
low.enqueue(really_really_slow, job_timeout=3600) # 1 giờ

# Các công việc

Trong một vài trường hợp ta cần truy cập vào ID của công việc hiện tại từ chính hàm công việc. Hoặc để lưu dữ liệu bất kì trên các công việc

## Lấy công việc từ Redis
Tất cả thông tin về công việc được lưu trong Redis. Bạn có thể mổ xẻ một công việc và các thuộc tính của nó sử dụng ```Job.fetch()```.

In [0]:
from redis import Redis
from rq.job import Job

redis = Redis()
job = Job.fetch('my_job_id', connection=redis)
print('Status: %s' $ job.get_status())

Một vài thuộc tính công việc thú vị bao gồm:


*   job.get_status()
*   job.func_name
*   job.args
*   job.result
*   job.enqueued_at
*   job.started_at
*   job.ended_at
*   job.exc_info



Nếu bạn muốn lấy ra một số lượng công việc, sử dụng ```Job.fetch_many()```

In [0]:
jobs = Job.fetch_many(['foo_id', 'bar_id'], connection=redis)

for job in jobs:
  print('Job %s: %s' % (job.id, job.func_name))

## Truy cập với công việc hiện tại

Bởi vì các hàm công việc là các hàm Python bình thường, bạn phải hỏi RQ Id của công việc hiện hành, nếu có công việc đó. Để làm điều này, bạn có thể sử dụng

In [0]:
from rq import get_current_job

def add(x, y):
  job = get_current_job()
  print("Current job: %s" %(job.id,))
  return x + y

## Lưu dữ liệu tùy ý vào các công việc
*Cải thiện trong phiên bản 0.8.0*

Để thêm/cập nhật thông tin trạng thái của công việc này, bạn truy cập vào thuộc tính ```meta```, thuộc tính cho phép bạn lưu dữ liệu có thể chọn (pickable) lên công việc

In [0]:
import socket

def add(x, y):
  job = get_current_job()
  job.meta["handled_by"] = socket.gethostname()
  job.save_meta()
  
  # Làm nhiều việc hơn
  time.sleep(1)
  return x + y

## Thời gian sống cho công việc trong hàng đợi

*Mới trong phiên bản 0.4.7*

Một công việc có 2 thời gian sống, một cho kết quả công việc và một cho chính công việc đó. Điều đó có nghĩa là nếu bạn có công việc mà không nên được thực thi sau một lượng thời gian, bạn có thể định nghĩa một thời gian sống như sau

In [0]:
# Khi tạo một công việc
job = Job.create(func=say_hello, ttl=43)

# Hoặc khi cho một công việc vào hàng đợi
job = q.enqueue(count_words_at_url, 'http://nvie.com', ttl=43)

## Các công việc bị thất bại
Nếu mọt công việc khi thực thi bị thất bại, worker sẽ đặt công việc đó trong một ```FailedJobRegistry```. Đối với hiện thực Job,  thuộc tính ```is_failed``` sẽ có giá trị true. ```FailedJobRegistry``` có thể được truy cập thông qua ```queue.failed_job_registry```

In [0]:
from redis import StrictRedis
from rq import Queue
from rq.job import Job

def div_by_zero(x):
  return x / 0

connection = StrictRedis()
queue = Queue(connection=connection)
job = queue.enqueue(divi_by_zero, 1)
registry = queue.failed_job_registry

worker = Worker([queue])
worker.work(burst=True)

assert len(registry) == 1 # Các công việc thất bại được giữ tại FailedJobRegistry

registry.requeue(job)  # Đặt công việc quay trở lại queue ban đầu

assert len(registry) == 0

assert queue.count == 1

Mặc định, các công việc bị thất bại được giữ trong 1 năm. Bạn có thể thay đổi thời gian mặc định này bằng cách chỉnh định ```failure_ttl``` (đơn vị giây) khi cho các công việc vào hàng đợi

In [0]:
job = queue.enqueue(foo_job, failure_ttl=300)  # 5 phút

## Cho các công việc thất bại vào lại hàng đợi

RQ cung cấp một công cụ cho phép đặt các công việc bị thất bại vào lại hàng đợi dễ dàng.

In [0]:
# Cho lại vào hàng đợi foo_job_id và bar_job_id từ failed job registry của myqueue
rq requeue -queue myqueue -u redis://localhost:6379 foo_job_id bar_job_id
    
# Câu lệnh này sẽ cho vào lại hàng đợi tất cả các công việc trong failed job registry của myqueue
rq requeue --queue -u redis://localhost:6379 --all

# Giám sát
Cách dễ dàng nhất là sử dụng [RQ dashboard](https://github.com/nvie/rq-dashboard), một công cụ phân tán riêng, công cụ này gọn nhẹ được dùng để quan sát cho RQ.

Để cài đặt nó, sử dụng các câu lệnh dưới đây

In [0]:
pip install rq-dashboard
rq-dashboard

Công cụ này cũng có thể được tích hợp dễ dàng trong ứng dụng Flask của bạn

## Quan sát tại cửa sổ dòng lệnh
Để xem các hàng đợi nào tồn tại và các workers nào đang hoạt động, chỉ cần gõ ```rq info```

## Truy vấn thông qua các tên hàng đợi
Bạn cũng có thể truy vấn một tập các hàng đợi, nếu bạn đang tìm các hàng đợi cụ thể, sử dụng câu lệnh sau

In [0]:
rq info high default

## Tổ chức các workers theo hàng đợi

Mặc định, ```rq info``` in các workers hiện đang hoạt động, và các hàng đợi đang lắng nghe. 

Để thấy cùng dữ liệu này nhưng được sắp xếp theo hàng đợi, sử dụng ```-R``` (hoặc  cờ```--by-queue```).

## Thăm dò định kì

Mặc định ```rq info``` sẽ in ra các thông số và rời khỏi. Bạn có thể chỉ định một thăm dò định kì, thông qua việc sử dụng cờ ```--interval```

In [0]:
rq info --interval 1

```rq info``` sẽ cập nhật màn hình mỗi giây. Bạn có thể chỉ định một giá trị thực để ám chỉ số lượng giây bạn muốn màn hình cập nhật. Nhớ rằng các giá trị định kì thấp sẽ tăng việc tải thông tin trên Redis

rq info --interval 0.5

# Các kết nối

Mặc dù RQ dùng câu lệnh ```use_connect()```  cho tiện, nhưng nó đã bị ngưng sử dụng, bởi vì nó làm ô nhiễm không gian tên toàn cục. Thay vào đó, sử dụng phương thức quản lý kết nối rõ ràng sử dụng ```with Connection(...):``` được ưu chuộng hơn, hoặc cách khác là truyền vào kết nối Redis các tham chiếu tới các hàng đợi trực tiếp.

## Kết nối Redis đơn giản (dễ dàng)

Chú ý rằng: sử dụng ```use_connection``` không còn được dùng nữa. Thay vào đó sử dụng quản lý kết nối rõ ràng

Ở chế độ phát triển, để kết nối tới một máy chủ Redis cục bộ mặc định

In [0]:
from rq import use_connection
use_connection()

Trong chế độ phát hành sản phẩm, kết nối tới một máy chủ Redis cụ thể

In [0]:
from redis import Redis
from rq import use_connection

redis = Redis('my.host.org', 6789, password='secret')
use_connection(redis)

Nhớ rằng ```use_connection``` gây ô nhiễm tên miền toàn cục. Nó ám chỉ rằng bạn chỉ có thể sử dụng một kết nối duy nhất

## Nhiều kết nối Redis

Khi bạn muốn nhiều kết nối bạn nên sử dụng ```Connection```hoặc truyền vào các kết nối rõ ràng

### Các kết nối rõ ràng (chính xác nhưng nhàm chán)

Mỗi hiện thực đói tượng RQ (các hàng đợi, các workers, các công việc) có một tham số từ khóa ```connection``` được gửi vào phương thức xây dựng. Sử dụng nó, bạn không cần sử dụng ```use_connection()```. Thay vì đó, bạn có thể tạo các hàng đợi như sau

In [0]:
from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9386)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)

Mọi công việc được đưa vào một hàng đợi sẽ biết kết nối nào nó thuộc về. Workers cũng như vậy. 

Cách tiếp cận này rất chính xác, nhưng rườm rà, nhàm chán.

## Các ngữ cảnh kết nối (tóm lược và ngắn gọn)
Đây là một cách tiếp cận tốt hơn nếu bạn muốn sử dụng nhiều kết nối. Mỗi hiện thực RQ, khi được tạo, sẽ sử dụng kết nối trên cùng của ngăn xếp kết nối RQ, đây là cơ chế thay thế tạm thời cho kết nối mặc định. Ví dụ

In [0]:
from rq import Queue, Connection
from redis import Redis

with Connection(Redis('localhost', 6379)):
  q1 = Queue('foo')
  with Connection(Redis('remote.host.org', 9836)):
    q2 = Queue('bar')
  q3 = Queue('qux')
  
assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q2.connection

Bạn có thể nghĩ như thế này, bên trong ngữ cảnh ```Connection```, mọi hiện thực đối tượng RQ được tạo mới sẽ có tham số ```connection``` được thiết lập ngầm. Cho một công việc vào hàng dợi ```q2``` sẽ được thực hiện trong thực thể Redis thứ 2, thực thể ở xa.

## Đưa vào và đẩy ra các kết nối

Nếu chương trình của bạn không cho phép sử dụng câu ```with```, ví dụ bạn muốn sử dụng nó để thiết lập một unit test, bạn có thể sử dụng ```push_connection()``` và ```pop_connection()``` thay vì sử dụng quản lý ngữ cảnh

In [0]:
import unittest
from rq import Queue
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
  def setUp(self):
    push_connection(Redis())
  
  def tearDown(self):
    pop_connection()
    
  def test_foo(self):
    """Bất cứ hàng đợi nào tạo ở đây sử dụng Redis cục bộ"""
    q = Queue()

## Hỗ trợ lính canh (Sentinel support)

Để sử dụng lính canh redis, bạn phải chỉ định một từ điển trong file cấu hình. Sử dụng cấu hình này cùng với ```systemd``` hoặc các docker containers với lựa chọn khởi động lại tự động cho phép các workers và RQ có một kết nối chịu đựng được lỗi tới redis

In [0]:
SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
           'SOCKET_TIMEOUT': None,
           'PASSWORD': 'secret',
           'DB': 2,
           'MASTER_NAME': 'master'}


# Các ngoại lệ

Một công việc có thể thất bại bởi vì các ngoại lệ. Khi các workers chạy ngầm, làm thế nào bạn nhận được các thông báo về các ngoại lệ này

## Mặc định ```FailedJobRegistry```
Mạng lưới an toàn mặc định cho RQ là ```FAiledJobRegistry```. Mọi công việc không thực thi thành công đều được lưu ở đây, cùng với thông tin ngoại lệ (kiểu ngoại lệ, giá trị và traceback). Trong khi điều này làm cho không một công việc thất bại nào "bị lạc", không ai sử dụng nó để lấy thông báo một cách chủ động về sự thất bại của công việc

## Các xử lý ngoại lệ tùy chỉnh

RQ hỗ trợ việc đăng ký các xử lý ngoại lệ tùy chỉnh. Điều này cho phép ta có thể chèn vào logic xử lý lỗi của mình vào các workers.

Đó là cách bạn đăng ký một hay nhiều các xử lý ngoại lệ tùy chỉnh cho một worker

In [0]:
from exception_handlers import foo_handler, bar_handler
w = Worker([q], exception_handlers=[foo_handler, bar_handler])

Bộ xử lý là một hàm nhận vào các tham số sau: ```job```, ```exec_type```, ```exec_value``` và ```traceback```:

In [0]:
def my_handler(job, exec_type, exc_value, traceback):
  # thực hiện các thứ muốn tùy chỉnh ở đây
  # Ví dụ, viết thông tin ngoại lệ tới cơ sở dữ liệu

Bạn cũng có thể thấy 3 tham số ngoại lệ được viết như sau

In [0]:
def my_handler(job, *exc_info):
  # làm các thứ tùy chỉnh ở đây

In [0]:
from exception_handlers import foo_handler

w = Worker([q], exception_handlers=[foo_handler], disable_default_exception_handler=True)

## Nối các bộ xử lý ngoại lệ

Bộ xử lý ngoại lệ chính nó chịu trách nhiệm cho việc quyết định xem việc xử lý ngoại lệ được hoàn thành hay chưa, hoặc nên đi tiếp tới bộ xử lý tiếp theo trong ngăn xếp. Bộ xử lý ngoiaj lệ có thể ám chỉ điều này thông qua việc trả về một giá trị boolearn. ```False``` nghĩa là dừng xử lý các ngoại lệ, ```True``` nghĩa là tiếp tục và đi tới bộ xử lý ngoại lệ tiếp theo.

Mặc định rwangf bộ xử lý không có một giá trị trả về (do đó là ```None```), và được diễn giả là ```True``` (ví dụ tiếp tục chạy tới bộ xử lý tiếp theo)

Để ngăn bộ xử lý tiếp theo trong chuỗi xử lý ngoại lệ thực thi, sử dụng một bộ xử lý ngoại lệ tùy chỉnh, không cho nó đi tới ngoại lệ tiếp theo

In [0]:
def black_hole(job, *exc_info):
  return False