# Yolo Import

In [1]:
# OpenCV 
import cv2
import numpy as np
import pandas as pd
# 파일 경로 읽어오기
from glob import glob
import time
# multi thread
import threading
# 이미지 가져오기
import os
from PIL import Image
from tensorflow.keras.preprocessing import image
# Sound 출력
from gtts import gTTS
from IPython.display import Audio
# warning 무시
import warnings
warnings.filterwarnings(action='ignore') 

# CNN & OCR Import

In [2]:
# tensorflow : CNN model
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import io
 # OCR Google Cloud api
from google.cloud import vision

In [3]:
# gTTS 음성파일 만들기
# kor_wav = gTTS('주행 불가능 지역입니다', lang = 'ko') 
# kor_wav.save('impossible.wav')

# kor_wav = gTTS('헬멧을 미착용하였습니다', lang = 'ko') 
# kor_wav.save('sound_file/no_helmet.wav')

In [4]:
#google cloud platform
# !pip install opencv-contrib-python
# !pip install --upgrade google-cloud-vision

# IP Webcam Web Crolling - 카메라 전환

In [5]:
#크롬 드라이버로 창 열기
# !pip install webdriver-manager
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By

In [6]:
# ip 주소
url = "https://172.20.10.5:8080/video"
cap = cv2.VideoCapture(url)
fps = cap.get(cv2.CAP_PROP_FPS)  # fps = 25.0

In [7]:
driver = webdriver.Chrome(ChromeDriverManager().install())
driver.get(url.split('/video')[0])

#자세히 알아보기 클릭
btn1="#details-button"
driver.find_elements(By.CSS_SELECTOR,btn1)[0].click()

#안전하지 않음으로 이동 클릭
btn2="#proceed-link"
driver.find_elements(By.CSS_SELECTOR,btn2)[0].click()
#스피커 사용
# btn3="body > div.jumbotron > div.container > div > form > div:nth-child(3) > label.round_switch > span"
# driver.find_elements(By.CSS_SELECTOR,btn3)[0].click()
# 이부분은 양방향 음성을 On 하는 것인데, 소리가 울려서 일단 꺼둠
#허용 클릭! 해주세요

# Prepare : Yolo weight file, CNN Model load, OCR

In [8]:
# CNN 모델 로드
model = load_model('cnn_model/cnn_model_3class_final2.h5')

In [9]:
# 학습된 yolo weight파일과 cfg 파일 가져오기
net = cv2.dnn.readNet("Yolov4/yolov4-obj_1000.weights", "Yolov4/yolov4-obj.cfg") # Helmet : 0, No Helmet : 1

In [10]:
# obj.names : Helmet/No_Helmet
classes = []
with open("Yolov4/obj.names", "r") as f:
    classes = [line.strip() for line in f.readlines()]
layer_names = net.getLayerNames()
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
colors = np.random.uniform(0, 255, size=(len(classes), 3))
print(colors)

[[105.62274256   2.00640153 140.1098862 ]
 [195.73811954  32.26401563 247.49028541]]


In [11]:
# Use google API - json 파일 가져오기
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '개인 key'
 
client_options = {'api_endpoint': 'eu-vision.googleapis.com'}
client = vision.ImageAnnotatorClient(client_options=client_options)

In [12]:
# Empty capture folder
# CNN capture 폴더 처음에 비워줘야 한다.
for f in glob('capture/*.jpg'):
    os.remove(f)

# Yolov4 Define

In [13]:
# Yolo 헬멧인식 경고음
def sound():
    display(Audio('sound_file/no_helmet.wav', autoplay=True))

In [14]:
# CNN 도로인식 경고음
def sound2():
    display(Audio('sound_file/impossible.wav', autoplay=True))

# OCR Define

In [15]:
# OCR 어린이 보호구역 음성파일 만드는 함수
def get_narrator(text):
    tts = gTTS(text=text, lang='ko')
    filename = 'sound_file/narrator.mp3'
    tts.save(filename)
    
    display(Audio(filename, autoplay=True))
    
# get_narrator("어린이 보호구역입니다")

In [16]:
# OCR 인식중 '어린이'가 인식되는 경우 출력되는 함수
def get_text(path):
    result = []
    
    # Load image
    with io.open(path, 'rb') as f:
        content = f.read()
    
    # Get text from image
    image = vision.Image(content=content)
    response = client.text_detection(image=image)
    texts = response.text_annotations
    
    for text in texts[1:]:
        if text.description.isdigit():
            if int(text.description) in [x for x in range(20, 90, 10)]:
                result.append(text.description)
        elif text.description=='어린이':
            result.append(text.description)
            
    return result

In [17]:
def alert_speed_limit(texts):
    message = []
    digit_boolean = list(map(lambda x: x.isdigit(), texts))
    if np.all(digit_boolean):  # 모두 숫자
        for text in texts:
            message.append(f"제한속도 시속 {text}km 구간입니다")
    elif np.any(digit_boolean):  # '어린이' 하나 이상 포함
        for text in np.array(texts)[digit_boolean]:
            message.append(f"어린이 보호구역입니다. 시속 {text}km로 주행하세요")
    else:
        message.append("어린이 보호구역입니다")
        
    for m in message:
        get_narrator(m)  # tts 음성 파일 함수 호출

# CNN Define

In [18]:
# CNN test 이미지 전처리 후 predict하는 함수
def predict_road(model):
    df = pd.DataFrame({'image_path': glob('capture/*.jpg')})
    
    test_datagen = ImageDataGenerator(rescale=1/255.)
    test_generator = test_datagen.flow_from_dataframe(
        df,
        x_col='image_path',
        y_col='None',
        target_size=(250, 250),
        class_mode=None,
        shuffle=False,
    )
    
    pred = model.predict_generator(test_generator)
    
    return np.argmax(pred, axis=-1)

# image.c 파일 열어서 validation filename print 주석 처리하여 안뜨게 함
# C:\Users\tjdbs\anaconda3\Lib\site-packages\keras\preprocessing

In [19]:
# CNN 도로인식 함수
def CNN():    
    global C
#     flag = False
    passibility = "Passibility"
    color = (255, 255, 255)

    cap = cv2.VideoCapture(url)

    fps = cap.get(cv2.CAP_PROP_FPS)  # fps = 25.0

    for f in glob('capture/*.jpg'):
        os.remove(f)

    C=[]
    tmp=[]
    while(True):
        camera, frame = cap.read()
        resize_frame = cv2.flip(frame, 0)
        resize_frame = cv2.flip(frame, 1)

        
        if resize_frame is not None:

            # Capture 5 images per 1 second - 1초에 5번 도로 캡쳐
            if (int(cap.get(1)) % 5 == 0):
                # Save image as 001 ~ 005.jpg
                filename = f'{str(len(os.listdir("capture"))%5 + 1).zfill(3)}.jpg'
                cv2.imwrite(f'capture/{filename}', resize_frame)

                # Get text from image (per 5 seconds)
                if (int(cap.get(1)) % 125 == 0):  # 5 * fps
                    if get_text(f'capture/{filename}'):
                        alert_speed_limit(get_text(f'capture/{filename}'))  # OCR

            if len(os.listdir('capture')) >= 5:
                pred = predict_road(model)

                # 삼중분류 CNN 모델
                if np.all(pred == 1):   # 차도 =1
                    passibility = "Passible"    
                    color = (255, 0, 0)
                elif np.all(pred == 2): # 인도 = 2
                    passibility = "Impassible"   # 인도 주행 불가
                    color = (0, 0, 255)
                    C.append(np.all(pred == 2))  # 주행 불가능 횟수를 C에 append
                else:                   # 자전거도로 = 0
                    passibility = "Passible"
                    color = (255, 0, 0)
                    
                for f in glob('capture/*.jpg'):
                    os.remove(f)

            cv2.putText(resize_frame, passibility, (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            cv2.imshow("Frame", resize_frame)

            # Detect impassible road more than 5 times -> sound()
            if (C.count(1) > 0) and (C.count(1) % 8 == 0):  # 변수 C에는 불법주행횟수가 카운트되어 누적된다.
                if C.count(1) not in tmp:
                    tmp.append(C.count(1))
                    # C의 값이 8의 배수이고 0보다 클 때, 경고음을 출력하도록 구현
                    print("불법 주행 인식 : " + str(C.count(1)))
                    sound2() # 불법주행도로 경고음

        # To finish program(close windows), press q                     
        q = cv2.waitKey(1)
        if q == ord("q"):
            break
    cv2.destroyAllWindows()

# main

In [21]:
url = "https://172.20.10.5:8080/video"
cap = cv2.VideoCapture(url)
fps = cap.get(cv2.CAP_PROP_FPS)  # fps = 25.0

cv2.namedWindow('camera_screenshot', cv2.WINDOW_AUTOSIZE)
color = (255, 255, 255)

while True:
    camera, frame = cap.read()
    
    if not camera:
        print("Can't read camera")
        break
    
    if int(cap.get(1) % fps == 0):
        cv2.imwrite('face.jpg', frame)
        
        img = cv2.imread('face.jpg')
        height, width, channels = img.shape
        blob = cv2.dnn.blobFromImage(img, 0.00392, (250, 250), (0, 0, 0), True, crop=False)
        net.setInput(blob)
        outs = net.forward(output_layers)
        
        class_ids = []
        for out in outs:
            for detection in out:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                if confidence > 0.5:
                    class_ids.append(class_id)

        if class_ids.count(0) >= 1:
            print('Helmet detected!!')
            time.sleep(0.3)
            cv2.destroyAllWindows()
            
            #카메라 전환 버튼
            btn_sw="#img_ffc_current"
            driver.find_elements(By.CSS_SELECTOR,btn_sw)[0].click()

            # CNN 모델
            CNN()
            break
            
        else:
            cv2.putText(frame, 'Please wear a helmet..', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            cv2.imshow("camera_screenshot", frame)
            
            if int(cap.get(1) % 125 == 0):
                print("헬멧 미착용")
                sound()
    
    cv2.putText(frame, 'Please wear a helmet..', (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
    cv2.imshow("camera_screenshot", frame)
                                                            
    # 강제종료 q 키
    if cv2.waitKey(1) == ord('q'):
        break
        
cv2.destroyAllWindows()

Helmet detected!!
불법 주행 인식 : 8


불법 주행 인식 : 16


불법 주행 인식 : 24


In [20]:
#카메라 전환 버튼
btn_sw="#img_ffc_current"
driver.find_elements(By.CSS_SELECTOR,btn_sw)[0].click()