# ETLの処理例
object storage のCSVファイルをSparkでデータ変換し、変換済のデータをADWにロードする

In [1]:
# オブジェクトストレージのparquetを読み込み
#wine_df = spark.read.option("header", True).format("parquet").load(f'/Volumes/object_storage/default/output/predictions/')
#wine_df.show()

wine_df = spark.read.format("delta").load(f'/Volumes/object_storage/default/output/predictions')
wine_df.show(5, False)

+-----------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+----------------+-------------------+------------------------------------------------+----------+
|class_label|alcohol|malic_acid|ash |alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity|hue |od280_od315_of_diluted_wines|proline|features        |rawPrediction      |probability                                     |prediction|
+-----------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+----------------+-------------------+------------------------------------------------+----------+
|1          |13.05  |1.65      |2.55|18.0             |98       |2.45         |2.43      |0.29                |1.44           |4.25           |1.12|2.5

In [1]:
# 簡単な集計処理(class_label ごとの件数、平均アルコール度数、平均色の濃さを計算)
from pyspark.sql.functions import count, avg

wine_average_df = wine_df.groupBy("class_label") \
    .agg(
        count("*").alias("count"),
        avg("alcohol").alias("avg_alcohol"),
        avg("color_intensity").alias("avg_color_intensity")
    ) \
    .orderBy("class_label")

wine_average_df.show()

+-----------+-----+------------------+-------------------+
|class_label|count|       avg_alcohol|avg_color_intensity|
+-----------+-----+------------------+-------------------+
|          1|   14|13.769285714285713|               5.55|
|          2|    9|12.448888888888888| 2.9066666666666667|
|          3|    6|13.266666666666667|              6.555|
+-----------+-----+------------------+-------------------+



In [1]:
# 事前にADWのカタログを作成しておく

# 集計結果をADWにロード
wine_average_df.write.mode("overwrite").saveAsTable("adw.ksonoda.wine_average")

In [1]:
%sql
select * from adw.ksonoda.wine_average

# コードで ADWのexternal catalogを設定するケース

In [1]:
user=oidlUtils.parameters.getParameter("USER", "KSONODA")
passwd=oidlUtils.parameters.getParameter("PASSWD", "Welcome1#Welcome1#")
tns=oidlUtils.parameters.getParameter("TNS", "adw01_high")
wallet_passwd=oidlUtils.parameters.getParameter("WALLET_PASSWD", "Welcome1#Welcome1#")
wallet_path=oidlUtils.parameters.getParameter("WALLET_PATH", "/Workspace/demo/Wallet_adw01.zip")

In [1]:
import base64

# bytearray で初期化
byte_array = bytearray()
try:
    with open(wallet_path, 'rb') as file:
        byte_array = file.read()   # bytearray にする必要はなく bytes のままでOK
except FileNotFoundError:
    print(f"Error: File not found: {wallet_path}")
except Exception as e:
    print(f"An error occurred: {e}")

# Base64 エンコード
wt = base64.b64encode(byte_array).decode("utf-8")

In [1]:
# SQL 文字列生成
create_sql = f"""
create external catalog if not exists adw options (
  'wallet.content' = '{wt}',
  'type' = 'ORACLE_ADW',
  'user.name' = '{user}',
  'tns' = '{tns}',
  'password' = '{passwd}',
  'wallet.password' = '{wallet_passwd}'
)
"""

print(create_sql)


create external catalog if not exists adw options (
  'wallet.content' = 'UEsDBBQACAgIAPdtQlsAAAAAAAAAAAAAAAALAAAAZXdhbGxldC5wZW2ll0cPq0q2tuf8ip6jFtnA4A7IOWdmJphgMNGkX3/Ze599Otyj/qT+kJDMWssuvKrqed/6+9/vixUkxfybYHJuYvsC/zfbVULGF/6mCcmP9N8BQ1FEg2efbPWe6ncj0TvMsg68+z6z/Gts3+XdUZh/XBzT5A7AVEJlMLvEcZPkKRnGO4rKOswuO4maKKnCRAHPMsLEssxfXgCrKAIs0dJ+mqorTb5l8gRpGoouWovh+Hz8GmNR0GlWO2GDt0R4M89NPnbfOL/YaT4Ai5v1D4/7g7CLr4sL4o7X3EX1j90QR34YZCSNqbO5XMghtBwej2JIrV3csYtBjWdVAMEKapTU1aORn+6wCNpWxJp8Pu1Ya4uQ6dNhIs5k36XC3sHojb/BvRBs78HrR1ZmVQkg++TvC4sdTa7WOhmCazom6PLZfKlci1kgfWuHQDq13PmZBKyc443TXtdRhlWACq8YIOsXF20K5VUPNDEFdaKfCbuUZBMNyPdJf16iJfjIANn4o/zAQTcGUhhWUcXHZ9gmuwA40YVIqNq5B+nViW6LaO7wyZ2lV35CRBqEsfL96YlPezIF/hXhPcHEpuoDRThT9LoAYR88hH0Z2SzaML9Cx54ifSiFZH3wtUSJOPUKLjCDbf9TCzNu1Fk12V5y1ow5pu+yBgj4bQbE2JVQqhHEIxvQ17anDk8jzhzn1kqSzTInSU5m+6Tp7lDah/nlfJY8eLn6ek9A0tmiWos1tF62qiCpd268UdP5cyCTcCwxf6+O/WrDvOG3VSesbYQJ11hW1PKJMCIYQHherkjOB6zKsObo7WRmG6Ma7LLhXz/Bmtg9CniR+8KifG2K74EhDTpsNfDrhq4cGEi/HgXFprRfm

In [1]:
spark.sql(create_sql).show(1000,False)

+-------+
|status |
+-------+
|CREATED|
+-------+

