# Synapse Spark を使用してデータの探索と修正を行う

このタスクでは、Synapse Spark ノートブックを使用してデータ レイクの `wwi-02/sale-poc` フォルダー内のファイルをいくつか探索します。また、Python コードを使用して `sale-20170502.csv` ファイルの問題を修正し、このラボで後ほど Synapse パイプラインを使用してディレクトリのファイルがすべて取り込まれるようにします。

まず、プライマリ データ レイク ストレージ アカウントの名前を提供するためにノートブック内で変数を設定する必要があります。以下のセルを実行する前に、`[YOUR-DATA-LAKE-ACCOUNT-NAME]` を、使用している Syanpse ワークスペースに関連のあるプライマリ データ レイク ストレージ アカウントの名前に置き換えなくてはなりません。

データ レイク ストレージ アカウントの名前は見つけるには、Synapse Studio で 「**データ**」 ハブに移動して 「**リンク**」 タブを選択し、**Azure Data Lake Storage Gen2** で「**asadatalake**」で始まるストレージ アカウント名を探してください。

![プライマリ データ レイク ストレージ アカウントが 「データ」 ハブの 「リンク」 タブで強調表示されています。](https://solliancepublicdata.blob.core.windows.net/images/synapse/data-hub-primary-data-lake-storage-account.png "Primary ADLS Gen2 Account")

1.データ レイク ストレージ アカウントの名前をコピーし、セルで `[YOUR-DATA-LAKE-ACCOUNT-NAME]` の代わりに貼り付けます。セルの選択後に表示される 「**セルの実行**」 ボタンを選択すると、セルを実行できます。

    ![プライマリ データ レイク ストレージ アカウントが 「データ」 ハブの 「リンク」 タブで強調表示されています。](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell.png "Primary ADLS Gen2 Account")



In [None]:
adls_account_name = '[YOUR-DATA-LAKE-ACCOUNT-NAME]'

## Spark を使用してファイルを探索する

1.Synapse Spark を使用してデータを探索する最初の手順は、データ レイクからファイルを読み込むことです。このために、`SparkSession` で `spark.read.load()` メソッドを使用できます。

2.Spark でファイルを [DataFrames](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes) に読み込みます。これは、名前が指定された列でデータの構造化を可能にする抽象化です。以下のセルを実行し、`sale-20170501.csv` ファイルのデータをデータ フレームに読み込みます。セルを実行するには、マウスをセルの左側に動かし、「**セルの実行**」 ボタンを選択します。

    ![「セルの実行」 ボタンが、実行するセルのコンテンツの左側で強調表示されています。](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell-load-sale-20170501-csv.png "Run cell")

In [None]:
# 最初に `sale-20170501.csv` ファイルを読み込みます。これは前の探索によって適正に書式化されていることがわかっているファイルです。
# `header` と `inferSchema` パラメーターの使用に留意してください。 header は、ファイルの最初の行に列のヘッダーが含まれていることを示唆します。
# `inferSchema` は、ファイル内のデータを使用してデータ型を推測するよう Spark に指示します。
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170501.csv', format='csv', header=True, inferSchema=True)

## DataFrame のコンテンツを表示する

`Sale-20170501.csv` ファイルのデータをデータ フレームに読み込むと、データ フレームのさまざまなメソッドを使用してデータのプロパティを探索できます。

1.まず、インポートされたデータを見てみましょう。以下のセルを実行すると、データ フレームでデータを表示して調べられます。

In [None]:
display(df.limit(10))

2.Azure Synapse の SQL オンデマンド機能で探索した場合と同様に、Spark ではファイル内のデータを表示して、これに対するクエリを実行できます。 

3.データ フレームの `printSchema()` メソッドを使うと、データ フレームの作成時に `inferSchema` パラメーターを使用した結果が表示されます。以下のクエリを実行して出力を確認します。

In [None]:
# ここで推測されたスキーマを印刷します。この情報は、以下で「May 2, 2017」ファイルのヘッダーが欠落している場合に必要になります。
df.printSchema()

4.`printSchema` メソッドは、各フィールド内に含まれているデータの Spark エンジンの評価に基づき、フィールド名とデータ型の両方を出力します。

    > この情報は後ほど、形式の不良な `sale-20170502.csv` ファイルのスキーマを定義する際に利用できます。フィールド名とデータ型のほか、ファイルに含まれている機能や列の数に留意してください。この場合は、フィールドが 11 あります。これは、データの単一の行を分割する箇所を判定するために使用されます。

5.さらに可能な探索の例として、以下のセルを実行し、明確な Customer と Product Id のペアに順序を付けたリストが含まれている新しいデータ フレームを表示することができます。このような関数のタイプを利用すると、ターゲットのファイルで無効な値や空の値を見つけられます。

In [None]:
# 明確な CustomerId と ProductId の値のリスト (CustomerId の降順) が含まれている新しいデータ フレームを作成します。
df_distinct_products = df.select('CustomerId', 'ProductId').distinct().orderBy('CustomerId')

# その結果のデータ フレームの最初の 100 行を表示します。
display(df_distinct_products.limit(100))

6.次に、上記のように `load()` メソッドを使用し、`sale-20170502.csv` ファイルを開いて探索してみましょう。

In [None]:
# 次に、最初のファイルで使用したものと同じ `load()` メソッドを使用して、「May 2, 2017」ファイルで読み取りを試してみましょう。
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv', format='csv')
display(df.limit(10))

7.T-SQL の場合と同様に、処理された列の数が 20480 列の限度を超過した可能性があるという類似したエラー メッセージが Spark で表示されます。このファイルのデータを使うには、次のセクションで説明されているような、さらに高度なメソッドを利用する必要があります。


## 形式の不良な CSV ファイルを取り扱い修正する

> 以下の手順は、`wwi-02/sale-poc` フォルダーのファイル探索中に発見した形式の不良な CSV ファイル `sale-20170502.csv` を修正するためのコードの例です。これは、Spark を使用して形式の不良な CSV ファイルの「修正」を行う数多くの方法のひとつに過ぎません。

1.不良なファイルを「修正」するには、プログラミングのアプローチが必要です。Python を使用してファイルのコンテンツを読み取り、それを解析して適切な形にする必要があります。

    > 単一行のデータを扱うには、`SparkContext` の `textFile()` メソッドを使用し、行のコレクションとしてファイルをレジリエントな分散データセット (RDD) に読み込みます。基本的に単一の列に格納されている単一の文字列の値を取得していることになるので、列数に関するエラーを回避できます。

2.以下のセルを実行し、ファイルのデータが含まれている RDD を読み込みます。

In [None]:
# NumPy ライブラリをインポートします。NumPy は、配列で使用される python ライブラリです。
import numpy as np

# CSV ファイルをテキスト ファイルとしてレジリエントな分散データセット (RDD) に読み取ります。ファイルの各行が RDD の行に読み取られます。
rdd = sc.textFile(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv')

3.これで RDD にデータが格納されたため、RDD で最初にデータが読み込まれた唯一の行にアクセスし、これを個々のフィールドに分割できます。Notepad++ でファイルを調べたので、すべてのフィールドはコンマ (,) で区切られていることがわかっています。まず、これを分割して、フィールド値の配列を作成しましょう。以下のセルを実行してデータ配列を作成します。

In [None]:
# 1 行しかないことがわかっているので、RDD の最初の行を使用してフィールド区切り記号 (コンマ) で分割します。
data = rdd.first().split(',')

field_count = len(data)
# 配列に読み取られたフィールドのカウントを印刷します。
print(field_count)

4.フィールド区切り記号で行を分割したため、ファイル内の個々のフィールド値すべての配列が作成されました。そのカウントは上記のとおりです。

5.以下のセルを実行すると、11 フィールドごとに単一行に解析して生成される行数をすばやく計算して予測できます。

In [None]:
import math

expected_row_count = math.floor(field_count / 11)
print(f'The expected row count is: {expected_row_count}')

6.次に、各「行」に関連のあるデータを格納する配列を作成してみましょう。

    > max_index を各行で予測される列数に設定します。`Wwi-02/sale-poc` フォルダーでの他のファイルの探索から 11 列が含まれていることがわかっているので、この値を設定します。

7.変数を設定するほか、以下のセルを使用して `data` 配列をループして、1 行に 11 の値を割り当てます。これにより、かつて単一行だったデータを、ファイルからの適切なデータと列が含まれた複数の適切な行に「分割」できます。

8.以下のセルを実行し、ファイルのデータから行の配列を作成します。

In [None]:
# 各「行」に関連のあるデータを格納するための配列を作成します。max_index は、各行の列の数に設定します。「May 1」ファイルのスキーマを表示する際に判明しているとおり、これは 11 です。
row_list = []
max_index = 11

# ファイルの単一の行から抽出した値の配列を通して、11 列で構成される行を構築します。
while max_index <= len(data):
    row = [data[i] for i in np.arange(max_index-11, max_index)]
    row_list.append(row)

    max_index += 11

print(f'The row array contains {len(row_list)} rows. The expected number of rows was {expected_row_count}.')

9.ファイル データを行として使用するには、最後にこれを Spark DataFrame に読み取る必要があります。以下のセルで、`createDataFrame()` メソッドを使用して `row_list` 配列をデータ フレームに変換し、列の名前も追加します。列の名前は、`wwi-02/sale-poc` ディレクトリの形式の良好なファイルで観察されたスキーマに基づいています。

10.以下のセルを実行し、ファイルからの行データが含まれているデータ フレームを作成し、最初の 10 行を表示します。

In [None]:
# 最後に、上記で作成した row_list を使用して DataFrame を作成します。これをスキーマのパラメーターに追加できます。ここには、最初のファイルのスキーマで表示されていた列の名前が含まれています。
df_fixed = spark.createDataFrame(row_list,schema=['TransactionId', 'CustomerId', 'ProductId', 'Quantity', 'Price', 'TotalAmount', 'TransactionDateId', 'ProfitAmount', 'Hour', 'Minute', 'StoreId'])
display(df_fixed.limit(10))

## 「修正済み」のファイルをデータ レイクに書き込む

1.ファイルの探索・修正プロセスの一環として実行する最後の手順は、データを再びデータ レイクに書き込むことです。これにより、`wwi-02/sale-poc` フォルダーの他のファイルと同じプロセスに従ってデータを取り込めます。

2.以下のセルを実行し、`sale-20170502-fixed` という名前のフォルダーの一連のファイルとしてデータ レイクでデータフレームを保存します。

    > 注: Spark はワーカー ノード全体でワークロードを並列処理するため、ファイルを保存すると、単一のファイルではなく、コレクションの「部分」ファイルとして保存されます。単一のファイルを作成するために使用できるライブラリもありますが、Spark ノートブックでネイティブに生成されたファイルの使用に慣れておくようにしましょう。


In [None]:
df.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502-fixed')

## 修正済みのファイルをデータ レイクで検査する

1.修正されたファイルをデータ レイクに書き込むと、これをすばやく検査して、ファイルが適切に書式化されていることを確認できます。上記の `wwi-02` タブを選択し、`sale-20170502-fixed` フォルダーをダブルクリックしてください。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed.png)

2.`Sale-20170502-fixed` フォルダーで、名前が `part` で始まり、拡張子が `.csv` の最初のファイルを右クリックし、コンテキスト メニューで 「**プレビュー**」 を選択します。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed-content.png)

3.[**プレビュー**] ダイアログで、適切な列が表示され、各フィールドのデータが有効であることを確認します。

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/sale-20170502-fixed-preview.png)

## まとめ

この演習では、Spark ノートブックを使用して、データ レイクのファイル内に格納されているデータを探索しました。Python コードを使用して、形式の不良な CSV ファイルからデータを抽出し、そのファイルから適切な行にデータをアセンブルして、「修正済み」のファイルを再びデータ レイクに書き込みました。

ラボのガイドに戻り、ラボ 2 の次のセクションに進んでください。
