In [1]:
import requests
from bs4 import BeautifulSoup
import time
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.common.alert import Alert
from urllib import parse

In [2]:
# DB 접속 정보 불러오기
with open('c:/2nd_project/Service/DB_config.txt') as f:
    conf = f.readlines()
    DB_HOST = conf[0].replace('\n','')
    DB_USER = conf[1].replace('\n','')
    DB_PASSWORD = conf[2].replace('\n','')
    DB_NAME = conf[3].replace('\n','')

In [3]:
import pymysql
import pymysql.cursors
import logging

class Database:
    def __init__(self, host, user, password, db_name, charset='utf8'):
        self.host = host
        self.user = user
        self.password = password
        self.db_name = db_name
        self.charset = charset
        self.conn = None

    def connect(self):
        if self.conn != None:
            return
        self.conn = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            db=self.db_name,
            charset=self.charset
        )

    def close(self):
        if self.conn is None:
            return
        if not self.conn.open:
            self.conn = None
            return
        self.conn.close()
        self.conn = None

    def execute(self, sql):
        last_row_id = -1
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
            self.conn.commit()
            last_row_id = cursor.lastrowid
        except Exception as ex:
            logging.error(ex)
        finally:
            return last_row_id
        
    def select_one(self, sql):
        result = None
        try:
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql)
                result = cursor.fetchone()
        except Exception as ex:
            logging.error(ex)
        finally:
            return result

    def select_all(self, sql):
        result = None
        try:
            with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql)
                result = cursor.fetchall()
        except Exception as ex:
            logging.error(ex)
        finally:
            return result

In [4]:
# import pandas as pd

# df = pd.read_excel('c:/2nd_project/Data/movie_data/[KOBIS] 박스오피스_줄거리_감정키워드(2003.01~2023.07).xlsx', engine='openpyxl')
# df.iloc[0]

In [5]:
# title, opendate, people, grade, genre, repnation, nations, Production, distributor, director, actors, story, keyword 

In [6]:
# insert 쿼리 생성
def InsertMovie(series):
    try: 
        sql = f'''insert chat_movie(title, opendate, people, grade, genre, repnation, nations, 
        Production, distributor, director, actors, story, keyword) values ("{series[0]}", "{series[1]}",
        "{series[2]}", "{series[3]}", "{series[4]}", "{series[5]}", "{series[6]}", "{series[7]}", "{series[8]}",
        "{series[9]}", "{series[10]}", "{series[11]}", "{series[12]}")'''

        # 엑셀에서 불러온 cell에 데이터가 없는 경우 null로 치환
        sql = sql.replace('nan', 'null').replace('None','null')
        
        db.execute(sql)
        
    except Exception as ex:
        logging.error(ex)

In [7]:
# db = Database(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db_name=DB_NAME)
# db.connect() 

# for i in range(len(df)):
#     InsertMovie(df.iloc[i])

In [8]:
# db = Database(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db_name=DB_NAME)
# db.connect() 

# db.select_all(sql)

# 답변 찾기

In [9]:
from datetime import datetime
from dateutil.relativedelta import *

class FindAnswer:
    def __init__(self, db, lsts):
        self.db = db
        self.lsts = lsts

    def recent_day(self):
        now = datetime.now()
        six_months_ago = now - relativedelta(months=6)
        recent = six_months_ago.strftime("%Y-%m-%d")
        return recent

    def time_sort(self,times):
        sort_time = [[],[],[],[],[]]
        return_time = []
    
        for time in times:
            if "월" in time or "화" in time or "수" in time or "목" in time or "금" in time or "토" in time or "일" in time:
                if "요일" in time:
                    sort_time[4] = time
                elif "일" in time:
                    sort_time[1] = time
                else:
                    sort_time[0] = time
            elif "시" in time:
                sort_time[2] = time
            elif "분" in time:
                sort_time[3] = time
            
        for st in sort_time:
            if len(st) > 0:
                return_time.append(st)
        return return_time

    def intent_query(self):
        if self.lsts[0] == "추천":
            sql_intent = ' order by rand() limit 1;'    # 랜덤으로 하나 추천
        else:
            sql_intent = ''
        return sql_intent

    def emotion_query(self):
        if self.lsts[1] != "없음":
            if self.lsts[2] != "부정":
                sql_keyword = f"keyword = '{self.lsts[1]}'"
            else:
                sql_keyword = f"keyword != '{self.lsts[1]}'"
        else:
            sql_keyword=''
        return sql_keyword

    def trend_query(self):
        if self.lsts[3] == '최신':
            sql_trend = f"opendate >= '{self.recent_day()}'" # 최신(6개월)
        elif self.lsts[3] == '인기':
            sql_trend = "people >= 5000000"
        else:
            sql_trend = "people >= 1000000"
        return sql_trend

    def ner_query(self):
        sql_lst = []
        lst = self.lsts[4]

        # 배우 포함
        if lst[1] != []:
            for act in lst[1]:
                sql = f"actors like '%{act}%'"
                sql_lst.append(sql)
                
        # 장르 포함
        if lst[2] != []:
            for gen in lst[2]:
                sql = f"genre like '%{gen}%'"
                sql_lst.append(sql)
        
        # 국적 - 대표국적으로 구분
        if lst[3] != []:
            한국 = ['대한민국','우리나라','국내']
            외국 = ['해외','외국']
            for nat in lst[3]:
                if nat in 한국:
                    sql = "repnations like '한국'"
                elif nat in 외국:
                    sql = "repnations not like '한국'"
                else:
                    sql = f"repnations like '{nat}'"
                sql_lst.append(sql)
        
        # 감독 포함
        if lst[4] != []:
            for direc in lst[4]:
                sql = f"director like '%{direc}%'"
                sql_lst.append(sql)
                
        if len(sql_lst)==0:
            sql =''
        else:
            sql = ' and '.join(sql_lst)
        return sql
    
    def final_query(self):
        final_lst = []
        final_lst.append(self.emotion_query())
        final_lst.append(self.trend_query())
        final_lst.append(self.ner_query())
        # try:
        #     final_lst.remove('')
        # except:
        #     pass
        final_lst = [item for item in final_lst if item]
                
        sql = ' and '.join(final_lst)
        final_sql = "select * from chat_movie where " + sql + self.intent_query()
        return final_sql
    
    def find_answer(self):
        title = self.lsts[4][0]
        time = self.time_sort(self.lsts[4][5])
        
        
        if self.lsts[0] == '추천':
            self.db.connect()
            find_dict = self.db.select_all(self.final_query())
            print(find_dict)
            self.db.close()
            
            if len(find_dict) == 0 :
                ans = "조건에 맞는 영화목록이 없습니다."
                return ans
            else:
                ans = f'''영화 {find_dict[0]['title']} 추천드립니다.\n{'='*50}\n등급 : {find_dict[0]['grade']}\n장르 : {find_dict[0]['genre']}\n감독 : {find_dict[0]['director']}\n배우 : {' / '.join(find_dict[0]['actors'].split(',')[:3])}\n개봉일 : {find_dict[0]['opendate']}'''
                return ans
            
        elif self.lsts[0] == '후기':
            ans = f'''영화 {title} 후기입니다.\n{'='*50}\n'''
            return ans
            
        elif self.lsts[0] == '예매':
            if len(self.lsts[4][0]) > 0 and len(self.lsts[4][5]) > 0:
                ans = f'''영화 {title[0]}이/가 {' '.join(time)}에 예약되었습니다.'''
            else:
                ans = f"예매하고 싶은 영화명과 예매 시간을 포함해서 다시 문의주세요."
            return ans
        
        elif self.lsts[0] == '정보':
            if title == []:
                ans = "해당 영화 정보가 없습니다."
                return ans
            
            else:
                headers = {'User-Agent':'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'}
                base_url = 'https://search.naver.com/search.naver?sm=tab_hty.top&where=nexearch&query='
                sup_url = parse.quote(f'영화 {self.lsts[4][0][0]} 평점')
                url = base_url + sup_url
                req = requests.get(url, headers=headers)
                soup = BeautifulSoup(req.text, 'html.parser')

                print(soup)

                # 전체 평점
                try:
                    tot_score = float(soup.select('.area_star_number')[0].get_text())
                except:
                    tot_score = 0

                # 남자 평점
                try:
                    man_score = float(soup.select('.area_star_number')[1].get_text())
                except:
                    man_score = 0

                # 여자 평점
                try:
                    girl_score = float(soup.select('.area_star_number')[2].get_text())
                except:
                    girl_score = 0

                # 10대 만족도
                try:
                    age10_score = float(soup.select('.area_graph_age > li')[0].select('.this_text_num')[0].get_text())
                except:
                    age10_score = 0

                # 20대 만족도
                try:
                    age20_score = float(soup.select('.area_graph_age > li')[1].select('.this_text_num')[0].get_text())
                except:
                    age20_score = 0

                # 30대 만족도
                try:
                    age30_score = float(soup.select('.area_graph_age > li')[2].select('.this_text_num')[0].get_text())
                except:
                    age30_score = 0

                # 40대 만족도
                try:
                    age40_score = float(soup.select('.area_graph_age > li')[3].select('.this_text_num')[0].get_text())
                except:
                    age40_score = 0

                # 50대 만족도
                try:
                    age50_score = float(soup.select('.area_graph_age > li')[4].select('.this_text_num')[0].get_text())
                except:
                    age50_score = 0

                # time.sleep(0.3)

                # # 공감순 리뷰 페이지 내리기
                # driver.find_element(By.XPATH, '//*[@id="main_pack"]/div[2]/div[2]/div[2]/div/div[2]/div[2]/div/div[1]/ul/li[2]/a').click()

                # driver.find_element(By.XPATH, '//*[@id="main_pack"]/div[2]/div[2]/div[2]/div/div[2]/div[4]/div/div/div/div/ul/li[1]/a').click()

                # driver.find_element(By.XPATH, '//*[@id="main_pack"]/div[2]/div[2]/div[2]/div/div[2]/div[6]').click()


                # html = driver.page_source
                # soup = BeautifulSoup(html, 'html.parser')
                
                # 리뷰 20개 미만 처리
                # print(f"중간확인용 : {soup.select('.area_card_outer._item_wrapper')}")
                min = len(soup.select('.area_card_outer._item_wrapper > li'))
                rev_sco = []
                rev_row = []

                # 상위 리뷰 3개 추출
                if min >= 3:
                    try:
                        for cnt in range(0,3):
                            rev = soup.select('.area_card_outer._item_wrapper > li > .area_review_content > div')[cnt].select('.desc')[0].get_text()
                            rev_row.append(rev)
                            rev_score = int(soup.select('.area_card_outer._item_wrapper > li .area_text_box')[cnt].get_text().split(')')[1])
                            rev_sco.append(rev_score)

                    except:
                        pass
                else:
                    try:
                        for cnt in range(0,min):
                            rev = soup.select('.area_card_outer._item_wrapper > li > .area_review_content > div')[cnt].select('.desc')[0].get_text().strip()
                            rev_row.append(rev)
                            rev_score = int(soup.select('.area_card_outer._item_wrapper > li .area_text_box')[cnt].get_text().split(')')[1])
                            rev_sco.append(rev_score)
                    except:
                        error_txt = '리뷰 error'
                        pass
                review_3 = '/ '.join(rev_row)
                review_3_point = str(' / '.join(map(str, rev_sco)))

                print(f'평점:{tot_score}\n리뷰 : {rev_sco} : {rev_row}')
                
                ans = soup


                # self.db.connect()
                # find_dict = self.db.select_all(f'select * from chat_movie where title = "{title[0]}"')
                # self.db.close()
 
                # ans = f'''영화 {title}의 정보입니다.\n{'='*50}\n등급 : {find_dict[0]['grade']}\n장르 : {find_dict[0]['genre']}\n제작국가 : {find_dict[0]['nations']}\n감독 : {find_dict[0]['director']}\n배우 : {' / '.join(find_dict[0]['actors'].split(',')[:3])}\n개봉일 : {find_dict[0]['opendate']}\n\n줄거리 : \n{find_dict[0]['story']}'''
                # return ans
        else:
            ans = '죄송합니다. 다시 이용해주세요.'
            return ans

# 모델 테스트

In [10]:
from konlpy.tag import Komoran
import pickle
import jpype

class Preprocess :
    def __init__(self, word2index_dic='', userdic=None):
        # 단어 인덱스 사전 불러오기
        if (word2index_dic != ''):
            f = open(word2index_dic, 'rb')
            self.word_index = pickle.load(f)
            f.close()
        else:
            self.word_index = None
            
        # 형태소 분석기 초기화
        self.komoran = Komoran(userdic=userdic)
        
        # 제외할 품사
        # 참조 : https://docs.komoran.kr/firststep/postypes.html
        # 관계언, 기호, 어미, 접미사 제거
        self.exclusion_tags = [
            'JKS', 'JKC', 'JKG', 'JKO', 'JKB', 'JKV', 'JKQ',
            'JX', 'JC',
            'SF', 'SP', 'SS', 'SE', 'SO',
            'EP', 'EF', 'EC', 'ETN', 'ETM',
            'XSN', 'XSV', 'XSA'
        ]
        
    # 형태소 분석기
    def pos(self, sentence):
        jpype.attachThreadToJVM()
        return self.komoran.pos(sentence)
        
    # 불용어 제거 후, 필요한 품사 정보만 가져오기
    # 재밌는 영화, 무서운 영화, 액션 영화 등 NNP인 경우는 쪼개서 가져오기
    def get_keywords(self, pos, without_tag=False):
        f = lambda x: x in self.exclusion_tags
        word_lst = []
        word_list = []
        for p in pos:
            if p[1] == 'NNP':
                if '영화' in p[0]:
                    for q in p[0].split():
                        word_lst.extend(self.pos(q))
                else:
                    word_lst.append(p)
            else:
                word_lst.append(p)
                    
        for word in word_lst:
            if f(word[1]) is False:
                word_list.append(word if without_tag is False else word[0])
        return word_list
    
    # 키워드를 단어 인덱스 시퀀스로 변환
    def get_wordidx_sequence(self, keywords):
        if self.word_index is None:
            return []
        
        w2i = []
        for word in keywords:
            try:
                w2i.append(self.word_index[word])
            except KeyError:
                # 해당 단어가 사전에 없는 경우, OOV 처리
                w2i.append(self.word_index['OOV'])
        return w2i

In [11]:
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras import preprocessing
import numpy as np

# 분류 모델 모듈
class PredictModel:
    def __init__(self, category, model_name, proprocess):
        
        self.MAX_SEQ_LEN = 15
        self.category = category
        
        if self.category =='intent':
            self.labels = {0: '기타', 1: '추천', 2: '후기', 3: '정보', 4: '예매', 5: '욕설'}
            
        elif self.category == 'emotion':
            self.labels = {0: '무서움', 1: '슬픔', 2: '신남', 3: '없음', 4: '웃김', 5: '재미'}
            
        elif self.category == 'binary':
            self.labels = {0: '긍정', 1: '부정', 2: '없음'}
            
        elif self.category == 'trend':
            self.labels = {0: '없음', 1: '인기', 2: '최신'}
        elif self.category == 'ner':
            self.MAX_SEQ_LEN = 40
            self.labels = {1: 'O', 2: 'B_MOVIE', 3: 'B_ACT', 4: 'B_GEN', 5: 'B_NAT', 
                           6: 'B_DIR', 7: 'B_DT', 8: 'B_RAT', 0: 'PAD'}  
        else:
            self.labels = {}

        self.labels[len(self.labels)]="-"
        # 분류 모델 불러오기
        self.model = load_model(model_name)
        # 챗봇 Preprocess 객체
        self.p = proprocess

    # 클래스 예측
    def predict_class(self, query):
        # 형태소 분석
        pos = self.p.pos(query)
        
        # 문장내 키워드 추출(불용어 제거)
        keywords = self.p.get_keywords(pos, without_tag=True)
        sequences = [self.p.get_wordidx_sequence(keywords)]

        # 패딩처리
        padded_seqs = preprocessing.sequence.pad_sequences(sequences, maxlen=self.MAX_SEQ_LEN, padding='post')
        predict = self.model.predict(padded_seqs)
        predict_class = tf.math.argmax(predict, axis=1)

        return predict_class.numpy()[0]

        # ner_전용
    def predict_ner(self, query):
        # 형태소 분석
        pos = self.p.pos(query)

        # 문장내 키워드 추출(불용어 제거)
        keywords = self.p.get_keywords(pos, without_tag=True)
        sequences = [self.p.get_wordidx_sequence(keywords)]

        # 패딩처리
        padded_seqs = preprocessing.sequence.pad_sequences(sequences, padding="post", value=0,
                                                           maxlen=self.MAX_SEQ_LEN)
        predict = self.model.predict(np.array([padded_seqs[0]]))
        predict_class = tf.math.argmax(predict, axis=-1)
        tags = [self.labels[i] for i in predict_class.numpy()[0]]
        return list(zip(keywords, tags))
    

In [12]:
# GPU에서 실행
import tensorflow as tf
with tf.device('/:GPU0'):
        db = Database(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db_name=DB_NAME)
        db.connect() 

        p = Preprocess(word2index_dic='c:/2nd_project/Data/chatbot_dict.bin',
                        userdic = 'c:/2nd_project/Data/ner_data/new_user_dic_10.txt')

        intent = PredictModel(category='intent', model_name='c:/2nd_project/Model/intent_model/intent_uso_model_0811_a_ep_best(5).h5', proprocess=p)
        emotion = PredictModel(category='emotion', model_name='c:/2nd_project/Model/emotion_model/question_emotion_model.h5', proprocess=p)
        binary = PredictModel(category='binary', model_name='c:/2nd_project/Model/binary_model/question_emotion_binary_model.h5', proprocess=p)
        trend = PredictModel(category='trend', model_name='c:/2nd_project/Model/trend_model/question_trend_model.h5', proprocess=p)
        ner = PredictModel(category='ner', model_name='c:/2nd_project/Model/ner_model/ner_model_0817_a.h5', proprocess=p)

        def ner_tag_sep(lsts):  
                ner_movie = []
                ner_act = []
                ner_gen = []
                ner_nat = []
                ner_dir = []
                ner_dt = []
                ner_rat = []

                for lst in lsts:
                        if lst[1] == 'B_MOVIE':
                                ner_movie.append(lst[0])
                        elif lst[1] == 'B_ACT':
                                ner_act.append(lst[0])
                        elif lst[1] == 'B_GEN':
                                ner_gen.append(lst[0])
                        elif lst[1] == 'B_NAT':
                                ner_nat.append(lst[0])
                        elif lst[1] == 'B_DIR':
                                ner_dir.append(lst[0])
                        elif lst[1] == 'B_DT':
                                ner_dt.append(lst[0])
                        elif lst[1] == 'B_RAT':
                                ner_rat.append(lst[0])

                        ner_movie = list(set(ner_movie))
                        ner_act = list(set(ner_act))
                        ner_gen = list(set(ner_gen))
                        ner_nat = list(set(ner_nat))
                        ner_dir = list(set(ner_dir))
                        ner_dt = list(set(ner_dt))
                        ner_rat = list(set(ner_rat))

                return ner_movie, ner_act, ner_gen, ner_nat, ner_dir, ner_dt, ner_rat


        def predict_keyword(text):
        
                intent_pred = intent.predict_class(text)
                emotion_pred = emotion.predict_class(text)
                binary_pred = binary.predict_class(text)
                trend_pred = trend.predict_class(text)
                ner_pred = ner.predict_ner(text)
                
                intent_label = intent.labels[intent_pred]
                emotion_label = emotion.labels[emotion_pred]
                binary_label = binary.labels[binary_pred]
                trend_label = trend.labels[trend_pred]
                ner_label = ner_tag_sep(ner_pred)

                return intent_label, emotion_label, binary_label, trend_label, ner_label



In [13]:
# # text = '기생충 영화 드림 기생충 감독 봉준호 내가 좋아하는 배우는 황정민 박서준 고소영 우리나라 호러 장르는 액션  미국 오늘은 8월 전체관람가인 영화 추천 12시 안해주면 화날꺼같아'
# # query = '요즘 슬프지 않은 영화 볼만한거 없나'
# db = Database(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db_name=DB_NAME)

# # query = '그냥 봉준호 감독 영화 정보좀'
# # query = '겨울왕국 정보좀'
# # query = '아무영화 추천좀'
# # query = '명량 다음주 금요일에 예약해줘'
# # query = '명량 이번주 화요일 13일에 예약해줘'
# # query = '명량 목요일 20시 30분에 예약해줘'
# # query = '명량 1월 15일 12시에 예약해줘'
# # query = '명량 예약해줘'
# query = '8시 예약해줘'
# # query = '명량 30분에 예약해줘'

# # query = '재밌는 한국 영화 추천좀'

# print(query)

# lsts = predict_keyword(query)

# # print('1. 의도 :', lsts[0])
# # print('2. 감정 :', lsts[1])
# # print('3. 긍부정 :', lsts[2])
# # print('4. 트렌드 :', lsts[3])
# # print('5. 개체명\n\t영화명 : {}\n\t배우 : {}\n\t장르 : {}\n\t국가 : {}\n\t감독 : {}\n\t시간 : {}\n\t등급 : {}'
# # .format(lsts[4][0], lsts[4][1], lsts[4][2], lsts[4][3], lsts[4][4], lsts[4][5], lsts[4][6]))
# print(lsts)

# obj = FindAnswer(db, lsts)
# sql = obj.final_query()
# # print(sql)
# ans = obj.find_answer()
# print(ans)



# # print(sql)

# # movie_data = db.select_all(sql)
# # print(movie_data)

# # recomand_movie = movie_data[0]['title']
# # print(recomand_movie)

In [14]:
db = Database(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db_name=DB_NAME)

query = '명량 정보줘'
print(query)

lsts = predict_keyword(query)
print(lsts)

obj = FindAnswer(db, lsts)

sql = obj.final_query()
print(sql)

ans = obj.find_answer()
print(ans)

명량 정보줘


  jpype.attachThreadToJVM()


('정보', '없음', '없음', '없음', (['명량'], [], [], [], [], [], []))
select * from chat_movie where people >= 1000000
<!DOCTYPE html>
 <html lang="ko"> <head> <meta charset="utf-8"/> <meta content="always" name="referrer"/> <title>네이버 검색</title> <link href="https://ssl.pstatic.net/sstatic/search/css/2011/common.css" rel="stylesheet" type="text/css"/> <link href="https://ssl.pstatic.net/sstatic/search/favicon/favicon_191118_pc.ico" rel="shortcut icon"/> <style type="text/css"> body{margin:0;padding:0;text-align:center} body,td,div{color:#404040;font-size:12px} img{border:none} label{cursor:hand;cursor:pointer} *{margin:0;padding:0} li{list-style-type:none} fieldset{border:none} legend{display:none} /* 120110 */ .blind{visibility:hidden;overflow:hidden;position:absolute;top:0;left:0;width:1px;height:1px;font-size:0;line-height:0} .logo a{display:block;width:93px;height:17px;background:url(https://ssl.pstatic.net/sstatic/search/img3/sp_block2.gif) 0 0 no-repeat}/* 120209 */ .info dt.share_pc{margin

In [15]:
ans