In [None]:
# Install/upgrade the Google auth libraries and the Supabase client used by later cells.
# NOTE(review): versions are not pinned, so a re-run may pull different releases.
!pip install --upgrade google-auth google-auth-oauthlib supabase

In [None]:
# Open Colab's file-upload dialog and keep the result in `uploaded`.
# NOTE(review): presumably this is where credentials.json (read by the next cell) is uploaded — confirm.
from google.colab import files
uploaded = files.upload()

In [None]:
# Build a Google OAuth flow from the uploaded client secrets and print the
# authorization URL the user has to visit manually.
from google_auth_oauthlib.flow import Flow
import json

# Load credentials.json (only the "installed" client section is used)
with open("credentials.json") as f:
    secrets = json.load(f)["installed"]

flow = Flow.from_client_config(
    {
        "installed": {
            "client_id": secrets["client_id"],
            "client_secret": secrets["client_secret"],
            "auth_uri": secrets["auth_uri"],
            "token_uri": secrets["token_uri"],
        }
    },
    # Drive + Sheets scopes
    scopes=[
        "https://www.googleapis.com/auth/drive",
        "https://www.googleapis.com/auth/spreadsheets",
    ],
    # NOTE(review): Google has deprecated the out-of-band (OOB) redirect URI;
    # verify this copy/paste flow still works for this OAuth client.
    redirect_uri="urn:ietf:wg:oauth:2.0:oob",  # <- this setting is the important part on Colab
)

# Generate the authorization URL (the second return value is discarded)
auth_url, _ = flow.authorization_url(prompt="consent")

print("👇以下のURLをクリックして認証し、表示されるコードをコピーしてください：\n")
print(auth_url)

In [None]:
# Exchange the authorization code pasted by the user for tokens on the `flow`
# object created in the previous cell.
code = input("認証コードを貼り付けてください：")
flow.fetch_token(code=code)

creds = flow.credentials

# Save the authorized credentials as JSON in the working directory
with open("authorized_user.json", "w") as token:
    token.write(creds.to_json())

# Authorize gspread with the freshly obtained credentials
import gspread
gc = gspread.authorize(creds)

In [None]:
# Second upload dialog; the return value is discarded.
# NOTE(review): this repeats the earlier upload cell — presumably used to re-upload
# credentials.json / authorized_user.json in a fresh session; confirm it is still needed.
from google.colab import files
files.upload()

In [None]:
# Create gspread's config directory and move the OAuth client file and the saved token into it.
# NOTE(review): presumably the default location read by gspread.oauth() in the next cell — confirm.
!mkdir -p /root/.config/gspread
!mv credentials.json /root/.config/gspread/credentials.json
!mv authorized_user.json /root/.config/gspread/authorized_user.json

In [None]:

import pandas as pd
import requests
import base64
import time
import random
from datetime import datetime, timedelta
import gspread
from gspread_dataframe import set_with_dataframe

# Create the gspread client via gspread's own OAuth helper.
# This rebinds `gc`, replacing the client created with gspread.authorize(creds) in the earlier cell.
gc = gspread.oauth()

In [None]:

# Spotify API credentials.
# They are read from the environment (SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET)
# and, when a variable is unset or empty, requested through a hidden prompt, so
# the secret never lives in the notebook source or its saved outputs.
# NOTE(review): the values that used to be hardcoded in this cell should be
# treated as leaked and rotated in the Spotify developer dashboard.
import os
from getpass import getpass

client_id = os.environ.get("SPOTIFY_CLIENT_ID") or getpass("Spotify client ID: ")
client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET") or getpass("Spotify client secret: ")

In [None]:
# Mount Google Drive at /content/drive in the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
from supabase import create_client, Client
# ClientOptions is not needed, so its import stays disabled
# from supabase.lib.client_options import ClientOptions

# ==========================================
# 1. Supabase connection settings
# ==========================================
url = "https://oslpycadjyhnldloxvyl.supabase.co"
key = "sb_publishable_6Mb_C0QEUGRzrlowo0Zn8w_KOG6eYmZ"

# Create the client without any extra options
supabase: Client = create_client(url, key)

# ==========================================
# 2. Fetch data (read external_ids and join groups)
# ==========================================
print("📡 Supabase (schema: imd) からSpotify IDリストを取得中...")

try:
    # Chain .schema("imd") here so the query runs against that schema
    response = supabase.schema("imd").table("external_ids").select(
        "external_id, groups(name_ja)"
    ).eq("service", "spotify").execute()

    # Make sure the query returned something
    if not response.data:
        raise Exception("❌ データが見つかりませんでした。条件に合うレコードがありません。")

    # 3. Reshape (flatten the nested JSON into columns)
    df_ids = pd.json_normalize(response.data)

    # Rename the columns so the downstream cells keep working
    # external_id -> spotify_id
    rename_map = {
        'external_id': 'spotify_id',
        'groups.name_ja': 'name'
    }
    df_ids = df_ids.rename(columns=rename_map)

    # Keep only the columns that are needed
    df_ids = df_ids[['spotify_id', 'name']]

    # 4. Duplicate check
    print(f"✅ {len(df_ids)} 件のデータを取得しました。")
    print("💾 重複チェックを開始します...")

    duplicate_ids = df_ids[df_ids.duplicated(subset=['spotify_id'], keep=False)]

    if not duplicate_ids.empty:
        print(f"⚠️ {len(duplicate_ids['spotify_id'].unique())} 個のspotify_idに関して、合計 {len(duplicate_ids)} 行の重複が見つかりました。")
        # Drop duplicates (keep the first occurrence)
        df_ids = df_ids.drop_duplicates(subset='spotify_id')
    else:
        print("✅ 重複はありません。")

    print("-" * 50)
    # Show the first 5 rows as a sanity check
    print(df_ids.head())

except Exception as e:
    print(f"❌ エラーが発生しました: {e}")
    # Re-raise after logging: every later cell needs `df_ids`, so swallowing the
    # error here would only surface later as an unrelated NameError (or silently
    # reuse a stale `df_ids` left in the kernel from a previous run).
    raise

In [None]:

class TokenManager:
    """Caches a Spotify client-credentials access token and renews it when needed.

    Attributes:
        client_id / client_secret: Spotify application credentials.
        token: the current access token, or None when none could be obtained.
        token_expiry_time: local time after which `token` is treated as expired.
    """

    def __init__(self, cid, secret):
        """Store the credentials; no network call happens until get_token()."""
        self.client_id = cid
        self.client_secret = secret
        self.token = None
        # Start "already expired" so the first get_token() call fetches a token.
        self.token_expiry_time = datetime.now()

    def _get_new_token(self):
        """Request a new token from Spotify; on failure leave `self.token` as None."""
        auth = base64.b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
        headers = {'Authorization': f'Basic {auth}'}
        data = {'grant_type': 'client_credentials'}
        try:
            # A timeout keeps a stalled connection from hanging the whole run.
            r = requests.post(
                'https://accounts.spotify.com/api/token',
                headers=headers,
                data=data,
                timeout=10,
            )
            r.raise_for_status()
            token_data = r.json()
            self.token = token_data['access_token']
            expires_in = token_data.get('expires_in', 3600)
            # Renew 5 minutes early so a token never expires mid-request.
            self.token_expiry_time = datetime.now() + timedelta(seconds=expires_in - 300)
            print("✅ Spotifyアクセストークンを正常に取得・更新しました。")
        except (requests.exceptions.RequestException, KeyError, ValueError) as e:
            # KeyError / ValueError cover a malformed token response body.
            print(f"❌ トークン取得に失敗しました: {e}")
            self.token = None

    def get_token(self):
        """Return a valid token, refreshing first if it is missing or expired.

        Returns None when the refresh attempt fails.
        """
        if self.token is None or datetime.now() >= self.token_expiry_time:
            print("トークンが期限切れ、または存在しないため、リフレッシュします。")
            self._get_new_token()
        return self.token

    def force_refresh(self):
        """Discard the cached token and fetch a new one (for use after an HTTP 401).

        These statements previously sat after the `return` in get_token() and
        could never run.
        """
        print("401エラー検知。トークンを強制的にリフレッシュします。")
        self._get_new_token()

# Module-level token manager built from the Spotify credentials defined in an
# earlier cell; it is passed to fetch_snapshot() in the cells below.
token_manager = TokenManager(client_id, client_secret)

In [None]:

def make_spotify_request(url, token_manager, max_retries=5):
    """GET `url` from the Spotify Web API with retries and return the parsed JSON.

    Args:
        url: full request URL.
        token_manager: object exposing get_token(); if it also exposes
            force_refresh(), that is used to renew the token after a 401.
        max_retries: maximum number of attempts.

    Returns:
        The decoded JSON body on HTTP 200, or None when the request fails with
        a non-retryable status or all attempts are used up.

    Raises:
        Exception: when the token manager cannot supply a token.
    """
    for attempt in range(max_retries):
        try:
            token = token_manager.get_token()
            if not token:
                raise Exception("トークンが取得できません。")

            headers = {"Authorization": f"Bearer {token}"}
            r = requests.get(url, headers=headers, timeout=10)

            if r.status_code == 200:
                return r.json()
            elif r.status_code == 401:
                print(f"URL: {url}")
                # The cached token was rejected: renew it before retrying,
                # otherwise every remaining attempt resends the same bad token.
                refresh = getattr(token_manager, "force_refresh", None)
                if callable(refresh):
                    refresh()
                else:
                    # Fallback for managers without force_refresh(): dropping
                    # the cached token makes the next get_token() fetch anew.
                    token_manager.token = None
                continue
            elif r.status_code == 429:
                wait_time = int(r.headers.get('Retry-After', 5))
                print(f"レート制限(429)のため、{wait_time}秒待機します。")
                time.sleep(wait_time)
            elif r.status_code >= 500:
                # Exponential backoff with jitter for server-side errors.
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"サーバーエラー({r.status_code}) リトライ {attempt + 1}。{wait_time:.2f}秒待機します。")
                time.sleep(wait_time)
            else:
                # Any other status (e.g. 400/403/404) is treated as non-retryable.
                return None
        except requests.exceptions.RequestException as e:
            print(f"リクエスト例外発生: {e} リトライします...")
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait_time)
    return None

def get_artist_info(artist_id, token_manager):
    """Look up one artist on the Spotify Web API.

    Returns whatever make_spotify_request() returns for the artist endpoint:
    the decoded JSON on success, otherwise None.
    """
    endpoint = "https://api.spotify.com/v1/artists/{}".format(artist_id)
    return make_spotify_request(endpoint, token_manager)

def get_top_track_popularities(artist_id, token_manager, top_n=5, market="JP"):
    """Fetch an artist's top tracks and their popularity scores.

    Args:
        artist_id: Spotify artist ID.
        token_manager: passed through to make_spotify_request().
        top_n: how many of the returned tracks to keep.
        market: market code placed in the `market` query parameter
            (defaults to "JP", the value that used to be hard-coded).

    Returns:
        (tracks, popularities): the first `top_n` track objects and a parallel
        list of their 'popularity' values; ([], []) when the request failed or
        the response carries no tracks.
    """
    url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks?market={market}"
    data = make_spotify_request(url, token_manager)
    if data and "tracks" in data:
        # `or []` guards against a response whose "tracks" value is null.
        tracks = (data["tracks"] or [])[:top_n]
        return tracks, [t['popularity'] for t in tracks]
    return [], []

def count_recent_releases(tracks, days=30, today=None):
    """Count tracks whose album release date is no more than `days` days before `today`.

    Args:
        tracks: iterable of track dicts carrying t['album']['release_date'] as
            "YYYY-MM-DD", "YYYY-MM" or "YYYY". None is treated as empty.
        days: size of the look-back window in days.
        today: reference date; defaults to the current local date. Mainly
            useful for reproducible runs and tests.

    Returns:
        Number of tracks inside the window. Tracks with a missing, null or
        unparseable release date are skipped rather than raising.
    """
    if today is None:
        today = datetime.now().date()
    window = timedelta(days=days)
    # The release-date precision is inferred from the string length.
    formats = {10: "%Y-%m-%d", 7: "%Y-%m", 4: "%Y"}

    count = 0
    for t in tracks or []:
        try:
            release_date_str = t['album']['release_date']
            fmt = formats.get(len(release_date_str))
            if fmt is None:
                continue
            release_date = datetime.strptime(release_date_str, fmt).date()
        except (ValueError, KeyError, TypeError):
            # TypeError covers a null album or null release_date.
            continue
        if today - release_date <= window:
            count += 1
    return count

In [None]:

def fetch_snapshot(df_ids, token_manager):
    """Collect one snapshot row per Spotify artist ID.

    Args:
        df_ids: DataFrame with a 'spotify_id' column; its index may be anything.
        token_manager: passed through to the Spotify request helpers.

    Returns:
        DataFrame with columns spotify_id, name, artist_popularity, followers,
        track_popularity_sum and new_release_count. Artists whose lookup failed
        get name 'N/A' and zeroed metrics. An empty input yields an empty frame
        that still has these columns.
    """
    columns = [
        'spotify_id',
        'name',
        'artist_popularity',
        'followers',
        'track_popularity_sum',
        'new_release_count',
    ]
    snapshot = []
    total = len(df_ids)
    # Count positions with enumerate instead of using the index label: after
    # drop_duplicates() or a boolean filter the labels are no longer 0..n-1,
    # which made the "[i/total]" progress output wrong.
    for position, sid in enumerate(df_ids['spotify_id'], start=1):
        print(f"[{position}/{total}] {sid} のデータを取得中...")
        info = get_artist_info(sid, token_manager) or {}
        tracks, pops = get_top_track_popularities(sid, token_manager)
        new_count = count_recent_releases(tracks, days=30)
        # 'followers' may be absent or null in the response.
        followers = info.get('followers') or {}

        snapshot.append({
            'spotify_id': sid,
            'name': info.get('name', 'N/A'),
            'artist_popularity': info.get('popularity', 0),
            'followers': followers.get('total', 0),
            'track_popularity_sum': sum(pops) if pops else 0,
            'new_release_count': new_count
        })
    return pd.DataFrame(snapshot, columns=columns)

In [None]:

# Initial pass over every artist ID.
df_snapshot = fetch_snapshot(df_ids, token_manager)

# Up to three completion rounds: re-fetch rows that look incomplete and merge
# the new values back into the snapshot.
for round_num in range(3):
    # A row is incomplete when the name is the 'N/A' placeholder, or when it
    # reports zero followers despite a positive artist popularity.
    name_missing = df_snapshot['name'] == 'N/A'
    followers_suspect = (df_snapshot['followers'] == 0) & (df_snapshot['artist_popularity'] > 0)
    na_ids = df_snapshot.loc[name_missing | followers_suspect, 'spotify_id']

    if na_ids.empty:
        print("✅ 全てのデータの取得が完了しました。")
        break

    print(f"\n🔁 補完ラウンド {round_num + 1}: {len(na_ids)} 件を再取得します。")
    time.sleep(5)

    retry_targets = df_ids[df_ids['spotify_id'].isin(na_ids)]
    retry_df = fetch_snapshot(retry_targets, token_manager)

    # Align both frames on spotify_id so update() overwrites matching rows.
    snapshot_by_id = df_snapshot.set_index('spotify_id')
    snapshot_by_id.update(retry_df.set_index('spotify_id'))
    df_snapshot = snapshot_by_id.reset_index()

# Keep a single row per artist (the last one wins).
df_snapshot = df_snapshot.drop_duplicates(subset='spotify_id', keep='last')

In [None]:

# Write today's snapshot to a worksheet named after the current date inside the
# "IHC_Snapshot" spreadsheet.
today_str = datetime.today().strftime('%Y-%m-%d')
sheet_name = "IHC_Snapshot"

# Open the spreadsheet, creating it when it does not exist yet.
try:
    sh = gc.open(sheet_name)
except gspread.exceptions.SpreadsheetNotFound:
    sh = gc.create(sheet_name)

# If a worksheet for today already exists, delete it so the run replaces it.
try:
    worksheet = sh.worksheet(today_str)
    sh.del_worksheet(worksheet)
    print(f"既存のシート '{today_str}' を削除しました。")
except gspread.exceptions.WorksheetNotFound:
    pass

# NOTE(review): the sheet is created with 100 rows / 20 columns; presumably
# set_with_dataframe grows it when df_snapshot is larger — confirm.
worksheet = sh.add_worksheet(title=today_str, rows="100", cols="20")
set_with_dataframe(worksheet, df_snapshot)
print(f"✅ スナップショット作成完了: {sh.url}")