In [None]:
import os
import json
import time
import urllib
from pathlib import Path
from typing import Tuple, List, Dict, Any, Optional, Union

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv

In [None]:
class RemoteDataError(IOError):
    """Error signalling that data could not be retrieved from a remote URL."""

def _init_session(session):
    """
    Return a ``requests.Session`` to use for HTTP calls.

    Parameters
    ----------
    session : Optional[requests.Session]
        An existing session to reuse, or None to create a new one.

    Returns
    -------
    requests.Session
        The session passed in, or a freshly created one when None was given.

    Raises
    ------
    TypeError
        If ``session`` is neither None nor a ``requests.Session`` instance.
    """
    if session is None:
        return requests.Session()
    if not isinstance(session, requests.Session):
        # The module is called "requests"; the previous message named a
        # non-existent "request.Session".
        raise TypeError("session must be a requests.Session")
    return session

In [None]:
class _BaseReader:
    """
    Base class for data readers with retry and session management.

    Subclasses must override ``url`` and may override ``params``,
    ``_read_lines`` and ``_output_error``.

    Parameters
    ----------
    retry_count : int, default 3
        Number of times to retry a failed request. The request is attempted
        ``retry_count + 1`` times in total; 0 disables retries.
    pause : float, default 0.1
        Time, in seconds, of the pause between retries.
    timeout : int, default 30
        Request timeout in seconds.
    session : Optional[requests.Session], default None
        requests.sessions.Session instance to be used. A new session is
        created when None.

    Raises
    ------
    ValueError
        If ``retry_count``, ``pause`` or ``timeout`` is out of range or of the
        wrong type.
    TypeError
        If ``session`` is neither None nor a ``requests.Session``.
    """

    def __init__(
        self,
        retry_count: int = 3,
        pause: float = 0.1,
        timeout: int = 30,
        session: Optional[requests.Session] = None,
    ) -> None:
        # Zero is accepted for retry_count and pause, so the messages say
        # "non-negative" rather than "positive".
        if not isinstance(retry_count, int) or retry_count < 0:
            raise ValueError("'retry_count' must be a non-negative integer")
        if not isinstance(pause, (int, float)) or pause < 0:
            raise ValueError("'pause' must be a non-negative number")
        if not isinstance(timeout, int) or timeout <= 0:
            raise ValueError("'timeout' must be a positive integer")

        self.retry_count = retry_count
        self.pause = pause
        self.timeout = timeout
        # Factor applied to the pause after every failed attempt; subclasses
        # may raise it above 1 to get a growing back-off.
        self.pause_multiplier = 1
        self.session = _init_session(session)
        # Default headers used by _get_response when none are passed in.
        self.headers: Optional[Dict[str, str]] = None

    def close(self) -> None:
        """Close network session."""
        self.session.close()

    @property
    def url(self) -> str:
        """API URL - must be overridden in subclass."""
        raise NotImplementedError

    @property
    def params(self) -> Optional[Dict[str, Any]]:
        """Parameters to use in API calls (None in the base class)."""
        return None

    def read(self) -> pd.DataFrame:
        """Request ``url`` with ``params`` and return the parsed DataFrame.

        The session is closed afterwards, whether or not the request succeeded.
        """
        try:
            return self._read_one_data(self.url, self.params)
        finally:
            self.close()

    def read_json(self) -> Dict[str, Any]:
        """Request ``url`` with ``params`` and return the decoded JSON body.

        The session is closed afterwards, whether or not the request succeeded.
        """
        try:
            response = self._get_response(self.url, params=self.params)
            return response.json()
        finally:
            self.close()

    def _read_one_data(self, url: str, params: Optional[Dict[str, Any]]) -> pd.DataFrame:
        """Fetch one URL and convert its JSON body with ``_read_lines``."""
        out = self._get_response(url, params=params).json()
        return self._read_lines(out)

    def _get_response(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> requests.Response:
        """
        Send raw HTTP GET request and return the response from the specified url.

        The request is attempted up to ``retry_count + 1`` times. After each
        non-OK response ``_output_error`` is consulted; if it returns True the
        loop stops immediately. Otherwise the reader sleeps for the current
        pause (multiplied by ``pause_multiplier`` after every attempt) before
        retrying. No sleep happens after the final attempt.

        Exceptions raised by ``session.get`` itself (connection errors,
        timeouts) are not caught here and propagate to the caller.

        Parameters
        ----------
        url : str
            Target URL
        params : Optional[Dict[str, Any]]
            Parameters passed to the URL
        headers : Optional[Dict[str, str]]
            Headers for the request; falls back to ``self.headers``

        Returns
        -------
        requests.Response
            Response object from the HTTP request

        Raises
        ------
        RemoteDataError
            If unable to retrieve data after all retry attempts
        """
        # Imported here so the method does not rely on another package having
        # loaded the ``urllib.parse`` submodule as a side effect.
        from urllib.parse import urlencode

        headers = headers or self.headers
        pause = self.pause
        last_response_text = ""
        attempts = self.retry_count + 1

        for attempt in range(attempts):
            response = self.session.get(
                url, params=params, headers=headers, timeout=self.timeout
            )
            if response.status_code == requests.codes.ok:
                return response

            # Keep the decoded body (str) so the error message is readable
            # instead of showing a bytes repr.
            last_response_text = response.text

            # If our output error function returns True, stop retrying.
            if self._output_error(response):
                break

            # Only wait when another attempt will actually follow.
            if attempt < attempts - 1:
                time.sleep(pause)
                # Increase time between subsequent requests, per subclass.
                pause *= self.pause_multiplier

        # If we reach here, we have exhausted all retries.
        if params:
            # Do not emit "??" when the URL already ends with a separator.
            if url.endswith(("?", "&")):
                separator = ""
            elif "?" in url:
                separator = "&"
            else:
                separator = "?"
            url = f"{url}{separator}{urlencode(query=params)}"
        msg = f"Unable to read URL: {url}"
        if last_response_text:
            msg += f"\nResponse Text:\n{last_response_text}"

        raise RemoteDataError(msg)

    def _read_lines(self, out: Dict[str, Any]) -> pd.DataFrame:
        """
        Process JSON response into DataFrame.

        The JSON is flattened with ``pd.json_normalize`` (nested keys joined
        by "_") and surrounding whitespace is stripped from column names.

        Parameters
        ----------
        out : Dict[str, Any]
            JSON response data

        Returns
        -------
        pd.DataFrame
            Processed DataFrame
        """
        rs = pd.json_normalize(out, sep="_")

        # Remove blank space characters in header names. A plain rename is
        # used because assigning the stripped names and then dropping the
        # originals deletes every column whose name was already clean.
        rs = rs.rename(
            columns=lambda col: col.strip() if isinstance(col, str) else col
        )

        # Get rid of non-ASCII characters in the index name, if there is one.
        # json_normalize leaves the index unnamed (None), which must not be
        # treated as a string.
        index_name = rs.index.name
        if isinstance(index_name, bytes):
            index_name = index_name.decode("unicode_escape")
        if isinstance(index_name, str):
            rs.index.name = index_name.encode("ascii", "ignore").decode()

        return rs

    def _output_error(self, response: requests.Response) -> bool:
        """
        Handle HTTP error responses.

        Parameters
        ----------
        response : requests.Response
            Response object to check for errors

        Returns
        -------
        bool
            True if error should stop retry loop, False otherwise
        """
        # Override in subclasses for specific error handling
        return False

In [None]:
# e-Stat REST API version used to build the endpoint root.
_version = "3.0"
# Root URL of the JSON endpoints; the endpoint name is appended to it.
_BASE_URL = "https://api.e-stat.go.jp/rest/" + _version + "/app/json"
# English attribute keyword -> Japanese label used when renaming columns.
# "name" maps to an empty string, i.e. the keyword is removed from the label.
ATTR_DICT = dict(
    value="値",
    code="コード",
    name="",
    level="階層レベル",
    tab="表章項目",
    cat="分類",
    area="地域",
    time="時間軸",
    unit="単位",
    parentCode="親コード",
    addInf="追加情報",
    annotation="注釈記号",
)

In [None]:
class _eStatReader(_BaseReader):
    """
    Base class for eStat API readers.

    Parameters
    ----------
    api_key : Optional[str], default None
        e-Stat application ID (appId). If None, the key is looked up in
        .env files and environment variables, checking these names in order:
        E_STAT_APPLICATION_ID, ESTAT_APPLICATION_ID,
        E_STAT_APP_ID, ESTAT_APP_ID,
        E_STAT_APPID, ESTAT_APPID,
        E_STAT_API_KEY, ESTAT_API_KEY
    lang : Optional[str], default None
        Language for retrieved data. Either "J" (Japanese) or "E" (English).
        The value is only stored here; subclasses decide how to use it.
    explanationGetFlg : Optional[str], default None
        Whether to fetch explanations for tables, statistics, classifications
        and items: "Y" (fetch) or "N" (do not fetch). The value is only stored
        here; subclasses decide how to use it.
    retry_count : int, default 3
        Number of times to retry query request.
    pause : float, default 0.1
        Time, in seconds, of the pause between retries.
    timeout : int, default 30
        Request timeout in seconds.
    session : Optional[requests.Session], default None
        requests.sessions.Session instance to be used.
    dotenv_path : Optional[str], default None
        Path to .env file for loading environment variables.
        If None, will look for .estat_env, .env_estat, or .env in the current directory.

    Raises
    ------
    ValueError
        If no application ID is given and none can be found, or if the
        resolved key is not a non-empty string.
    """

    # Environment variable names checked, in order, for the application ID.
    _API_KEY_ENV_VARS = (
        "E_STAT_APPLICATION_ID",
        "ESTAT_APPLICATION_ID",
        "E_STAT_APP_ID",
        "ESTAT_APP_ID",
        "E_STAT_APPID",
        "ESTAT_APPID",
        "E_STAT_API_KEY",
        "ESTAT_API_KEY",
    )

    # .env file names tried, in order, when no explicit dotenv_path is given.
    _DEFAULT_DOTENV_FILES = (".estat_env", ".env_estat", ".env")

    def __init__(
        self,
        api_key: Optional[str] = None,
        lang: Optional[str] = None,
        explanationGetFlg: Optional[str] = None,
        retry_count: int = 3,
        pause: float = 0.1,
        timeout: int = 30,
        session: Optional[requests.Session] = None,
        dotenv_path: Optional[str] = None,
    ) -> None:
        super().__init__(
            retry_count=retry_count,
            pause=pause,
            timeout=timeout,
            session=session,
        )

        # Try to get API key from various sources
        if api_key is None:
            api_key = self._get_api_key_from_env(dotenv_path)

        if not api_key or not isinstance(api_key, str):
            raise ValueError(
                "The e-Stat Application ID must be provided either "
                "through the api_key variable or through one of the "
                "following environment variables: "
                "E_STAT_APPLICATION_ID, ESTAT_APPLICATION_ID, "
                "E_STAT_APP_ID, ESTAT_APP_ID, "
                "E_STAT_APPID, ESTAT_APPID, "
                "E_STAT_API_KEY, ESTAT_API_KEY"
            )

        self.api_key = api_key
        self.explanationGetFlg = explanationGetFlg
        self.lang = lang

    def _get_api_key_from_env(self, dotenv_path: Optional[str] = None) -> Optional[str]:
        """
        Get API key from .env files and environment variables.

        The .env file(s) are loaded into the process environment first and the
        known variable names are then read with ``os.getenv``. ``load_dotenv``
        is called without ``override``, so (per python-dotenv's default)
        variables already set in the environment are not replaced by file
        values.

        Parameters
        ----------
        dotenv_path : Optional[str]
            Path to specific .env file to load. It is ignored if the file does
            not exist. If None, the default file names are tried in order and
            the search stops at the first file after which a key is available.

        Returns
        -------
        Optional[str]
            API key if found, None otherwise
        """

        def _first_key_in_environ() -> Optional[str]:
            # First non-empty value among the known variable names.
            for var_name in self._API_KEY_ENV_VARS:
                candidate = os.getenv(var_name)
                if candidate:
                    return candidate
            return None

        # load_dotenv is imported unconditionally at the top of the notebook,
        # so no availability flag is needed (the previous DOTENV_AVAILABLE
        # guard referenced a name that was never defined).
        if dotenv_path:
            if Path(dotenv_path).exists():
                load_dotenv(dotenv_path)
        else:
            for env_file in self._DEFAULT_DOTENV_FILES:
                if Path(env_file).exists():
                    load_dotenv(env_file)
                    api_key = _first_key_in_environ()
                    if api_key:
                        return api_key

        # Whatever is in the environment now (loaded from a file or already set).
        return _first_key_in_environ()

    def get_url(self, path: str = "getStatsData") -> str:
        """
        Get API URL for specified path.

        Parameters
        ----------
        path : str, default "getStatsData"
            API endpoint path. One of "getStatsList", "getDataCatalog",
            "getMetaInfo" or "getStatsData". Any other value is replaced by
            "getStatsData" and a notice is printed.

        Returns
        -------
        str
            Complete API URL, ending with "?"
        """
        valid_paths = ["getStatsList", "getDataCatalog", "getMetaInfo", "getStatsData"]
        if path not in valid_paths:
            path = "getStatsData"
            print(
                f"pathは{', '.join(valid_paths)}で指定します。pathをgetStatsDataに置換しました。"
            )
        return f"{_BASE_URL}/{path}?"


In [None]:

def colname_to_japanese(value: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``value`` with English column keywords made Japanese.

    For each column label, the first keyword of the lookup table (in table
    order) that occurs in the label is replaced by its Japanese counterpart;
    later keywords are not applied to that label. Labels containing no
    keyword are kept as they are.

    NOTE(review): a function with the same name is defined again further
    down in this notebook; once that cell has run it replaces this one.
    """
    # English keyword -> Japanese label; the order decides which keyword wins.
    translations = {
        "value": "値",
        "code": "コード",
        "name": "",
        "level": "階層レベル",
        "unit": "単位",
        "parentCode": "親コード",
        "addInf": "追加情報",
        "tab": "表章項目",
        "cat": "分類",
        "area": "地域",
        "time": "時間軸",
        "annotation": "注釈記号",
    }

    def _translate(label):
        hit = next((key for key in translations if key in label), None)
        if hit is None:
            return label
        return label.replace(hit, translations[hit])

    return value.rename(columns=_translate)


In [None]:
class MetaInfoReader(_eStatReader):
    """
    Reader for the e-Stat meta information API (getMetaInfo).
    URL: https://www.e-stat.go.jp/api/api-info/e-stat-manual3-0#api_3_3

    Parameters
    ----------
    api_key : Optional[str]
        e-Stat application ID (appId). When None, the base class looks the
        key up in .env files / environment variables.
    statsDataId : Union[str, int]
        Statistics table ID, as returned by the "getStatsList" API.
    prefix_colname_with_classname: bool, default True
        Whether to prefix column names with class names
    has_lv_hierarchy : bool, default False
        Whether to build a hierarchy DataFrame for classes that have more
        than one level
    use_fillna_lv_hierarchy : bool, default True
        Whether to forward-fill missing levels in the hierarchy DataFrame
    lang : Optional[str], default None
        Language for retrieved data. Either "J" (Japanese) or "E" (English).
        Sent to the API when it is one of these two values. Column keywords
        are converted to Japanese unless ``lang`` is "E".
    explanationGetFlg : Optional[str], default None
        Whether to fetch explanations for tables, statistics, classifications
        and items: "Y" (fetch) or "N" (do not fetch). Sent to the API when it
        is one of these two values.
    retry_count : int, default 3
        Number of times to retry query request
    pause : float, default 0.1
        Time, in seconds, of the pause between retries
    timeout : int, default 30
        Request timeout in seconds
    session : Optional[requests.Session], default None
        requests.sessions.Session instance to be used
    dotenv_path : Optional[str], default None
        Path to .env file for loading environment variables.
        If None, will look for .estat_env, .env_estat, or .env in the current directory.
    """

    def __init__(
        self,
        api_key: Optional[str],
        statsDataId: Union[str, int],
        prefix_colname_with_classname: bool = True,
        has_lv_hierarchy: bool = False,
        use_fillna_lv_hierarchy: bool = True,
        lang: Optional[str] = None,
        explanationGetFlg: Optional[str] = None,
        retry_count: int = 3,
        pause: float = 0.1,
        timeout: int = 30,
        session: Optional[requests.Session] = None,
        dotenv_path: Optional[str] = None,
    ) -> None:
        super().__init__(
            api_key=api_key,
            lang=lang,
            explanationGetFlg=explanationGetFlg,
            retry_count=retry_count,
            pause=pause,
            timeout=timeout,
            session=session,
            dotenv_path=dotenv_path,
        )

        self.statsDataId = statsDataId
        self.prefix_colname_with_classname = prefix_colname_with_classname
        self.has_lv_hierarchy = has_lv_hierarchy
        self.use_fillna_lv_hierarchy = use_fillna_lv_hierarchy

    @property
    def url(self) -> str:
        """API URL for getMetaInfo."""
        return self.get_url("getMetaInfo")

    @property
    def params(self) -> Dict[str, Any]:
        """Query parameters for the getMetaInfo call.

        ``appId`` is always present. ``statsDataId``, ``lang`` and
        ``explanationGetFlg`` are added only when they hold a usable value.
        """
        pdict: Dict[str, Any] = {"appId": self.api_key}

        if isinstance(self.statsDataId, (str, int)):
            pdict["statsDataId"] = self.statsDataId
        # Forward the requested language; without this the API is never told
        # about lang="E" even though column handling depends on it.
        if self.lang in ("J", "E"):
            pdict["lang"] = self.lang
        if self.explanationGetFlg in ("Y", "N"):
            pdict["explanationGetFlg"] = self.explanationGetFlg

        return pdict

    def read(self) -> pd.DataFrame:
        """
        Fetch the meta information and return the class DataFrame with the most rows.

        Class objects whose id is 'time' are ignored. When several frames have
        the same number of rows the first one wins. The session is closed
        afterwards.

        Returns
        -------
        pd.DataFrame
            Largest "meta_dataframe" among the class objects (excluding
            'time'), or an empty DataFrame if there is none
        """
        try:
            max_rows = 0
            largest_df = pd.DataFrame()

            for class_data in self.read_class_obj_dfs():
                # Skip if id is 'time'
                if class_data["id"] == "time":
                    continue

                df = class_data["meta_dataframe"]
                if len(df) > max_rows:
                    max_rows = len(df)
                    largest_df = df

            return largest_df

        finally:
            self.close()

    def read_json(self) -> Dict[str, Any]:
        """Fetch the meta information and return it as raw JSON.

        Response metadata is stored on the instance (see
        ``_store_params_in_attrs``) and the session is closed afterwards.
        """
        try:
            return self._fetch_meta_json()
        finally:
            self.close()

    def read_class_obj_dfs(self) -> List[Dict[str, Any]]:
        """
        Read and process CLASS_OBJ data into DataFrames.
        CLASS_OBJ dictionary's keys: ['@id', '@name', 'CLASS']
        CLASS dictionary's keys: ['@code', '@name', '@level', '@unit']

        Class objects without '@id' or '@name', or whose CLASS data cannot be
        turned into a DataFrame, are skipped. The session is left open.

        Returns
        -------
        List[Dict[str, Any]]
            One dictionary per class object with keys "id", "name" and
            "meta_dataframe". "hierarchy" is added only when
            ``has_lv_hierarchy`` is True, the class has more than one distinct
            level and the hierarchy could be built.
        """
        json_data = self._fetch_meta_json()

        # Get class objects
        meta_info = json_data.get("GET_META_INFO", {})
        class_obj = meta_info.get("METADATA_INF", {}).get("CLASS_INF", {}).get("CLASS_OBJ", [])

        # A single class object may arrive as a bare dict instead of a
        # one-element list; treat it like CLASS, which is handled the same way.
        if isinstance(class_obj, dict):
            class_obj = [class_obj]

        if not isinstance(class_obj, list):
            print("CLASS_OBJはlist型ではありません。")
            return []

        result_dfs = []

        for i, co in enumerate(class_obj):
            # Class ID is mandatory.
            class_id = co.get("@id")
            if not class_id:
                print("警告: クラスID（@id）が見つかりません。処理をスキップします。")
                continue

            # Class name is mandatory as well; it is used for column names.
            class_name = co.get("@name")
            if not class_name:
                print(f"警告: クラス名（@name）が見つかりません。処理をスキップします。クラスID: {class_id}")
                continue

            # Raw frame, before any column renaming.
            class_df_raw = self._create_class_dataframe_raw(co.get("CLASS"), co)
            if class_df_raw is None:
                continue

            # Decide on the raw frame whether a hierarchy is worth building.
            hierarchy_df = None
            if (
                self.has_lv_hierarchy
                and "@level" in class_df_raw.columns
                and len(class_df_raw["@level"].unique()) > 1
            ):
                hierarchy_df = self._create_hierarchy_dataframe(json_data, i)

            result_dict = {
                "id": class_id,
                "name": class_name,
                "meta_dataframe": self._apply_column_transformations(class_df_raw, class_name),
            }
            if hierarchy_df is not None:
                result_dict["hierarchy"] = hierarchy_df

            result_dfs.append(result_dict)

        return result_dfs

    def _fetch_meta_json(self) -> Dict[str, Any]:
        """Request the meta information, store its metadata on the instance and return the JSON."""
        response = self._get_response(self.url, params=self.params)
        json_data = response.json()
        self._store_params_in_attrs(json_data)
        return json_data

    def _store_params_in_attrs(self, json_data: Dict[str, Any]) -> None:
        """Copy RESULT, PARAMETER and TABLE_INF fields of the response onto the instance.

        Every attribute is set even when the field is missing (value None);
        ``TABLE_INF`` holds the whole table section (an empty dict if absent).
        """
        meta_info = json_data.get("GET_META_INFO", {})

        # RESULT section
        result = meta_info.get("RESULT", {})
        self.STATUS = result.get("STATUS")
        self.ERROR_MSG = result.get("ERROR_MSG")
        self.DATE = result.get("DATE")

        # PARAMETER section
        parameter = meta_info.get("PARAMETER", {})
        self.LANG = parameter.get("LANG")
        self.DATA_FORMAT = parameter.get("DATA_FORMAT")

        # METADATA_INF > TABLE_INF section
        table_inf = meta_info.get("METADATA_INF", {}).get("TABLE_INF", {})
        self.TABLE_INF = table_inf

        # Individual table attributes, exposed one by one.
        table_attributes = [
            "STAT_NAME", "GOV_ORG", "STATISTICS_NAME", "TITLE", "CYCLE",
            "SURVEY_DATE", "OPEN_DATE", "SMALL_AREA", "COLLECT_AREA",
            "MAIN_CATEGORY", "SUB_CATEGORY", "OVERALL_TOTAL_NUMBER",
            "UPDATED_DATE", "STATISTICS_NAME_SPEC", "TABULATION_SUB_CATEGORY1",
            "DESCRIPTION", "TITLE_SPEC"
        ]

        for attr in table_attributes:
            setattr(self, attr, table_inf.get(attr))

    @staticmethod
    def _class_data_to_frame(class_data: Any) -> Optional[pd.DataFrame]:
        """Turn CLASS data (list of records or a single record) into a DataFrame; None for other types."""
        if isinstance(class_data, list):
            return pd.DataFrame(class_data)
        if isinstance(class_data, dict):
            return pd.DataFrame([class_data])
        return None

    def _create_class_dataframe_raw(self, class_data: Union[List[Dict[str, Any]], Dict[str, Any]], class_obj: Dict[str, Any]) -> Optional[pd.DataFrame]:
        """
        Create raw DataFrame from class data without column transformations.

        Parameters
        ----------
        class_data : Union[List[Dict[str, Any]], Dict[str, Any]]
            Class data from API response (can be list or dict)
        class_obj : Dict[str, Any]
            Class object metadata (only used for the error message)

        Returns
        -------
        Optional[pd.DataFrame]
            Raw DataFrame with '@level' converted to nullable integers when
            present, or None if the data is empty, of an unexpected type, or
            conversion failed
        """
        if not class_data:
            return None

        try:
            df = self._class_data_to_frame(class_data)
            if df is None:
                print(f"CLASS_INF>CLASS_OBJ>CLASSの型: {type(class_data)}")
                return None

            # Empty strings become missing values before the numeric
            # conversion so the column can use the nullable Int64 dtype.
            if "@level" in df.columns:
                df = df.assign(**{
                    "@level": pd.to_numeric(
                        df["@level"].replace("", pd.NA), errors="coerce"
                    ).astype("Int64")
                })

            return df

        except Exception as e:
            # Best effort: a broken class must not abort the other classes.
            print(f"Error creating raw DataFrame for class {class_obj.get('@id', 'unknown')}: {e}")
            return None

    def _apply_column_transformations(self, df: pd.DataFrame, class_name: str) -> pd.DataFrame:
        """
        Apply column name transformations to DataFrame.

        Leading '@' characters are removed from every column name and, when
        ``prefix_colname_with_classname`` is set, the class name is put in
        front. Unless ``lang`` is "E", keywords are then converted to Japanese
        and a column whose name became empty is named after the class.

        Parameters
        ----------
        df : pd.DataFrame
            Raw DataFrame (not modified)
        class_name : str
            Class name for prefixing

        Returns
        -------
        pd.DataFrame
            New DataFrame with transformed column names
        """
        prefix = class_name if self.prefix_colname_with_classname else ""
        # rename returns a new frame, so the input is left untouched.
        transformed_df = df.rename(columns=lambda col: f"{prefix}{col.lstrip('@')}")

        if self.lang is None or self.lang != "E":
            # Convert column names to Japanese
            transformed_df = colname_to_japanese(transformed_df).rename(columns={"": class_name})

        return transformed_df

    def _create_class_dataframe(self, class_data: Union[List[Dict[str, Any]], Dict[str, Any]], class_obj: Dict[str, Any]) -> Optional[pd.DataFrame]:
        """
        Create DataFrame from class data with full transformations.

        Parameters
        ----------
        class_data : Union[List[Dict[str, Any]], Dict[str, Any]]
            Class data from API response (can be list or dict)
        class_obj : Dict[str, Any]
            Class object metadata; its '@name' is required

        Returns
        -------
        Optional[pd.DataFrame]
            DataFrame created from class data, or None if the class has no
            name or the raw frame could not be built
        """
        # Only '@name' is consulted; there is no fallback to another field.
        class_name = class_obj.get("@name")
        if not class_name:
            print(f"警告: クラス名（@name）が見つかりません。処理をスキップします。クラスID: {class_obj.get('@id', 'unknown')}")
            return None

        raw_df = self._create_class_dataframe_raw(class_data, class_obj)
        if raw_df is None:
            return None

        return self._apply_column_transformations(raw_df, class_name)

    def _create_hierarchy_dataframe(self, metainfo: Dict[str, Any], cat_key: int) -> Optional[pd.DataFrame]:
        """
        Create a hierarchical DataFrame based on metadata information.

        Each row represents a node that is not the parent of any other node.
        For every level there is a ``level<N>`` column holding the code and a
        ``<class name>階層<N>`` column holding "code_name". With
        ``use_fillna_lv_hierarchy`` the empty cells of a row are forward-filled
        from the columns to their left.

        Parameters
        ----------
        metainfo : Dict[str, Any]
            Full getMetaInfo JSON; the class records are expected to carry
            @code, @name, @level and (optionally) @parentCode
        cat_key : int
            Index of the target class object in CLASS_OBJ

        Returns
        -------
        Optional[pd.DataFrame]
            Hierarchical DataFrame, or None if it could not be built (a
            message is printed in that case)
        """
        try:
            # Extract target category metadata
            class_obj_list = metainfo["GET_META_INFO"]["METADATA_INF"]["CLASS_INF"]["CLASS_OBJ"]
            if isinstance(class_obj_list, dict):
                # Single class object delivered without the enclosing list.
                class_obj_list = [class_obj_list]

            if cat_key >= len(class_obj_list):
                print(f"警告: cat_key {cat_key} が範囲外です。")
                return None

            cat_meta = class_obj_list[cat_key]
            meta_name = cat_meta["@name"]

            class_data = cat_meta.get("CLASS")
            if not class_data:
                print("警告: CLASS データが見つかりません。")
                return None

            meta_cls_df = self._class_data_to_frame(class_data)
            if meta_cls_df is None:
                print(f"CLASS データの型が不正です: {type(class_data)}")
                return None

            if "@level" not in meta_cls_df.columns:
                print("警告: @level 列が見つかりません。")
                return None

            # Convert level to nullable int
            meta_cls_df = meta_cls_df.assign(
                **{"@level": pd.to_numeric(meta_cls_df["@level"], errors="coerce").astype("Int64")}
            )

            # Codes that appear as somebody's parent; everything else is a leaf.
            if "@parentCode" in meta_cls_df.columns:
                parent_codes = {
                    code
                    for code in meta_cls_df["@parentCode"]
                    if isinstance(code, str) and code.strip()
                }
            else:
                parent_codes = set()

            # Create code-to-record mapping
            code_to_record = {row["@code"]: row for _, row in meta_cls_df.iterrows()}

            def _get_ancestry_chain(meta_record: Any) -> Dict[Any, str]:
                """Map level -> code for a record and all its ancestors."""
                chain = {}
                seen_codes = set()
                current_record = meta_record

                while current_record is not None:
                    code = current_record["@code"]
                    if code in seen_codes:
                        # Cyclic parent reference: stop instead of looping forever.
                        break
                    seen_codes.add(code)
                    chain[current_record["@level"]] = code

                    parent_code = current_record.get("@parentCode")
                    if not parent_code or parent_code not in code_to_record:
                        break
                    current_record = code_to_record[parent_code]

                return chain

            # int() fails for a missing maximum, which the handler below
            # reports like any other failure.
            max_level = int(meta_cls_df["@level"].max())
            levels = range(1, max_level + 1)
            chain_rows = []

            # Process leaf nodes only
            for _, row in meta_cls_df.iterrows():
                if row["@code"] in parent_codes:
                    continue

                node_level = row["@level"]
                ancestry = _get_ancestry_chain(row)
                row_chain = {}
                last_code = None

                for level in levels:
                    col = f"level{level}"
                    if level > node_level:
                        # Below the node itself: nothing to put here yet.
                        row_chain[col] = None
                    elif level in ancestry:
                        last_code = ancestry[level]
                        row_chain[col] = last_code
                    else:
                        # Level skipped in the ancestry: reuse the nearest
                        # ancestor above it when filling is enabled.
                        row_chain[col] = last_code if self.use_fillna_lv_hierarchy else None

                chain_rows.append(row_chain)

            if not chain_rows:
                print("警告: 階層データが生成されませんでした。")
                return None

            hierarchy_df = pd.DataFrame(chain_rows)

            # Attach a "code_name" column for every level.
            code_name = meta_cls_df["@code"] + "_" + meta_cls_df["@name"]
            for level in levels:
                level_col = f"level{level}"
                name_col = f"{meta_name}階層{level}"
                name_df = pd.DataFrame({level_col: meta_cls_df["@code"], name_col: code_name})
                hierarchy_df = hierarchy_df.merge(name_df, on=level_col, how="left")

            # Forward fill along each row for both code and name columns.
            # DataFrame.ffill replaces the deprecated fillna(method="ffill").
            if self.use_fillna_lv_hierarchy:
                level_cols = [f"level{level}" for level in levels]
                hierarchy_cols = [f"{meta_name}階層{level}" for level in levels]

                hierarchy_df[level_cols] = hierarchy_df[level_cols].ffill(axis=1)
                hierarchy_df[hierarchy_cols] = hierarchy_df[hierarchy_cols].ffill(axis=1)

            return hierarchy_df

        except Exception as e:
            # Best effort: the hierarchy is optional, so report and carry on.
            print(f"Error creating hierarchy DataFrame: {e}")
            return None

In [None]:
def colname_to_japanese(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return ``df`` with the English keywords in its column names made Japanese.

    Every keyword of ``ATTR_DICT`` is replaced, one after another in
    dictionary order, wherever it occurs in a column name. The input frame is
    not modified.

    NOTE(review): this redefines a function of the same name that appears
    earlier in the notebook; the earlier one replaces only the first matching
    keyword.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame

    Returns
    -------
    pd.DataFrame
        DataFrame with Japanese column names
    """
    renamed = {}
    for original in df.columns:
        translated = original
        for keyword, label in ATTR_DICT.items():
            translated = translated.replace(keyword, label)
        renamed[original] = translated

    return df.rename(columns=renamed)

In [None]:
# Load variables from a .env file into the process environment, then read the
# e-Stat application ID. appId is None when ESTAT_APP_ID is not set.
load_dotenv()
appId = os.getenv("ESTAT_APP_ID")

In [None]:
# Statistics table ID passed to the reader in the cells below.
statsDataId = "0002070010"

In [None]:
# Build the reader. has_lv_hierarchy=True makes read_class_obj_dfs() try to add
# a "hierarchy" frame for classes that have more than one level. If appId is
# None, the reader looks the key up itself and raises ValueError if none is found.
metainfo = MetaInfoReader(api_key=appId, statsDataId=statsDataId, has_lv_hierarchy=True)

In [None]:
# Show the endpoint URL and the query parameters.
# NOTE(review): params includes the application ID ("appId"), so this output
# exposes the key — clear it before sharing the notebook.
metainfo.url, metainfo.params

In [None]:
# Call the low-level request helper directly. Unlike read_json(), this neither
# stores response metadata on the reader nor closes the session.
meta_json = metainfo._get_response(metainfo.url, params=metainfo.params).json()

In [None]:
# Display the complete raw response (the output may be large).
meta_json

In [None]:
# Copy the RESULT / PARAMETER / TABLE_INF fields of the response onto the
# reader as attributes (STATUS, LANG, TITLE, ...).
metainfo._store_params_in_attrs(meta_json)

In [None]:
# TITLE is taken from TABLE_INF by _store_params_in_attrs; it only exists after
# that call (or after read_json / read_class_obj_dfs / read).
metainfo.TITLE

In [None]:
# Pull the class objects out of the raw response; falls back to an empty list
# when METADATA_INF, CLASS_INF or CLASS_OBJ is missing.
class_obj = meta_json["GET_META_INFO"].get("METADATA_INF", {}).get("CLASS_INF", {}).get("CLASS_OBJ", [])
class_obj

In [None]:
# Field names of the first CLASS record of the second class object.
# Assumes CLASS is a list here; the reader also handles it being a single dict.
class_obj[1]["CLASS"][0].keys()

In [None]:
# Sends a new request and builds one dict per class object with the keys
# "id", "name", "meta_dataframe" and, where available, "hierarchy".
meta_dfs = metainfo.read_class_obj_dfs()

In [None]:
# Result dict of the first class object that was not skipped.
meta_dfs[0]

In [None]:
# Class table of the second entry, with column names already transformed.
meta_dfs[1]['meta_dataframe']

In [None]:
# Hierarchy table of the second entry. The "hierarchy" key exists only when
# the class has more than one level and the hierarchy could be built;
# otherwise this raises KeyError.
meta_dfs[1]['hierarchy']

In [None]:
# Requests the meta information again, returns the class table with the most
# rows (ignoring the class with id "time") and closes the session.
metainfo.read()