# PySparkによるデータ変換

PythonからApache Sparkを操作する際に使用するAPIであるPySparkの基本的な使い方を説明します。

**参考資料**
- [PySparkことはじめ \#Databricks \- Qiita](https://qiita.com/taka_yayoi/items/a7ee6287031374efa88a)
- [About Spark – Databricks](https://databricks.com/jp/spark/about)
- [Databricks Apache Sparkクイックスタート \- Qiita](https://qiita.com/taka_yayoi/items/bf5fb09a0108aa14770b)
- [Databricks Apache Sparkデータフレームチュートリアル \- Qiita](https://qiita.com/taka_yayoi/items/2a7e9bb792eba316de4b)
- [PySpark Documentation — PySpark 3\.2\.1 documentation](https://spark.apache.org/docs/latest/api/python/)
- [Beginner’s Guide on Databricks: Spark Using Python & PySpark \| by Christopher Lewis \| Analytics Vidhya \| Medium](https://medium.com/analytics-vidhya/beginners-guide-on-databricks-spark-using-python-pyspark-de74d92e4885)
- [【PySpark入門】第１弾 PySparkとは？ \- サーバーワークスエンジニアブログ](https://blog.serverworks.co.jp/introducing-pyspark-1)

## ライブラリのインポート

処理に必要なモジュールをインポートします。

In [0]:
from pyspark.sql.functions import col, avg
from pyspark.sql.types import IntegerType, FloatType

## データのロード

PySparkでデータをロードする際には`spark.read`を使用します。`format`の引数に読み込むデータのフォーマットを指定します。`json`、`parquet`、`delta`などが指定できます。読み込んだデータはSparkデータフレームとなります。

その前に、読み込むデータを以下のコマンドで確認します。

In [0]:
%fs
ls dbfs:/databricks-datasets/samples/population-vs-price/

In [0]:
# データフレームにサンプルデータをロードします
df = spark.read.format("csv").option("header", True).load("/databricks-datasets/samples/population-vs-price/data_geo.csv")

In [0]:
df.show(20)

In [0]:
# Databricksでデータフレームを表示するにはdisplay関数を使うと便利です
display(df)

### カラムの確認

In [0]:
df.columns

### スキーマの確認

In [0]:
# データフレームのスキーマを表示
df.printSchema()

## カラム名の変更

- `withColumnRenamed`を使ってカラム名を変更します。

In [0]:
df2 = df.withColumnRenamed('2014 rank', '2014_rank')\
.withColumnRenamed('State Code', 'state_code')\
.withColumnRenamed('2014 Population estimate', '2014_pop_estimate')\
.withColumnRenamed('2015 median sales price', '2015_median_sales_price')

In [0]:
display(df2)

## データ型の変換

- 既に存在しているデータフレームのカラムを指定するには、`col`関数の引数にカラム名を指定します。
- `cast`にデータ型を指定してキャストします。
- `withColumn`を用いて、キャストした後の値を持つカラムで更新します。

[Data Types \- Spark 3\.2\.1 Documentation](https://spark.apache.org/docs/latest/sql-ref-datatypes.html)

In [0]:
df3 = df2.withColumn("2014_rank", col("2014_rank").cast(IntegerType()))\
 .withColumn("2014_pop_estimate", col("2014_pop_estimate").cast(IntegerType()))\
 .withColumn("2015_median_sales_price", col("2015_median_sales_price").cast(FloatType()))

display(df3)

## データの操作

### フィルタリング、ソート

以下の例では、`df3`で`2015_median_sales_price`が100より大きいレコードを`2015_median_sales_price`の降順でソートし、カラム`2014_rank`, `City`, `2015_median_sales_price`を取得しています。

In [0]:
display(df3.select("2014_rank", "City", "2015_median_sales_price")\
        .where("2015_median_sales_price > 100")\
        .orderBy(col("2015_median_sales_price").desc()))

### 集計

以下の処理を行なっています。

1. `state_code`でレコードをグルーピング
1. グループごとの`2015_median_sales_price`の平均値を計算
1. 平均値降順でレコードを取得

In [0]:
display(df3.groupBy("state_code")\
        .agg(avg("2015_median_sales_price").alias("2015_median_sales_price_avg"))
        .orderBy(col("2015_median_sales_price_avg").desc()))

## pandasとのやりとり

matplotlibで可視化したいなどpandas前提の処理を行う場合には、Sparkデータフレームをpandasデータフレームに変換します。

In [0]:
df4 = df3.groupBy("state_code")\
        .agg(avg("2015_median_sales_price").alias("2015_median_sales_price_avg"))\
        .orderBy(col("2015_median_sales_price_avg").desc()).limit(10)

In [0]:
import matplotlib.pyplot as plt

# pandasデータフレームに変換します
pdf = df4.toPandas()

# 棒グラフを描画します
plt.bar(pdf['state_code'], pdf['2015_median_sales_price_avg'], align="center")           
plt.show()

pandasデータフレームをSparkデータフレームに変換することもできます。

In [0]:
# Sparkデータフレームへの変換
sdf = spark.createDataFrame(pdf)
display(sdf)

## その他のAPI

### Spark SQL

データフレームをテーブルあるいは一時ビューに登録することで、SQLを使用してデータを操作することができるようになります。

テーブルは永続化されますが、一時ビューは永続化されず、クラスターが稼働している間のみ一時ビューを作成したセッションでのみ利用することができます。

In [0]:
# データフレームを一時ビューに登録します
df3.createOrReplaceTempView("pop_price")

In [0]:
# '2014_rank' カラムに基づいて上位10位の市を参照します
top_10_results = spark.sql("""SELECT * FROM pop_price 
                              WHERE 2014_rank <= 10
                              SORT BY 2014_rank ASC""")
display(top_10_results)

In [0]:
%sql
SELECT
  *
FROM
  pop_price
WHERE
  2014_rank <= 10 SORT BY 2014_rank ASC

### pandas API on Spark

pandas APIに慣れ親しんでいる方は、pandas API on Spark(旧Koalas)を活用することもできます。

[Pandas API on Spark \| Databricks Documentation](https://docs.databricks.com/aws/ja/pandas/pandas-on-spark)

In [0]:
spark.conf.set("spark.sql.ansi.enabled", "false")

In [0]:
import pyspark.pandas as ps

psdf = sdf.pandas_api()  # pandas-on-Sparkデータフレーム

# pandasのお作法でカラムにアクセスします
psdf['state_code']

# END