# Joining, Filtering, and Loading Relational Data with AWS Glue

This example shows how to join and filter data with AWS Glue transforms on DynamicFrames, converting to Spark DataFrames only where a Spark operation is needed.

### 1. Crawl our sample dataset

The dataset we'll be using in this example was downloaded from the [EveryPolitician](http://everypolitician.org)
website into our sample-dataset bucket in S3, at:

    s3://awsglue-datasets/examples/us-legislators

It contains data in JSON format about United States legislators and the seats they have held
in the House of Representatives and the Senate.

For the example code, we assume that you have created an Amazon S3 target bucket. Assign its name to
`my_output_bucket_name` in the setup cell below; all output is written under that bucket's `output-dir/` folder.

The first step is to crawl this data and put the results into a database called `legislators`
in your Data Catalog, as described [here in the Developer Guide](http://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html).
The crawler will create the following tables in the `legislators` database:

 - `persons_json`
 - `memberships_json`
 - `organizations_json`
 - `events_json`
 - `areas_json`
 - `countries_r_json`

This is a semi-normalized collection of tables containing legislators and their histories.
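If you would rather create the crawler from code than from the console, a minimal sketch with boto3 might look like the following. The crawler name and IAM role name are hypothetical placeholders, the role is assumed to already have the Glue and S3 permissions a crawler needs, and the region is an assumption. The actual AWS calls are left commented out so that the request can be inspected first.

```python
def build_crawler_request(name, role, database, s3_path):
    """Assemble keyword arguments for the Glue create_crawler API call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_start_crawler(request, region_name):
    """Create the crawler and start it (requires AWS credentials)."""
    import boto3  # imported lazily so the request builder works without boto3

    glue = boto3.client("glue", region_name=region_name)
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


# Hypothetical crawler and role names; replace them with your own.
request = build_crawler_request(
    name="legislators-crawler",
    role="MyGlueServiceRole",
    database="legislators",
    s3_path="s3://awsglue-datasets/examples/us-legislators",
)
print(request)

# Uncomment to run against your account (the region is an assumption):
# create_and_start_crawler(request, region_name="eu-west-1")
```

Once the crawler finishes, the tables listed above should appear in the `legislators` database.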

### 2. Getting started

We will write a script that:

1. Combines persons, organizations, and membership histories into a single legislator
   history data set. This is often referred to as de-normalization.
2. Separates out the senators from the representatives.
3. Writes each of these out to separate Parquet files for later analysis.

Begin by running some boilerplate to import the AWS Glue libraries we'll need and set up a single `GlueContext`.


In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## TO BE REPLACED BY YOUR BUCKET NAME (e.g. "demo-legislators-293278930") ##
my_output_bucket_name = "<YOUR BUCKET NAME>"

glueContext = GlueContext(SparkContext.getOrCreate())

In [2]:
import boto3

client = boto3.client('glue', 'eu-west-1')







### 3. Checking the schemas that the crawler identified

Next, examine the schemas that the crawler recorded in the Data Catalog. For example,
to see the schema of the `persons_json` table, run the following code:

In [3]:
persons = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="persons_json")
print("Count: " + str(persons.count()))
persons.printSchema()

Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string


Each person in the table is a member of some congressional body.

Look at the schema of the `memberships_json` table:

In [4]:
memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
print("Count: " + str(memberships.count()))
memberships.printSchema()

Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string


Organizations are political parties and the two chambers of Congress: the Senate and the House of Representatives.
Look at the schema of the `organizations_json` table:

In [5]:
orgs = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="organizations_json")
print("Count: " + str(orgs.count()))
orgs.printSchema()

Count: 13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string


### 4. Filtering

Let's keep only the fields that we want, and rename `id` to `org_id` and `name` to `org_name`. The dataset is
small enough that we can look at the whole thing. `toDF()` converts a DynamicFrame to a Spark DataFrame, so we
can apply the transforms that already exist in Spark SQL:

In [6]:
orgs = orgs.drop_fields(['other_names','identifiers']).rename_field('id', 'org_id').rename_field('name', 'org_name')
orgs.toDF().show()

+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|classification|              org_id|            org_name|               links|               image|seats|       type|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|         party|            party/al|                  AL|                null|                null| null|       null|
|         party|      party/democrat|            Democrat|[[website, http:/...|https://upload.wi...| null|       null|
|         party|party/democrat-li...|    Democrat-Liberal|[[website, http:/...|                null| null|       null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|                null|  435|lower house|
|         party|   party/independent|         Independent|                null|                null| null|       null|
|         party|party/new_progres...|     New Pr

Let's look at which organizations appear in `memberships`:

In [7]:
memberships.select_fields(['organization_id']).toDF().distinct().show()

+--------------------+
|     organization_id|
+--------------------+
|8fa6c3d2-71dc-478...|
|d56acebe-8fdc-47b...|
+--------------------+
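To make the field-level operations in this section concrete, here is a minimal sketch of what dropping fields, renaming fields, and taking distinct values amount to on plain Python dicts. It does not use AWS Glue; the helper functions and the sample rows are hypothetical stand-ins for the `drop_fields` and `rename_field` transforms and the catalog tables.

```python
def drop_fields(record, names):
    """Return a copy of record without the named keys."""
    return {k: v for k, v in record.items() if k not in names}


def rename_field(record, old, new):
    """Return a copy of record with key `old` renamed to `new`."""
    return {(new if k == old else k): v for k, v in record.items()}


# Hypothetical rows shaped like organizations_json and memberships_json.
orgs_rows = [
    {"id": "org-senate", "name": "Senate", "identifiers": [], "other_names": []},
    {"id": "party/democrat", "name": "Democrat", "identifiers": [], "other_names": []},
]
membership_rows = [
    {"person_id": "p1", "organization_id": "org-senate"},
    {"person_id": "p2", "organization_id": "org-senate"},
]

# Drop the nested fields, then rename id -> org_id and name -> org_name.
cleaned = [
    rename_field(
        rename_field(drop_fields(r, {"other_names", "identifiers"}), "id", "org_id"),
        "name",
        "org_name",
    )
    for r in orgs_rows
]

# Distinct organization ids referenced by memberships.
distinct_org_ids = {m["organization_id"] for m in membership_rows}

print(cleaned)
print(distinct_org_ids)
```

Renaming `id` and `name` up front keeps them from clashing with the `id` and `name` fields of `persons` when the tables are combined later.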


### 5. Putting it together

Now let's join these relational tables to create one full history table of legislator
memberships and their corresponding organizations, using AWS Glue.

 - First, we join `persons` and `memberships` on `id` and `person_id`.
 - Next, join the result with orgs on `org_id` and `organization_id`.
 - Then, drop the redundant fields, `person_id` and `org_id`.
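The three steps above can be pictured on a handful of plain Python records. This is only a sketch of inner equi-join semantics, not how AWS Glue implements `Join.apply`; the `inner_join` helper and all sample rows are hypothetical.

```python
def inner_join(left, right, left_key, right_key):
    """Inner equi-join of two lists of dicts; each output row merges both sides.

    Assumes the two sides share no field names, which holds for the sample data.
    """
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    return [
        {**l_row, **r_row}
        for l_row in left
        for r_row in index.get(l_row[left_key], [])
    ]


# Hypothetical sample rows standing in for the three catalog tables.
person_rows = [
    {"id": "p1", "name": "Alice Example"},
    {"id": "p2", "name": "Bob Example"},
]
membership_rows = [
    {"person_id": "p1", "organization_id": "org-senate", "role": "member"},
    {"person_id": "p2", "organization_id": "org-house", "role": "member"},
    {"person_id": "p1", "organization_id": "org-house", "role": "member"},
]
org_rows = [
    {"org_id": "org-senate", "org_name": "Senate"},
    {"org_id": "org-house", "org_name": "House of Representatives"},
    {"org_id": "party/x", "org_name": "X"},  # no memberships, so it drops out
]

# Step 1: persons joined to memberships on id = person_id.
person_memberships = inner_join(person_rows, membership_rows, "id", "person_id")
# Step 2: orgs joined to that result on org_id = organization_id.
joined = inner_join(org_rows, person_memberships, "org_id", "organization_id")
# Step 3: drop the redundant key fields.
history = [
    {k: v for k, v in row.items() if k not in ("person_id", "org_id")}
    for row in joined
]

for row in history:
    print(row)
```

Each membership yields one history row, so a person who has served in both chambers appears once per membership, and an organization with no memberships contributes nothing.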

We can do all of these operations in a single (extended) statement:

In [None]:
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print("Count: " + str(l_history.count()))
l_history.printSchema()



Great! We now have the final table that we'd like to use for analysis.
Let's write it out in a compact, efficient format for analytics, i.e. Parquet,
that we can run SQL over in AWS Glue, Athena, or Redshift Spectrum.

The following call writes the table across multiple files to support fast parallel
reads when doing analysis later:


In [9]:
glueContext.write_dynamic_frame.from_options(frame = l_history,
              connection_type = "s3",
              connection_options = {"path": "s3://" + my_output_bucket_name + "/output-dir/legislator_history"},
              format = "parquet")

<awsglue.dynamicframe.DynamicFrame object at 0x7f5a64ad3d10>


To put all the history data into a single file, we need to convert it to a Spark DataFrame, repartition it
into one partition, and write it out.

In [10]:
s_history = l_history.toDF().repartition(1)
s_history.write.mode("overwrite").parquet('s3://'+ my_output_bucket_name +'/output-dir/legislator_single')




Or, if you want to separate it by chamber (the Senate and the House), partition the output by `org_name`:


In [11]:
l_history.toDF().write.mode("overwrite").parquet('s3://'+ my_output_bucket_name +'/output-dir/legislator_part', partitionBy=['org_name'])
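Spark writes partitioned output into one subdirectory per distinct value of the partition column, named in the `column=value` style, and the column itself is encoded in the path rather than stored in the data files. The sketch below mimics that layout in plain Python. The bucket name, the `partition_dirs` helper, and the sample rows are hypothetical, and real Spark output may escape some characters in partition values.

```python
from collections import defaultdict


def partition_dirs(rows, base_path, column):
    """Group rows by one column into 'column=value' style directories."""
    groups = defaultdict(list)
    for row in rows:
        # The partition column lives in the path, so it is removed from the row.
        rest = {k: v for k, v in row.items() if k != column}
        groups[f"{base_path}/{column}={row[column]}"].append(rest)
    return dict(groups)


# Hypothetical rows and bucket name, for illustration only.
rows = [
    {"name": "Alice Example", "org_name": "Senate"},
    {"name": "Bob Example", "org_name": "House"},
    {"name": "Carol Example", "org_name": "Senate"},
]
base = "s3://my-bucket/output-dir/legislator_part"
layout = partition_dirs(rows, base, "org_name")

for path in sorted(layout):
    print(path, len(layout[path]))
```

A query that filters on `org_name` can then read only the matching subdirectory instead of scanning the whole dataset.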


