## Data Transform with Glue

이번 실습에서는 AWS Glue를 활용하여 데이터를 로딩, 변환, 저장하는 기본적인 데이터 처리와 관련된 코드를 확인하고 직접 수행해 봅니다.
이 노트북의 실행을 위해서는 Glue DevEndpoint 클러스터를 생성하고, SageMaker notebook 인스턴스를 연동한 환경 설정이 완료되어야 합니다. 

**[사전에 실습을 위한 데이터가 준비되지 않은 경우]**

AWS Glue에서 s3://analytics-data-seung/e-commerce 경로의 S3 데이터를 크롤링해서 analytics-source 라는 이름으로 데이터베이스를 생성하면 코드 실행이 가능합니다. 

### Jupyter Notebook 기본 사용법
* 명령줄 실행 : Ctrl + Enter
* 명령줄 추가 : 편집창이 아닌 부분 선택하고 A(위에 추가) or B(아래 추가)
* 정상적인 실행이 되지 않는 경우 Kernel 메뉴 - Restart Kernel 선택

### Glue API를 활용하여 데이터 로딩하기
아래 코드는 Glue에서 Job 생성시 기본 코드 템플릿에 포함된 부분입니다. 
노트북에서 Step by Step으로 진행하기 위해서 Job / Bookmark 관련된 일부 코드는 주석처리 하였습니다. 

기본적으로 데이터를 로딩하는 부분에서는 Glue의 API를 주로 사용하도록 합니다. Glue DynamicFrame에서는 대량의 파일을 로딩 / 적재 하는데 최적화된 API를 제공합니다.

데이터 로딩 이후 데이터의 변환에는 Spark DataFrame를 기본적으로 사용합니다.

먼저 AWS Glue의 주요 라이브러리를 로딩하고 Spark 작업을 실행하기 위한 GlueContext를 생성합니다.

In [1]:
#import sys
#from awsglue.transforms import *
#from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
#from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
31,application_1558507935305_0032,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark.conf.get("spark.executor.memory")
#spark.conf.set("spakr.executor.memory" "8g")

u'5120M'

**'spark'**라는 이름으로 스파크 세션을 얻어왔습니다. 

Glue Data Catalog에 있는 정보를 기반으로 테이블 데이터를 로딩합니다. 다음 코드가 동작하기 위해서는 앞단계에서 Glue Crawler를 통해 Database와 테이블을 생성해주어야 합니다. 

**[우리가 사용할 주요 데이터 소스의 위치를 지정해 줍니다. ]**

* sourceDatabase = "analytics-source"
* targetS3Bucket = "s3://analytics-data-[자신의 ID]" 
* targetDatabase = "analytics-e-commerce"

In [3]:
sourceDatabase = "analytics-source"
targetS3Bucket = "s3://analytics-data-seung" 
targetDatabase = "analytics-e-commerce"

여러분이 앞쪽에서 Glue 크롤러를 통해 Data Catalog를 생성하였으면 아래에서 데이터베이스 목록과 테이블 목록을 확인할 수 있습니다. 

In [4]:
spark.sql("show databases").show()

+--------------------+
|        databaseName|
+--------------------+
|analytics-e-commerce|
|    analytics-source|
|      datalab-aurora|
|             default|
|         legislators|
|          quicksight|
|     test-analytics2|
+--------------------+

In [5]:
#spark.sql("show tables from legislators").show()
#sqlContext.tables("database_name").filter("tableName like '%abc%'")
#sqlContext.tables("analytics-source").show()

+----------------+-------------+-----------+
|        database|    tableName|isTemporary|
+----------------+-------------+-----------+
|analytics-source|         item|      false|
|analytics-source|item_category|      false|
|analytics-source|       member|      false|
|analytics-source|        order|      false|
|analytics-source|   order_item|      false|
+----------------+-------------+-----------+

**order** 테이블은 우리가 앞에서 생성한 analytics-source 데이터베이스에 포함된 테이블 입니다. 
이후 데이터 변환 작업의 편의성을 위해서 Glue의 DynamicFrame을 DataFrame 형식으로 변경합니다. 

In [6]:
order = glueContext.create_dynamic_frame.from_catalog(database=sourceDatabase, table_name="order").toDF()
#print "Count: ", order.count()
order.printSchema()
order.show(5)

Count:  6770255
root
 |-- member_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- country: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- order_id: string (nullable = true)

+---------+----------+-------------+-------------+------------+-------------+-----------+-------------+------------+-----------+------+--------------+
|member_id|order_date|   order_time|shipping_date|order_status|      country|total_price|         city|       state|postal_code|region|      order_id|
+---------+----------+-------------+-------------+------------+-------------+-----------+-------------+------------+-----------+------+--------------+
| ND-18370|2019-02-04|  2/4/19 7:11|   

이후 분석 단계에서 불필요한 컬럼을 미리 식별하여 데이터를 정리하는 작업을 진행합니다. 
몇몇 지역을 나타내는 컬럼의 데이터 분포를 살펴보고, 꼭 필요한 City 컬럼만 남기고 삭제하는 작업을 진행합니다. 

In [7]:
order.groupBy("country").count().sort("count", ascending=False).show(10)
order.groupBy("region").count().sort("count", ascending=False).show(10)
order.groupBy("state").count().sort("count", ascending=False).show(10)
order.groupBy("city").count().sort("count", ascending=False).show(10)
order.groupBy("postal_code").count().sort("count", ascending=False).show(10)

+-------------+-------+
|      country|  count|
+-------------+-------+
|United States|6770255|
+-------------+-------+

+-------+-------+
| region|  count|
+-------+-------+
|  South|2000781|
|   East|1691507|
|Central|1679411|
|   West|1398556|
+-------+-------+

+----------+------+
|     state| count|
+----------+------+
|   Florida|846319|
|  New York|818865|
|      Ohio|590644|
|  Virginia|564405|
|     Texas|537837|
|California|537710|
|  Illinois|537641|
|    Kansas|309009|
|  Kentucky|308947|
|  Colorado|295651|
+----------+------+
only showing top 10 rows

+-------------+------+
|         city| count|
+-------------+------+
|     Elmhurst|537479|
|San Francisco|537249|
|New York City|536476|
|  Garden City|309004|
|    Henderson|308921|
|       Denver|295609|
|       Toledo|295345|
|     Columbus|295223|
|   Chesapeake|295053|
|      Detroit|294700|
+-------------+------+
only showing top 10 rows

+-----------+------+
|postal_code| count|
+-----------+------+
|      60126|5374

지역을 나타내는 컬럼인 country, region, state 컬럼 보다 주로 city 관련 컬럼의 활용도가 높다고 판단되고, postal_code는 city와 중복되므로, country, region, state, postal_code 컬럼은 삭제합니다. 
그리고 이후 member 테이블의 city 컬럼과 구분하기 위해서 컬럼 명칭을 order_city로 변경합니다. 

In [8]:
order.printSchema()
order = order.drop("country", "region", "state", "postal_code")
order = order.withColumnRenamed("city", "order_city")
order.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- country: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- order_id: string (nullable = true)

root
 |-- member_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- order_city: string (nullable = true)
 |-- order_id: string (nullable = true)

회원별 주문 현황을 살펴보기 위해서 member 테이블을 추가로 로딩하여 order 테이블과 조인하여 member_order 테이블을 생성합니다.

In [9]:
member = glueContext.create_dynamic_frame.from_catalog(database=sourceDatabase, table_name="member").toDF()
#member.select("member_id").distinct().show() - 특정 컬럼으로 distinct
print "Count(distinct): ", member.distinct().count()
member.printSchema()
member.show(5)

Count(distinct):  793
root
 |-- member_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- login_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- last_login_ymdt: long (nullable = true)
 |-- membership_level: string (nullable = true)
 |-- login_password: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- reg_ymdt: long (nullable = true)

+---------+-------------+--------+------+-----------+----------+-----------+-------+---------------+----------------+--------------------+-------------+---+--------------+
|member_id|      country|login_id|gender|       city|     state|postal_code| region|last_login_ymdt|membership_level|      login_password|         name|age|      reg_ymdt|
+---------+-------------+--------+------+-----------+----------+--------

State와 City 단위로 회원 현황을 간단히 살펴보자

In [10]:
member.groupBy("state", "city").count().sort("count", ascending=False).show(10)

+------------+-------------+-----+
|       state|         city|count|
+------------+-------------+-----+
|  California|San Francisco|   64|
|    New York|New York City|   61|
|  California|  Los Angeles|   59|
|  Washington|      Seattle|   51|
|       Texas|      Houston|   31|
|    Illinois|      Chicago|   27|
|Pennsylvania| Philadelphia|   21|
|       Texas|       Dallas|   15|
|      Oregon|     Portland|   14|
|        Ohio|     Columbus|   11|
+------------+-------------+-----+
only showing top 10 rows

주문 내역을 회원 정보 기반으로 분석하기 위해서, order 테이블과 member 테이블을 조인하여 member_order 테이블을 만듭니다. 

In [11]:
member_order = member.join(order, "member_id")
member_order.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- login_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- last_login_ymdt: long (nullable = true)
 |-- membership_level: string (nullable = true)
 |-- login_password: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- reg_ymdt: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- order_city: string (nullable = true)
 |-- order_id: string (nullable = true)

회원별 구매 횟수 / 구매 금액 / 나이별, 성별별 구매 금액 등을 계산해서 확인해봅니다. 
먼저 구매 횟수가 가장 많은 회원을 살펴봅니다.

In [12]:
member_order_total = member_order.groupBy("member_id").count().sort("count", ascending=False)
member_order_total.show(10)

+---------+------+
|member_id| count|
+---------+------+
| VF-21715|537481|
| AA-10315|537097|
| BT-11395|309007|
| SB-20290|308915|
| KT-16480|295607|
| JS-15880|295340|
| RC-19960|295162|
| CK-12325|295050|
| HW-14935|294677|
| JH-15910|294300|
+---------+------+
only showing top 10 rows

고객의 나이에 따른 구매 패턴을 살펴보기 위해서 20대 고객의 주문 현황을 살펴봅니다. 특히 눈에 띄는 연령이 있는지 살펴봅니다. 

In [13]:
#member_order.filter(member_order.age.between(20, 29)).groupBy("age").sum("total_price").sort("sum(total_price)", ascending=False).show()
member_order.filter(member_order.age.between(20, 29)).groupBy("age").sum("total_price").sort("age").show()

+---+----------------+
|age|sum(total_price)|
+---+----------------+
| 20|        70795614|
| 21|        70775181|
| 22|           35759|
| 23|           22598|
| 24|        70578067|
| 25|           14573|
| 26|           23684|
| 27|           25835|
| 28|           16736|
| 29|           20889|
+---+----------------+

고객의 나이뿐 아니라 성별에 따른 특이점을 찾기 위해서 성별을 분류 항목에 추가하였습니다. 

In [14]:
member_order.groupBy("age").pivot("gender").sum("total_price").sort("age").show(20)

+---+--------+--------+
|age|       F|       M|
+---+--------+--------+
| 20|70792490|    3124|
| 21|   22134|70753047|
| 22|   16818|   18941|
| 23|   12389|   10209|
| 24|70563836|   14231|
| 25|    8528|    6045|
| 26|   17819|    5865|
| 27|   15316|   10519|
| 28|    6457|   10279|
| 29|   17144|    3745|
| 30|   12332|   11051|
| 31|   13240|71059245|
| 32|77491817|   10367|
| 33|   29039|   19515|
| 34|77250659|81536777|
| 35|    8846|    5238|
| 36|    5708|    7523|
| 37|    9632|   12686|
| 38|    7960|   32160|
| 39|   10081|73767298|
+---+--------+--------+
only showing top 20 rows

이후 분석에서 주문별 상세 주문 내역을 활용하기 위해서 필요한 모든 테이블을 로딩하여, order 테이블과 조인하여 order_detali 테이블을 만듭니다. 

In [15]:
order_item = glueContext.create_dynamic_frame.from_catalog(database=sourceDatabase, table_name="order_item").toDF()
print "Count: ", order_item.count()
order_item.printSchema()

Count:  22573388
root
 |-- item_count: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: long (nullable = true)
 |-- order_id: string (nullable = true)

In [16]:
item = glueContext.create_dynamic_frame.from_catalog(database=sourceDatabase, table_name="item").toDF()
item_category = glueContext.create_dynamic_frame.from_catalog(database=sourceDatabase, table_name="item_category").toDF()
print "Count: ", item.count()
item.printSchema()
item = item.withColumnRenamed("name", "item_name")
item.printSchema()

Count:  3660
root
 |-- item_id: string (nullable = true)
 |-- price: long (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- reg_ymdt: long (nullable = true)
 |-- category_id: long (nullable = true)

root
 |-- item_id: string (nullable = true)
 |-- price: long (nullable = true)
 |-- item_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- reg_ymdt: long (nullable = true)
 |-- category_id: long (nullable = true)

In [17]:
#order_detail = order.join(order_item, "order_id")
order_detail = member.join(order.join(order_item.join(item.join(item_category, "category_id"), "item_id"), "order_id"), "member_id")
print "Count: ", order_detail.count()
order_detail.printSchema()



Count:  27088076
root
 |-- member_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- login_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- last_login_ymdt: long (nullable = true)
 |-- membership_level: string (nullable = true)
 |-- login_password: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- reg_ymdt: long (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_time: string (nullable = true)
 |-- shipping_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- order_city: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_count: long (nullable = true)
 |-- item_price: long (nullable = true)
 |-- ca

이후 분석 단계에서 불필요한 컬럼은 미리 삭제하여, 불필요한 저장 공간이나 프로세싱을 방지합니다. 
회원 정보중에 일부 보안상의 이슈가 될 수 있는 항목은 삭제합니다. 

In [18]:
order_detail = order_detail.drop("login_id", "membership_level","postal_code", "region", "name", "last_login_ymdt", "login_password", "reg_ymdt", "country")
order_detail = order_detail.drop("order_id", "order_time", "shipping_date", "order_city", "price", "description", "parent_category_name", "item_id", "category_id")
order_detail.printSchema()


root
 |-- member_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- age: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_price: long (nullable = true)
 |-- item_count: long (nullable = true)
 |-- item_price: long (nullable = true)
 |-- item_name: string (nullable = true)
 |-- category_name: string (nullable = true)

### Glue DynamicFrame을 활용하여 S3에 데이터 저장하기
앞쪽에서 Dataframe을 통해서 데이터 정리 작업과 Join을 완료한 데이터 파일을 별도의 S3 버킷에 저장합니다.
(DataFrame으로 변경한 데이터는 DynamicFrame로 변경하는 작업이 추가됩니다.)
저장된 데이터를 기반으로 ad-hoc 쿼리와 분석을 수행하도록 합니다. 

In [19]:
from awsglue.dynamicframe import DynamicFrame

앞에서 지정한 S3 버킷에 변환작업을 마친 데이터를 저장합니다. 저장 파일 format과 파티션 키를 정의 할 수 있습니다. 
* member_order
* oredre_detail

In [20]:
order_detail_dyf = DynamicFrame.fromDF(order_detail, glueContext, 'order_detail_dyf')
member_order_total = DynamicFrame.fromDF(member_order, glueContext, 'member_order_total')


In [21]:
# DynamicFrame으로 변경한 데이터를 S3에 저장, 데이터 쿼리 유형에 맞춰 Partition Key를 추가 가능 
datasink0 = glueContext.write_dynamic_frame.from_options(frame = order_detail_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/order_detail'}, format = "parquet", transformation_ctx = "datasink0")
#datasink0 = glueContext.write_dynamic_frame.from_options(frame = order_detail_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/order_detail', "partitionKeys": "member_id"}, format = "parquet", transformation_ctx = "datasink0")
datasink1 = glueContext.write_dynamic_frame.from_options(frame = member_order_total, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/member_order'}, format = "parquet", transformation_ctx = "datasink1")


In [22]:
# dataframe을 직접 S3에 write 각각 Parquet 형식과 CSV 형식으로 
#order_detail.repartition(1).write.parquet("s3://analytics-data-seung/e-commerce-analytics/order_detail_parquet/", mode="overwrite")
#order_detail.repartition(1).write.csv("s3://analytics-data-seung/e-commerce-analytics/order_detail_csv/", mode="append")


추가적으로 필요한 데이터를 모두 DynamicFrame으로 변환하여, Parquet 파일 형식으로 저장합니다. 
* order
* order_item
* member
* item
* item_category

앞에서 로딩한여 DataFrame으로 변경하여 사용했던 테이블은 다시 DynamicFrame으로 변경하여 저장합니다.

In [23]:
order_dyf = DynamicFrame.fromDF(order, glueContext, 'order_dyf')
order_item_dyf = DynamicFrame.fromDF(order_item, glueContext, 'order_item_dyf')
member_dyf = DynamicFrame.fromDF(member, glueContext, 'member_dyf')
item_dyf = DynamicFrame.fromDF(item, glueContext, 'item_dyf')
item_category_dyf = DynamicFrame.fromDF(item_category, glueContext, 'item_category_dyf')

In [24]:
datasink2 = glueContext.write_dynamic_frame.from_options(frame = order_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/order'}, format = "parquet", transformation_ctx = "datasink2")
datasink3 = glueContext.write_dynamic_frame.from_options(frame = order_item_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/order_item'}, format = "parquet", transformation_ctx = "datasink3")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = member_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/member'}, format = "parquet", transformation_ctx = "datasink4")
datasink5 = glueContext.write_dynamic_frame.from_options(frame = item_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/item'}, format = "parquet", transformation_ctx = "datasink5")
datasink6 = glueContext.write_dynamic_frame.from_options(frame = item_category_dyf, connection_type = "s3", connection_options = {"path": targetS3Bucket + '/e-commerce-analytics/item_category'}, format = "parquet", transformation_ctx = "datasink6")

여기까지 완료하면 우리가 분석에 사용할 e-commerce 관련된 OLTP 성격의 데이터를 모두 S3에 저장하였고, 또한 데이터 탐색과 변환을 통해 만들어진 몇몇 테이블도 S3에 저장하였습니다.
이후 다시 Lab을 따라서 다음 단계를 진행합니다. 