# 事前準備

- カタログ、スキーマの作成
  - 編集 > 置き換え > [ytsuchiya -> {ご自身の名前}]
- 以下設定の汎用クラスターを作成
  - コンピュート名：{ご自身の名前}_cluster
  - Photon：有効のまま
  - オートスケールを有効化：無効に変更
  - ワーカータイプ：i3.2xlarge
  - ワーカー数：8
  - 自動停止（非アクティブ状態での終了）：80分後
  ![クラスター設定の例](./image/cluster_setting.png)

In [0]:
%pip install tqdm

In [0]:
%restart_python

In [0]:
%sql
-- Create the workshop catalog if it is missing, then make it the session default
-- so later statements can use unqualified names.
CREATE CATALOG IF NOT EXISTS ytsuchiya;
USE CATALOG ytsuchiya;

In [0]:
%sql
-- Create the perf_tuning schema inside the current catalog if it is missing,
-- then make it the session default.
CREATE SCHEMA IF NOT EXISTS perf_tuning;
USE SCHEMA perf_tuning;

In [0]:
%sql
-- Drop the table if it already exists, so that re-running this notebook recreates it
-- in its state before liquid clustering is applied.
DROP TABLE IF EXISTS ytsuchiya.perf_tuning.trn_pos;

In [0]:
# DDL for the (initially empty) Delta table that receives the synthetic POS data.
# No CLUSTER BY / PARTITIONED BY clause is declared, so the table starts unclustered.
create_trn_pos_ddl = """
CREATE TABLE ytsuchiya.perf_tuning.trn_pos
(
  pos_storecd STRING NOT NULL,
  pos_storename STRING,
  pos_itemcd STRING NOT NULL,
  pos_itemname STRING,
  pos_salesunitprice DOUBLE,
  pos_salesquantity INTEGER,
  pos_salesamount DOUBLE,
  pos_kigyocd STRING NOT NULL,
  pos_salesdate DATE NOT NULL
)
USING DELTA
"""
spark.sql(create_trn_pos_ddl)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType
import random
from tqdm import tqdm

# Generate synthetic POS rows and append them to ytsuchiya.perf_tuning.trn_pos.
#
# The data volume is tuned so that this filter returns about 10,000 rows:
#   pos_salesdate between 2025-05-01 and 2025-05-31   -> ~1/12 of rows
#   pos_kigyocd = '19'                                -> 1/100 of rows
#   pos_itemcd IN (4 codes)                           -> 4/100 = 1/25 of rows
#   300,000,000 * 1/12 * 1/100 * 1/25 = 10,000
# Those ratios require exactly 100 distinct codes "1".."100". random.randint()
# includes BOTH endpoints, so the upper bound is 100 (randint(1, 101) would give 101).
NUM_CODES = 100
LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'

# Every valid calendar date of 2025 (not a leap year) as a "yyyy-MM-dd" string.
# Built once on the driver and shipped with the UDF closure, instead of being
# rebuilt for every generated row.
DATES_2025 = [
    f"2025-{str(m).zfill(2)}-{str(d).zfill(2)}"
    for m in range(1, 13)
    for d in range(1, 32)
    if (m != 2 or d <= 28) and (m not in [4, 6, 9, 11] or d <= 30)
]


def _random_udf(fn, return_type):
    """Wrap a zero-argument random generator as a nondeterministic Spark UDF.

    Spark treats UDFs as deterministic by default, which allows the optimizer to
    eliminate or duplicate invocations. Random generators must be marked
    nondeterministic so every row gets exactly one draw per column.
    """
    return F.udf(fn, return_type).asNondeterministic()


fake_storecd = _random_udf(lambda: str(random.randint(1, NUM_CODES)), StringType())
fake_storename = _random_udf(lambda: ''.join(random.choices(LETTERS, k=3)), StringType())
fake_itemcd = _random_udf(lambda: str(random.randint(1, NUM_CODES)), StringType())
fake_itemname = _random_udf(lambda: ''.join(random.choices(LETTERS, k=3)), StringType())
fake_salesquantity = _random_udf(lambda: random.randint(1, 101), IntegerType())
fake_salesunitprice = _random_udf(lambda: float(random.choice(range(1000, 100001, 500))), DoubleType())
fake_kigyocd = _random_udf(lambda: str(random.randint(1, NUM_CODES)), StringType())
fake_salesdate = _random_udf(lambda: random.choice(DATES_2025), StringType())

# Generate and insert 300 million (300,000,000) rows in batches.
total_rows = 300_000_000
batch_size = 10_000_000

# Smaller sizes for a quick smoke test:
# total_rows = 60_000
# batch_size = 60_000

# Ceiling division, so exactly total_rows rows are written. The last batch
# carries the remainder when total_rows is not a multiple of batch_size.
num_batches = -(-total_rows // batch_size)

for i in tqdm(range(num_batches)):
    rows_in_batch = min(batch_size, total_rows - i * batch_size)
    print(f"Generating batch {i+1}/{num_batches}")
    df = (
        spark.range(0, rows_in_batch)
        .withColumn("pos_storecd", fake_storecd())
        .withColumn("pos_storename", fake_storename())
        .withColumn("pos_itemcd", fake_itemcd())
        .withColumn("pos_itemname", fake_itemname())
        .withColumn("pos_salesquantity", fake_salesquantity())
        .withColumn("pos_salesunitprice", fake_salesunitprice())
        # Derived with a native expression from the two columns above, so it
        # always equals quantity * unit price, with no Python UDF round-trip.
        .withColumn(
            "pos_salesamount",
            F.col("pos_salesquantity").cast("double") * F.col("pos_salesunitprice"),
        )
        .withColumn("pos_kigyocd", fake_kigyocd())
        .withColumn("pos_salesdate", F.to_date(fake_salesdate(), "yyyy-MM-dd"))
        # Same column order as the CREATE TABLE statement.
        .select(
            "pos_storecd", "pos_storename", "pos_itemcd", "pos_itemname",
            "pos_salesunitprice", "pos_salesquantity", "pos_salesamount",
            "pos_kigyocd", "pos_salesdate",
        )
    )
    df.write.format("delta").mode("append").saveAsTable("ytsuchiya.perf_tuning.trn_pos")

In [0]:
# Daily sales totals per store and item for company (pos_kigyocd) '19'
# during May 2025. There is no item-code filter in this variant.
query = """
SELECT
  pos_salesdate,
  pos_storecd,
  pos_storename,
  pos_itemcd,
  pos_itemname,
  SUM(pos_salesamount) AS total_salesamount,
  SUM(pos_salesquantity) AS total_salesquantity
FROM ytsuchiya.perf_tuning.trn_pos
WHERE pos_salesdate >= DATE('2025-05-01')
  AND pos_salesdate <= DATE('2025-05-31')
  AND pos_kigyocd = '19'
GROUP BY
  pos_salesdate,
  pos_storecd,
  pos_storename,
  pos_itemcd,
  pos_itemname
"""

df = spark.sql(query)
# The DataFrame is not cached, so count() and display() each execute the query.
print(df.count())
display(df)

In [0]:
itemcd_list = ["1", "2", "3", "4"]  # e.g. the item codes to fetch

# An empty list would render "IN ()", which Spark SQL rejects as a parse error.
# Fail early with a clear message instead.
if not itemcd_list:
    raise ValueError("itemcd_list must contain at least one item code")

# Render the codes as a quoted literal list, e.g. '1','2','3','4'.
# NOTE: values are interpolated directly into the SQL text; only use trusted codes.
itemcd_in_list = ','.join(f"'{x}'" for x in itemcd_list)

# Same aggregation as the baseline query, additionally restricted to the
# selected item codes.
query = f"""
SELECT
  pos_salesdate,
  pos_storecd,
  pos_storename,
  pos_itemcd,
  pos_itemname,
  SUM(pos_salesamount) AS total_salesamount,
  SUM(pos_salesquantity) AS total_salesquantity
FROM ytsuchiya.perf_tuning.trn_pos
WHERE pos_salesdate >= DATE('2025-05-01')
  AND pos_salesdate <= DATE('2025-05-31')
  AND pos_kigyocd = '19'
  AND pos_itemcd IN ({itemcd_in_list})
GROUP BY
  pos_salesdate,
  pos_storecd,
  pos_storename,
  pos_itemcd,
  pos_itemname
"""

df = spark.sql(query)
# The DataFrame is not cached, so count() and display() each execute the query.
print(df.count())
display(df)