In [1]:
import requests
import gradio as gr
from matplotlib import pyplot as plt
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
from loguru import logger
import cv2
import numpy as np
import io
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from msrest.authentication import ApiKeyCredentials
import time

# Settings for the published Azure Custom Vision iteration used for detection.
# SECURITY NOTE(review): the prediction key is committed here in plain text.
# It should be rotated and loaded from an environment variable or secret store
# instead of living in the notebook.
ENDPOINT = "https://team10eighticustomvision-prediction.cognitiveservices.azure.com/"
PREDICTION_KEY = "9FRZbiwBubFIcSZ1k88tCTCskOAZwMMAMvnFLuVJ26tlU0V0fsqjJQQJ99ALACYeBjFXJ3w3AAAIACOGOj1S"
PROJECT_ID = "4218ecac-688a-422b-9e14-2726b938f67c"
PUBLISHED_NAME = "Iteration8"
 
# Initialize the prediction client; the key is sent in the "Prediction-key" header.
credentials = ApiKeyCredentials(in_headers={"Prediction-key": PREDICTION_KEY})
predictor = CustomVisionPredictionClient(endpoint=ENDPOINT, credentials=credentials)

# Romanized label -> Hangul jamo. Elsewhere in this file the labels are used
# as image file names and compared against detector tag names.
hangul_dict = {
    # consonants
    "Giyeok": "ㄱ", "Nieun": "ㄴ", "Digeut": "ㄷ", "Rieul": "ㄹ",
    "Mieum": "ㅁ", "Bieup": "ㅂ", "Siot": "ㅅ", "Ieung": "ㅇ",
    "Jieut": "ㅈ", "Chieut": "ㅊ", "Kieuk": "ㅋ", "Tieut": "ㅌ",
    "Pieup": "ㅍ", "Hieut": "ㅎ",
    # vowels
    "A": "ㅏ", "Ya": "ㅑ", "Eo": "ㅓ", "Yeo": "ㅕ", "O": "ㅗ", "Yo": "ㅛ",
    "U": "ㅜ", "Yu": "ㅠ", "Eu": "ㅡ", "Yi": "ㅣ", "Ae": "ㅐ", "Yae": "ㅒ",
    "E": "ㅔ", "Ye": "ㅖ", "Oe": "ㅚ", "Wi": "ㅟ", "Ui": "ㅢ",
}

# Reverse lookup (jamo -> romanized label).
reverse_hangul_dict = dict(zip(hangul_dict.values(), hangul_dict))

def split_hangul(word):
    """Decompose every precomposed Hangul syllable of ``word`` into jamo.

    Characters in the range '가'..'힣' are expanded into their initial
    consonant, medial vowel and (only when present) final consonant. Any
    other character is passed through unchanged.

    Returns:
        A list of single-character strings in reading order.
    """
    leads = ['ㄱ', 'ㄲ', 'ㄴ', 'ㄷ', 'ㄸ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅃ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅉ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']
    vowels = ['ㅏ', 'ㅐ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅔ', 'ㅕ', 'ㅖ', 'ㅗ', 'ㅘ', 'ㅙ', 'ㅚ', 'ㅛ', 'ㅜ', 'ㅝ', 'ㅞ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ', 'ㅣ']
    tails = ['', 'ㄱ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ', 'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']

    syllable_base = ord('가')
    jamo = []
    for ch in word:
        # Anything outside the syllable block is copied through as-is.
        if not ('가' <= ch <= '힣'):
            jamo.append(ch)
            continue

        # A syllable offset encodes (lead, vowel, tail) in base 21 * 28.
        lead_idx, remainder = divmod(ord(ch) - syllable_base, 21 * 28)
        vowel_idx, tail_idx = divmod(remainder, 28)

        jamo.append(leads[lead_idx])
        jamo.append(vowels[vowel_idx])
        tail = tails[tail_idx]
        if tail:  # index 0 is the "no final consonant" slot
            jamo.append(tail)

    return jamo

def draw_boxes(image, predictions):
    """Annotate a copy of ``image`` with the single most probable prediction.

    Args:
        image: Image array; it is copied and never modified in place.
            ``shape[0]`` is used as the height and ``shape[1]`` as the width.
        predictions: Prediction objects exposing ``probability``,
            ``tag_name`` and ``bounding_box`` (``left``/``top``/``width``/
            ``height``, which are scaled by the image dimensions). May be
            empty or None.

    Returns:
        ``(annotated_image, tag_name)``. ``tag_name`` belongs to the most
        probable prediction, or is None when there are no predictions
        (previously this case raised UnboundLocalError).
    """
    img = image.copy()

    # Nothing detected: hand back the untouched copy and no tag.
    if not predictions:
        return img, None

    highest_prediction = max(predictions, key=lambda p: p.probability)
    logger.info(str(highest_prediction.tag_name) + " " + str(highest_prediction.probability))

    # Only draw when the detector is reasonably confident.
    if highest_prediction.probability > 0.5:
        color = (255, 0, 0)  # bounding-box color
        box = highest_prediction.bounding_box
        left = int(box.left * img.shape[1])
        top = int(box.top * img.shape[0])
        width = int(box.width * img.shape[1])
        height = int(box.height * img.shape[0])

        cv2.rectangle(img,
                      (left, top),
                      (left + width, top + height),
                      color,
                      2)

        # Keep the label inside the image when the box touches the top edge.
        label = f"{highest_prediction.tag_name}: {highest_prediction.probability:.2f}"
        cv2.putText(img,
                    label,
                    (left, max(top - 10, 15)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    color,
                    2)

    # NOTE(review): the tag is returned even when probability <= 0.5, so
    # callers may treat a low-confidence detection as a match - confirm intent.
    return img, highest_prediction.tag_name

def click_sign_send(sign_word):
    """Switch the UI to finger-spelling practice for ``sign_word``.

    The word is split into jamo and each jamo is mapped to its "Basic"
    reference image. A jamo missing from ``reverse_hangul_dict`` raises
    KeyError.

    Returns:
        (update hiding the first column, update showing the second column,
        list of image paths, list of False flags with one entry per jamo).
    """
    jamo_list = split_hangul(sign_word)
    logger.info(sign_word + " " + str(jamo_list))

    basic_images = [
        f"./images/Basic/{reverse_hangul_dict[jamo]}.jpg" for jamo in jamo_list
    ]
    not_confirmed = [False] * len(jamo_list)

    return gr.update(visible=False), gr.update(visible=True), basic_images, not_confirmed

#################################
# 카메라 인식 부분
#################################
def _parse_confirmed_flags(raw, expected_len):
    """Convert the confirmation state (list or its textbox string form) to booleans."""
    if isinstance(raw, (list, tuple)):
        tokens = [str(item) for item in raw]
    else:
        # The textbox hands back text such as "[False, True]" or "['False', 'True']".
        tokens = str(raw or "").replace("[", "").replace("]", "").split(",")

    flags = [tok.replace("'", "").replace('"', "").strip() == "True" for tok in tokens]

    # Align with the current word: drop extras, treat missing entries as unconfirmed.
    flags = flags[:expected_len]
    flags.extend([False] * (expected_len - len(flags)))
    return flags


def process_frame(frame, sign_word, sign_confirmed):
    """Run detection on one webcam frame and update the practice gallery.

    Args:
        frame: Webcam frame array, or None when no frame is available.
        sign_word: Word being practised; it is split into jamo.
        sign_confirmed: Confirmation state as stored in the textbox (string
            form of a list), or an actual list.

    Returns:
        Always three values, matching the three outputs wired to the stream
        event: (annotated frame, gallery image paths, confirmation state as a
        string). When the frame is missing or detection fails, the gallery is
        left unchanged and the incoming state is passed through untouched.

    Fixes over the previous version: the state string is parsed instead of
    being split into characters with ``list(...)`` (which made every entry
    truthy), every return path yields three values, and the
    ``sign_confirmed`` argument is no longer overwritten inside the loop.
    """
    if frame is None:
        return None, gr.update(), sign_confirmed

    # Encode the frame as PNG bytes for the prediction API.
    pil_image = Image.fromarray(frame)
    png_buffer = io.BytesIO()
    pil_image.save(png_buffer, format='PNG')
    image_bytes = png_buffer.getvalue()

    try:
        # Get predictions from Azure Custom Vision
        results = predictor.detect_image(PROJECT_ID, PUBLISHED_NAME, image_bytes)

        # Draw boxes on frame
        annotated_frame, tag_name = draw_boxes(frame, results.predictions)

        split_sign = split_hangul(sign_word)
        confirmed_list = _parse_confirmed_flags(sign_confirmed, len(split_sign))

        gallery_images = []
        for i, sign in enumerate(split_sign):
            sign_file_name = reverse_hangul_dict[sign]
            # A jamo stays confirmed once matched; otherwise check this frame.
            if not confirmed_list[i] and sign_file_name == tag_name:
                confirmed_list[i] = True
            folder = "Correct" if confirmed_list[i] else "Basic"
            gallery_images.append(f"./images/{folder}/{sign_file_name}.jpg")

        return annotated_frame, gallery_images, str(confirmed_list)

    except Exception as e:
        # Best effort per frame: keep streaming and leave the UI state as it was.
        logger.error(f"Error during prediction: {e}")
        return frame, gr.update(), sign_confirmed


def request_gpt(user_text, temperature=0.7, top_p=0.95, max_tokens=800):
    """Send ``user_text`` to the Azure OpenAI chat deployment grounded on Azure AI Search.

    Args:
        user_text: The user's question.
        temperature, top_p, max_tokens: Sampling parameters forwarded in the payload.

    Returns:
        ``(content_text, citations_movie, citations_images_first,
        citations_images_second)``. On a network error or non-200 response the
        result is ``("", "No Video", None, None)``. When the answer carries no
        citations the media part is ``("No Video", None, None)``.
        ``citations_images_first`` is whatever ``get_image_url`` returns;
        ``citations_images_second`` is the raw second token of the image line
        (not downloaded), or None when absent.

    Fixes over the previous version: correct ``application/json`` content
    type, request timeout, network errors handled instead of propagating,
    tolerant lookup of the optional ``context``/``citations`` fields, and the
    unused ``method`` local and debug ``print`` removed.
    """
    # SECURITY NOTE(review): both keys below are committed in plain text; they
    # should be rotated and loaded from environment variables or a secret store.
    api_base = "https://team10-eighti.openai.azure.com"
    deployment_id = "gpt-4o"
    endpoint = f"{api_base}/openai/deployments/{deployment_id}/chat/completions?api-version=2024-08-01-preview"
    api_key = '1BajXTI5Mp0tKrs46XFGuOWjSPXKzOZSKy8e6R3qha1SVQ4lz1PFJQQJ99ALACYeBjFXJ3w3AAABACOGgPvw'

    search_endpoint = "https://team10-eighti-search.search.windows.net"
    search_key = 'wnalAsW6FqKRHIR6S3sUZGzNH28Lf3sBOS2ubCZsZxAzSeA205k3'
    search_index = "sign-index"
    semantic_name = "sign-semantic"

    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    payload = {
        "messages": [
            {
                "role": "system",
                "content": "당신은 사용자가 수화 정보를 찾는데 도움을 주는 수화 AI 도우미입니다."
            },
            {
                "role": "user",
                "content": user_text
            }
        ],
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "max_tokens": max_tokens,
        "stop": None,
        "data_sources": [
            {
                "type": "azure_search",
                "parameters": {
                    "endpoint": search_endpoint,
                    "index_name": search_index,
                    "semantic_configuration": semantic_name,
                    "query_type": "semantic",
                    "fields_mapping": {},
                    "filter": None,
                    "top_n_documents": 5,
                    "authentication": {
                        "type": "api_key",
                        "key": search_key
                    }
                }
            }
        ]
    }

    failure = ("", "No Video", None, None)

    try:
        # Without a timeout a stalled connection would block the UI handler forever.
        response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    except requests.RequestException as e:
        logger.error(f"Chat request failed: {e}")
        return failure

    if response.status_code != 200:
        logger.error(f"Chat request returned HTTP {response.status_code}")
        return failure

    message = response.json()['choices'][0]['message']
    content_text = message['content']
    logger.info(f"채팅 답변 = {content_text}")

    # 'context' / 'citations' may be absent; treat that like an empty list.
    citations = (message.get('context') or {}).get('citations') or []
    if not citations:
        logger.info(f"citations(empty) = {citations}")
        return content_text, "No Video", None, None

    logger.info(f"citations = {citations}")
    citations_movie = get_citations(citations, "mp4")
    # The image line may hold several space-separated entries.
    citations_images = get_citations(citations, "jpg").split(' ')
    logger.info(f"citations_movie = {citations_movie}")
    logger.info(f"citations_images = {citations_images}")

    # str.split always yields at least one element, so index 0 is safe.
    citations_images_first = get_image_url(citations_images[0])
    citations_images_second = None
    if len(citations_images) > 1:
        # An empty token (double space) is reported as "no image".
        citations_images_second = citations_images[1] or None

    return content_text, citations_movie, citations_images_first, citations_images_second

def get_citations(citations="", extentions="mp4"):
    """Find the first line of the first citation whose last dot-suffix is ``extentions``.

    Only ``citations[0]['content']`` is inspected. It is split on newlines
    and the first line whose text after the final '.' equals ``extentions``
    is returned whole. Returns "" when nothing matches or when the citation
    list, its first entry or that entry's content is empty.
    """
    logger.info(f"****get_citations = {citations}")

    has_content = (
        len(citations) > 0
        and len(citations[0]) > 0
        and len(citations[0]['content']) > 0
    )
    if not has_content:
        return ""

    for line in citations[0]['content'].split('\n'):
        suffix = line.rsplit('.', 1)[-1]
        if suffix == extentions:
            print("extentions=", extentions, "|row=",  line)
            return line

    return ""

def get_image_url(url):
    """Download ``url`` and return it as a PIL image.

    Returns None when ``url`` is None, empty or whitespace, and also when the
    download or decoding fails. The previous version returned an error
    *string* on failure, which the caller then passed on as if it were an
    image; failures are now logged and reported as "no image".
    """
    if not url or not url.strip():
        return None  # nothing to fetch

    try:
        # Bounded wait so a dead host cannot hang the UI handler.
        response = requests.get(url.strip(), timeout=10)
        response.raise_for_status()  # surface HTTP errors
        return Image.open(BytesIO(response.content))  # decode from memory
    except (requests.RequestException, OSError) as e:
        # OSError also covers PIL's "cannot identify image file" error.
        logger.error(f"Error fetching image: {str(e)}")
        return None

def click_send(prompt, histories):
    """Handle a chat submission.

    Sends ``prompt`` to ``request_gpt``, appends the ``(prompt, answer)``
    pair to ``histories`` in place and returns the values for the wired
    outputs: history, an empty string for the input box, the video value,
    the two image values, then an update showing the first column and one
    hiding the second.
    """
    answer, movie, first_image, second_image = request_gpt(prompt)
    histories.append((prompt, answer))

    show_first_column = gr.update(visible=True)
    hide_second_column = gr.update(visible=False)
    return histories, "", movie, first_image, second_image, show_first_column, hide_second_column

# Gradio layout: chat on the left; on the right either the citation media
# (column1) or the finger-spelling practice view (column2).
with gr.Blocks() as demo:
    # Two state holders for the initial visibility of column 1 / column 2.
    # NOTE(review): neither state is referenced by any event handler in this block.
    col1_state = gr.State(value=True)  # Column 1 initial visible state
    col2_state = gr.State(value=False)  # Column 2 initial visible state

    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(label='채팅 기록')
            with gr.Row():
                input_textbox = gr.Textbox(label="", scale=7)
                send_button = gr.Button("전송", scale=1)

        # (disabled) citation = gr.HTML(label=...)
        with gr.Column():
            with gr.Row():
                sign_input_textbox = gr.Textbox(label="지문자 입력 (2글자 단어)", value="소맥", scale=7)
                # Carries the per-jamo confirmation flags between stream calls as text.
                sign_confirmed_textbox = gr.Textbox(show_label=False, visible=True)
                toggle_button = gr.Button("지문자 확인", scale=1)

            # Citation media view (shown after a chat message is sent).
            with gr.Column(visible=True) as column1:
                videio = gr.Video(label="Video Player", autoplay=True)
                with gr.Row():
                    # NOTE(review): scale is given as 0.3; Gradio documents scale as an integer - confirm.
                    image_first = gr.Image(scale=0.3)
                    image_second = gr.Image(scale=0.3)

            # Finger-spelling practice view (shown after the toggle button is clicked).
            with gr.Column(visible=False) as column2:
                with gr.Row():  # Row layout for input and output
                    gallery = gr.Gallery(columns=[6], rows=[1], show_label=False, show_share_button=False, show_download_button=False, interactive=False, show_fullscreen_button=False, height=130)
                with gr.Row():  # Row layout for input and output
                    with gr.Column():  # Input webcam column
                        webcam_input = gr.Image(sources="webcam", streaming=True, mirror_webcam=True, label="Webcam")
                    with gr.Column():  # Output display column
                        output = gr.Image(label="Detected Objects")

                # Set the process_frame function as the update function for webcam input.
                # Three outputs are wired, so process_frame is expected to return three values.
                webcam_input.stream(process_frame, inputs=[webcam_input, sign_input_textbox, sign_confirmed_textbox], outputs=[output, gallery, sign_confirmed_textbox])
                
                demo.title = "Azure Custom Vision Object Detection"
                demo.description = "Real-time object detection using Azure Custom Vision"

    # Enter key and send button share the same chat handler and outputs.
    input_textbox.submit(fn=click_send, inputs=[input_textbox, chatbot], outputs=[chatbot, input_textbox, videio, image_first, image_second, column1, column2])
    send_button.click(fn=click_send, inputs=[input_textbox, chatbot], outputs=[chatbot, input_textbox, videio, image_first, image_second, column1, column2])
    # Switch to the practice view and reset gallery and confirmation state.
    toggle_button.click(fn=click_sign_send, inputs=[sign_input_textbox], outputs=[column1, column2, gallery, sign_confirmed_textbox])

# Disabled manual smoke test of request_gpt:
# content_text, citations_list = request_gpt(<question>, temperature=0.7, top_p=0.95, max_tokens=800)

# print(content_text)
# print(citations_list)

# Start the local server without creating a public share link.
demo.launch(share=False)

  from .autonotebook import tqdm as notebook_tqdm


* Running on local URL:  http://127.0.0.1:7863

To create a public link, set `share=True` in `launch()`.




In [2]:
import requests
import gradio as gr
from matplotlib import pyplot as plt
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
from loguru import logger
import cv2
import numpy as np
import io
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from msrest.authentication import ApiKeyCredentials
import time


# Settings for the published Azure Custom Vision iteration used for detection.
# SECURITY NOTE(review): the prediction key is committed here in plain text.
# It should be rotated and loaded from an environment variable or secret store
# instead of living in the notebook.
ENDPOINT = "https://team10eighticustomvision-prediction.cognitiveservices.azure.com/"
PREDICTION_KEY = "9FRZbiwBubFIcSZ1k88tCTCskOAZwMMAMvnFLuVJ26tlU0V0fsqjJQQJ99ALACYeBjFXJ3w3AAAIACOGOj1S"
PROJECT_ID = "4218ecac-688a-422b-9e14-2726b938f67c"
PUBLISHED_NAME = "Iteration8"
 
# Initialize the prediction client; the key is sent in the "Prediction-key" header.
credentials = ApiKeyCredentials(in_headers={"Prediction-key": PREDICTION_KEY})
predictor = CustomVisionPredictionClient(endpoint=ENDPOINT, credentials=credentials)

# Romanized label -> Hangul jamo. Elsewhere in this file the labels are used
# as image file names and compared against detector tag names.
hangul_dict = {
    # consonants
    "Giyeok": "ㄱ", "Nieun": "ㄴ", "Digeut": "ㄷ", "Rieul": "ㄹ",
    "Mieum": "ㅁ", "Bieup": "ㅂ", "Siot": "ㅅ", "Ieung": "ㅇ",
    "Jieut": "ㅈ", "Chieut": "ㅊ", "Kieuk": "ㅋ", "Tieut": "ㅌ",
    "Pieup": "ㅍ", "Hieut": "ㅎ",
    # vowels
    "A": "ㅏ", "Ya": "ㅑ", "Eo": "ㅓ", "Yeo": "ㅕ", "O": "ㅗ", "Yo": "ㅛ",
    "U": "ㅜ", "Yu": "ㅠ", "Eu": "ㅡ", "Yi": "ㅣ", "Ae": "ㅐ", "Yae": "ㅒ",
    "E": "ㅔ", "Ye": "ㅖ", "Oe": "ㅚ", "Wi": "ㅟ", "Ui": "ㅢ",
}

# Reverse lookup (jamo -> romanized label).
reverse_hangul_dict = dict(zip(hangul_dict.values(), hangul_dict))

def split_hangul(word):
    """Decompose every precomposed Hangul syllable of ``word`` into jamo.

    Characters in the range '가'..'힣' are expanded into their initial
    consonant, medial vowel and (only when present) final consonant. Any
    other character is passed through unchanged.

    Returns:
        A list of single-character strings in reading order.
    """
    leads = ['ㄱ', 'ㄲ', 'ㄴ', 'ㄷ', 'ㄸ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅃ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅉ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']
    vowels = ['ㅏ', 'ㅐ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅔ', 'ㅕ', 'ㅖ', 'ㅗ', 'ㅘ', 'ㅙ', 'ㅚ', 'ㅛ', 'ㅜ', 'ㅝ', 'ㅞ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ', 'ㅣ']
    tails = ['', 'ㄱ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ', 'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ', 'ㅆ', 'ㅇ', 'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']

    syllable_base = ord('가')
    jamo = []
    for ch in word:
        # Anything outside the syllable block is copied through as-is.
        if not ('가' <= ch <= '힣'):
            jamo.append(ch)
            continue

        # A syllable offset encodes (lead, vowel, tail) in base 21 * 28.
        lead_idx, remainder = divmod(ord(ch) - syllable_base, 21 * 28)
        vowel_idx, tail_idx = divmod(remainder, 28)

        jamo.append(leads[lead_idx])
        jamo.append(vowels[vowel_idx])
        tail = tails[tail_idx]
        if tail:  # index 0 is the "no final consonant" slot
            jamo.append(tail)

    return jamo

def draw_boxes(image, predictions):
    """Annotate a copy of ``image`` with the single most probable prediction.

    Args:
        image: Image array; it is copied and never modified in place.
            ``shape[0]`` is used as the height and ``shape[1]`` as the width.
        predictions: Prediction objects exposing ``probability``,
            ``tag_name`` and ``bounding_box`` (``left``/``top``/``width``/
            ``height``, which are scaled by the image dimensions). May be
            empty or None.

    Returns:
        ``(annotated_image, tag_name)``. ``tag_name`` belongs to the most
        probable prediction, or is None when there are no predictions
        (previously this case raised UnboundLocalError).
    """
    img = image.copy()

    # Nothing detected: hand back the untouched copy and no tag.
    if not predictions:
        return img, None

    highest_prediction = max(predictions, key=lambda p: p.probability)

    # Only draw when the detector is reasonably confident.
    if highest_prediction.probability > 0.5:
        color = (255, 0, 0)  # bounding-box color
        box = highest_prediction.bounding_box
        left = int(box.left * img.shape[1])
        top = int(box.top * img.shape[0])
        width = int(box.width * img.shape[1])
        height = int(box.height * img.shape[0])

        cv2.rectangle(img,
                      (left, top),
                      (left + width, top + height),
                      color,
                      2)

        # Keep the label inside the image when the box touches the top edge.
        label = f"{highest_prediction.tag_name}: {highest_prediction.probability:.2f}"
        cv2.putText(img,
                    label,
                    (left, max(top - 10, 15)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5,
                    color,
                    2)

    # NOTE(review): the tag is returned even when probability <= 0.5, so
    # callers may treat a low-confidence detection as a match - confirm intent.
    return img, highest_prediction.tag_name

def click_sign_send(sign_word):
    """Switch the UI to finger-spelling practice for ``sign_word``.

    The word is split into jamo and each jamo is mapped to its "Basic"
    reference image. A jamo missing from ``reverse_hangul_dict`` raises
    KeyError.

    Returns:
        (update hiding the first column, update showing the second column,
        list of image paths, list of False flags with one entry per jamo).
    """
    jamo_list = split_hangul(sign_word)
    logger.info(sign_word + " " + str(jamo_list))

    basic_images = [
        f"./images/Basic/{reverse_hangul_dict[jamo]}.jpg" for jamo in jamo_list
    ]
    not_confirmed = [False] * len(jamo_list)

    return gr.update(visible=False), gr.update(visible=True), basic_images, not_confirmed

#################################
# 카메라 인식 부분
#################################
def _parse_confirmed_tokens(raw, expected_len):
    """Split the textbox form of the confirmation state into "True"/"False" tokens."""
    # The textbox hands back text such as "[False, True]" or "['False', 'True']".
    stripped = str(raw or "").replace("[", "").replace("]", "")
    tokens = [
        tok.replace("'", "").replace('"', "").strip()
        for tok in stripped.split(',')
    ]

    # Align with the current word: drop extras, treat missing entries as unconfirmed.
    tokens = tokens[:expected_len]
    tokens.extend(["False"] * (expected_len - len(tokens)))
    return tokens


def process_frame(frame, gallery_origin_images, sign_word, sign_confirmed):
    """Run detection on one webcam frame and update the practice gallery.

    Args:
        frame: Webcam frame array, or None when no frame is available.
        gallery_origin_images: Current gallery value; returned unchanged when
            the frame is missing or detection fails.
        sign_word: Word being practised; it is split into jamo.
        sign_confirmed: Confirmation state as stored in the textbox (string
            form of a list of "True"/"False").

    Returns:
        Always three values, matching the three outputs wired to the stream
        event: (annotated frame, gallery image paths, confirmation state as a
        string).

    Fixes over the previous version: the missing-frame path returns three
    values instead of a bare None; the ``sign_confirmed`` argument is no
    longer overwritten by the parsing loop, so the error path hands back the
    original state instead of a fragment such as ``False``; and a stored
    state shorter than the word no longer raises IndexError.
    """
    if frame is None:
        return None, gallery_origin_images, sign_confirmed

    # Encode the frame as PNG bytes for the prediction API.
    pil_image = Image.fromarray(frame)
    png_buffer = io.BytesIO()
    pil_image.save(png_buffer, format='PNG')
    image_bytes = png_buffer.getvalue()

    try:
        # Get predictions from Azure Custom Vision
        results = predictor.detect_image(PROJECT_ID, PUBLISHED_NAME, image_bytes)

        # Draw boxes on frame
        annotated_frame, tag_name = draw_boxes(frame, results.predictions)

        split_sign = split_hangul(sign_word)
        confirmed_tokens = _parse_confirmed_tokens(sign_confirmed, len(split_sign))
        logger.info(confirmed_tokens)

        gallery_images = []
        confirmed_result = []
        for token, sign in zip(confirmed_tokens, split_sign):
            sign_file_name = reverse_hangul_dict[sign]
            # A jamo stays confirmed once matched; otherwise check this frame.
            is_confirmed = token == 'True' or sign_file_name == tag_name
            folder = "Correct" if is_confirmed else "Basic"
            gallery_images.append(f"./images/{folder}/{sign_file_name}.jpg")
            confirmed_result.append("True" if is_confirmed else "False")

        return annotated_frame, gallery_images, str(confirmed_result)

    except Exception as e:
        # Best effort per frame: keep streaming and leave the UI state as it was.
        logger.error(f"Error during prediction: {e}")
        return frame, gallery_origin_images, sign_confirmed


def request_gpt(user_text, temperature=0.7, top_p=0.95, max_tokens=800):
    """Send ``user_text`` to the Azure OpenAI chat deployment grounded on Azure AI Search.

    Args:
        user_text: The user's question.
        temperature, top_p, max_tokens: Sampling parameters forwarded in the payload.

    Returns:
        ``(content_text, citations_movie, citations_images_first,
        citations_images_second)``. On a network error or non-200 response the
        result is ``("", "No Video", None, None)``. When the answer carries no
        citations the media part is ``("No Video", None, None)``.
        ``citations_images_first`` is whatever ``get_image_url`` returns;
        ``citations_images_second`` is the raw second token of the image line
        (not downloaded), or None when absent.

    Fixes over the previous version: correct ``application/json`` content
    type, request timeout, network errors handled instead of propagating,
    tolerant lookup of the optional ``context``/``citations`` fields, and the
    unused ``method`` local and debug ``print`` removed.
    """
    # SECURITY NOTE(review): both keys below are committed in plain text; they
    # should be rotated and loaded from environment variables or a secret store.
    api_base = "https://team10-eighti.openai.azure.com"
    deployment_id = "gpt-4o"
    endpoint = f"{api_base}/openai/deployments/{deployment_id}/chat/completions?api-version=2024-08-01-preview"
    api_key = '1BajXTI5Mp0tKrs46XFGuOWjSPXKzOZSKy8e6R3qha1SVQ4lz1PFJQQJ99ALACYeBjFXJ3w3AAABACOGgPvw'

    search_endpoint = "https://team10-eighti-search.search.windows.net"
    search_key = 'wnalAsW6FqKRHIR6S3sUZGzNH28Lf3sBOS2ubCZsZxAzSeA205k3'
    search_index = "sign-index"
    semantic_name = "sign-semantic"

    headers = {
        "Content-Type": "application/json",
        "api-key": api_key
    }
    payload = {
        "messages": [
            {
                "role": "system",
                "content": "당신은 사용자가 수화 정보를 찾는데 도움을 주는 수화 AI 도우미입니다."
            },
            {
                "role": "user",
                "content": user_text
            }
        ],
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "max_tokens": max_tokens,
        "stop": None,
        "data_sources": [
            {
                "type": "azure_search",
                "parameters": {
                    "endpoint": search_endpoint,
                    "index_name": search_index,
                    "semantic_configuration": semantic_name,
                    "query_type": "semantic",
                    "fields_mapping": {},
                    "filter": None,
                    "top_n_documents": 5,
                    "authentication": {
                        "type": "api_key",
                        "key": search_key
                    }
                }
            }
        ]
    }

    failure = ("", "No Video", None, None)

    try:
        # Without a timeout a stalled connection would block the UI handler forever.
        response = requests.post(endpoint, headers=headers, json=payload, timeout=60)
    except requests.RequestException as e:
        logger.error(f"Chat request failed: {e}")
        return failure

    if response.status_code != 200:
        logger.error(f"Chat request returned HTTP {response.status_code}")
        return failure

    message = response.json()['choices'][0]['message']
    content_text = message['content']
    logger.info(f"채팅 답변 = {content_text}")

    # 'context' / 'citations' may be absent; treat that like an empty list.
    citations = (message.get('context') or {}).get('citations') or []
    if not citations:
        logger.info(f"citations(empty) = {citations}")
        return content_text, "No Video", None, None

    logger.info(f"citations = {citations}")
    citations_movie = get_citations(citations, "mp4")
    # The image line may hold several space-separated entries.
    citations_images = get_citations(citations, "jpg").split(' ')
    logger.info(f"citations_movie = {citations_movie}")
    logger.info(f"citations_images = {citations_images}")

    # str.split always yields at least one element, so index 0 is safe.
    citations_images_first = get_image_url(citations_images[0])
    citations_images_second = None
    if len(citations_images) > 1:
        # An empty token (double space) is reported as "no image".
        citations_images_second = citations_images[1] or None

    return content_text, citations_movie, citations_images_first, citations_images_second

def get_citations(citations="", extentions="mp4"):
    """Find the first line of the first citation whose last dot-suffix is ``extentions``.

    Only ``citations[0]['content']`` is inspected. It is split on newlines
    and the first line whose text after the final '.' equals ``extentions``
    is returned whole. Returns "" when nothing matches or when the citation
    list, its first entry or that entry's content is empty.
    """
    logger.info(f"****get_citations = {citations}")

    has_content = (
        len(citations) > 0
        and len(citations[0]) > 0
        and len(citations[0]['content']) > 0
    )
    if not has_content:
        return ""

    for line in citations[0]['content'].split('\n'):
        suffix = line.rsplit('.', 1)[-1]
        if suffix == extentions:
            print("extentions=", extentions, "|row=",  line)
            return line

    return ""

def get_image_url(url):
    """Download ``url`` and return it as a PIL image.

    Returns None when ``url`` is None, empty or whitespace, and also when the
    download or decoding fails. The previous version returned an error
    *string* on failure, which the caller then passed on as if it were an
    image; failures are now logged and reported as "no image".
    """
    if not url or not url.strip():
        return None  # nothing to fetch

    try:
        # Bounded wait so a dead host cannot hang the UI handler.
        response = requests.get(url.strip(), timeout=10)
        response.raise_for_status()  # surface HTTP errors
        return Image.open(BytesIO(response.content))  # decode from memory
    except (requests.RequestException, OSError) as e:
        # OSError also covers PIL's "cannot identify image file" error.
        logger.error(f"Error fetching image: {str(e)}")
        return None

def click_send(prompt, histories):
    """Handle a chat submission.

    Sends ``prompt`` to ``request_gpt``, appends the ``(prompt, answer)``
    pair to ``histories`` in place and returns the values for the wired
    outputs: history, an empty string for the input box, the video value,
    the two image values, then an update showing the first column and one
    hiding the second.
    """
    answer, movie, first_image, second_image = request_gpt(prompt)
    histories.append((prompt, answer))

    show_first_column = gr.update(visible=True)
    hide_second_column = gr.update(visible=False)
    return histories, "", movie, first_image, second_image, show_first_column, hide_second_column

# Gradio layout: chat on the left; on the right either the citation media
# (column1) or the finger-spelling practice view (column2).
with gr.Blocks() as demo:
    
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(label='채팅 기록')
            with gr.Row():
                input_textbox = gr.Textbox(label="", scale=7)
                send_button = gr.Button("전송", scale=1)

        # (disabled) citation = gr.HTML(label=...)
        with gr.Column():
            with gr.Row():
                sign_input_textbox = gr.Textbox(label="지문자 입력 (2글자 단어)", value="소맥", scale=7)
                # Hidden textbox carrying the per-jamo confirmation flags between stream calls as text.
                sign_confirmed_textbox = gr.Textbox(show_label=False, visible=False)
                toggle_button = gr.Button("지문자 확인", scale=1)

            # Citation media view (shown after a chat message is sent).
            with gr.Column(visible=True) as column1:
                videio = gr.Video(label="Video Player", autoplay=True)
                with gr.Row():
                    # NOTE(review): scale is given as 0.3; Gradio documents scale as an integer - confirm.
                    image_first = gr.Image(scale=0.3)
                    image_second = gr.Image(scale=0.3)

            # Finger-spelling practice view (shown after the toggle button is clicked).
            with gr.Column(visible=False) as column2:
                with gr.Row():  # Row layout for input and output
                    gallery = gr.Gallery(columns=[6], rows=[1], show_label=False, show_share_button=False, show_download_button=False, interactive=False, show_fullscreen_button=False, height=130)
                with gr.Row():  # Row layout for input and output
                    with gr.Column():  # Input webcam column
                        webcam_input = gr.Image(sources="webcam", streaming=True, mirror_webcam=True, label="Webcam")
                    with gr.Column():  # Output display column
                        output = gr.Image(label="Detected Objects")

                # Set the process_frame function as the update function for webcam input.
                # The gallery is passed in as well so process_frame can hand it back unchanged on errors.
                # Three outputs are wired, so process_frame is expected to return three values.
                webcam_input.stream(process_frame, inputs=[webcam_input, gallery, sign_input_textbox, sign_confirmed_textbox], outputs=[output, gallery, sign_confirmed_textbox])
                
                demo.title = "Azure Custom Vision Object Detection"
                demo.description = "Real-time object detection using Azure Custom Vision"

    # Enter key and send button share the same chat handler and outputs.
    input_textbox.submit(fn=click_send, inputs=[input_textbox, chatbot], outputs=[chatbot, input_textbox, videio, image_first, image_second, column1, column2])
    send_button.click(fn=click_send, inputs=[input_textbox, chatbot], outputs=[chatbot, input_textbox, videio, image_first, image_second, column1, column2])
    # Switch to the practice view and reset gallery and confirmation state.
    toggle_button.click(fn=click_sign_send, inputs=[sign_input_textbox], outputs=[column1, column2, gallery, sign_confirmed_textbox])

# Disabled manual smoke test of request_gpt:
# content_text, citations_list = request_gpt(<question>, temperature=0.7, top_p=0.95, max_tokens=800)

# print(content_text)
# print(citations_list)

# Start the local server without creating a public share link.
demo.launch(share=False)



* Running on local URL:  http://127.0.0.1:7874

To create a public link, set `share=True` in `launch()`.




[32m2024-12-20 17:33:03.438[0m | [1mINFO    [0m | [36m__main__[0m:[36mclick_sign_send[0m:[36m99[0m - [1m소맥 ['ㅅ', 'ㅗ', 'ㅁ', 'ㅐ', 'ㄱ'][0m
[32m2024-12-20 17:33:26.061[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_frame[0m:[36m148[0m - [1m['False', 'False', 'False', 'False', 'False'][0m
[32m2024-12-20 17:33:26.739[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_frame[0m:[36m148[0m - [1m['False', 'False', 'False', 'False', 'False'][0m
[32m2024-12-20 17:33:27.442[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_frame[0m:[36m148[0m - [1m['True', 'False', 'False', 'False', 'False'][0m
[32m2024-12-20 17:33:28.240[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_frame[0m:[36m148[0m - [1m['True', 'False', 'False', 'False', 'False'][0m
[32m2024-12-20 17:33:28.957[0m | [1mINFO    [0m | [36m__main__[0m:[36mprocess_frame[0m:[36m148[0m - [1m['True', 'False', 'False', 'False', 'False'][0m
[32m2024-12-20 17:33:29.932[0m | 