## ※店舗マスタのCSVを取込み、Silverとして扱う

In [0]:
%run /Workspace/Users/takahiro.koizumi@nsw.co.jp/EX_error_log_export

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import lit
from datetime import datetime, timedelta

In [0]:
# Spark session作成
def spark_session(app_name):
    print("Spark session 作成中…")
    try:
        spark = SparkSession.builder.appName(app_name).getOrCreate()
        print("Spark session 作成完了")
        return spark

    # エラー処理
    except Exception as e:
        # ログを出力する
        logger = setup_logging(logtype="error")
        logger.error(f"Spark session 作成中にエラーが発生しました: {e}")
        return False

In [0]:
# csvのDataFrame化
def csv_to_dataframe(spark, csv_file, temp_view_name):
    print("CSVファイルをDataFrameに変換中…")
    try:
        # 各CSVファイルを読み込み、DataFrameを作成
        csv_df = spark.read.csv(csv_file, header=True, inferSchema=True,nullValue="NULL")
        
        # 一時ビューを作成
        temp_view = csv_df.createOrReplaceTempView(temp_view_name)
        
        # レコード数（行数）を取得
        row_count_csv = csv_df.count()
        print(f"変換完了: {row_count_csv}件")
        return True

    # エラー処理
    except Exception as e:
        logger = setup_logging(logtype="error")
        logger.error(f"Error: {e}")
        print(f"csvファイルの変換に失敗しました。エラー: {e}")
        return False

In [0]:
# Silverテーブルを作成
def create_table(Silver_table_name):
    print(f"{Silver_table_name}の空テーブルを作成中…")
    try:
        spark.sql(
            f"""
        CREATE TABLE { Silver_table_name } (
            ID varchar(8) NOT NULL,
            COUNTRY_CODE varchar(2),
            COUNTRY_NAME varchar(120),
            COMPANY_CODE varchar(2),
            COMPANY_NAME varchar(120),
            SHOP_NAME varchar(120),
            SHOP_SIMPLE_NAME varchar(120),
            SHOP_ATTRIBUTION varchar(8),
            OPENING_DATE date,
            CLOSING_DATE date,
            BLOCK_LEADER_CODE varchar(8),
            BLOCK_LEADER_NAME varchar(120),
            SHOP_MANAGER_CODE varchar(8),
            SHOP_MANAGER_NAME varchar(120),
            BUSINESS_TYPE_CODE varchar(8),
            BUSINESS_TYPE_NAME varchar(120),
            SHOP_CLASS_CODE varchar(8),
            SHOP_CLASS_NAME varchar(120),
            SHOP_TYPE_CODE varchar(8),
            SHOP_TYPE_NAME varchar(120),
            LARGE_AREA_CODE varchar(8),
            LARGE_AREA_NAME varchar(120),
            MIDDLE_AREA_CODE varchar(8),
            MIDDLE_AREA_NAME varchar(120),
            SMALL_AREA_CODE varchar(8),
            SMALL_AREA_NAME varchar(120),
            FLOOR_SPACE numeric(8, 2),
            UNIT_QUANTITY integer,
            CUSTOMER_PRICE_GROUP varchar(32),
            BUILDIHG_TYPE_CODE varchar(12),
            BUILDIHG_TYPE_NAME varchar(120),
            LARGE_DEVELOPER_CODE varchar(40),
            LARGE_DEVELOPER_NAME varchar(120),
            MIDDLE_DEVELOPER_CODE varchar(40),
            MIDDLE_DEVELOPER_NAME varchar(120),
            SMALL_DEVELOPER_CODE varchar(40),
            SMALL_DEVELOPER_NAME varchar(120),
            WAREHOUSE_CODE varchar(8),
            WAREHOUSE_NAME varchar(120),
            PRIMARY KEY(ID)
        ) USING DELTA
        """
        )
        print(f"作成完了")
        return True

    # エラー処理
    except Exception as e:
        logger = setup_logging(logtype="error")
        logger.error(f"Error: {e}")
        print(f"csvファイルの変換に失敗しました。エラー: {e}")
        return False

In [0]:
# 一時ビューから新しいテーブルにデータを挿入
def insert_data(silver_table_name):
    print(f"{silver_table_name}にデータを挿入中…")
    try:
        spark.sql(
            f"""
        INSERT INTO {silver_table_name}
            SELECT
                ID,
                COUNTRY_CODE,
                COUNTRY_NAME,
                COMPANY_CODE,
                COMPANY_NAME,
                SHOP_NAME,
                SHOP_SIMPLE_NAME,
                SHOP_ATTRIBUTION,
                OPENING_DATE,
                CLOSING_DATE,
                BLOCK_LEADER_CODE,
                BLOCK_LEADER_NAME,
                SHOP_MANAGER_CODE,
                SHOP_MANAGER_NAME,
                BUSINESS_TYPE_CODE,
                BUSINESS_TYPE_NAME,
                SHOP_CLASS_CODE,
                SHOP_CLASS_NAME,
                SHOP_TYPE_CODE,
                SHOP_TYPE_NAME,
                LARGE_AREA_CODE,
                LARGE_AREA_NAME,
                MIDDLE_AREA_CODE,
                MIDDLE_AREA_NAME,
                SMALL_AREA_CODE,
                SMALL_AREA_NAME,
                coalesce(FLOOR_SPACE,0) AS FLOOR_SPACE,
                coalesce(UNIT_QUANTITY,0) AS UNIT_QUANTITY,
                CUSTOMER_PRICE_GROUP,
                BUILDIHG_TYPE_CODE,
                BUILDIHG_TYPE_NAME,
                LARGE_DEVELOPER_CODE,
                LARGE_DEVELOPER_NAME,
                MIDDLE_DEVELOPER_CODE,
                MIDDLE_DEVELOPER_NAME,
                SMALL_DEVELOPER_CODE,
                SMALL_DEVELOPER_NAME,
                WAREHOUSE_CODE,
                WAREHOUSE_NAME
            FROM {temp_view_name}
        """
        )
        row_count_insert = spark.read.table(silver_table_name).count()
        print(f"挿入完了 　総データ件数:{row_count_insert}件")
        return True
    
    # エラー処理
    except Exception as e:
        logger = setup_logging(logtype="error")
        logger.error(f"Error: {e}")
        print(f"Silverテーブル作成に失敗しました。エラー: {e}")
        return False

In [0]:
# 作成したSilverテーブルの全件削除
def delete_all_data(silver_table_name):
    print(f"{silver_table_name}の全データを削除中…")
    try:
        spark.sql(
            f"""
        TRUNCATE TABLE {silver_table_name}
        """
        )
        print(f"削除完了")
        return True
    except Exception as e:
        logger = setup_logging(logtype="error")
        logger.error(f"Error: {e}")
        print(f"削除に失敗しました。エラー: {e}")
        return False

In [0]:
# 　実行関数
def silver_layer(csv_file, temp_view_name, silver_table_name):
    # Silverテーブルが存在しない場合
    if not spark.catalog.tableExists(silver_table_name):
        print(f"{silver_table_name}が存在しないため新規作成します。")
        # sppark session起動
        spark_session(app_name)

        # csvファイルをDataFrameに変換
        csv_to_dataframe(spark, csv_file, temp_view_name)
        
        # Silverテーブルを作成
        create_table(silver_table_name)
        
        # Silverテーブルにデータを挿入
        insert_data(silver_table_name)
        return True

    # Silverテーブルが存在する場合
    else:
        print(f"{silver_table_name}が存在するので全件入替を実施します。")
        # sppark session起動
        spark_session(app_name)
        
        # 既存レコードの全削除
        delete_all_data(silver_table_name)

        # csvファイルをDataFrameに変換
        csv_to_dataframe(spark, csv_file, temp_view_name)

        # Silverテーブルにデータを挿入
        insert_data(silver_table_name)
        return True
    

In [0]:
# 昨日日付を取得する
def generate_csv_path_yesterday(base_path: str, prefix: str) -> str:
    yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
    return f"{base_path}/{prefix}_{yesterday}.csv"

In [0]:
# app_name を指定してください。
app_name = "Silver_layer"
# ディレクトリパス
base_path ="/FileStore/tables"
# テーブル種類（マスタ名）
prefix = "shop"
# 完成したCSVファイルのパス（昨日日付）
csv_file = [generate_csv_path_yesterday(base_path, prefix)]

# 一時ビュー名を指定してください。
temp_view_name = "temp_dim_shop"
# Silverテーブル名を入力してください。
silver_table_name = "prd_im_dlh.silver.dim_shop"

# 実行
silver_layer(csv_file, temp_view_name, silver_table_name)