In [6]:
%glue_version 4.0
%number_of_workers 2
%worker_type G.4X
%idle_timeout 10
%profile intellibridge-demo

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Setting Glue version to: 4.0
Previous number of workers: None
Setting new number of workers to: 2
Previous worker type: None
Setting new worker type to: G.4X
Current idle_timeout is None minutes.
idle_timeout has been set to 10 minutes.
Previous profile: None
Setting new profile to: intellibridge-demo


In [1]:
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.4X
Number of Workers: 2
Session ID: 9bfc816f-5d54-4317-9cfc-38c2d497874f
Applying the following default arguments:
--glue_kernel_version 1.0.3
--enable-glue-datacatalog true
Waiting for session 9bfc816f-5d54-4317-9cfc-38c2d497874f to get into ready status...
Session 9bfc816f-5d54-4317-9cfc-38c2d497874f has been created.



In [2]:
%%sql
show tables in `covid-19`

+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|`covid-19`|alleninstitute_co...|      false|
|`covid-19`|alleninstitute_me...|      false|
|`covid-19`|aspirevc_crowd_tr...|      false|
|`covid-19`|aspirevc_crowd_tr...|      false|
|`covid-19`|cdc_moderna_vacci...|      false|
|`covid-19`|cdc_pfizer_vaccin...|      false|
|`covid-19`|       country_codes|      false|
|`covid-19`|  county_populations|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_testing_sta...|      false|
|`covid-19`|covid_testing_us_...|      false|
|`covid-19`|covid_testing_us_...|      false|
|`covid-19`|      covidcast_data|      false|
|`covid-19`|  covidcast_metadata| 

In [3]:
%%sql
select * from `covid-19`.county_populations sort by `population estimate 2018` desc limit 10

+--------------+-----+---------------+-----------+------------------------+
|            id|  id2|         county|      state|population estimate 2018|
+--------------+-----+---------------+-----------+------------------------+
|            Id|  Id2|         County|      State|    Population Estima...|
|0500000US01085| 1085|        Lowndes|    Alabama|                    9974|
|0500000US06057| 6057|         Nevada| California|                   99696|
|0500000US29189|29189|      St. Louis|   Missouri|                  996945|
|0500000US22021|22021|Caldwell Parish|  Louisiana|                    9960|
|0500000US06019| 6019|         Fresno| California|                  994400|
|0500000US28143|28143|         Tunica|Mississippi|                    9944|
|0500000US05051| 5051|        Garland|   Arkansas|                   99154|
|0500000US29079|29079|         Grundy|   Missouri|                    9914|
|0500000US27063|27063|        Jackson|  Minnesota|                    9911|
+-----------

In [4]:
# Create a DynamicFrame of county_populations and print it's schema
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="covid-19", table_name="county_populations"
)
dyf.printSchema()

root
|-- id: string
|-- id2: string
|-- county: string
|-- state: string
|-- population estimate 2018: string


In [5]:
# Rename id2 to simple_id and convert to Int
# Remove spaces and rename population est. and convert to Long
mapped = dyf.apply_mapping(
    mappings=[
        ("id", "string", "id", "string"),
        ("id2", "string", "simple_id", "int"),
        ("county", "string", "county", "string"),
        ("state", "string", "state", "string"),
        ("population estimate 2018", "string", "population_est_2018", "long"),
    ]
)
mapped.printSchema()

root
|-- id: string
|-- simple_id: int
|-- county: string
|-- state: string
|-- population_est_2018: long


In [6]:
mapped_df = mapped.toDF()
mapped_df.show()

+--------------+---------+---------+-------+-------------------+
|            id|simple_id|   county|  state|population_est_2018|
+--------------+---------+---------+-------+-------------------+
|0500000US01001|     1001|  Autauga|Alabama|              55601|
|0500000US01003|     1003|  Baldwin|Alabama|             218022|
|0500000US01005|     1005|  Barbour|Alabama|              24881|
|0500000US01007|     1007|     Bibb|Alabama|              22400|
|0500000US01009|     1009|   Blount|Alabama|              57840|
|0500000US01011|     1011|  Bullock|Alabama|              10138|
|0500000US01013|     1013|   Butler|Alabama|              19680|
|0500000US01015|     1015|  Calhoun|Alabama|             114277|
|0500000US01017|     1017| Chambers|Alabama|              33615|
|0500000US01019|     1019| Cherokee|Alabama|              26032|
|0500000US01021|     1021|  Chilton|Alabama|              44153|
|0500000US01023|     1023|  Choctaw|Alabama|              12841|
|0500000US01025|     1025

In [7]:
spark.sql("create database if not exists demo")


# Set glueContext sink for writing new table
S3_BUCKET = "intellibridge-falcon-glue-demo-us-east-1"
s3output = glueContext.getSink(
    path=f"s3://{S3_BUCKET}/interactive-sessions-blog/populations/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="demo", catalogTableName="populations")
s3output.setFormat("glueparquet")
s3output.writeFrame(mapped)

# Write out ‘mapped’ to a table in Glue Catalog
s3output = glueContext.getSink(
    path=f"s3://{S3_BUCKET}/interactive-sessions-blog/populations/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="demo", catalogTableName="populations")
s3output.setFormat("glueparquet")
s3output.writeFrame(mapped)

<awsglue.dynamicframe.DynamicFrame object at 0x7f6a32d300d0>


In [8]:
%%sql
select * from demo.populations sort by `population_est_2018` desc limit 10

+--------------+---------+-----------+----------+-------------------+
|            id|simple_id|     county|     state|population_est_2018|
+--------------+---------+-----------+----------+-------------------+
|0500000US06037|     6037|Los Angeles|California|           10105518|
|0500000US17031|    17031|       Cook|  Illinois|            5180493|
|0500000US48201|    48201|     Harris|     Texas|            4698619|
|0500000US04013|     4013|   Maricopa|   Arizona|            4410824|
|0500000US06073|     6073|  San Diego|California|            3343364|
|0500000US06059|     6059|     Orange|California|            3185968|
|0500000US12086|    12086| Miami-Dade|   Florida|            2761581|
|0500000US48113|    48113|     Dallas|     Texas|            2637772|
|0500000US36047|    36047|      Kings|  New York|            2582830|
|0500000US06065|     6065|  Riverside|California|            2450758|
+--------------+---------+-----------+----------+-------------------+
