国立国会図書館デジタルコレクションのデータを取得するプログラムです。  
https://dl.ndl.go.jp

このプログラムでは以下のURLよりダウンロードしたエクセルデータを利用します。  
https://www.ndl.go.jp/jp/dlib/standards/opendataset/index.html

今回利用したエクセルファイルは全部で4つです。
- 図書 2ファイル
- 古典籍 1ファイル
- 雑誌 1ファイル

動作環境
- Google Colab Pro
- ハイメモリ
- GPU 不要

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
# Report the total RAM of the runtime and whether it counts as high-RAM.
from psutil import virtual_memory

ram_gb = virtual_memory().total / 1e9  # bytes -> decimal gigabytes
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# 20 GB is the threshold this notebook uses to tell the runtimes apart.
print(
  'You are using a high-RAM runtime!'
  if ram_gb >= 20
  else 'Not using a high-RAM runtime'
)

In [None]:
# Install the Hugging Face libraries imported by the next cell
# (`!` is an IPython shell escape, so this line only works inside a notebook).
!pip install datasets huggingface_hub

In [None]:
import os
import gc
import requests
import zipfile
import pandas as pd
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from datasets import Dataset
from huggingface_hub import HfApi, HfFolder

In [None]:
# Put the Hugging Face access token into the environment.
# Replace the placeholder with a real token before running this cell.
os.environ['HF_TOKEN'] = 'YOUR API KEY'

# Store the token via HfFolder so later hub calls can pick it up.
HfFolder.save_token(os.environ['HF_TOKEN'])

In [None]:
class ExcelDataProcessor:
    """Attach downloaded full text to the rows of an NDL open-dataset workbook.

    The workbook is read once and cached on the instance. Only rows whose
    ``権利区分`` (rights category) column equals ``保護期間満了`` (protection
    period expired) are kept, and a ``PID`` column is derived from the
    trailing digits of the ``永続的識別子`` (persistent identifier) column.
    """

    # Seconds to wait on the full-text API, so that a stalled connection
    # cannot block a worker thread forever.
    REQUEST_TIMEOUT = 60

    # Number of concurrent download threads used by process_data().
    MAX_WORKERS = 10

    # NOTE(review): the NDL text files are assumed to be UTF-8 - confirm.
    # Being explicit keeps reads/writes independent of the platform default.
    TEXT_ENCODING = "utf-8"

    def __init__(self, file_path):
        """Remember the workbook path; the file itself is loaded lazily."""
        self.file_path = file_path
        self.data = None

    def load_data(self):
        """Load and filter the workbook on first use, then return the cache.

        Returns:
            The filtered DataFrame (index reset) including the ``PID`` column.
        """
        if self.data is None:
            print("Loading Excel file...")
            self.data = pd.read_excel(self.file_path)
            # The PID is the run of digits after the last "/" of the identifier.
            self.data['PID'] = self.data['永続的識別子'].str.extract(r'/(\d+)$')
            self.data = self.data[self.data['権利区分'] == '保護期間満了'].reset_index(drop=True)
            print("Excel file loaded and filtered.")
        return self.data

    def get_data_length(self):
        """Return the number of rows that survive the filter."""
        if self.data is None:
            self.load_data()
        return len(self.data)

    def process_data(self, _destination_folder, _download_folder, start_index=0, end_index=None):
        """Download the text for rows ``start_index:end_index`` and add it as a column.

        Args:
            _destination_folder: Folder where the per-PID ``.txt`` files are kept.
            _download_folder: Scratch folder for ZIP files and their extraction.
            start_index: First row (inclusive) of the slice to process.
            end_index: Row after the last one to process; ``None`` or a value
                past the end means "up to the last row".

        Returns:
            A copy of the selected rows with a ``本文`` (body text) column
            inserted at position 2. Rows whose text could not be obtained
            hold ``None``.
        """
        filtered_data = self.load_data()

        if end_index is None or end_index > len(filtered_data):
            end_index = len(filtered_data)
        filtered_data = filtered_data.iloc[start_index:end_index].copy()

        # Without these folders every single download would fail on write.
        os.makedirs(_destination_folder, exist_ok=True)
        os.makedirs(_download_folder, exist_ok=True)

        pids = filtered_data['PID'].tolist()
        # Pre-sized so each result can be stored at the position of its row.
        text_contents = [None] * len(pids)

        print("Starting download and reading text files...")
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            future_to_row = {
                executor.submit(self.download_and_extract, pid, _destination_folder, _download_folder): row
                for row, pid in enumerate(pids)
            }

            # as_completed() yields futures in completion order, not in
            # submission order, so the row position must be looked up;
            # appending the results would pair texts with the wrong rows.
            for future in tqdm(as_completed(future_to_row), total=len(future_to_row), desc="Processing PIDs"):
                try:
                    text_contents[future_to_row[future]] = future.result()
                except Exception as e:
                    print(f"Error occurred: {e}")

        print("Inserting contents...")

        filtered_data.insert(2, "本文", text_contents)
        print(filtered_data.head())
        return filtered_data

    @staticmethod
    def _read_text(path):
        """Return the whole content of the text file at ``path``."""
        with open(path, 'r', encoding=ExcelDataProcessor.TEXT_ENCODING) as file:
            return file.read()

    @staticmethod
    def _remove_work_files(zip_file_path, extract_folder):
        """Delete the scratch ZIP file and extraction folder if they exist."""
        try:
            os.remove(zip_file_path)
        except FileNotFoundError:
            pass
        shutil.rmtree(extract_folder, ignore_errors=True)

    @staticmethod
    def _fetch_text_file(pid, destination_file, zip_file_path, extract_folder):
        """Download the ZIP for ``pid`` and produce ``destination_file`` from it."""
        url = f"https://lab.ndl.go.jp/dl/api/book/fulltext/{pid}"
        response = requests.get(url, timeout=ExcelDataProcessor.REQUEST_TIMEOUT)
        response.raise_for_status()  # turn HTTP error statuses into exceptions

        with open(zip_file_path, "wb") as file:
            file.write(response.content)

        os.makedirs(extract_folder, exist_ok=True)
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall(extract_folder)

        source_file = os.path.join(extract_folder, f"{pid}.txt")
        if os.path.exists(source_file):
            # The archive already contains a single "<pid>.txt".
            shutil.move(source_file, destination_file)
            return

        # Otherwise concatenate every .txt file of the archive in name order,
        # each followed by a newline.
        text_files = sorted(fname for fname in os.listdir(extract_folder) if fname.endswith('.txt'))
        combined_content = "".join(
            ExcelDataProcessor._read_text(os.path.join(extract_folder, text_file)) + "\n"
            for text_file in text_files
        )
        with open(destination_file, 'w', encoding=ExcelDataProcessor.TEXT_ENCODING) as file:
            file.write(combined_content)

    @staticmethod
    def download_and_extract(pid, destination_folder, _download_folder):
        """Return the full text for ``pid``, downloading it when not yet stored.

        If ``<destination_folder>/<pid>.txt`` already exists it is simply read.
        Otherwise the ZIP is fetched from the NDL Lab full-text API, turned
        into that text file, and the scratch files are removed again - also
        when the download or extraction fails.

        Args:
            pid: Identifier of the item; a missing value (``None``/NaN) is skipped.
            destination_folder: Folder holding the per-PID ``.txt`` files.
            _download_folder: Scratch folder for the ZIP and its extraction.

        Returns:
            The text content, or ``None`` when the PID is missing or any step
            fails (the failure is printed, never raised).
        """
        if pid is None or pd.isna(pid):
            # Identifiers that did not match the PID pattern come through as
            # NaN; requesting ".../nan" would only produce an HTTP error.
            print("Skipping a row without a PID")
            return None

        pid = str(pid)
        destination_file = os.path.join(destination_folder, f"{pid}.txt")
        zip_file_path = os.path.join(_download_folder, f"{pid}.zip")
        extract_folder = os.path.join(_download_folder, pid)

        try:
            # Reuse the text file when an earlier run already produced it.
            if os.path.exists(destination_file):
                return ExcelDataProcessor._read_text(destination_file)

            try:
                ExcelDataProcessor._fetch_text_file(pid, destination_file, zip_file_path, extract_folder)
            finally:
                # Clean up on failure too, so broken downloads do not pile up.
                ExcelDataProcessor._remove_work_files(zip_file_path, extract_folder)

            return ExcelDataProcessor._read_text(destination_file)

        except zipfile.BadZipFile:
            print(f"Failed to process PID {pid}: Bad ZIP file")
            return None
        except OSError as e:
            print(f"OS error for PID {pid}: {e}")
            return None
        except Exception as e:
            print(f"Failed to process PID {pid}: {e}")
            return None

In [None]:
# Folder (under the mounted Drive path) that holds the Excel files and all
# outputs, and the local scratch folder used for downloads.
working_folder = "/content/drive/MyDrive/Colab Notebooks/GENIAC/ndlj"
download_folder = "/content"

# Several Excel files can be passed as a list, but processing takes a long
# time, so running one file at a time is recommended.
# For tosho_2, batch_size = 8000 was used.

file_names = ["tosho_1"]
batch_size = 10000

# Create the CSV output folder up front: otherwise to_csv() would raise only
# after a whole batch had already been downloaded.
os.makedirs(f"{working_folder}/csv_files", exist_ok=True)
os.makedirs(download_folder, exist_ok=True)

for file_name in file_names:
    excel_file_path = working_folder + "/" + file_name + ".xlsx"
    hugging_face_repogitory_name = "ndlj_" + file_name

    # Folder that receives the per-PID text files of this Excel file.
    destination_folder = working_folder + "/text_files/" + file_name
    os.makedirs(destination_folder, exist_ok=True)

    processor = ExcelDataProcessor(excel_file_path)

    # Number of rows left after filtering.
    data_length = processor.get_data_length()
    print(f"Total number of rows in the processed data: {data_length}")

    # Total number of batches (ceiling division).
    total_batches = (data_length + batch_size - 1) // batch_size

    for batch_no in range(1, total_batches + 1):
        start_number = batch_size * (batch_no - 1)
        # Keep the last batch from running past the end of the data.
        end_number = min(batch_size * batch_no, data_length)

        # Directory name shows the batch number and its inclusive row range.
        padded_batch_no = str(batch_no).zfill(2)
        padded_start = str(start_number).zfill(6)
        padded_end = str(end_number - 1).zfill(6)
        hf_directory_name = f"batch_no{padded_batch_no}_{padded_start}_to_{padded_end}"

        # Download the texts and attach them to the rows of this batch.
        df = processor.process_data(destination_folder, download_folder, start_index=start_number, end_index=end_number)

        # Save the batch as a CSV file.
        csv_file_path = f"{working_folder}/csv_files/{file_name}_{hf_directory_name}.csv"
        df.to_csv(csv_file_path, index=False)

        # Convert the pandas DataFrame into a Hugging Face dataset.
        dataset = Dataset.from_pandas(df)

        # Release the DataFrame's memory.
        del df
        gc.collect()

        # Upload the dataset.
        dataset.push_to_hub(
            hugging_face_repogitory_name,
            private=True,
            data_dir=hf_directory_name,
            # create_pr=True,
            # commit_description=f"upload data, {hf_directory_name}",
        )
        print(f"Uploaded batch {batch_no} for file {file_name}")

        # Release the dataset's memory.
        del dataset
        gc.collect()
