# Snowflake Schema Column Detector (サンプル値マッチング版)

このノートブックは、あらかじめ与えられた複数のサンプル値から、それらの値を含むテーブル・カラムを特定します。

## 機能
- 指定したSchemaの全テーブルを自動取得
- あらかじめ与えられたサンプル値のリストから該当するカラムを検索
- 各カラムの情報(カラム名、データ型、NULL許可など)を取得
- マッチした値とマッチ率を表示
- 結果をDataFrameで表示・CSV/Excel出力

## 1. 必要なライブラリのインポート

In [None]:
import snowflake.connector
import pandas as pd
from typing import List, Dict
import os
from IPython.display import display

print("✓ ライブラリのインポート完了")

## 2. Snowflake接続設定

接続情報を設定します。セキュリティのため、環境変数からの読み込みを推奨します。

In [None]:
# Snowflake接続情報の設定
config = {
    'account': os.getenv('SNOWFLAKE_ACCOUNT', 'your_account'),
    'user': os.getenv('SNOWFLAKE_USER', 'your_username'),
    'password': os.getenv('SNOWFLAKE_PASSWORD', 'your_password'),
    'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE', 'your_warehouse'),
    'database': os.getenv('SNOWFLAKE_DATABASE', 'your_database'),
    'schema': os.getenv('SNOWFLAKE_SCHEMA', 'your_schema')
}

print(f"データベース: {config['database']}")
print(f"スキーマ: {config['schema']}")

## 3. Snowflakeへの接続

In [None]:
# Snowflakeに接続
try:
    conn = snowflake.connector.connect(
        account=config['account'],
        user=config['user'],
        password=config['password'],
        warehouse=config['warehouse'],
        database=config['database'],
        schema=config['schema']
    )
    print(f"✓ Snowflakeに接続しました: {config['database']}.{config['schema']}")
except Exception as e:
    print(f"✗ 接続エラー: {e}")

## 4. テーブル一覧の取得

指定されたスキーマ内の全テーブルを取得します。

In [None]:
def get_tables(connection, database: str, schema: str) -> List[str]:
    """
    指定されたスキーマ内の全テーブルを取得
    
    Parameters:
    -----------
    connection : snowflake.connector.connection
        Snowflake接続オブジェクト
    database : str
        データベース名
    schema : str
        スキーマ名
        
    Returns:
    --------
    List[str]
        テーブル名のリスト
    """
    query = f"""
    SELECT TABLE_NAME
    FROM {database}.INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = '{schema}'
    AND TABLE_TYPE = 'BASE TABLE'
    ORDER BY TABLE_NAME
    """
    
    cursor = connection.cursor()
    cursor.execute(query)
    tables = [row[0] for row in cursor.fetchall()]
    cursor.close()
    
    return tables

# テーブル一覧の取得
tables = get_tables(conn, config['database'], config['schema'])
print(f"テーブル数: {len(tables)}")
print("\nテーブル一覧:")
for i, table in enumerate(tables, 1):
    print(f"  {i}. {table}")

## 5. カラム情報の取得

指定されたテーブルのカラム情報を取得します。

In [None]:
def get_columns_info(connection, database: str, schema: str, table_name: str) -> pd.DataFrame:
    """
    指定されたテーブルのカラム情報を取得
    
    Parameters:
    -----------
    connection : snowflake.connector.connection
        Snowflake接続オブジェクト
    database : str
        データベース名
    schema : str
        スキーマ名
    table_name : str
        テーブル名
        
    Returns:
    --------
    pd.DataFrame
        カラム情報のDataFrame
    """
    query = f"""
    SELECT 
        COLUMN_NAME,
        DATA_TYPE,
        IS_NULLABLE,
        COLUMN_DEFAULT,
        CHARACTER_MAXIMUM_LENGTH,
        NUMERIC_PRECISION,
        NUMERIC_SCALE
    FROM {database}.INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = '{schema}'
    AND TABLE_NAME = '{table_name}'
    ORDER BY ORDINAL_POSITION
    """
    
    cursor = connection.cursor()
    cursor.execute(query)
    columns = cursor.fetchall()
    cursor.close()
    
    df = pd.DataFrame(columns, columns=[
        'COLUMN_NAME', 'DATA_TYPE', 'IS_NULLABLE', 
        'COLUMN_DEFAULT', 'CHAR_MAX_LENGTH', 
        'NUMERIC_PRECISION', 'NUMERIC_SCALE'
    ])
    
    return df

# 例: 最初のテーブルのカラム情報を取得(テーブルが存在する場合)
if tables:
    example_table = tables[0]
    columns_info = get_columns_info(conn, config['database'], config['schema'], example_table)
    print(f"\nテーブル '{example_table}' のカラム情報:")
    display(columns_info)

## 6. サンプル値によるカラム検索

あらかじめ与えられたサンプル値のリストから、そのサンプル値を含むカラムを検索します。

In [None]:
def search_table_all_columns(connection, database: str, schema: str,
                            table_name: str, sample_values: List,
                            columns_info: pd.DataFrame) -> List[Dict]:
    """
    テーブルの全カラムを一度のクエリで検索（UNION ALL方式）
    
    Parameters:
    -----------
    connection : snowflake.connector.connection
        Snowflake接続オブジェクト
    database : str
        データベース名
    schema : str
        スキーマ名
    table_name : str
        テーブル名
    sample_values : List
        検索するサンプル値のリスト
    columns_info : pd.DataFrame
        カラム情報のDataFrame
        
    Returns:
    --------
    List[Dict]
        各カラムのマッチ情報を含む辞書のリスト
    """
    if not sample_values or columns_info.empty:
        return []
    
    # サンプル値をクエリ用に変換
    formatted_values = []
    for val in sample_values:
        if isinstance(val, str):
            escaped_val = val.replace("'", "''")
            formatted_values.append(f"'{escaped_val}'")
        else:
            formatted_values.append(str(val))
    
    values_str = ', '.join(formatted_values)
    
    # 各カラムに対するSELECT文を生成
    union_parts = []
    for _, col_info in columns_info.iterrows():
        column_name = col_info['COLUMN_NAME']
        union_parts.append(f"""
        SELECT 
            '{column_name}' as COLUMN_NAME,
            "{column_name}" as MATCHED_VALUE
        FROM {database}.{schema}."{table_name}"
        WHERE "{column_name}" IN ({values_str})
        """)
    
    # UNION ALLで結合
    query = " UNION ALL ".join(union_parts)
    
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        results = cursor.fetchall()
        cursor.close()
        
        # カラムごとにマッチした値を集計
        column_matches = {}
        for row in results:
            col_name = row[0]
            matched_val = row[1]
            if col_name not in column_matches:
                column_matches[col_name] = []
            column_matches[col_name].append(matched_val)
        
        # 各カラムの結果を作成
        column_results = []
        for _, col_info in columns_info.iterrows():
            column_name = col_info['COLUMN_NAME']
            data_type = col_info['DATA_TYPE']
            is_nullable = col_info['IS_NULLABLE']
            
            # マッチした値を取得（重複を除去）
            matched_values = list(set(column_matches.get(column_name, [])))
            matched_count = len(matched_values)
            match_rate = (matched_count / len(sample_values)) * 100
            
            column_results.append({
                'column_name': column_name,
                'data_type': data_type,
                'is_nullable': is_nullable,
                'matched_count': matched_count,
                'matched_values': matched_values,
                'match_rate': match_rate
            })
        
        return column_results
        
    except Exception as e:
        print(f"    エラー: {str(e)}")
        return []


# サンプル値の定義（ここに検索したい値を設定）
SAMPLE_VALUES = [
    'sample_value_1',
    'sample_value_2',
    'sample_value_3',
    # 必要に応じて値を追加
]

print(f"検索するサンプル値: {SAMPLE_VALUES}")
print(f"サンプル値の数: {len(SAMPLE_VALUES)}")


## 7. スキーマ全体からサンプル値に一致するカラムを検索

全テーブル・カラムを走査し、与えられたサンプル値に一致するカラムを見つけます。

In [None]:
def find_columns_by_sample_values(connection, database: str, schema: str,
                                 sample_values: List, 
                                 min_match_rate: float = 50.0,
                                 specific_tables: List[str] = None) -> pd.DataFrame:
    """
    サンプル値に一致するカラムをスキーマ全体から検索（最適化版）
    テーブルごとに全カラムを一度のクエリで検索
    
    Parameters:
    -----------
    connection : snowflake.connector.connection
        Snowflake接続オブジェクト
    database : str
        データベース名
    schema : str
        スキーマ名
    sample_values : List
        検索するサンプル値のリスト
    min_match_rate : float
        最小マッチ率（％）。この値以上のマッチ率のカラムのみを結果に含める
    specific_tables : List[str], optional
        特定のテーブルのみを検索する場合に指定
        
    Returns:
    --------
    pd.DataFrame
        マッチしたテーブル、カラム、データ型、マッチ情報を含むDataFrame
    """
    # テーブルリストの取得
    if specific_tables:
        table_list = specific_tables
    else:
        table_list = get_tables(connection, database, schema)
    
    print(f"検索対象テーブル数: {len(table_list)}")
    print(f"検索するサンプル値の数: {len(sample_values)}")
    print(f"最小マッチ率: {min_match_rate}%")
    print(f"最適化: テーブルごとに全カラムを一度のクエリで検索\n")
    
    results = []
    
    for idx, table in enumerate(table_list, 1):
        print(f"[{idx}/{len(table_list)}] 検索中: {table}", end=" ... ")
        
        # カラム情報の取得
        columns_info = get_columns_info(connection, database, schema, table)
        
        if columns_info.empty:
            print("カラムなし")
            continue
        
        print(f"{len(columns_info)}カラム", end=" ... ")
        
        # テーブルの全カラムを一度に検索
        column_results = search_table_all_columns(
            connection, database, schema, table, sample_values, columns_info
        )
        
        if not column_results:
            print("マッチなし")
            continue
        
        # 最小マッチ率以上の結果のみを追加
        match_count = 0
        for col_result in column_results:
            match_rate = col_result['match_rate']
            
            if match_rate >= min_match_rate:
                matched_values_str = ', '.join([str(v) for v in col_result['matched_values']])
                
                results.append({
                    'テーブル名': table,
                    'カラム名': col_result['column_name'],
                    'データ型': col_result['data_type'],
                    'NULL許可': col_result['is_nullable'],
                    'マッチ数': col_result['matched_count'],
                    'サンプル値総数': len(sample_values),
                    'マッチ率(%)': round(match_rate, 2),
                    'マッチした値': matched_values_str
                })
                match_count += 1
        
        if match_count > 0:
            print(f"✓ {match_count}カラムがマッチ")
        else:
            print("マッチ率低い")
    
    df_results = pd.DataFrame(results)
    
    # マッチ率で降順ソート
    if not df_results.empty:
        df_results = df_results.sort_values('マッチ率(%)', ascending=False).reset_index(drop=True)
        print(f"\n✓ 検索完了: {len(df_results)} カラムが条件に一致しました")
    else:
        print(f"\n✗ 条件に一致するカラムが見つかりませんでした")
    
    return df_results


## 8. サンプル値の設定と検索の実行

検索したいサンプル値を設定し、カラム検索を実行します。

**パラメータ:**
- `sample_values`: 検索するサンプル値のリスト
- `min_match_rate`: 最小マッチ率（％）デフォルト50%
- `specific_tables`: 特定のテーブルのみ検索する場合に指定

In [None]:
# ========================================
# ここに検索したいサンプル値を設定
# ========================================
SEARCH_SAMPLE_VALUES = [
    'Tokyo',
    'Osaka',
    'Kyoto',
    'Nagoya',
    'Sapporo'
    # 必要に応じて値を追加
]

# 検索の実行
# 全テーブルを検索する場合:
results_df = find_columns_by_sample_values(
    conn, 
    config['database'], 
    config['schema'], 
    SEARCH_SAMPLE_VALUES,
    min_match_rate=30.0  # 30%以上マッチするカラムを表示
)

# 特定のテーブルのみを検索する場合:
# results_df = find_columns_by_sample_values(
#     conn, 
#     config['database'], 
#     config['schema'], 
#     SEARCH_SAMPLE_VALUES,
#     min_match_rate=50.0,
#     specific_tables=['TABLE1', 'TABLE2']
# )


## 9. 検索結果の表示

サンプル値にマッチしたカラムの一覧を表示します。マッチ率が高い順に表示されます。

In [None]:
# 結果の表示
print("="*80)
print("検索結果（マッチ率が高い順）")
print("="*80)
display(results_df)

if not results_df.empty:
    print(f"\n統計情報:")
    print(f"マッチしたカラム数: {len(results_df)}")
    print(f"マッチしたテーブル数: {results_df['テーブル名'].nunique()}")
    print(f"\n最高マッチ率: {results_df['マッチ率(%)'].max()}%")
    print(f"平均マッチ率: {results_df['マッチ率(%)'].mean():.2f}%")
    print(f"\nマッチ率が100%のカラム数: {len(results_df[results_df['マッチ率(%)'] == 100])}")
else:
    print("\n条件に一致するカラムが見つかりませんでした。")
    print("・サンプル値が正しいか確認してください")
    print("・最小マッチ率を下げてみてください")

## 10. 結果のフィルタリングと分析

マッチ率やテーブル名でさらに絞り込んで分析できます。

In [None]:
# マッチ率が100%のカラムのみ表示
if not results_df.empty:
    perfect_matches = results_df[results_df['マッチ率(%)'] == 100]
    if not perfect_matches.empty:
        print("完全一致（マッチ率100%）のカラム:")
        display(perfect_matches)
    else:
        print("完全一致のカラムはありません")
    
    # マッチ率でフィルタリング（例: 80%以上）
    high_match = results_df[results_df['マッチ率(%)'] >= 80]
    print(f"\nマッチ率80%以上のカラム数: {len(high_match)}")
    
    # 特定のテーブルでフィルタリング（例）
    # specific_table_results = results_df[results_df['テーブル名'] == 'YOUR_TABLE_NAME']
    # display(specific_table_results)
    
    # データ型別の集計
    print("\nデータ型別のマッチ数:")
    print(results_df['データ型'].value_counts())


## 11. 結果のエクスポート

分析結果をCSVまたはExcelファイルに出力します。

In [None]:
# CSVファイルに出力
if not results_df.empty:
    csv_filename = 'snowflake_column_match_results.csv'
    results_df.to_csv(csv_filename, index=False, encoding='utf-8-sig')
    print(f"✓ CSVファイルに出力しました: {csv_filename}")
    
    # Excelファイルに出力(オプション)
    try:
        excel_filename = 'snowflake_column_match_results.xlsx'
        results_df.to_excel(excel_filename, index=False, engine='openpyxl')
        print(f"✓ Excelファイルに出力しました: {excel_filename}")
    except Exception as e:
        print(f"Excel出力エラー: {e}")
        print("(openpyxlがインストールされていない可能性があります)")
else:
    print("結果がないため、ファイル出力をスキップします")


## 12. Snowflake接続のクローズ

処理が完了したら、Snowflake接続をクローズします。

In [None]:
# Snowflake接続をクローズ
conn.close()
print("✓ Snowflakeから切断しました")