# 本セクションの目次
1. テクニカルメタデータについて
2. テクニカルメタデータの一種データプロファイリング
3. PySparkでデータプロファイリングをしてみよう1 
4. PySparkでデータプロファイリングをしてみようその２
5. PySparkでデータプロファイリングをしてみようその3
6. データプロファイリングの結果をSparkテーブルに格納してみよう
7. データ品質

# テクニカルメタデータについて
テクニカルメタデータとは、技術的な詳細を表すものです。  
例えば、テーブルのフォーマット(Parquet？CSVなのか？)は何か？やデータの形はどのような状態なのか？  

といった情報を取得することになります。  

特にデータを利用するエンジニア向けの情報になることが多いですが、次に紹介するデータプロファイリングはエンジニアだけでなくビジネスユーザにも有効な情報を提供することが可能です。

# テクニカルメタデータの一種データプロファイリング
テクニカルメタデータの一種にはデータプロファリングがあります。  
データプロファイリングは、データの特性をクエリせずともわかるようにすることが目的になります。

特にクラウド環境では使った分だけお金を取られるため、できる限り無駄なクエリはユーザに発行させないことが重要です。

ユーザ1000人がcount文を打つだけでも相当な金額になることがあるのです。

ここからは、データプロファリングをどうのように取得していくのかというイメージをSparkを使いながら実行していきましょう。

In [None]:
# コンソールで設定したSparkとNoteBookを接続します(動かす前に毎度実行する必要があります)
import findspark
findspark.init()

In [None]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter1") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()


# spark.xxxxxと記載することで処理を分散させることが可能です。

# PySparkでデータプロファイリングをしてみようその１

データプロファイリングその１では、以下の3つを算出してみます。  
3つのうち件数をrow_num(件数)をメタデータストアに保存していきます。

- 件数
- 平均
- 合計値

## 件数/平均/合計の使い道
例えば件数はよく、異常な状態を見付けることに使えます。  
例えば、前日までに１０件だったものが急に1000件になったらアラートを鳴らすということにも使えたりします。  
データ基盤は時には何万というテーブルが存在しているため、一つ一つ目で見るのは不可能に近いのが現実です。  
そこで件数のような簡単なチェックを入れておくことでも、テーブル数が増え続けても問題ないデータ基盤を作ることが可能です。  

一方で、取得しやすいがゆえに何も考えずに取得することは無意味になりますので、しっかりと自身の環境に適用できるかどうかを判断することは大切になります。  
例えば、年齢の合計値はあまり意味を成さないかもしれませんし時と場合によっては、合計を知ることで何か得るものがあるかもしれません。  

In [None]:
# 件数の取得
spark.sql("REFRESH TABLE data_management_crush_course.jinko_table ")
record_num=spark.sql("select count(*) as row_num from data_management_crush_course.jinko_table ")
record_num.show()

# 件数情報を取得する
record_count=record_num.collect()[0].asDict()['row_num']

In [None]:
# 平均と合計値
# 今回は平均として、女性と男性の合計値の平均をとっていこうと思います。
spark.sql("REFRESH TABLE data_management_crush_course.jinko_table ")
sum=spark.sql("select cast(sum(jinko_male)+sum(jinko_femail) as decimal(19, 0)) as sum from data_management_crush_course.jinko_table ")
# decimalにcastしないと数値が指数表記になってみづらくなるのでdecimalにしています。
sum.show()

In [None]:
# 平均と合計値
# 今回は平均として、女性と男性の全体の平均をとっていこうと思います。
spark.sql("REFRESH TABLE data_management_crush_course.jinko_table ")
avg=spark.sql("""
select avg(sokei) as avg 
 from 
   data_management_crush_course.jinko_table
""")
avg.show()

# PySparkでデータプロファイリングをしてみようその２
データプロファリングその２では、以下の２つを算出していきます。

- カーディナリティ
- セレクティビティ

# カーディナリティの使い道
カーディナリティとはどれだけ値がバラけているかを示す指標です。  
ビッグデータ基盤におけるカーディナリティはスモールファイル問題やデータスキューネスを発見するための手段としてビッグデータ基盤では使われます。  
データスキューネスとは、データに偏りがある状態を指します。  

ビッグデータ基盤における偏りやスモールファイルは非常に問題になることがあります。  
例えば、クエリのエラーを多発させたり、データの転送を遅くさせたりするする原因にもなります。  

スモールファイル問題について詳しく知りたい方は是非以下のコースも受講してみてください。  
https://www.udemy.com/course/python-spark-pyspark/?referralCode=E67BF8B61F65866794EB   

# セレクティビティの使い道
セレクティビティとは、検索した時に結果が何件返却されるか？というものです  

セレクティビティは、クエリのしやすさに直結してきます。  
データを探索する際に、重複したデータが出てきたらどうでしょうか(そもそも重複に気付けない可能性がありますが)？  
重複を除く処理をSQLに記載しなければなりませんし、さらには  
最終的な分析結果を間違えることもあります。たとえば、結果が2倍になってしまうかも。  
数億というレコードの中で重複を分析の結果だけ見て判断するというのはできないものです。  
そうなれば、再度モデルの作成仕直しや開発のやり直しをしなければならなくなります。  

ビッグデータ基盤にはPK（プライマリーキー）のような仕組みを持っているものが少なく、データの重複が多々発生してしまいます。  

そこでセレクティビティのチェックを行うことによって、重複しているデータを見つけ出し対処を行うことができるのです※。  

※どのような対処を取るべきなのか？についてはデータ品質管理という方法があります。準備が出来次第別のコースで作成予定です。  

In [108]:
from pyspark.sql.functions import approx_count_distinct
from pyspark.sql.types import DecimalType

# カーディナリティを測ってみます
card=spark.sql("select * from data_management_crush_course.jinko_table")

cols_cardinality = card.select([(approx_count_distinct(c)/card.count()). \
    cast(DecimalType(38,2)).alias("cardinaryty_"+c) for c in card.columns])

+----------------+-----------------+------------------+-------------------+---------------+-----------------+----------------------+------------------------+------------------+
|cardinaryty_code|cardinaryty_gengo|cardinaryty_wareki|cardinaryty_seireki|cardinaryty_chu|cardinaryty_sokei|cardinaryty_jinko_male|cardinaryty_jinko_femail|cardinaryty_kenmei|
+----------------+-----------------+------------------+-------------------+---------------+-----------------+----------------------+------------------------+------------------+
|            0.17|             0.00|              0.02|               0.02|           0.00|             1.01|                  0.97|                    1.07|              0.16|
+----------------+-----------------+------------------+-------------------+---------------+-----------------+----------------------+------------------------+------------------+



In [None]:
cols_cardinality.show() 
# approx(正確さを捨て速度と引き換えに概算値を出してくれるもの)
# 数値が大きければ大きいほど値がバラけています(カーディナリティが高い)。
# 数値が小さければ小さいほど値が同じで値はバラけていません(カーディナリティが低い)。

In [114]:
from pyspark.sql.functions import approx_count_distinct
from pyspark.sql.types import DecimalType

# 今回はcodeを条件として指定した時に、どれだけ件数が返却される可能性があるかを検証してみます。
# jinko_tableとjinko_codeテーブル(全国のコードが削除されている＆＆神奈川のコードが変更されている)
# jinko_codeテーブルベースで検索した件数と、オリジナルの生の件数を割り算することでセレクティビティを出すことが可能です。

#オリジナルの件数を表示
spark.sql("select count(distinct jinko.code) from data_management_crush_course.jinko_table jinko").show()

#　セレクティビティはキーを指定した場合（今回の場合はcode）レコードの返却はどれくらいであるかを想定するものです
card=spark.sql(""" 
select count(distinct jinko.code) trunsaction , count( distinct num.num_code) master from
  data_management_crush_course.jinko_table jinko,
  (select code as num_code from data_management_crush_course.jinko_table) num
where jinko.code in (select code as num_code from data_management_crush_course.jinko_code)
""")

card.show()

# 件数が一致しているならば返却されるはず
# そうでなければ欠落が発生する
cols_selctivity = card.select((card.trunsaction/card.master).cast(DecimalType(38,2)).alias("selectivity"))

cols_selctivity.show()
# 表示された結果がおおよその返却結果（件数）になります。
# 今回の結果だと6件ほど返されるテーブルになりそうですね


# 今回はパーティションを指定した時を前提にどれくらいの件数が返却されるのかを
cols_selctivity.createOrReplaceTempView("tmp")
list_selectivity=spark.sql("""
select 
selectivity
 as concat from tmp
"""
).collect()[0].asDict()['concat']

#調べた結果として取得して値を格納しておきます
print(list_selectivity)

# 1の場合はレコードが必ず一件返ってくることが想定される
# 1より大きい場合はレコードが1件以上返却される場合がある
# 1より小さい場合はレコードが一件も返却されない場合がある

# 今回は0.96なので、一件も返却されない場合がある

+--------------------+
|count(DISTINCT code)|
+--------------------+
|                  50|
+--------------------+

+-----------+------+
|trunsaction|master|
+-----------+------+
|         48|    50|
+-----------+------+

+-----------+
|selectivity|
+-----------+
|       0.96|
+-----------+

0.96


# 結果から考察してみる
この結果を用いてどう考えるか？というところです。  
現在はjinko_tableは「kenmei」をパーティションにしています。

パーティションは多すぎてもいけませんし、少なすぎてもいけません。  
この場合「cardinaryty_jinko_male」などでパーティションを切ってしまうとパーティションが多くなってしまいますし、一方で「cardinaryty_gengo」でパーティションを切るとデータが一つのパーティションに入り込んでしまいます。

そのように考えると「cardinaryty_kenmei」は多くもなく少なくもないちょうどいいカーディナリティであることがわかります。

セレクティビティに関してはcodeでは唯一に絞ることはできず、しかもデータが取れない場合があることがわかりました。
特にプライマリーキー(ビッグデータ基盤ではUUIDを付与したりすることが多い)などをセレクティビティチェックした時にレコードが複数返却される(されない)場合はどこかの経路で何か間違えているということになります。

# PySparkでデータプロファイリングをしてみようその3
データプロファイリング３では、コンシステンシーについて考えてみたいと思います。
コンシステンシーとは一貫性があるかないかという指標です。

例えばjinko_code(今回はマスターテーブルと考えてください)と今回のjinko_tableを使ってみていきましょう(作成はチャプター３で実施済みです)

もっとも一般的な一貫性のチェックはマスターテーブルなどのコードとの一致率を調べることです。
数値で出すというより、Yes Noくらいの判定でもOKです。

マスターコードが更新されていない（もしくは正しく反映されていない）場合必ず結合時に紐つかないレコードが存在します。  
データ分析においてはテーブルの結合はデータの価値を高めていく行為の一つです。  
そこでデータの欠落が起きてしまうとそれだけで価値が下がってしまうことにもつながります。  

In [115]:
# 一貫しているかどうかを判定するためには集合演算を使います

#jinko_codeにおけるcodeとjinko_tableにおけるcodeを比較してみることにします。
#マスターテーブルですので、本来であれば差分は0となるはずです

jinko_code=spark.sql("select distinct(code) from data_management_crush_course.jinko_code")
jinko_table=spark.sql("select distinct(code) from data_management_crush_course.jinko_table")

print(jinko_code.count())
print(jinko_table.count())

consistency_judge_f=True

# 件数が多い方を左側にするといいです。
# Aの集合からBの集合を引き算して残った個数をカウントしています
if jinko_table.subtract(jinko_code).selectExpr('code as diff').count() !=0:
    consistency_judge_f = False

#結果
print(consistency_judge_f)

#　差分を表示してみる
jinko_table.subtract(jinko_code).selectExpr('code as diff').show()


49
50
False
+----+
|diff|
+----+
|  00|
|  14|
+----+



# 結果を格納しよう
これまでの結果を格納してみましょう

- 件数
- セレクティビティ
- コンシステンシーチェック

これら３つの結果を保存してみましょう。

それぞれ対応する変数は以下になります。

- record_count
- list_selectivity
- consistency_judge_f

In [116]:

from pyspark.sql.functions import when
metadata_df=spark.sql("select * from metadata_tmp.sample_metadata ")

# メタデータ取得対象のデータを更新する
metadata_df = metadata_df. \
    withColumn("record_num", when(metadata_df.record_num.isNull() ,record_count). \
        otherwise(metadata_df.record_num))
metadata_df = metadata_df. \
    withColumn("selectivity", when(metadata_df.selectivity.isNull() ,list_selectivity). \
        otherwise(metadata_df.selectivity))
metadata_df = metadata_df. \
    withColumn("consistency_flag", when(metadata_df.consistency_flag.isNull() ,consistency_judge_f). \
        otherwise(metadata_df.consistency_flag))

metadata_df.show()

+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+
|       database_name| table_name|    table_definition|                               sammary|record_num|selectivity|consistency_flag|frequency_access|
+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+
|data_management_c...|jinko_table|CREATE TABLE `dat...|一旦テーブルの説明は空にしておきます。|       300|       0.96|           false|            null|
+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+



In [117]:
#取得したデータをmetadata_tmp.sample_metadataに格納していきます
#読み込んだテーブルに対して直接データを入れることができないので、一度ファイルを吐き出します

spark.sql("REFRESH TABLE metadata_tmp.sample_metadata")
metadata_df.write.mode('overwrite'). \
    parquet("/Users/saitouyuuki/Desktop/src/pyspark_datamanagement_metadata/dataset/tmp/")
insert_df=spark.read. \
    parquet("/Users/saitouyuuki/Desktop/src/pyspark_datamanagement_metadata/dataset/tmp/")

#取得したデータをmetadata_tmp.sample_metadataに格納していきます
insert_df.createOrReplaceTempView("sample")

spark.sql("""
Insert overwrite  table metadata_tmp.sample_metadata 
select  * from sample
""")



DataFrame[]

In [118]:
# 結果の確認をしてみます

spark.sql("select * from metadata_tmp.sample_metadata ").show()

+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+
|       database_name| table_name|    table_definition|                               sammary|record_num|selectivity|consistency_flag|frequency_access|
+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+
|data_management_c...|jinko_table|CREATE TABLE `dat...|一旦テーブルの説明は空にしておきます。|       300|       0.96|           false|            null|
+--------------------+-----------+--------------------+--------------------------------------+----------+-----------+----------------+----------------+



In [119]:
# Stopは忘れずに
# 忘れてしまうと、いつの間にか接続が溜まっていってしまいます
spark.stop()
spark.sparkContext.stop()

Thu Nov 25 16:42:40 JST 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Thu Nov 25 16:42:41 JST 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for s

# データ品質管理
ここで一つ別のトピックについて言及しようと思います。
それがデータ品質管理です。

データ品質管理とは、データプロファリングの延長線にあるものですが、データがより良い状態を達成するための方法の一つです。  
データは当然ながら「正しい」状態で配置されていることが好ましいのは当たり前ですが、闇雲に施策を打ってもうまくいきません。

そこでデータプロファイリングを使いながらデータの不備を発見しその不備に対してアクションを取っていくことが必要です。

アクションとは

- 対向システム(ここでのシステムは、データソース側のシステムのこと)自体の修正
- データ自体の修正（再集計など）

が挙げられます。
入門のコースのためデータ品質管理は取り上げませんが、  
今回のデータプロファイリングの延長戦にはデータ品質管理が存在していると認識してもらえるだけでもまずは良いと思います。

# 演習答え

In [None]:
# 指定のフォーマットに沿っているもの
df=spark.sql("select * from data_management_crush_course.jinko_table ")
expr = "[0-9]{2}"
dx = df.filter(df["code"].rlike(expr))
dx.count()

In [None]:
# 指定のフォーマットに沿っていないもの
df=spark.sql("select * from data_management_crush_course.jinko_table ")
expr = "[0-9]{2}"
dx = df.filter(df["code"].rlike(expr))
# 全体から指定フォーマットに沿っているものを除いています。
df.subtract(dx).count()