In [1]:
%%HTML
<link rel="stylesheet" href="custom.css">

# Data ScienceのためのPython，その先へ
<hr> 

#### ゼロからのpandasの使い方とPySparkへの架け橋
株式会社ネットプロテクションズ
関西開発インターン生　上野孝斗

タイトルは以下書籍のオマージュです．ゼミで読んでました．

https://www.amazon.co.jp/統計学への確率論、その先へ―ゼロからの測度論的理解と漸近理論への架け橋-清水-泰隆/dp/4753601250

## 本日の内容
<hr>

- 自己紹介
- Pythonについて
- Pandasの実際
- Apache Sparkについて
- Pysparkについて
- Pysparkの実際
- PandasとPysparkの比較
- 参考

## 注意
<hr>

画像の引用先は全て同スライドにある参考文献の番号に対応しています

## 自己紹介
<hr>

- NPには6月からジョイン
- 担当業務はAFTEE（DS）
- 大学の専攻は強化学習[1]，ベイズモデリング
- 趣味は散歩，ゲーム，映画鑑賞

# Pythonについて

## Pythonの概要
<hr>

- Python[2]は1990年代初期オランダで生まれたプログラミング言語
- その習得のしやすさから，入門用として使われる
- 一方，企業でもよく使われている


## Pythonの主な利用用途
<hr>

- データサイエンスおよび数値計算
- Webアプリケーション
- システム管理及びグルー用語
- 教育

PythonがGlue言語と言われる理由については以下の参考文献をチェック  
https://qiita.com/kawamou/items/05028c1a871d42119473

## フリートーク
<hr>

- お題：Pythonライブラリについて
- 知っているものでも，好きなものでも
- 実はR派...(ﾎﾞｿｯ)みたいなのも

## 有名なライブラリは数あれどやはり...
<hr>

- Pythonのライブラリといえばpandas!!(多分)
- ...pandasについて少し紹介します

有名なライブラリとしてPandasを挙げたのは進行上の都合になります.

# pandasについて

## pandasの概要
<hr>

- pandas[3]はPythonのライブラリの一つ
- PythonでDataScienceするときはまずはとりあえずpandas
- `pandas.DataFrame`オブジェクトによって，データフレームを作ることができる

## 補足：データフレームについて
<hr>

- さまざまな種類のデータフレームが存在(dask,pandas,pickle,parquet...)する
- ざっくり言うと，データフレーム＝二次元の表形式のデータ
- 言い換えると，テーブルデータ，ラベル付き二次元配列

## データフレームをpandasで作成
<hr>

- pandasでデータフレームを作成する
- 10 minutes to pandas[4]より

In [2]:
import pandas as pd
import numpy as np

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

dates = pd.date_range("20130101", periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list("ABCD"))
df

Unnamed: 0,A,B,C,D
2013-01-01,-0.170443,-0.732601,-1.311803,-0.010146
2013-01-02,0.803708,-0.888342,-1.942037,-1.667417
2013-01-03,-0.514631,0.626405,0.13861,0.282927
2013-01-04,0.301704,0.424543,0.559864,-0.642745
2013-01-05,-1.906992,-1.093645,-0.401919,-1.187676
2013-01-06,-0.18387,-1.085584,-0.662191,-1.586502


# pandasでできること

## 抽出（WHERE）
<hr>

- [6]を全体的に参考にした
- まずはデータを作る

In [3]:
pdf = pd.DataFrame({
    'student': ['Larissa', 'Jaylan', 'Golda', 'Myriam', 'Fabiola', 'Abigail', 'Hardy', 'Jeromy'],
    'class': list('ABAACCBC'),
    'score': [50, 69, 68, 70, 82, 57, 88, 93],
})
pdf

Unnamed: 0,student,class,score
0,Larissa,A,50
1,Jaylan,B,69
2,Golda,A,68
3,Myriam,A,70
4,Fabiola,C,82
5,Abigail,C,57
6,Hardy,B,88
7,Jeromy,C,93


- このデータからA組のテスト結果の点数と生徒の名前を抽出する．もしSQLでやるならこのようになる．

```
SELECT id, student, class
FROM test_result
WHERE class == 'A'
```
- pandasを使った場合は以下のように書ける

In [4]:
pdf.loc[pdf['class'] == 'A', ['student', 'score']]

Unnamed: 0,student,score
0,Larissa,50
2,Golda,68
3,Myriam,70


pandasでは，`Dataframe.loc`で行について操作することができるようになる

## 複数の条件で抽出
<hr>

- 複数の条件を抽出したいときは`&`や`|`を用いる．`and`や`or`ではエラーが出るので注意
- boolean型でも抽出ができる
- 「C組のうち、80点以上の生徒」という複数の条件で抽出する

In [5]:
pdf.loc[(pdf['class'] == 'C') & (pdf['score'] >= 80)]

Unnamed: 0,student,class,score
4,Fabiola,C,82
7,Jeromy,C,93


## 結合（JOIN）
<hr>

- `student`，`attendance`と二つのテーブルを用意する

In [6]:
# [7]を参考
from IPython.display import HTML
from jinja2 import Template

student = pd.DataFrame({
    'name': ['Larissa', 'Jaylan', 'Golda', 'Myriam', 'Fabiola', 'Abigail', 'Bernice', 'Hardy', 'Jeromy'],
    'class': list('ABAACCBBC'),
}, index=['A1', 'B1', 'A2', 'A3', 'C1', 'C2', 'B2', 'B3', 'C4'])

attendance = pd.DataFrame({
    'student_id': ['C2', 'C1', 'C4', 'A1', 'C2', 'B2', 'A1', 'B1', 'B3'],
    'subject': ['History', 'Japanese', 'English', 'math', 'math', 'History', 'math', 'math', 'History'],
})

In [7]:
html_tpl = """
<table>
  <tr>
    <td>{{ student }}</td>
    <td>{{ attendance }}</td>
  </tr>
</table>
"""
tpl = Template(html_tpl)
html_text = tpl.render({"attendance": attendance.to_html(), "student": student.to_html()})

HTML(html_text)

Unnamed: 0_level_0,name,class
Unnamed: 0_level_1,student_id,subject
A1,Larissa,A
B1,Jaylan,B
A2,Golda,A
A3,Myriam,A
C1,Fabiola,C
C2,Abigail,C
B2,Bernice,B
B3,Hardy,B
C4,Jeromy,C
0,C2,History

Unnamed: 0,name,class
A1,Larissa,A
B1,Jaylan,B
A2,Golda,A
A3,Myriam,A
C1,Fabiola,C
C2,Abigail,C
B2,Bernice,B
B3,Hardy,B
C4,Jeromy,C

Unnamed: 0,student_id,subject
0,C2,History
1,C1,Japanese
2,C4,English
3,A1,math
4,C2,math
5,B2,History
6,A1,math
7,B1,math
8,B3,History


- `INNER JOIN`はSQLだと以下のようになる
```
SELECT *
FROM attendance INNER JOIN student
  ON attendance.student_id = student.id
```
- pandasではこのようになる

In [8]:
pd.merge(attendance, student, left_on='student_id', right_index=True)

Unnamed: 0,student_id,subject,name,class
0,C2,History,Abigail,C
4,C2,math,Abigail,C
1,C1,Japanese,Fabiola,C
2,C4,English,Jeromy,C
3,A1,math,Larissa,A
6,A1,math,Larissa,A
5,B2,History,Bernice,B
7,B1,math,Jaylan,B
8,B3,History,Hardy,B


- 続いて`LEFT JOIN`をやってみる
- 「B組の生徒について、歴史（History）への出席情報」を取得する
- SQLでは以下のようになる

```
SELECT *
FROM student LEFT JOIN attendance
  ON student.id = attendance.student_id
WHERE student.class = 'B' AND attendance.subject = 'History'
```

- pandasでは次のようにする

In [9]:
# 各DataFrameからB組、およびHistoryの出席情報をそれぞれ抽出
b_student = student.loc[student['class'] == 'B']
history_attendance = attendance.loc[attendance['subject'] == 'History']

# LEFT JOINを実行
pd.merge(
    b_student,
    history_attendance,
    left_index=True,
    right_on='student_id',
    how='left',
)

Unnamed: 0,name,class,student_id,subject
,Jaylan,B,B1,
5.0,Bernice,B,B2,History
8.0,Hardy,B,B3,History


- `pd.merge()`では．結合に使う列名を指定できる
- 列名の左右について注意が必要[6]
![](pic2.png)

## 集約（GROUP BY）
<hr>

以下のような`test_result`テーブルを例にする

In [10]:
test_result = pd.DataFrame({
    'name': np.sort(['Larissa', 'Jaylan', 'Golda', 'Myriam', 'Fabiola', 'Abigail', 'Bernice', 'Hardy', 'Jeromy'] * 2),
    'class': np.sort(list('AABBCC') * 3),
    'subject': ['math', 'English'] * 9,
    'score': [50, 72, 96, 74, 66, 65, 51, 65, 85, 69, 53, 76, 44, 77, 56, 56, 52, 68],
})

- 集約関数が一つのとき
- 例えば，「クラスごと，教科ごとの平均点」を求めてみる
- SQLでは以下のようになる

```
SELECT class, subject, avg(score)
FROM test_result
GROUP BY class, subject
```

- pandasでは次のようになる

In [11]:
test_result.groupby(['class', 'subject']).mean()

  test_result.groupby(['class', 'subject']).mean()


Unnamed: 0_level_0,Unnamed: 1_level_0,score
class,subject,Unnamed: 2_level_1
A,English,70.333333
A,math,70.666667
B,English,70.0
B,math,63.0
C,English,67.0
C,math,50.666667


- 集約関数が複数ある場合
- 平均だけでなく，最大点も表示したいときがある
- その場合は`DataFrameGroupBy`オブジェクトの`agg()`メソッドを使うことによって実現できる

In [12]:
grouped = test_result.groupby(['class', 'subject'])
grouped.agg(['mean', 'max'])

  grouped.agg(['mean', 'max'])


Unnamed: 0_level_0,Unnamed: 1_level_0,score,score
Unnamed: 0_level_1,Unnamed: 1_level_1,mean,max
class,subject,Unnamed: 2_level_2,Unnamed: 3_level_2
A,English,70.333333,74
A,math,70.666667,96
B,English,70.0,76
B,math,63.0,85
C,English,67.0,77
C,math,50.666667,56


## pandasの課題点
<hr>

- pandasはものすごく便利だけど
- ビッグデータではメモリエラーが出る／処理が遅い
- SQLに習熟している場合は文法がわかりにくいことも

## フリートーク（その２）

<hr>

- メモリエラーを起こさずにデータを扱うにはどんな方法があるか

## pandasからApache Sparkへ
<hr>

- Apache Sparkを使えば，場合によってはpandasの100倍早く処理を行うことができる

# Apache Sparkについて

## Apache Sparkの概要
<hr>

- 並列分散処理を行えるデータ処理ツール
- 大規模データ分析を行う際に役立つ！

## Python + Apache Spark= ?
<hr>

- PythonでApache Sparkがもし使えたら...
- 学習しやすい，使いやすい！！
- もちろんJupyter Lab（Notebook）で使える！
- それがPySpark!![5]

![](pic3.png)

# PySparkについて

## PySparkの概要
<hr>

- Sparkをpython上で実行するためのAPI
- python以外にも実はsparkにはたくさんのAPIがあり，たくさんの言語に対応している．

# PySparkでできること


## PySparkとPandasの互換性
<hr>

- pandasのDFとPySparkのDFは相互に変換することができる
- 実際のところは．pandasからPySparkの場合がほとんど

In [27]:
from pyspark.sql import SparkSession

# pandasでつくったDFをSparkのDFへ変換
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)

sdf.show()

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


+-------+-----+-----+
|student|class|score|
+-------+-----+-----+
|Larissa|    A|   50|
| Jaylan|    B|   69|
|  Golda|    A|   68|
| Myriam|    A|   70|
|Fabiola|    C|   82|
|Abigail|    C|   57|
|  Hardy|    B|   88|
| Jeromy|    C|   93|
+-------+-----+-----+



## 抽出（WHERE）
<hr>

- データからA組のテスト結果の点数と生徒の名前を抽出する
- PySparkを使って書いた場合このようになる

In [14]:
sdf.filter(sdf['class'] == 'A').select('student','score').show()

+-------+-----+
|student|score|
+-------+-----+
|Larissa|   50|
|  Golda|   68|
| Myriam|   70|
+-------+-----+



In [15]:
# dropを使って表示する列を指定するとき
sdf.filter(sdf['class'] == 'A').drop('class').show()

+-------+-----+
|student|score|
+-------+-----+
|Larissa|   50|
|  Golda|   68|
| Myriam|   70|
+-------+-----+



- SQL, pandas, PySparkとコードを比較する

SQL
```
SELECT id, student, class
FROM test_result
WHERE class == 'A'
```

pandas
```pandas
pdf.loc[pdf['class'] == 'A', ['student', 'score']]b
```
PySpark
```PySpark
sdf.filter(sdf['class'] == 'A').select('student','score').show()
```

pandasではメソッドで角かっこを使っていて紛らわしい気もする

## 複数の条件で抽出
<hr>

- 「C組のうち80点以上の生徒」で抽出する

In [16]:
sdf.filter((sdf['class'] == 'C') & (sdf['score'] >= 80)).show()

+-------+-----+-----+
|student|class|score|
+-------+-----+-----+
|Fabiola|    C|   82|
| Jeromy|    C|   93|
+-------+-----+-----+



pandas
```pandas
pdf.loc[(pdf['class'] == 'C') & (pdf['score'] >= 80)]
```
PySpark
```PySpark
sdf.filter((sdf['class'] == 'C') & (sdf['score'] >= 80)).show()
```

## 結合（JOIN）
<hr>

- PySparkのDF変換した際`index`の情報が抜け落ちるので`student_id`として再定義する
- `withColumn`メソッドをうまく使えば解決できそうだが手が及ばなかった

In [17]:
student = pd.DataFrame({
    'name': ['Larissa', 'Jaylan', 'Golda', 'Myriam', 'Fabiola', 'Abigail', 'Bernice', 'Hardy', 'Jeromy'],
    'class': list('ABAACCBBC'),
    'student_id': ['A1', 'B1', 'A2', 'A3', 'C1', 'C2', 'B2', 'B3', 'C4']
})

s_attendance = spark.createDataFrame(attendance)
s_student = spark.createDataFrame(student)

- `INNER JOIN`をPySparkにより実装すると以下のようになる

In [18]:
s_attendance.join(s_student, 'student_id', 'inner').show()



+----------+--------+-------+-----+
|student_id| subject|   name|class|
+----------+--------+-------+-----+
|        A1|    math|Larissa|    A|
|        A1|    math|Larissa|    A|
|        B1|    math| Jaylan|    B|
|        B2| History|Bernice|    B|
|        B3| History|  Hardy|    B|
|        C1|Japanese|Fabiola|    C|
|        C2| History|Abigail|    C|
|        C2|    math|Abigail|    C|
|        C4| English| Jeromy|    C|
+----------+--------+-------+-----+



                                                                                

- 続いて`LEFT JOIN`をやってみる
- `INNER JOIN`と同様に，`join`メソッドの引数で

In [19]:
# LEFTJOIN
sb_student = s_student.filter(s_student['class'] == 'B')
shistory_attendance = s_attendance.filter(s_attendance['subject'] == 'History')
# indexを使っていないため，結合の指定がしやすい（のかも）
sb_student.join(shistory_attendance, "student_id", "left").show()

+----------+-------+-----+-------+
|student_id|   name|class|subject|
+----------+-------+-----+-------+
|        B1| Jaylan|    B|   null|
|        B2|Bernice|    B|History|
|        B3|  Hardy|    B|History|
+----------+-------+-----+-------+



SQL
```
SELECT *
FROM student LEFT JOIN attendance
  ON student.id = attendance.student_id
WHERE student.class = 'B' AND attendance.subject = 'History'
```
pandas
```
pd.merge(b_student,history_attendance,left_index=True,right_on='student_id',how='left',)
```
PySpark
```
sb_student.join(shistory_attendance, "student_id", "left").show()
```

## 集約（GROUP BY）
<hr>

- まずはDFの変換をする

In [20]:
s_test_result = spark.createDataFrame(test_result)

- pandasの場合と同様に「クラスごと，教科ごとの平均点」を求めてみる

In [21]:
s_test_result.groupBy('class','subject').mean().show()

[Stage 23:>                                                         (0 + 8) / 8]

+-----+-------+------------------+
|class|subject|        avg(score)|
+-----+-------+------------------+
|    A|   math| 70.66666666666667|
|    A|English| 70.33333333333333|
|    B|   math|              63.0|
|    B|English|              70.0|
|    C|English|              67.0|
|    C|   math|50.666666666666664|
+-----+-------+------------------+



                                                                                

SQL
```
SELECT class, subject, avg(score)
FROM test_result
GROUP BY class, subject
```
pandas
```
test_result.groupby(['class', 'subject']).mean()
```
PySpark
```
s_test_result.groupBy('class','subject').mean().show()
```

- 集約関数が複数存在する場合
- `agg()`メソッドを使って表現，計算のために`pyspark.sql.functions`ライブラリをインポートしている

In [22]:
import pyspark.sql.functions as F

s_test_result.groupBy('class','subject').agg(F.avg('score'), F.max('score')).show()

[Stage 26:>                                                         (0 + 8) / 8]

+-----+-------+------------------+----------+
|class|subject|        avg(score)|max(score)|
+-----+-------+------------------+----------+
|    A|   math| 70.66666666666667|        96|
|    A|English| 70.33333333333333|        74|
|    B|   math|              63.0|        85|
|    B|English|              70.0|        76|
|    C|English|              67.0|        77|
|    C|   math|50.666666666666664|        56|
+-----+-------+------------------+----------+



                                                                                

## SparkSQLについて
<hr>

- PySparkはPandasよりもSQLに近い形で書くことができて便利
- SQL文によって操作することもできる

- 先ほど行ったPySparkでの抽出をSQLで行う[8]

In [23]:
# SparkSQLで操作するテーブルを登録
sdf.createOrReplaceTempView('test_result')

# SQLによる操作
query = '''
SELECT student, score
        FROM test_result
        WHERE class == 'A'
'''

spark.sql(query).show()

+-------+-----+
|student|score|
+-------+-----+
|Larissa|   50|
|  Golda|   68|
| Myriam|   70|
+-------+-----+



SQLに習熟している人はPySparkのメソッドを覚えなくても良い気がする

# PandasとPysparkの比較

## 乱数によるデータセット作成
<hr>

- 乱数によってデータを生成[9]する
- データサイズはおよそ80KB

In [24]:
np.random.seed(seed=42)
r_pdf = pd.DataFrame(np.random.rand(100,100))
r_sdf = spark.createDataFrame(r_pdf)
#Byte単位でデータサイズを表示
r_pdf.memory_usage().sum()

80128

## 時間計測
<hr>

- 一つのカラムに関してソートを行う時間を計測することにより比較する
- pandasの場合

In [25]:
import time

start = time.perf_counter() 

r_pdf.sort_values([0])

print(time.perf_counter() - start)

0.0003958750021411106


- PySparkの場合

In [26]:
import time

start = time.perf_counter()

r_sdf.sort("0")

print(time.perf_counter() - start)

0.04507395801192615


- PySparkとpandasそれぞれの場合について実行時間を比較すると圧倒的にpandasの方が早い
- [10]ではPySparkとpandasの性能比較をより詳しく行なっている
- [10]内でも，平均に関しては常にpandasの方が早い
- データのサイズが1,000,000×1000を越え，かつ相関を求めるといった複雑な処理を行うとPySparkの性能が上回る
- 下の図では横軸がそれぞれデータの行，列を表し，縦軸では計算時間を表している


<img src="pic4.png" width="75%">

## 終わりに
<hr>

- PySparkによるデータ前処理の例を参考文献に数例紹介した
- pandasっぽい文法でSparkを動かそうというコンセプトのKoalasというものもある
- Sparkのほうが処理が早いからと言って全ての分析ケースに対してpandasからPySpark(Spark)に置き換えるのは得策ではない
- プロジェクトとして行なっていく際には，まずは小さなデータからpandasを使ってアドホックな分析を行い，分析の方針が決まってからSparkを扱うといったパターンが想定される．

## フリートーク（その3）
<hr>

質問お待ちしてます！

## 参考
<hr>

### スライド中の参考文献
[1] https://bookclub.kodansha.co.jp/product?item=0000275420  
[2] https://docs.python.org/ja/3/  
[3] https://pandas.pydata.org/docs/  
[4] https://pandas.pydata.org/docs/user_guide/10min.html  
[5] https://sparkbyexamples.com/pyspark/pandas-vs-pyspark-dataframe-with-examples/amp/  
[6] https://www.ohitori.fun/entry/basic-data-analysis-in-pandas  
[7] https://qiita.com/driller/items/ef8a16be03e146ce2183  
[8] https://blog.serverworks.co.jp/introducing-pyspark-6  
[9] https://qiita.com/yukifddd/items/5668483705ef9d89a0c9  
[10] https://towardsdatascience.com/parallelize-pandas-dataframe-computations-w-spark-dataframe-bba4c924487c

### PySparkによるデータ前処理の例
https://techblog.nhn-techorus.com/archives/7301  
https://www.ariseanalytics.com/activities/report/20211210/