In [15]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# Attach to the session's SparkContext (reused if one already exists) and wrap it
# in a GlueContext. Later cells use `glueContext` for Data Catalog reads/writes
# and `spark` (the GlueContext's SparkSession) for SQL statements.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is constructed but job.init()/job.commit() are not called
# in any visible cell; confirm whether it is needed in this interactive session.
job = Job(glueContext)




In [8]:
%status

Session ID: 38143288-2129-40fe-bc8f-ff9b7f505028
Status: READY
Role: arn:aws:iam::473718321037:role/service-role/AWSGlueServiceRole
CreatedOn: 2024-04-28 13:33:52.799000+01:00
GlueVersion: 4.0
Session Type: glueetl
Idle Timeout: 2880
Tags: {'owner': '473718321037'}
Worker Type: G.1X
Number of Workers: 5
Region: eu-west-1
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Arguments Passed: ['--glue_kernel_version: 1.0.5', '--enable-glue-datacatalog: true']


In [9]:
%%sql
-- List the databases visible to this session (the Glue Data Catalog is enabled via --enable-glue-datacatalog).
SHOW DATABASES;

+---------+
|namespace|
+---------+
|  default|
+---------+


In [17]:
%%sql
-- List the tables in the covid-19 database. The name is backtick-quoted because it contains a hyphen.
show tables in `covid-19`

+----------+--------------------+-----------+
| namespace|           tableName|isTemporary|
+----------+--------------------+-----------+
|`covid-19`|alleninstitute_co...|      false|
|`covid-19`|alleninstitute_me...|      false|
|`covid-19`|aspirevc_crowd_tr...|      false|
|`covid-19`|aspirevc_crowd_tr...|      false|
|`covid-19`|cdc_moderna_vacci...|      false|
|`covid-19`|cdc_pfizer_vaccin...|      false|
|`covid-19`|       country_codes|      false|
|`covid-19`|  county_populations|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_knowledge_g...|      false|
|`covid-19`|covid_testing_sta...|      false|
|`covid-19`|covid_testing_us_...|      false|
|`covid-19`|covid_testing_us_...|      false|
|`covid-19`|      covidcast_data|      false|
|`covid-19`|  covidcast_metadata| 

In [None]:
%%sql
-- Ten most populous counties by 2018 estimate.
-- ORDER BY gives a global sort (SORT BY only sorts within each partition, so LIMIT would not yield the true top 10).
-- The estimate column is a string in the catalog, so cast it to sort numerically rather than lexicographically.
select * from `covid-19`.county_populations
order by cast(`population estimate 2018` as bigint) desc
limit 10

In [None]:
# Read the covid-19.county_populations table from the Glue Data Catalog into a
# DynamicFrame, then print its schema.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="covid-19",
    table_name="county_populations",
)
dyf.printSchema()

In [None]:
# Reshape county_populations. Each tuple is
# (source column, source type, target column, target type):
#   - id2 is renamed to simple_id and cast to int
#   - "population estimate 2018" is renamed to population_est_2018 (no spaces)
#     and cast to long
# NOTE(review): if id2 holds zero-padded codes, the int cast drops the leading
# zeros; confirm that is intended.
county_mappings = [
    ("id", "string", "id", "string"),
    ("id2", "string", "simple_id", "int"),
    ("county", "string", "county", "string"),
    ("state", "string", "state", "string"),
    ("population estimate 2018", "string", "population_est_2018", "long"),
]
mapped = dyf.apply_mapping(mappings=county_mappings)
mapped.printSchema()

In [None]:
# Convert the mapped DynamicFrame to a Spark DataFrame and preview the first
# 20 rows (long cell values truncated).
mapped_df = mapped.toDF()
mapped_df.show(n=20, truncate=True)

In [None]:
# Create the "demo" database in the catalog if it does not already exist.
spark.sql("create database if not exists demo")

# Destination prefix for the curated populations table.
S3_BUCKET = "kenan-hancer-glue-test1"
populations_path = f"s3://{S3_BUCKET}/interactive-sessions-blog/populations/"

# The S3 sink below adds new files under populations_path on every run and
# never removes old ones. Re-running this cell would therefore leave the
# table returning every row twice. Clear the prefix first so the cell stays
# re-runnable. retentionPeriod=0 is needed because purge_s3_path otherwise
# keeps files newer than its default retention window.
# NOTE(review): the session role needs s3:ListBucket and s3:DeleteObject on
# this bucket for the purge to succeed.
glueContext.purge_s3_path(populations_path, options={"retentionPeriod": 0})

# Snappy-compressed Glue Parquet sink that also creates/updates the
# demo.populations table in the Data Catalog (enableUpdateCatalog=True).
s3output = glueContext.getSink(
    path=populations_path,
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="s3output",
)
s3output.setCatalogInfo(catalogDatabase="demo", catalogTableName="populations")
s3output.setFormat("glueparquet")
s3output.writeFrame(mapped)