# GeoJSON ファイルのロードと計算処理の分散実行

このエクササイズでは、**行ごとにポリゴン情報が含まれる** [GeoJSONSeq](https://gdal.org/drivers/vector/geojsonseq.html) ファイルを Spark でロードし、各ワーカー上で分散処理を行います。

Spark SQL は自動的にJSONデータセットのスキーマを推測しデータフレームとしてロードすることができます。この変換は、JSONファイル上の `SparkSession.read.json` を使って行うことができます。

`spark.read.json` が求めるファイルは一般的な JSON ファイルではないことに注意してください。各行は別個の自己内包の有効な JSON オブジェクトでなければなりません。これは [JSON Lines](https://jsonlines.org/) や [Newline-Delimited JSON](http://ndjson.org/) とも呼ばれます。

http://mogile.web.fc2.com/spark/sql-data-sources-json.html

## JSON ファイルのロード

In [0]:
%%time

df = spark.read.json("/mnt/testblob/geojson/niigata_cropfiled_geojsonl.json")

## データの取得
PySparkでは、アクションを使用してDataFrame `show()` の先頭/最初の N (5,10,100 ..) 行を取得し、それらをコンソールまたはログに表示できます。行のリストとしての先頭と最後の n 行 (Scala の場合は Array[Row])。Spark アクションは Spark Driver に結果を取得するため、大きなデータセットを抽出するときは十分に注意する必要があります。

https://sparkbyexamples.com/spark/show-top-n-rows-in-spark-pyspark/

In [0]:
display(df.take(5))

In [0]:
# RDD のパーティション数を返します
df.rdd.getNumPartitions()

## キャッシュ
Pyspark の `cache()` メソッドを使用して変換の中間結果をキャッシュし、キャッシュされた上で実行される他の変換がより高速に実行されるようにします。変換の結果をキャッシュすることは、長時間実行される PySpark アプリケーション/ジョブのパフォーマンスを向上させるための最適化のトリックの 1 つです。

https://sparkbyexamples.com/pyspark/pyspark-cache-explained/

In [0]:
df.cache()

## ユーザー定義関数 (UDF) 
ユーザー定義関数 (UDF) は、ユーザーによって定義された関数であり、ユーザー環境でカスタム ロジックを再利用できます。Databricks は、拡張可能なロジックを配布できるように、さまざまな種類の UDF をサポートしています。<br><br>

- https://docs.databricks.com/udf/index.html
- https://docs.databricks.com/udf/python.html#use-udf-with-dataframes

### Polygon 座標から面積を計算する UDF を定義

GeoJson から得られる Polygon 座標を投影座標に変換した後、m^2 の単位で面積を計算します。

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, col
from shapely.geometry import Polygon, shape
import geopandas as gpd
import pyproj
from shapely.ops import transform

wgs84 = pyproj.CRS('EPSG:4326')
utm = pyproj.CRS('EPSG:3100')
project = pyproj.Transformer.from_crs(wgs84, utm, always_xy=True).transform

def calc_area(geometry): 
    polygon_geom = shape(geometry.asDict(True))
    utm_point = transform(project, polygon_geom)
    area = str(utm_point.area)
  
    return area

calc_area_udf = udf(calc_area, StringType())

## 列を追加する
PySpark `withColumn()` は、値の変更、既存の列のデータ型の変換、新しい列の作成などに使用される DataFrame の変換関数です。

https://sparkbyexamples.com/pyspark/pyspark-withcolumn/

In [0]:
%%time

# area 列を追加
joined = df.withColumn("area", calc_area_udf(col("geometry")))


## ファイルに保存
Spark の遅延評価のため、下記の保存を実行するまで実際の演算は実行されません。

In [0]:
%%time
joined.write.mode('Overwrite').json("/mnt/testblob/geojson/sparkarea")

## 参考　GeoPandas による非並列ロード

Spark を使わずに GeoPandas の機能を使って面積を計算する処理を実装します。<br>
注意：読み込みには約 4 分程度かかります。

In [0]:
from geopandas import GeoDataFrame

# Loading cropfields Data
geo_df = GeoDataFrame.from_file('/dbfs/mnt/testblob/geojson/niigata_cropfiled_geojsonl.json')
geo_df.head(5)

In [0]:
# CRS の確認
geo_df.crs

In [0]:
# CRS の変換
geo_df = geo_df.to_crs(epsg=3100)
geo_df.head(5)

In [0]:
# GeoPandas の面積計算(m^2)
geo_df["area"] = geo_df['geometry'].area
geo_df.head(5)

注意：保存には約 12 分程度かかります。

In [0]:
%%time
geo_df.to_file("/dbfs/mnt/testblob/geojson/niigata_area.json", driver='GeoJSONSeq')