In [1]:
import os
import torch
from transformers import pipeline, StoppingCriteria
import random
import pandas as pd
import numpy as np
import sqlite3
import random
from copy import deepcopy

這邊使用的推薦系統是根據 <a href="https://grouplens.org/datasets/movielens/">https://grouplens.org/datasets/movielens/</a>
的 25M-MovieLens 資料所建造的 <br>
這個資料集的資料最新到 2019，所以這裡的推薦系統無法推薦更新的電影 <br>
執行這個程式所需資料包括：<br>
./hundred_likers.csv<br>
./best_300.csv<br>
./cross_scores.db<br>
這三個檔案<br>
它們主要是我使用 ./Recommandation_cross_score_calculation.ipynb 與 ./Recommandation_cross_score_calculation.ipynb 兩個筆記本整理計算而成<br>
其中 ./best_300.csv 裡面的中文片名，則是我另外手動填上的<br>

這邊我使用的 LLM 基底是 Llama-3-Taiwan 的 8B-128k 版本，這是一個專門針對繁體中文資料優化過的 Llama-3 模型。<br>
由於這個模型在 huggingface 上需要同意一些簡單的條件才能下載，所以如果你想自己執行這個程式，<br>
你必須先註冊一個 huggingface 帳號，取得跟你帳號綁定的 token，<br>
並且到  <a href="https://huggingface.co/yentinglin/Llama-3-Taiwan-8B-Instruct-128k">https://huggingface.co/yentinglin/Llama-3-Taiwan-8B-Instruct-128k</a> 取得授權，<br>
才能下載這個模型 <br>
另外，如果要在 GPU 上執行這個模型，VRAM 最好有 20GB 以上比較保險。

# 初始化

## 初始化 LLM

In [2]:
# Read the Hugging Face access token from a local file so it is never hard-coded in the notebook.
with open('./huggingface/read-only-token.txt', 'r') as f:
    # Strip surrounding whitespace: a trailing newline left by an editor is not part of the token
    # and would otherwise be sent along with it when authenticating.
    token = f.read().strip()

In [3]:
# Define a custom stopping criteria class
class EosListStoppingCriteria(StoppingCriteria):
    """Stop generation once a sequence in the batch ends with a given token-id sequence.

    Args:
        eos_sequence: Token ids that mark the end of a reply. Defaults to ``[128256]``.
    """

    def __init__(self, eos_sequence=None):
        super().__init__()
        # Use a None sentinel instead of a mutable list default, and keep a private list copy so that
        # tuples are accepted and later changes to the caller's object cannot affect this criteria.
        self.eos_sequence = [128256] if eos_sequence is None else list(eos_sequence)

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when any row of ``input_ids`` ends with ``eos_sequence``."""
        if not self.eos_sequence:
            # An empty sequence would make the slice below (``-0:``) return the whole row; never stop on it.
            return False
        # Compare only the trailing tokens of every row against the stop sequence.
        last_ids = input_ids[:, -len(self.eos_sequence):].tolist()
        return self.eos_sequence in last_ids

In [4]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Hugging Face Hub id of the Llama-3-Taiwan checkpoint used as the chat model.
model_name = "yentinglin/Llama-3-Taiwan-8B-Instruct-128k"

# Build the text-generation pipeline in bfloat16 on the chosen device, authenticating with `token`.
llm = pipeline("text-generation", model=model_name, device_map=device, torch_dtype=torch.bfloat16, token=token)
# Reuse the pipeline's tokenizer; the chat classes use it for the chat template and the EOS token id.
tokenizer = llm.tokenizer

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


## ChatBot

In [5]:
# Conversational chatbot
class Chat():
    """Console chat loop around the module-level ``llm`` pipeline.

    Each user turn is first classified as "asking for a movie recommendation" or not. A request is
    handed to ``Movie_Recommander``; anything else is answered directly by the language model.

    Attributes:
        role: System prompt message(s) placed at the start of every conversation.
        chat: Running message history of this conversation (starts as a copy of ``role``).
        recent_movie_recommand: True when the previous turn was a movie recommendation, so that the
            immediate follow-up turn is answered as normal chat.
        robot_name: Name printed in front of the chatbot's replies.
    """

    def __init__(self):
        self.role = [{"role": "system", "content": "You are an AI assistant called Twllm, created by TAME (TAiwan Mixture of Expert) project."},]
        self.chat = deepcopy(self.role)
        self.recent_movie_recommand = False
        self.robot_name = 'Twllm'  # display name of the chatbot

    def text_generator(self, chat, temperature=0.7):
        """Generate the assistant's next reply for a list of chat messages.

        Args:
            chat: List of ``{"role": ..., "content": ...}`` messages.
            temperature: Temperature forwarded to the generation pipeline.

        Returns:
            The newly generated text only (the prompt is not included).
        """
        flatten_chat_for_generation = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
        with torch.no_grad():
            output = llm(flatten_chat_for_generation,
                         return_full_text=False,
                         max_new_tokens=768,
                         temperature=temperature,
                         stopping_criteria=[EosListStoppingCriteria([tokenizer.eos_token_id])])
        return output[0]['generated_text']

    def request_of_film_recommandation(self, input_text):
        """Ask the model whether ``input_text`` is a request for a movie recommendation.

        The question is asked up to three times and every answer must be a "yes" (是); the first
        answer that is not a yes ends the check.

        Returns:
            True only if all three answers start with a yes.
        """
        for _ in range(3):
            justify = self.text_generator(self.role + [{"role": "user",
                                                        "content": (f"當使用者說出「{input_text}」的時候，他是在請你推薦電影嗎？請回答「是」，「否」，或「不確定」。")}],
                                          temperature=0.1)
            # Look only at the start of the reply, ignoring leading whitespace.
            head = justify.strip()[:3]
            # "不是" contains "是" but means "no", so a negation in the head must not count as a yes.
            if '是' not in head or '不' in head:
                return False
        return True

    def run(self):
        """Run the interactive console loop until the chatbot's reply contains a farewell."""
        while True:
            input_text = input('你：')
            self.chat.append({"role": "user", "content": input_text})
            # Test the cheap flag first: right after a recommendation the classification result is
            # ignored anyway, so this skips three needless LLM calls.
            if (not self.recent_movie_recommand) and self.request_of_film_recommandation(input_text):
                movie_recommander = Movie_Recommander()
                output_text = movie_recommander.run()
                # Copy the recommender's dialogue (minus its system prompt) into the main history.
                self.chat += movie_recommander.movie_discuss_chat[1::]
                self.recent_movie_recommand = True
                if output_text is None:
                    # No recommendation could be produced; let the model answer normally.
                    output_text = self.text_generator(self.chat)
            else:
                output_text = self.text_generator(self.chat)
                self.recent_movie_recommand = False
            self.chat.append({"role": "assistant", "content": output_text})
            print('='*30)
            print(f"{self.robot_name}：{output_text}")
            # The conversation ends when the chatbot itself says goodbye.
            if '再見' in output_text or 'bye' in output_text.lower():
                break
            print('='*30)
            

## 推薦系統

In [6]:
class Movie_Recommander(Chat):
    """Asks which films the user likes and turns precomputed cross scores into a recommendation.

    Reads ``./best_300.csv`` (films offered to the user), ``./hundred_likers.csv`` (id-to-title
    lookup) and the ``cross_score`` table of ``./cross_scores.db``.

    Attributes:
        movie_discuss_chat: Message history of the recommendation dialogue.
        ideal_movie_recommandation_number: Target number of films to recommend.
        minimum_movie_recommandation_base: Divided by the number of liked films to get the minimum
            number of recommendations per liked film (method 'B').
        maximum_movie_recommandation_base: Divided by the number of liked films to get the number of
            candidates considered per liked film.
    """

    def __init__(self, chat=None):
        """Load the data files; ``chat`` optionally supplies an existing message history to extend."""
        super().__init__()
        self.best_300 = pd.read_csv('./best_300.csv')
        self.sql_path = './cross_scores.db'
        self.hundred_likers = pd.read_csv('./hundred_likers.csv')
        self.ideal_movie_recommandation_number = 7
        self.minimum_movie_recommandation_base = 3
        self.maximum_movie_recommandation_base = 14
        if chat is None:
            self.movie_discuss_chat = deepcopy(self.role)
        else:
            self.movie_discuss_chat = chat

    def ask(self, n_film=25):
        """Print a random selection of films, read the user's answer and record the exchange.

        Args:
            n_film: Number of films to offer (capped at the number of films available).
        """
        # Never request a larger sample than the table holds; random.sample would raise ValueError.
        sample_size = min(n_film, len(self.best_300))
        sampled_index = random.sample(list(self.best_300.index), k=sample_size)
        self.random_selected_df = (self.best_300.loc[sampled_index]
                                   .reset_index(drop=True)
                                   .drop(['Liker_num', 'Liker_ratio'], axis=1))
        speaker_prefix = f'{self.robot_name}：'
        output_text = f'{speaker_prefix}我需要知道你喜歡什麼電影，請在以下電影當中，選擇 1~3 部你喜歡的電影：\n\n'
        context_text = ''
        self.ask_lines = []
        for i in self.random_selected_df.index:
            movieId = self.random_selected_df.loc[i, 'Movie_id']
            eng_title = self.random_selected_df.loc[i, 'Movie']
            tw_title = self.random_selected_df.loc[i, 'Taiwan Film Name'].strip(' ')
            year = self.random_selected_df.loc[i, 'Year']

            output_text += f'  -{tw_title} ({eng_title}) ({year})\n'
            # Space-free copy of the displayed line, used by find_IDs for substring matching.
            self.ask_lines.append(f'  -{tw_title} ({eng_title}) ({year})'.replace(' ', ''))
            context_text += f'(ID: {movieId}, "{tw_title}", "{eng_title}")|||\n'
        print('='*30)
        print(output_text)
        self.movie_discuss_chat += [{'role': 'context', 'content': context_text}]
        # Store the assistant turn without the speaker prefix. The prefix length depends on
        # robot_name, so it is measured instead of assuming a fixed number of characters.
        self.movie_discuss_chat += [{'role': 'assistant', 'content': output_text[len(speaker_prefix):]}]
        print('='*30)
        answer = input('你：')
        self.movie_discuss_chat += [{'role': 'user', 'content': answer}]

    def find_IDs(self, answer):
        """Map the films mentioned in ``answer`` to the films offered by :meth:`ask`.

        The language model extracts the titles; each extracted title is then matched as a substring
        of the offered lines.

        Returns:
            ``(IDs, input_titles)``: matched movie ids and their Taiwanese titles, without duplicates.
        """
        sub_titles = self.text_generator(self.movie_discuss_chat +
                                         [{'role': 'directive', 'content': f'請找出這段文字當中的電影名稱，用逗號分隔回傳。:{answer}'}])
        # The model may separate titles with full-width or ideographic commas instead of ',' .
        normalized = sub_titles.replace('，', ',').replace('、', ',')
        fragments = [s.replace(' ', '').strip() for s in normalized.split(',')]

        IDs = []
        input_titles = []
        for sub_title in fragments:
            if not sub_title:
                # An empty fragment is a substring of every line and would wrongly select the first film.
                continue
            for i, line in enumerate(self.ask_lines):
                if sub_title in line:
                    movie_id = self.random_selected_df.loc[i, 'Movie_id']
                    if movie_id not in IDs:
                        IDs.append(movie_id)
                        input_titles.append(self.random_selected_df.loc[i, 'Taiwan Film Name'])
                    break
        return IDs, input_titles

    def adjust_score(self, a_diff):
        """Return a function that shifts ``F_score`` by ``a_diff * log10(P_boost / max(1, N_boost))``.

        The returned function modifies the given DataFrame in place and also returns it.
        """
        def func(df):
            # 1e-4 keeps the logarithm finite when P_boost is 0.
            positive_log = np.log(np.array(df['P_boost']) + 1e-4)
            negative_log = np.log(np.array([max(1, j) for j in df['N_boost']]))
            df['F_score'] += a_diff * (positive_log - negative_log) / np.log(10)
            return df
        return func

    def get_recommandation_scores_from_one_movie(self, movieId, save_unrelated_movies=True, adjust_score=None):
        """Load every ``cross_score`` row whose ``Movie_ID_A`` equals ``movieId``.

        Args:
            movieId: Id of the liked film.
            save_unrelated_movies: When True, rows with ``F_score`` below -9000 get ``F_score`` reset
                to -1 if ``P_boost >= N_boost`` and ``P_condition >= 2 * N_condition``.
            adjust_score: Optional function applied to the resulting DataFrame (see :meth:`adjust_score`).

        Returns:
            DataFrame with the columns listed in ``keys`` below, sorted by ``ID_B``.
        """
        conn = sqlite3.connect(self.sql_path)
        try:
            cursor = conn.cursor()
            # Bind the id as a parameter instead of formatting it into the SQL text. int() is needed
            # because the ids come from pandas and sqlite3 cannot bind numpy integer types.
            cursor.execute('SELECT * FROM cross_score WHERE Movie_ID_A = ?', (int(movieId),))
            result = cursor.fetchall()
        finally:
            # Always release the database handle, even if the query fails.
            conn.close()

        keys = ('ID_A', 'ID_B', 'P_boost', 'N_boost', 'P_condition', 'N_condition', 'A_condition', 'F_score')
        dict_to_df = {key: [] for key in keys}
        for row in result:
            for i in range(len(keys)):
                dict_to_df[keys[i]].append(row[i])
            if save_unrelated_movies:
                if dict_to_df['F_score'][-1] < -9000:
                    if dict_to_df['P_boost'][-1] >= dict_to_df['N_boost'][-1] and dict_to_df['P_condition'][-1] >= dict_to_df['N_condition'][-1] * 2:
                        dict_to_df['F_score'][-1] = -1

        the_df = pd.DataFrame(dict_to_df)
        if adjust_score is not None:
            the_df = adjust_score(the_df)
        return the_df.sort_values('ID_B')

    @staticmethod
    def _select_and_pop(n_sources, candidate_dict, selected_movies):
        """Move the best remaining candidate into ``selected_movies`` and drop it from all candidate lists.

        The caller must ensure at least one candidate list is non-empty.
        """
        best_scores = []
        for i in range(n_sources):
            remaining = candidate_dict.get(i)
            # -inf (rather than 0) marks a liked film with nothing left, so an exhausted or removed
            # list can never win the argmax against a real candidate whose score is 0.
            best_scores.append(max(c[1] for c in remaining) if remaining else -np.inf)
        this_c = int(np.argmax(best_scores))
        # Candidate lists are sorted by descending score, so the head is the best entry.
        this_m = candidate_dict[this_c].pop(0)
        selected_movies[this_c].append(this_m)
        # The same film must not be selected again through another liked film.
        for j in candidate_dict:
            candidate_dict[j] = [c for c in candidate_dict[j] if c[0] != this_m[0]]

    def get_recommandation_IDs_from_movies(self, movieIds, method='A', adjust_a=0, return_scores=False):
        """Combine the cross scores of the liked films into a ranked list of recommended movie ids.

        Args:
            movieIds: One movie id or a list/tuple of ids; ids not in ``best_300`` are ignored.
            method: Combination strategy when more than one film is liked. 'A' ranks the union of
                each film's top candidates by their best score; 'B' picks greedily while trying to
                give every liked film a minimum number of picks.
            adjust_a: When non-zero, scores are adjusted with :meth:`adjust_score` using this factor.
            return_scores: When True, also return the scores.

        Returns:
            None when no given id is known; otherwise the list of recommended ids ordered by
            descending score, or ``(ids, scores)`` when ``return_scores`` is True.

        Raises:
            ValueError: If several films are liked and ``method`` is neither 'A' nor 'B'.
        """
        ideal_n_movies = self.ideal_movie_recommandation_number
        min_n_movies = self.minimum_movie_recommandation_base
        max_n_movies = self.maximum_movie_recommandation_base

        if isinstance(movieIds, tuple):
            movieIds = list(movieIds)
        elif not isinstance(movieIds, list):
            movieIds = [movieIds]

        known_ids = set(self.best_300['Movie_id'])
        eff_movieIds = []
        for i in movieIds:
            # Skip unknown ids, and repeated ids, which would otherwise be scored twice.
            if i in known_ids and i not in eff_movieIds:
                eff_movieIds.append(i)

        if len(eff_movieIds) == 0:
            return None

        score_adjuster = None if adjust_a == 0 else self.adjust_score(adjust_a)
        ds = [self.get_recommandation_scores_from_one_movie(i, adjust_score=score_adjuster) for i in eff_movieIds]
        n_candidates_each_df = int(np.ceil(max_n_movies / len(eff_movieIds)))
        n_suggestion_each_df = int(min_n_movies / len(eff_movieIds))

        if len(ds) > 1:
            if method not in ('A', 'B'):
                raise ValueError(f"method must be 'A' or 'B', got {method!r}")

            score_cols = [f'F_score_{j}' for j in range(len(ds))]
            # Inner-join the per-film scores on the candidate id. Only ID_B and F_score are needed
            # afterwards; carrying just those avoids column-name clashes for any number of films.
            D = None
            for j, frame in enumerate(ds):
                scores_j = frame[['ID_B', 'F_score']].rename(columns={'F_score': score_cols[j]})
                D = scores_j if D is None else pd.merge(D, scores_j, on='ID_B')

            # A candidate must be non-negative for every liked film and must not be a liked film itself.
            for j in range(len(ds)):
                D = D[D[score_cols[j]] >= 0]
                D = D[D['ID_B'] != eff_movieIds[j]]

            if method == 'A':
                top_ids = set()
                for col in score_cols:
                    top_ids |= set(D.sort_values(col, ascending=False).iloc[0:n_candidates_each_df]['ID_B'])
                pool = D[D['ID_B'].isin(top_ids)].drop_duplicates('ID_B')
                # Score every pooled candidate by its best score over all liked films, in one pass.
                final_df = pd.DataFrame({'ID': list(pool['ID_B']),
                                         'Score': list(pool[score_cols].max(axis=1))})
                final_df = final_df.sort_values('Score', ascending=False).iloc[:ideal_n_movies]
            else:
                # Per liked film: its top candidates as [id, score] pairs, best first.
                candidate_dict = {}
                for i, col in enumerate(score_cols):
                    this_D = D.sort_values(col, ascending=False).iloc[0:n_candidates_each_df]
                    candidate_dict[i] = this_D[['ID_B', col]].to_numpy().tolist()
                selected_movies = {i: [] for i in range(len(ds))}

                # Phase 1: greedily take the globally best candidates up to the ideal count.
                while (sum(len(v) for v in selected_movies.values()) < ideal_n_movies
                       and sum(len(v) for v in candidate_dict.values()) > 0):
                    self._select_and_pop(len(ds), candidate_dict, selected_movies)

                # Phase 2: only liked films still below their minimum keep competing.
                for i in list(candidate_dict.keys()):
                    if len(selected_movies[i]) >= n_suggestion_each_df:
                        candidate_dict.pop(i)

                while (min(len(v) for v in selected_movies.values()) < n_suggestion_each_df
                       and sum(len(v) for v in candidate_dict.values()) > 0):
                    self._select_and_pop(len(ds), candidate_dict, selected_movies)

                picked = [m for picks in selected_movies.values() for m in picks]
                final_df = pd.DataFrame({'ID': [m[0] for m in picked],
                                         'Score': [m[1] for m in picked]})
                final_df = final_df.sort_values('Score', ascending=False)
        else:
            final_df = ds[0][['ID_B', 'F_score']].rename(columns={'F_score': 'Score', 'ID_B': 'ID'})
            # Apply the same filters as the multi-film path: no negative scores, not the liked film itself.
            final_df = final_df[(final_df['Score'] >= 0) & (final_df['ID'] != eff_movieIds[0])]
            final_df = final_df.sort_values('Score', ascending=False).iloc[:ideal_n_movies]

        final_id_list = [int(s) for s in final_df['ID']]
        final_scores = list(final_df['Score'])
        if return_scores:
            return final_id_list, final_scores
        return final_id_list

    def get_titles_from_IDs(self, ids):
        """Return the ``Movie`` title from ``hundred_likers`` for each id, in order.

        Raises:
            IndexError: If an id has no row in ``hundred_likers``.
        """
        movies_titles = []
        for i in ids:
            matching_rows = self.hundred_likers[self.hundred_likers['Movie_id'] == i]
            movies_titles.append(matching_rows['Movie'].iloc[0])
        return movies_titles

    def recommand(self, movies_titles, input_titles):
        """Have the language model write the recommendation text for ``movies_titles``.

        ``input_titles`` is accepted for interface compatibility but is not currently used in the prompt.
        """
        movies_titles = ', '.join(movies_titles)
        this_chat = self.role + [{'role': 'directive', 'content': (f'你要推薦這幾部電影給使用者:{movies_titles}，而不要推薦他已經告訴你他喜歡的電影。'
                                                                         f'你要告訴使用者，你是根據他喜歡的電影，為他量身訂製這一份片單，'
                                                                        '並為每一部你推薦的電影寫50字以內的推薦文。')}]
        output_text = self.text_generator(this_chat, temperature=0.3)
        return output_text

    def run(self):
        """Run one recommendation round.

        Returns:
            The recommendation text, or None when no recommendation could be derived from the
            user's answer (the caller then falls back to a normal chat reply).
        """
        self.ask()
        input_IDs, input_titles = self.find_IDs(self.movie_discuss_chat[-1]['content'])
        ans_IDs = self.get_recommandation_IDs_from_movies(input_IDs, method='B', adjust_a=5)
        # Covers both "no known film recognised" (None) and "no candidate survived filtering" ([]).
        if not ans_IDs:
            return None
        ans_titles = self.get_titles_from_IDs(ans_IDs)
        output_text = self.recommand(ans_titles, input_titles)
        return output_text
        

# 執行對話機器人

In [7]:
# Create the chatbot; its history starts with only the system prompt.
chat = Chat()

In [None]:
# Start the interactive console loop; it ends when the chatbot's reply contains '再見' or 'bye'.
chat.run()

你： 你好，請問你是誰？


  attn_output = torch.nn.functional.scaled_dot_product_attention(


Twllm：我是一個開源人工智能助理，我很樂意為您提供幫助。我可以回答您的問題，幫助您完成任務，與您交談等等。您想問我什麼呢？


你： 我想請你推薦電影


Twllm：我需要知道你喜歡什麼電影，請在以下電影當中，選擇 1~3 部你喜歡的電影：

  -神鬼無間 (The Departed) (2006)
  -美女與野獸 (Beauty and the Beast) (1991)
  -聖戰奇兵 (Indiana Jones and the Last Crusade) (1989)
  -大英雄天團 (Big Hero 6) (2014)
  -空前絕後滿天飛 (Airplane!) (1980)
  -雲端情人 (Her) (2013)
  -明日邊界 (Edge of Tomorrow) (2014)
  -幸福綠皮書 (Green Book) (2018)
  -刺激驚爆點 (The Usual Suspects) (1995)
  -哈利波特：消失的密室 (Harry Potter and the Chamber of Secrets) (2002)
  -魔鬼剋星 (Ghostbusters) (1984)
  -今天暫時停止 (Groundhog Day) (1993)
  -衝出寧靜號 (Serenity) (2005)
  -哈利波特：混血王子的背叛 (Harry Potter and the Half-Blood Prince) (2009)
  -星際異攻隊2 (Guardians of the Galaxy 2) (2017)
  -醉後大丈夫 (The Hangover) (2009)
  -北非諜影 (Casablanca) (1942)
  -侏羅紀公園 (Jurassic Park) (1993)
  -哈比人：意外旅程 (The Hobbit: An Unexpected Journey) (2012)
  -超人特攻隊2 (Incredibles 2) (2018)
  -莎翁情史 (Shakespeare in Love) (1998)
  -神鬼奇航：鬼盜船魔咒 (Pirates of the Caribbean: The Curse of the Black Pearl) (2003)
  -神鬼認證：神鬼疑雲 (The Bourne Supremacy) (2004)
  -驚魂記 (Psycho) (1960)
  -2009月球漫遊 (Moon) (2009)



你： 神鬼無間  神鬼奇航：鬼盜船魔咒


Twllm：根據你喜歡的電影，我為你精心挑選了這些電影。讓我們開始吧！

1. Sherlock Holmes (2009) - 這部電影是一部充滿智慧和懸疑的驚悚片，講述了著名偵探福爾摩斯和他的夥伴華生醫生的冒險故事。由羅伯特·唐尼和裘德·洛主演，這部電影是一個很好的選擇，適合喜歡動作和智慧的電影愛好者。
2. Seven (a.k.a. Se7en) (1995) - 這部電影是一部心理驚悚片，講述了一名偵探和一名年輕警官合作，追捕一名正在進行一系列殘忍犯罪的罪犯。由布萊德·彼特和摩根·弗里曼主演，這部電影是一個很好的選擇，適合喜歡緊張和懸疑的電影愛好者。
3. Casino (1995) - 這部電影是一部犯罪劇，講述了一名在賭城工作的拉斯維加斯高利拉斯的故事。由馬丁·斯科塞斯執導，由羅伯特·迪尼羅、謝爾·加德納和凱西·貝茨主演，這部電影是一個很好的選擇，適合喜歡劇情和角色發展的電影愛好者。
4. X-Men (2000) - 這部電影是一部超級英雄動作片，講述了一群具有超能力的英雄對抗一個試圖消滅他們的壞人。由休·傑克曼、伊恩·麥克連和派翠西亞·克拉克主演，這部電影是一個很好的選擇，適合喜歡動作和冒險的電影愛好者。
5. Ocean's Eleven (2001) - 這部電影是一部犯罪喜劇，講述了一群專業小偷計劃在拉斯維加斯的豪華賭場進行大膽搶劫。由史蒂芬·索德伯格執導，由喬治·克隆尼、布魯斯·威利斯和凱特·哈德森主演，這部電影是一個很好的選擇，適合喜歡幽默和娛樂的電影愛好者。
6. Goodfellas (1990) - 這部電影是一部劇情片，講述了一名年輕人加入黑幫家族並逐漸崛起的故事。由馬丁·斯科塞斯執導，由勞勃·迪尼羅、約翰·特拉沃爾塔和凱特·卡林主演，這部電影是一個很好的選擇，適合喜歡劇情和角色發展的電影愛好者。
7. Iron Man (2008) - 這部電影是一部超級英雄動作片，講述了一名億萬富翁發明家建造了一套強大的裝甲套裝並利用它來對抗一個敵對國家的故事。由強·法夫洛執導，由羅伯特·唐尼、格溫妮絲·帕特洛和唐·錢德爾主演，這部電影是一個很好的選擇，適合喜歡動作和冒險的電影愛好者。

希望你喜歡這些電影！
