In [1]:
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder.appName('Ops').config('hive.metastore.client.factory.class, com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory').enableHiveSupport().getOrCreate()

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
AWS_SESSION_TOKEN = os.environ['AWS_SESSION_TOKEN']

spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider')
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key', AWS_ACCESS_KEY_ID)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3a.session.token', AWS_SESSION_TOKEN)

df = spark.read.csv('s3a://< s3 bucket name >/< | delimited csv file >', inferSchema=True,header=True,sep='|')
df.printSchema()

root
 |-- meta_data: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- entity_type_id: integer (nullable = true)
 |-- entity_subtype_id: integer (nullable = true)
 |-- original_source_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- is_canonical: string (nullable = true)
 |-- connect_id: integer (nullable = true)
 |-- promoted_from_entity_id: string (nullable = true)
 |-- duplicate_of_entity_id: string (nullable = true)
 |-- unique_hash: string (nullable = true)



In [2]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
user = spark.sparkContext.sparkUser
print(user)

<bound method SparkContext.sparkUser of <SparkContext master=local[*] appName=PySparkShell>>


In [3]:
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print("Count: ", persons.count())
persons.printSchema()

Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string



In [4]:
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print("Count: ", memberships.count())
memberships.printSchema()

Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string



In [5]:
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print("Count: ", orgs.count())
orgs.printSchema()


Count:  13
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string



In [6]:
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()

+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|classification|              org_id|            org_name|               links|               image|seats|       type|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+-----------+
|         party|            party/al|                  AL|                null|                null| null|       null|
|         party|      party/democrat|            Democrat|[[website, http:/...|https://upload.wi...| null|       null|
|         party|party/democrat-li...|    Democrat-Liberal|[[website, http:/...|                null| null|       null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|                null|  435|lower house|
|         party|   party/independent|         Independent|                null|                null| null|       null|
|         party|party/new_progres...|     New Pr

In [7]:
memberships.select_fields(['organization_id']).toDF().distinct().show()

+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+



In [8]:
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print("Count: ", l_history.count())
l_history.printSchema()

Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- end_date: string



In [9]:
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://< s3 bucket name >/legislators/output-dir/legislator_history"},
          format = "parquet")

<awsglue.dynamicframe.DynamicFrame at 0x7f5341affe10>

In [10]:
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://< s3 bucket name >/legislators/output-dir/legislator_single')

In [11]:
l_history.toDF().write.parquet('s3://< s3 bucket name >/legislators/output-dir/legislator_part',
                               partitionBy=['org_name'])

In [12]:
dfc = l_history.relationalize("hist_root", "s3://< s3 bucket name >/legislators/temp-dir/")
dfc.keys()

dict_keys(['hist_root', 'hist_root_links', 'hist_root_images', 'hist_root_identifiers', 'hist_root_other_names', 'hist_root_contact_details'])

In [13]:
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()

root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string

+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|             202-228-3027|
| 10|    1|                   phone|             202-224-6542|
| 10|    2|                 twitter|               SenSchumer|
| 75|    0|                     fax|             202-224-6747|
| 75|    1|                   phone|             202-224-3934|
+---+-----+------------------------+-------------------------+



In [14]:
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()

+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|60ae8ebc-b581-44e...|   Charles|    Schumer|             10|
|0d69087e-f098-460...|    Daniel|     Inouye|             75|
+--------------------+----------+-----------+---------------+



In [15]:
for df_name in dfc.keys():
    print(df_name)
    each_df = dfc.select(df_name)
#    print(each_df.count())
    s3_path = "s3a://< s3 bucket name >/legislators/for-redshift/" + df_name + "/"
    glueContext.write_dynamic_frame.from_options(frame = each_df,
            connection_type = "s3",
            connection_options = {"path": s3_path},
            format = 'parquet')

hist_root
hist_root_links
hist_root_images
hist_root_identifiers
hist_root_other_names
hist_root_contact_details
