In [None]:
from ultralytics import YOLO
from time import strftime
import cv2
import time


# Шлях до відео та моделі
video_path = 'input_videos/fall85.mp4' #enter video path
output_time = str(strftime("%d.%m_%H-%M-%S"))
output_path = f'output_videos/{output_time}_result.mp4'

# Завантаження моделі
model = YOLO('runs/detect/train/weights/best.pt')

# Завантаження відео
cap = cv2.VideoCapture(video_path)

# Параметри відео
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps_input = cap.get(cv2.CAP_PROP_FPS)

# Створення файлу для відеозапису
save_video = False 
if save_video:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps_input, (width, height))

# Кольори під класи
colors = {
    'normal': (0, 255, 0),  # Зелений
    'fall': (0, 0, 255)     # Червоний
}

# Теймери падіння по об'єктам (track_id: start_time)
fall_timers = {}
# Максимальни0 йчас до тривоги
fall_timer_duration = 20  # секунд  
# Перевірка надсилання повідомлення про тривогу (track_id: True/False)
alert_sent = {}            


# Основний цикл обробки кадрів
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    start_time = time.time()

    # Трекінг об'єктів
    results = model.track(source=frame, persist=True, conf=0.5, verbose=False)[0]

    # Відстеження ID об'єкта, чий статус на зараз 'fall'
    current_falls = set()

    # Створення боксів
    for box in results.boxes:
        cls_id = int(box.cls[0])
        label = model.names[cls_id]
        track_id = int(box.id[0]) if box.id is not None else None

        if track_id is None:
            continue  # Пропускає об'єкт без ID

        x1, y1, x2, y2 = map(int, box.xyxy[0])
        color = colors.get(label, (255, 255, 255)) # Білий за замовчуванням


        # ====================================== Обробка падіння ======================================
        label_text = f'ID{track_id}: {label}'

        if label == 'fall':
            current_falls.add(track_id)

            # Запуск таймера, якщо ще не запущено
            if track_id not in fall_timers:
                fall_timers[track_id] = time.time()

            elapsed = time.time() - fall_timers[track_id]

            # Відобразити таймер поруч з ID
            label_text += f'({int(elapsed)}s)'
            
             # Якщо час на таймері закінчився
            if elapsed >= fall_timer_duration:
                label_text += ' - WARNING!'
                if not alert_sent.get(track_id, False):
                    print(f"❗ Об'єкт ID №{track_id} знаходиться в тривалому падінні! Можлива травма.")
                    send_telegram_alert(track_id, elapsed, frame=frame.copy())
                    log_fall_event(track_id, fall_timer_duration)
                    alert_sent[track_id] = True
         # =============================================================================================      

        # Вставити bounding box і текст
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, label_text, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

    # Видяляє таймер тим, хто більше не в положені fall
    active_ids = set(fall_timers.keys())
    for track_id in active_ids:
        if track_id not in current_falls:
            del fall_timers[track_id]


    # FPS
    end_time = time.time()
    fps = 1 / (end_time - start_time + 1e-6)
    fps_text = f'FPS: {fps:.2f}'
    cv2.putText(frame, fps_text, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

    # Трансляція відео на екран
    show_video = True
    if show_video:
        cv2.imshow('Fall Detection', frame)

    # Збараеження відео
    if save_video:
        out.write(frame)

    # Вихід з програми натисканням Esc
    if cv2.waitKey(1) & 0xFF == 27:
        break


# Звільнення ресурсів
cap.release()
if save_video:
    out.release()

# Закрити всі вікна OpenCV
cv2.destroyAllWindows()


# print(output_time)

In [None]:
import requests
 
# Значення токену та ID
bot_token = 'YOUR_TELEGRAM_BOT_TOKEN'
chat_id = 'YOUR_CHAT_ID'

def send_telegram_alert(track_id, elapsed_time, frame=None):
    message = (
        f"🚨 Виявлено довготривале падіння!\n"
        f"🧍 ID об'єкта: {track_id}\n"
        f"⏱ Час в падінні: {int(elapsed_time)} секунд\n"
        f"⚠️ Можлива травма!"
    )
    
    url = f'https://api.telegram.org/bot{bot_token}/sendPhoto'
    data = {
        'chat_id': chat_id,
        'caption': message
    }

    if frame is not None:
        _, img_encoded = cv2.imencode('.jpg', frame)
        files = {'photo': ('fall_frame.jpg', img_encoded.tobytes())}


    try:
        response = requests.post(url, files=files, data=data)
        if response.status_code == 200:
            print(f"[Telegram] ✅ Повідомлення відправлено!")        
        else:
            print(f"[Telegram] ❌ Помилка відправлення: {response.text}")
    except Exception as e:
        print(f"[Telegram] ⚠️ Помилка при підключенні: {e}")


In [5]:
from datetime import datetime

def log_fall_event(track_id, duration_seconds, log_file='critical falls.txt'):
    now = datetime.now()
    month_str = now.strftime("%B, %Y")
    day_time_str = now.strftime("%d.%m - %H:%M:%S")
    log_entry = f"{day_time_str} - ID об'єкта: {track_id} - тривалість падіння: {duration_seconds}с\n"

    with open(log_file, 'a', encoding='utf-8') as file:
        # Додати заголовок місяця, якщо його ще нема
        if month_str not in open(log_file, encoding='utf-8').read():
            file.write(f"\n{month_str}:\n")
        file.write(log_entry)