# PySpark によるメタデータデプロイの実践

Spark におけるメタデータを保持する主なオブジェクトとして次のものがあり、CRUD（作成、読込、更新、削除）という観点で適切なメタデータデプロイが必要。Spark （+Delta Lake）を利用するメリットの１つにデータ参照のダウンタイムがほぼないことがあり、メタデータデプロイ時にもダウンタイムを最小限とすることが望ましい。

| #    | 取得元                    | 取得元の詳細                                                           |
| ---- | ------------------------- | ------------------------------------------------------------ |
| 1    | Delta Lake ディレクトリ | - _delta_log                                                 |
| 2    | メタストア                | - Spark Schema(Database)<br/>  - Spark Table<br/>  - Spark View |
| 3    | Spark Dataframe           | - SparkSession<br/>  - Data Sources                                                             |


Spark においては、メタストアにて多くのメタデータが格納される。一般的には Hive メタストアを用いることが多いが、Spark プロバイダー固有のメタストア（Databricks における Unity Catalog）を提供されていることがある。メタストアからメタデータを取得する際には、Spark 環境から`DESCRIBE`などのコマンドにより参照する。Delta Lake を用いる場合にはメタストアに存在しないメタデータが Delta Lake ディレクトリに格納されることもある。


メタデータのデプロイ方法としては命令型と宣言型があるが、Spark にて宣言型でデプロイを行うためのツールは公開されていないため、宣言型デプロイを行うためにはスクラッチでの開発が必要。

| #    | デプロイ方法 | 実施方法                                                     | 他データストアのツール例                                                     |
| ---- | ------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| 1    | 命令型       | デプロイ対象の DDL 文等の処理を随時実行する。                |                                                              |
| 2    | 宣言型       | デプロイ後のテーブル定義等を保持したをモデルの定義を行い、そのモデルとの差分を DDL 文等により反映する。 | - [Flyway](https://flywaydb.org/) <br />- [データ層アプリケーション (DAC) - SQL Server ](https://docs.microsoft.com/ja-jp/sql/relational-databases/data-tier-applications/data-tier-applications?view=sql-server-ver16) |


、メタデータの更新を行う際には、ALTER 文の実行のみでは完結しないものがあることに注意が必要。

- ALTER 文を実行のみで可能
    1. テーブル名を変更
    2. カラムの追加
    3. カラムのコメントを変更
    4. カラム順の変更
    5. テーブルプロパティ（TBLPROPERTIES）の変更
    6. Not NULL制約の設定と削除
    7. Check制約の設定と削除
- データの再書き込みが必要
    1. カラム名を変更
    2. カラムを削除
    3. カラムのデータ型を変更
    4. パーティションカラムを変更
    5. 生成列の追加
- 他のパスへのデータの書き込みが必要
    1. テーブルの保存場所（LOCATION）の変更
- Optimizeの実行が必要
    1. bloomfitlerの設定

## 事前準備

In [0]:
from pyspark.sql import Row
from decimal import *
import datetime
import pprint

In [0]:
import random, string
random_string = ''.join(random.choices(string.ascii_letters, k=5))

In [0]:
db_name = f'_metadata_test_database_{random_string}'
print(db_name)

In [0]:
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)

sql = f'CREATE DATABASE {db_name}'
spark.sql(sql)

In [0]:
table_name      = '_metadata_test_table'
table_full_name = f'{db_name}.{table_name}'

sample_Data = [
    Row(
        string_column    = 'AAA', 
        byte_column      = 1, 
        integer_column   = 1, 
        bigint_column    = 1, 
        float_column     = 12.300000190734863, 
        double_column    = 12.3, 
        numeric_column   = Decimal('12'), 
        boolean_column   = True, 
        date_column      = datetime.date(2020, 1, 1), 
        timestamp_column = datetime.datetime(2021, 1, 1, 0, 0), 
        binary_column    = bytearray(b'A'), 
        struct_column    = Row(
            struct_string_column = 'AAA', 
            struct_int_column    = 1
        ), 
        array_column     = ['AAA', 'BBB', 'CCC'], 
        map_column       = {'AAA': 1}
    )
]


schema = '''
--文字型
string_column string,

--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,

--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,

--真偽型
boolean_column boolean,

--日付時刻
date_column date, 
timestamp_column timestamp,

--バイナリー型
binary_column binary,

--複合型
struct_column struct<
    struct_string_column :string,
    struct_int_column    :int
>,
array_column array<string>, 
map_column map<string, int>
'''

df = spark.createDataFrame(sample_Data, schema)
df.display()

# テーブルを作成
spark.sql(f'DROP TABLE IF EXISTS {table_full_name}')
df.write.format('delta').saveAsTable(table_full_name)

string_column,byte_column,integer_column,bigint_column,float_column,double_column,numeric_column,boolean_column,date_column,timestamp_column,binary_column,struct_column,array_column,map_column
AAA,1,1,1,12.3,12.3,12,True,2020-01-01,2021-01-01T00:00:00.000+0000,QQ==,"List(AAA, 1)","List(AAA, BBB, CCC)",Map(AAA -> 1)


In [0]:
table_name_02      = '_metadata_test_table_02'
table_full_name_02 = f'{db_name}.{table_name_02}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_02}
'''
spark.sql(sql)

sql = f'''
CREATE TABLE {table_full_name_02}
(
    id BIGINT GENERATED ALWAYS AS IDENTITY
    ,string_column string
    ,integer_column integer
    ,date_column date
    ,generated_col INT GENERATED ALWAYS AS (integer_column)
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
PARTITIONED BY (
    string_column
)
COMMENT "table_comment"
'''
spark.sql(sql)


In [0]:
table_name_03      = '_metadata_test_table_03'
table_full_name_03 = f'{db_name}.{table_name_03}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_03}
'''
spark.sql(sql)

sql = f'''
CREATE OR REPLACE TABLE {table_full_name_03}
(
    col_001 string
    ,col_002 string
    ,col_003 string
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
'''
spark.sql(sql)


In [0]:
view_name      = '_v_metadata_test_table'
view_full_name = f'{db_name}.{view_name}'

sql = f'''
CREATE OR REPLACE VIEW {view_full_name}
AS
SELECT
  *
  FROM
    {table_full_name}
'''

spark.sql(sql)

## Delta Lake ディレクトリにおけるメタデータ

Delta Lake ディレクトリでは、直下に作成される`_delta_log`に情報が格納される。保持する情報については、Delta Lake の Gtihub レポジトリにて詳細が記載されている。

- [Understanding the Delta Lake Transaction Log - Databricks Blog](https://www.databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html)
- [delta/PROTOCOL.md at master · delta-io/delta · GitHub](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

この章では、`_delta_log`にどのような情報が格納されているかをコード実行を通して説明する。

In [0]:
# DESCRIBE の実行結果は、メタストアから取得
sql = f'DESCRIBE {table_full_name_02}'

spark.sql(sql).display()

col_name,data_type,comment
id,bigint,
string_column,string,
integer_column,int,
date_column,date,
generated_col,int,
,,
# Partitioning,,
Part 0,string_column,


In [0]:
# DESCRIBE HISTORY の実行結果は、Delta lake テーブルのメタデータの一部を確認可能
sql = f'DESCRIBE HISTORY {table_full_name_02}'

# `userName` カラムにメールアドレスが含まれる
# spark.sql(sql).display()
spark.sql(sql).drop('userName').display()

version,timestamp,userId,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2022-10-05T11:47:44.000+0000,538697268355820,CREATE TABLE,"Map(isManaged -> true, description -> table_comment, partitionBy -> [""string_column""], properties -> {""delta.autoOptimize.autoCompact"":""true"",""delta.dataSkippingNumIndexedCols"":""1"",""delta.autoOptimize.optimizeWrite"":""true""})",,List(1640450761247157),0401-001605-ka2e9r5g,,WriteSerializable,True,Map(),,Databricks-Runtime/10.4.x-scala2.12


In [0]:
file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]

In [0]:
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/,_delta_log/,0,1664970465000


path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2435,1664970465000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.json,00000000000000000000.json,1652,1664970464000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/__tmp_path_dir/,__tmp_path_dir/,0,1664970465000


In [0]:
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))

In [0]:
# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json'))

In [0]:
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json')
file_contents = file_contents.split('\n')

import json
print('--protocol--')
print(json.loads(file_contents[0]))
print('--metaData--')
print(json.loads(file_contents[1]))

# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# print('--Operation--')
# print(json.loads(file_contents[2]))

In [0]:
sql = f'''
CREATE BLOOMFILTER INDEX
ON {table_full_name_02}
FOR COLUMNS(
  integer_column
)
'''
spark.sql(sql)

In [0]:
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/,_delta_log/,0,1664970471000


path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2435,1664970465000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.json,00000000000000000000.json,1652,1664970464000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000001.crc,00000000000000000001.crc,2579,1664970471000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000001.json,00000000000000000001.json,1775,1664970471000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/__tmp_path_dir/,__tmp_path_dir/,0,1664970471000


In [0]:
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))

In [0]:
# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json'))

In [0]:
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json')
file_contents = file_contents.split('\n')

import json
print('--metaData--')
print(json.loads(file_contents[0]))

# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# print('--Operation--')
# print(json.loads(file_contents[1]))

In [0]:
sql = f'''
INSERT INTO {table_full_name_02}
(
    string_column
    ,integer_column
    ,date_column
)
VALUES
('a', 1, '2020-01-01'),
('b', 2, '2020-02-01'),
('c', 3, '2020-03-01')

'''

spark.sql(sql)

In [0]:
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/,_delta_log/,0,1664970474000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/string_column=a/,string_column=a/,0,1664970473000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/string_column=b/,string_column=b/,0,1664970473000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/string_column=c/,string_column=c/,0,1664970473000


path,name,size,modificationTime
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.crc,00000000000000000000.crc,2435,1664970465000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000000.json,00000000000000000000.json,1652,1664970464000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000001.crc,00000000000000000001.crc,2579,1664970471000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000001.json,00000000000000000001.json,1775,1664970471000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000002.crc,00000000000000000002.crc,2620,1664970474000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/00000000000000000002.json,00000000000000000002.json,2857,1664970474000
dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table_02/_delta_log/__tmp_path_dir/,__tmp_path_dir/,0,1664970474000


In [0]:
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.crc'))

In [0]:
# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json'))

In [0]:
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json')
file_contents = file_contents.split('\n')

import json
print('--Operation--')
print(json.loads(file_contents[0]))
print('--Operation--')
print(json.loads(file_contents[1]))
print('--Operation--')
print(json.loads(file_contents[2]))

# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
print('--commitInfo--')
print(json.loads(file_contents[3]))

## メタデータのデプロイ

### メタデータの命令型デプロイ

In [0]:
spark.table(table_full_name_03).printSchema()

In [0]:
sql = f'''
ALTER TABLE {table_full_name_03}
  ADD COLUMN 
  (
    col_004 string
  )
'''

spark.sql(sql)

In [0]:
spark.table(table_full_name_03).printSchema()

### メタデータの宣言型デプロイ

In [0]:
# 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加するメソッドを追加
import inspect

def add_cols_to_spark_tbl(
    cols_conf,
    tbl_name,
):
    """Spark テーブルに対してカラムを追加
    """
    # 期待値のカラムリストを取得
    expected_cols_list = [d["name"] for d in cols_conf]

    # 実際のカラムリストを取得
    actual_cols_list = spark.table(tbl_name).columns

    # 設定値のカラムと実際のカラムを比較して、追加対象のカラム一覧を取得
    cols_to_be_add = list(set(sorted(expected_cols_list)) - set(sorted(actual_cols_list)))

    # 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加
    for col_to_be_add in cols_to_be_add:
        for col_info_to_be_executed in [col_info for col_info in cols_conf if col_info['name'] == col_to_be_add]:
            col_name = col_info_to_be_executed['name']
            data_type = col_info_to_be_executed['type']
            add_col_ddl = f'''
            ALTER TABLE {tbl_name}
              ADD COLUMNS {col_name} {data_type}'''
            add_col_ddl = inspect.cleandoc(add_col_ddl)
            spark.sql(add_col_ddl)

            # 実行した DDL を表示
            print(add_col_ddl)

In [0]:
# 現在のテーブルと同義の設定値を定義
tbl_conf = {
    'table_name': table_full_name_03,
    'schema': [
        {'name': 'col_001', 'type': 'STRING'},
        {'name': 'col_002', 'type': 'STRING'},
        {'name': 'col_003', 'type': 'string'},
        {'name': 'col_004', 'type': 'STRING'},
    ],
}

In [0]:
# カラム追加の処理が実行されない
add_cols_to_spark_tbl(
    cols_conf = tbl_conf['schema'],
    tbl_name = tbl_conf['table_name'],
)

In [0]:
# `col_005`を追加した設定値を定義
tbl_conf = {
    'table_name': table_full_name_03,
    'schema': [
        {'name': 'col_001', 'type': 'STRING'},
        {'name': 'col_002', 'type': 'STRING'},
        {'name': 'col_003', 'type': 'string'},
        {'name': 'col_004', 'type': 'STRING'},
        {'name': 'col_005', 'type': 'STRING'},
    ],
}

In [0]:
# カラム追加の処理が実行される想定
add_cols_to_spark_tbl(
    cols_conf = tbl_conf['schema'],
    tbl_name = tbl_conf['table_name'],
)

In [0]:
# `col_005`が追加されたことを確認
spark.table(table_full_name_03).printSchema()

## Spark のデータオブジェクト のメタデータ取得方法

### Spark Schema (Database) のメタデータ取得方法

#### DESCRIBE DATABASE

In [0]:
sql = f"DESCRIBE DATABASE {db_name}"

spark.sql(sql).display()

database_description_item,database_description_value
Namespace Name,_metadata_test_database_sCcaX
Comment,
Location,dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db
Owner,root


#### SHOW SCHEMAS

In [0]:
sql = f"SHOW SCHEMAS LIKE '{db_name}'"

spark.sql(sql).display()

databaseName
_metadata_test_database_sccax


#### SHOW TABLES

In [0]:
sql = f"SHOW TABLES FROM {db_name}"

spark.sql(sql).display()

database,tableName,isTemporary
_metadata_test_database_sccax,_metadata_test_table,False
_metadata_test_database_sccax,_metadata_test_table_02,False
_metadata_test_database_sccax,_metadata_test_table_03,False
_metadata_test_database_sccax,_v_metadata_test_table,False


#### SHOW VIEWS

In [0]:
sql = f"SHOW VIEWS FROM {db_name}"

spark.sql(sql).display()

namespace,viewName,isTemporary
_metadata_test_database_sccax,_v_metadata_test_table,False


### Spark Table のメタデータ

#### SHOW CREATE TABLE

In [0]:
sql = f'SHOW CREATE TABLE {table_full_name}'

spark.sql(sql).display()
print(spark.sql(sql).collect()[0][0]) 

createtab_stmt
"CREATE TABLE spark_catalog._metadata_test_database_sccax._metadata_test_table (  string_column STRING,  byte_column TINYINT,  integer_column INT,  bigint_column BIGINT,  float_column FLOAT,  double_column DOUBLE,  numeric_column DECIMAL(10,0),  boolean_column BOOLEAN,  date_column DATE,  timestamp_column TIMESTAMP,  binary_column BINARY,  struct_column STRUCT,  array_column ARRAY,  map_column MAP) USING delta TBLPROPERTIES (  'Type' = 'MANAGED',  'delta.minReaderVersion' = '1',  'delta.minWriterVersion' = '2')"


#### DESCRIBE

In [0]:
sql = f'DESCRIBE {table_full_name}'

spark.sql(sql).display()

col_name,data_type,comment
string_column,string,
byte_column,tinyint,
integer_column,int,
bigint_column,bigint,
float_column,float,
double_column,double,
numeric_column,"decimal(10,0)",
boolean_column,boolean,
date_column,date,
timestamp_column,timestamp,


In [0]:
sql = f'DESCRIBE {table_full_name} integer_column'

spark.sql(sql).display()

info_name,info_value
col_name,integer_column
data_type,int
comment,


#### DESCRIBE EXTENDED

In [0]:
sql = f'DESCRIBE EXTENDED {table_full_name}'

spark.sql(sql).display()

col_name,data_type,comment
string_column,string,
byte_column,tinyint,
integer_column,int,
bigint_column,bigint,
float_column,float,
double_column,double,
numeric_column,"decimal(10,0)",
boolean_column,boolean,
date_column,date,
timestamp_column,timestamp,


In [0]:
sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'

spark.sql(sql).display()

info_name,info_value
col_name,integer_column
data_type,int
comment,
min,
max,
num_nulls,
distinct_count,
avg_col_len,
max_col_len,
histogram,


In [0]:
# 統計情報取得後に、カラムの統計情報を取得
sql = f'ANALYZE TABLE {table_full_name} COMPUTE STATISTICS FOR COLUMNS integer_column'
spark.sql(sql).display()
sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'
spark.sql(sql).display()

info_name,info_value
col_name,integer_column
data_type,int
comment,
min,1
max,1
num_nulls,0
distinct_count,1
avg_col_len,4
max_col_len,4
histogram,


#### DESCRIBE QUERY

In [0]:
sql = f'''
DESCRIBE QUERY
SELECT
  string_column
  ,integer_column
  ,COUNT(*) AS COUNT

  FROM 
    {table_full_name}
  GROUP BY
    string_column
    ,integer_column

'''

spark.sql(sql).display()

col_name,data_type,comment
string_column,string,
integer_column,int,
COUNT,bigint,


#### SHOW COLUMNS

In [0]:
sql = f'SHOW COLUMNS IN {table_full_name}'

spark.sql(sql).display()

col_name
string_column
byte_column
integer_column
bigint_column
float_column
double_column
numeric_column
boolean_column
date_column
timestamp_column


#### SHOW TBLPROPERTIES

In [0]:
sql = f'SHOW TBLPROPERTIES {table_full_name}'

spark.sql(sql).display()

key,value
Type,MANAGED
delta.minReaderVersion,1
delta.minWriterVersion,2


### Delta Lake テーブルのメタデータ取得

#### DESCRIBE DETAIL

In [0]:
sql = f'DESCRIBE DETAIL {table_full_name}'

spark.sql(sql).display()

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,80c0f691-1fd3-45d8-a87d-544b1f988d25,_metadata_test_database_sccax._metadata_test_table,,dbfs:/user/hive/warehouse/_metadata_test_database_sccax.db/_metadata_test_table,2022-10-05T11:47:41.245+0000,2022-10-05T11:47:42.000+0000,List(),1,4832,Map(),1,2


#### DESCRIBE HISTORY

In [0]:
sql = f'DESCRIBE HISTORY {table_full_name}'

# `userName` カラムにメールアドレスが含まれる
# spark.sql(sql).display()
spark.sql(sql).drop('userName').display()

version,timestamp,userId,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2022-10-05T11:47:42.000+0000,538697268355820,CREATE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(1640450761247157),0401-001605-ka2e9r5g,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 4832)",,Databricks-Runtime/10.4.x-scala2.12


#### `Delta Transaction Log`上のメタデータ

In [0]:
import json

# 最新の Delta Transaction Log の version を取得
max_version = (
    spark
    .sql(f'DESC HISTORY {table_full_name_02}')
    .sort('version', ascending=False)
    .select('version')
    .limit(1)
    .collect()[0][0]
)

# Delta Lake テーブルのファイルパスを取得
file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]

file_content = dbutils.fs.head(f'{file_path}/_delta_log/{max_version:020}.json')

# １つファイルに内に格納される json 文字列群から、`metaData`の文字列を取得
delta_log_metadata = json.loads(
    [l for l in file_content.split('\n') if l != '' and json.loads(l).get('metaData') != None][0]
)

pprint.pprint(delta_log_metadata)

### Spark Dataframe のメタデータ

#### DataFrame.printSchema

In [0]:
df.printSchema()

#### DataFrame.schema

In [0]:
df.schema

#### toDDL(DDL 文字列)

In [0]:
df_schema = df.schema
pprint.pprint(spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(df_schema.json()).toDDL())

#### DataFrame.columns

In [0]:
df.columns

#### DataFrame.dtypes

In [0]:
df.dtypes

#### DataFrame.storageLevel

In [0]:
df.storageLevel

#### DataFrame.inputFiles

In [0]:
filepath = "dbfs:/databricks-datasets/tpch/data-001/region/"

schema = """
    r_regionkey date,
    r_name string,
    r_comment string
"""
   
df_2 = (
    spark
    .read
    .format("csv")
    .schema(schema)
    .option("sep", "|")
    .load(filepath)
)

df_2.inputFiles()

#### Spark Config の設定値

In [0]:
spark_confs = spark.sparkContext.getConf().getAll()

spark.createDataFrame(spark_confs).limit(5).display()

_1,_2
spark.databricks.preemption.enabled,true
spark.databricks.clusterUsageTags.clusterFirstOnDemand,1
spark.sql.hive.metastore.jars,/databricks/databricks-hive/*
spark.driver.tempDirectory,/local_disk0/tmp
spark.sql.warehouse.dir,dbfs:/user/hive/warehouse


In [0]:
sql = 'set -v'

spark.sql(sql).limit(5).display()

key,value,meaning,Since version
com.databricks.sql.io.caching.consistentFileSplitting,true,"When true, the proration will still be enabled, but affect only the partition size. The split size will be pinned to a fixed value given by spark.sql.files.maxPartitionBytes. [FilePartition]s will be created prioritizing the locality of the split files.",2.0.0
spark.databricks.adaptive.autoBroadcastJoinThreshold,31457280b,Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join when adaptive execution is enabled. By setting this value to -1 broadcasting can be disabled.,
spark.databricks.adaptive.autoOptimizeShuffle.aggregateRatio,0.1,The aggregate output/input row ratio employed to estimate aggregate output size in the calculation of the initial shuffle partition number when 'spark.sql.shuffle.partitions' is set to 'auto'.,
spark.databricks.adaptive.autoOptimizeShuffle.aggregateSpillFactor,0.1,The ratio of spilling by aggregate compared to sort of the same input size used in the calculation of the initial shuffle partition number when 'spark.sql.shuffle.partitions' is set to 'auto'.,
spark.databricks.adaptive.autoOptimizeShuffle.equalityFilterSelectivity,0.1,The filter selectivity of a single equality predicate employed to estimate filter output size in the calculation of the initial shuffle partition number when 'spark.sql.shuffle.partitions' is set to 'auto'.,


### Databricks 固有機能

#### SHOW GRANTS ON

In [0]:
# 下記については、ユーザー名（メールアドレス）が表示されるため実行結果共有時に注意が必要
# try:
#     sql = f'SHOW GRANTS ON {table_full_name}'
#     spark.sql(sql).display()
# except:
#     print('Databricks の テーブルアクセスコントロールができるクラスターで実行してください。')

## リソースのクリーンアップ

In [0]:
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)