# Joining, Filtering, and Loading Relational Data with AWS Glue

This example shows how to join and filter data using transforms that operate on DynamicFrames.

### 1. Crawl our sample dataset

The dataset we'll be using in this example was downloaded from the [EveryPolitician](http://everypolitician.org)
website into our sample-dataset bucket in S3, at:

    s3://awsglue-datasets/examples/us-legislators.

It contains data in JSON format about United States legislators and the seats they have held
in the House of Representatives and the Senate.

The example code assumes that you have created an Amazon S3 target bucket and folder,
which we refer to here as `s3://aws-glue-875692608981/output-dir/`.

The first step is to crawl this data and put the results into a database called `legislators`
in your Data Catalog, as described [here in the Developer Guide](http://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html).
The crawler will create the following tables in the `legislators` database:

 - `persons_json`
 - `memberships_json`
 - `organizations_json`
 - `events_json`
 - `areas_json`
 - `countries_r_json`

This is a semi-normalized collection of tables containing legislators and their histories.

### 2. Getting started

We will write a script that:

1. Combines persons, organizations, and membership histories into a single legislator
   history data set. This is often referred to as de-normalization.
2. Separates out the senators from the representatives.
3. Writes each of these out to separate Parquet files for later analysis.

Begin by running some boilerplate to import the AWS Glue libraries we'll need and set up a single `GlueContext`.


    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    glueContext = GlueContext(SparkContext.getOrCreate())

### 3. Checking the schemas that the crawler identified

Next, you can easily examine the schemas that the crawler recorded in the Data Catalog. For example,
to see the schema of the `persons_json` table, run the following code:

    persons = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="persons_json")
    print("Count: " + str(persons.count()))
    persons.printSchema()

Each person in the table is a member of some congressional body.

Look at the schema of the `memberships_json` table:

    memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
    print("Count: " + str(memberships.count()))
    memberships.printSchema()

Organizations are parties and the two chambers of Congress, the Senate and the House.
Look at the schema of the `organizations_json` table:

    orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json")
    print("Count: " + str(orgs.count()))
    orgs.printSchema()

### 4. Filtering

Let's keep only the fields that we want, and rename `id` to `org_id` and `name` to `org_name`.
The dataset is small enough that we can look at the whole thing. The `toDF()` method converts a
DynamicFrame to a Spark DataFrame, so we can apply the transforms that already exist in SparkSQL:

    orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')
    orgs.toDF().show()
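
To make the effect of dropping and renaming fields concrete, here is a minimal plain-Python sketch that applies the same idea to a single dictionary. The helper functions and the sample record are hypothetical stand-ins written for illustration; they are not the AWS Glue implementation, and the field values are made up.

```python
def drop_fields(record, fields):
    """Return a copy of the record without the listed top-level fields."""
    return {k: v for k, v in record.items() if k not in fields}


def rename_field(record, old, new):
    """Return a copy of the record with one key renamed (key order preserved)."""
    return {(new if k == old else k): v for k, v in record.items()}


# A made-up organization record; the real table carries more fields.
org = {
    "id": "o1",
    "name": "Senate",
    "classification": "legislature",
    "other_names": [{"name": "US Senate"}],
    "identifiers": [{"scheme": "example", "identifier": "123"}],
}

trimmed = drop_fields(org, ["other_names", "identifiers"])
trimmed = rename_field(rename_field(trimmed, "id", "org_id"), "name", "org_name")
print(trimmed)
```

Renaming `id` and `name` up front likely also keeps them from colliding with the `id` and `name` fields of `persons` when the tables are joined later.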

Let's look at the `organizations` that appear in `memberships`:

    memberships.select_fields(['organization_id']).toDF().distinct().show()


### 5. Putting it together

Now let's join these relational tables to create one full history table of legislator
memberships and their corresponding organizations, using AWS Glue.

 - First, join `persons` and `memberships` on `id` and `person_id`.
 - Next, join the result with `orgs` on `org_id` and `organization_id`.
 - Then, drop the redundant fields, `person_id` and `org_id`.

We can do all these operations in one (extended) line of code:

    l_history = Join.apply(orgs,
                           Join.apply(persons, memberships, 'id', 'person_id'),
                           'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
    print("Count: " + str(l_history.count()))
    l_history.printSchema()
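
The nested join above can be sketched in plain Python on a few toy records. This is only an illustration under stated assumptions: it assumes inner-join semantics (records without a match are dropped), the `equi_join` helper is hypothetical, and all record values are made up.

```python
# Toy records standing in for the three catalog tables (all values are made up).
persons = [{"id": "p1", "name": "Person One"}, {"id": "p2", "name": "Person Two"}]
memberships = [
    {"person_id": "p1", "organization_id": "o1"},
    {"person_id": "p2", "organization_id": "o2"},
    {"person_id": "p2", "organization_id": "o1"},
]
orgs = [{"org_id": "o1", "org_name": "Senate"}, {"org_id": "o2", "org_name": "House"}]


def equi_join(left, right, left_key, right_key):
    """Inner equality join: merge every pair of records whose key values match."""
    index = {}
    for rec in right:
        index.setdefault(rec[right_key], []).append(rec)
    return [{**l, **r} for l in left for r in index.get(l[left_key], [])]


# Step 1: persons joined with memberships on id == person_id.
person_memberships = equi_join(persons, memberships, "id", "person_id")

# Step 2: orgs joined with that result on org_id == organization_id.
joined = equi_join(orgs, person_memberships, "org_id", "organization_id")

# Step 3: drop the now-redundant key fields.
history = [
    {k: v for k, v in rec.items() if k not in ("person_id", "org_id")}
    for rec in joined
]

for rec in history:
    print(rec)
```

Each membership row produces one history row, so a person who has held seats in both chambers appears once per seat.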

Great! We now have the final table that we'd like to use for analysis.
Let's write it out in a compact, efficient format for analytics, i.e. Parquet,
that we can run SQL over in AWS Glue, Athena, or Redshift Spectrum.

The following call writes the table across multiple files to support fast parallel
reads when doing analysis later:


    glueContext.write_dynamic_frame.from_options(frame = l_history,
                  connection_type = "s3",
                  connection_options = {"path": "s3://aws-glue-875692608981/output-dir/legislator_history"},
                  format = "parquet")

To put all the history data into a single file, we need to convert it to a Spark DataFrame, repartition it, and
write it out.

    s_history = l_history.toDF().repartition(1)
    s_history.write.parquet('s3://aws-glue-875692608981/output-dir/legislator_single')

Or if you want to separate it by the Senate and the House:


    l_history.toDF().write.parquet('s3://aws-glue-875692608981/output-dir/legislator_part', partitionBy=['org_name'])
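
A partitioned write places rows under one subdirectory per distinct value of the partition column, named `column=value`. The plain-Python sketch below mimics that layout for a handful of made-up rows. The `partition_paths` helper is hypothetical, the base path is shortened for illustration, and the sketch ignores any escaping that Spark applies to special characters in partition values.

```python
from collections import defaultdict


def partition_paths(records, base_path, partition_col):
    """Group records under Hive-style 'col=value' directories.

    The partition column is removed from each record because its value
    is already encoded in the directory name.
    """
    groups = defaultdict(list)
    for rec in records:
        directory = f"{base_path}/{partition_col}={rec[partition_col]}"
        groups[directory].append({k: v for k, v in rec.items() if k != partition_col})
    return dict(groups)


# Made-up rows standing in for the joined history table.
rows = [
    {"name": "Person One", "org_name": "Senate"},
    {"name": "Person Two", "org_name": "House"},
    {"name": "Person Three", "org_name": "Senate"},
]

layout = partition_paths(rows, "output-dir/legislator_part", "org_name")
for directory, members in sorted(layout.items()):
    print(directory, len(members))
```

A query that filters on `org_name` then only needs to read the files under the matching directory.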

### 6. Writing to Relational Databases

AWS Glue makes it easy to write data to relational databases like Redshift, even when the
data is semi-structured. It offers a transform, `relationalize()`, that flattens DynamicFrames
no matter how complex the objects in the frame may be.

Using the `l_history` DynamicFrame in our example, we pass in the name of a root table
(`hist_root`) and a temporary working path to `relationalize`, which returns a `DynamicFrameCollection`.
We then list the names of the DynamicFrames in that collection:

    dfc = l_history.relationalize("hist_root", "s3://aws-glue-875692608981/temp-dir/")
    dfc.keys()

Relationalize broke the history table out into six new tables: a root table containing a record for each object in the
dynamic frame, and auxiliary tables for the arrays. Array handling in relational databases is often sub-optimal,
especially as those arrays become large. Moving the arrays into separate tables can make queries run much
faster.

Let's take a look at the separation by examining `contact_details`:

    l_history.select_fields('contact_details').printSchema()
    dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

The `contact_details` field was an array of structs in the original DynamicFrame.
Each element of those arrays is a separate row in the auxiliary table, indexed by
`index`. The `id` here is a foreign key: it matches the value in the `contact_details`
column of the `hist_root` table.

    dfc.select('hist_root').toDF().where("contact_details = 10 or contact_details = 75").select(['id', 'given_name', 'family_name', 'contact_details']).show()
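
The relationship between the root table and an auxiliary table can be sketched in plain Python. This is a simplified illustration of the idea, not how `relationalize` is implemented: the `split_array_field` helper is hypothetical, the generated keys are arbitrary, the records are made up, and the exact row layout that AWS Glue produces may differ.

```python
def split_array_field(records, root_name, array_field):
    """Move one array-of-structs field into its own table.

    The root row keeps an integer key in place of the array; each array
    element becomes an auxiliary row carrying that key as `id` plus its
    position in the array as `index`.
    """
    root, aux = [], []
    for array_id, rec in enumerate(records, start=1):
        flat = dict(rec)
        items = flat.pop(array_field, [])
        flat[array_field] = array_id  # foreign key into the auxiliary table
        root.append(flat)
        for index, item in enumerate(items):
            row = {"id": array_id, "index": index}
            for key, val in item.items():
                row[f"{array_field}.val.{key}"] = val
            aux.append(row)
    return {root_name: root, f"{root_name}_{array_field}": aux}


# Made-up records with a nested array of structs.
records = [
    {"name": "Person One", "contact_details": [
        {"type": "phone", "value": "555-0100"},
        {"type": "twitter", "value": "person_one"},
    ]},
    {"name": "Person Two", "contact_details": [
        {"type": "fax", "value": "555-0101"},
    ]},
]

tables = split_array_field(records, "hist_root", "contact_details")
for table_name, table_rows in tables.items():
    print(table_name, table_rows)
```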

Notice in the commands above that we used `toDF()` and subsequently a `where` expression to filter for the rows that
we wanted to see.

So, joining the `hist_root` table with the auxiliary tables allows you to:

 - Load data into databases without array support.
 - Query each individual item in an array using SQL.

We already have a connection set up called `redshift3`. To create your own, see
[this topic in the Developer Guide](http://docs.aws.amazon.com/glue/latest/dg/populate-add-connection.html).
Let's write this collection into Redshift by cycling through the DynamicFrames one at a time:

    for df_name in dfc.keys():
        m_df = dfc.select(df_name)
        print("Writing to Redshift table: " + df_name)
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,
                                                       catalog_connection = "redshift3",
                                                       connection_options = {"dbtable": df_name, "database": "testdb"},
                                                       redshift_tmp_dir = "s3://aws-glue-875692608981/temp-dir/")

Here's what the tables look like in Redshift. (We connected to Redshift through psql.)

    testdb=# \d
                       List of relations
     schema |           name            | type  |   owner
    --------+---------------------------+-------+-----------
     public | hist_root                 | table | test_user
     public | hist_root_contact_details | table | test_user
     public | hist_root_identifiers     | table | test_user
     public | hist_root_images          | table | test_user
     public | hist_root_links           | table | test_user
     public | hist_root_other_names     | table | test_user
    (6 rows)

    testdb=# \d hist_root_contact_details
                 Table "public.hist_root_contact_details"
              Column           |           Type           | Modifiers
    ---------------------------+--------------------------+-----------
     id                        | bigint                   |
     index                     | integer                  |
     contact_details.val.type  | character varying(65535) |
     contact_details.val.value | character varying(65535) |

    testdb=# \d hist_root
                       Table "public.hist_root"
            Column         |           Type           | Modifiers
    -----------------------+--------------------------+-----------
     role                  | character varying(65535) |
     seats                 | integer                  |
     org_name              | character varying(65535) |
     links                 | bigint                   |
     type                  | character varying(65535) |
     sort_name             | character varying(65535) |
     area_id               | character varying(65535) |
     images                | bigint                   |
     on_behalf_of_id       | character varying(65535) |
     other_names           | bigint                   |
     birth_date            | character varying(65535) |
     name                  | character varying(65535) |
     organization_id       | character varying(65535) |
     gender                | character varying(65535) |
     classification        | character varying(65535) |
     legislative_period_id | character varying(65535) |
     identifiers           | bigint                   |
     given_name            | character varying(65535) |
     image                 | character varying(65535) |
     family_name           | character varying(65535) |
     id                    | character varying(65535) |
     death_date            | character varying(65535) |
     start_date            | character varying(65535) |
     contact_details       | bigint                   |
     end_date              | character varying(65535) |

Now you can query these tables using SQL in Redshift:

    testdb=# select * from hist_root_contact_details where id = 10 or id = 75 order by id, index;

With this result:

     id | index | contact_details.val.type | contact_details.val.value
    ----+-------+--------------------------+---------------------------
     10 |     0 | fax                      |
     10 |     1 |                          | 202-225-1314
     10 |     2 | phone                    |
     10 |     3 |                          | 202-225-3772
     10 |     4 | twitter                  |
     10 |     5 |                          | MikeRossUpdates
     75 |     0 | fax                      |
     75 |     1 |                          | 202-225-7856
     75 |     2 | phone                    |
     75 |     3 |                          | 202-225-2711
     75 |     4 | twitter                  |
     75 |     5 |                          | SenCapito
    (12 rows)
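
Joining the root table back to an auxiliary table follows the foreign-key relationship described earlier. The sketch below uses Python's built-in `sqlite3` module as a local stand-in for Redshift, so the SQL dialect may differ slightly. The rows are made up, and the auxiliary column names are shortened to `type` and `value` for readability (an assumption; the real columns are named `contact_details.val.type` and `contact_details.val.value`).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE hist_root (
        id TEXT, given_name TEXT, family_name TEXT, contact_details INTEGER
    );
    CREATE TABLE hist_root_contact_details (
        id INTEGER, "index" INTEGER, type TEXT, value TEXT
    );
""")

# Made-up rows; contact_details in hist_root is the key into the auxiliary table.
conn.executemany(
    "INSERT INTO hist_root VALUES (?, ?, ?, ?)",
    [("person-1", "Ada", "Example", 1), ("person-2", "Bob", "Sample", 2)],
)
conn.executemany(
    "INSERT INTO hist_root_contact_details VALUES (?, ?, ?, ?)",
    [(1, 0, "phone", "555-0100"), (1, 1, "twitter", "ada_example"), (2, 0, "phone", "555-0101")],
)

# Each array element comes back as its own row next to the owning person.
rows = conn.execute("""
    SELECT r.given_name, r.family_name, c.type, c.value
    FROM hist_root AS r
    JOIN hist_root_contact_details AS c ON c.id = r.contact_details
    ORDER BY r.family_name, c."index"
""").fetchall()

for row in rows:
    print(row)
```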


### Conclusion

Overall, AWS Glue is quite flexible, allowing you to do in a few lines of code what would normally take days to
write. The complete source-to-target ETL script can be found in the accompanying Python file,
[join_and_relationalize.py](join_and_relationalize.py).