In [None]:
# Welcome to your new notebook
# Type here in the cell editor to add code!


StatementMeta(, 3ed54005-e5ae-4583-9176-6827b8e4ca03, 84, Finished, Available, Finished)

In [8]:
import pandas as pd
import requests
from io import StringIO

url = "https://www.mof.go.jp/english/policy/jgbs/reference/interest_rate/historical/jgbcme_all.csv"
response = requests.get(url, timeout=30)
response.encoding = 'shift_jis'

df = pd.read_csv(StringIO(response.text), header=1, na_values="-",parse_dates=["Date"])
df.head(5)
# 10年国債利回りを抽出

StatementMeta(, 3ed54005-e5ae-4583-9176-6827b8e4ca03, 88, Finished, Available, Finished)

Unnamed: 0,Date,1Y,2Y,3Y,4Y,5Y,6Y,7Y,8Y,9Y,10Y,15Y,20Y,25Y,30Y,40Y
0,1974-09-24,10.327,9.362,8.83,8.515,8.348,8.29,8.24,8.121,8.127,,,,,,
1,1974-09-25,10.333,9.364,8.831,8.516,8.348,8.29,8.24,8.121,8.127,,,,,,
2,1974-09-26,10.34,9.366,8.832,8.516,8.348,8.29,8.24,8.122,8.128,,,,,,
3,1974-09-27,10.347,9.367,8.833,8.517,8.349,8.29,8.24,8.122,8.128,,,,,,
4,1974-09-28,10.354,9.369,8.834,8.518,8.349,8.291,8.24,8.122,8.129,,,,,,


In [None]:

# ============================================================
# Cell 2: pandas DataFrame → Spark DataFrame → Delta table 書き込み
# ============================================================

from pyspark.sql.types import StructType, StructField, DateType, DoubleType, TimestampType, IntegerType

# --- 明示的スキーマ定義（設計書準拠）---
schema = StructType([
    StructField("date",        DateType(),      nullable=False),
    StructField("yield_10y",   DoubleType(),    nullable=False),
    StructField("ingested_at", TimestampType(), nullable=False),
    StructField("year",        IntegerType(),   nullable=False),
    StructField("month",       IntegerType(),   nullable=False),
])

# --- スキーマ定義と同じ順序でカラムを並べる（位置マッピングのずれを防止）---
df_ordered = df[["date", "yield_10y", "ingested_at", "year", "month"]]

# --- pandas → Spark DataFrame 変換 ---
spark_df = spark.createDataFrame(df_ordered, schema=schema)

# --- Delta table 書き込み（overwrite: 再実行時も冪等性を保証）---
(
    spark_df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .option("overwriteSchema", "true")
    .saveAsTable(TABLE_NAME)
)

actual_count = spark.read.table(TABLE_NAME).count()
print(f"[完了] Delta table '{TABLE_NAME}' に {actual_count:,} 行を書き込みました")
print(f"パーティション: year / month")


In [None]:


# pandas DataFrame → Spark DataFrame に変換し、Delta table として書き込み

spark_df = spark.createDataFrame(df)

table_name = "bronze_interest_rate"
spark_df.write.format("delta").mode("overwrite").saveAsTable(f"dbo.{table_name}")

print(f"Delta table '{table_name}' に {spark_df.count()} 行を書き込みました")

StatementMeta(, 3ed54005-e5ae-4583-9176-6827b8e4ca03, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4aee22ec-fa7e-4fa1-876c-9a53000b7a7d)

In [None]:
#---型変換---
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, DoubleType, IntegerType, TimestampType 
schema = StructType([
    StructField("Date", DateType(), True),
    StructField("10Y", DoubleType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("ingested_at", TimestampType(), True)])


StatementMeta(, 3ed54005-e5ae-4583-9176-6827b8e4ca03, 200, Finished, Available, Finished)

In [None]:
TABLE_NAME = "bronze_interest_rate"
# --- pandas → Spark DataFrame 変換 ---
spark_df = spark.createDataFrame(df, schema=schema)

# --- Delta table 書き込み（overwrite: 再実行時も冪等性を保証）---
(
    spark_df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .option("overwriteSchema", "true")
    .saveAsTable(TABLE_NAME)
)

actual_count = spark.read.table(TABLE_NAME).count()
print(f"[完了] Delta table '{TABLE_NAME}' に {actual_count:,} 行を書き込みました")
print(f"パーティション: year / month")


StatementMeta(, 3ed54005-e5ae-4583-9176-6827b8e4ca03, 201, Finished, Available, Finished)

  Unsupported cast from double to timestamp using function cast_timestamp
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `IntegerType()` can not accept object `9.362` in type `float`.

In [None]:

# ============================================================
# Cell 4: 設定 - 不動産取引データ取得（国土交通省 不動産情報ライブラリAPI）
# データソース: https://www.reinfolib.mlit.go.jp/ex-api/external/XIT001
# 事前準備: reinfolib の API キーを取得すること
#   https://www.reinfolib.mlit.go.jp/
# ============================================================

import uuid
import time
import requests
from datetime import datetime, timezone

# --- APIキー設定 ---
# 【推奨】Fabric Key Vault から取得:
#   KEY_VAULT_URL = "https://your-keyvault.vault.azure.net/"
#   API_KEY = notebookutils.credentials.getSecret(KEY_VAULT_URL, "reinfolib-api-key")
API_KEY = "d0237ca0d02d4a6095038c68829c0ad5"  # ← Fabric 上でここに API キーを入力すること

# --- 取得設定 ---
BASE_URL   = "https://www.reinfolib.mlit.go.jp/ex-api/external/XIT001"
TABLE_NAME = "bronze_realestate_transactions"

# 都道府県コード（テスト: 東京都のみ。全国展開時は PREFECTURES_ALL に切り替え）
PREFECTURES     = ["13"]                               # 東京都のみ
PREFECTURES_ALL = [f"{i:02d}" for i in range(1, 48)]  # 全国展開時に切り替え

# 取得期間: 2020Q1〜2024Q4（分析対象全期間）
YEAR_QUARTERS = [(year, q) for year in range(2018, 2025) for q in range(1, 5)]
print(YEAR_QUARTERS)

print(f"取得設定:")
print(f"  都道府県        : {PREFECTURES} （東京都）")
print(f"  期間（年×四半期）: {len(YEAR_QUARTERS)} 件  ({YEAR_QUARTERS[0]} 〜 {YEAR_QUARTERS[-1]})")
print(f"  総 API コール数  : {len(PREFECTURES) * len(YEAR_QUARTERS):,} 回")


StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 5, Finished, Available, Finished)

[(2018, 1), (2018, 2), (2018, 3), (2018, 4), (2019, 1), (2019, 2), (2019, 3), (2019, 4), (2020, 1), (2020, 2), (2020, 3), (2020, 4), (2021, 1), (2021, 2), (2021, 3), (2021, 4), (2022, 1), (2022, 2), (2022, 3), (2022, 4), (2023, 1), (2023, 2), (2023, 3), (2023, 4), (2024, 1), (2024, 2), (2024, 3), (2024, 4)]
取得設定:
  都道府県        : ['13'] （東京都）
  期間（年×四半期）: 28 件  ((2018, 1) 〜 (2024, 4))
  総 API コール数  : 28 回


In [None]:

# ============================================================
# Cell 5: API取得関数定義（リトライ・エラーハンドリング付き）
# ============================================================

def fetch_transactions(year: int, quarter: int, area: str, api_key: str,
                       max_retries: int = 3) -> list:
    """
    国土交通省 不動産情報ライブラリ API から取引データを取得する。

    Parameters
    ----------
    year        : 取得年 (2020-2024)
    quarter     : 四半期 (1-4)
    area        : 都道府県コード ("01"-"47")
    api_key     : Ocp-Apim-Subscription-Key
    max_retries : エラー時の最大リトライ回数

    Returns
    -------
    list: 取引データのリスト（該当なし or エラー時は空リスト）
    """
    params = {
        "year"                : year,
        "quarter"             : quarter,
        "area"                : area,
        "priceClassification" : "01",   # 01=取引価格情報（成約価格は "02"）
    }
    headers = {"Ocp-Apim-Subscription-Key": api_key}

    for attempt in range(1, max_retries + 1):
        try:
            resp = requests.get(BASE_URL, params=params, headers=headers, timeout=30)
            resp.raise_for_status()
            body = resp.json()
            if body.get("status") == "OK":
                return body.get("data", [])
            else:
                print(f"  [WARN] status={body.get('status')} ({year}Q{quarter} area={area})")
                return []
        except Exception as e:
            if attempt < max_retries:
                wait = 2 ** attempt   # Exponential backoff: 2s → 4s → 8s
                print(f"  [RETRY {attempt}/{max_retries}] {year}Q{quarter} area={area} → {e} ({wait}s 待機)")
                time.sleep(wait)
            else:
                print(f"  [ERROR] {year}Q{quarter} area={area} → {e} (スキップ)")
                return []

print("fetch_transactions 関数 定義完了")


StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 6, Finished, Available, Finished)

fetch_transactions 関数 定義完了


In [23]:

# ============================================================
# Cell 6: データ取得メインループ
# ============================================================

import pandas as pd

ingested_at  = datetime.now(timezone.utc)
all_records  = []
error_count  = 0
total_calls  = len(PREFECTURES) * len(YEAR_QUARTERS)
call_count   = 0

for year, quarter in YEAR_QUARTERS:
    yq_records = []

    for area in PREFECTURES:
        call_count += 1
        records = fetch_transactions(year, quarter, area, API_KEY)

        for r in records:
            r["_year"]        = year
            r["_quarter"]     = quarter
            r["_ingested_at"] = ingested_at

        yq_records.extend(records)
        time.sleep(0.3)   # レート制限対策

    all_records.extend(yq_records)
    print(f"[{year}Q{quarter}] {len(yq_records):>6,} 件  累計: {len(all_records):>7,} 件  [{call_count}/{total_calls}]")

print(f"\n取得完了: 総レコード数 {len(all_records):,} 件")
print(yq_records[:2])  # 取得したレコードのサンプルを表示


StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 14, Finished, Available, Finished)

[2018Q1]  8,101 件  累計:   8,101 件  [1/28]
[2018Q2]  7,762 件  累計:  15,863 件  [2/28]
[2018Q3]  7,848 件  累計:  23,711 件  [3/28]
[2018Q4]  7,958 件  累計:  31,669 件  [4/28]
[2019Q1]  8,056 件  累計:  39,725 件  [5/28]
[2019Q2]  7,832 件  累計:  47,557 件  [6/28]
[2019Q3]  7,817 件  累計:  55,374 件  [7/28]
[2019Q4]  7,673 件  累計:  63,047 件  [8/28]
[2020Q1]  9,242 件  累計:  72,289 件  [9/28]
[2020Q2]  6,749 件  累計:  79,038 件  [10/28]
[2020Q3]  9,716 件  累計:  88,754 件  [11/28]
[2020Q4] 10,063 件  累計:  98,817 件  [12/28]
[2021Q1]  9,572 件  累計: 108,389 件  [13/28]
[2021Q2]  9,282 件  累計: 117,671 件  [14/28]
[2021Q3]  8,682 件  累計: 126,353 件  [15/28]
[2021Q4]  8,688 件  累計: 135,041 件  [16/28]
[2022Q1]  8,263 件  累計: 143,304 件  [17/28]
[2022Q2]  8,069 件  累計: 151,373 件  [18/28]
[2022Q3]  8,085 件  累計: 159,458 件  [19/28]
[2022Q4]  7,952 件  累計: 167,410 件  [20/28]
[2023Q1]  7,618 件  累計: 175,028 件  [21/28]
[2023Q2]  7,822 件  累計: 182,850 件  [22/28]
[2023Q3]  8,178 件  累計: 191,028 件  [23/28]
[2023Q4]  8,051 件  累計: 199,079 件  [24/28]
[

In [None]:

# ============================================================
# Cell 7: pandas DataFrame 整形 → Delta table 書き込み
# API レスポンスの英語キー名をスネークケースに変換し、メタデータ列を追加する
# ============================================================

# --- カラム名マッピング（API英語キー → Bronze テーブル列名）---
COLUMN_MAP = {
    "PriceCategory"  : "price_category",
    "Type"           : "property_type",       # 中古マンション等, 宅地(土地と建物) 等
    "Region"         : "region",
    "MunicipalityCode": "municipality_code",
    "Prefecture"     : "prefecture",
    "Municipality"   : "municipality",
    "DistrictName"   : "district",
    "TradePrice"     : "trade_price",
    "PricePerUnit"   : "price_per_unit",
    "FloorPlan"      : "floor_plan",
    "Area"           : "area",
    "UnitPrice"      : "unit_price",
    "LandShape"      : "land_shape",
    "Frontage"       : "frontage",
    "TotalFloorArea" : "total_floor_area",
    "BuildingYear"   : "building_year",
    "Structure"      : "structure",
    "Use"            : "use",
    "Purpose"        : "purpose",
    "Direction"      : "direction",
    "Classification" : "classification",
    "Breadth"        : "breadth",
    "CityPlanning"   : "city_planning",
    "CoverageRatio"  : "coverage_ratio",
    "FloorAreaRatio" : "floor_area_ratio",
    "Period"         : "transaction_period",  # 例: "令和２年第１四半期"
    "Renovation"     : "renovation",
    "Remarks"        : "remarks",
    "_year"          : "year",
    "_quarter"       : "quarter",
    "_ingested_at"   : "ingested_at",
}

# --- pandas DataFrame に変換・整形 ---
df_bronze = pd.DataFrame(all_records).rename(columns=COLUMN_MAP)

# transaction_id（UUID）を先頭列に追加
df_bronze.insert(0, "transaction_id", [str(uuid.uuid4()) for _ in range(len(df_bronze))])

print(f"整形後カラム数: {len(df_bronze.columns)}")
print(f"整形後レコード数: {len(df_bronze):,} 件")

# --- pandas → Spark DataFrame → Delta table 書き込み ---
spark_df = spark.createDataFrame(df_bronze)

(
    spark_df
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "quarter")
    .option("overwriteSchema", "true")
    .saveAsTable(TABLE_NAME)
)

actual_count = spark.read.table(TABLE_NAME).count()
print(f"\n[完了] Delta table '{TABLE_NAME}' に {actual_count:,} 行を書き込みました")
print(f"パーティション: year / quarter")


StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 15, Finished, Available, Finished)

整形後カラム数: 33
整形後レコード数: 231,810 件

[完了] Delta table 'bronze_realestate_transactions' に 231,810 行を書き込みました
パーティション: year / quarter


In [None]:

# ============================================================
# Cell 8: 動作確認
# ============================================================
from pyspark.sql.functions import col, count, min, max

df_check = spark.read.table(TABLE_NAME)

print("=== スキーマ ===")
df_check.printSchema()

print("=== 件数・期間確認（年×四半期別）===")
df_check.groupBy("year", "quarter") \
        .count() \
        .orderBy("year", "quarter") \
        .show(25)

print("=== 物件種別の内訳 ===")
df_check.groupBy("property_type").count().orderBy(col("count").desc()).show()

print("=== サンプル（先頭5行）===")
display(df_check.limit(5))


StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 16, Finished, Available, Finished)

=== スキーマ ===
root
 |-- transaction_id: string (nullable = true)
 |-- price_category: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- region: string (nullable = true)
 |-- municipality_code: string (nullable = true)
 |-- prefecture: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- district: string (nullable = true)
 |-- trade_price: string (nullable = true)
 |-- price_per_unit: string (nullable = true)
 |-- floor_plan: string (nullable = true)
 |-- area: string (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- land_shape: string (nullable = true)
 |-- frontage: string (nullable = true)
 |-- total_floor_area: string (nullable = true)
 |-- building_year: string (nullable = true)
 |-- structure: string (nullable = true)
 |-- use: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- breadth: string (nullable = true)
 |-- ci

SynapseWidget(Synapse.DataFrame, 9f0d3285-d389-485b-bc83-d51e865b9163)

In [None]:
display(df_check.orderBy(col("year").asc(),col("quarter").asc()).limit(10))

StatementMeta(, 781f4471-4ff3-469e-bfe5-8d26db501a9a, 42, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 67f9a9d1-cf34-417a-833f-70e5b75ab8c4)