# Joining, Filtering, and Loading Relational Data with AWS Glue

이 예제는 DynamicFrame의 Join, Fliter, Transfer을 수행방법을 보여줍니다. 

### 1. Crawl our sample dataset

먼저 공공 데이터셋인 [모든 입법자 사이트](http://everypolitician.org)를 이용할 예정입니다.

원칙적으로는 해당 Site에 자료를 다운로드 받아야 하지만, 해당 자료는 이미 Public S3에 올려져 있습니다.

    's3://awsglue-datasets/examples/us-legislators'

이 파일은 JSON Format으로 미국 입법자와 그들의 의회에서의 의석에 대한 정보를 포함하고 있습니다. 

중간 중간, 처리 결과를 Target S3로 내려주어야 하기 때문에, 다음과 같은 폴더를 사용할 예정입니다.
    
    's3://glue-sample-target-${accountid}/output-dir/'

첫번째로 할 부분은 Crawler를 만들고 수행하여야 되는 부분이며, 이 부분은 이미 앞에서 수행하였습니다.
수행하고 나면 아래의 6개의 table들이 생성이 될 것 입니다.

 - `persons_json`
 - `memberships_json`
 - `organizations_json`
 - `events_json`
 - `areas_json`
 - `countries_r_json`

해당 테이블은 반정규화된 테이블 형태로 입법자와 그들의 역사를 저장하고 있습니다.

### 2. Getting started

We will write a script that:

1. persons, organizations, membership 기록을 결합하여, 하나의 단일 legislator 테이블로 생성합니다. 이를 반정규화라고 합니다. 
2. 상원의원과 대표드을 구분합니다. 
3. 나중에 분석 할 수 있도록 각각을 별도의 Parquet 파일로 작성합니다.

AWS Glue 라이브러리를 이용하기 전에, 약간의 사전 수행 Script가 필요합니다. 즉 하나의 GlueContext를 정의해야 합니다. 
추가적으로 S3 output directory setting을 수행합니다. 


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3


glueContext = GlueContext(SparkContext.getOrCreate())
accountid = boto3.client('sts').get_caller_identity().get('Account')
outputs3path = 's3://glue-sample-target' + accountid + '/output-dir/'
temps3path = 's3://glue-sample-target' + accountid + '/temp-dir/'

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1582794776994_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3. 크롤로거 수행한 테이블 스키마 확인해 보기

Glue DataCatalog에는 Glue Crawler가 수집한 테이블 스키마 정보가 들어있습니다. 
예를 들어, 'person_json' 스키마 정보를 보고 싶다면 아래 Code를 돌려주세요. 


In [3]:
persons = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="persons_json")
print ("Count: ", persons.count())
persons.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

테이블에있는 각 사람은 의회 의원입니다.

memberships_json 스키마를 확인해 보겠습니다. 

In [4]:
memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
print ("Count: ", memberships.count())
memberships.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string

조직에는 정당, 상하원, 상원, 백악관등으로 구성되어 있습니다. organizations_json 스키마를 확인해 보겠습니다. 


In [6]:
orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json")
print ("Count: ", orgs.count())
orgs.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

### 4. Filtering

원하는 필드 만 유지하고`id`를`org_id`로 이름을 변경해 보겠습니다. 데이터 세트는 우리가 모든 것을 처리할 수 있는 것을 볼 만큼 작습니다. 
`toDF ()`는 DynamicFrame을 Spark DataFrame으로 변환하므로 SparkSQL에 이미 존재하는 변환을 적용할 수 있습니다. 


In [7]:
orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')
orgs.toDF().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|classification|              org_id|            org_name|               links|               image|seats|       type|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|         party|            party/al|                  AL|                null|                null| null|       null|
|         party|      party/democrat|            Democrat|[[website, http:/...|https://upload.wi...| null|       null|
|         party|party/democrat-li...|    Democrat-Liberal|[[website, http:/...|                null| null|       null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|                null|  435|lower house|
|         party|   party/independent|         Independent|                null|                null| null|       null|
|         party|party/new_progres...|     New Pr

'membership'에 나타나는 'organizations'를 살펴보도록 하겠습니다. 

In [8]:
memberships.select_fields(['organization_id']).toDF().distinct().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

### 5. Putting it together

이제, 이 관계형 테이블들간에 입법자, 소속, 그리고 조직을 뭉처 하나의 테이블로 만들어 봅시다. 물론 Glue를 이용해서요.

 - 먼어, `persons`,`memberships` 를 `id` 와`person_id` 필드를 이용하여 조인합니다. 
 - 그 다음, 해당 결과와 'orgs'의 `org_id`, `organization_id` 필드를 이용하여 조인합니다. 
 - 그 다음, 중복된 필드 - `person_id` , `org_id` 를 제거합니다. 

우리는 이러한 모든 작업을 하나의 (확장 된) 코드 라인으로 수행 할 수 있습니다.

In [10]:
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print ("Count: ", l_history.count())
l_history.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- end_date: string

와우! 이제 분석에 사용할 최종 테이블을 생성하였습니다. 
분석을 위해 작고 효율적인 형식 (예 : Parquet)으로 작성해 보겠습니다.
이를 통하여, AWS Glue, Athena 또는 Redshift Spectrum에서 SQL을 실행할 수 있습니다.

다음 호출은 빠른 병렬 지원을 위해 여러 파일에 테이블을 씁니다.
나중에 분석 할 때 효율적으로 읽게 됩니다.


In [11]:
glueContext.write_dynamic_frame.from_options(frame = l_history,
              connection_type = "s3",
              connection_options = {"path": outputs3path + "legislator_history"},
              format = "parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7fd2dbddaf28>

모든 히스토리 데이터를 단일 파일로 저장하려면 데이터 프레임으로 변환하고 다시 분할 한 후
써주세요

In [12]:
s_history = l_history.toDF().repartition(1)
s_history.write.parquet( outputs3path + 'legislator_single')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

또는 상원과 하원에 의해 분리하려면 :

In [13]:
l_history.toDF().write.parquet( outputs3path + 'legislator_part', partitionBy=['org_name'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 6. 관계형 데이터베이스에 쓰기

AWS Glue를 사용하면 Redshift와 같은 관계형 데이터베이스에 쉽게 기록할 수 있습니다.
반 구조화 된 데이터. DynamicFrames를 평탄화하는 변환 'relationalize ()'를 제공합니다.
프레임의 오브젝트가 아무리 복잡해도 상관 없습니다.

예제에서`l_history` DynamicFrame을 사용하여 루트 테이블 이름을 전달합니다.
(`hist_root`) 및`relationalize`에 대한 임시 작업 경로로`DynamicFrameCollection`을 반환합니다.
그런 다음 해당 컬렉션의 DynamicFrames 이름을 나열합니다.

In [16]:
dfc = l_history.relationalize("hist_root", temps3path)
dfc.keys()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

dict_keys(['hist_root', 'hist_root_links', 'hist_root_images', 'hist_root_identifiers', 'hist_root_other_names', 'hist_root_contact_details'])

Relationalize broke the history table out into 6 new tables: a root table containing a record for each object in the
dynamic frame, and auxiliary tables for the arrays. Array handling in relational databases is often sub-optimal,
especially as those arrays become large. Separating out the arrays into separate tables makes the queries go much
faster.
Relationalize는 히스토리 테이블을 6 개의 새 테이블로 분할했습니다. Root Table은 DynamicFrame의 개별 Object를 가지고 있으며, 배열 형태를 위한 보토 테이블들로 구성됩니다. 관계형 Database에서 배열 처리는 종종 최적화되지 않은 상태에서 동작합니다. 특히, 그 배열이 클 경우에는 심합니다. 배열을 별도의 테이블로 분리하면 쿼리가 훨씬 빨라집니다. 

'contact_details'를 살펴보고 분리를 시도해 보겠습니다.

In [17]:
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string

+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|             202-228-3027|
| 10|    1|                   phone|             202-224-6542|
| 10|    2|                 twitter|               SenSchumer|
| 75|    0|                     fax|             202-224-6747|
| 75|    1|                   phone|             202-224-3934|
+---+-----+------------------------+-------------------------+

The `contact_details` field was an array of structs in the original DynamicFrame.
Each element of those arrays is a separate row in the auxiliary table, indexed by `index`. The `id` here is a foreign key into the `hist_root` table with the key `contact_details`.
'contact_details'필드는 원래 DynamicFrame의 구조체 배열입니다. 배열ㅢ 각 요소들은 개별적인 행으로 구분되어지며, 'index' 를 통하여 색인이 이루어집니다. 여기서는 'id' 키가 'hist_root'의 foreign key가 되며, 'contact_details' 와 맵핑됩니다. 


In [20]:
dfc.select('hist_root') \
    .toDF() \
    .where("contact_details = 10 or contact_details = 75") \
    .select(['id', 'given_name', 'family_name', 'contact_details']) \
    .show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|60ae8ebc-b581-44e...|   Charles|    Schumer|             10|
|0d69087e-f098-460...|    Daniel|     Inouye|             75|
+--------------------+----------+-----------+---------------+

위의 명령에서 우리는`toDF ()`와`where` 표현식을 사용하여보고자하는 행을 필터링 함을 주목하십시오.

`hist_root` table과 보조 tables 을 Join하면 아래와 같은 것들을 쉽게 적용할 수 있습니다. 

 - 어레이 지원없는 데이터베이스에 데이터를 로드하기.
 - SQL을 사용하여 배열의 각 개별 항목 쿼리.

우리는 이미, 'redshift-glue-test'에 대한 connection을 이미 가지고 있습니다. 
We already have a connection set up called `redshift3`. To create your own, see
[this topic in the Developer Guide](http://docs.aws.amazon.com/glue/latest/dg/populate-add-connection.html).
한 번에 하나씩 DynamicFrames를 순환하며 컬렉션을 Redshift에 기록해 보겠습니다.

In [None]:
for df_name in dfc.keys():
        m_df = dfc.select(df_name)
        print "Writing to Redshift table: ", df_name
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
                                                       catalog_connection = "redshift-glue-test",
                                                       connection_options = {"dbtable": df_name, "database": "testdb"},
                                                       redshift_tmp_dir = "s3://glue-sample-target/temp-dir/")

다음은 Redshift의 테이블 모양입니다. (Redshift QueryEditor에 연결했습니다.)

    select * from pg_table_def
    
                       List of relations
     schema |           name            | type  |   owner
    --------+---------------------------+-------+-----------
     public | hist_root                 | table | test_user
     public | hist_root_contact_details | table | test_user
     public | hist_root_identifiers     | table | test_user
     public | hist_root_images          | table | test_user
     public | hist_root_links           | table | test_user
     public | hist_root_other_names     | table | test_user
    (6 rows)

    select * from pg_table_def where tablename = 'hist_root_contact_details';
    
                 Table "public.hist_root_contact_details"
              Column           |           Type           | Modifiers
    ---------------------------+--------------------------+-----------
     id                        | bigint                   |
     index                     | integer                  |
     contact_details.val.type  | character varying(65535) |
     contact_details.val.value | character varying(65535) |

    select * from pg_table_def where tablename = 'hist_root';
    
                       Table "public.hist_root"
            Column         |           Type           | Modifiers
    -----------------------+--------------------------+-----------
     role                  | character varying(65535) |
     seats                 | integer                  |
     org_name              | character varying(65535) |
     links                 | bigint                   |
     type                  | character varying(65535) |
     sort_name             | character varying(65535) |
     area_id               | character varying(65535) |
     images                | bigint                   |
     on_behalf_of_id       | character varying(65535) |
     other_names           | bigint                   |
     birth_date            | character varying(65535) |
     name                  | character varying(65535) |
     organization_id       | character varying(65535) |
     gender                | character varying(65535) |
     classification        | character varying(65535) |
     legislative_period_id | character varying(65535) |
     identifiers           | bigint                   |
     given_name            | character varying(65535) |
     image                 | character varying(65535) |
     family_name           | character varying(65535) |
     id                    | character varying(65535) |
     death_date            | character varying(65535) |
     start_date            | character varying(65535) |
     contact_details       | bigint                   |
     end_date              | character varying(65535) |

이제 Redshift에서 SQL을 사용하여 이러한 테이블을 쿼리 할 수 있습니다.

    select * from hist_root_contact_details where id = 10 or id = 75 order by id, index;

아래와 같은 결과값이 나옵니다.

     id | index | contact_details.val.type | contact_details.val.value
    ----+-------+--------------------------+---------------------------
     10 |     0 | fax                      |
     10 |     1 |                          | 202-225-1314
     10 |     2 | phone                    |
     10 |     3 |                          | 202-225-3772
     10 |     4 | twitter                  |
     10 |     5 |                          | MikeRossUpdates
     75 |     0 | fax                      |
     75 |     1 |                          | 202-225-7856
     75 |     2 | phone                    |
     75 |     3 |                          | 202-225-2711
     75 |     4 | twitter                  |
     75 |     5 |                          | SenCapito
    (12 rows)


### 결론

전반적으로 AWS Glue는 매우 유연하여 몇 줄의 코드로 일반적으로 작성하는 데 며칠이 걸리는 코드를 구성합니다. 
이후, 좀 더 DynamicFrame에 대한 장점과 DataFrame간의 차이점을 확인하면서 작성하면 더 훌륭한 코드가 나올 것입니다. 