<a href="https://colab.research.google.com/github/karasu1982/e-Stat_Download/blob/main/e_Stat%E3%83%80%E3%82%A6%E3%83%B3%E3%83%AD%E3%83%BC%E3%83%89.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 環境設定

In [None]:
!pip install pykakasi

In [2]:
# Standard library
import csv
import functools
import json
import os
import urllib
import urllib.parse
import zipfile
from concurrent.futures import ThreadPoolExecutor

# Third-party
import numpy as np
import pandas as pd
import requests
import xlrd
from tqdm import tqdm

# 関数定義

In [3]:
class EstatRestAPI_URLParser:
    """
    Build request URLs for the e-Stat REST API (ver.3.0).

    Each builder method url-encodes ``params_dict`` into a query string and
    returns the endpoint URL for the requested response format. No HTTP
    request is made by this class.
    See more details at https://www.e-stat.go.jp/api/api-info/e-stat-manual3-0
    """

    def __init__(self, api_version=None, app_id=None):
        """
        Args:
            api_version: version segment of the URL path ("3.0" when None).
            app_id: application ID stored on the instance. The URL builders
                do not insert it themselves; callers put ``appId`` into
                ``params_dict``.
        """
        # base url
        self.base_url = "https://api.e-stat.go.jp/rest"

        # e-Stat REST API Version
        self.api_version = "3.0" if api_version is None else api_version

        # Application ID
        self.app_id = "<アプリケーションID>" if app_id is None else app_id

    def _build_url(self, endpoint, params_dict, format, supported):
        """
        Assemble the URL for ``endpoint`` in the given response ``format``.

        Args:
            endpoint: API function name, e.g. "getStatsList".
            params_dict: query parameters, passed to ``urllib.parse.urlencode``.
            format: one of the names in ``supported``.
            supported: tuple of formats this endpoint accepts.

        Raises:
            ValueError: if ``format`` is not in ``supported``.
        """
        if format not in supported:
            raise ValueError(
                f"unsupported format {format!r} for {endpoint}; "
                f"expected one of {supported}"
            )
        if format == "xml":
            path = f"/app/{endpoint}"
        elif format == "csv":
            # CSV endpoints are named getSimple<Name> instead of get<Name>.
            path = f"/app/getSimple{endpoint[len('get'):]}"
        else:
            # "json" / "jsonp" are inserted as an extra path segment.
            path = f"/app/{format}/{endpoint}"
        params_str = urllib.parse.urlencode(params_dict)
        return f"{self.base_url}/{self.api_version}{path}?{params_str}"

    def getStatsListURL(self, params_dict, format="csv"):
        """
        2.1 Get statistical table information (HTTP GET).

        Supported formats: "xml", "json", "jsonp", "csv".
        """
        return self._build_url(
            "getStatsList", params_dict, format,
            ("xml", "json", "jsonp", "csv"),
        )

    def getMetaInfoURL(self, params_dict, format="csv"):
        """
        2.2 Get meta information (HTTP GET).

        Supported formats: "xml", "json", "jsonp", "csv".
        """
        return self._build_url(
            "getMetaInfo", params_dict, format,
            ("xml", "json", "jsonp", "csv"),
        )

    def getStatsDataURL(self, params_dict, format="csv"):
        """
        2.3 Get statistical data (HTTP GET).

        Supported formats: "xml", "json", "jsonp", "csv".
        """
        return self._build_url(
            "getStatsData", params_dict, format,
            ("xml", "json", "jsonp", "csv"),
        )

    def postDatasetURL(self):
        """
        2.4 Register a dataset (HTTP POST).

        Returns the endpoint URL only; it takes no query parameters.
        """
        return f"{self.base_url}/{self.api_version}/app/postDataset"

    def refDataset(self, params_dict, format="xml"):
        """
        2.5 Reference a dataset (HTTP GET).

        Supported formats: "xml", "json", "jsonp".
        """
        return self._build_url(
            "refDataset", params_dict, format,
            ("xml", "json", "jsonp"),
        )

    def getDataCatalogURL(self, params_dict, format="xml"):
        """
        2.6 Get data catalog information (HTTP GET).

        Supported formats: "xml", "json", "jsonp".
        """
        return self._build_url(
            "getDataCatalog", params_dict, format,
            ("xml", "json", "jsonp"),
        )

    def getStatsDatasURL(self, params_dict, format="xml"):
        """
        2.7 Get statistical data in bulk (HTTP GET).

        Supported formats: "xml", "json", "csv".
        """
        return self._build_url(
            "getStatsDatas", params_dict, format,
            ("xml", "json", "csv"),
        )

In [4]:
def get_json(url, timeout=60):
    """
    Send an HTTP GET request to the given url (for REST API)
    and return the decoded JSON response.

    Args:
    ====
    url: string
        valid url for REST API
    timeout: float or tuple
        passed to ``requests.get`` so a stalled connection cannot block
        forever

    Returns:
    ====
    The object produced by ``Response.json()``, or None when the request
    raises ``requests.exceptions.RequestException`` (the error is printed).
    """
    print("HTTP GET", url)
    try:
        r = requests.get(url, timeout=timeout)
        return r.json()
    except requests.exceptions.RequestException as error:
        print(error)
        # Make the failure value explicit; callers must check for None.
        return None

In [5]:
def download_csv(url, filepath, enc="utf-8", dec="utf-8", logging=False,
                 timeout=60):
    """
    Send an HTTP GET request to the given url (for REST API)
    and save its response body as a csv file.

    url: string
        valid url for REST API
    filepath: string
        valid path to the destination file
    enc: string
        encoding used to WRITE the destination file
            enc = 'utf-8' for general env
            enc = 'sjis'  for Excel on Win
            enc = 'cp932' for Excel with extended JP str on Win
    dec: string
        codec used to DECODE the bytes of the HTTP response body
    logging: True/False
        flag whether to print the requested url
    timeout: float or tuple
        passed to ``requests.get`` so a stalled connection cannot block
        forever

    A ``requests`` error is printed and no file is written. Decode and
    file-system errors propagate to the caller.
    """
    try:
        if logging:
            print("HTTP GET", url)
        r = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as error:
        print(error)
        return

    # Decode before opening the file so a decode failure does not leave an
    # empty, truncated file behind.
    text = r.content.decode(dec)
    # newline="" writes the text unchanged; otherwise Windows text mode
    # would expand "\n" and double up any CRLF already in the response.
    with open(filepath, 'w', encoding=enc, newline="") as f:
        f.write(text)


def download_all_csv(
        urls,
        filepathes,
        max_workers=10,
        enc="utf-8",
        dec="utf-8",
        logging=False):
    """
    Send HTTP GET requests to the given urls (for REST API)
    and save each response as a csv file.
    (!! This function downloads with a thread pool in order to improve the
    processing speed.)

    urls: list of strings
        valid urls for REST API
    filepathes: list of strings
        destination file paths; ``urls[i]`` is saved to ``filepathes[i]``.
        Pairs are formed positionally, so both lists should have the same
        length (extra items in the longer one are ignored).
    max_workers: int
        max number of worker threads in the pool
    enc: string
        forwarded to ``download_csv`` as ``enc``
    dec: string
        forwarded to ``download_csv`` as ``dec``
    logging: True/False
        forwarded to ``download_csv``; when True each requested url is
        printed
    """
    func = functools.partial(download_csv, enc=enc, dec=dec, logging=logging)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list() drains the lazy map so every download finishes, the progress
        # bar advances, and any worker exception is re-raised here.
        list(tqdm(executor.map(func, urls, filepathes), total=len(urls)))

In [6]:
# Module-level URL builder shared by search_tables() and parse_table_urls().
estatapi_url_parser = EstatRestAPI_URLParser()  # URL Parser

def search_tables(appId, statsCode):
    """
    Search e-Stat for the tables published under a government statistics
    code and return their ``TABLE_INF`` entries.

    The request is fixed to Japanese output (lang="J"), statistical
    information (searchKind=1), municipality-level aggregation
    (collectArea=3) and no explanation text (explanationGetFlg="N").
    For the full parameter list, see
    https://www.e-stat.go.jp/api/api-info/e-stat-manual3-0#api_3_2

        - appId: application ID (*required)
        - lang: language (J: Japanese, E: English)
        - surveyYears: survey period (YYYY or YYYYMM or YYYYMM-YYYYMM)
        - openYears: same format as surveyYears
        - statsField: statistics field (2 digits: major, 4 digits: minor)
        - statsCode: government statistics code (8 digits)
        - searchWord: search keyword
        - searchKind: data kind (1: statistics, 2: small area / mesh)
        - collectArea: aggregation area (1: nation, 2: prefecture,
          3: municipality)
        - explanationGetFlg: include explanation (Y or N)
        - ...

    Args:
        appId: application ID sent as the ``appId`` query parameter.
        statsCode: statistics code sent as the ``statsCode`` query parameter.

    Returns:
        list of table dicts; empty when nothing matched or the HTTP request
        failed.
    """
    params_dict = {
        "appId": appId,
        "lang": "J",
        "statsCode": statsCode,
        "searchKind": 1,
        "collectArea": 3,
        "explanationGetFlg": "N"
    }

    url = estatapi_url_parser.getStatsListURL(params_dict, format="json")
    json_dict = get_json(url)

    if json_dict is None:
        # get_json() returns None (after printing the error) when the
        # request itself failed; there is nothing to parse.
        return []

    datalist_inf = json_dict['GET_STATS_LIST']['DATALIST_INF']
    if datalist_inf['NUMBER'] == 0:
        return []

    tables = datalist_inf["TABLE_INF"]
    # NOTE(review): the API appears to return a bare object instead of a
    # one-element list when exactly one table matches - confirm against the
    # spec. Normalising keeps the return type a list either way.
    if isinstance(tables, dict):
        tables = [tables]
    return tables


def parse_table_id(table):
    """Return the value stored under the ``"@id"`` key of *table*."""
    table_id = table["@id"]
    return table_id


def parse_table_raw_size(table):
    """Return the value stored under ``"OVERALL_TOTAL_NUMBER"`` in *table*."""
    total_rows = table["OVERALL_TOTAL_NUMBER"]
    return total_rows


def parse_table_urls(table_id, table_raw_size, csv_raw_size=100000,
                     app_id=None):
    """
    Build the CSV download URLs needed to fetch one table page by page.

    Args:
        table_id: statistics table ID (sent as ``statsDataId``).
        table_raw_size: total number of rows in the table.
        csv_raw_size: number of rows requested per URL (``limit``).
        app_id: application ID to send. When None, the module-level
            ``appId`` variable is used, which must then be defined before
            this function is called.

    Returns:
        list of URL strings, one per page, in ascending row order.
    """
    if app_id is None:
        app_id = appId  # fall back to the notebook-level setting

    # NOTE: when table_raw_size is an exact multiple of csv_raw_size this
    # yields one extra URL starting past the last row. The count is left
    # unchanged because the download cell derives file names with the same
    # formula and pairs them with these URLs positionally.
    page_count = int(table_raw_size / csv_raw_size) + 1

    urls = []
    for page in range(page_count):
        params_dict = {
            "appId": app_id,  # application ID
            "lang": "J",  # language (J: Japanese, E: English)
            "statsDataId": str(table_id),  # statistics table ID
            "startPosition": page * csv_raw_size + 1,  # first row (1-based)
            "limit": csv_raw_size,  # number of rows to fetch
            "explanationGetFlg": "N",  # include explanation (Y or N)
            "annotationGetFlg": "N",  # include annotations (Y or N)
            "metaGetFlg": "N",  # include meta information (Y or N)
            "sectionHeaderFlg": "2",  # CSV header flag (1: output, 2: omit)
        }
        urls.append(
            estatapi_url_parser.getStatsDataURL(params_dict, format="csv")
        )
    return urls

# 処理実行

In [7]:
# Placeholder: replace with your own e-Stat application ID before running.
appId = "＜アプリケーションID＞"
statsCode = "00200521" # statistics code passed to search_tables()

In [None]:
CSV_RAW_SIZE = 100000  # rows requested per CSV chunk

# list of tables
tables = search_tables(appId, statsCode)
if isinstance(tables, dict):
    # Defensive: a lone table delivered as a bare dict would otherwise be
    # iterated key by key below.
    tables = [tables]
if not tables:
    print("No tables were found.")

# Table IDs and row counts, in the same order as `tables`.
# Both are empty lists when nothing was found, so the loop below is a no-op
# instead of failing on an undefined name.
table_ids = [parse_table_id(table) for table in tables]
table_raw_size = [parse_table_raw_size(table) for table in tables]

table_urls = []      # flat list of every chunk URL
filepathes = []      # per table: list of chunk file paths
table_names = []     # per table: human-readable TABLE_NAME entry
table_numbers = []   # per table: the table's "@id"

for table, table_id, raw_size in zip(tables, table_ids, table_raw_size):
    chunk_urls = parse_table_urls(table_id, raw_size, CSV_RAW_SIZE)
    table_urls.extend(chunk_urls)

    # Directory and file names repeat the table ID twice; this layout is kept
    # so previously downloaded files stay where they were.
    table_dir = f"./downloads/tmp/{table_id}_{table_id}"
    os.makedirs(table_dir, exist_ok=True)

    # One file per URL, so URLs and paths always pair up one-to-one.
    fps = [
        f"{table_dir}/{table_id}_{table_id}_{j}.csv"
        for j in range(len(chunk_urls))
    ]

    table_numbers.append(table_id)
    table_names.append(table["TITLE_SPEC"]["TABLE_NAME"])
    filepathes.append(fps)

if table_urls:
    download_all_csv(
        table_urls,
        [fp for fps in filepathes for fp in fps],
        max_workers=30,
    )

In [None]:
import re

import pykakasi

# Romaji converter (legacy pykakasi setMode/getConverter interface).
kakasi = pykakasi.kakasi()
kakasi.setMode('H', 'a')  # hiragana -> romaji
kakasi.setMode('K', 'a')  # katakana -> romaji
kakasi.setMode('J', 'a')  # kanji -> romaji
conversion = kakasi.getConverter()

# Raw string: "\d" in a plain literal is an invalid escape sequence.
regex = re.compile(r"\d")

# Half/full-width spaces become "_"; the listed punctuation is dropped.
_COLUMN_CHAR_MAP = str.maketrans(
    {" ": "_", "　": "_", **dict.fromkeys("(),-'.（），－～・")}
)

# Index of the first table to process.
# NOTE(review): 78 is kept from the original cell and skips the first 78
# tables (looks like a resume point) - set to 0 to process every table.
START_INDEX = 78


def _to_column_identifier(col):
    """Romanize one column name and strip characters unusable in BigQuery."""
    name = conversion.do(col).translate(_COLUMN_CHAR_MAP)
    # Names starting with a digit are rejected, so prefix them with "N_".
    # match() anchors at the first character and is safe on short names.
    if regex.match(name):
        name = "N_" + name
    return name


def _dedupe_columns(names):
    """Make names unique by appending _2, _3, ... to repeated entries."""
    used = set()
    counters = {}
    unique = []
    for name in names:
        candidate = name
        while candidate in used:
            counters[name] = counters.get(name, 1) + 1
            candidate = f"{name}_{counters[name]}"
        used.add(candidate)
        unique.append(candidate)
    return unique


name_frames = []

for table_name, fps, tb_num in zip(
        table_names[START_INDEX:],
        filepathes[START_INDEX:],
        table_numbers[START_INDEX:]):

    # Load every chunk of this table, rename its columns, then merge.
    frames = []
    for filepath in fps:
        df = pd.read_csv(filepath)

        # Japanese column names cannot be written to BigQuery, so romanize.
        col_ja = list(df.columns)
        col_en = _dedupe_columns([_to_column_identifier(c) for c in col_ja])

        df.columns = col_en
        df["table_name"] = table_name
        frames.append(df)

    # DataFrame.append was removed in pandas 2.0; concat is the replacement.
    df_merged = pd.concat(frames)

    # NOTE(review): DATASET and PROJECT are not defined in this notebook
    # section; they must be set before this cell runs.
    df_merged.to_gbq(f"{DATASET}.table_{tb_num}", f"{PROJECT}", if_exists="replace")

    # Japanese -> romanized column mapping (taken from the table's last chunk).
    df_name = pd.DataFrame(list(zip(col_ja, col_en)), columns=["col_ja", "col_en"])
    df_name["table_name"] = table_name
    df_name["table_id"] = tb_num
    name_frames.append(df_name)

# pd.concat raises on an empty list, so fall back to an empty frame.
df_names = pd.concat(name_frames) if name_frames else pd.DataFrame()