# 第 10 章: ストリーミング処理とスキーマ・パーティション進化


In [None]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
# Iceberg REST catalog name/URI and the S3-compatible object store endpoint
# (the "minio" host name suggests MinIO; confirm against the deployment).
CATALOG = "my_catalog"
CATALOG_URL = "http://server:8181/"
S3_ENDPOINT = "http://minio:9000"

# Versions used to assemble the runtime package coordinates for the SparkSession.
SPARK_VERSION = pyspark.__version__
# Keep only "major.minor" (e.g. "3.5.1" -> "3.5"): the Iceberg runtime artifact name
# embeds the Spark minor version.
SPARK_MINOR_VERSION = ".".join(SPARK_VERSION.split(".", 2)[:2])
ICEBERG_VERSION = "1.8.1"

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, LongType
import json
import time
from datetime import datetime

In [None]:
# Create (or reuse) the SparkSession wired to the Iceberg REST catalog and to Kafka.
spark = (
    SparkSession.builder
        # Runtime jars: Iceberg's Spark runtime (named per Spark minor version, Scala 2.12),
        # the Iceberg AWS bundle (S3 file IO), and the Kafka source for Structured Streaming.
        .config(
            "spark.jars.packages",
            ",".join([
                f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION}",
                f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
                f"org.apache.spark:spark-sql-kafka-0-10_2.12:{SPARK_VERSION}",
            ]),
        )
        .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog")
        .config(f"spark.sql.catalog.{CATALOG}.type", "rest")
        .config(f"spark.sql.catalog.{CATALOG}.uri", CATALOG_URL)
        .config(f"spark.sql.catalog.{CATALOG}.s3.endpoint", S3_ENDPOINT)
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # Reuse the CATALOG constant (instead of repeating its literal) so unqualified
        # names such as db.web_access_logs resolve against the catalog configured above.
        .config("spark.sql.defaultCatalog", CATALOG)
        .getOrCreate()
)

In [None]:
%sql spark

## 準備

* アクセスログを保存するための Iceberg テーブル `web_access_logs` を作成
* Kafka トピックの作成

### (Optional) データベースの作成
データベースをまだ作成していない場合は、以下のセルを実行してください。既にデータベースが存在する場合は、このステップをスキップして構いません（`IF NOT EXISTS` を指定しているため、実行してもエラーにはなりません）。

In [None]:
%%sql
-- Create the `db` namespace that holds web_access_logs; a no-op if it already exists.
CREATE DATABASE IF NOT EXISTS db

### テーブルの作成

In [None]:
%%sql
-- Iceberg table for the web access logs, partitioned by the day of `timestamp`.
-- IF NOT EXISTS keeps a re-run of the notebook from failing once the table exists.
CREATE TABLE IF NOT EXISTS db.web_access_logs (
    timestamp TIMESTAMP,
    ip_address STRING,
    path STRING,
    status_code INT,
    user_agent STRING
) USING iceberg
PARTITIONED BY (day(timestamp))

### Kafka トピックの作成

In [None]:
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# Topic written by the producer cells and subscribed to by the streaming queries.
KAFKA_TOPIC = 'web-access-logs'

kafka_client = KafkaAdminClient(bootstrap_servers='kafka:29092', client_id=None)
# One partition, replication factor 1 (presumably a single-broker setup; confirm).
topic = NewTopic(name=KAFKA_TOPIC, num_partitions=1, replication_factor=1)

# Create the topic. Re-running the notebook must not fail when it already exists.
try:
    kafka_client.create_topics(new_topics=[topic], validate_only=False)
except TopicAlreadyExistsError:
    print(f"Topic '{KAFKA_TOPIC}' already exists; reusing it.")
finally:
    # Release the admin client's broker connections once the topic is in place.
    kafka_client.close()

## Web アクセスログを Spark Structured Streaming で Iceberg テーブルに書き込む
### Kafka プロデューサーからデータを送信する

In [None]:
from kafka import KafkaProducer
import random

In [None]:
# Kafka producer for the first batch of sample access logs. Each log dict is
# serialized to UTF-8 encoded JSON before it is sent.
web_log_producer = KafkaProducer(
    value_serializer=lambda record: json.dumps(record).encode('utf-8'),
    bootstrap_servers=['kafka:29092'],
)

サンプルのアクセスログを 20 件（0.5 秒間隔で）送信します。

In [None]:
# Candidate values for the randomly generated sample logs.
paths = ["/home", "/products", "/about", "/contact", "/api/users", "/api/products"]
ip_addresses = ["192.168.1.10", "10.0.0.5", "172.16.0.3", "192.168.1.25"]
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
]

# Send 20 logs in the initial schema, one every 0.5 s. Any exception propagates
# unchanged; `finally` still flushes and closes the producer.
try:
    for _ in range(20):
        log = {
            "timestamp": datetime.now().isoformat(),
            "ip_address": random.choice(ip_addresses),
            "path": random.choice(paths),
            # 200 appears three times so most sample requests succeed.
            "status_code": random.choice([200, 200, 200, 404, 500]),
            "user_agent": random.choice(user_agents)
        }
        web_log_producer.send(KAFKA_TOPIC, log)
        print(f"Sent: {log}")
        time.sleep(0.5)
finally:
    # Deliver any buffered messages before closing the producer.
    web_log_producer.flush()
    web_log_producer.close()

### Structured Streaming で Kafka からデータを取得する

In [None]:
# Streaming DataFrame over the raw Kafka records of KAFKA_TOPIC. With a fresh checkpoint
# it starts from the earliest offset; failOnDataLoss=false keeps the query running if
# offsets are no longer available.
df = (
    spark.readStream.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": KAFKA_TOPIC,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    })
    .load()
)

In [None]:
# JSON layout of the first-generation access logs. `timestamp` arrives as an ISO-8601
# string and is converted to a TIMESTAMP when the records are parsed.
initial_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in [
        ("timestamp", StringType(), False),
        ("ip_address", StringType(), False),
        ("path", StringType(), False),
        ("status_code", IntegerType(), False),
        ("user_agent", StringType(), True),
    ]
])

In [None]:
# Decode each Kafka value as JSON using initial_schema, then flatten it into the table's
# columns, converting the ISO-8601 timestamp string into a TIMESTAMP.
df_processed = (
    df.select(from_json(col("value").cast("string"), initial_schema).alias("data"))
    .select(
        to_timestamp(col("data.timestamp")).alias("timestamp"),
        *[col(f"data.{name}") for name in ("ip_address", "path", "status_code", "user_agent")],
    )
)

### Iceberg テーブルにデータを書き込む

In [None]:
# Append the parsed stream to the Iceberg table on a 10-second trigger.
sq = (
    df_processed.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .options(**{
        # Offsets/progress of this query are tracked under this path.
        "checkpointLocation": "/tmp/iceberg-checkpoint/web-logs-v1",
        # Iceberg fanout writer: rows need not be pre-sorted by partition.
        "fanout-enabled": "true",
    })
    .toTable("db.web_access_logs")
)

### 現在のアクセスログを集計する
Kafka から読み取り、Iceberg テーブルに書き込んだデータを試しに読んでみましょう。トリガー間隔は 10 秒なので、結果がまだ反映されていない場合は少し待ってから再実行してください。

In [None]:
%%sql
-- Number of rows appended to the table so far.
SELECT count(*) FROM db.web_access_logs

In [None]:
%%sql
-- Request counts for each (ip_address, status_code, path) combination.
SELECT ip_address, status_code, path, count(*) as cnt FROM db.web_access_logs
GROUP BY ip_address, status_code, path
ORDER BY ip_address, status_code, path

In [None]:
# Show the five most recent rows without truncating long values such as user agents.
(
    spark.table("db.web_access_logs")
    .orderBy(col("timestamp").desc())
    .limit(5)
    .show(truncate=False)
)

次のセクションでスキーマ変更が発生する前提での書き込み方法に切り替えるため、一度ストリーミングアプリケーションを停止します。

In [None]:
# Stop the first streaming query before switching to the schema-evolution pipeline.
sq.stop()

## スキーマ進化したウェブアクセスログを書き込む

In [None]:
# Fresh producer for the schema-evolved logs; each log dict is sent as UTF-8 JSON.
web_log_producer = KafkaProducer(
    value_serializer=lambda record: json.dumps(record).encode('utf-8'),
    bootstrap_servers=['kafka:29092'],
)

ウェブアプリケーション側の仕様変更により、`response_time_ms` と `user_id` の 2 カラムが追加されたスキーマのログを 30 件送信します。

In [None]:
# Candidate values for the randomly generated schema-evolved logs.
paths = ["/home", "/products", "/about", "/contact", "/api/users", "/api/products"]
ip_addresses = ["192.168.1.10", "10.0.0.5", "172.16.0.3", "192.168.1.25"]
user_agents = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
]
# None is serialized as JSON null (presumably a request without a logged-in user).
user_ids = ["user123", "user456", "user789", None]

# Send 30 logs carrying two extra fields, one every 0.5 s. Any exception propagates
# unchanged; `finally` still flushes and closes the producer.
try:
    for _ in range(30):
        log = {
            "timestamp": datetime.now().isoformat(),
            "ip_address": random.choice(ip_addresses),
            "path": random.choice(paths),
            "status_code": random.choice([200, 200, 200, 404, 500]),
            "user_agent": random.choice(user_agents),
            "response_time_ms": random.randint(50, 2000),  # newly added column
            "user_id": random.choice(user_ids)  # newly added column
        }
        # Must go through web_log_producer, the producer created for this section.
        web_log_producer.send(KAFKA_TOPIC, log)
        print(f"Sent log with new schema: {log}")
        time.sleep(0.5)
finally:
    # Deliver any buffered messages before closing the producer.
    web_log_producer.flush()
    web_log_producer.close()

### スキーマ進化したデータを読み込み Iceberg テーブルのスキーマを動的に変更しながら書き込む

In [None]:
# Resume from the Kafka offsets at which the first query (`sq`) stopped. This query uses
# a new checkpoint, so `startingOffsets` applies:
# - "latest" would skip the schema-evolved messages already produced before this
#   query starts.
# - "earliest" would ingest the first batch of logs a second time.
_last_progress = sq.lastProgress or {}
_sources = _last_progress.get("sources") or []
_end_offset = _sources[0].get("endOffset") if _sources else None
if isinstance(_end_offset, dict):
    # Kafka source progress reports offsets as {"topic": {"partition": offset}},
    # the same JSON shape the startingOffsets option accepts.
    resume_offsets = json.dumps(_end_offset)
elif _end_offset:
    resume_offsets = _end_offset
else:
    # No progress was reported by the first query, so presumably nothing was consumed yet.
    resume_offsets = "earliest"

df_new_schema = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", resume_offsets) \
    .option("failOnDataLoss", "false") \
    .load()

In [None]:
# Merge-schema writes require the table to accept DataFrames whose schema differs from
# its own (Iceberg table property `write.spark.accept-any-schema`). Setting it is idempotent.
spark.sql(
    "ALTER TABLE db.web_access_logs "
    "SET TBLPROPERTIES ('write.spark.accept-any-schema'='true')"
)


# Infer each micro-batch's schema and append it, letting Iceberg add new columns.
def process_evolving_schema_logs(batch_df, batch_id):
    """Parse one Kafka micro-batch with a per-batch inferred schema and append it.

    Intended for ``DataStreamWriter.foreachBatch``. Fields that appear in the JSON
    payload but not yet in the table (e.g. ``response_time_ms``, ``user_id``) are added
    to the table schema via Iceberg's ``merge-schema`` write option.

    Args:
        batch_df: Static DataFrame for the micro-batch; its ``value`` column holds the
            Kafka message payload (JSON bytes written by the producer cells).
        batch_id: Micro-batch id supplied by Spark; unused here.
    """
    table_name = "db.web_access_logs"

    # Decode the payload once and cache it: it is read by several actions below
    # (emptiness check, schema inference, show, write).
    json_df = batch_df.select(col("value").cast("string").alias("json_str")).persist()
    try:
        # An empty batch would infer an empty schema; there is nothing to write.
        if json_df.rdd.isEmpty():
            return

        # Let Spark's JSON reader infer one schema covering every record in the batch.
        inferred_schema = spark.read.json(json_df.rdd.map(lambda row: row.json_str)).schema

        parsed_df = json_df.select(
            from_json(col("json_str"), inferred_schema).alias("data")
        ).select("data.*")

        # JSON inference leaves ISO-8601 timestamps as strings; convert to TIMESTAMP.
        if "timestamp" in parsed_df.columns:
            parsed_df = parsed_df.withColumn("timestamp", to_timestamp(col("timestamp")))

        # Keep existing columns at the table's types. JSON inference reads integers as
        # bigint, which merge-schema would otherwise promote on the table
        # (e.g. status_code INT -> BIGINT). New columns keep their inferred types.
        table_types = {f.name: f.dataType for f in spark.table(table_name).schema.fields}
        parsed_df = parsed_df.select([
            col(name).cast(table_types[name]).alias(name) if name in table_types else col(name)
            for name in parsed_df.columns
        ])

        parsed_df.printSchema()
        parsed_df.show()

        write_options = {
            "merge-schema": "true",     # add columns missing from the table schema
            "check-ordering": "false",  # allow column order to differ from the table
        }
        parsed_df.writeTo(table_name).options(**write_options).append()
    finally:
        json_df.unpersist()

In [None]:
# Run process_evolving_schema_logs on every 10-second micro-batch of the Kafka stream,
# tracking progress under a checkpoint separate from the first query's.
sq_new_schema = (
    df_new_schema.writeStream
    .foreachBatch(process_evolving_schema_logs)
    .option("checkpointLocation", "/tmp/iceberg-checkpoint/web-logs-v2")
    .trigger(processingTime="10 seconds")
    .start()
)

### テーブルスキーマを確認する

In [None]:
%%sql
-- Inspect the current table schema, e.g. whether new columns such as response_time_ms / user_id were added.
DESCRIBE db.web_access_logs

### 集計クエリを実行する

In [None]:
%%sql
-- Total number of rows in the table across both streaming queries.
SELECT count(*) as total FROM db.web_access_logs

ユーザー（`user_id`）ごとのアクセス状況を集計します。この結果を基に、各ユーザーへのレコメンデーションに活用できます。

In [None]:
%%sql
-- Access counts per user, status code and path; rows without a user_id are excluded.
SELECT user_id, status_code, path, count(*) AS cnt FROM db.web_access_logs
WHERE user_id IS NOT NULL
GROUP BY user_id, status_code, path
ORDER BY user_id, status_code, path

パスごとの平均レスポンス時間

In [None]:
%%sql
-- Average response time per path, in ascending order. AVG ignores NULLs, so rows that
-- have no response_time_ms value (logs in the initial schema) do not affect the averages.
SELECT path, AVG(response_time_ms) as avg_response_time_ms
FROM db.web_access_logs
GROUP BY path ORDER BY avg_response_time_ms