# PySpark入門

## Sparkの概要

- Sparkはカリフォルニア大学バークレー校AMPLabで開発された、オープンソースの分散処理フレームワークである。  
- Amazon, Microsoft, Alibaba, Baidu, NTT, Uberなど、大データを処理する必要のある企業で広く利用されている。
- 現在はDatabricksという企業が開発の中心となっている。オープンソースなので、他にも世界中の多くの人々が開発に携わっており、現在の日本人の中心的な開発者としては、NTTの山室氏や猿田氏がいる。

- SparkはScala, Java, Python, Rで操作することができる。  
    - 今回はPythonで操作する方法をお伝えする。
    - Spark version2.0以前は、Pythonで操作するとScala/Javaで操作した場合に比べてかなり実行速度が遅かったが、Spark version2.0以降は、ユーザー定義関数を使わずにDataFrame形式(後述)でデータを扱うのであれば、PythonとScala/Javaとで殆ど実行速度は変わらない。
    - PythonでSparkを操作するためのラッパーライブラリは<font color=red>**PySpark**</font>と呼ばれている。
- Sparkの利用には、クラスタを管理するクラスタマネージャと、分散ファイルシステムが必要である/font。
    - クラスタマネージャとしては、デフォルトのSparkクラスタの他、Hadoop YARNクラスタを利用することも可能である。  
    - 分散ファイルシステムとしては、Hadoop分散ファイルシステム、MapRファイルシステム、Cassandra, Amazon S3などに対応している。
    - 今回は手軽にSparkを試すため、疑似分散ローカルモードを使用する。擬似的な分散ファイルシステムでローカルPCのファイルを扱い、擬似的なSparkクラスタをローカルPCで走らせるモードである。
- AWSでは、AWS EMRでSparkクラスタを立ち上げて、S3にファイルを置くことで、容易にSparkを利用できる。Azureでも同様のサービスがある。
    - AWS EMR & S3という組み合わせのほか、Databricks & S3という組み合わせもある。こちらの方がより容易/手軽であるが、その分料金は高い。細かい設定はAWS EMRの方がやりやすい印象。

- 大規模データを素早く処理したい場合は、クラスタ数を増やすことで対応する。例えばUberは10ペタバイト弱のデータがストレージにあった時点で、日々のデータ処理を10000仮想コアのHadoop Yarnクラスタで行っていたようである。(現在はもっと増えている)[参考記事](https://eng.uber.com/uber-big-data-platform/)

- このnotebookでは、Sparkの主要機能のうち、Spark SQL(データ操作)とSpark ML(機械学習)を利用する。
    - 他、Sparkの主要機能としては、Spark Streaming(ストリーミング分析/処理)、GraphX(グラフ理論分析)などがある。

PySparkで大体のことができてしまうので、Python(と場合によってはSQL)の知識が一定程度あれば、学習コストはかなり低いと言える。  
Hadoop YARNやKafkaなど、周辺のライブラリもあわせて学習する場合は、JavaやScalaの知識が必要となってくる。  
(ちなみにScalaも関数型言語とオブジェクト指向言語のハイブリッド言語で、楽しい言語である。)

## セットアップ

ここではMacBook上でSpark疑似分散ローカルモード & Pyspark & Jupyter notebook環境を構築する方法を紹介する。  
(作成にあたっては[こちら](https://qiita.com/neppysan/items/0fe706f04b001c082d38)を参考にした。)

0. システム設定 > JavaでJavaコントロールパネルを開き、一般タブの"Javaについて"ボタンを押して、java version8以降がインストールされていることを確認。
    - まだjavaが入っていない場合や、Java version7以前がインストールされている場合は、[こちら](https://www.java.com/ja/download/)からインストール。
    - 再度システム設定から、Javaのversionを確認。
    - 将来的にJavaでコーディングもするかも、という場合は、[こちら](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)からJDKもインストールしておくと良い。Java SE Development KitのMacOSXのものをダウンロードして実行。
        - インストール後、~/.bash_profileに以下を追加。
```sh
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
export PATH="$JAVA_HOME/bin:$PATH"
```
        - `java -version`とターミナルで打って、バージョン情報が表示されたら成功。

In [4]:
!java -version

java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)


1. [こちら](https://spark.apache.org/downloads.html)のページから、Sparkをダウンロード&解凍。
    - 最新のversion2.4.3, Pre-build for Apache Hadoop 2.7 or laterをダウンロードして、適当な場所に解凍。眞田はHomeディレクトリに解凍した。

In [5]:
!ls ~/spark-2.4.3-bin-hadoop2.7/

LICENSE    README.md  [34mconf[m[m       [34mjars[m[m       [34mlogs[m[m       [34mwork[m[m
NOTICE     RELEASE    [34mdata[m[m       [34mkubernetes[m[m [34mpython[m[m     [34myarn[m[m
[34mR[m[m          [34mbin[m[m        [34mexamples[m[m   [34mlicenses[m[m   [34msbin[m[m


3. ~/.bash_profileを編集。
    - ~/.bash_profileに以下を追記。
```bash
export PATH=$PATH:$HOME/spark-2.4.3-bin-hadoop2.7/bin
export PYSPARK_PYTHON=$HOME/anaconda3/envs/{condaの仮想環境の名前}/bin/python
export PYSPARK_DRIVER_PYTHON=$HOME/anaconda3/envs/{condaの仮想環境の名前}/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook' pyspark
```

1行目で、先程ダウンロード&解凍したsparkのソフトウェアにパスを通している  
2行目で、PySparkで使用するPythonを指定している。  
3行目では、PySparkで使用するクライアントを指定している。今回はJupyterを使用する  
4行目で、pysparkとコマンドを打った場合に自動でnotebookが起動するように設定している。

3. ターミナルから`pyspark`と打って、jupyter notebookが起動したら成功!

4. notebookのホーム画面から、本notebookを開き、以下が動作するか確認。

In [1]:
# notebook上でも動作をチェック
spark

SparkのWeb UI画面にもアクセス可能かどうか確認。[http://localhost:4040](http://localhost:4040)にアクセス。

UIの見方は、例えば[こちら](https://linux.wwing.net/WordPress/2017/01/05/sparkのuiを調べてみた/)を参照。

## データロード

DataFrameとしてデータを読み込むことができる。これはSpark SQLの機能の一つである。  
名前が同じであるが、pandasのDataFrameとは全く別物なので注意。

### csvの読み込み

In [4]:
# ~~~/Job1/内の.csvを全てロード
df = spark.read.csv("../data/raw/CAN/20190605_CAN/103.486_CAR1V21_HDD2/Job1/*.csv")

NameError: name 'spark' is not defined

In [2]:
# 複数のフォルダの.csvを全てロードする場合は、リストで渡せばOK
df_2 = spark.read.csv(["../data/raw/CAN/20190605_CAN/103.486_CAR1V21_HDD2/Job1/*.csv",
                     "../data/raw/CAN/20190605_CAN/103.486_CAR1V21_HDD2/Job1 copy/*.csv"])

NameError: name 'spark' is not defined

In [3]:
# この書き方もできる
df_3 = spark.read.csv("../data/raw/CAN/20190605_CAN/103.486_CAR1V21_HDD2/*/*.csv")

NameError: name 'spark' is not defined

In [4]:
df.show(5) # show(行数)でいい感じに整形して表示してくれる。

+---+------------+----------+-----+--------+
|_c0|         _c1|       _c2|  _c3|     _c4|
+---+------------+----------+-----+--------+
|021|201604200750|  0.186631|12461|     1.0|
|021|201604200750|  0.186631|12463|     0.0|
|021|201604200750|  0.186631|12464|     0.0|
|021|201604200750|  0.186631|12467|     1.0|
|021|201604200750|  0.186631|12457|5.490196|
+---+------------+----------+-----+--------+
only showing top 5 rows



ちなみに、データ型はsparkが裏側で勝手に推定してくれる。

In [5]:
df.dtypes # 全て文字列型で格納されている

[('_c0', 'string'),
 ('_c1', 'string'),
 ('_c2', 'string'),
 ('_c3', 'string'),
 ('_c4', 'string')]

csvの他、jsonファイル、parquetファイル、textファイル、hiveテーブル、データベーステーブルからデータを読み込むことができる。  
詳細は[公式ドキュメント](https://spark.apache.org/docs/latest/sql-data-sources.html)を参照。

また、アクセス先のs3のアクセスキーと秘密キーの設定をしてあげれば、以下のような形で簡単にs3上のファイルもロードできる。
```python
df = spark.read.csv(["s3a://example/HDD1/*.csv", "s3a://example/HDD2/*.csv"])
```

#### レコード数

In [6]:
# countで計算できる
df.count()

89666303

少し時間がかかる。先程の[UIサーバー](http://localhost:4040)で進捗を見ることができる。

### 列名をつけて読み込み

In [7]:
# toDFメソッドを利用するのがお手軽な方法。複数の引数(カンマ区切りの引数)として入力する必要があるので、
# '*'をリストの前につけて、引数展開して入力している
df_named = df.toDF(*["carid", "starttime", "times", "feature", "val"])

In [8]:
df_named.show(5)

+-----+------------+----------+-------+--------+
|carid|   starttime|     times|feature|     val|
+-----+------------+----------+-------+--------+
|  021|201604200750|  0.186631|  12461|     1.0|
|  021|201604200750|  0.186631|  12463|     0.0|
|  021|201604200750|  0.186631|  12464|     0.0|
|  021|201604200750|  0.186631|  12467|     1.0|
|  021|201604200750|  0.186631|  12457|5.490196|
+-----+------------+----------+-------+--------+
only showing top 5 rows



In [9]:
df_named.dtypes

[('carid', 'string'),
 ('starttime', 'string'),
 ('times', 'string'),
 ('feature', 'string'),
 ('val', 'string')]

全て文字列で入ってしまっているので、スキーマを定義して再度読み込むことにする。スキーマとは、データ名とデータ型がセットになったものである。

In [10]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, StringType, LongType

In [11]:
# StructFieldインスタンスのリストを作成
fields = [StructField("carid", StringType()),
          StructField("starttime", StringType()),
          StructField("times", FloatType()),
          StructField("feature", StringType()),
          StructField("val", FloatType())]

# 作ったリストをStructTypeコンストラクタに渡すと、スキーマが完成。
schema = StructType(fields)

StringTypeが文字列型、LongTypeがLong整数型、FloatTypeが浮動小数点型である。  
データ型については
- 整数系: LongType, IntegerType, ShortType, ByteType, 
- 実数系: FloatType, DoubleType, DecimalType
- 日付系: TimestampType, DateType
- 文字列系: StringType
- リスト系: ArrayType
- バイナリデータ系: BinaryType
- 真偽値: BooleanType  

などがある。  
ArrayTypeは、中身についても再度データ型を指定することになる。結果、入れ子構造にできる。  
詳細は[公式ドキュメント](https://spark.apache.org/docs/latest/sql-reference.html#data-types)を参照。

In [12]:
# schemaへのデータ型へのアクセスは以下のようにできる
schema["times"].dataType

FloatType

In [13]:
# 以下のようにしてschemaに従って型を変更(キャスト)できる
from pyspark.sql.functions import col

# select(後述)の中に、キャストされた各列のリストを与えている。
# castするデータ型は、schemaからアクセスしている。
df_named_withSchema = df_named.select([col(c).cast(schema[c].dataType) for c in df_named.columns])

In [14]:
df_named_withSchema.show(5)

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12461|     1.0|
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
+-----+------------+--------+-------+--------+
only showing top 5 rows



In [15]:
# しかし、ロード時に、schemaも一緒に入力するほうが楽である
df_named_withSchema = spark.read.csv("../data/raw/CAN/20190605_CAN/103.486_CAR1V21_HDD2/Job1/*.csv", 
                   schema=schema)

In [16]:
df_named_withSchema.show(5)

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12461|     1.0|
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
+-----+------------+--------+-------+--------+
only showing top 5 rows



In [17]:
df_named_withSchema.dtypes

[('carid', 'string'),
 ('starttime', 'string'),
 ('times', 'float'),
 ('feature', 'string'),
 ('val', 'float')]

In [18]:
# このような表示もできる。nullableが見える分、こちらの方が良いかもしれない
df_named.printSchema()

root
 |-- carid: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- times: string (nullable = true)
 |-- feature: string (nullable = true)
 |-- val: string (nullable = true)



### pandasのDataFrameとのやり取り

In [19]:
# Spark DataFrame -> pandas DataFrameは toPandas()でできる。
# 大容量のデータを無邪気にそのままtoPandasに入れるとローカルPCのメモリが死亡するので注意。
df_pandas = df_named_withSchema.limit(5).toPandas()
df_pandas

Unnamed: 0,carid,starttime,times,feature,val
0,21,201604200750,0.186631,12461,1.0
1,21,201604200750,0.186631,12463,0.0
2,21,201604200750,0.186631,12464,0.0
3,21,201604200750,0.186631,12467,1.0
4,21,201604200750,0.186631,12457,5.490196


pandas DataFrame -> Spark DataFrameは toPandas()でできるが、schemaを適切に設定してあげる必要がある場合もあるので注意。

In [20]:
# そのまま読み込むと、sparkがよしなにデータ型を指定してくれるが、小数は全てDoubleType型になってしまう
# 結果、Float型を指定していたtimesとvalは、元の有効桁数以下に、変な数字がついてくる。
df_spark = spark.createDataFrame(df_pandas)
df_spark.show()

+-----+------------+------------------+-------+-----------------+
|carid|   starttime|             times|feature|              val|
+-----+------------+------------------+-------+-----------------+
|  021|201604200750|0.1866309940814972|  12461|              1.0|
|  021|201604200750|0.1866309940814972|  12463|              0.0|
|  021|201604200750|0.1866309940814972|  12464|              0.0|
|  021|201604200750|0.1866309940814972|  12467|              1.0|
|  021|201604200750|0.1866309940814972|  12457|5.490196228027344|
+-----+------------+------------------+-------+-----------------+



In [21]:
df_spark.printSchema()

root
 |-- carid: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- times: double (nullable = true)
 |-- feature: string (nullable = true)
 |-- val: double (nullable = true)



In [22]:
# このようにきちんとスキーマを指定してあげて読み込めばOK
df_spark = spark.createDataFrame(df_pandas, schema=schema)
df_spark.show()

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12461|     1.0|
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
+-----+------------+--------+-------+--------+



In [23]:
df_spark.printSchema()

root
 |-- carid: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- times: float (nullable = true)
 |-- feature: string (nullable = true)
 |-- val: float (nullable = true)



## DataFrameの基本的な操作

### 表示: show/take

In [24]:
# .show(x): 上位x件を表示。pandasでいうhead
df_named_withSchema.show(5)

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12461|     1.0|
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
+-----+------------+--------+-------+--------+
only showing top 5 rows



In [25]:
# .take(x): 同じく上位x件を表示
df_named_withSchema.take(5)

[Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12461', val=1.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12463', val=0.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12464', val=0.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12467', val=1.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12457', val=5.490196228027344)]

**takeでdouble?**

残念ながらpandasのtailに相当するコマンドはない。

### 行の選択: filter/where/limit

In [26]:
# limit: 上位x件のみにデータを絞る
# ...のだが、最後をshowで表示すると何故か全体を走査してしまい、かなり時間がかかる。
df_named_withSchema.limit(5).show(5)

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12461|     1.0|
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
+-----+------------+--------+-------+--------+



In [27]:
# takeで取ればsortは行われない。
df_named_withSchema.limit(5).take(5)

[Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12461', val=1.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12463', val=0.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12464', val=0.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12467', val=1.0),
 Row(carid='021', starttime='201604200750', times=0.1866309940814972, feature='12457', val=5.490196228027344)]

In [28]:
# filter: 条件に合うレコードのみ取り出す。whereでも同じ。
df_named_withSchema.filter(df_named_withSchema.feature == 12461).show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200750|0.186631|  12461|1.0|
|  021|201604200750|0.196709|  12461|2.0|
|  021|201604200750|0.206723|  12461|3.0|
|  021|201604200750|0.216867|  12461|0.0|
|  021|201604200750|0.226624|  12461|1.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



In [29]:
# 複数条件: and
df_named_withSchema.filter((df_named_withSchema.feature == 12461) &
                          (df_named_withSchema.val <= 2)).show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200750|0.186631|  12461|1.0|
|  021|201604200750|0.196709|  12461|2.0|
|  021|201604200750|0.216867|  12461|0.0|
|  021|201604200750|0.226624|  12461|1.0|
|  021|201604200750|0.236824|  12461|2.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



In [30]:
# 複数条件: or
df_named_withSchema.filter((df_named_withSchema.feature == 12461) |
                          (df_named_withSchema.val <= 2)).show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200750|0.186631|  12461|1.0|
|  021|201604200750|0.186631|  12463|0.0|
|  021|201604200750|0.186631|  12464|0.0|
|  021|201604200750|0.186631|  12467|1.0|
|  021|201604200750|0.186631|  12403|1.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



In [31]:
# not
df_named_withSchema.filter(~(df_named_withSchema.feature == 12461)).show(5)

+-----+------------+--------+-------+--------+
|carid|   starttime|   times|feature|     val|
+-----+------------+--------+-------+--------+
|  021|201604200750|0.186631|  12463|     0.0|
|  021|201604200750|0.186631|  12464|     0.0|
|  021|201604200750|0.186631|  12467|     1.0|
|  021|201604200750|0.186631|  12457|5.490196|
|  021|201604200750|0.186631|  12437|   850.0|
+-----+------------+--------+-------+--------+
only showing top 5 rows



In [32]:
# whereでも同じ
df_named_withSchema.where((df_named_withSchema.feature == 12461) &
                          (df_named_withSchema.val <= 2)).show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200750|0.186631|  12461|1.0|
|  021|201604200750|0.196709|  12461|2.0|
|  021|201604200750|0.216867|  12461|0.0|
|  021|201604200750|0.226624|  12461|1.0|
|  021|201604200750|0.236824|  12461|2.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



### 列の選択: select

In [33]:
# 単体の列
df_named_withSchema.select("times").show(3)

+--------+
|   times|
+--------+
|0.186631|
|0.186631|
|0.186631|
+--------+
only showing top 3 rows



In [34]:
# 複数取る場合はリスト
df_named_withSchema.select(["times", "feature", "val"]).show(3)

+--------+-------+---+
|   times|feature|val|
+--------+-------+---+
|0.186631|  12461|1.0|
|0.186631|  12463|0.0|
|0.186631|  12464|0.0|
+--------+-------+---+
only showing top 3 rows



In [35]:
# または複数の引数で渡す
df_named_withSchema.select("times", "feature").show(3)

+--------+-------+
|   times|feature|
+--------+-------+
|0.186631|  12461|
|0.186631|  12463|
|0.186631|  12464|
+--------+-------+
only showing top 3 rows



In [36]:
# リストのアクセスとのあわせ技。tで始まるものだけ取り出す。startswithはpythonの文字列のメソッド
(df_named_withSchema
 .select([c for c in df_named_withSchema.columns if c.startswith("t")])
 .show(3)

+--------+
|   times|
+--------+
|0.186631|
|0.186631|
|0.186631|
+--------+
only showing top 3 rows



Pythonｍの文字列のメソッドを忘れてしまった場合は、[標準ドキュメント](https://docs.python.org/ja/3/library/stdtypes.html#str)や[こちらのブログ](http://motw.mods.jp/Python/str_methods.html)などを参照。

In [37]:
# ilocっぽいことを列でやりたい場合
df_named_withSchema.select(df_named_withSchema.columns[1:3]).show(3)

+------------+--------+
|   starttime|   times|
+------------+--------+
|201604200750|0.186631|
|201604200750|0.186631|
|201604200750|0.186631|
+------------+--------+
only showing top 3 rows



SQLのselectのようにも使える。

In [38]:
df_named_withSchema.select(df_named_withSchema.times * 10
                           , df_named_withSchema.val + 1).show(3)

+------------+---------+
|(times * 10)|(val + 1)|
+------------+---------+
|   1.8663099|      2.0|
|   1.8663099|      1.0|
|   1.8663099|      1.0|
+------------+---------+
only showing top 3 rows



In [39]:
# 列名の変更、SQLでいうところのas ~~は、aliasを使う
df_named_withSchema.select((df_named_withSchema.times * 10).alias("sanada")
                           , (df_named_withSchema.val + 1).alias("TD")).show(3)

+---------+---+
|   sanada| TD|
+---------+---+
|1.8663099|2.0|
|1.8663099|1.0|
|1.8663099|1.0|
+---------+---+
only showing top 3 rows



### 集約関数

In [40]:
from pyspark.sql import functions as func

agg内でfuncを使用することで、集約できる。

In [41]:
# さっと計算できるように小さめのデータに
df_mini = spark.read.csv("../data/raw/CAN/20190605_CAN/" + 
                         "103.486_CAR1V21_HDD2/Job1/" + 
                         "SUZUKI_YSB-021_MRR_2016-04-20_06-27_0001.BLF.csv",
                        schema=schema)

In [42]:
df_mini.count()

9835622

In [43]:
# 平均: mean
df_mini.agg(func.mean("times")).show()

+-----------------+
|       avg(times)|
+-----------------+
|299.7804090060245|
+-----------------+



In [44]:
# agg内で、aliasで名前を変更できる。
df_mini.agg(func.mean("times").alias("mean")).show()

+-----------------+
|             mean|
+-----------------+
|299.7804090060245|
+-----------------+



In [45]:
# 複数の関数で集約する場合
df_mini.agg(func.stddev("val").alias("sd"), func.max("val"), func.min("val"),
           func.sum("val")).show()

+--------------------+------------+--------+--------------------+
|                  sd|    max(val)|min(val)|            sum(val)|
+--------------------+------------+--------+--------------------+
|1.471077937646767...|8.4318644E18|-40960.0|2.525231049090134E22|
+--------------------+------------+--------+--------------------+



In [46]:
# groupByと組み合わせることができる
(df_mini.groupBy("feature").agg(
    func.stddev("val").alias("sd")
    ,func.max("val").alias("max")
    ,func.min("val").alias("min")).show(5))

+-------+---+---+---+
|feature| sd|max|min|
+-------+---+---+---+
|  3C307|0.0|0.0|0.0|
|  3C134|0.0|0.0|0.0|
|  31067|0.0|0.0|0.0|
|  12204|0.0|0.0|0.0|
|  31477|0.0|0.0|0.0|
+-------+---+---+---+
only showing top 5 rows



In [47]:
# 複数のgroupby
(df_mini.groupBy("starttime", "feature")
 .agg(
    func.stddev("val").alias("sd")
    ,func.max("val").alias("max")
    ,func.min("val").alias("min")
    ,func.mean("val").alias("avg")
    ,func.count("val")
    ,func.countDistinct("val").alias("nunique")
 ).show(5))
# countDistinctは一つの関数なので注意。

+------------+-------+---------------+-----+-----+------------------+----------+-------+
|   starttime|feature|             sd|  max|  min|               avg|count(val)|nunique|
+------------+-------+---------------+-----+-----+------------------+----------+-------+
|201604200627|  3C116|            0.0|  0.0|  0.0|               0.0|      5993|      1|
|201604200627|  3D505|            0.0|  0.0|  0.0|               0.0|      5988|      1|
|201604200627|  2A205|            0.0|  0.0|  0.0|               0.0|     11979|      1|
|201604200627|  3A766|            0.0|  1.0|  1.0|               1.0|      5991|      1|
|201604200627|  3BA43|58.544279282335|409.5|205.1|223.52715079127648|      5990|      3|
+------------+-------+---------------+-----+-----+------------------+----------+-------+
only showing top 5 rows



sum, mean, countなど、いくつかの関数はメソッドとしても実装されている。

In [48]:
df_mini.groupBy("starttime","feature").count().show(5)

+------------+-------+-----+
|   starttime|feature|count|
+------------+-------+-----+
|201604200627|  3C116| 5993|
|201604200627|  3A766| 5991|
|201604200627|  3D505| 5988|
|201604200627|  2A205|11979|
|201604200627|  3BA43| 5990|
+------------+-------+-----+
only showing top 5 rows



In [49]:
df_mini.groupBy("starttime","feature").mean("val").show(5)

+------------+-------+------------------+
|   starttime|feature|          avg(val)|
+------------+-------+------------------+
|201604200627|  3C116|               0.0|
|201604200627|  3A766|               1.0|
|201604200627|  3D505|               0.0|
|201604200627|  2A205|               0.0|
|201604200627|  3BA43|223.52715079127648|
+------------+-------+------------------+
only showing top 5 rows



### ウィンドウ関数

In [89]:
from pyspark.sql.window import Window

In [99]:
df_test = spark.createDataFrame([("apple", 1), ("apple", 2), ("apple", 2), ("apple", 3),
                                 ("banana", 4), ("banana", 4), ("banana", 6), ("banana", 7)],
                                ["feature", "value"])

In [101]:
df_test.select(
    "feature", "value"
    ,func.row_number().over(Window.partitionBy("feature").orderBy("value")).alias("row_number")
    ,func.rank().over(Window.partitionBy("feature").orderBy("value")).alias("rank")
    ,func.dense_rank().over(Window.partitionBy("feature").orderBy("value")).alias("dense_rank")
    ,func.percent_rank().over(Window.partitionBy("feature").orderBy("value")).alias("percent_rank")
    ,func.ntile(4).over(Window.partitionBy("feature").orderBy("value")).alias("quantile")
  ).show()

+-------+-----+----------+----+----------+------------------+--------+
|feature|value|row_number|rank|dense_rank|      percent_rank|quantile|
+-------+-----+----------+----+----------+------------------+--------+
|  apple|    1|         1|   1|         1|               0.0|       1|
|  apple|    2|         2|   2|         2|0.3333333333333333|       2|
|  apple|    2|         3|   2|         2|0.3333333333333333|       3|
|  apple|    3|         4|   4|         3|               1.0|       4|
| banana|    4|         1|   1|         1|               0.0|       1|
| banana|    4|         2|   1|         1|               0.0|       2|
| banana|    6|         3|   3|         2|0.6666666666666666|       3|
| banana|    7|         4|   4|         3|               1.0|       4|
+-------+-----+----------+----+----------+------------------+--------+



他、lag/lead系とsum(累積和)がある

### ソート

In [52]:
df_mini.sort("times").show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200627|0.295909|  0FA45|0.0|
|  021|201604200627|0.295909|  0FA43|0.0|
|  021|201604200627|0.295909|  0FA44|0.0|
|  021|201604200627|0.295909|  0FA53|0.0|
|  021|201604200627|0.295909|  0FA57|4.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



In [56]:
# 逆順にsortするにはfunc.descを使用
df_mini.sort(func.desc("times")).show(5)

+-----+------------+---------+-------+---+
|carid|   starttime|    times|feature|val|
+-----+------------+---------+-------+---+
|  021|201604200627|599.23865|  3C100|1.0|
|  021|201604200627|599.23865|  3C105|0.0|
|  021|201604200627|599.23865|  3C101|0.0|
|  021|201604200627|599.23865|  3C102|0.0|
|  021|201604200627|599.23865|  3C103|0.0|
+-----+------------+---------+-------+---+
only showing top 5 rows



In [61]:
# 複数キーでsortするにはリストを使って引数に渡すか、複数の引数で渡す。ソート順序はSQLと同様。
df_mini.sort("feature", func.desc("times")).show(5)
# df_mini.sort(["feature", func.desc("times")]).show(5)でもOK

+-----+------------+---------+-------+---+
|carid|   starttime|    times|feature|val|
+-----+------------+---------+-------+---+
|  021|201604200627| 599.2347|  0FA21|2.0|
|  021|201604200627| 599.2247|  0FA21|2.0|
|  021|201604200627| 599.2148|  0FA21|2.0|
|  021|201604200627|599.20483|  0FA21|2.0|
|  021|201604200627| 599.1948|  0FA21|2.0|
+-----+------------+---------+-------+---+
only showing top 5 rows



### 列の追加

In [70]:
# selectで追加する方法
df_mini.select("carid", "satrtime",
               func.monotonically_increasing_id().alias("newcol")).show(5)
# 0始まりの昇順IDを付与

+-----+------------+--------+-------+---+------+
|carid|   starttime|   times|feature|val|newcol|
+-----+------------+--------+-------+---+------+
|  021|201604200627|0.295909|  0FA53|0.0|     0|
|  021|201604200627|0.295909|  0FA57|4.0|     1|
|  021|201604200627|0.295909|  0FA43|0.0|     2|
|  021|201604200627|0.295909|  0FA44|0.0|     3|
|  021|201604200627|0.295909|  0FA45|0.0|     4|
+-----+------------+--------+-------+---+------+
only showing top 5 rows



In [26]:
# withColumnで追加する方法
(df_mini.withColumn("newID", func.monotonically_increasing_id()).show(5) # 0始まりの昇順IDを付与

+-----+------------+--------+-------+---+-----+
|carid|   starttime|   times|feature|val|newID|
+-----+------------+--------+-------+---+-----+
|  021|201604200627|0.295909|  0FA53|0.0|    0|
|  021|201604200627|0.295909|  0FA57|4.0|    1|
|  021|201604200627|0.295909|  0FA43|0.0|    2|
|  021|201604200627|0.295909|  0FA44|0.0|    3|
|  021|201604200627|0.295909|  0FA45|0.0|    4|
+-----+------------+--------+-------+---+-----+
only showing top 5 rows



In [72]:
# withColumnで既にある列目を指定すると、上書きできる。
df_mini.withColumn("val", df_mini.val * 2).show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200627|0.295909|  0FA53|0.0|
|  021|201604200627|0.295909|  0FA57|8.0|
|  021|201604200627|0.295909|  0FA43|0.0|
|  021|201604200627|0.295909|  0FA44|0.0|
|  021|201604200627|0.295909|  0FA45|0.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



### SQLクエリ

あたかもSQLのように、データを処理することも可能。

In [15]:
# まず、DataFrameオブジェクトをTemporaryViewに登録する。
# これは今起動しているPySparkがシャットダウンされる際に、一緒に削除される
df_mini.createOrReplaceTempView("df_mini")

In [17]:
# このようにしてクエリで呼び出せる。終わりにセミコロン;をつけるとエラーになるので注意。
spark.sql("select * from df_mini").show(5)

+-----+------------+--------+-------+---+
|carid|   starttime|   times|feature|val|
+-----+------------+--------+-------+---+
|  021|201604200627|0.295909|  0FA53|0.0|
|  021|201604200627|0.295909|  0FA57|4.0|
|  021|201604200627|0.295909|  0FA43|0.0|
|  021|201604200627|0.295909|  0FA44|0.0|
|  021|201604200627|0.295909|  0FA45|0.0|
+-----+------------+--------+-------+---+
only showing top 5 rows



In [182]:
# window関数にも対応。
spark.sql("""select *, 
    lag(val, 1) over(partition by carid, starttime, feature order by times) as lag_val
          from df_mini """).show(5)

+-----+------------+--------+-------+---+-------+
|carid|   starttime|   times|feature|val|lag_val|
+-----+------------+--------+-------+---+-------+
|  021|201604200627|0.304361|  1EF61|0.0|   null|
|  021|201604200627|0.324348|  1EF61|0.0|    0.0|
|  021|201604200627|0.344353|  1EF61|0.0|    0.0|
|  021|201604200627|0.364349|  1EF61|0.0|    0.0|
|  021|201604200627|0.384346|  1EF61|0.0|    0.0|
+-----+------------+--------+-------+---+-------+
only showing top 5 rows



In [185]:
# window関数にも対応。
spark.sql("""select *, sum(val) over(partition by carid, starttime, feature order by times) as lag_val
          from df_mini """).show(5)

+-----+------------+--------+-------+---+-------+
|carid|   starttime|   times|feature|val|lag_val|
+-----+------------+--------+-------+---+-------+
|  021|201604200627|0.304361|  1EF61|0.0|    0.0|
|  021|201604200627|0.324348|  1EF61|0.0|    0.0|
|  021|201604200627|0.344353|  1EF61|0.0|    0.0|
|  021|201604200627|0.364349|  1EF61|0.0|    0.0|
|  021|201604200627|0.384346|  1EF61|0.0|    0.0|
+-----+------------+--------+-------+---+-------+
only showing top 5 rows



他、以下のようなwindow関数に対応
- avg, 
- ランキング系
    - rank
    - parcent_rank
    - row_number
    - cume_dist
    - ntile
- lag/lead