# Pandas基礎② - データクリーニング（通常版）

Day 4 - Pandas Data Cleaning Basic

実務で必要なデータクリーニング技術を体系的に学習します。
より高度な処理や、大規模データへの対応方法も含みます。

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

print("=== Pandas データクリーニング基礎（通常版） ===")

## 1. 複雑なサンプルデータの作成

実務でよく遭遇する様々な問題を含んだ販売トランザクションデータを作成します。

In [None]:
# 販売トランザクションデータ（実務でよくある問題を含む）
np.random.seed(42)
n_records = 100

data = {
    'transaction_id': [f'T{str(i).zfill(4)}' for i in range(1, n_records + 1)],
    'date': pd.date_range('2024-01-01', periods=n_records, freq='h').astype(str),
    'customer_id': np.random.choice(['C001', 'C002', 'C003', None, 'C004', 'C005'], n_records),
    'product_name': np.random.choice(['Product A', 'Product B', ' Product C ', 'product a', None], n_records),
    'quantity': np.random.choice([1, 2, 3, '4', '5個', None, -1], n_records),
    'unit_price': np.random.choice([1000, 1500, '2,000', '¥3000', None], n_records),
    'category': np.random.choice(['Electronics', 'electronics', 'ELECTRONICS', 'Clothing', None], n_records),
    'status': np.random.choice(['completed', 'Completed', 'COMPLETED', 'cancelled', 'pending'], n_records)
}

df = pd.DataFrame(data)

# 意図的にいくつかの問題を追加
df.loc[10:15, 'date'] = 'invalid_date'
df.loc[20:25, 'quantity'] = '在庫切れ'
df.loc[30:32, 'transaction_id'] = 'T0001'  # 重複ID

print("データの先頭10行:")
print(df.head(10))
print(f"\nデータの形状: {df.shape}")

## 2. データ品質の総合チェック

データの品質を包括的にチェックする関数を作成します。

In [None]:
def data_quality_report(df):
    """データ品質レポートを生成する関数"""
    report = []
    
    for col in df.columns:
        col_info = {
            'column': col,
            'dtype': str(df[col].dtype),
            'nulls': df[col].isnull().sum(),
            'null_pct': f"{df[col].isnull().sum() / len(df) * 100:.1f}%",
            'unique': df[col].nunique(),
            'duplicates': len(df) - df[col].nunique() if df[col].dtype == 'object' else '-'
        }
        report.append(col_info)
    
    return pd.DataFrame(report)

quality_report = data_quality_report(df)
print("データ品質レポート:")
quality_report

## 3. 高度な欠損値処理

欠損値のパターンを分析し、適切な処理方法を選択します。

In [None]:
# 欠損値のパターン分析
print("欠損値のパターン分析:")
missing_pattern = df.isnull().value_counts()
print(missing_pattern.head())
print()

# 条件付き欠損値処理
df_cleaned = df.copy()

# カテゴリ変数の欠損値処理
df_cleaned['category'] = df_cleaned['category'].fillna('Unknown')
df_cleaned['product_name'] = df_cleaned['product_name'].fillna('Unknown Product')

In [None]:
# 数値変数の欠損値処理（グループ別平均値で補完）
# まず数値に変換可能なものを変換
df_cleaned['quantity'] = pd.to_numeric(df_cleaned['quantity'], errors='coerce')
df_cleaned['unit_price'] = df_cleaned['unit_price'].astype(str).str.replace('[¥,円]', '', regex=True)
df_cleaned['unit_price'] = pd.to_numeric(df_cleaned['unit_price'], errors='coerce')

# カテゴリ別の平均値で補完
for category in df_cleaned['category'].unique():
    mask = df_cleaned['category'] == category
    df_cleaned.loc[mask, 'unit_price'] = df_cleaned.loc[mask, 'unit_price'].fillna(
        df_cleaned.loc[mask, 'unit_price'].mean()
    )

print("欠損値処理後の状況:")
print(df_cleaned.isnull().sum())

## 4. データ型の変換と検証

各列を適切なデータ型に変換し、無効なデータを処理します。

In [None]:
# 日付型への変換（エラー処理付き）
df_cleaned['date'] = pd.to_datetime(df_cleaned['date'], errors='coerce')

# 無効な日付の確認
invalid_dates = df_cleaned[df_cleaned['date'].isnull()]['transaction_id']
if len(invalid_dates) > 0:
    print(f"無効な日付を持つトランザクション: {list(invalid_dates.head())}")
    # 無効な日付を現在時刻で置換
    df_cleaned['date'] = df_cleaned['date'].fillna(pd.Timestamp.now())

# データ型の確認
print("\n変換後のデータ型:")
print(df_cleaned.dtypes)

## 5. 文字列の正規化と標準化

文字列データの表記揺れを統一します。

In [None]:
# 前後の空白削除
df_cleaned['product_name'] = df_cleaned['product_name'].str.strip()

# 大文字小文字の統一
df_cleaned['category'] = df_cleaned['category'].str.capitalize()
df_cleaned['status'] = df_cleaned['status'].str.lower()

# 製品名の統一（類似名称の統合）
product_mapping = {
    'product a': 'Product A',
    'Product a': 'Product A',
    'PRODUCT A': 'Product A'
}
df_cleaned['product_name'] = df_cleaned['product_name'].str.capitalize().replace(product_mapping)

print("正規化後の値の分布:")
print("カテゴリ:", df_cleaned['category'].value_counts().to_dict())
print("ステータス:", df_cleaned['status'].value_counts().to_dict())

## 6. 外れ値の検出と処理

IQR（四分位範囲）を使って外れ値を検出します。

In [None]:
def detect_outliers_iqr(df, column):
    """IQRを使った外れ値検出"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return outliers, lower_bound, upper_bound

# 数量の外れ値検出
quantity_outliers, q_lower, q_upper = detect_outliers_iqr(df_cleaned[df_cleaned['quantity'].notna()], 'quantity')
print(f"数量の外れ値: {len(quantity_outliers)}件")
print(f"正常範囲: {q_lower:.1f} ～ {q_upper:.1f}")

# 負の数量を0に修正
df_cleaned.loc[df_cleaned['quantity'] < 0, 'quantity'] = 0

## 7. 重複データの高度な処理

完全重複と部分重複を区別して処理します。

In [None]:
# 完全重複の確認
full_duplicates = df_cleaned.duplicated().sum()
print(f"完全重複行数: {full_duplicates}")

# 部分重複の確認（transaction_idの重複）
id_duplicates = df_cleaned[df_cleaned.duplicated(subset=['transaction_id'], keep=False)]
print(f"\ntransaction_idの重複: {len(id_duplicates)}件")

if len(id_duplicates) > 0:
    print("\n重複したtransaction_id:")
    print(id_duplicates[['transaction_id', 'date', 'customer_id']].head())
    
    # 最新の日付のレコードを残す
    df_cleaned = df_cleaned.sort_values('date').drop_duplicates(subset=['transaction_id'], keep='last')
    print(f"\n重複削除後のレコード数: {len(df_cleaned)}")

## 8. データ品質の最終検証

クリーニング後のデータ品質を評価します。

In [None]:
# クリーニング前後の比較
print("クリーニング前後の比較:")
print(f"元のレコード数: {len(df)}")
print(f"クリーニング後のレコード数: {len(df_cleaned)}")
print(f"削除されたレコード数: {len(df) - len(df_cleaned)}")
print(f"欠損値の総数: {df_cleaned.isnull().sum().sum()}")

In [None]:
# データ品質スコアの計算
def calculate_quality_score(df):
    """データ品質スコアを計算（0-100）"""
    total_cells = df.size
    null_cells = df.isnull().sum().sum()
    
    # 数値列の異常値チェック
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    outlier_count = 0
    for col in numeric_cols:
        if len(df[col].dropna()) > 0:
            outliers, _, _ = detect_outliers_iqr(df[df[col].notna()], col)
            outlier_count += len(outliers)
    
    # スコア計算
    null_score = (1 - null_cells / total_cells) * 50
    outlier_score = (1 - outlier_count / len(df)) * 30
    duplicate_score = (1 - df.duplicated().sum() / len(df)) * 20
    
    total_score = null_score + outlier_score + duplicate_score
    return total_score

quality_score = calculate_quality_score(df_cleaned)
print(f"\nデータ品質スコア: {quality_score:.1f}/100")

## 9. クリーニングされたデータの保存

複数の形式でデータを保存します。

In [None]:
# CSVファイルとして保存
output_file = 'cleaned_transaction_data.csv'
df_cleaned.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"クリーニング済みデータを {output_file} として保存しました。")

# パーケット形式でも保存（大規模データに適している）
df_cleaned.to_parquet('cleaned_transaction_data.parquet', index=False)
print("パーケット形式でも保存しました（大規模データ用）。")

## 10. 実践的なクリーニング関数

再利用可能な包括的クリーニング関数を作成します。

In [None]:
def comprehensive_data_cleaning(df, config=None):
    """
    包括的なデータクリーニング関数
    
    Parameters:
    -----------
    df : pandas.DataFrame
        クリーニング対象のデータフレーム
    config : dict
        クリーニング設定（オプション）
    
    Returns:
    --------
    cleaned_df : pandas.DataFrame
        クリーニング済みデータフレーム
    report : dict
        クリーニングレポート
    """
    if config is None:
        config = {
            'remove_duplicates': True,
            'handle_missing': True,
            'standardize_text': True,
            'remove_outliers': False,
            'date_columns': [],
            'numeric_columns': [],
            'categorical_columns': []
        }
    
    cleaned_df = df.copy()
    report = {
        'original_rows': len(df),
        'original_columns': len(df.columns),
        'steps_performed': []
    }
    
    # 1. 重複削除
    if config['remove_duplicates']:
        before_rows = len(cleaned_df)
        cleaned_df = cleaned_df.drop_duplicates()
        removed = before_rows - len(cleaned_df)
        report['steps_performed'].append(f'重複削除: {removed}行削除')
    
    # 2. 欠損値処理
    if config['handle_missing']:
        missing_before = cleaned_df.isnull().sum().sum()
        
        # カテゴリ変数
        for col in config.get('categorical_columns', []):
            if col in cleaned_df.columns:
                cleaned_df[col] = cleaned_df[col].fillna('Unknown')
        
        # 数値変数
        for col in config.get('numeric_columns', []):
            if col in cleaned_df.columns:
                cleaned_df[col] = cleaned_df[col].fillna(cleaned_df[col].median())
        
        missing_after = cleaned_df.isnull().sum().sum()
        report['steps_performed'].append(f'欠損値処理: {missing_before - missing_after}個処理')
    
    # 3. テキスト標準化
    if config['standardize_text']:
        text_cols = cleaned_df.select_dtypes(include=['object']).columns
        for col in text_cols:
            cleaned_df[col] = cleaned_df[col].astype(str).str.strip()
        report['steps_performed'].append(f'テキスト標準化: {len(text_cols)}列処理')
    
    # 4. 日付変換
    for col in config.get('date_columns', []):
        if col in cleaned_df.columns:
            cleaned_df[col] = pd.to_datetime(cleaned_df[col], errors='coerce')
    
    report['final_rows'] = len(cleaned_df)
    report['rows_removed'] = report['original_rows'] - report['final_rows']
    
    return cleaned_df, report

In [None]:
# 使用例
print("包括的クリーニング関数の使用例:")
config = {
    'remove_duplicates': True,
    'handle_missing': True,
    'standardize_text': True,
    'categorical_columns': ['category', 'status'],
    'numeric_columns': ['quantity', 'unit_price'],
    'date_columns': ['date']
}

# 新しいサンプルデータでテスト
test_df = df.head(20).copy()
cleaned_test, cleaning_report = comprehensive_data_cleaning(test_df, config)

print("\nクリーニングレポート:")
for key, value in cleaning_report.items():
    print(f"  {key}: {value}")

## まとめ

データクリーニングの重要ポイントを整理します。

In [None]:
print("=== まとめ ===")
print("データクリーニングの重要ポイント:")
print("1. 体系的なアプローチ: 品質チェック → 問題特定 → 処理 → 検証")
print("2. データの特性理解: 各列の型、分布、ビジネス意味を理解")
print("3. 処理の記録: 何を、なぜ、どのように処理したかを記録")
print("4. 自動化: 繰り返し使える関数やパイプラインの構築")
print("5. 検証: クリーニング後のデータ品質を必ず確認")

## 実践課題

自分のデータでクリーニングを実践してみましょう。

In [None]:
# 実践課題用のスペース
# 自分のCSVファイルを読み込んで、学んだ技術を適用してみてください

# 例:
# my_data = pd.read_csv('my_data.csv')
# quality_report = data_quality_report(my_data)
# print(quality_report)