# Synap Spark를 사용하여 데이터 탐색 및 수정

이 작업에서는 Synapse Spark Notebook을 사용해 데이터 레이크의 `wwi-02/sale-poc` 폴더에 있는 파일 몇 개를 살펴봅니다. 그리고 이 랩 뒷부분에서 Synapse 파이프라인을 사용해 디렉터리의 모든 파일을 수집할 수 있도록 Python 코드를 사용하여 `sale-20170502.csv` 파일의 문제를 해결합니다.

먼저 Notebook 내에서 변수를 설정하여 기본 데이터 레이크 스토리지 계정의 이름을 제공해야 합니다. 아래 셀을 실행하기 전에 `[YOUR-DATA-LAKE-ACCOUNT-NAME]`을 사용자의 Synapse 작업 영역과 연결된 기본 데이터 레이크 스토리지 계정 이름으로 바꿔야 합니다.

Synapse Studio에서 **데이터** 허브로 이동하여 **연결됨** 탭을 선택한 다음 **Azure Data Lake Storage Gen2** 아래에서 **asadatalake**로 시작하는 스토리지 계정 이름을 찾으면 데이터 레이크 스토리지 계정의 이름을 확인할 수 있습니다.

![데이터 허브 연결됨 탭에서 기본 데이터 레이크 스토리지 계정이 강조 표시되어 있는 그래픽](https://solliancepublicdata.blob.core.windows.net/images/synapse/data-hub-primary-data-lake-storage-account.png "Primary ADLS Gen2 Account")

1. 데이터 레이크 스토리지 계정 이름을 복사한 다음 아래 셀의 `[YOUR-DATA-LAKE-ACCOUNT-NAME]` 대신 붙여넣습니다. 그런 다음 셀을 선택하면 표시되는 **셀 실행** 단추를 선택하여 셀을 실행합니다.

    ![데이터 허브 연결됨 탭에서 기본 데이터 레이크 스토리지 계정이 강조 표시되어 있는 그래픽](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell.png "Primary ADLS Gen2 Account")



In [None]:
adls_account_name = '[YOUR-DATA-LAKE-ACCOUNT-NAME]'

## Spark를 사용하여 파일 살펴보기

1. Synapse Spark를 사용하여 데이터를 탐색할 때는 첫 단계로 데이터 레이크에서 파일을 로드합니다. 이 작업에는 `SparkSession`의 `spark.read.load()` 메서드를 사용할 수 있습니다.

2. Spark에서는 파일의 데이터를 [데이터 프레임](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes)에 로드할 수 있습니다. 데이터 프레임은 명명된 열로 데이터를 구조화할 수 있는 추상화입니다. 아래 셀을 실행하여 `sale-20170501.csv` 파일의 데이터를 데이터 프레임에 로드합니다. 셀 왼쪽을 마우스로 가리킨 다음 파란색 **셀 실행** 단추를 선택하면 셀을 실행할 수 있습니다.

    ![실행할 셀 내용 왼쪽의 셀 실행 단추가 강조 표시되어 있는 그래픽](https://solliancepublicdata.blob.core.windows.net/images/synapse/synapse-notebook-run-cell-load-sale-20170501-csv.png "Run cell")

In [None]:
# 먼저 이전에 올바른 서식이 지정된 것으로 확인된 `sale-20170501.csv` 파일을 로드합니다.
# `header` 및 `inferSchema` 매개 변수의 사용 방식을 살펴봅니다. `header`는 파일의 첫 번째 행에 열 이름이 포함되어 있음을 나타냅니다.
# `inferSchema`는 파일 내의 데이터를 사용하여 데이터 형식을 유추하도록 Spark에 명령합니다.
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170501.csv', format='csv', header=True, inferSchema=True)

## 데이터 프레임의 내용 확인

`sale-20170501.csv` 파일의 데이터를 데이터 프레임에 로드했으므로 이제 데이터 프레임의 다양한 메서드를 사용하여 데이터 속성을 살펴볼 수 있습니다.

1. 우선 가져온 데이터부터 살펴보겠습니다. 아래 셀을 실행하여 데이터 프레임의 데이터를 표시한 후 살펴봅니다.

In [None]:
display(df.limit(10))

2. Azure Synapse의 SQL 주문형 기능을 살펴볼 때 확인했던 것처럼, Spark에서는 파일 내에 포함된 데이터를 확인하고 쿼리할 수 있습니다. 

3. 이제 데이터 프레임의 `printSchema()` 메서드를 사용하여 데이터 프레임 작성 시에 `inferSchema` 매개 변수를 사용하는 경우의 결과를 확인해 보겠습니다. 아래 셀을 실행하고 출력을 살펴봅니다.

In [None]:
# 이제 유추된 스키마를 인쇄해 보겠습니다. 다음 작업에서 2017년 5월 2일 파일에 누락된 헤더를 추가하려면 이 정보가 필요합니다.
df.printSchema()

4. `printSchema` 메서드는 Spark 엔진이 평가한 각 필드 내에 포함된 데이터를 기준으로 하는 데이터 형식과 필드 이름을 모두 출력합니다.

    > 뒷부분에서 형식이 잘못된 `sale-20170502.csv` 파일의 스키마를 정의할 때 이 정보를 사용할 수 있습니다. 필드 이름과 데이터 형식 외에 파일에 포함된 기능이나 열 수도 확인해야 합니다. 여기서는 파일에 필드 11개가 포함되어 있습니다. 이러한 필드를 사용하여 단일 데이터 행을 분할할 위치를 결정합니다.

5. 수행 가능한 추가 탐색의 예로, 아래 셀을 실행하여 새 데이터 프레임을 만든 후 표시합니다. 이 데이터 프레임에는 고유 고객 및 제품 ID 쌍의 순서가 지정된 목록이 포함됩니다. 이러한 유형의 함수를 사용하여 대상 필드에서 잘못된 값이나 빈 값을 빠르게 찾을 수 있습니다.

In [None]:
# CustomerId의 내림차순으로 정렬된 고유 CustomerId 및 ProductId 값 목록이 포함된 새 데이터 프레임을 만듭니다.
df_distinct_products = df.select('CustomerId', 'ProductId').distinct().orderBy('CustomerId')

# 결과 데이터 프레임의 첫 100개 행을 표시합니다.
display(df_distinct_products.limit(100))

6. 이번에는 위에서 사용했던 `load()` 메서드를 사용하여 `sale-20170502.csv` 파일을 열어서 탐색해 보겠습니다.

In [None]:
# 다음으로는 첫 번째 파일에 사용했던 것과 같은 `load()` 메서드를 사용하여 2017년 5월 2일 파일을 읽어 보겠습니다.
df = spark.read.load(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv', format='csv')
display(df.limit(10))

7. Spark에서도 T-SQL에서 확인했던 것과 비슷한 오류가 표시됩니다. 즉, 처리된 열 수가 열 수 제한인 20480개를 초과했을 수 있다는 오류가 발생합니다. 이 파일의 데이터를 사용하려면 고급 메서드를 사용해야 합니다. 다음 섹션에서 이 메서드를 사용하는 방법을 살펴봅니다.


## 형식이 잘못된 CSV 파일 처리 및 수정

> 아래 단계에서는 `wwi-02/sale-poc` 폴더의 파일을 살펴볼 때 확인했던 형식이 잘못된 CSV 파일 `sale-20170502.csv`를 수정하는 예제 코드를 제공합니다. 이러한 코드 외에도 여러 가지 방법을 통해 Spark를 사용하여 형식이 잘못된 CSV 파일의 "수정"을 처리할 수 있습니다.

1. 잘못된 파일을 "수정"하려면 프로그래밍 방식을 사용해야 합니다. 구체적으로는 Python을 사용하여 파일 내용을 읽어들인 다음 구문 분석하여 적절한 형태로 설정해야 합니다.

    > 여기서는 한 행의 데이터를 처리해야 하므로 `SparkContext`의 `textFile()` 메서드를 사용하여 파일(행 컬렉션)을 RDD(Resilient Distributed Dataset)로 읽어들이면 됩니다. 그러면 열 수 관련 오류를 해결할 수 있습니다. 열 하나에 저장된 문자열 값 하나만 수정하면 되기 때문입니다.

2. 아래 셀을 실행하여 파일 데이터가 포함된 RDD를 로드합니다.

In [None]:
# NumPy 라이브러리를 가져옵니다. NumPy는 배열 사용 시에 사용되는 python 라이브러리입니다.
import numpy as np

# CSV 파일을 RDD(Resilient Distributed Dataset) 형식의 텍스트 파일로 읽어들입니다. 이 작업에서는 파일의 각 행을 RDD의 행으로 읽어들입니다.
rdd = sc.textFile(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502.csv')

3. 데이터를 RDD에 저장하고 나면 RDD에서 데이터가 입력된 첫 번째 행(RDD의 행은 첫 번째 행 하나뿐임)에 액세스하여 행을 개별 필드로 분할할 수 있습니다. Notepad++에서 파일을 검사한 결과 모든 필드는 쉼표(,)로 구분되어 있음이 확인되었습니다. 그러므로, 먼저 쉼표를 기준으로 행을 분할하여 필드 값 배열을 만들어 보겠습니다. 아래 셀을 실행하여 데이터 배열을 만듭니다.

In [None]:
# 행이 하나뿐이므로 RDD의 첫 번째 행을 가져와 필드 구분 기호(쉼표)로 분할합니다.
data = rdd.first().split(',')

field_count = len(data)
# 필드 읽기 수를 배열로 인쇄합니다.
print(field_count)

4. 필드 구분 기호로 행을 분할하여 파일의 모든 개별 필드 값이 포함된 배열을 만들었습니다. 필드 값의 수는 위에 나와 있습니다.

5. 이제 아래 셀을 실행하여 11개 필드를 행 하나로 구문 분석하면 생성될 것으로 예상되는 행 수를 간단하게 계산합니다.

In [None]:
import math

expected_row_count = math.floor(field_count / 11)
print(f'The expected row count is: {expected_row_count}')

6. 다음으로는 각 "행"과 연관된 데이터를 저장할 배열을 만듭니다.

    > max_index를 각 행에 포함될 것으로 예상되는 열 수로 설정합니다. `wwi-02/sale-poc` 폴더의 다른 파일을 살펴본 결과 포함된 열 수는 11개이므로, max_index 값을 11로 설정합니다.

7. 변수를 설정한 후에는 아래 셀을 사용하여 `data` 배열에서 코드를 반복 실행해 11개 값을 모두 한 행에 할당합니다. 이렇게 하면 이전에는 한 행에 있었던 데이터를 파일의 열과 적절한 데이터가 포함된 해당 행으로 "분할"할 수 있습니다.

8. 아래 셀을 실행하여 파일 데이터의 행 배열을 만듭니다.

In [None]:
# 각 "행"과 연관된 데이터를 저장할 배열을 만듭니다. max_index를 각 행의 열 수로 설정합니다. 5월 1일 파일의 스키마를 살펴볼 때 확인했던 것처럼, 열 수는 11입니다.
row_list = []
max_index = 11

# 이제 파일의 단일 행에서 추출된 값 배열에서 코드를 반복 실행하여 열이 11개인 행을 작성합니다.
while max_index <= len(data):
    row = [data[i] for i in np.arange(max_index-11, max_index)]
    row_list.append(row)

    max_index += 11

print(f'The row array contains {len(row_list)} rows. The expected number of rows was {expected_row_count}.')

9. 파일 데이터를 행 단위로 사용하려면 수행해야 하는 마지막 작업은 Spark 데이터 프레임으로 데이터를 읽어들이는 것입니다. 아래 셀에서는 `createDataFrame()` 메서드를 사용하여 `row_list` 배열을 데이터 프레임으로 변환합니다. 이 과정에서 열 이름도 추가됩니다. 열 이름은 `wwi-02/sale-poc` 디렉터리의 서식이 올바른 파일에서 확인했던 스키마를 기준으로 추가됩니다.

10. 아래 셀을 실행하여 파일의 행 데이터가 포함된 데이터 프레임을 만든 다음 처음 10개 행을 표시합니다.

In [None]:
# 마지막으로, 앞에서 작성한 row_list를 사용하여 데이터 프레임을 만듭니다. 이 데이터 프레임에는 스키마 매개 변수를 추가할 수 있습니다. 이 매개 변수에는 첫 번째 파일의 스키마에서 확인했던 열 이름이 포함됩니다.
df_fixed = spark.createDataFrame(row_list,schema=['TransactionId', 'CustomerId', 'ProductId', 'Quantity', 'Price', 'TotalAmount', 'TransactionDateId', 'ProfitAmount', 'Hour', 'Minute', 'StoreId'])
display(df_fixed.limit(10))

## 데이터 레이크에 "수정된" 파일 쓰기

1. 탐색 및 파일 수정 프로세스의 마지막 단계에서는 데이터를 데이터 레이크에 다시 씁니다. 그러면 `wwi-02/sale-poc` 폴더의 다른 파일에 사용하는 것과 동일한 프로세스에 따라 해당 데이터를 수집할 수 있습니다.

2. 아래 셀을 실행하여 데이터 레이크 내 `sale-20170502-fixed` 폴더에 있는 일련의 파일에 데이터 프레임을 저장합니다.

    > 참고: Spark는 여러 작업자 노드에서 워크로드를 병렬화합니다. 따라서 파일을 저장하면 단일 파일이 아닌 컬렉션의 "부분" 파일로 저장됩니다. 단일 파일을 만드는 데 사용할 수 있는 라이브러리도 있지만, Spark Notebooks(파일이 기본적으로 작성되는 위치)를 통해 이러한 방식으로 생성된 파일을 사용하는 방법을 파악해 두면 유용합니다.


In [None]:
df.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save(f'abfss://wwi-02@{adls_account_name}.dfs.core.windows.net/sale-poc/sale-20170502-fixed')

## 데이터 레이크에서 수정된 파일 검사

1. 수정된 파일을 데이터 레이크에 쓰고 나면 해당 파일을 빠르게 검사하여 이제 서식이 올바른지를 확인할 수 있습니다. 위의 `wwi-02` 탭을 선택하고 `sale-20170502-fixed` 폴더를 두 번 클릭합니다.

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed.png)

2. `sale-20170502-fixed` 폴더에서 이름이 `part`로 시작하고 확장명이 `.csv`인 첫 번째 파일을 마우스 오른쪽 단추로 클릭하고 상황에 맞는 메뉴에서 **미리 보기**를 선택합니다.

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/wwi-02-sale-poc-sale-20170502-fixed-content.png)

3. **미리 보기** 대화 상자에서 적절한 열이 표시되며 각 필드의 값이 올바른지를 확인합니다.

    ![](https://solliancepublicdata.blob.core.windows.net/images/synapse/sale-20170502-fixed-preview.png)

## 요약

이 연습에서는 Spark Notebook을 사용하여 데이터 레이크의 파일 내에 저장된 데이터를 살펴보았습니다. 그리고 Python 코드를 사용하여 서식이 잘못된 CSV 파일의 데이터를 추출하고, 해당 파일의 데이터를 적절한 행으로 어셈블한 다음 "수정된" 파일을 데이터 레이크에 다시 썼습니다.

이제 랩 가이드로 돌아가서 랩 2의 다음 섹션을 계속 진행할 수 있습니다.
