In [None]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は次のチャプターで説明を行います。
spark = SparkSession.builder \
    .appName("chapter2") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .enableHiveSupport() \
    .getOrCreate()

# Spark Sessionとは？

Javaで言うところのインスタンスを作る作業のことです(new Class())。  
今回の場合は、アプリケーション名が「chapter2」で作成を行っています。  

configの部分で  
非常に細かい設定ができるので、詳しくは公式のドキュメントを参考にしてください https://spark.apache.org/docs/3.1.1/  
一部メモリの設定いついては「Sparkを本番環境で動かす」チャプターにて紹介します

# Sparkを用いたバッチにおけるデータエンジニアリング一連の流れ

1. データソースの読み込み(今回は、人口統計データ(/dataset/jinkou.csv))　ETL(Extract Transform Load)で言うEの部分
2. 変換を行う(集計等を行う)　DataFrame処理 or SQL処理の２パターンで実行可能 ETL(Extract Transform Load)で言うTの部分
3. カラムナーフォーマットへ変換する ETL(Extract Transform Load)で言うTの部分
4. 出力したデータをみんなに見やすくするため(BIツールから参照できるように)テーブルを作成する ETL(Extract Transform Load)で言うLの部分

よくある、関数の羅列をするのではなく、実業務に沿った形で流れを紹介していきます。

## データソース
データの源。リレーショナルデータベースのときもあれば、今回のようにファイルの形式のときもある。  
更に進むと、PDFやEXCELなんて事もあります。  
ストリーミングだとIoTであったり、Webブラウザのアクセスログだったりとデータになりうるものは無限に存在しています。  

## 変換処理
ETL（Extract Transform Load）というと少し定義として広いのかもしれないのですが、  
データを整形してより分析向けの形(フォーマット変換や圧縮含む)にしたり、精度の高いデータを作成する行為のことです。  
そのため、ETLというとバッチ処理のイメージを持つ人も多いかもしれませんが、ストリーミングデータにも適用される言葉です。   

Lの処理はSparkにおいてDataFrameもしくはSQLで処理することができる(RDDと呼ばれるものもあるが、労力の割に実際は出番はあまりなく今回は取り扱わない)

## カラムナーフォーマットへ変換を行う
ビッグデータの世界では、Apache Parquet と呼ばれるフォーマットが広く使われています。  
CSV形式のようなローフォーマットはビックデータ処理において処理効率が悪いため、早い段階でParquetに変換を行います。  
分析用のSQLの実行であったり、複数台で処理することに向いているフォーマットです。  

Parquetの特徴としては以下になります。

- カラムナー（ストレージ）フォーマット
- カラムごとに圧縮が効くため、効率よくデータをストアできる
- 多くのプロダクトがサポートしている

多くのプロダクトはParquetを取り込んだり処理したりする機能を提供してくれており単体ではなく総合で使えるフォーマットです。

## テーブル形式での保存

多くは、実データとテーブル定義が分離された`ロケーション方式`をとっている。  
後ほど実際に作成してみますが、イメージは以下のような感じです。

```
CREATE EXTERNAL TABLE IF NOT EXISTS sample.sampletable ( id INT, date STRING)
PARTITIONED BY (dt INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/Users/yuki/pyspark_batch/dataset/parquet/';

#S3などであれば、以下のように設定を変えることも可能です。
LOCATION 's3://data.platform/sample.db/raw_zone/sampletable/';


```

テーブルとして保存することによって、非エンジニアにも扱いやすくしてデータを提供することが可能です。

# データソースの読み込み

In [None]:
#データソースの読み込み
#sep='\t'とすればtsvでも読み込みが可能です
#multiLineは、CSVやTSVの各カラムに改行が含まれていた時の対策です。
df=spark.read.option("multiLine", "true").option("encoding", "SJIS").csv("/Users/yuki/pyspark_batch/dataset/jinko.csv", header=True, sep=',', inferSchema=False)
df.count()

In [None]:
df.show(truncate=False)

# 変換を行う(集計等を行う)

In [None]:
# ここからはETLにおけるTを行っていきます

#大正や昭和、平成はもう不要かなと感じたら変換処理にて
df_after_t=df.where(df."和暦（年）"== "令和")

In [None]:
#うーん使いづらい。。(日本語))
from pyspark.sql.types import LongType, StructType, StructField, StringType
from pyspark.sql.functions import col

#スキーマ設定をしていきましょう
# カラム名、型、デフォルト値で設定していきます
struct = StructType([
    StructField("code", StringType(), False),
    StructField("kenmei", StringType(), False),
    StructField("gengo", StringType(), False),
    StructField("wareki", StringType(), False),
    StructField("seireki", StringType(), False),
    StructField("chu", StringType(), False),
    StructField("jinko_male", StringType(), False),
    StructField("jinko_female", StringType(), False)
])
df=spark.read.option("multiLine", "true").option("encoding", "SJIS").csv("/Users/yuki/pyspark_batch/dataset/jinko.csv", header=False, sep=',', inferSchema=False,schema=struct)
df.show()

In [None]:
# 大正や昭はもう不要かなと感じたら変換処理にて
# ヘッダーも同時に除外してしまう
df.where(df.gengo == "平成").count()
# where 以外にもfilterと呼ばれるものがあります。機能は同じなので好きな方を選んで大丈夫です
df.filter(df.gengo == "平成").count()

In [None]:
#集計をしてみます
#平成の県ごとの男女の数の平均
import pyspark.sql.functions as sf
df.where(df.gengo == "平成").groupBy("kenmei").agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")).show()

#「人口集中地区以外の地区」がいらなそうですね。
# データをクレンジングして不要なデータを除きましょう

In [None]:
#良さそうです！
df.where(df.gengo == "平成").groupBy("kenmei").agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")).filter(df.kenmei != "人口集中地区以外の地区").sort("male_avg").show()

In [None]:
#結果を一度保存しておきます
df_after_t=df.where(df.gengo == "平成").groupBy("kenmei").agg(sf.avg("jinko_male").alias("male_avg"),sf.avg("jinko_female").alias("female_avg")).filter(df.kenmei != "人口集中地区以外の地区").sort("male_avg")

# DIKWモデル
少し脇道にそれるのですが、上記の作業はDIKWモデルというものに沿った動きでです。

DIKWモデルでは、データのステージを「Data」「Infromation」「Knowledge」「Wisdom」として定義しています。

- Data(データ)
- Information（情報）
- Knowledge（知識）
- Wisdom（知恵）

これらの頭文字をとってDIKWモデルと呼ばれています。

ETLをすることはDataを情報や知識に変換することを指します。
情報や知識は、データから見つかるルールや関係性のことです。
今回の場合だと、鳥取県が人口少ないです　という事実がわかったという形になります。

知恵は、この知識から生み出すもので、例えば鳥取県の人口が少なく、それが問題なのであれば
その問題を解決するための施策が知恵になります。



# カラムナーフォーマットへ変換する
データの変換が終わったので次は、そのデータをビッグデータ向けのフォーマットで保存することを考えます。  
今回はParquet形式へデータを変換します。


In [None]:
#単純に吐き出す方法
df_after_t.write.mode("overwrite").parquet("/Users/yuki/pyspark_batch/dataset/parquet")

In [138]:
#ファイルを見てみます
!ls -al /Users/yuki/pyspark_batch/dataset/parquet

#　ファイルが多いですね。。 一つのファイルサイズが小さいのも気になります

total 24
drwxr-xr-x  6 yuki  staff   192 Sep  8 17:58 [34m.[m[m
drwxr-xr-x  4 yuki  staff   128 Sep  8 17:58 [34m..[m[m
-rw-r--r--  1 yuki  staff     8 Sep  8 17:58 ._SUCCESS.crc
-rw-r--r--  1 yuki  staff    24 Sep  8 17:58 .part-00000-3be5d55f-9d08-4db4-b259-258609a9c292-c000.snappy.parquet.crc
-rw-r--r--  1 yuki  staff     0 Sep  8 17:58 _SUCCESS
-rw-r--r--  1 yuki  staff  2004 Sep  8 17:58 part-00000-3be5d55f-9d08-4db4-b259-258609a9c292-c000.snappy.parquet


#　スモールファイル問題
オンプレでもクラウドでもそうなのですが、ビッグデータの世界では一つのファイルが小さすぎると途端に処理が遅くなります。  
この問題をスモールファイル問題と読んでいます。  
一般に、１GBくらいずつまとめることが推奨されています。

この問題を解決するためには、repartition(もしくはcolaese)を使ってファイルをマージする必要があります。

In [135]:
# 今回はファイルを一個に纏めてみようと思います。
df_after_t.repartition(1).write.mode("overwrite").parquet("/Users/yuki/pyspark_batch/dataset/parquet")

In [137]:
# もう一度ファイルを見てみます
!ls -l /Users/yuki/pyspark_batch/dataset/parquet
# 一つになりました！ スモールファイル問題も解決です。

total 8
-rw-r--r--  1 yuki  staff     0 Sep  8 17:58 _SUCCESS
-rw-r--r--  1 yuki  staff  2004 Sep  8 17:58 part-00000-3be5d55f-9d08-4db4-b259-258609a9c292-c000.snappy.parquet


In [139]:
#もう少し書き込みのオプションを見ていきます
# partitionByを使うことで、データをパーティションごと(次に説明します)に分けて配置することができます。
# 今回はkenmei(県名)ごとにデータを保存してみようと思います。
df_after_t.repartition(1).write.partitionBy("kenmei").mode("overwrite").parquet("/Users/yuki/pyspark_batch/dataset/parquet")



In [140]:
# もう一度ファイルを見てみます
!ls -l /Users/yuki/pyspark_batch/dataset/parquet

#県名が出てきました
三重県のデータはkenmei=三重県の下に格納されています。

total 0
-rw-r--r--  1 yuki  staff    0 Sep  8 20:36 _SUCCESS
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=三重県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=京都府[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=人口集中地区[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=佐賀県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=全国[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=兵庫県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=北海道[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=千葉県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=和歌山県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=埼玉県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=大分県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=大阪府[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=奈良県[m[m
drwxr-xr-x  4 yuki  staff  128 Sep  8 20:36 [34mkenmei=宮城県[m[m
drwxr-xr-x  

In [142]:
#　三重県のデータを見てみます
!ls -l /Users/yuki/pyspark_batch/dataset/parquet/kenmei=三重県

total 8
-rw-r--r--  1 yuki  staff  741 Sep  8 20:36 part-00000-edc190f6-8222-4c3a-afa5-b1f74d651935.c000.snappy.parquet


In [144]:
# ここでSparkでParquetのデータを読み込んでみます

parquet_df=spark.read.parquet("/Users/yuki/pyspark_batch/dataset/parquet/kenmei=三重県")
parquet_df.show()

#コレが三重県の男性と女性の人口の平均値です。

+------------------+----------+
|          male_avg|female_avg|
+------------------+----------+
|1838127.1666666667|  893167.5|
+------------------+----------+



# 出力したデータをみんなに見やすくするため(BIツールから参照できるように)テーブルを作成する
次はテーブルの作成を行ってみようと思います。  
今のままだとエンジニア向けでちょっと使い勝手が悪いのとBIツールといった他のツールから参照することができません

In [145]:
# Createテーブルを発行します
# 次のチャプターでも紹介しますが、spark.sqlという関数を使います

#jinko_avgテーブルを作成します。
#パーティションとはデータを分けるフォルダみたいなもの、パーティションを分けることで読み込むデータ量を少なくしたりできるので最適化できる
#先程確認したkenmei=の部分がパーティションになっているのでその出力結果に合わせてテーブルを作成してみます。

#ロケーションは、kenmeiを含まずに指定します。

spark.sql(""" 
CREATE EXTERNAL TABLE IF NOT EXISTS default.jinko_avg ( male_avg double, female_avg double)
PARTITIONED BY (kenmei String)
STORED AS PARQUET
LOCATION '/Users/yuki/pyspark_batch/dataset/parquet/';
""")


21/09/08 20:50:17 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
21/09/08 20:50:17 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
21/09/08 20:50:21 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
21/09/08 20:50:21 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore yuki@127.0.0.1
21/09/08 20:50:21 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/09/08 20:50:22 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
21/09/08 20:50:22 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
21/09/08 20:50:22 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
21/09/08 20:50:22 WARN Hive

DataFrame[]

In [147]:
# テーブルを見てみます。
spark.sql("show tables").show()

# ちゃんとできているようですね。
# ちなみに実行しているSQLは実はSQLみたいなものでHiveSQLと呼ばれるものです。
# Mysqlの扱いとほとんど同じなので、Mysqlみたいに使って動かなかったところだけ検索すると効率が良いと思います。

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|jinko_avg|      false|
+--------+---------+-----------+



In [153]:
#　おや。データを見ることができません。。
spark.sql("select * from default.jinko_avg").show()

+--------+----------+------+
|male_avg|female_avg|kenmei|
+--------+----------+------+
+--------+----------+------+



In [154]:
# テーブルだけでなく、Partitionを認識させてあげないといけません
# msck repair table　テーブル名と実行するとパーティションが認識されます(ちなみにAdd partitionというコマンドもあります)。
spark.sql("msck repair table jinko_avg")

DataFrame[]

In [155]:
# 今一度検索をしてみます。
spark.sql("select * from default.jinko_avg").show()
#　今度は出ましたね！

+--------------------+--------------------+------------+
|            male_avg|          female_avg|      kenmei|
+--------------------+--------------------+------------+
|  1838127.1666666667|            893167.5|      三重県|
|  2628424.6666666665|  1268325.3333333333|      京都府|
|        8.32563095E7|4.0840519833333336E7|人口集中地区|
|            864635.0|            408192.5|      佐賀県|
|1.2650455783333333E8|         6.1816723E7|        全国|
|           5511837.5|           2650310.5|      兵庫県|
|   5589153.166666667|  2665781.3333333335|      北海道|
|           5962485.5|           2987847.0|      千葉県|
|  1037736.3333333334|            490624.0|    和歌山県|
|   6936328.166666667|  3492880.3333333335|      埼玉県|
|  1210304.3333333333|   571530.6666666666|      大分県|
|   8809790.833333334|   4292675.833333333|      大阪府|
|  1405915.3333333333|   671178.6666666666|      奈良県|
|           2330816.5|           1139561.5|      宮城県|
|           1151179.5|            542386.5|      宮崎県|
|  1105906.8333333333|

In [156]:
# もちろんSQLなので whereも可能です
spark.sql("select * from default.jinko_avg where kenmei='東京都'").show()

+--------------------+-----------------+------+
|            male_avg|       female_avg|kenmei|
+--------------------+-----------------+------+
|1.2490754833333334E7|6222455.666666667|東京都|
+--------------------+-----------------+------+



# 次のチャプターはSparkSQLについて紹介していこうと思います

In [None]:
# Spark利用の停止
spark.stop()
spark.sparkContext.stop()