# Environment Setup

Download the AWS Glue Docker image.

```
$ docker --version
Docker version 19.03.13, build 4484c46d9d
$ docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01
```

To run Jupyter Notebook, start the Docker container as follows.


```
$ docker run -itd -p 8888:8888 \
  -p 4040:4040 \
  -v ~/.aws:/root/.aws:ro \
  --name glue_jupyter \
  amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
  /home/jupyter/jupyter_start.sh
```

Open `localhost:8888` in a browser and confirm that Jupyter Notebook is running.

### Setting Up Autocomplete in Jupyter Notebook

To use autocomplete in Jupyter Notebook, install __[jupyter-tabnine](https://github.com/wenmin-wu/jupyter-tabnine)__.

<ol>
<li> Connect to the <b>glue_jupyter</b> Docker container.
    
```
$ docker exec -it glue_jupyter bash
root@9b55fff7063f:/#
root@9b55fff7063f:/# python3 --version
Python 3.6.10
```

<li> Run the following commands inside the container to install <b>jupyter-tabnine</b>.


> pip3 install jupyter-tabnine <br/>
> jupyter nbextension install --py jupyter_tabnine  <br/>
> jupyter nbextension enable --py jupyter_tabnine  <br/>
> jupyter serverextension enable --py jupyter_tabnine  <br/>


For example, run the commands above in order inside the container:

```
root@9b55fff7063f:/# pip3 install jupyter-tabnine
root@9b55fff7063f:/# jupyter nbextension install --py jupyter_tabnine
root@9b55fff7063f:/# jupyter nbextension enable --py jupyter_tabnine
root@9b55fff7063f:/# jupyter serverextension enable --py jupyter_tabnine
```

</ol>

# Hands-on

Copy this notebook into the Docker container.

```
$ docker cp AWS-Glue-ETL-PySpark-Cheatsheet.ipynb glue_jupyter:/home/jupyter/jupyter_default_dir/
$ docker exec glue_jupyter ls /home/jupyter/jupyter_default_dir/
AWS-Glue-ETL-PySpark-Cheatsheet.ipynb
```


In [1]:
import warnings

# Ignore and hide warning messages
warnings.filterwarnings(action='ignore')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,,pyspark,idle,,,✔


SparkSession available as 'spark'.


### Importing GlueContext

In [2]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row

In [3]:
glueContext = GlueContext(SparkContext.getOrCreate())

### Dataset 1

In [4]:
order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],
               ['1006', '547', 'NO',  '1418901256', '75034'],
               ['1007', '823', 'YES', '1418901300', '75023'],
               ['1008', '912', 'NO',  '1418901400', '82091'],
               ['1009', '321', 'YES', '1418902000', '90093']
             ]

In [5]:
# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

In [6]:
# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema=order_schema)

In [7]:
df_orders.show()

+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

In [8]:
df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- essential_item: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- zipcode: string (nullable = true)

### DynamicFrame

Convert the Spark DataFrame to a Glue DynamicFrame.

In [9]:
dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf")

In [10]:
dyf_orders.printSchema()

root
|-- order_id: string
|-- customer_id: string
|-- essential_item: string
|-- timestamp: string
|-- zipcode: string

### AWS Glue transform functions

### ApplyMapping


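Apply a list of mappings to a DynamicFrame. Each mapping is a tuple of (source column, source type, target column, target type), so a field can be renamed and cast in a single step.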

In [11]:
dyf_applyMapping = ApplyMapping.apply(frame=dyf_orders, mappings=[
    ("order_id", "String", "order_id", "Long"),
    ("customer_id", "String", "customer_id", "String"),
    ("essential_item", "String", "essential_item", "String"),
    ("timestamp", "String", "timestamp", "Long"),
    ("zipcode", "String", "zip", "Long")
])

ℹ️ <b>How to check the arguments of Glue transform functions</b>
    
Calling describeArgs() on a GlueTransform class returns information about its named arguments.

In [12]:
ApplyMapping.describeArgs()

[{'name': 'frame', 'type': 'DynamicFrame', 'description': 'DynamicFrame to transform', 'optional': False, 'defaultValue': None}, {'name': 'mappings', 'type': 'DynamicFrame', 'description': 'List of mapping tuples (source col, source type, target col, target type)', 'optional': False, 'defaultValue': None}, {'name': 'case_sensitive', 'type': 'Boolean', 'description': 'Whether ', 'optional': True, 'defaultValue': 'False'}, {'name': 'transformation_ctx', 'type': 'String', 'description': 'A unique string that is used to identify stats / state information', 'optional': True, 'defaultValue': ''}, {'name': 'info', 'type': 'String', 'description': 'Any string to be associated with errors in the transformation', 'optional': True, 'defaultValue': '""'}, {'name': 'stageThreshold', 'type': 'Integer', 'description': 'Max number of errors in the transformation until processing will error out', 'optional': True, 'defaultValue': '0'}, {'name': 'totalThreshold', 'type': 'Integer', 'description': 'Max n

In [13]:
import pprint

pprint.pprint(ApplyMapping.describeArgs())

[{'defaultValue': None,
  'description': 'DynamicFrame to transform',
  'name': 'frame',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': None,
  'description': 'List of mapping tuples (source col, source type, target col, '
                 'target type)',
  'name': 'mappings',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': 'False',
  'description': 'Whether ',
  'name': 'case_sensitive',
  'optional': True,
  'type': 'Boolean'},
 {'defaultValue': '',
  'description': 'A unique string that is used to identify stats / state '
                 'information',
  'name': 'transformation_ctx',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '""',
  'description': 'Any string to be associated with errors in the '
                 'transformation',
  'name': 'info',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '0',
  'description': 'Max number of errors in the transformation until processing '
                 'will error out',
  'n
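
Because describeArgs() returns a plain list of dicts, it can be filtered like any other Python data. The sketch below is only an illustration: the three entries are copied from the output above, and the helper name `required_args` is hypothetical, not part of the Glue API.

```python
# Three entries copied from the describeArgs() output above.
described = [
    {"name": "frame", "type": "DynamicFrame",
     "description": "DynamicFrame to transform",
     "optional": False, "defaultValue": None},
    {"name": "mappings", "type": "DynamicFrame",
     "description": "List of mapping tuples "
                    "(source col, source type, target col, target type)",
     "optional": False, "defaultValue": None},
    {"name": "case_sensitive", "type": "Boolean",
     "description": "Whether ",
     "optional": True, "defaultValue": "False"},
]


def required_args(args):
    """Return the names of the arguments that are not optional (hypothetical helper)."""
    return [arg["name"] for arg in args if not arg["optional"]]


print(required_args(described))
```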

In [14]:
dyf_applyMapping.printSchema()

root
|-- order_id: long
|-- customer_id: string
|-- essential_item: string
|-- timestamp: long
|-- zip: long
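
To make the effect of the mapping tuples concrete, here is a plain-Python analogy that works on a list of dicts. It is a sketch under stated assumptions, not how Glue implements ApplyMapping: the function name `apply_mapping` and the `CASTERS` table are hypothetical, and only the two types used in this notebook are handled.

```python
# Assumed type table for this illustration only.
CASTERS = {"long": int, "string": str}


def apply_mapping(records, mappings):
    """Rename and cast fields according to
    (source col, source type, target col, target type) tuples."""
    result = []
    for rec in records:
        out = {}
        for source, _source_type, target, target_type in mappings:
            if source in rec:
                out[target] = CASTERS[target_type.lower()](rec[source])
        result.append(out)
    return result


orders = [
    {"order_id": "1005", "customer_id": "623", "essential_item": "YES",
     "timestamp": "1418901234", "zipcode": "75091"},
]
mappings = [
    ("order_id", "String", "order_id", "Long"),
    ("customer_id", "String", "customer_id", "String"),
    ("essential_item", "String", "essential_item", "String"),
    ("timestamp", "String", "timestamp", "Long"),
    ("zipcode", "String", "zip", "Long"),
]
mapped = apply_mapping(orders, mappings)
print(mapped[0])
```

As in the schema above, `zipcode` is renamed to `zip` and cast to an integer, while `customer_id` stays a string.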

### Filter

Build a new DynamicFrame from the records of the input DynamicFrame that satisfy a given predicate function.

This example selects the essential items.

In [15]:
dyf_filter = Filter.apply(frame=dyf_applyMapping, f=lambda x: x["essential_item"] == 'YES')

In [16]:
dyf_filter.toDF().show()

+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+
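
The same predicate can be tried out on plain dicts before running it in Glue. This is a minimal sketch, assuming records behave like dictionaries; `filter_records` is a hypothetical helper, not a Glue function.

```python
def filter_records(records, predicate):
    """Keep only the records for which the predicate returns True."""
    return [rec for rec in records if predicate(rec)]


orders = [
    {"order_id": 1005, "essential_item": "YES", "zip": 75091},
    {"order_id": 1006, "essential_item": "NO", "zip": 75034},
    {"order_id": 1007, "essential_item": "YES", "zip": 75023},
    {"order_id": 1008, "essential_item": "NO", "zip": 82091},
    {"order_id": 1009, "essential_item": "YES", "zip": 90093},
]
essential = filter_records(orders, lambda x: x["essential_item"] == "YES")
print([rec["order_id"] for rec in essential])
```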

### Map

Build a new DynamicFrame by applying a function to every record of the input DynamicFrame.

In [17]:
# This function takes a DynamicFrame record and checks whether its zip is 75034.
# If so, it adds another column "next_day_air" with the value True.
def next_day_air(rec):
    if rec["zip"] == 75034:
        rec["next_day_air"] = True
    return rec

In [18]:
mapped_dyF = Map.apply(frame=dyf_applyMapping, f=next_day_air)

In [19]:
mapped_dyF.toDF().show()

+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        true|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+
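
The `null` values in the table come from the function adding `next_day_air` only to matching records. The plain-Python sketch below reproduces that effect on dicts; `map_records` and `collect_columns` are hypothetical helpers, and copying each record before calling the function is a choice made for this illustration.

```python
def next_day_air(rec):
    if rec["zip"] == 75034:
        rec["next_day_air"] = True
    return rec


def map_records(records, f):
    """Apply f to a copy of every record so the input stays untouched."""
    return [f(dict(rec)) for rec in records]


def collect_columns(records):
    """Union of all keys, in first-seen order."""
    columns = []
    for rec in records:
        for key in rec:
            if key not in columns:
                columns.append(key)
    return columns


orders = [
    {"order_id": 1005, "zip": 75091},
    {"order_id": 1006, "zip": 75034},
    {"order_id": 1007, "zip": 75023},
]
mapped = map_records(orders, next_day_air)
columns = collect_columns(mapped)
# Records without the new key show up as None, like null in the table above.
rows = [[rec.get(col) for col in columns] for rec in mapped]
print(columns)
print(rows)
```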

### Dataset 2

In [20]:
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'

In [21]:
df_row = spark.createDataFrame([
    Row(json=jsonStr1),
    Row(json=jsonStr2),
    Row(json=jsonStr4)
])

In [22]:
df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))

In [23]:
df_json.show()

+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[771 Peek Pkwy, ...|82091|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

In [24]:
df_json.printSchema()

root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

Convert the Spark DataFrame to a Glue DynamicFrame.

In [25]:
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

In [26]:
dyf_json.printSchema()

root
|-- customers: array
|    |-- element: struct
|    |    |-- address: string
|    |    |-- id: long
|-- zip: long

### SelectFields

Select a subset of fields from a DynamicFrame.

In [27]:
dyf_selectFields = SelectFields.apply(frame=dyf_filter, paths=['zip'])

In [28]:
dyf_selectFields.toDF().show()

+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

### Join

In [29]:
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')

ℹ️ When column names collide, AWS Glue prefixes one of the duplicate columns with a period (.).

In [30]:
dyf_join.toDF().show()

+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[708 Fed Ln, CA,...|90093|90093|
|[[108 Park Street...|75091|75091|
+--------------------+-----+-----+
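
The result above is an equality join on `zip`: only 75091 and 90093 appear in both frames. The following plain-Python sketch imitates that behaviour on lists of dicts. `join_on` is a hypothetical helper, and prefixing the right-hand column with a period on a name collision is an assumption made to mirror the `.zip` column in the output; which side Glue actually prefixes is not shown here.

```python
def join_on(left, right, left_key, right_key):
    """Inner equality join of two lists of dicts."""
    index = {}
    for right_rec in right:
        index.setdefault(right_rec[right_key], []).append(right_rec)
    joined = []
    for left_rec in left:
        for right_rec in index.get(left_rec[left_key], []):
            merged = dict(left_rec)
            for key, value in right_rec.items():
                # Assumed collision rule: prefix the duplicate name with "."
                name = "." + key if key in merged else key
                merged[name] = value
            joined.append(merged)
    return joined


customers_by_zip = [
    {"zip": 75091, "customers": [623, 231]},
    {"zip": 82091, "customers": [201]},
    {"zip": 90093, "customers": [932, 102]},
]
selected_zips = [{"zip": 75091}, {"zip": 75023}, {"zip": 90093}]

joined = join_on(customers_by_zip, selected_zips, "zip", "zip")
print(sorted(rec["zip"] for rec in joined))
```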

### DropFields

Remove specific fields from a DynamicFrame.

In [31]:
dyf_dropfields = DropFields.apply(frame=dyf_join, paths="`.zip`")

In [32]:
dyf_dropfields.toDF().show()

+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[708 Fed Ln, CA,...|90093|
|[[108 Park Street...|75091|
+--------------------+-----+

### Relationalize

Flatten a nested structure and pivot its array fields out into separate tables, producing multiple DynamicFrames.

In [33]:
dyf_relationalize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

In [34]:
import pprint

pprint.pprint(Relationalize.describeArgs())

[{'defaultValue': None,
  'description': 'The DynamicFrame to relationalize',
  'name': 'frame',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': None,
  'description': 'path to store partitions of pivoted tables in csv format',
  'name': 'staging_path',
  'optional': True,
  'type': 'String'},
 {'defaultValue': 'roottable',
  'description': 'Name of the root table',
  'name': 'name',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '{}',
  'description': 'dict of optional parameters for relationalize',
  'name': 'options',
  'optional': True,
  'type': 'Dictionary'},
 {'defaultValue': '',
  'description': 'A unique string that is used to identify stats / state '
                 'information',
  'name': 'transformation_ctx',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '""',
  'description': 'Any string to be associated with errors in the '
                 'transformation',
  'name': 'info',
  'optional': True,
  'type': 'String'},
 {'defaultV

In [35]:
dyf_relationalize.keys()

dict_keys(['root', 'root_customers'])

### SelectFromCollection

Select one DynamicFrame from a DynamicFrameCollection.

In [36]:
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationalize, "root")

In [37]:
dyf_selectFromCollection.toDF().show()

+---------+-----+
|customers|  zip|
+---------+-----+
|        1|90093|
|        2|75091|
+---------+-----+

In [38]:
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationalize, "root_customers")

In [39]:
dyf_selectFromCollection.toDF().show()

+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  1|    0|       708 Fed Ln, CA|             932|
|  1|    1|    807 Deccan Dr, CA|             102|
|  2|    0|  108 Park Street, TX|             623|
|  2|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+
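
The two tables above are linked by a generated key: the `customers` column of `root` matches the `id` column of `root_customers`, and `index` records the position inside the original array. A rough plain-Python sketch of that pivot is given below. The function `relationalize` here is a hypothetical stand-in that handles a single array field; numbering rows from 1 and the `<field>.val.<key>` column names are assumptions copied from the output shown above.

```python
def relationalize(records, root_name, array_field):
    """Split records into a root table and a child table for one array field."""
    root, child = [], []
    for row_id, rec in enumerate(records, start=1):
        flat = dict(rec)
        flat[array_field] = row_id  # replace the array with a join key
        root.append(flat)
        for index, element in enumerate(rec[array_field]):
            row = {"id": row_id, "index": index}
            for key, value in element.items():
                row[f"{array_field}.val.{key}"] = value
            child.append(row)
    return {root_name: root, f"{root_name}_{array_field}": child}


nested = [
    {"zip": 90093, "customers": [
        {"id": 932, "address": "708 Fed Ln, CA"},
        {"id": 102, "address": "807 Deccan Dr, CA"}]},
    {"zip": 75091, "customers": [
        {"id": 623, "address": "108 Park Street, TX"},
        {"id": 231, "address": "763 Marsh Ln, TX"}]},
]
tables = relationalize(nested, "root", "customers")
print(list(tables))
print(tables["root"])
```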

### RenameField

Rename a column (node) in a DynamicFrame.

In [40]:
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

In [41]:
dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

In [42]:
dyf_renameField_2.toDF().show()

+---+-----+-------------------+-------+
| id|index|            address|cust_id|
+---+-----+-------------------+-------+
|  1|    0|     708 Fed Ln, CA|    932|
|  1|    1|  807 Deccan Dr, CA|    102|
|  2|    0|108 Park Street, TX|    623|
|  2|    1|   763 Marsh Ln, TX|    231|
+---+-----+-------------------+-------+

In [43]:
dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

In [44]:
dyf_dropfields_rf.toDF().show()

+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

### ResolveChoice

When a column contains more than one data type, resolve the ambiguity by casting the column to the type you specify.

In [45]:
import pprint

pprint.pprint(ResolveChoice.describeArgs())

[{'defaultValue': None,
  'description': 'DynamicFrame to transform',
  'name': 'frame',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': None,
  'description': 'List of specs (path, action)',
  'name': 'specs',
  'optional': True,
  'type': 'List'},
 {'defaultValue': '',
  'description': 'resolve choice option',
  'name': 'choice',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '',
  'description': 'Glue catalog database name, required for MATCH_CATALOG '
                 'choice',
  'name': 'database',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '',
  'description': 'Glue catalog table name, required for MATCH_CATALOG choice',
  'name': 'table_name',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '',
  'description': 'A unique string that is used to identify stats / state '
                 'information',
  'name': 'transformation_ctx',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '""',
  'description': 'Any string

In [46]:
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

In [47]:
dyf_resolveChoice.printSchema()

root
|-- address: string
|-- cust_id: string
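
The `(path, action)` spec can be read as "for this field, apply this cast". The plain-Python sketch below shows the idea on a column that mixes integers and strings. The mixed-type data, the helper `resolve_choice_cast`, and its small caster table are all hypothetical; only the `cast:<type>` action used in this notebook is handled.

```python
def resolve_choice_cast(records, specs):
    """Apply ('field', 'cast:<type>') specs to every record."""
    casters = {"string": str, "long": int}  # assumed type table
    resolved = []
    for rec in records:
        out = dict(rec)
        for field, action in specs:
            kind, _, target = action.partition(":")
            if kind != "cast":
                raise ValueError("only cast actions are sketched here")
            out[field] = casters[target.lower()](out[field])
        resolved.append(out)
    return resolved


# Hypothetical input in which cust_id arrives as both int and str.
customers = [
    {"address": "708 Fed Ln, CA", "cust_id": 932},
    {"address": "807 Deccan Dr, CA", "cust_id": "102"},
]
resolved = resolve_choice_cast(customers, [("cust_id", "cast:String")])
print([rec["cust_id"] for rec in resolved])
```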

### Dataset 3

In [48]:
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
              ],
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}'
              ],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]

In [49]:
warehouse_schema = StructType([StructField("warehouse_loc", StringType()),
                              StructField("data", StringType())])

In [50]:
df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema=warehouse_schema)

In [51]:
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

In [52]:
dyf_warehouse.printSchema()

root
|-- warehouse_loc: string
|-- data: string

### Unbox

Parse JSON data embedded in a string field of a DynamicFrame. (This is similar to Python's json.loads() function.)

In [53]:
import pprint

pprint.pprint(Unbox.describeArgs())

[{'defaultValue': None,
  'description': 'The DynamicFrame on which to call Unbox',
  'name': 'frame',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': None,
  'description': 'full path to the StringNode to unbox',
  'name': 'path',
  'optional': False,
  'type': 'String'},
 {'defaultValue': None,
  'description': 'file format -- "avro" or "json" only',
  'name': 'format',
  'optional': False,
  'type': 'String'},
 {'defaultValue': '',
  'description': 'A unique string that is used to identify stats / state '
                 'information',
  'name': 'transformation_ctx',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '""',
  'description': 'Any string to be associated with errors in the '
                 'transformation',
  'name': 'info',
  'optional': True,
  'type': 'String'},
 {'defaultValue': '0',
  'description': 'Max number of errors in the transformation until processing '
                 'will error out',
  'name': 'stageThreshold',
  'optional'

In [54]:
dyf_unbox = Unbox.apply(frame=dyf_warehouse, path="data", format="json")

In [55]:
dyf_unbox.printSchema()

root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: string
|    |-- pineapple: string
|    |-- mango: string
|    |-- pears: null

In [56]:
dyf_unbox.toDF().show()

+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+
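
Since Unbox is compared to `json.loads()`, the same step can be sketched with the standard library alone. The helper `unbox` below is hypothetical and works on a list of dicts rather than a DynamicFrame.

```python
import json


def unbox(records, path):
    """Replace the JSON string stored under `path` with the parsed object."""
    return [{**rec, path: json.loads(rec[path])} for rec in records]


warehouses = [
    {"warehouse_loc": "TX_WAREHOUSE",
     "data": '{"strawberry":"220","pineapple":"560","mango":"350","pears":null}'},
]
unboxed = unbox(warehouses, "data")
print(unboxed[0]["data"])
```

The JSON `null` becomes Python `None`, which corresponds to the `pears: null` entry in the schema above.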

### UnnestFrame

Unnest a DynamicFrame: nested objects are flattened into top-level fields, and join keys are generated for array objects.

In [57]:
dyf_unnest = UnnestFrame.apply(frame=dyf_unbox)

In [58]:
dyf_unnest.toDF().printSchema()

root
 |-- warehouse_loc: string (nullable = true)
 |-- data.strawberry: string (nullable = true)
 |-- data.pineapple: string (nullable = true)
 |-- data.mango: string (nullable = true)
 |-- data.pears: null (nullable = true)

In [59]:
dyf_unnest.toDF().show()

+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+
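
The dotted column names above (`data.strawberry` and so on) are the path of each nested field. A small recursive sketch in plain Python shows how such names can be produced; `unnest` is a hypothetical helper that handles nested dicts only and leaves arrays untouched.

```python
def unnest(record, prefix=""):
    """Flatten nested dicts into top-level keys joined by dots."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(unnest(value, name))
        else:
            flat[name] = value
    return flat


record = {
    "warehouse_loc": "TX_WAREHOUSE",
    "data": {"strawberry": "220", "pineapple": "560",
             "mango": "350", "pears": None},
}
flat = unnest(record)
print(list(flat))
```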

### DropNullFields

Drop the fields that are missing or null in every record of a DynamicFrame.

In [60]:
dyf_dropNullfields = DropNullFields.apply(frame=dyf_unnest)

null_fields ['`data.pears`']

In [61]:
dyf_dropNullfields.toDF().show()

+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+
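
The rule "null in every record" can be checked on plain dicts as follows. This is a sketch, not the Glue implementation; `drop_null_fields` is a hypothetical helper that also returns the names of the dropped fields, similar to the `null_fields` line printed above.

```python
def drop_null_fields(records):
    """Drop fields whose value is None (or missing) in every record."""
    all_keys = []
    for rec in records:
        for key in rec:
            if key not in all_keys:
                all_keys.append(key)
    null_fields = sorted(
        key for key in all_keys
        if all(rec.get(key) is None for rec in records)
    )
    cleaned = [
        {k: v for k, v in rec.items() if k not in null_fields}
        for rec in records
    ]
    return cleaned, null_fields


inventory = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.mango": "350", "data.pears": None},
    {"warehouse_loc": "CA_WAREHOUSE", "data.mango": "42", "data.pears": None},
]
cleaned, null_fields = drop_null_fields(inventory)
print(null_fields)
print(cleaned[0])
```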

### SplitFields


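Split a DynamicFrame into two new DynamicFrames by field: the first contains the specified fields and the second contains the remaining ones.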

In [62]:
dyf_splitFields = SplitFields.apply(frame=dyf_dropNullfields,
                                    paths=["`data.strawberry`", "`data.pineapple`"],
                                    name1="a", name2="b")

In [63]:
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

In [64]:
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+
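
The column-wise split above can be mimicked on plain dicts with two comprehensions. `split_fields` is a hypothetical helper, and the sketch matches plain key names instead of the backtick-quoted paths that Glue needs for names containing dots.

```python
def split_fields(records, paths, name1, name2):
    """Return {name1: records with only `paths`, name2: records with the rest}."""
    selected = [{k: v for k, v in rec.items() if k in paths} for rec in records]
    rest = [{k: v for k, v in rec.items() if k not in paths} for rec in records]
    return {name1: selected, name2: rest}


inventory = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.strawberry": "220",
     "data.pineapple": "560", "data.mango": "350"},
]
parts = split_fields(inventory, ["data.strawberry", "data.pineapple"], "a", "b")
print(parts["a"])
print(parts["b"])
```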

### SplitRows


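Split a DynamicFrame into two by row: the first contains the rows that satisfy the comparison and the second contains the remaining rows.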

In [65]:
dyf_splitRows = SplitRows.apply(frame=dyf_dropNullfields,
                               comparison_dict={"`data.pineapple`": {
                                   ">": "100", 
                                   "<": "200"}},
                               name1='pa_200_less',
                               name2='pa_200_more')

In [66]:
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, "pa_200_less")
dyf_pa_200_less.toDF().show()

+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

In [67]:
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, "pa_200_more")
dyf_pa_200_more.toDF().show()

+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+
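
A `comparison_dict` maps each field to a set of operator and threshold pairs that must all hold for a row to land in the first frame. The plain-Python sketch below follows that reading. `split_rows` and the `OPS` table are hypothetical, and the sketch uses integer values and thresholds, whereas the notebook passes them as strings.

```python
import operator

# Assumed operator table for this illustration.
OPS = {">": operator.gt, "<": operator.lt, ">=": operator.ge,
       "<=": operator.le, "=": operator.eq, "!=": operator.ne}


def split_rows(records, comparison_dict, name1, name2):
    """Rows meeting every comparison go to name1, all other rows to name2."""
    matched, unmatched = [], []
    for rec in records:
        ok = all(
            OPS[op](rec[field], bound)
            for field, conditions in comparison_dict.items()
            for op, bound in conditions.items()
        )
        (matched if ok else unmatched).append(rec)
    return {name1: matched, name2: unmatched}


inventory = [
    {"warehouse_loc": "TX_WAREHOUSE", "data.pineapple": 560},
    {"warehouse_loc": "CA_WAREHOUSE", "data.pineapple": 123},
    {"warehouse_loc": "CO_WAREHOUSE", "data.pineapple": 180},
]
parts = split_rows(inventory,
                   {"data.pineapple": {">": 100, "<": 200}},
                   "pa_200_less", "pa_200_more")
print([rec["warehouse_loc"] for rec in parts["pa_200_less"]])
print([rec["warehouse_loc"] for rec in parts["pa_200_more"]])
```

Under this reading, a row with a value of 100 or less would also end up in the second frame, so the name `pa_200_more` is only accurate for this particular dataset.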

### Spigot


Spigot writes a sample of the dataset to a destination while a transformation is running.

This example attempts to write the top 10 records to local storage.

In [68]:
import pprint

pprint.pprint(Spigot.describeArgs())

[{'defaultValue': None,
  'description': 'spigot this DynamicFrame',
  'name': 'frame',
  'optional': False,
  'type': 'DynamicFrame'},
 {'defaultValue': None,
  'description': 'file path to write spigot',
  'name': 'path',
  'optional': False,
  'type': 'string'},
 {'defaultValue': None,
  'description': 'topk -> first k records, prob -> probability of picking any '
                 'record',
  'name': 'options',
  'optional': True,
  'type': 'Json'}]

In [69]:
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

An error was encountered:
name 'basestring' is not defined
Traceback (most recent call last):
  File "/home/aws-glue-libs/awsglue.zip/awsglue/transforms/transform.py", line 24, in apply
    return transform(*args, **kwargs)
  File "/home/aws-glue-libs/awsglue.zip/awsglue/transforms/field_transforms.py", line 434, in __call__
    return frame.spigot(path,options,transformation_ctx)
  File "/home/aws-glue-libs/awsglue.zip/awsglue/dynamicframe.py", line 297, in spigot
    return DynamicFrame(self._jdf.spigot(path, makeOptions(self._sc, options), transformation_ctx,
  File "/home/aws-glue-libs/awsglue.zip/awsglue/utils.py", line 24, in makeOptions
    elif isinstance(py_obj, basestring):
NameError: name 'basestring' is not defined



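⚠️ Depending on how the local environment is configured, Spigot may fail. In the traceback above, the error is raised in `makeOptions`, which refers to `basestring`, a name that exists only in Python 2, while the container runs Python 3.6.10. Passing `options` as a dict such as `{"topk": 10}` instead of the string `'top10'` might avoid that code path, but this is an untested assumption. Alternatively, run this function through an AWS Glue endpoint or an AWS Glue ETL job.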
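
For readers who cannot run Spigot locally, the idea of the `topk` option (write only the first k records somewhere for inspection) can be sketched with the standard library. This is not the Glue API: `spigot_topk`, the file name `sample.json`, and the generated records are all hypothetical.

```python
import json
import os
import tempfile


def spigot_topk(records, directory, topk):
    """Write the first `topk` records to a JSON Lines file and return its path."""
    os.makedirs(directory, exist_ok=True)
    target = os.path.join(directory, "sample.json")
    with open(target, "w", encoding="utf-8") as handle:
        for rec in records[:topk]:
            handle.write(json.dumps(rec) + "\n")
    return target


# Hypothetical input: 25 generated records.
records = [{"warehouse_loc": f"W{i}", "data.pineapple": i} for i in range(25)]
output_dir = tempfile.mkdtemp()
sample_path = spigot_topk(records, output_dir, topk=10)

with open(sample_path, encoding="utf-8") as handle:
    sample = [json.loads(line) for line in handle]
print(len(sample))
```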

### Write Dynamic Frame


The `write_dynamic_frame` function saves a DynamicFrame using the specified connection and format. This example writes to local storage. (You can write to local storage by using a `connection_type` of `s3` together with a POSIX path in `connection_options`.)

<b>See also</b>: 
__[DynamicFrameWriter](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-writer-from_options)__

In [70]:
glueContext.write_dynamic_frame.from_options(frame=dyf_splitFields,
                                             connection_options={'path': '/home/glue/GlueLocalOutput/'},
                                             connection_type='s3',
                                             format='json')

<awsglue.dynamicframe.DynamicFrameCollection object at 0x7f33c4fb05c0>

### Reference

- __[Building an AWS Glue ETL pipeline locally without an AWS account](https://aws.amazon.com/ko/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/)__
- __[Developing AWS Glue ETL jobs locally using a container](https://aws.amazon.com/ko/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/)__