# Тестування підключеня до Redis

---
В даномій задачі Jupyter Notebooks  використовуються як інструменти тестування та моніторингу,
тому тестування підключння до всіх важдивих компонентів я необхідним кроком

In [1]:
import os
import json
import time

import redis
from rq import Queue


import utils.tasks

# Читаю параметри підключення до Redis з env змінних
irds_host = os.getenv('RDS_HOST')
irds_port = os.getenv('RDS_PORT')
irds_psw = os.getenv('RDS_PSW')
irdsq_outmsg = os.getenv('RDSQ_OUTMSG')

# для debug
#print(f"Прочита значення змінних підключеня до Redis {irds_host} {irds_port} {irds_psw} {irdsq_outmsg}")


## Підключаюся до  Redis

In [2]:
# Саме підключення
red = redis.StrictRedis(irds_host, irds_port, password=irds_psw, decode_responses=False)

# Пробую  Ping
rping=red.ping()
print( f"Статус підключення: {rping} ")
q = None
q_msg = None

# Аналізуємо значення ping
if rping:
    print("До Redis підключидлися успішно")
    q = Queue(connection=red)
    q_msg = Queue( name=irdsq_outmsg, connection=red)
    print(f"Задач у черзі {irdsq_outmsg}: {len(q_msg)}")
else:
    print(" НE ПІДКЛЮЧИЛИСЯ ДО REDIS!!!")    

Статус підключення: True 
До Redis підключидлися успішно
Задач у черзі 'video_queue': 2


## Пробуємо відправити повідмолення в Redis

In [10]:



body={
    "filename": "FFFFFF.txt",
    "fullpath": "home/pshdev/test/test"
}

print('Тіло запита в чергу: ' + json.dumps(  body ))
print("Запит буде відправлено в чергу " + q_msg.name)
job=q_msg.enqueue( utils.tasks.crttask_sendmsg, json.dumps(  body ))
print( f"Запит успішно відправлено! job_id = {job.id}" )
info = {
    "job_id": job.id,
    "status": job.get_status(),
    "created_at": str(job.created_at),
    "enqueued_at": str(job.enqueued_at),
    "origin": job.origin  # назва черги
}
print(json.dumps(info, indent=4))

Request body is: {"filename": "FFFFFF.txt", "fullpath": "home/pshdev/test/test"}
Send the body into queue 'video_queue'
Message sent into queue with job_id = eb85ae4d-f3c7-4498-91ef-f72acf76dd11
{
    "job_id": "eb85ae4d-f3c7-4498-91ef-f72acf76dd11",
    "status": "queued",
    "created_at": "2026-02-23 05:35:48.471919+00:00",
    "enqueued_at": "2026-02-23 05:35:48.472674+00:00",
    "origin": "'video_queue'"
}


## Отримати статус тільки що відпавелного  завдання в чергу

In [15]:
import time
# 1. Отримуємо ID завдання (з попередньої клітинки)
current_job_id = job.id 
print(f"Задач у черзі {irdsq_outmsg}: {len(q_msg)}")
print(f"Checking status for Job ID: {current_job_id}")
i=3
# 2. Цикл очікування результату
while i!=0:
    # Оновлюємо дані про завдання з Redis
    job.refresh()
    
    status = job.get_status()
    print(f"Current status: {status}")

    if status == 'finished':
        print("Done! Result:", job.result)
        break
    elif status == 'failed':
        print("Job failed! Check worker logs.")
        # Можна вивести помилку: print(job.exc_info)
        break
    i -= 1
    time.sleep(2) # Чекаємо 2 секунди перед наступною перевіркою

Задач у черзі 'video_queue': 2
Checking status for Job ID: eb85ae4d-f3c7-4498-91ef-f72acf76dd11
Current status: JobStatus.QUEUED
Current status: JobStatus.QUEUED
Current status: JobStatus.QUEUED


## Отримати глибину черги

In [10]:
# Підключення
red = redis.StrictRedis(irds_host, irds_port, password=irds_psw)
q_msg = Queue(name=irdsq_outmsg, connection=red)

# Список усіх ID завдань у черзі
queued_job_ids = q_msg.job_ids
print(f"ID завдань у черзі: {queued_job_ids}")

# Отримання самого об'єкта завдання за ID
if queued_job_ids:
    first_job = q_msg.fetch_job(queued_job_ids[0])
    print(f"Завдання {first_job.id} викликає функцію: {first_job.func_name}")

ID завдань у черзі: ['e2d4c29f-87d2-45b8-9709-9c1b7fdfc824', 'eb85ae4d-f3c7-4498-91ef-f72acf76dd11']
Завдання e2d4c29f-87d2-45b8-9709-9c1b7fdfc824 викликає функцію: utils.tasks.crttask_sendmsg
