<a href="https://colab.research.google.com/github/kanade2001/KokomeloTalk/blob/back%2Fdevelop/music_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import sqlite3
import math
import csv
import os

WORKSPACE = '/content/drive/MyDrive/Colab Notebooks/KokomeloTalk'

# データベースファイル名
DATABASE_NAME = f'{WORKSPACE}/tracks.db'

# 感情と対応する音響特徴量のマッピング (目標値、重み)
EMOTION_FEATURES = {
    "喜び": {'genre':['k-pop'], 'features':{'danceability': (0.8,1), 'energy': (0.8,1), 'valence': (0.9,1)}},
    "悲しみ": {'genre':['classic'], 'features':{'danceability': (0.3,1), 'energy': (0.3,1), 'valence': (0.2,1)}},
    "怒り": {'genre':['classic'], 'features':{'danceability': (0.7,1), 'energy': (0.9,1), 'valence': (0.1,1)}},
    "幸せ": {'genre':['j-pop','k-pop'], 'features':{'danceability': (0.7,1), 'energy': (0.7,1), 'valence': (0.8,1)}},
}

def initialize_and_save_database(csv_paths):
    """
    データベースを初期化し、指定されたCSVファイルからトラック情報を読み込んで保存します。

    Args:
        csv_path (str): CSVファイルのパス。
    """
    if os.path.exists(DATABASE_NAME):
        print(f"データベース '{DATABASE_NAME}' は既に存在します。初期化と保存をスキップします。")
        return

    conn = sqlite3.connect(DATABASE_NAME)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS tracks (
            id TEXT PRIMARY KEY,
            name TEXT,
            artists TEXT,
            album TEXT,
            duration_ms INTEGER,
            popularity INTEGER,
            acousticness REAL,
            danceability REAL,
            energy REAL,
            instrumentalness REAL,
            key INTEGER,
            liveness REAL,
            loudness REAL,
            mode INTEGER,
            speechiness REAL,
            tempo REAL,
            time_signature INTEGER,
            valence REAL,
            genre TEXT
        )
    ''')

    # CSVからトラック情報を読み込む
    tracks = read_tracks_from_csv(csv_paths)
    if not tracks:
        print("CSVからトラック情報を取得できませんでした。")
        conn.close()
        return

    # トラック情報をデータベースに保存
    for track in tracks:
        cursor.execute('''
            INSERT OR REPLACE INTO tracks (
                id, name, artists, album, duration_ms,
                popularity, acousticness, danceability, energy, instrumentalness, key,
                liveness, loudness, mode, speechiness, tempo, time_signature, valence, genre
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            track['id'],
            track['name'],
            track['artists'],
            track['album'],
            track['duration_ms'],
            track['popularity'],
            track['acousticness'],
            track['danceability'],
            track['energy'],
            track['instrumentalness'],
            track['key'],
            track['liveness'],
            track['loudness'],
            track['mode'],
            track['speechiness'],
            track['tempo'],
            track['time_signature'],
            track['valence'],
            track['track_genre']
        ))
    conn.commit()
    conn.close()
    print(f"データベースに {len(tracks)} 曲を保存しました。")

def read_tracks_from_csv(csv_paths):
    """
    指定されたCSVファイルからトラック情報を読み込み、辞書のリストとして返します。

    CSVファイルには以下の列が含まれている必要があります:
    index, track_id, artists, album_name, track_name, popularity, duration_ms, explicit,
    danceability, energy, key, loudness, mode, speechiness, acousticness, instrumentalness,
    liveness, valence, tempo, time_signature, track_genre

    Args:
        csv_paths (str): CSVファイルのパス。

    Returns:
        list of dict: トラック情報の辞書リスト。
    """
    tracks = []
    for csv_path in csv_paths:
        try:
            with open(csv_path, 'r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    track = {
                        'id': row['track_id'],
                        'name': row['track_name'],
                        'artists': row['artists'],
                        'album': row['album_name'],
                        'duration_ms': int(row['duration_ms']) if row['duration_ms'] else None,
                        'popularity': int(row['popularity']) if row['popularity'] else None,
                        'acousticness': float(row['acousticness']) if row['acousticness'] else None,
                        'danceability': float(row['danceability']) if row['danceability'] else None,
                        'energy': float(row['energy']) if row['energy'] else None,
                        'instrumentalness': float(row['instrumentalness']) if row['instrumentalness'] else None,
                        'key': int(row['key']) if row['key'] else None,
                        'liveness': float(row['liveness']) if row['liveness'] else None,
                        'loudness': float(row['loudness']) if row['loudness'] else None,
                        'mode': int(row['mode']) if row['mode'] else None,
                        'speechiness': float(row['speechiness']) if row['speechiness'] else None,
                        'tempo': float(row['tempo']) if row['tempo'] else None,
                        'time_signature': int(row['time_signature']) if row['time_signature'] else None,
                        'valence': float(row['valence']) if row['valence'] else None,
                        'track_genre': row['track_genre'] if 'track_genre' in row else None
                    }
                    tracks.append(track)
        except FileNotFoundError:
            print(f"CSVファイル '{csv_path}' が見つかりません。")
        except Exception as e:
            print(f"CSVの読み込み中にエラーが発生しました: {e}")
    return tracks

def load_tracks_from_database(limit=100000,genres=None):
    """
    データベースからトラック情報を読み込みます。

    Args:
        limit (int): 読み込むトラックの最大数。

    Returns:
        list of dict: 読み込んだトラック情報のリスト。
    """
    if not os.path.exists(DATABASE_NAME):
        print(f"データベース '{DATABASE_NAME}' が存在しません。")
        return []

    conn = sqlite3.connect(DATABASE_NAME)
    cursor = conn.cursor()
    if genres:
        placeholders = ' OR '.join(['genre LIKE ?' for _ in genres])
        query = f'SELECT * FROM tracks WHERE {placeholders}'
        params = [f'%{genre}%' for genre in genres]
        cursor.execute(query, params)
    else:
        cursor.execute('SELECT * FROM tracks')
    rows = cursor.fetchall()
    columns = [description[0] for description in cursor.description] if rows else [
        'id', 'name', 'artists', 'album', 'duration_ms',
        'popularity', 'acousticness', 'danceability', 'energy',
        'instrumentalness', 'key', 'liveness', 'loudness', 'mode',
        'speechiness', 'tempo', 'time_signature', 'valence', 'genre'
    ]

    tracks = []
    for row in rows[:limit]:
        track = dict(zip(columns, row))
        tracks.append(track)

    conn.close()
    print(f"データベースから {len(tracks)} 曲を読み込みました。")
    return tracks

def find_matching_tracks(tracks, target_features, top_n=3):
    """
    トラックの詳細情報を表示し、目標音響特徴量に最も近い順に上位N曲を表示します。

    Args:
        tracks (list of dict): トラック情報のリスト。
        target_features (dict): 目標とする音響特徴量。
        top_n (int): 表示するトラックの数。

    Returns:
        list of dict: 上位N曲のリスト。
    """
    if not tracks:
        print("曲が見つかりませんでした。")
        return

    # 目標音響特徴量との距離を計算
    for track in tracks:
        distance = 0
        feature_count = 0
        for feature, target in target_features.items():
            track_value = track.get(feature)
            target_value, target_weight = target
            if track_value is not None:
                distance += target_weight * ((track_value - target_value) ** 2)
                feature_count += 1
        if feature_count > 0:
            track['distance'] = math.sqrt(distance)
        else:
            track['distance'] = float('inf')  # 音響特徴量がない場合

    # 距離でソート
    sorted_tracks = sorted(tracks, key=lambda x: x['distance'])

    # 上位N曲を取得
    top_tracks = sorted_tracks[:top_n]

    # 表示する属性
    display_attributes = ["id", "name", "artists", "genre", "danceability", "valence", "energy", "distance"]
    print(f"\nTop {top_n} 曲 (距離が近い順):\n")
    for idx, track in enumerate(top_tracks, start=1):
        print(f"曲 {idx}:")
        for attr in display_attributes:
            print(f"  {attr}: {track.get(attr, 'N/A')}")
        print()

    return top_tracks

def main(csv_paths=None, desired_features=None, limit=3):
    """
    メイン関数。データベースへの保存とデータベースからの読み込みを行います。

    Args:
        csv_path (str, optional): CSVファイルのパス。指定された場合、データベースに保存します。
        desired_features (dict, optional): 音響特徴量に基づいてトラックを検索します。
        limit (int, optional): 表示するトラックの数。
    """
    if csv_paths:
        print(f"CSVファイルからトラック情報を読み込んでいます...")
        initialize_and_save_database(csv_paths)

    if desired_features:
        print("データベースからトラックを読み込んでいます...")
        tracks = load_tracks_from_database(genres=desired_features.get('genre',None))
        if tracks:
            print(f"\n目標とする音響特徴量: {desired_features}")
            find_matching_tracks(tracks, desired_features['features'], top_n=limit)

def calculate_combined_features(emotion_scores, limit=3, csv_paths=None):
    """
    感情スコアに基づいて音響特徴量を加重平均し、混合した特徴量を計算します。

    Args:
        emotion_scores (dict): {感情: スコア} の辞書。
        limit (int): 表示するトラックの数。

    Returns:
        任意: main関数の戻り値。
    """
    determine_mode = "max"
    if not emotion_scores:
        # 感情スコアが空の場合、デフォルトの音響特徴量を使用
        combined_features = {'danceability': 0.5, 'energy': 0.5, 'valence': 0.5}
    else:
        total_score = sum(emotion_scores.values())
        if determine_mode == "max":
            emotion = max(emotion_scores, key=emotion_scores.get)
            desired_features = EMOTION_FEATURES.get(emotion, {'danceability': 0.5, 'energy': 0.5, 'valence': 0.5})
        elif determine_mode == "weighted_ave":
            # 各感情のスコア割合に応じて加重平均
            combined_features = {'danceability': 0.0, 'energy': 0.0, 'valence': 0.0}
            for emotion, score in emotion_scores.items():
                weight = score / total_score
                base = EMOTION_FEATURES.get(emotion, {'danceability': 0.5, 'energy': 0.5, 'valence': 0.5})
                combined_features['danceability'] += base['danceability'] * weight
                combined_features['energy'] += base['energy'] * weight
                combined_features['valence'] += base['valence'] * weight
            desired_features = combined_features

    # 音響特徴量を使用してmain関数を実行
    return main(desired_features=desired_features, limit=limit, csv_paths=csv_paths)

def local_test(csv_paths=None):
    """
    ローカルテスト用関数。感情スコアを指定して機能をテストします。
    """
    emotion_scores = {"喜び": 0.1, "怒り": 0.2, "悲しみ": 0.3, "幸せ": 0.4}
    return calculate_combined_features(emotion_scores, limit=3, csv_paths=csv_paths)

if __name__ == "__main__":
    csv_paths = [f'{WORKSPACE}/dataset.csv']
    local_test(csv_paths)

CSVファイルからトラック情報を読み込んでいます...
データベース '/content/drive/MyDrive/Colab Notebooks/KokomeloTalk/tracks.db' は既に存在します。初期化と保存をスキップします。
データベースからトラックを読み込んでいます...
データベースから 1297 曲を読み込みました。

目標とする音響特徴量: {'genre': ['j-pop', 'k-pop'], 'features': {'danceability': (0.7, 1), 'energy': (0.7, 1), 'valence': (0.8, 1)}}

Top 3 曲 (距離が近い順):

曲 1:
  id: 1CWC107N5c6KkDRXvY2VtB
  name: Udja Kale Kawan - Marriage
  artists: Alka Yagnik;Udit Narayan
  genre: k-pop
  danceability: 0.718
  valence: 0.793
  energy: 0.699
  distance: 0.019339079605813735

曲 2:
  id: 0JnOpiUieuk9SdRv7Fkw2P
  name: 恋泥棒。
  artists: 『ユイカ』
  genre: j-pop
  danceability: 0.71
  valence: 0.813
  energy: 0.718
  distance: 0.024351591323771803

曲 3:
  id: 7mQjVfocpzkKd3PN1TtwKR
  name: Nenjangootil Neeye
  artists: Vijay Antony;Jayadev;Rajalakshmi
  genre: k-pop
  danceability: 0.672
  valence: 0.791
  energy: 0.711
  distance: 0.031400636936215094

