In [None]:
import os
import json
import uuid  # 用來生成唯一的相似房子編號
import torch
import torch.nn.functional as F
from ckiptagger import WS
from transformers import BertTokenizer, BertModel, CLIPProcessor, CLIPModel
from ultralytics import YOLO
from PIL import Image
import numpy as np
import mysql.connector  # 資料庫連接
from collections import Counter

# Load every model once at import time; the helper functions below share them.
ws = WS("C:\\Users\\user\\OneDrive\\桌面\\data")  # ckiptagger word segmenter

_BERT_CHECKPOINT = 'bert-base-chinese'
tokenizer = BertTokenizer.from_pretrained(_BERT_CHECKPOINT)
bert_model = BertModel.from_pretrained(_BERT_CHECKPOINT)

_CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"
clip_model = CLIPModel.from_pretrained(_CLIP_CHECKPOINT)
clip_processor = CLIPProcessor.from_pretrained(_CLIP_CHECKPOINT)

yolo_model = YOLO("yolov8n.pt")

# Both MySQL connections use the same server and (placeholder) credentials;
# only the target database differs.
_MYSQL_LOGIN = {
    "host": "localhost",
    "user": "your_username",
    "password": "your_password",
}

# Main database: holds the per-house feature columns.
db_conn = mysql.connector.connect(database="your_database", **_MYSQL_LOGIN)
cursor = db_conn.cursor()

# Second database: holds the `same` column that links similar houses.
same_db_conn = mysql.connector.connect(database="your_same_database", **_MYSQL_LOGIN)
same_cursor = same_db_conn.cursor()

def get_bert_embedding(text):
    """Embed *text* with the module-level BERT model.

    The text is tokenized (truncated to at most 512 tokens), run through
    ``bert_model`` without gradient tracking, and the hidden state at
    sequence position 0 is returned with singleton dimensions squeezed out.
    """
    encoded = tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    )
    with torch.no_grad():
        hidden_states = bert_model(**encoded).last_hidden_state
    first_token_state = hidden_states[:, 0, :]
    return first_token_state.squeeze()

def detect_objects(image_path):
    """Run the YOLO model on an image file and open that file with PIL.

    Returns:
        A ``(boxes, image)`` pair: ``boxes`` is the ``xyxy`` box array of the
        first YOLO result converted to numpy, ``image`` is the opened PIL
        image.
    """
    detections = yolo_model(image_path)
    picture = Image.open(image_path)
    boxes = detections[0].boxes.xyxy.cpu().numpy()
    return boxes, picture

def get_dominant_color(image):
    """Return the most frequent RGB colour of *image* as a tuple of ints.

    The image is converted to RGB first so that RGBA, greyscale and palette
    images do not break the ``reshape(-1, 3)`` below, then shrunk to 50x50
    to keep the pixel count small.

    Args:
        image: A PIL image (anything offering ``convert`` and ``resize``).

    Returns:
        ``(r, g, b)`` with plain Python ints. Plain ints (rather than numpy
        ``uint8`` scalars) keep the tuple's text form stable across NumPy
        versions; callers embed that text in prompts and stored values.
    """
    thumbnail = image.convert("RGB").resize((50, 50))
    pixels = np.asarray(thumbnail).reshape(-1, 3)
    # .tolist() turns every channel value into a built-in int.
    color_counts = Counter(map(tuple, pixels.tolist()))
    return color_counts.most_common(1)[0][0]

def generate_clip_description(image, objects):
    """Build a ``(description, dominant_color)`` pair for each detected box.

    Each box is cropped out of *image*, its dominant colour is computed, and
    CLIP scores the crop against the candidate texts; the best-scoring text
    is kept as the description.

    Args:
        image: PIL image the boxes refer to.
        objects: Iterable of boxes whose first four values are
            ``x1, y1, x2, y2``.

    Returns:
        A list of ``(text, dominant_color)`` tuples, one per usable box.
    """
    descriptions = []
    for obj in objects:
        x1, y1, x2, y2 = map(int, obj[:4])
        # Integer truncation can collapse a thin box to zero width/height;
        # such a crop has no pixels to analyse, so skip it.
        if x2 <= x1 or y2 <= y1:
            continue

        cropped_image = image.crop((x1, y1, x2, y2))
        dominant_color = get_dominant_color(cropped_image)
        color_name = f"{dominant_color}"

        # NOTE(review): all 20 candidate texts are identical, so the argmax
        # below cannot change which text is chosen — confirm whether distinct
        # candidate prompts were intended here.
        texts = [f"a {color_name} object"] * 20

        image_inputs = clip_processor(images=cropped_image, return_tensors="pt")
        text_inputs = clip_processor(text=texts, return_tensors="pt", padding=True)
        # Pure inference: keep both encoders out of the autograd graph.
        with torch.no_grad():
            image_features = clip_model.get_image_features(**image_inputs)
            text_features = clip_model.get_text_features(**text_inputs)

        similarities = F.cosine_similarity(image_features, text_features)
        best_match = similarities.argmax().item()
        descriptions.append((texts[best_match], dominant_color))
    return descriptions

def calculate_image_similarity(desc1, desc2):
    """Return the mean CLIP text similarity of two description lists.

    Descriptions are paired by position (``zip`` stops at the shorter list);
    for each pair the two texts are embedded with CLIP and compared by
    cosine similarity.

    Args:
        desc1: Sequence of ``(text, color)`` pairs.
        desc2: Sequence of ``(text, color)`` pairs.

    Returns:
        The average cosine similarity over all pairs, or ``0`` when there is
        nothing to compare.
    """
    similarity_scores = []
    # Inference only — disable gradient tracking, as get_bert_embedding does.
    with torch.no_grad():
        for (text1, _), (text2, _) in zip(desc1, desc2):
            text_inputs1 = clip_processor(text=[text1], return_tensors="pt", padding=True)
            text_inputs2 = clip_processor(text=[text2], return_tensors="pt", padding=True)
            text_features1 = clip_model.get_text_features(**text_inputs1)
            text_features2 = clip_model.get_text_features(**text_inputs2)
            similarity_scores.append(
                F.cosine_similarity(text_features1, text_features2).item()
            )

    if not similarity_scores:
        return 0
    return sum(similarity_scores) / len(similarity_scores)

# Store the text and image features of every house in the main database.
def store_features_in_db(json_data, image_folder):
    """Compute the features of each house and upsert them into ``houses``.

    For every item the address, pattern, size, layer and service list are
    embedded with BERT, and the first image (by sorted file name) in
    ``<image_folder>/<hid>`` is described with YOLO + CLIP. All values are
    serialised with ``json.dumps`` before being bound as SQL parameters,
    because the connector cannot bind Python lists and the target columns
    are JSON.

    Args:
        json_data: Iterable of house dicts with the keys ``hid``,
            ``positionround``, ``houseinfo`` and ``servicelist``.
        image_folder: Directory containing one sub-directory per ``hid``.

    A house without an image directory, or with an empty one, is stored with
    an empty image-description list. The transaction is committed once,
    after all items were processed.
    """
    query = """
            INSERT INTO houses (hid, VW_address, VW_pattern, VW_size, VW_layer, VW_servicelist, VP_image)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE 
            VW_address = VALUES(VW_address), 
            VW_pattern = VALUES(VW_pattern),
            VW_size = VALUES(VW_size),
            VW_layer = VALUES(VW_layer),
            VW_servicelist = VALUES(VW_servicelist),
            VP_image = VALUES(VP_image)
        """

    for item in json_data:
        hid = item['hid']

        # Text features.
        address_text = ' '.join(
            str(a).strip() for a in item['positionround'].get('address', [])
        )
        address_tokens = ws([address_text])
        VW_address = get_bert_embedding(' '.join(address_tokens[0]))

        houseinfo = item['houseinfo']
        VW_pattern = get_bert_embedding(houseinfo['pattern'])
        VW_size = get_bert_embedding(houseinfo['size'])
        VW_layer = get_bert_embedding(houseinfo['layer'])  # floor information
        VW_servicelist = get_bert_embedding(' '.join(item['servicelist']))  # service list

        # Image features: reset per house so a house without images never
        # reuses the previous house's descriptions (or hits an unbound name).
        VP_image = []
        house_dir = os.path.join(image_folder, str(hid))
        # os.listdir order is arbitrary; sorting makes "first image" stable.
        images = sorted(os.listdir(house_dir)) if os.path.isdir(house_dir) else []
        if images:
            first_image_path = os.path.join(house_dir, images[0])
            objects, image = detect_objects(first_image_path)
            VP_image = generate_clip_description(image, objects)

        # Colours may contain numpy scalars, which json cannot serialise;
        # coerce them to plain ints.
        image_payload = json.dumps(
            [[text, [int(channel) for channel in color]] for text, color in VP_image]
        )

        cursor.execute(query, (
            hid,
            json.dumps(VW_address.tolist()),
            json.dumps(VW_pattern.tolist()),
            json.dumps(VW_size.tolist()),
            json.dumps(VW_layer.tolist()),
            json.dumps(VW_servicelist.tolist()),
            image_payload,
        ))

    db_conn.commit()

# Write a shared same_id for every group of similar houses to the "same" database.
def update_same_id(similar_items):
    """Assign one ``same`` id to every house of each similarity group.

    For each group the existing ``same`` values of its houses are looked up.
    If any exist, the smallest one is reused (a deterministic choice — the
    previous ``set.pop()`` picked an arbitrary one); otherwise a new UUID4
    string is generated. The id is then written to every house of the group
    and the transaction is committed once at the end.

    Args:
        similar_items: Iterable of groups, each an iterable of ``hid``.

    NOTE(review): the statements target a table named ``houses`` through the
    second connection, while the DDL in this notebook creates ``same_houses``
    — confirm which table name is correct. Also, ``UPDATE`` only changes rows
    that already exist.
    NOTE(review): when a group carries several different existing ids, houses
    outside the group that hold one of the other ids are left unchanged.
    """
    for group in similar_items:
        # Collect the same_ids already assigned to members of this group.
        same_ids = set()
        for hid in group:
            same_cursor.execute("SELECT same FROM houses WHERE hid = %s", (hid,))
            result = same_cursor.fetchone()
            if result and result[0]:
                same_ids.add(result[0])

        same_id = min(same_ids) if same_ids else str(uuid.uuid4())

        # Stamp the chosen id on every house of the group.
        for hid in group:
            same_cursor.execute("UPDATE houses SET same = %s WHERE hid = %s", (same_id, hid))

    same_db_conn.commit()

# Read the stored features back from the database and decide which houses are similar.
def _parse_stored_value(value):
    """Decode a feature column value read from the database.

    Bytes are decoded as UTF-8; strings are parsed as JSON, falling back to
    ``ast.literal_eval`` for rows written as a Python literal. Values that
    are not text are returned unchanged.
    """
    import ast

    if isinstance(value, (bytes, bytearray)):
        value = value.decode("utf-8")
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            # SECURITY: this replaces a plain eval() on database content.
            # literal_eval only accepts Python literals, so rows whose text
            # contains calls (e.g. "np.uint8(1)") are rejected, not executed.
            return ast.literal_eval(value)
    return value


def find_similar_items_from_db(image_threshold=0.8, text_threshold=0.9):
    """Compare every pair of stored houses and return groups of similar ones.

    Two houses are similar when the cosine similarity of each of their five
    text embeddings exceeds *text_threshold* and the similarity of their
    image descriptions exceeds *image_threshold*.

    Args:
        image_threshold: Lower bound (exclusive) for image similarity.
        text_threshold: Lower bound (exclusive) for each text similarity.

    Returns:
        The result of ``group_similar_items`` on the similar ``(hid1, hid2)``
        pairs.
    """
    from itertools import combinations

    query = "SELECT hid, VW_address, VW_pattern, VW_size, VW_layer, VW_servicelist, VP_image FROM houses"
    cursor.execute(query)

    # Decode each row once instead of once per pair inside the O(n^2) loop.
    houses = []
    for hid, *vector_columns, image_column in cursor.fetchall():
        vectors = [
            torch.tensor(_parse_stored_value(column), dtype=torch.float32)
            for column in vector_columns
        ]
        houses.append((hid, vectors, image_column))

    similar_items = []
    for (hid1, vectors1, image1), (hid2, vectors2, image2) in combinations(houses, 2):
        # Text comparison: every field must clear the threshold. dim=0
        # because the stored embeddings are flat vectors.
        text_match = all(
            F.cosine_similarity(v1, v2, dim=0).item() > text_threshold
            for v1, v2 in zip(vectors1, vectors2)
        )
        if not text_match:
            continue

        # Image comparison, only for pairs whose text already matched.
        descriptions1 = _parse_stored_value(image1) or []
        descriptions2 = _parse_stored_value(image2) or []
        if calculate_image_similarity(descriptions1, descriptions2) > image_threshold:
            similar_items.append((hid1, hid2))

    return group_similar_items(similar_items)

# Group similar houses: e.g. if three houses are all similar they form one group.
def group_similar_items(similar_items):
    """Merge similar pairs into disjoint groups (connected components).

    A pair that links two previously separate groups merges them, so every
    house ends up in exactly one group regardless of the order in which the
    pairs arrive.

    Args:
        similar_items: Iterable of ``(hid1, hid2)`` pairs.

    Returns:
        A list of groups, each a list of ids. Groups and their members are
        ordered by first appearance in *similar_items*.
    """
    parent = {}

    def find_root(hid):
        # Walk up to the representative, then point the visited chain at it.
        root = hid
        while parent[root] != root:
            root = parent[root]
        while parent[hid] != root:
            parent[hid], hid = root, parent[hid]
        return root

    for hid1, hid2 in similar_items:
        parent.setdefault(hid1, hid1)
        parent.setdefault(hid2, hid2)
        root1, root2 = find_root(hid1), find_root(hid2)
        if root1 != root2:
            parent[root2] = root1

    groups = {}
    for hid in parent:
        groups.setdefault(find_root(hid), []).append(hid)
    return list(groups.values())

def main():
    """Run the pipeline: store features, find similar houses, tag and print them.

    The listing data is read with ``json.load``; the ``load_json`` helper the
    original code called is not defined anywhere in this file.
    """
    # Load the listing data.
    # "utf-8-sig" reads UTF-8 with or without a BOM, and avoids depending on
    # the platform's default encoding for the Chinese text.
    with open("C:\\Users\\user\\OneDrive\\桌面\\detail-複製.json", encoding="utf-8-sig") as f:
        json_data = json.load(f)
    image_folder = "C:\\Users\\user\\OneDrive\\桌面\\gold_house-複製"

    # Store the features in the database.
    store_features_in_db(json_data, image_folder)

    # Read the features back and run the similarity comparison.
    print("\n開始進行相似度比對...")
    similar_items = find_similar_items_from_db()

    # Give every group of similar houses a shared same_id.
    print("\n更新相同的 same_id...")
    update_same_id(similar_items)

    # Print the similarity result.
    print("相似房屋:")
    for group in similar_items:
        print(f"相似房屋組: {group}")


if __name__ == "__main__":
    main()


In [None]:
-- Main feature table: one row per house, written by store_features_in_db.
CREATE TABLE houses (
    hid INT PRIMARY KEY,                     -- house ID
    VW_address JSON,                         -- text embedding of the address
    VW_pattern JSON,                         -- text embedding of the room layout (pattern)
    VW_size JSON,                            -- text embedding of the size
    VW_layer JSON,                           -- text embedding of the floor information
    VW_servicelist JSON,                     -- text embedding of the service list
    VP_image JSON,                           -- image features derived from the house's first image
    same VARCHAR(255)                        -- shared identifier of a group of similar houses
);

In [None]:
-- Table linking each house to the identifier of its similarity group.
-- NOTE(review): update_same_id reads and updates a table named `houses`
-- through the second connection, not `same_houses` — confirm the intended name.
CREATE TABLE same_houses (
    hid INT PRIMARY KEY,        -- house ID
    same VARCHAR(255)           -- shared identifier of a group of similar houses
);