# Data Ingestion form Qiita API

In [0]:
dbutils.widgets.text("catalog", "workspace")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("schema", "qiita")
schema = dbutils.widgets.get("schema")

dbutils.widgets.text("table", "items")
table = dbutils.widgets.get("table")

dbutils.widgets.text("page_from", "1")
page_from = dbutils.widgets.get("page_from")

dbutils.widgets.text("page_to", "1")
page_to = dbutils.widgets.get("page_to")

dbutils.widgets.text("per_page", "20")
per_page = dbutils.widgets.get("per_page")

dbutils.widgets.text("query", "tag:databricks")
query = dbutils.widgets.get("query")
query = query if query else None

print(f"catalog: {catalog}")
print(f"schema: {schema}")
print(f"table: {table}")
print("-----")
print(f"page: {page_from} to {page_to}")
print(f"per_page: {per_page}")
print(f"query: {query}")

## パラメータのチェック
try:
    page_from_int = int(page_from)
    page_to_int = int(page_to)
    per_page_int = int(per_page)

    if int(page_from) > int(page_to):
        raise Exception(f"page_from must be less than or equal to page_to: page_from={page_from}, page_to={page_to}")

except (TypeError, ValueError):
    raise Exception(
        f"page_from, page_to, per_page must be integers: page_from={page_from}, page_to={page_to}, per_page={per_page}"
    )


## コネクションの作成

In [0]:
%sql

-- Qiita API用外部接続の作成
CREATE CONNECTION IF NOT EXISTS qiita_api
TYPE HTTP
OPTIONS (
  host 'https://qiita.com',
  port '443',
  base_path '/api/v2/',
  bearer_token secret('qiita', 'api_key')
)
COMMENT 'QiitaのAPIにアクセスするための接続です'
;

## Custom Datasourceの作成

In [0]:
import json
from urllib.parse import quote
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    BooleanType,
    IntegerType,
    TimestampType,
    ArrayType,
)

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ExternalFunctionRequestHttpMethod

_items_schema = StructType(
    [
        StructField("rendered_body", StringType(), True),  # HTML 形式の本文
        StructField("body", StringType(), True),  # Markdown 形式の本文
        StructField("coediting", BooleanType(), True),  # 共同更新か否か (Qiita Team)
        StructField("comments_count", IntegerType(), True),  # コメント数
        StructField("created_at", StringType(), True),  # 作成日時 (ISO8601)
        StructField("group", StringType(), True),  # Qiita Team のグループ情報
        StructField("id", StringType(), True),  # 記事の一意な ID
        StructField("likes_count", IntegerType(), True),  # いいね数 (Qiita)
        StructField("private", BooleanType(), True),  # 限定共有か否か (Qiita Team 無効)
        StructField(
            "reactions_count", IntegerType(), True
        ),  # 絵文字リアクション数 (Qiita Team)
        StructField("stocks_count", IntegerType(), True),  # ストック数
        # StructField("tags", ArrayType(StringType()), True),  # タグ一覧
        StructField("tags", StringType(), True),  # タグ一覧
        StructField("title", StringType(), True),  # 記事タイトル
        StructField("updated_at", StringType(), True),  # 最終更新日時
        StructField("url", StringType(), True),  # 記事 URL
        StructField("user", StringType(), True),  # 記事作者情報
        StructField("page_views_count", IntegerType(), True),  # 閲覧数 (null 許容)
        StructField(
            "team_membership", StringType(), True
        ),  # Team メンバー情報（構造が不明なため文字列で仮置き）
        StructField(
            "organization_url_name", StringType(), True
        ),  # Organization の url_name (null 許容)
        StructField("slide", BooleanType(), True),  # スライドモードか否か
    ]
)


class QiitaItemsDataSource(DataSource):
    """
    QiitaItemsDataSource

    Qiita APIから記事データを取得し、Spark DataFrameとして読み込むためのDataSourceクラス。

    Attributes:
        None

    Methods:
        name():
            データソース名 "qiita_items" を返却します。
        schema():
            記事データのスキーマを返却します。
        reader(schema):
            DataSourceReaderを返却します。

    Example:
        spark.dataSource.register(QiitaItemsDataSource)

        spark.read.format("qiita_items").option(...).load()
    """

    @classmethod
    def name(self):
        return "qiita_items"

    def schema(self):
        return _items_schema

    def reader(self, schema):
        return QiitaItemsDataSourceReader(schema, self.options)


class QiitaItemsDataSourceReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def read(self, partition):

        # 各種オプションの取得
        conn_name = self.options.get("conn_name", "qiita_api")
        host = self.options.get("host")
        token = self.options.get("token")
        client_id = self.options.get("client_id")
        client_secret = self.options.get("client_secret")
        query = self.options.get("query")

        page = self.options.get("page", "1")
        per_page = self.options.get("per_page", "20")

        try:
            page = int(page)
            per_page = int(per_page)
        except (TypeError, ValueError):
            raise Exception(
                f"Page and per_page options must be convertible to integer: page={page}, per_page={per_page}"
            )

        if (page < 1 and 100 < page) or (per_page < 1 and 100 < per_page):
            raise Exception(
                f"Page and per_page options must be between 1 and 100: page={page}, per_page={per_page}"
            )

        # queryが指定されていない場合、authenticated_userの記事一覧を取得する
        if query:
            raw_path = f"items?page={page}&per_page={per_page}&query={query}"
        else:
            raw_path = f"authenticated_user/items?page={page}&per_page={per_page}"

        path = quote(raw_path, safe="/?=&")

        ws = WorkspaceClient(
            host=host, token=token, client_id=client_id, client_secret=client_secret
        )
        response = ws.serving_endpoints.http_request(
            conn=conn_name,
            method=ExternalFunctionRequestHttpMethod.GET,
            path=path,
        )

        response.raise_for_status()
        items = response.json()

        for item in items:
            schema_fields = [field.name for field in self.schema.fields]
            print(schema_fields)
            filtered_item = {key: item.get(key, None) for key in schema_fields}

            # 辞書型はJSON形式に変換する
            for key, value in filtered_item.items():
                if isinstance(value, (dict, list)):
                    filtered_item[key] = json.dumps(value, ensure_ascii=False)
            yield Row(**filtered_item)

spark.dataSource.register(QiitaItemsDataSource)

## データの取得・保管

In [0]:
import pyspark.sql.functions as F
from functools import reduce

token = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
)
host = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()

# 指定範囲の記事を取得する
all_df = []
for page in range(int(page_from), int(page_to)+1):
    options = {
        "host": host,
        "token": token,
        "page": page,
        "per_page": per_page,
        "query": query,
    }

    df = spark.read.format("qiita_items").options(**options).load()

    # 結果が得られない場合はそこで終了
    if df.count() == 0:
        break

    all_df.append(df)

df = reduce(lambda a, b: a.unionByName(b), all_df)
df = df.withColumn("imported_at", F.current_timestamp())

# display(df)


In [0]:

# Appendモードでデータを書き込む

spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog}`")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{schema}`")

df.write.mode("append").option("mergeSchema", "true").saveAsTable(
    f"`{catalog}`.`{schema}`.`{table}`"
)


In [0]:
# # テスト用のテーブル削除処理

# sql_drop = f"""
# DROP TABLE IF EXISTS `{catalog}`.`{schema}`.`{table}`
# """

# spark.sql(sql_drop)