-sandbox
<h1>Delta Lake Feature Primer</h1>
<table>
  <tr><th>Author (Mod)</th><th>Masahiko Kitamura</th></tr>
  <tr><th>Author (Original)</th><th>Jixin Jia (Gin)</th></tr>
  <tr><td>Date</td><td>2021/10/15</td></tr>
  <tr><td>Version</td><td>2.0</td></tr>
</table>
<img style="margin-top:25px;" src="https://jixjiadatabricks.blob.core.windows.net/images/databricks-logo-small-new.png" width="140">
<hr>
<h3>Bringing <span style="color:#38a">reliability</span> and <span style="color:#38a">performance</span> to the data lake</h3>
<p>This notebook runs ETL on loan application data with Delta Lake and explains its main features along the way.</p>
<div style="float:left; padding-right:60px; margin-top:20px; margin-bottom:200px;">
  <img src="https://jixjiadatabricks.blob.core.windows.net/images/delta-lake-square-black.jpg" width="220">
</div>

<div style="float:left; margin-top:0px; padding:0;">
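  <h3>Reliability</h3>
  <br>
  <ul style="padding-left: 30px;">
    <li>Next-generation data format technology</li>
    <li>ACID compliance through the transaction log</li>
    <li>DML support (update, delete, merge)</li>
    <li>Data quality management (schema enforcement and schema merging)</li>
    <li>Unified batch and stream processing</li>
    <li>Time travel (data versioning)</li>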
   </ul>

  <h3>Performance</h3>
  <br>
  <ul style="padding-left: 30px;">
    <li>Scalable metadata</li>
    <li>Compaction (bin-packing)</li>
    <li>Data indexing</li>
    <li>Data skipping</li>
    <li>ZOrder clustering</li>
    <li>Low latency through stream processing</li>
  </ul>
</div>

<div style="display:block; clear:both; padding-top:20px;">
  <div style="background: #ff9; margin-top:10px;">
  <img src="https://jixjiadatabricks.blob.core.windows.net/images/exclamation-yellow.png" width="25px"><span style="padding-left:10px; padding-right:15px;">Note: cells 12, 14, and 42 are intentionally written to raise errors.</span>
</div>
</div>

In [0]:
import re

# Get the username.
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Replace each run of non-alphanumeric characters with '_' and lowercase the result, so the username can be used in file paths and database names.
username = re.sub('[^A-Za-z0-9]+', '_', username_raw).lower()
dbname = f'delta_db_{username}'

print(f'>>> username => {username}')
print(f'>>> dbname => {dbname}')
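
As a small illustration of the sanitization step above, the following sketch wraps the same regular expression in a function. The function name `sanitize_username` and the sample address are assumptions made for illustration; in the notebook the raw value comes from the notebook context.

```python
import re


def sanitize_username(raw: str) -> str:
    """Collapse each run of non-alphanumeric characters into '_' and lowercase."""
    return re.sub(r'[^A-Za-z0-9]+', '_', raw).lower()


# Hypothetical input used only for this example.
example = sanitize_username('Masahiko.Kitamura@databricks.com')
print(example)                          # masahiko_kitamura_databricks_com
print(f'delta_db_{example}')            # the database name built from it
```

Because runs are collapsed, an input such as `a--b` becomes `a_b` rather than `a__b`.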

## 0. (Reference) The traditional data lake

In [0]:
%fs head /databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv

In [0]:
df = (
  spark.read.format('csv')
  .option('Header', True)
  .option('inferSchema', True)
  .load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv') 
)

display(df)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


In [0]:
df_cleaned = (
  df
  .where('price between 2000 and 2500') # filter rows on a column condition
  .withColumn('magic_number', df.x * df.y + df.z ) # add a column
  .select('carat', 'cut', 'color', 'clarity', 'price', 'magic_number') # select columns
)

display( df_cleaned )

carat,cut,color,clarity,price,magic_number
0.59,Very Good,F,SI1,2000,32.3
0.7,Ideal,J,VS1,2000,35.4724
0.7,Good,E,SI1,2000,33.7896
0.7,Fair,E,SI1,2000,36.3524
0.58,Very Good,F,VS1,2001,31.9116
0.77,Very Good,J,SI2,2001,38.8536
0.77,Very Good,J,SI2,2001,38.1144
0.72,Very Good,G,SI2,2001,35.9857
0.73,Very Good,H,SI1,2001,37.066
0.55,Ideal,E,VS1,2001,31.224000000000004


In [0]:
( 
  df_cleaned.write
  .format('json')
  .mode('overwrite')
  .save(f'/tmp/{username}/diamonds_json')
)

In [0]:
display( dbutils.fs.ls(f'/tmp/{username}/diamonds_json') )

path,name,size
dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/_committed_7033217148156085684,_committed_7033217148156085684,202
dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/_committed_8257997043505265100,_committed_8257997043505265100,202
dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/_committed_vacuum4291654736168110024,_committed_vacuum4291654736168110024,96
dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/_started_8257997043505265100,_started_8257997043505265100,0
dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/part-00000-tid-8257997043505265100-c0ac53bf-7584-42ad-8753-836c9374b41f-2851-1-c000.json,part-00000-tid-8257997043505265100-c0ac53bf-7584-42ad-8753-836c9374b41f-2851-1-c000.json,326089


In [0]:
%fs head dbfs:/tmp/masahiko_kitamura_databricks_com/diamonds_json/part-00000-tid-8257997043505265100-c0ac53bf-7584-42ad-8753-836c9374b41f-2851-1-c000.json

In [0]:
df.createOrReplaceTempView('diamonds')

df_sql = sql('''
  SELECT 
    carat, cut, color, clarity, price, x * y + z as magic_number
  FROM diamonds
''')

display( df_sql )

carat,cut,color,clarity,price,magic_number
0.23,Ideal,E,SI2,326,18.151
0.21,Premium,E,SI1,326,17.2476
0.23,Good,E,VS1,327,18.7935
0.29,Premium,I,VS2,334,20.396
0.31,Good,J,SI2,335,21.629
0.24,Very Good,J,VVS2,336,18.0824
0.24,Very Good,I,VVS1,336,18.191
0.26,Very Good,H,SI1,337,19.257700000000003
0.22,Fair,E,VS2,337,17.1186
0.23,Very Good,H,VS1,338,18.59


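### Limitations of the traditional data lake

-----
* Users have to manage file partitioning themselves
* Small files keep accumulating
* Reads slow down as the number of files grows
* Records are append-only (no UPDATE, DELETE, or MERGE)
* Schema consistency has to be checked on the user side
* When a filter is not on the partition key, every file has to be opened
* There are no optimization features such as indexing


And so on.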

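# 1. The world of Delta Lake

### Sample data

The data used here is loan application data published by Lending Club, covering 2012 through 2017.
It includes (anonymized) applicant information and the current loan status (late, fully paid, and so on).

**Data dictionary** https://www.kaggle.com/wendykan/lending-club-loan-data <br>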

<!--<img src="https://jixjiastorage.blob.core.windows.net/public/datasets/lending-club-loan/data_sample.png" width="95%">-->

In [0]:
# Paths
source_path = 'dbfs:/databricks-datasets/samples/lending_club/parquet/'
delta_path = f'dbfs:/home/{username}/delta/lending-club-loan/'

# Remove any existing data
dbutils.fs.rm(delta_path, recurse=True)

# Load the data
df = spark.read.parquet(source_path)

# Row count
print(df.count())

# Use randomSplit() to take a sample of roughly 5%
(data, data_rest) = df.randomSplit([0.05, 0.95], seed=123)

# Inspect the loaded data
display( data )

id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,home_ownership,annual_inc,verification_status,loan_status,pymnt_plan,url,desc,purpose,title,zip_code,addr_state,dti,delinq_2yrs,earliest_cr_line,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_il_6m,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m,acc_open_past_24mths,avg_cur_bal,bc_open_to_buy,bc_util,chargeoff_within_12_mths,delinq_amnt,mo_sin_old_il_acct,mo_sin_old_rev_tl_op,mo_sin_rcnt_rev_tl_op,mo_sin_rcnt_tl,mort_acc,mths_since_recent_bc,mths_since_recent_bc_dlq,mths_since_recent_inq,mths_since_recent_revol_delinq,num_accts_ever_120_pd,num_actv_bc_tl,num_actv_rev_tl,num_bc_sats,num_bc_tl,num_il_tl,num_op_rev_tl,num_rev_accts,num_rev_tl_bal_gt_0,num_sats,num_tl_120dpd_2m,num_tl_30dpd,num_tl_90g_dpd_24m,num_tl_op_past_12m,pct_tl_nvr_dlq,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,revol_bal_joint,sec_app_earliest_cr_line,sec_app_inq_last_6mths,sec_app_mort_acc,sec_app_open_acc,sec_app_revol_util,sec_app_open_il_6m,sec_app_num_rev_accts,sec_app_chargeoff_within_12_mths,sec_app_collections_12_mths_ex_med,sec_app_mths_since_last_major_derog,hardship_flag,hardship_type,hardship_reason,hardship_status,deferral_term,hardship_amount,hardship_start_date,hardship_end_date,payment_plan_start_date,hardship_length,hardship_dpd,hardship_loan_status,orig_projected_additional_accrued_interest,hardship_payoff_balance_amount,hardship_last_payment_amount,issue_d
,,1000.0,1000,1000.0,36 months,5.32%,30.12,A,A1,Transit Property Protection Supervisor,6 years,RENT,62500.0,Source Verified,Current,n,,,moving,Moving and relocation,114xx,NY,25.13,0.0,Apr-2001,0,,,14,0,6594,22.6%,19.0,w,576.27,576.27,455.49,455.49,423.73,31.76,0.0,0.0,0.0,Aug-2017,30.12,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,38765,0.0,2.0,0.0,1.0,18.0,32171.0,63.0,0.0,0.0,3866.0,48.0,29200,2.0,0.0,0.0,1,2982,14134.0,24.8,0.0,0,84.0,187,35,18,0,35.0,,18.0,,0,2,5,6,6,4,12,15,5,14,0.0,0,0,0,100.0,0.0,0,0,80103,38765,18800,50903,,,,,,,,,,,,N,,,,,,,,,,,,,,,Nov-2016
,,1000.0,1000,1000.0,36 months,6.49%,30.65,A,A2,Customers Service Consultant,< 1 year,RENT,40000.0,Source Verified,Current,n,,,debt_consolidation,Debt consolidation,441xx,OH,24.09,0.0,Oct-2003,0,29.0,,15,0,2184,19.5%,24.0,w,551.81,551.81,521.19,521.19,448.19,73.0,0.0,0.0,0.0,Aug-2017,30.65,Sep-2017,Aug-2017,0,57.0,1,INDIVIDUAL,,,,0,0,37821,0.0,11.0,0.0,1.0,14.0,35637.0,76.0,0.0,1.0,761.0,65.0,11200,0.0,0.0,0.0,2,2521,39.0,95.1,0.0,0,149.0,105,16,14,0,54.0,76.0,,29.0,2,1,4,1,2,19,4,5,4,15,0.0,0,0,0,87.5,100.0,0,0,57980,37821,800,46780,,,,,,,,,,,,N,,,,,,,,,,,,,,,Mar-2016
,,1000.0,1000,1000.0,36 months,6.49%,30.65,A,A2,dental assistant,8 years,RENT,28000.0,Not Verified,Current,n,,,credit_card,Credit card refinancing,142xx,NY,26.49,0.0,Sep-2004,0,,,6,0,3651,40.1%,19.0,w,579.33,579.33,490.04,490.04,420.67,69.37,0.0,0.0,0.0,Aug-2017,30.65,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,11743,0.0,3.0,0.0,1.0,13.0,8092.0,46.0,1.0,1.0,2697.0,44.0,9100,0.0,0.0,0.0,2,1957,4825.0,40.4,0.0,0,124.0,139,9,9,0,25.0,,,,0,2,3,2,12,4,3,15,3,6,0.0,0,0,1,100.0,0.0,0,0,26793,11743,8100,17693,,,,,,,,,,,,N,,,,,,,,,,,,,,,Apr-2016
,,1000.0,1000,1000.0,36 months,6.99%,30.88,A,A2,Manager,9 years,OWN,100000.0,Not Verified,Fully Paid,n,,,vacation,Vacation,376xx,TN,14.69,0.0,Dec-1997,1,,,20,0,10215,8%,48.0,w,0.0,0.0,1000.0,1000.0,1000.0,0.0,0.0,0.0,0.0,May-2017,1000.39,,Jun-2017,0,,1,INDIVIDUAL,,,,0,0,572605,0.0,3.0,1.0,2.0,11.0,49528.0,69.0,3.0,6.0,9750.0,30.0,128300,0.0,19.0,1.0,9,28630,75750.0,11.4,0.0,0,170.0,233,10,10,6,11.0,,1.0,,0,1,3,8,10,19,16,23,3,20,0.0,0,0,5,100.0,0.0,0,0,720292,59743,85500,71992,,,,,,,,,,,,N,,,,,,,,,,,,,,,May-2017
,,1000.0,1000,1000.0,36 months,7.24%,30.99,A,A3,Administrative assistant,8 years,MORTGAGE,48000.0,Source Verified,Current,n,,,other,Other,923xx,CA,29.93,0.0,Apr-2005,0,57.0,,10,0,3240,26.3%,19.0,w,769.89,769.89,278.11,278.11,230.11,48.0,0.0,0.0,0.0,Aug-2017,30.99,Sep-2017,Aug-2017,0,80.0,1,INDIVIDUAL,,,,0,0,144209,0.0,4.0,0.0,3.0,15.0,31882.0,66.0,0.0,1.0,2317.0,58.0,12300,0.0,1.0,0.0,4,16023,3311.0,49.1,0.0,0,139.0,119,24,15,1,44.0,80.0,16.0,57.0,1,2,3,2,4,6,5,12,3,10,0.0,0,0,0,84.2,0.0,0,0,178415,35122,6500,48289,,,,,,,,,,,,N,,,,,,,,,,,,,,,Nov-2016
,,1000.0,1000,1000.0,36 months,7.24%,30.99,A,A3,Busser/Expo,5 years,OWN,12000.0,Source Verified,Current,n,,,credit_card,Credit card refinancing,613xx,IL,28.62,0.0,Jul-2012,0,,,8,0,1132,8.3%,9.0,f,796.08,796.08,247.52,247.52,203.92,43.6,0.0,0.0,0.0,Aug-2017,30.99,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,7093,0.0,1.0,0.0,0.0,26.0,5961.0,61.0,1.0,3.0,735.0,30.0,13600,0.0,2.0,2.0,3,1013,9919.0,7.3,0.0,0,26.0,52,7,7,0,7.0,,7.0,,0,4,5,4,4,1,7,8,5,8,0.0,0,0,1,100.0,0.0,0,0,23336,7093,10700,9736,,,,,,,,,,,,N,,,,,,,,,,,,,,,Dec-2016
,,1000.0,1000,1000.0,36 months,7.49%,31.11,A,A4,Teacher,8 years,MORTGAGE,90000.0,Not Verified,Current,n,,,medical,Medical expenses,802xx,CO,22.76,0.0,Aug-2000,0,,,9,0,20908,78.9%,13.0,f,509.79,509.79,517.35,517.35,490.21,27.14,0.0,0.0,0.0,Aug-2017,31.11,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,214959,0.0,4.0,0.0,1.0,16.0,39146.0,49.0,1.0,1.0,12093.0,57.0,26500,0.0,2.0,2.0,2,23884,3196.0,83.4,0.0,0,138.0,197,8,8,1,42.0,,8.0,,0,3,4,3,4,6,4,6,4,9,0.0,0,0,1,100.0,66.7,0,0,275911,60054,19300,79545,,,,,,,,,,,,N,,,,,,,,,,,,,,,Jan-2017
,,1000.0,1000,1000.0,36 months,7.97%,31.33,A,A5,Medical billet,7 years,RENT,48000.0,Not Verified,Current,n,,,debt_consolidation,Debt consolidation,990xx,WA,28.83,0.0,Jul-2001,3,64.0,45.0,25,1,14969,47.2%,64.0,w,925.44,925.44,93.55,93.55,74.56,18.99,0.0,0.0,0.0,Aug-2017,31.33,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,37761,2.0,9.0,0.0,1.0,19.0,22792.0,60.0,7.0,12.0,3129.0,54.0,31700,1.0,2.0,4.0,13,1510,5268.0,63.9,0.0,0,146.0,190,1,1,0,1.0,,1.0,64.0,0,4,13,5,18,20,16,44,13,25,0.0,0,0,7,97.9,40.0,1,0,69648,37761,14600,37948,,,,,,,,,,,,N,,,,,,,,,,,,,,,May-2017
,,1000.0,1000,1000.0,36 months,8.18%,31.42,B,B1,Admin Officer,10+ years,MORTGAGE,87000.0,Source Verified,Current,n,,,major_purchase,Major purchase,800xx,CO,17.27,0.0,Sep-1998,0,,,13,0,22898,76.8%,30.0,w,446.54,446.54,659.14,659.14,553.46,105.68,0.0,0.0,0.0,Aug-2017,31.42,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,157848,,,,,,,,,,,,29800,,,,3,15785,419.0,98.1,0.0,0,169.0,206,38,10,4,92.0,,10.0,,0,4,6,4,7,13,9,13,6,13,0.0,0,0,1,100.0,100.0,0,0,180012,47724,22200,36584,,,,,,,,,,,,N,,,,,,,,,,,,,,,Nov-2015
,,1000.0,1000,1000.0,36 months,8.24%,31.45,B,B1,Regional Account Manager,10+ years,OWN,72000.0,Source Verified,Current,n,,,other,Other,129xx,NY,27.68,0.0,Mar-1993,1,42.0,,10,0,8389,74.2%,28.0,w,772.57,772.57,282.13,282.13,227.43,54.7,0.0,0.0,0.0,Aug-2017,31.45,Sep-2017,Aug-2017,0,,1,INDIVIDUAL,,,,0,0,76282,1.0,5.0,2.0,4.0,2.0,67893.0,74.0,0.0,1.0,4879.0,74.0,11300,3.0,1.0,5.0,5,8476,0.0,101.6,0.0,0,130.0,284,16,2,0,170.0,,2.0,42.0,0,2,4,2,6,14,5,14,4,10,0.0,0,0,2,85.7,100.0,0,0,103312,76282,7000,92012,,,,,,,,,,,,N,,,,,,,,,,,,,,,Nov-2016


In [0]:
from pandas_profiling import ProfileReport
df_profile = ProfileReport(df.toPandas(), minimal=True, title="Profiling Report", progress_bar=False, infer_dtypes=False)
profile_html = df_profile.to_html()

displayHTML(profile_html)

In [0]:
# Apply a few simple transformations and save the result in Delta format
from pyspark.sql.functions import col, expr

data.select('loan_amnt', 
            'term',
            'int_rate',
            'grade',
            'addr_state',
            'emp_title',
            'home_ownership',
            'annual_inc',
            'loan_status')\
  .withColumn('int_rate', expr('cast(replace(int_rate,"%","") as float)'))\
  .withColumnRenamed('addr_state', 'state')\
  .write\
  .format('delta')\
  .mode('overwrite')\
  .save(delta_path)

In [0]:
# Create the database
sql(f'CREATE DATABASE IF NOT EXISTS {dbname}')
sql(f'USE {dbname}')

# Register the Delta directory as a table
sql(f'DROP TABLE IF EXISTS LBS')
sql(f'CREATE TABLE LBS USING delta LOCATION "{delta_path}"')

-sandbox
<div style="float:left; padding-right:10px; margin-top:10px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-traditional.png' width='70px'>
</div>
<div style="float:left;">
  <h3>Reading data (queries)</h3>
  Distributed Parquet files can be <span style="color:green"><strong>read efficiently</strong></span>.
</div>

In [0]:
%sql

-- Query the data with a SELECT statement
Select state, loan_status, count(*) as counts 
From LBS  
Group by state, loan_status
Order by counts desc

state,loan_status,counts
CA,Current,4996
CA,Fully Paid,4008
TX,Current,3274
NY,Current,3179
FL,Current,2805
TX,Fully Paid,2179
NY,Fully Paid,2006
FL,Fully Paid,1782
IL,Current,1666
NJ,Current,1520


-sandbox
<div style="float:left; padding-right:10px; margin-top:10px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
  <h3>Physical layout of a Delta table</h3>
  Delta Lake is built on top of Parquet
</div>

In [0]:
# %fs ls /home/{username}/delta/lending-club-loan/

display( 
  dbutils.fs.ls(f'{delta_path}') # <= list the directory the Delta table was written to
)

path,name,size
dbfs:/home/masahiko_kitamura_databricks_com/delta/lending-club-loan/_delta_log/,_delta_log/,0
dbfs:/home/masahiko_kitamura_databricks_com/delta/lending-club-loan/part-00000-f949f076-1271-425e-ba22-70eaad7662cf-c000.snappy.parquet,part-00000-f949f076-1271-425e-ba22-70eaad7662cf-c000.snappy.parquet,339369
dbfs:/home/masahiko_kitamura_databricks_com/delta/lending-club-loan/part-00001-0b1ad511-7319-46b4-9be7-b124e8be813f-c000.snappy.parquet,part-00001-0b1ad511-7319-46b4-9be7-b124e8be813f-c000.snappy.parquet,285179
dbfs:/home/masahiko_kitamura_databricks_com/delta/lending-club-loan/part-00002-964c046b-2377-4fbd-9c0d-cec0c915eda7-c000.snappy.parquet,part-00002-964c046b-2377-4fbd-9c0d-cec0c915eda7-c000.snappy.parquet,247883
dbfs:/home/masahiko_kitamura_databricks_com/delta/lending-club-loan/part-00003-a03e2bbe-f9cf-48d2-831b-bbdb254f9276-c000.snappy.parquet,part-00003-a03e2bbe-f9cf-48d2-831b-bbdb254f9276-c000.snappy.parquet,135496


In [0]:
# %fs head /home/{username}/delta/lending-club-loan/_delta_log/00000000000000000000.json

# delta_path already ends with '/', so no extra separator is needed
dbutils.fs.head(f'{delta_path}_delta_log/00000000000000000000.json')
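
The commit file shown by the cell above holds one JSON action per line. As a rough sketch of how a reader could turn those commits into the current set of data files, the example below replays `add` and `remove` actions in version order. The file names and commit contents are made up for illustration, and the sketch ignores checkpoints, protocol, and metadata actions that a real reader also handles.

```python
import json


def live_files(commits):
    """Replay commits in version order and return the data files that are still live."""
    files = set()
    for commit in commits:                  # one commit corresponds to one NNN.json file
        for line in commit.splitlines():    # one JSON action per line
            if not line.strip():
                continue
            action = json.loads(line)
            if 'add' in action:
                files.add(action['add']['path'])
            elif 'remove' in action:
                files.discard(action['remove']['path'])
            # other actions (commitInfo, metaData, ...) do not change the file set
    return files


# Hypothetical commits: an initial write, then a delete that rewrites one file.
commit_0 = '\n'.join([
    json.dumps({'commitInfo': {'operation': 'WRITE'}}),
    json.dumps({'add': {'path': 'part-00000.parquet'}}),
    json.dumps({'add': {'path': 'part-00001.parquet'}}),
])
commit_1 = '\n'.join([
    json.dumps({'commitInfo': {'operation': 'DELETE'}}),
    json.dumps({'remove': {'path': 'part-00001.parquet'}}),
    json.dumps({'add': {'path': 'part-00002.parquet'}}),
])

print(sorted(live_files([commit_0, commit_1])))
```

Replaying only `commit_0` yields the file set as of version 0, which is the idea behind reading older snapshots of the table.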

In [0]:
%sql
-- Audit the table's change history with Describe History
Describe History LBS

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2021-12-08T13:04:31.000+0000,102428,masahiko.kitamura@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(10034168),0804-214524-boded127,,WriteSerializable,False,"Map(numFiles -> 4, numOutputBytes -> 1007927, numOutputRows -> 74268)",


-sandbox
<div style="float:left; padding-right:10px; margin-top:20px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
  <h3>DML support</h3>
  Here we try ETL operations that were not possible on a traditional data lake,<br>such as <span style="color:green"><strong>updating</strong></span>, <span style="color:green"><strong>merging</strong></span>, and <span style="color:green"><strong>deleting</strong></span> data
</div>

In [0]:
%sql
-- View the total loan amount per state on a map
Select *
From LBS

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current


In [0]:
%sql
-- Delete the rows for the state 'WA'
Delete From LBS Where State = 'WA';

-- Look at the data
Select * 
From LBS

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current


In [0]:
%sql
-- Multiply the loan amount for the state 'NY' by 20!
Update LBS Set loan_amnt = loan_amnt * 20 Where State = 'NY';

-- Look at the data
Select *
From LBS 

-sandbox 
<div style="float:left; padding-right:40px;">
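  <h4>Traditional data lake (7 steps)</h4>
  <ol style="padding-left: 20px;">
    <li>Get the data to apply (new data)</li>
    <li>Get the rows that will be updated (existing data)</li>
    <li>Get the rows that will not be updated (existing data)</li>
    <li>Prepare a temp table for each</li>
    <li>Delete all existing data (including the distributed physical files)</li>
    <li>Union everything into one table and rename it to the original table name</li>
    <li>Drop the temp tables</li>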
   </ol>
</div>
<div style="float:left;">
  <img src="https://pages.databricks.com/rs/094-YMS-629/images/merge-into-legacy.gif" width="800px">
</div>
<div style="clear:both; display:block;">
  <h4>Delta Lake (2 steps)</h4>
  <ol style="padding-left: 20px;">
    <li>Get the data to apply (new data)</li>
    <li>Run <code>MERGE INTO</code></li>
   </ol>
</div>
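
The update-or-insert behavior that `MERGE INTO` provides can be sketched in plain Python. This is only a toy model under stated assumptions: rows are dictionaries, `merge_into` is a hypothetical helper, and when several source rows share a key the last one wins, whereas a real merge treats multiple matching source rows as an error.

```python
def merge_into(target, source, key):
    """Upsert: replace target rows whose key matches a source row, insert the rest."""
    source_by_key = {row[key]: row for row in source}
    matched = set()
    result = []
    for row in target:
        if row[key] in source_by_key:       # WHEN MATCHED THEN UPDATE SET *
            result.append(dict(source_by_key[row[key]]))
            matched.add(row[key])
        else:                               # untouched existing row
            result.append(dict(row))
    for k, row in source_by_key.items():    # WHEN NOT MATCHED THEN INSERT *
        if k not in matched:
            result.append(dict(row))
    return result


# Hypothetical rows, loosely modeled on the loan table.
target = [{'state': 'NY', 'loan_amnt': 1000.0}, {'state': 'CA', 'loan_amnt': 2000.0}]
source = [{'state': 'IA', 'loan_amnt': 999999.0}]

for row in merge_into(target, source, 'state'):
    print(row)
```

With a source row for `IA` and no `IA` row in the target, the result is the two original rows plus one inserted row.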

In [0]:
# Build separate data to merge for the demo (in production this would be the newly arriving data)
columns = sql('Describe Table LBS').filter('data_type != ""').select('col_name').rdd.flatMap(lambda x: x).collect()

merge_df = sc.parallelize([
   [999999, '36 months', 5.00, 'A', 'IA', 'Demo Data', 'RENT', 1000000, 'Current']
]).toDF(columns)

merge_df.createOrReplaceTempView("merge_table")
display(merge_df)

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
999999,36 months,5.0,A,IA,Demo Data,RENT,1000000,Current


In [0]:
%sql
-- Run the merge operation
Merge Into LBS as target
Using merge_table as source
On target.State = source.State
When Matched Then Update Set *
When Not Matched Then Insert *
;

-- Look at the data
Select *
From LBS 

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current


-sandbox
<div style="float:left; padding-right:10px; margin-top:20px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
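  <h3>Data quality management made easy</h3>
  Schema enforcement <span style="color:green"><strong>rejects</strong></span> DML operations that do not match the existing schema,<br>while schema evolution can <span style="color:green"><strong>cleanly merge</strong></span> the schema differences into the table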
</div>

In [0]:
# Read 10 rows of the existing data and add a new column
new_df = spark.read\
              .format('delta')\
              .load(delta_path)\
              .limit(10)\
              .withColumn('interest_flag', expr('case when int_rate >=6 then "high" else "low" end'))

display(new_df)

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status,interest_flag
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current,low
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current,high
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current,high
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid,high
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current,high
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current,high
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current,high
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current,high
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current,high
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current,high


In [0]:
# Append to a table whose schema differs (schema enforcement is expected to reject this write)
new_df.write.format('delta').mode('append').save(delta_path)

Next, use the `mergeSchema` option to combine data whose schemas differ (schema evolution).

In [0]:
# Append to a table whose schema differs (with schema evolution)
new_df.write.format('delta').option('mergeSchema','true').mode('append').save(delta_path)
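
The contrast between the two append cells above can be modeled with a small schema check. This is a simplified sketch, not Delta Lake's actual rule set: schemas are plain dictionaries of column name to type name, `check_or_merge_schema` is a hypothetical helper, and the real rules (for example around type widening or nested fields) are richer.

```python
def check_or_merge_schema(table_schema, incoming_schema, merge_schema=False):
    """Return the table schema after an append, or raise if the write is rejected."""
    for name, dtype in incoming_schema.items():
        if name in table_schema and table_schema[name] != dtype:
            raise ValueError(f'type mismatch for column {name!r}')
    extra = [name for name in incoming_schema if name not in table_schema]
    if extra and not merge_schema:
        # schema enforcement: unknown columns are rejected
        raise ValueError(f'unexpected columns: {extra}')
    merged = dict(table_schema)
    for name in extra:
        # schema evolution: new columns are added at the end
        merged[name] = incoming_schema[name]
    return merged


table = {'loan_amnt': 'float', 'int_rate': 'float'}
incoming = {'loan_amnt': 'float', 'int_rate': 'float', 'interest_flag': 'string'}

try:
    check_or_merge_schema(table, incoming)
except ValueError as err:
    print('rejected:', err)

print(check_or_merge_schema(table, incoming, merge_schema=True))
```

Rows written before the new column existed simply have no value for it, which matches the empty `interest_flag` values in the query output further down.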

In [0]:
%sql
describe LBS

col_name,data_type,comment
loan_amnt,float,
term,string,
int_rate,float,
grade,string,
state,string,
emp_title,string,
home_ownership,string,
annual_inc,float,
loan_status,string,
interest_flag,string,


In [0]:
%sql
-- Look at the data
Select *
From LBS 

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status,interest_flag
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current,
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current,
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current,
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid,
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current,
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current,
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current,
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current,
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current,
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current,


-sandbox
<div style="float:left; padding-right:10px; margin-top:20px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
  <h3>Go back in time with time travel</h3>
  Using the Delta Lake log, it is easy to retrieve <span style="color:green"><strong>earlier data snapshots</strong></span>,<br><span style="color:green"><strong>audit the change history</strong></span> of the data, and <span style="color:green"><strong>roll back</strong></span>
</div>

In [0]:
%sql
-- Audit the table's change history with Describe History
Describe History LBS

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
3,2021-12-08T13:05:16.000+0000,102428,masahiko.kitamura@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(10034168),0804-214524-boded127,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputBytes -> 3146, numOutputRows -> 10)",
2,2021-12-08T13:05:03.000+0000,102428,masahiko.kitamura@databricks.com,MERGE,"Map(predicate -> (target.`State` = source.`State`), matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(10034168),0804-214524-boded127,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, executionTimeMs -> 1488, numTargetRowsInserted -> 1, scanTimeMs -> 914, numTargetRowsUpdated -> 0, numOutputRows -> 1, numTargetChangeFilesAdded -> 0, numSourceRows -> 1, numTargetFilesRemoved -> 0, rewriteTimeMs -> 514)",
1,2021-12-08T13:04:40.000+0000,102428,masahiko.kitamura@databricks.com,DELETE,"Map(predicate -> [""(spark_catalog.delta_db_masahiko_kitamura_databricks_com.LBS.`State` = 'WA')""])",,List(10034168),0804-214524-boded127,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 4, numCopiedRows -> 72652, numAddedChangeFiles -> 0, executionTimeMs -> 1111, numDeletedRows -> 1616, scanTimeMs -> 397, numAddedFiles -> 4, rewriteTimeMs -> 713)",
0,2021-12-08T13:04:31.000+0000,102428,masahiko.kitamura@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(10034168),0804-214524-boded127,,WriteSerializable,False,"Map(numFiles -> 4, numOutputBytes -> 1007927, numOutputRows -> 74268)",


In [0]:
%sql
-- Get a snapshot by specifying a version
Select * 
From LBS Version AS OF 2

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current
1000.0,36 months,8.24,B,VA,"Sales,delivery",RENT,44000.0,Current


In [0]:
# Get a snapshot by specifying a timestamp (here, the oldest of the 10 most recent commits)
desiredTimestamp = spark.sql("Select timestamp From (Describe History LBS) Order By timestamp Desc").take(10)[-1].timestamp

print(f'desiredTimestamp => {desiredTimestamp}')

display(
  sql(f"Select * From LBS TIMESTAMP AS OF '{desiredTimestamp}'")
)

loan_amnt,term,int_rate,grade,state,emp_title,home_ownership,annual_inc,loan_status
1000.0,36 months,5.32,A,NY,Transit Property Protection Supervisor,RENT,62500.0,Current
1000.0,36 months,6.49,A,OH,Customers Service Consultant,RENT,40000.0,Current
1000.0,36 months,6.49,A,NY,dental assistant,RENT,28000.0,Current
1000.0,36 months,6.99,A,TN,Manager,OWN,100000.0,Fully Paid
1000.0,36 months,7.24,A,CA,Administrative assistant,MORTGAGE,48000.0,Current
1000.0,36 months,7.24,A,IL,Busser/Expo,OWN,12000.0,Current
1000.0,36 months,7.49,A,CO,Teacher,MORTGAGE,90000.0,Current
1000.0,36 months,7.97,A,WA,Medical billet,RENT,48000.0,Current
1000.0,36 months,8.18,B,CO,Admin Officer,MORTGAGE,87000.0,Current
1000.0,36 months,8.24,B,NY,Regional Account Manager,OWN,72000.0,Current
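
One way to picture how a timestamp maps to a table version is a lookup of the latest commit at or before the requested time. The sketch below assumes that rule and uses the four commit timestamps from the history output above; `version_as_of` is a hypothetical helper, not a Delta Lake API.

```python
from bisect import bisect_right
from datetime import datetime


def version_as_of(commit_times, ts):
    """Return the latest version committed at or before ts.

    commit_times must be sorted ascending; the list index is the version number.
    """
    idx = bisect_right(commit_times, ts)
    if idx == 0:
        raise ValueError('timestamp is earlier than the first commit')
    return idx - 1


# Commit timestamps taken from the Describe History output above.
commit_times = [
    datetime(2021, 12, 8, 13, 4, 31),   # version 0: WRITE (overwrite)
    datetime(2021, 12, 8, 13, 4, 40),   # version 1: DELETE
    datetime(2021, 12, 8, 13, 5, 3),    # version 2: MERGE
    datetime(2021, 12, 8, 13, 5, 16),   # version 3: WRITE (append)
]

print(version_as_of(commit_times, datetime(2021, 12, 8, 13, 4, 31)))
print(version_as_of(commit_times, datetime(2021, 12, 8, 13, 5, 0)))
```

Under this rule the timestamp of the first commit resolves to version 0, which is why the result above still contains a `WA` row.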


-sandbox
<div style="float:left; padding-right:10px; margin-top:20px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
  <h3>Improve performance with OPTIMIZE</h3>
  With the <span style="color:green"><strong>Optimize</strong></span> command, which merges small files, along with features such as<br><span style="color:green"><strong>Data Skipping</strong></span> and <span style="color:green"><strong>ZOrder Clustering</strong></span>,<br>improving query performance is straightforward.
</div>

In [0]:
%sql
-- Compact the data and build indexes with OPTIMIZE; in addition, ZORDER physically co-locates the data of frequently queried columns
OPTIMIZE LBS ZORDER By (state)
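
Why clustering by `state` helps can be shown with a toy model of data skipping: each file keeps the minimum and maximum of a column, and a reader skips files whose range cannot contain the requested value. The sketch below is an assumption-laden simplification: each "file" is just a list of state codes, and with a single column the clustering step reduces to a sort, whereas Z-ordering on several columns uses a space-filling curve.

```python
def file_stats(files):
    """Per-file (min, max) of the column values, like the stats used for data skipping."""
    return [(min(f), max(f)) for f in files]


def files_to_read(stats, value):
    """Indices of the files whose [min, max] range may contain value."""
    return [i for i, (lo, hi) in enumerate(stats) if lo <= value <= hi]


def recluster(files, n_files):
    """Sort all values and rewrite them into n_files contiguous chunks."""
    values = sorted(v for f in files for v in f)
    size = -(-len(values) // n_files)       # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]


# Hypothetical layout: every file holds a wide range of states.
before = [['CA', 'NY', 'TX'], ['FL', 'NY', 'WA'], ['CA', 'IL', 'TX']]
after = recluster(before, 3)

print(files_to_read(file_stats(before), 'NY'))   # files that must be opened before clustering
print(files_to_read(file_stats(after), 'NY'))    # files that must be opened after clustering
```

Before clustering, every file's range covers `NY`, so nothing can be skipped; after clustering, only the file holding the `IL` to `NY` range has to be opened.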

In [0]:
%sql
-- Run exactly the same query as in cell 10
Select state, loan_status, count(*) as counts 
From LBS
Group by state, loan_status
Order by counts desc

-sandbox 
<div style="float:left; padding-right:10px; margin-top:20px;">
  <img src='https://jixjiadatabricks.blob.core.windows.net/images/data-lake-no-label.png' width='70px'>
</div>
<div style="float:left;">
  <h3>Unifying batch and streaming</h3>
  Delta Lake and Spark let you treat streaming the same way as batch processing. Here we look at a sample that reads data in real time from a Kafka server receiving the contents of Wikipedia's IRC chat.
  
  <img src="https://jixjiadatabricks.blob.core.windows.net/images/delta_architecture_demo.gif" width="1200px">

   <h4>Kafka server information</h4>
  <table>
    <tr><td>Option name</td><td>kafka.bootstrap.servers</td></tr>
    <tr><td>Server name</td><td>server2.databricks.training:9092</td></tr>
    <tr><td>Topic</td><td>en</td></tr>
  </table>
  <br>
  <h4>JSON stream</h4>
  <p>Databricks receives edit data from Wikipedia's official IRC channels, converts it to JSON, and sends it to its own Kafka server.
    <br>Here we subscribe to the English articles (topic) flowing into this Kafka server and process them in real time.</p>
</div>

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
import re

# File path settings (write destinations and so on)
## Get the username.
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
## Replace each run of non-alphanumeric characters with '_' and lowercase the result, so the username can be used in file paths and database names.
username = re.sub('[^A-Za-z0-9]+', '_', username_raw).lower()

homeDir = f'/home/{username}/streaming/wikipedia/'
bronzePath = homeDir + "bronze.delta"
bronzeCkpt = homeDir + "bronze.checkpoint"

# Reset the output location
dbutils.fs.rm(homeDir, True)

# Schema definition for the JSON data
schema = StructType([
  StructField("channel", StringType(), True),
  StructField("comment", StringType(), True),
  StructField("delta", IntegerType(), True),
  StructField("flag", StringType(), True),
  StructField("geocoding", StructType([
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("countryCode2", StringType(), True),
    StructField("countryCode3", StringType(), True),
    StructField("stateProvince", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
  ]), True),
  StructField("isAnonymous", BooleanType(), True),
  StructField("isNewPage", BooleanType(), True),
  StructField("isRobot", BooleanType(), True),
  StructField("isUnpatrolled", BooleanType(), True),
  StructField("namespace", StringType(), True),         
  StructField("page", StringType(), True),              
  StructField("pageURL", StringType(), True),           
  StructField("timestamp", StringType(), True),        
  StructField("url", StringType(), True),
  StructField("user", StringType(), True),              
  StructField("userURL", StringType(), True),
  StructField("wikipediaURL", StringType(), True),
  StructField("wikipedia", StringType(), True),
])

# Read the JSON stream from Kafka
input_DF = (
  spark
  .readStream
  .format('kafka')                          # use Kafka as the source
  .option('kafka.bootstrap.servers', 
          'server2.databricks.training:9092')
  .option('subscribe', 'en')
  .load()
)

# Run the ETL and write to Delta
(
  input_DF
  .withColumn('json', from_json(col('value').cast('string'), schema))   # cast Kafka's binary value to a string and parse the JSON with from_json()
  .select(col("json.*"))                    # keep only the child fields of the JSON struct
  .writeStream                              # write the stream out with writeStream
  .format('delta')                          # save as Delta
  .option('checkpointLocation', bronzeCkpt) # where to store the checkpoint
  .outputMode('append')                     # append the result of each micro-batch
  .queryName('Bronze Stream')               # name the stream (recommended)
  .start(bronzePath)                        # start() launches the stream (action)
)
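
To make the parsing step above concrete, here is a plain-Python sketch of what happens to a single Kafka message value: the bytes are decoded, parsed as JSON, and projected onto the expected fields. The field subset, the helper name `parse_value`, and the sample messages are assumptions for illustration, and the handling of malformed input only approximates how the pipeline above ends up with nulls.

```python
import json

# A small subset of the fields in the schema above, to keep the example short.
FIELDS = ['channel', 'page', 'user', 'isRobot']


def parse_value(value: bytes, fields=FIELDS):
    """Decode one message value and project it onto the expected fields.

    Missing fields become None; input that is not a JSON object yields all None.
    """
    try:
        doc = json.loads(value.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        doc = {}
    if not isinstance(doc, dict):
        doc = {}
    return {name: doc.get(name) for name in fields}


# Hypothetical message values.
good = b'{"channel": "#en.wikipedia", "page": "Delta", "isRobot": false}'
bad = b'not json'

print(parse_value(good))
print(parse_value(bad))
```

Declaring the expected fields up front is what lets every micro-batch produce rows with the same columns, even when individual messages are incomplete.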

### What is the difference between the following two code cells?

In [0]:
# Inspect the DataFrame
df = spark.read.format('delta').load(bronzePath)

display( df )

In [0]:
# Inspect the DataFrame (2)
df = spark.readStream.format('delta').load(bronzePath)

display( df )
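
One way to think about the two cells above is snapshot versus incremental consumption. The toy model below is a sketch under that assumption and not how Spark is implemented: `TableLog`, `read_batch`, and `StreamReader` are hypothetical names, and the reader's `next_version` counter stands in for the role a checkpoint plays.

```python
class TableLog:
    """A table modeled as an ordered list of commits, each a list of rows."""

    def __init__(self):
        self.commits = []

    def append(self, rows):
        self.commits.append(list(rows))


def read_batch(log):
    """Batch read: a snapshot of every row committed so far."""
    return [row for commit in log.commits for row in commit]


class StreamReader:
    """Streaming read: each poll returns only rows committed since the last poll."""

    def __init__(self, log):
        self.log = log
        self.next_version = 0               # progress marker, like a checkpoint

    def poll(self):
        new_commits = self.log.commits[self.next_version:]
        self.next_version = len(self.log.commits)
        return [row for commit in new_commits for row in commit]


log = TableLog()
stream = StreamReader(log)

log.append(['edit-1', 'edit-2'])
print(stream.poll())        # rows from the first commit
log.append(['edit-3'])
print(read_batch(log))      # full snapshot at this moment
print(stream.poll())        # only the rows that arrived since the previous poll
```

The batch result is fixed once it has been computed, while the streaming reader keeps producing new rows as further commits arrive.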

### Streams can also be queried from SQL

In [0]:
spark.readStream.format('delta').load(bronzePath).createOrReplaceTempView('tmp_wikipedia_msg')

In [0]:
%sql
SELECT geocoding.countryCode3, count(*) FROM tmp_wikipedia_msg
WHERE geocoding.countryCode3 is not null
GROUP BY geocoding.countryCode3
ORDER BY geocoding.countryCode3

## END
&copy; 2021 Copyright by Jixin Jia (Gin), Masahiko Kitamura
<hr>

In [0]:
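# Stop any streams that are still running before deleting their output and checkpoint directories
for stream in spark.streams.active:
  stream.stop()

sql(f'DROP DATABASE {dbname} CASCADE')
dbutils.fs.rm(delta_path, True)
dbutils.fs.rm(homeDir, True)
dbutils.fs.rm(f'/tmp/{username}', True)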


