# 本セクションの目次
1. ビッグデータ世界のDDL
2. テンポラリテーブル
3. ビッグデータ世界のSQLとは？
4. 単純SQLを振り返ってみよう
5. 分析関数を練習してみよう
6. LAG/Lead関数
7. ピボットテーブル
8. SparkのRDDを使って1レコードつづ処理してみよう

In [None]:
# コンソールで設定したSparkとNoteBookを接続します(動かす前に毎度実行する必要があります)
import findspark
findspark.init("/home/pyspark/spark")

In [None]:
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#spark sessionの作成
# spark.ui.enabled trueとするとSparkのGUI画面を確認することができます
# spark.eventLog.enabled true　とすると　GUIで実行ログを確認することができます
# GUIなどの確認は最後のセクションで説明を行います。
spark = SparkSession.builder \
    .appName("chapter1") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.sql.session.timeZone", "JST") \
    .config("spark.ui.enabled","true") \
    .config("spark.eventLog.enabled","true") \
    .config("spark.jars.packages", "org.apache.spark:spark-streaming_2.13:3.2.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.apache.spark:spark-avro_2.12:3.2.1") \
    .enableHiveSupport() \
    .getOrCreate()

# パッケージを複数渡したい時は「,」で繋いで渡します。
# Sparkのバージョンにしっかりと合わせます(今回はSparkのバージョンが3.2を使っています。)。

# ビッグデータの世界のDDL

ビッグデータの世界でのDDLはRDSと同じ様にDDL文を実行することが可能です。  
今回は以下のDDLについてみていきましょう  

- Create Database 文
- CREATE EXTERNAL TABLE文
- CREATE VIEW
- ADD PARTITION(MSCK REPAIR)

## CREATE DATABASE文
データベースの作成を行います。
こちらはRDSなどのCreate Database　と同じ方法で作成が可能です。

In [None]:
spark.sql("create database if not exists super_crush_course")

In [None]:
# データベースの一覧を見てみましょう
spark.sql("show databases").show(truncate=False)

## CREATE EXTERNAL TABLE文
テーブル定義の構成要素をみていきましょう

```
CREATE EXTERNAL TABLE IF NOT EXISTS super_crush_course.csv_table ( id INT, date STRING)
PARTITIONED BY (dt INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/home/pyspark/super_crush_course.db/csv_table/dataset/parquet/';

#S3などであれば、以下のように設定を変えることも可能です。
LOCATION 's3://data.platform/super_crush_course.db/raw_zone/sampletable/';

```

ビッグデータの世界では、実データとデータベース定義/テーブル定義(メタデータ)は明確に分離されています。  
今回のコースだと実データはローカル端末のSSD(HDD)でテーブル定義はMysqlに登録されています。  
明確に分離されているからこそ、場所を指し示す宣言であるLocationが必要になってきます  

ロケーションは「super_crush_course.db」とDB名.db/TABLE名とすることが通例です。  
Externalは外部のという意味で、オンプレ環境の場合はつけないことが多かったが、クラウド環境ではつけるのが必須となっている設定。

メタデータについてさらに詳しく知りたい方は、以下の記事を参照してみてください  
「【PythonとSparkで始めるデータマネジメント入門】 ビッグデータレイクのための統合メタデータ管理入門」

上記の例は、CSV形式のテーブルです。  
それ以外にもParquet形式、Avro形式でテーブルを作成することができます。

```
# パーティションつきのテーブル
CREATE TABLE IF NOT EXISTS super_crush_course.parquet_table 
(code String, gengo String,wareki String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
PARTITIONED BY (kenmei String)
STORED AS PARQUET
TBLPROPERTIES ("parquet.compression"="SNAPPY");
LOCATION '/home/pyspark/super_crush_course.db/parquet_table';

```

圧縮形式と保存するファイルフォーマットを指定してテーブルの作成を行なっていきます。  
ちなみにテーブル定義のカラムは、今回読み込んだCSV/JSONのスキーマになります。  
Partitionとはデータの区切りのことで、kenmeiを区切りとしてデータを保存しています(次のレクチャーにて)。　 

```
# パーティションなしのテーブルの場合
CREATE TABLE IF NOT EXISTS super_crush_course.parquet_table_with_no_partition
(code String, gengo String,kenmei, Stringwareki String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/home/pyspark/super_crush_course.db/parquet_table_with_no_partition'

```

In [None]:
## テーブル作成
# テーブル作成も同様に、spark.sqlを使ってテーブルを作成していきます。
# ロケーションはセクション2で出力したディレクトリになります。

# Parquetテーブルの作成(パーティションあり)
spark.sql("""

CREATE TABLE IF NOT EXISTS super_crush_course.parquet_table_with_partition 
(code String, gengo String,wareki String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
PARTITIONED BY (kenmei String)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/home/pyspark/super_crush_course.db/parquet_table_with_partition'

""")

# Paruqetテーブルの作成(パーティションなし)
spark.sql("""

CREATE TABLE IF NOT EXISTS super_crush_course.parquet_table_with_no_partition 
(code String, gengo String,wareki String,kenmei String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
STORED AS PARQUET
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/home/pyspark/super_crush_course.db/parquet_table_with_no_partition'

""")

In [None]:
# AVROテーブルの作成(パーティションあり)
spark.sql("""

CREATE TABLE IF NOT EXISTS super_crush_course.avro_table_with_partition 
(code String, gengo String,wareki String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
PARTITIONED BY (kenmei String)
STORED AS AVRO
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/home/pyspark/super_crush_course.db/parquet_table_with_partition'

""")

# AVROテーブルの作成(パーティションなし)
spark.sql("""

CREATE TABLE IF NOT EXISTS super_crush_course.avro_table_with_no_partition 
(code String, gengo String,wareki String,kenmei String,seireki String,chu String,sokei String,jinko_male String,jinko_femail String)
STORED AS AVRO
TBLPROPERTIES ('parquet.compression'='SNAPPY')
LOCATION '/home/pyspark/super_crush_course.db/avro_table_with_no_partition'

""")

In [None]:
# 作成したテーブルを見てみましょう
spark.sql("show tables in super_crush_course").show(truncate=False)

# ADD PRTITION
パーティションを認識するためにコマンドを発行する必要があります。

- Add Partiton
- MSCK repair Table

の2つをみていきましょう

In [None]:
# パーティションも管理されている
spark.sql("show partitions super_crush_course.parquet_table_with_partition").show(n=2)
# 本来件名があってほしいが。。。

In [None]:
# パーティションがテーブルは。。？クエリしてみる
spark.sql("select * from super_crush_course.parquet_table_with_no_partition").show(n=2)

In [None]:
# パーティションがあってadd paritionをしていないテーブルは。。？クエリしてみる
spark.sql("select * from super_crush_course.parquet_table_with_partition").show(n=2)
# データを見ることができない

In [None]:
# パーティションを追加してみる
# パーティションの追加には２種類存在しています
# add partition
# msck repair table名
spark.sql("alter table super_crush_course.parquet_table_with_partition add if not exists  partition (kenmei='東京都')")

In [None]:
# パーティションも管理されている
spark.sql("show partitions super_crush_course.parquet_table_with_partition").show(n=20)

In [None]:
# 再度検索を行ってみる
# パーティションがあってadd paritionをしていないテーブルは。。？クエリしてみる
spark.sql("select * from super_crush_course.parquet_table_with_partition").show()
# 追加した東京都のデータだけ見える

In [None]:
# 一個づつAdd partitionするのは面倒なのでmsckを使う(ただし時間がかかる場合が多いので、日々の処理であればadd partitionを選択する方がいい)
spark.sql("msck repair table super_crush_course.parquet_table_with_partition")
spark.sql("msck repair table super_crush_course.avro_table_with_partition")

# Create View
ビューを作成します。
ビューとは、仮想的なテーブルのことでデータを生成しなくてもテーブルを生成することが可能です。

言葉で伝えるより実際に見た方がいいと思うので、早速作ってみましょう。　　

手っ取り早くクエリを簡単にしたい場合に有効です。

In [None]:
# create view

spark.sql("""

create view parquet_view (gengo)
as 
select gengo from 
super_crush_course.parquet_table_with_partition

""")

In [None]:
spark.sql("select * from parquet_view").show()

# テンポラリテーブル
テンポラリーテーブルとはデータフレームから一時的にテーブルを作成することで、SparkSessionごとに生成が可能です。

特にスキーマオンリードで読み込んだdataframeを一時的にテーブルにすることで、SQLでの操作を可能にすることができます。

一連の流れを見てみましょう

In [None]:
# テンポラリーテーブルの作成
json_df=spark.read.json("./dataset/jinko.json")
json_df.createOrReplaceTempView("json_tmp")

In [None]:
#　json_tempの検索
spark.sql("select * from json_tmp where kenmei='東京都'").show()

# ビッグデータ世界のDMLとは

ビッグデータの世界のSQLは基本的にSQL’ライク’です。  
というのもRDSのSQLを前提にしてライクと言っているだけなので、ビッグデータ世界を中心としたらSQLそのものです。  
RDSでのSQLに慣れている人は、ビッグデータの世界のSQLは難なくこなすことができると思います。  

- SELECT
- CTAS
- SELECT INSERT
- INSERT?
- UPDATE?
- DELETE?

今回は4つのDMLを確認してみます。

In [None]:
# SELECT
spark.sql("select * from super_crush_course.parquet_table_with_no_partition").show()

## パーティションつきのテーブルを検索してみる
spark.sql("select * from super_crush_course.parquet_table_with_partition where kenmei ='東京都'").show()

# パーティションなしの場合は、すべてのデータを走査してから絞り込みます
# パーティションありの場合は、特定のパーティション配下のディレクトリのみをチェックします

# 大体のRDSのSQLでできることは実行可能です。

In [None]:
# Joinも可能
# SQLでJoinするというのはRDBを使っている方ならイメージが湧くと思いますので、今回はデータフレームでジョインをしてみたいと思います
json_df.join(json_df,on=[json_df.code == json_df.code,json_df.wareki == json_df.wareki],how="inner").show()

# CTAS
Create Table As Selectの略です。

簡単にいうと、Selectの返却結果からテーブルを作成することが可能です。

In [None]:
# CTASを動かしてみます

# SQLでやる方法
spark.sql(""" 
CREATE EXTERNAL TABLE if not exists super_crush_course.ctas_sql 
    STORED AS PARQUET LOCATION '/home/pyspark/super_crush_course.db/ctas_sql' 
AS
SELECT *
    FROM super_crush_course.parquet_table_with_no_partition
WHERE 1=1

""")

In [None]:
spark.sql("show tables in super_crush_course").show(truncate=False)

In [None]:
json_df.write.format("parquet").mode("overwrite").saveAsTable("super_crush_course.ctas_dataframe",path='/home/pyspark/super_crush_course.db/ctas_dataframe')

In [None]:
spark.sql("select * from super_crush_course.ctas_dataframe").printSchema()
spark.sql("show create table super_crush_course.ctas_dataframe").show(truncate=False)

In [None]:
spark.sql("select * from super_crush_course.ctas_dataframe").show()

# SELECT INSERT
既に存在するテーブルに対して、検索結果をもとにデータを登録していくことができます。　　
SELECT INSERTの場合は、ADD PARTITIONは不要です

CTASと違うのはこちらはテーブルを作る操作ではなくて既にあるテーブルに対してデータを登録することが目的です。

In [None]:
# 今回は各地域のデータの履歴をまとめて一つのパーティションに入れるSelect Insertを記載してましょう。
spark.sql(""" 
Insert overwrite table super_crush_course.parquet_table_with_partition PARTITION(kenmei='all')

select code,gengo,wareki,seireki,chu,sokei,jinko_male,jinko_femail
 from 
super_crush_course.parquet_table_with_no_partition

""")

In [None]:
spark.sql("show partitions super_crush_course.parquet_table_with_partition").show()

# INSERT/UPDATE/DELETE?
ビッグデータの世界では原則としてACIDをサポートしていません。  
そのため、UPDATEやDELETがサポートされていないことが多いです。

INSERTは単体で利用することはできますが、あまり出番がなく前述で紹介したSELECT INSERTでの出番が大半です。

# 分析関数の練習をしよう
ここからは、分析関数の練習をしてみましょう。  

- agg(groupby,count,sum)
- window(over)
- ピボットテーブル
- lag関数

データフレームでの操作とSQLでの操作を対比させながら実行していきます。

# 利用するテーブル(とDataFrame)
spark.sql("select * from super_crush_course.parquet_table_with_no_partition")

In [None]:
analysis_df=spark.sql("select * from super_crush_course.parquet_table_with_no_partition")

In [None]:
from pyspark.sql import functions as F 
# groupby->agg
# groupbyしたそれぞれのグループの中でデータを溜め込んでいく(count,sum,avg)こと

analysis_df.groupBy(analysis_df.code).count().show(n=2)

analysis_df.groupBy(analysis_df.code).agg(F.sum(analysis_df.jinko_male).alias("sum_male"),F.avg(analysis_df.jinko_male).alias("avg_me")).show(n=2) 

In [None]:
# sqlで表現してみましょう
spark.sql("""
select code,sum(jinko_male) as sum_male,avg(jinko_male) as avg_me from super_crush_course.parquet_table_with_no_partition
group by code
""").show(n=2)

In [None]:
from pyspark.sql.window import Window

# window 関数
# window->{paritionBy(=GroupBy)->(order by) -> rank(順番付け)}
# 最後にwindowをoverにいれると結果が出ます

window_schema = Window.partitionBy(analysis_df.code).orderBy(analysis_df.sokei.cast("Long").asc())
analysis_df.withColumn("sokei_rank",F.rank().over(window_schema)).show(n=50)

In [None]:
# SQLで表現してみましょう

spark.sql("""
select 
*,
RANK() OVER (PARTITION BY code ORDER BY cast(sokei as long) asc) as sokei_rank

from super_crush_course.parquet_table_with_no_partition 
""").show(n=50)

# LAG/LEAD関数
Lagというのはひとつ前のレコード
LEADというのは一つ先のレコード

前日比とか前年比といった様に前後のレコードを比較することによって意味をなす値を出したいときに使います。

今回は、先ほど並び替えたデータを使って、どれくらい人口が増えていったのか確認してみましょう。

In [None]:
from pyspark.sql.window import Window

# window 関数
# window->{paritionBy(=GroupBy)->(order by) -> rank(順番付け)}
# 最後にwindowをoverにいれると結果が出ます

window_schema = Window.partitionBy(analysis_df.code).orderBy(analysis_df.sokei.cast("Long").asc())

analysis_df.withColumn("sokei_per",F.lag("sokei",1).over(window_schema)).show(n=50)
percent=analysis_df.withColumn("sokei_per",F.lag("sokei",1).over(window_schema))

In [None]:
# 順番が上がるごとにどれだけ増えていったかみていきましょう

percent.withColumn("percent",percent.sokei/percent.sokei_per).show()

In [None]:
# SQLで表現してみましょう

spark.sql("""

select 
*,
sokei / LAG(sokei,1) OVER (PARTITION BY code ORDER BY cast(sokei as long) asc)  as sokei_rank
from super_crush_course.parquet_table_with_no_partition

""").show(n=50)

# ピボットテーブル(と転置(transpose))
ピボットテーブルは簡単にいうとテーブル内の項目をまとめて処理(合計、平均)することです。

一般にデータベースに格納されたテーブルは

- コーヒ　1 100 2022/11/11
- コーヒ　1 100 2022/11/12

といった様にレコード単位で格納されています。
それらのデータを集計して表現してくれるものがピボットテーブル

GroupByしたりすればできるのでは？
と思われるかもしれませんが、pivotテーブルは集計のグループをヘッダーとして持つことができます。
※うまくいえなかったので実際に見てみましょう。

transposeについては次のチャプターにて紹介を行います。

In [None]:
# 各元号の中で一番sokeiが多かったものを取り出している
analysis_df.withColumn("sokei",analysis_df.sokei.cast("long")).groupBy(analysis_df.kenmei).pivot("gengo").max("sokei").show()

# データフレーム限定 RDDによる一行づつの操作

出番がある様な、無い様な操作ですが一つSparkの特徴であるRDD(低レベル操作)についてみていきましょう。  
Sparkは全てのDataFrameは実行されるときにRDDに変換されて実行されます(そのときに最適なRDD操作に変換してくれる)。

あまり普段RDDを意識することなく操作を可能です。

一応RDDでの操作も見ておきましょう。

RDDに変換すると、mapとかlambdaなどPythonの関数が適用になりますが、最適化をやってくれなくなるのであまりおすすめはできないです。

In [None]:
rdds=json_df.rdd.map(lambda x: len(x.code))
rdds.take(10)

rdds.reduce(lambda a,b: a+b)

In [None]:
spark.stop()