# Speaker

|  |  |
| :-- | :-- |
| <img src="https://avatars1.githubusercontent.com/u/46888598?s=460&u=1832ff2628bf03f8a7e2b1d672e6af340fd05eff&v=4" width="100px"> | **Yuji Masaoka**<br><br>青い R の中の人。<br><br>Twitter: [@mappie_kochi](https://twitter.com/mappie_kochi) <br>GitHub: [ymasaoka](https://github.com/ymasaoka)

# PySpark Intro

October 24th, 2020  
第 35 回 SQL Server 2019 勉強会

Japan SQL Server User Group - PASS Local Group

# PySpark とは

<h2>Spark を実行するための Python API</h2>  
Python プログラミングで Apache Spark を利用できる。<br>
PyPI など Python ライブラリを組み込むことも可能。


In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing').getOrCreate()

import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType().add("user_id","string").add("country","string").add("browser", "string").add("OS",'string').add("age", "integer")
df = spark.createDataFrame([("A203",'India',"Chrome","WIN",33),("A201",'China',"Safari","MacOS",35),("A205",'UK',"Mozilla","Linux",25)], schema = schema)

df.printSchema()
df.show()

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|  India| Chrome|  WIN| 33|
|   A201|  China| Safari|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



# Apache Spark とは

さすが Microsoft さん。

- [Azure Synapse Analytics での Apache Spark](https://docs.microsoft.com/ja-jp/azure/synapse-analytics/spark/apache-spark-overview)

> Apache Spark は、ビッグデータ分析アプリケーションのパフォーマンスを向上させるメモリ内処理をサポートする並列処理フレームワークです。

つまり、

ビックデータなど、大きいデータに対して、高速に、インメモリで、分散処理を行うことができるオープンソースのフレームワーク = Apache Spark。  
Java や Scala、Python などいろいろな API が用意されている。  
分散処理のややこしい部分をうまく抽象化してくれているため、ユーザーは簡潔なコードを実行するだけで OK。  
また、以下のような便利なコンポーネントが付属している。  

- **Spark SQL**: クラスタ上のデータを SQL で処理
- **Spark R、MLlib**: 機械学習
- **Spark GraphX**: グラフ処理
- **Spark Streaming**: ストリーミング処理

詳しくは MS 畠山さんの [Azure Synapse Analytics Spark Pool](https://sqlserver.connpass.com/event/186147/) セッションで解説があった(はず)なので、ここでは割愛します。


# PySpark 環境構築

簡単に PySpark をサクっと始めたい場合は、以下を行うと良いと思います。(個人的意見)

- **Azure Synapse Analytics で行う場合**: Spark プールを作りましょう
- **ローカル環境で行う場合**: Spark インストールしましょう (Docker で用意するのが楽)

```yaml:docker-compose.yaml
version: '3'

services:
  notebook:
    image: jupyter/pyspark-notebook
    container_name: pyspark-notebook
    ports:
      - 8888:8888
    volumes:
      - ./src/pyspark-notebook:/home/jovyan/work
    environment:
      - NB_USER=jovyan
      - GRANT_SUDO=yes
      - user=root
    command: start-notebook.sh
```


# PySpark を使用したデータ処理 (入門バージョン)

セッションの内容は、**「(SQL でやっている XXX って) PySpark ってこうやってデータの処理やるんだな」**といった感じで、`皆さんの頭の中でいろいろ考えを巡らせながら`聞いていただけると嬉しいです。

今日ご紹介する内容は、大きく分けてこんな感じです。

- Dataframes (データフレーム)
- Select (選択)
- Filter (フィルタリング)
- Where (条件指定)
- Aggregations (集合)
- Collect (配列)
- User-Defined Functions (ユーザー定義関数)
- Joins (結合)
- Pivoting (ピボット)
- Window Functions or Window Aggregates (ウインドウ関数またはウインドウ集計)

## Dataframes (データフレーム)

いわゆる、リレーショナルデータベース (RDB) でいうところの`テーブル`のようなものです。(ざっくり)  
Python の世界だと、pandas.DataFrame とかもありますが、Spark の世界では v1.3 より `Spark DataFrame` という機能が追加されており、今回はこちらを使用します。

Dataframe のいいところは、以下の通りです。

- SQL に近い文法で、Where や Join などができる
- select や filter を使って、条件に該当する行や列を抜き出せる
- 独自のユーザー定義関数 (User-Defined Functions: UDF) が利用できる
- 集計も可能

### SparkSession オブジェクトの作成

Spark Dataframe の使用を始めるためには、まず `SparkSession オブジェクト` を作成する必要があります。  
また、データフレームで列に設定するデータ型、PySpark 実行に必要な関数をインポートしておきます。

In [2]:
# SparkSession オブジェクトを作成する
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data_processing').getOrCreate()

In [3]:
# データ型や実行に必要な関数をインポート
import pyspark.sql.functions as F
from pyspark.sql.types import *

`StructType().add()` で二次元表のスキーマ(列)を指定していきます。  
.add() をつなげて宣言することで、列を追加できます。

In [4]:
# スキーマを宣言して、スキーマオブジェクトを作成
schema = StructType().add("user_id","string").add("country","string").add("browser", "string").add("OS",'string').add("age", "integer")

上で作成したスキーマオブジェクトを `spark.createDataFrame()` に渡してあげる形で、データフレームを作成していきます。  
この時に、初期データを一緒に設定することも可能です。

In [5]:
# データフレームを作成
df = spark.createDataFrame([("A203",'India',"Chrome","WIN",33),("A201",'China',"Safari","MacOS",35),("A205",'UK',"Mozilla","Linux",25)], schema = schema)

In [6]:
# データフレームのスキーマ情報を確認
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- age: integer (nullable = true)



データフレームの中身を確認する (RDB でいうところの SELECT) には、`show()` を使用します。

In [7]:
# データフレームの中身を確認
df.show()

+-------+-------+-------+-----+---+
|user_id|country|browser|   OS|age|
+-------+-------+-------+-----+---+
|   A203|  India| Chrome|  WIN| 33|
|   A201|  China| Safari|MacOS| 35|
|   A205|     UK|Mozilla|Linux| 25|
+-------+-------+-------+-----+---+



また、読み込んだデータの値を一部、置き換えることも可能です。  
値を置き換えるには、`replace()` を使用します。

In [8]:
# Chrome を Google Chrome に置き換え
df.replace("Chrome", "Google Chrome").show()

+-------+-------+-------------+-----+---+
|user_id|country|      browser|   OS|age|
+-------+-------+-------------+-----+---+
|   A203|  India|Google Chrome|  WIN| 33|
|   A201|  China|       Safari|MacOS| 35|
|   A205|     UK|      Mozilla|Linux| 25|
+-------+-------+-------------+-----+---+



なお、このデータフレームは CSV ファイルなどを直接読み込んで作成することも可能です。  
CSV からデータフレームを作成する方法はいくつか存在しますが (ex. Spark RDD 経由でインポート)、spark-csv を使用してみます。
セッション時間の都合上、今回は割愛しますが、同様に JSON データを Spark で読み込むことも可能です。

In [9]:
# CSV ファイルをデータフレームとして読み込む
df = spark.read.csv("customer_data.csv", header=True, inferSchema=True)
df.show(10)

AnalysisException: Path does not exist: file:/home/jovyan/customer_data.csv;

In [53]:
# Docker コンテナで動かす場合は、volumes マウント側に CSV を用意して、コンテナ内のパスを指定して実行
df = spark.read.csv("./work/Chapter 2/customer_data.csv", header=True, inferSchema=True)
df.show(10)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
|    Young and rising|               1|                 2|20-30 years|         Living well|     13064|    0|
|Large religious f.

In [None]:
# CSV ファイルをデータフレームとして読み込む (Synapse Analytics バージョン)
df = spark.read.load('abfss://<your ABFSS Path>', format='csv', header=True, inferSchema=True)
df.printSchema()
df.show(10)

### Null 値への対応

データには、Null 値が存在している場合もあると思います。  
PySpark では、`fillna()` を使って Null 値を書き換えたり、`drop()` を使ってデータを省くことが可能です。

In [None]:
# null 値でを含んだデータフレームを作成 
df_na = spark.createDataFrame([("A203",None,"Chrome","WIN",33),("A201",'China',None,"MacOS",35),("A205",'UK',"Mozilla","Linux",25)], schema = schema)
df_na.show()

In [None]:
# すべての Null 値を 0 (文字列)で埋める
df_na.fillna('0').show()

In [None]:
# 列ごとに指定の値で埋める
df_na.fillna({ 'country':'USA', 'browser':'Safari' }).show()

In [None]:
# Null を含む行を省いてみる - その 1 
df_na.na.drop().show()

In [None]:
# Null を含む行を省いてみる - その 2
df_na.na.drop(subset = 'country').show()

`drop()` では、列を削除することもできます。

In [16]:
# user_id 列を削除
df_na.drop('user_id').show()

+-------+-------+-----+---+
|country|browser|   OS|age|
+-------+-------+-----+---+
|   null| Chrome|  WIN| 33|
|  China|   null|MacOS| 35|
|     UK|Mozilla|Linux| 25|
+-------+-------+-----+---+



## Select (選択)

RDB で言う `SELECT 句` みたいなものです。  
注意するとすれば、**PySpark では show() を打たないとデータは確認できない** という点ぐらいです。

In [17]:
# 列を選択しないでデータ取得
df.show(10)

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
|    Young and rising|               1|                 2|20-30 years|         Living well|     13064|    0|
|Large religious f.

In [18]:
# 列を選択してデータ取得
df.select(['Customer_subtype','Avg_Salary']).show(10)

+--------------------+----------+
|    Customer_subtype|Avg_Salary|
+--------------------+----------+
|Lower class large...|     44905|
|Mixed small town ...|     37575|
|Mixed small town ...|     27915|
|Modern, complete ...|     19504|
|  Large family farms|     34943|
|    Young and rising|     13064|
|Large religious f...|     29090|
|Lower class large...|      6895|
|Lower class large...|     35497|
|     Family starters|     30800|
+--------------------+----------+
only showing top 10 rows



## Filter (フィルタリング)

RDB でいう `WHERE` 句の **(ただし AND のみ)** に該当するようなものです。  
`filter()` を使ってデータフレームを AND 条件でフィルタリングすることができます。

In [19]:
# filter() でデータを指定 - その 1
df.filter(df['Avg_Salary'] > 1000000).count()

128

In [20]:
# filter() でデータを指定 - その 2
df.select(['Customer_subtype','Number_of_houses','Avg_age','Avg_Salary']).filter(df['Avg_Salary'] > 500000).filter(df['Number_of_houses'] > 2).show()

+--------------------+----------------+-----------+----------+
|    Customer_subtype|Number_of_houses|    Avg_age|Avg_Salary|
+--------------------+----------------+-----------+----------+
|Affluent senior a...|               3|50-60 years|    596723|
|Affluent senior a...|               3|50-60 years|    944444|
|Affluent senior a...|               3|50-60 years|    788477|
|Affluent senior a...|               3|50-60 years|    994077|
+--------------------+----------------+-----------+----------+



## Where (条件指定)

先ほどの **Filter** と同じく、RDB でいう `WHERE` 句に該当するようなものです。  
Filter と違う点は `オペランドが利用可能` という点です。  
`where()` の中で、条件を指定していきます。

In [21]:
# オペランドを利用 - その 1
df.where((df['Avg_Salary'] > 500000) & (df['Number_of_houses'] > 2)).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    596723|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    944444|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    788477|    0|
|Affluent senior a...|               3|                 2|50-60 years|Successful hedonists|    994077|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+



In [22]:
# オペランドを利用 - その 2
df.where((df['Avg_Salary'] > 500000) | (df['Number_of_houses'] > 2)).show()

+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
| High status seniors|               1|                 3|40-50 years|Successful hedonists|   4670288|    0|
|Affluent young fa...|               1|                 3|30-40 years|      Average Family|    762769|    1|
| High status seniors|               1|                 3|50-60 years|Successful hedonists|   9561873|    0|
| High status seniors|               1|                 2|40-50 years|Successful hedonists|  18687005|    0|
| High status seniors|               1|                 2|40-50 years|Successful hedonists|  24139960|    0|
| High status seniors|               1|                 2|50-60 years|Successful hedonists|   6718606|    0|
|High Income, expe.

## Aggregations (集合)

RDB でいう `GROUP BY` 句まわりに該当する内容です。  
`groupBy()` を使用します。

In [23]:
# Customer_subtype で GROUP BY して COUNT(*)
df.groupBy('Customer_subtype').count().show()

+--------------------+-----+
|    Customer_subtype|count|
+--------------------+-----+
|Large family, emp...|   56|
|Religious elderly...|   47|
|Large religious f...|  107|
|Modern, complete ...|   93|
|    Village families|   68|
|Young all america...|   62|
|Young urban have-...|    4|
|Young seniors in ...|   22|
|Fresh masters in ...|    2|
|High Income, expe...|   52|
|Lower class large...|  288|
| Residential elderly|    6|
|Senior cosmopolitans|    1|
|        Mixed rurals|   67|
|Career and childcare|   33|
|Low income catholics|   72|
|Mixed apartment d...|   34|
|Seniors in apartm...|   17|
|Middle class fami...|  122|
|Traditional families|  129|
+--------------------+-----+
only showing top 20 rows



In [24]:
# 覚えていますか？ PySpark は Python プログラミングが使えるんです。
# つまり、こういうことも Spark の中で出来ます。ただし、 Python はインデントが重要なので注意
for col in df.columns:
    if col !='Avg_Salary':
        print(f" *** {col} の集計結果 ***")
        df.groupBy(col).count().orderBy('count',ascending=False).show(truncate=False)

 *** Customer_subtype の集計結果 ***
+------------------------------------------+-----+
|Customer_subtype                          |count|
+------------------------------------------+-----+
|Lower class large families                |288  |
|Traditional families                      |129  |
|Middle class families                     |122  |
|Large religious families                  |107  |
|Modern, complete families                 |93   |
|Couples with teens 'Married with children'|83   |
|Young and rising                          |78   |
|High status seniors                       |76   |
|Low income catholics                      |72   |
|Mixed seniors                             |71   |
|Village families                          |68   |
|Mixed rurals                              |67   |
|Stable family                             |62   |
|Young all american family                 |62   |
|Young, low educated                       |56   |
|Large family, employed child              |56   |

GROUP BY したら、**最大値**とか**最小値**、**平均**などを取りたくなると思います。  
これまでの例に出てきたように、COUNT は `count()` を使います。  
そのほか、最大値や最小値の取得には、`agg()` を使用していきます。  

- **最大値**: `F.max()`
- **最小値**: `F.min()`
- **合計**: `F.sum()`
- **平均**: `F.mean()`

F が何か覚えていますか？最初にインポートした関数のやつです。

```python
import pyspark.sql.functions as F
```

In [25]:
# Customer_main_type 毎の Avg_Salary の最大値を取得
df.groupBy('Customer_main_type').agg(F.max('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|max(Avg_Salary)|
+--------------------+---------------+
|             Farmers|          49965|
|       Career Loners|          49903|
|Retired and Relig...|          49564|
|Successful hedonists|       48919896|
|         Living well|          49816|
|      Average Family|         991838|
|    Cruising Seniors|          49526|
|Conservative fami...|          49965|
|      Driven Growers|          49932|
|Family with grown...|          49901|
+--------------------+---------------+



In [26]:
# Customer_main_type 毎の Avg_Salary の最小値を取得
df.groupBy('Customer_main_type').agg(F.min('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|min(Avg_Salary)|
+--------------------+---------------+
|             Farmers|          10469|
|       Career Loners|          13246|
|Retired and Relig...|           1361|
|Successful hedonists|          12705|
|         Living well|          10418|
|      Average Family|          10506|
|    Cruising Seniors|          10100|
|Conservative fami...|          10179|
|      Driven Growers|          10257|
|Family with grown...|           1502|
+--------------------+---------------+



In [27]:
# Customer_main_type 毎の Avg_Salary の合計値を取得
df.groupBy('Customer_main_type').agg(F.sum('Avg_Salary')).show()

+--------------------+---------------+
|  Customer_main_type|sum(Avg_Salary)|
+--------------------+---------------+
|             Farmers|        2809468|
|       Career Loners|         484089|
|Retired and Relig...|        5522439|
|Successful hedonists|     3158111161|
|         Living well|        5552540|
|      Average Family|       32111040|
|    Cruising Seniors|        1732220|
|Conservative fami...|        6963043|
|      Driven Growers|        5292275|
|Family with grown...|       15237892|
+--------------------+---------------+



In [28]:
# Customer_main_type 毎の Avg_Salary の平均値を取得
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary')).show()

+--------------------+--------------------+
|  Customer_main_type|     avg(Avg_Salary)|
+--------------------+--------------------+
|             Farmers|  30209.333333333332|
|       Career Loners|             32272.6|
|Retired and Relig...|   27338.80693069307|
|Successful hedonists|1.6278923510309279E7|
|         Living well|  31194.044943820223|
|      Average Family|  104256.62337662338|
|    Cruising Seniors|  28870.333333333332|
|Conservative fami...|  29504.419491525423|
|      Driven Growers|   30769.04069767442|
|Family with grown...|  28114.191881918818|
+--------------------+--------------------+



集計関数を使った場合の列名は、`alias()` で変えられます。  
RDB で言うところの `AS` XXX ですね。 

In [29]:
# Customer_main_type 毎の Avg_Salary の平均値を取得
df.groupBy('Customer_main_type').agg(F.mean('Avg_Salary').alias('平均給与額')).show()

+--------------------+--------------------+
|  Customer_main_type|          平均給与額|
+--------------------+--------------------+
|             Farmers|  30209.333333333332|
|       Career Loners|             32272.6|
|Retired and Relig...|   27338.80693069307|
|Successful hedonists|1.6278923510309279E7|
|         Living well|  31194.044943820223|
|      Average Family|  104256.62337662338|
|    Cruising Seniors|  28870.333333333332|
|Conservative fami...|  29504.419491525423|
|      Driven Growers|   30769.04069767442|
|Family with grown...|  28114.191881918818|
+--------------------+--------------------+



集計したら、**並び替え** もしたくなりますよね。**無論、可能です！**  
RDB でいう `ORDER BY` 句に紐づくものを使用するには、`sort()` や `orderBy()` を使用します。  
ここは、好きなものを使ってください。両方、単一列/複数列で昇順/降順のソートができます。

In [30]:
# sort() での並び替えの例 - その 1
df.groupBy('Customer_subtype').agg(F.avg('Avg_Salary').alias('mean_salary')).sort('mean_salary',ascending=True).show(50,False)

+------------------------------------------+--------------------+
|Customer_subtype                          |mean_salary         |
+------------------------------------------+--------------------+
|Low income catholics                      |21713.777777777777  |
|Single youth                              |24403.25            |
|Own home elderly                          |25677.666666666668  |
|Young urban have-nots                     |25751.0             |
|Lower class large families                |26012.628472222223  |
|Dinki's (double income no kids)           |26231.117647058825  |
|Fresh masters in the city                 |27645.0             |
|Couples with teens 'Married with children'|28155.807228915663  |
|Residential elderly                       |28866.166666666668  |
|Mixed small town dwellers                 |28982.106382978724  |
|Mixed rurals                              |29073.761194029852  |
|Traditional families                      |29381.84496124031   |
|Porchless

In [31]:
# sort() での並び替えの例 - その 2
df.select(['Customer_subtype','Number_of_houses','Avg_age','Avg_Salary']).sort('Avg_age','Number_of_houses',ascending=False).show()

+--------------------+----------------+-----------+----------+
|    Customer_subtype|Number_of_houses|    Avg_age|Avg_Salary|
+--------------------+----------------+-----------+----------+
|       Mixed seniors|               1|70-80 years|     12065|
|    Own home elderly|               1|70-80 years|     23340|
|    Own home elderly|               1|70-80 years|     47001|
|Seniors in apartm...|               1|70-80 years|     27724|
|       Mixed seniors|               1|70-80 years|     15518|
|    Own home elderly|               1|70-80 years|     41473|
|Lower class large...|               1|70-80 years|     10496|
|Religious elderly...|               1|70-80 years|     49059|
|     Family starters|               2|60-70 years|     25224|
|     Family starters|               2|60-70 years|     40452|
|Seniors in apartm...|               2|60-70 years|     35243|
|Dinki's (double i...|               2|60-70 years|     48812|
|Seniors in apartm...|               2|60-70 years|    

In [32]:
# orderBy() での並び替えの例 - その 1
df.groupBy('Customer_subtype').agg(F.avg('Avg_Salary').alias('mean_salary')).orderBy('mean_salary',ascending=True).show(50,False)

+------------------------------------------+--------------------+
|Customer_subtype                          |mean_salary         |
+------------------------------------------+--------------------+
|Low income catholics                      |21713.777777777777  |
|Single youth                              |24403.25            |
|Own home elderly                          |25677.666666666668  |
|Young urban have-nots                     |25751.0             |
|Lower class large families                |26012.628472222223  |
|Dinki's (double income no kids)           |26231.117647058825  |
|Fresh masters in the city                 |27645.0             |
|Couples with teens 'Married with children'|28155.807228915663  |
|Residential elderly                       |28866.166666666668  |
|Mixed small town dwellers                 |28982.106382978724  |
|Mixed rurals                              |29073.761194029852  |
|Traditional families                      |29381.84496124031   |
|Porchless

In [33]:
# orderBy() での並び替えの例 - その 2
df.select(['Customer_subtype','Number_of_houses','Avg_age','Avg_Salary']).orderBy('Avg_age','Number_of_houses',ascending=False).show()

+--------------------+----------------+-----------+----------+
|    Customer_subtype|Number_of_houses|    Avg_age|Avg_Salary|
+--------------------+----------------+-----------+----------+
|       Mixed seniors|               1|70-80 years|     12065|
|    Own home elderly|               1|70-80 years|     23340|
|    Own home elderly|               1|70-80 years|     47001|
|Seniors in apartm...|               1|70-80 years|     27724|
|       Mixed seniors|               1|70-80 years|     15518|
|    Own home elderly|               1|70-80 years|     41473|
|Lower class large...|               1|70-80 years|     10496|
|Religious elderly...|               1|70-80 years|     49059|
|     Family starters|               2|60-70 years|     25224|
|     Family starters|               2|60-70 years|     40452|
|Seniors in apartm...|               2|60-70 years|     35243|
|Dinki's (double i...|               2|60-70 years|     48812|
|Seniors in apartm...|               2|60-70 years|    

## Collect (配列)

集計するだけではなく、GROUP BY でグループ分けをしたら、**グループ毎にどんなデータ(要素)が含まれているのか**について確認したくなる時があります。  
その時に使用するのが、`Collect` です。  
  
Collect では、2 種類の配列を返すことができます。  

- `F.collect_list()`: すべての要素を元のデータセットのデータ順で表示
- `F.collect_set()`: 一意の値のみを表示 (いわゆる RDB でいう **DISTINCT**)

In [34]:
# Customer_subtype でグループ化し、Number_of_houses の collect_list を取得
df.groupby("Customer_subtype").agg(F.collect_list("Number_of_houses")).show()

+--------------------+------------------------------+
|    Customer_subtype|collect_list(Number_of_houses)|
+--------------------+------------------------------+
|Large family, emp...|          [2, 1, 2, 1, 2, 1...|
|Religious elderly...|          [1, 1, 1, 1, 1, 1...|
|Large religious f...|          [2, 1, 1, 2, 1, 1...|
|Modern, complete ...|          [1, 1, 2, 1, 1, 1...|
|    Village families|          [1, 1, 1, 1, 1, 1...|
|Young all america...|          [1, 1, 2, 2, 1, 1...|
|Young urban have-...|                  [1, 2, 1, 1]|
|Young seniors in ...|          [1, 1, 1, 1, 1, 2...|
|Fresh masters in ...|                        [1, 1]|
|High Income, expe...|          [1, 1, 1, 1, 1, 1...|
|Lower class large...|          [1, 1, 1, 1, 1, 1...|
| Residential elderly|            [3, 1, 1, 3, 2, 1]|
|Senior cosmopolitans|                           [3]|
|        Mixed rurals|          [1, 1, 1, 1, 1, 1...|
|Career and childcare|          [2, 1, 1, 1, 1, 1...|
|Low income catholics|      

In [35]:
# Customer_subtype でグループ化し、Number_of_houses の collect_set を取得
df.groupby("Customer_subtype").agg(F.collect_set("Number_of_houses")).show()

+--------------------+-----------------------------+
|    Customer_subtype|collect_set(Number_of_houses)|
+--------------------+-----------------------------+
|Large family, emp...|                       [1, 2]|
|Religious elderly...|                       [1, 2]|
|Large religious f...|                       [1, 2]|
|Modern, complete ...|                       [1, 2]|
|    Village families|                       [1, 2]|
|Young all america...|                       [1, 2]|
|Young urban have-...|                       [1, 2]|
|Young seniors in ...|                    [1, 2, 3]|
|Fresh masters in ...|                          [1]|
|High Income, expe...|                          [1]|
|Lower class large...|                       [1, 2]|
| Residential elderly|                    [1, 2, 3]|
|Senior cosmopolitans|                          [3]|
|        Mixed rurals|                          [1]|
|Career and childcare|                       [1, 2]|
|Low income catholics|                        

## User-Defined Functions (ユーザー定義関数)

RDB でいう`ユーザー定義関数`と同じことは PySpark でも実現可能です。  
特定のデータ処理をユーザー定義関数として用意しておくと、定義するだけでいろんな場所で流用できるため、非常に便利です。  
やり方は、`Python で関数を定義する`方法と同じです。(だって、ユーザー定義「関数」だもの)  
ただし、気をつける点として、**使用する際は関数の戻り値のデータ型について定義** してあげる必要があります。

In [36]:
# ユーザー定義関数 (UDF) を使用するためのライブラリをインポート
from pyspark.sql.functions import udf

In [37]:
# 年齢毎にでカテゴリを割り当てる UDF を作成
def age_category(age):
    if age  == '20-30 years':
        return 'Young'
    elif age== '30-40 years':
        return 'Mid Aged' 
    elif ((age== '40-50 years') or (age== '50-60 years')) :
        return 'Old'
    else:
        return 'Very Old'

In [38]:
# UDF の戻り値のデータ型を設定 (今回は string: 文字列)
age_udf = udf(age_category,StringType())

PySpark では、`バケット列 (bucket column)` を作成することができます。  
これは、主にグループ分けをする際に用いられる列で、新たに数式やカスタム項目を追加する必要がなくなるため、便利です。  
`withColumn()` で指定します。(まあ、いわゆる Excel 関数使用のための列追加みたいなものです)

In [39]:
# バケット列として age_category を用意し、値に UDF を実行結果を入れる
df = df.withColumn('age_category',age_udf(df['Avg_age']))
df.select('Customer_subtype','Avg_age','age_category').show()

+--------------------+-----------+------------+
|    Customer_subtype|    Avg_age|age_category|
+--------------------+-----------+------------+
|Lower class large...|30-40 years|    Mid Aged|
|Mixed small town ...|30-40 years|    Mid Aged|
|Mixed small town ...|30-40 years|    Mid Aged|
|Modern, complete ...|40-50 years|         Old|
|  Large family farms|30-40 years|    Mid Aged|
|    Young and rising|20-30 years|       Young|
|Large religious f...|30-40 years|    Mid Aged|
|Lower class large...|40-50 years|         Old|
|Lower class large...|50-60 years|         Old|
|     Family starters|40-50 years|         Old|
|       Stable family|40-50 years|         Old|
|Modern, complete ...|40-50 years|         Old|
|Lower class large...|40-50 years|         Old|
|        Mixed rurals|40-50 years|         Old|
|    Young and rising|30-40 years|    Mid Aged|
|Lower class large...|40-50 years|         Old|
|Traditional families|40-50 years|         Old|
|Mixed apartment d...|40-50 years|      

もちろん、バケット列も GROUP BY したりすることが可能です。

In [40]:
# バケット列を groupBy() して count() を実行
df.groupBy("age_category").count().show()

+------------+-----+
|age_category|count|
+------------+-----+
|    Mid Aged|  496|
|    Very Old|   72|
|         Old| 1401|
|       Young|   31|
+------------+-----+



### Pandas UDF (pandas を使ったユーザー定義関数)

Python には、**データ分析用のライブラリ: Pandas** というとても有名なものがあります。  
Pandas は、ビックデータなどの大きな表形式データを扱うことができる、とても便利なライブラリです。  

PySpark を使ってデータ分析を行う際の大きなメリットと言っても過言ではないと思います。

#### 標準の Python UDF と Pandas UDF の違い

大きな違いは、標準の Python UDF と比べて、`Pandas UDF のほうが処理時間と実行時間の点で遥かに高速で効率的`に動作するという点です。  
標準の Python UDF は、_行ごと_に UDF を実行します。そのため、分散プレームワークの利点を実際には活用できていません。  
Pandas UDF は、`ブロックごと`で処理を実行するため、分散フレームワークの利点を活かし、高速処理を実現しています。  
`ただし、データフレームが大きいと、かなりメモリを喰います！メモリ節約のコツ的なものを一緒に覚えておくと良いです。`(今回は時間の関係上、割愛します)

In [41]:
# Pandas UDF を使用するためのライブラリをインポート
import pandas as pd
#from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import pandas_udf

In [42]:
df.printSchema()

root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- age_category: string (nullable = true)



In [42]:
min_sal=1361
max_sal=48919896

In [55]:
# Pandas UDF を作成
def scaled_salary(salary):
    scaled_sal=(salary-min_sal)/(max_sal-min_sal)
    return scaled_sal

In [56]:
# Pandas UDF の戻り値のデータ型を設定 (今回は double: 浮動小数点)
scaling_udf = pandas_udf(scaled_salary, DoubleType())

In [57]:
df.withColumn("scaled_salary", scaling_udf(df['Avg_Salary'])).show(10,False)

Py4JJavaError: An error occurred while calling o473.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 143.0 failed 1 times, most recent failure: Lost task 0.0 in stage 143.0 (TID 3289, 9f7e57e3df08, executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at jdk.internal.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)


## Joins (結合)

データセット同士は、マージすることができます。  
`join()`を使用します。

In [58]:
# マージ用のデータフレームを作成
region_data = spark.createDataFrame([('Family with grown ups','PN'),
                                    ('Driven Growers','GJ'),
                                    ('Conservative families','DD'),
                                    ('Cruising Seniors','DL'),
                                    ('Average Family ','MN'),
                                    ('Living well','KA'),
                                    ('Successful hedonists','JH'),
                                    ('Retired and Religious','AX'),
                                   ('Career Loners','HY'),('Farmers','JH')],schema=StructType().add("Customer_main_type","string").add("Region Code","string"))
region_data.show()

+--------------------+-----------+
|  Customer_main_type|Region Code|
+--------------------+-----------+
|Family with grown...|         PN|
|      Driven Growers|         GJ|
|Conservative fami...|         DD|
|    Cruising Seniors|         DL|
|     Average Family |         MN|
|         Living well|         KA|
|Successful hedonists|         JH|
|Retired and Relig...|         AX|
|       Career Loners|         HY|
|             Farmers|         JH|
+--------------------+-----------+



In [61]:
# 既存のデータフレームに作成したマージ用のデータフレームを結合
# キーは Customer_main_type
new_df=df.join(region_data,on='Customer_main_type')
new_df.show()

+--------------------+--------------------+----------------+------------------+-----------+----------+-----+-----------+
|  Customer_main_type|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|Avg_Salary|label|Region Code|
+--------------------+--------------------+----------------+------------------+-----------+----------+-----+-----------+
|Family with grown...|Lower class large...|               1|                 2|40-50 years|     25596|    0|         PN|
|Family with grown...|Mixed small town ...|               1|                 2|40-50 years|     26579|    0|         PN|
|Family with grown...|Lower class large...|               1|                 4|30-40 years|     33537|    0|         PN|
|Family with grown...|    Village families|               1|                 3|40-50 years|     22089|    0|         PN|
|Family with grown...|Lower class large...|               1|                 2|40-50 years|     38712|    0|         PN|
|Family with grown...|    Villag

## Pivoting (ピボット)

いわゆる `ピボットテーブル`を作れます。  
データフレームを作成して、分析や確認のために別の形に表を切り替えたい時などに使えます。  
例えは、  

|Product|Amount|Country|  
| :-- | :-- | :-- |  
|Banana |1000  |USA    |  
|Carrots|1500  |USA    |  
|Beans  |1600  |USA    |  
|Orange |2000  |USA    |  
|Orange |2000  |USA    |  
|Banana |400   |China  |  
|Carrots|1200  |China  |  
|Beans  |1500  |China  |  
|Orange |4000  |China  |  
|Banana |2000  |Canada |  
|Carrots|2000  |Canada |  
|Beans  |2000  |Mexico |  

こういうデータフレームがあったとして、これを  

| Product | Canada | China | Mexico | USA |
| :-- | :-- | :-- | :-- | :-- |  
|Orange |null  |4000 |null  |4000|  
|Beans  |null  |1500 |2000  |1600|  
|Banana |2000  |400  |null  |1000|  
|Carrots|2000  |1200 |null  |1500|  

のように、Country の列を回転させて、Product 列でグルーピングした値を並べて表示、みたいなことができます。

In [62]:
# データフレームを Customer_main_type でグループ化し、Avg_age でピボット
df.groupBy('Customer_main_type').pivot('Avg_age').sum('Avg_salary').fillna(0).show()

+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|  Customer_main_type|20-30 years|30-40 years|40-50 years|50-60 years|60-70 years|70-80 years|
+--------------------+-----------+-----------+-----------+-----------+-----------+-----------+
|             Farmers|          0|     462027|    2031235|     316206|          0|          0|
|       Career Loners|     143998|     176639|      25701|     105193|      32558|          0|
|Retired and Relig...|     126350|     336631|    2975266|    1687711|     335357|      61124|
|Successful hedonists|      42261|  171278764| 1223362814| 1563071675|  200340129|      15518|
|         Living well|     460528|    2965303|    1795405|     331304|          0|          0|
|      Average Family|          0|   23682805|    7789464|     412490|     226281|          0|
|    Cruising Seniors|          0|      43302|     303601|     529354|     716425|     139538|
|Conservative fami...|      69390|    2381485|    

ピボットは、データの分析を行う際にとても便利です。  
`Excel で頑張って同じものを作ろうとすると、生産性が低下し、無駄につながります。`  
この機会に、覚えておくと良いと思います。

## Window Functions or Window Aggregates (ウインドウ関数またはウインドウ集計)

RDB の`ウインドウ関数`に該当するものです。  
PySpark では、以下の 3 種類のウインドウ関数がサポートされています。  

- Aggregations (集合)
- Ranking (順位付け)
- Analytics (分析)

以下のライブラリのインポートは忘れないようにしてください。

```python
from pyspark.sql.window import Window
```

In [64]:
# 今回は、Ranking を試してみます
# 実行に必要なライブラリをインポート
from pyspark.sql.window import Window
from pyspark.sql.functions import udf,rank, col,row_number

In [65]:
# Avg Salary を順番に並べるためのウィンドウ関数を作成
win = Window.orderBy(df['Avg_Salary'].desc())

In [67]:
# 行番号を rank としてバケット列を追加
df = df.withColumn('rank', row_number().over(win).alias('rank'))
df.select(['Customer_main_type','Customer_subtype','Avg_age','Avg_Salary','rank']).sort('rank').show()

+--------------------+--------------------+-----------+----------+----+
|  Customer_main_type|    Customer_subtype|    Avg_age|Avg_Salary|rank|
+--------------------+--------------------+-----------+----------+----+
|Successful hedonists| High status seniors|60-70 years|  48919896|   1|
|Successful hedonists|High Income, expe...|50-60 years|  48177970|   2|
|Successful hedonists|High Income, expe...|50-60 years|  48069548|   3|
|Successful hedonists|High Income, expe...|40-50 years|  46911924|   4|
|Successful hedonists| High status seniors|40-50 years|  46614009|   5|
|Successful hedonists|High Income, expe...|30-40 years|  45952441|   6|
|Successful hedonists|High Income, expe...|40-50 years|  45864609|   7|
|Successful hedonists| High status seniors|50-60 years|  45592572|   8|
|Successful hedonists| High status seniors|50-60 years|  45170899|   9|
|Successful hedonists|High Income, expe...|50-60 years|  44843830|  10|
|Successful hedonists| High status seniors|50-60 years|  4323034

# まとめ

今回は、PySpark Intro ということで、`PySpark でどういったデータ操作が可能なのか`について紹介しました。  
**Spark ムズカシイ、PySpark 何それ美味しいの？** と思っていたみなさん、SQL の知識があれば、`あれ、PySpark 意外と簡単じゃね？`と思ってもらえるのではないかと思います。  

今回ご紹介した

- Dataframes (データフレーム)
- Select (選択)
- Filter (フィルタリング)
- Where (条件指定)
- Aggregations (集合)
- Collect (配列)
- User-Defined Functions (ユーザー定義関数)
- Joins (結合)
- Pivoting (ピボット)
- Window Functions or Window Aggregates (ウインドウ関数またはウインドウ集計)

を振り返ってもらって、是非、Azure Synapse Analytics の Spark プールの活用を始めてみてください。 
SQL on-demand もいいけど、やっぱりデータ分析の先にあるのは、**どの目的でデータを活用するのか** です。その手段の 1 つとして機械学習があると思いますが、`機械学習をするなら Python/Spark の方が何かと好都合です。`  
是非、自分の会社の中に埋もれている、いろんなデータを PySpark を使って分析してみてください。

# 参考情報

## Microsoft Docs

- [Azure Synapse Analytics での Apache Spark](https://docs.microsoft.com/ja-jp/azure/synapse-analytics/spark/apache-spark-overview)

## Spark SQL Guide

- [PySpark Usage Guide for Pandas with Apache Arrow](http://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html)

## @IT

- [Apache Sparkとは何か――使い方や基礎知識を徹底解説 ](https://www.atmarkit.co.jp/ait/articles/1608/24/news014.html)
