In [0]:
%run ./_include

In [0]:
# Show the current database (schema), i.e. where unqualified table/view names resolve.
current_db = spark.catalog.currentDatabase()
print(f"Current Database: {current_db}")

# Unity Catalog environments also expose the current catalog, but
# Catalog.currentCatalog() only exists on Spark 3.4+. Guard the call so this cell
# (and therefore "Run All") does not fail with AttributeError on older runtimes.
if hasattr(spark.catalog, "currentCatalog"):
    current_catalog = spark.catalog.currentCatalog()
else:
    current_catalog = None  # API not available on this Spark version
print(f"Current Catalog: {current_catalog}")


## 分析

## シルバー層

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, to_date, year, month, day, quarter, dayofmonth, when, quarter, dayofweek, date_format, lit, weekofyear, monotonically_increasing_id, current_timestamp, sum
from datetime import date, timedelta


# =============== Silver-layer mocks ===============
# Each mock table is a list of literal rows plus a list of column names. It is exposed
# both as a DataFrame (df_silver_*) and as a temp view, so later cells can use either
# the DataFrame API or SQL. Spark infers the column types from the Python values.


def register_mock_table(rows, columns, *view_names):
    """Build a DataFrame from literal rows and register it under each given temp view.

    Args:
        rows: list of tuples, one per row, in the same order as ``columns``.
        columns: column names; Spark infers the types from ``rows``.
        *view_names: temp-view names to (re)register the DataFrame under.

    Returns:
        The created DataFrame.
    """
    df = spark.createDataFrame(rows, columns)
    for view_name in view_names:
        df.createOrReplaceTempView(view_name)
    return df


# Order-detail mock data.
# NOTE(review): detail_total_amount does not equal unit_price * quantity * (1 - discount)
# in these rows (e.g. row 2 multiplies 9.80 although unit_price is 9.00, and the
# multipliers 12/10/9/9 differ from the quantity column) — confirm this is intentional.
# NOTE(review): these rows carry an employee_id, and so does silver_orders below;
# joining both tables yields two employee_id columns.
silver_order_details = [
    (10248, 11, 1, 14.00, 14.00 * 12 - 0, 1,  0.2),
    (10248, 42, 1, 9.00,  9.80 * 10 - 0, 2,   0.0),
    (10249, 14, 1, 18.60, 18.60 * 9 - 0, 12,  0.05),
    (10250, 13, 2, 18.60, 18.60 * 9 - 0, 6,   0.0),
]
silver_order_details_schema = [
    "order_id",
    "product_id",
    "employee_id",
    "unit_price",
    "detail_total_amount",
    "quantity",
    "discount",
]
df_silver_order_details = register_mock_table(
    silver_order_details, silver_order_details_schema, "silver_order_details"
)

# Category mock data.
silver_categories = [
    (1, "cat_AAA"),
    (2, "cat_BBB"),
]
silver_categories_schema = [
    "category_id",
    "category_name",
]
# Registered as "silver_categories" to match the other silver_* views; the previous
# name "df_silver_categories" is kept as an alias in case other code queries it.
df_silver_categories = register_mock_table(
    silver_categories, silver_categories_schema, "silver_categories", "df_silver_categories"
)

# Product mock data.
silver_products = [
    (1, "AAA", 1, 1, 21, 22, 31),
    (11, "BBB", 2, 1, 21, 22, 31),
    (13, "abc", 3, 1, 21, 22, 31),
    (14, "CCC", 3, 1, 21, 22, 31),
    (42, "DDD", 3, 1, 21, 22, 31),
]
silver_products_schema = [
    "product_id",
    "product_name",
    "supplier_id",
    "category_id",
    "units_in_stock",
    "units_on_order",
    "reorder_level",
]
df_silver_products = register_mock_table(
    silver_products, silver_products_schema, "silver_products"
)

# Order (header) mock data.
# NOTE(review): the employee_ids here (5, 6, 7) have no matching row in
# silver_employees below — confirm which table should own employee_id.
silver_orders = [
    (10248, "ALFKI", 5, date(1996, 7, 4), date(1996, 7, 20), date(1996, 8, 1)),
    (10249, "ANATR", 6, date(1996, 8, 19), date(1996, 8, 29), date(1996, 9, 2)),
    (10250, "ANTON", 7, date(1996, 8, 22), date(1996, 8, 30), date(1996, 9, 2)),
]
silver_orders_schema = [
    "order_id",
    "customer_id",
    "employee_id",
    "order_date",
    "required_date",
    "shipped_date"
]
# Registered as "silver_orders" to match the other silver_* views; the previous name
# "df_silver_orders" is kept as an alias in case other code queries it.
df_silver_orders = register_mock_table(
    silver_orders, silver_orders_schema, "silver_orders", "df_silver_orders"
)

# Customer mock data.
silver_customers = [
    ("ALFKI", "Alfreds Futterkiste", "Germany"),
    ("ANATR", "Ana Trujillo Emparedados y helados", "Mexico"),
    ("ANTON", "Antonio Moreno Taquería", "UK"),
]
silver_customers_schema = [
    "customer_id",
    "company_name",
    "country_name",
]
df_silver_customers = register_mock_table(
    silver_customers, silver_customers_schema, "silver_customers"
)

# Employee mock data.
silver_employees = [
    (1, "Davolio Nancy"),
    (2, "Fuller Andrew"),
    (3, "Leverling Janet"),
]
silver_employees_schema = [
    "employee_id",
    "employee_name",
]
df_silver_employees = register_mock_table(
    silver_employees, silver_employees_schema, "silver_employees"
)




## ゴールド層

### ディメンション

In [0]:

# Gold-layer dimensions: each one adds a surrogate *_key column to a silver table and
# registers the result as a dim_* temp view.
# NOTE(review): monotonically_increasing_id() is only unique and increasing, not
# consecutive, and Spark documents it as non-deterministic (it depends on partition
# IDs). These DataFrames are not cached, so keys are recomputed on every evaluation and
# stay consistent between the fact table and the dim_* views only as long as the
# input partitioning is identical. Consider row_number() or persisting the dimensions.

# Customer dimension.
df_dim_customers = (
    df_silver_customers
    .withColumn('customer_key', monotonically_increasing_id())
    .withColumn("created_at", current_timestamp())
)
df_dim_customers.createOrReplaceTempView("dim_customers")

# Country dimension: distinct customer countries, sorted by name, with a surrogate key.
# The output columns are exactly country_key and country_name.
df_countries = (
    df_silver_customers
    .select('country_name')
    .distinct()
    .orderBy("country_name")
)

df_dim_countries = (
    df_countries
    .withColumn("country_key", monotonically_increasing_id())
    .select(
        "country_key",
        "country_name"
    )
)
df_dim_countries.createOrReplaceTempView("dim_countries")

# Product dimension.
df_dim_products = (
    df_silver_products
    .withColumn('product_key', monotonically_increasing_id())
    .withColumn("created_at", current_timestamp())
)
df_dim_products.createOrReplaceTempView("dim_products")

# Category dimension.
df_dim_categories = (
    df_silver_categories
    .withColumn('category_key', monotonically_increasing_id())
    .withColumn("created_at", current_timestamp())
)
df_dim_categories.createOrReplaceTempView("dim_categories")

# Employee dimension.
df_dim_employees = (
    df_silver_employees
    .withColumn('employee_key', monotonically_increasing_id())
    .withColumn("created_at", current_timestamp())
)
df_dim_employees.createOrReplaceTempView("dim_employees")

# Date dimension: one row per calendar day from start_date to end_date (inclusive),
# with calendar attributes derived from each date.
start_date = date(1990, 1, 1)
end_date = date(2000, 12, 31)
date_range = [(start_date + timedelta(days=x),) for x in range((end_date - start_date).days + 1)]

df_dates = spark.createDataFrame(date_range, ['date'])

df_dim_date = (
    df_dates
    .withColumn('date_key', monotonically_increasing_id())
    .withColumn('year', year(col('date')))
    .withColumn('quarter', quarter(col('date')))
    .withColumn('month', month(col('date')))
    .withColumn('day', dayofmonth(col('date')))
    # Spark's dayofweek(): 1 = Sunday ... 7 = Saturday.
    .withColumn('day_of_week', dayofweek(col('date')))
    .withColumn('day_name', date_format(col('date'), 'EEEE'))
    .withColumn('week_of_year', weekofyear(col('date')))
    # Weekend = Sunday (1) or Saturday (7).
    .withColumn(
        'is_weekend',
        when(col('day_of_week').isin([1, 7]), True).otherwise(False)
    )
    .withColumn("year_month", date_format(col("date"), "yyyy-MM"))
)
df_dim_date.createOrReplaceTempView("dim_date")



### ファクトテーブル

In [0]:
# ###### Fact table ######
# Step 1: enrich each order line with the natural keys needed for the dimension lookups
# (category_id from products, customer_id and dates from orders, country_name from
# customers).
# silver_order_details and silver_orders both carry an employee_id column. Keeping both
# makes the later USING join on employee_id ambiguous (depending on the Spark version it
# either fails or silently binds to one of the two columns), so the orders' copy is
# dropped explicitly and the order-line employee_id is used.
# NOTE(review): confirm the order-line employee_id is the intended source; in the
# current mocks the orders' employee_ids (5, 6, 7) have no match in silver_employees.
df_fact_prep = (
    df_silver_order_details
    .join(df_silver_products, "product_id", "left")
    .join(df_silver_orders.drop("employee_id"), "order_id", "left")
    .join(df_silver_employees, "employee_id", "left")
    .join(df_silver_customers, "customer_id", "left")
)

# Step 2: swap natural keys for dimension surrogate keys. Only the join key and the
# surrogate key are taken from each dimension, so no attribute column (category_id,
# country_name, product_name, created_at, ...) is duplicated in the joined result and
# every USING join below resolves unambiguously.
df_fact_sales = (
    df_fact_prep
    .join(df_dim_customers.select("customer_id", "customer_key"), "customer_id", "left")
    .join(df_dim_products.select("product_id", "product_key"), "product_id", "left")
    .join(df_dim_date.select("date", "date_key"), col('order_date') == col('date'), "left")
    .join(df_dim_categories.select("category_id", "category_key"), "category_id", "left")
    .join(df_dim_countries.select("country_name", "country_key"), 'country_name', "left")
    .join(df_dim_employees.select("employee_id", "employee_key"), 'employee_id', "left")
    .select(
        col("order_id"),
        col("customer_key"),
        col("product_key"),
        col("date_key"),
        col("category_key"),
        col("country_key"),
        col("employee_key"),
        col("date"),
        col("quantity"),
        col("detail_total_amount"),
        col("discount"),
        col("required_date"),
        col("shipped_date"),
    )
    .withColumn("created_at", current_timestamp())
)


print("ファクトテーブル")
display(df_fact_sales.limit(5))
df_fact_sales.createOrReplaceTempView("fact_sales")

## 分析

### 売り上げ分析マート

In [0]:
def summarize_sales(dim_view, join_key, group_cols, order_cols=None):
    """Aggregate the fact_sales view by attributes of one dimension.

    Joins ``fact_sales`` to ``dim_view`` on ``join_key`` (a surrogate-key column present
    in both) and returns one row per group with the standard sales measures.

    Args:
        dim_view: name of the dimension temp view, e.g. "dim_date".
        join_key: surrogate-key column shared by fact_sales and the dimension.
        group_cols: dimension columns to select and group by, in output order.
        order_cols: dimension columns to sort by; defaults to ``group_cols``.

    Returns:
        A Spark DataFrame with the group columns followed by total_sales_amount,
        order_count, product_count, average_discount_rate and dummy_column.

    The identifiers are interpolated into the SQL text, so pass only trusted,
    notebook-defined names (never user input).
    """
    if order_cols is None:
        order_cols = group_cols
    select_cols = ",\n    ".join(f"d.{c}" for c in group_cols)
    group_by = ", ".join(f"d.{c}" for c in group_cols)
    order_by = ", ".join(f"d.{c}" for c in order_cols)
    # Both sides of the join use the same key name, so the product mart can no longer
    # accidentally join on a different fact column.
    return spark.sql(f"""
  SELECT
    {select_cols},
    SUM(f.detail_total_amount) as total_sales_amount,
    COUNT(DISTINCT f.order_id) as order_count,
    SUM(f.quantity) as product_count,
    AVG(f.discount) as average_discount_rate,
    'aaa' as dummy_column
  FROM fact_sales f
  JOIN {dim_view} d ON f.{join_key} = d.{join_key}
  GROUP BY {group_by}
  ORDER BY {order_by}
""")


# Sales by year and month.
print("\n年月別分析")
df_analyze_sales_each_month = summarize_sales("dim_date", "date_key", ["year", "month"])
display(df_analyze_sales_each_month.limit(5))


# Sales by customer.
print("\n顧客別分析")
df_analyze_sales_customer = summarize_sales("dim_customers", "customer_key", ["customer_key"])
display(df_analyze_sales_customer.limit(5))


# Sales by product (joined on product_key; previously mis-joined on customer_key).
print("\n商品別分析")
df_analyze_sales_product = summarize_sales("dim_products", "product_key", ["product_key"])
display(df_analyze_sales_product.limit(5))


# Sales by country, ordered by surrogate key.
print("\n国別分析")
df_analyze_sales_country = summarize_sales(
    "dim_countries", "country_key", ["country_key", "country_name"], order_cols=["country_key"]
)
display(df_analyze_sales_country.limit(5))





### 顧客分析マート

In [0]:
# Customer analysis mart: per-customer total sales, distinct order count, first/last
# order date and a sales tier, ordered by customer surrogate key.
# The tier repeats the aggregate expression instead of referencing the total_sales
# alias. Referring to a SELECT alias within the same SELECT (a lateral column alias)
# only works on Spark 3.4+. Single-quoted literals keep the query valid when
# spark.sql.ansi.doubleQuotedIdentifiers is enabled.
# NOTE(review): 'aaaa'/'bbbb'/'ccccc' look like placeholder tier labels, and the name
# avg_order_value does not describe a sales tier — confirm the intended labels/name
# (both kept unchanged here to preserve the output).
df_analyze_customer = spark.sql("""
  SELECT
    c.company_name,
    SUM(f.detail_total_amount) as total_sales,
    COUNT(DISTINCT f.order_id) as order_count,
    MIN(f.date) AS first_order_date,
    MAX(f.date) AS last_order_date,
    CASE
      WHEN SUM(f.detail_total_amount) >= 20000 THEN 'aaaa'
      WHEN SUM(f.detail_total_amount) >= 10000 THEN 'bbbb'
      ELSE 'ccccc'
    END as avg_order_value
  FROM fact_sales f
  JOIN dim_customers c ON f.customer_key = c.customer_key
  GROUP BY c.customer_key, c.company_name
  ORDER BY c.customer_key
""")

display(df_analyze_customer.limit(5))


### 従業員分析マート

In [0]:
# Employee analysis mart: one row per employee with total sales, distinct order count
# and the first/last order date found in fact_sales, ordered by employee surrogate key.
employee_mart_query = """
  SELECT 
    e.employee_name,
    SUM(f.detail_total_amount) as total_sales,
    COUNT(DISTINCT f.order_id) as order_count,
    MIN(f.date) AS first_order_date,
    MAX(f.date) AS last_order_date
  FROM fact_sales f
  JOIN dim_employees e ON f.employee_key = e.employee_key
  GROUP BY e.employee_key, e.employee_name
  ORDER BY e.employee_key
"""
df_analyze_employee = spark.sql(employee_mart_query)

employee_preview = df_analyze_employee.limit(5)
display(employee_preview)
