In [22]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

# Session configuration: Glue 4.0, 5 x G.1X workers, 2880-minute idle timeout.
# As the cell output shows, these magics only apply to newly created sessions;
# an already-connected session keeps its existing settings.

# NOTE(review): sys, getResolvedOptions, trim and to_date are not used in the cells
# shown here, and the star import hides which transforms are actually needed.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, trim, to_date, when
  
# Spark / Glue entry points used by every later cell.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): job.init()/job.commit() are never called in the cells shown,
# so this Job object is currently unused — confirm whether it is needed.
job = Job(glueContext)

You are already connected to a glueetl session 1a0e7969-58cc-49b8-abfa-178a38ca5cfe.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 1a0e7969-58cc-49b8-abfa-178a38ca5cfe.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 1a0e7969-58cc-49b8-abfa-178a38ca5cfe.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 1a0e7969-58cc-49b8-abfa-178a38ca5cfe.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



In [23]:
# Load the enigma-jhu table from the Glue Data Catalog.
dynamicFrameEnigma = glueContext.create_dynamic_frame.from_catalog(database="coviddatabase", table_name="enigma-jhu")

# The catalog exposes the columns as col0, col1, ... and the CSV header line arrives
# as an ordinary data row. Give the columns meaningful names first.
mapped_dynamic_frame = dynamicFrameEnigma.apply_mapping([
    ('col0', 'string', 'fips', 'string'),
    ('col1', 'string', 'admin2', 'string'),
    ('col2', 'string', 'province_state', 'string'),
    ('col3', 'string', 'country_region', 'string'),
    ('col4', 'string', 'last_update', 'string'),
    ('col5', 'string', 'latitude', 'string'),
    ('col6', 'string', 'longitude', 'string'),
    ('col7', 'string', 'confirmed', 'string'),
    ('col8', 'string', 'deaths', 'string'),
    ('col9', 'string', 'recovered', 'string'),
    ('col10', 'string', 'active', 'string'),
    ('col11', 'string', 'combined_key', 'string')
])

# Drop the header row(s) without collecting the whole table onto the driver.
# The first row is taken to be the header. Every row identical to it is removed,
# which also handles one header per file when the table spans several CSV files.
# eqNullSafe keeps rows that contain nulls from being filtered out by accident.
raw_enigma_df = mapped_dynamic_frame.toDF()
header = raw_enigma_df.first()
if header is None:
    raise ValueError("Table coviddatabase.enigma-jhu is empty; expected at least a header row")
header_matches = [col(c).eqNullSafe(v) for c, v in zip(raw_enigma_df.columns, header)]
is_header_row = header_matches[0]
for match in header_matches[1:]:
    is_header_row = is_header_row & match

# Name the columns positionally after the header values.
sparkDf = raw_enigma_df.filter(~is_header_row).toDF(*header)

# DynamicFrame handle for Glue-side APIs.
newdynamicFrameEnigma = DynamicFrame.fromDF(sparkDf, glueContext, "newdynamicFrameEnigma")

# Inspect the first 10 rows.
sparkDf.show(10)

+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+----------------+
|fips|admin2|province_state|country_region|        last_update|latitude|longitude|confirmed|deaths|recovered|active|    combined_key|
+----+------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+----------------+
|    |      |         Anhui|         China|2020-01-22T17:00:00|  31.826|  117.226|        1|      |         |      |    Anhui, China|
|    |      |       Beijing|         China|2020-01-22T17:00:00|  40.182|  116.414|       14|      |         |      |  Beijing, China|
|    |      |     Chongqing|         China|2020-01-22T17:00:00|  30.057|  107.874|        6|      |         |      |Chongqing, China|
|    |      |        Fujian|         China|2020-01-22T17:00:00|  26.079|  117.987|        1|      |         |      |   Fujian, China|
|    |      |         Gansu|         China|2020-01-22T17:00:00

In [24]:
# Data cleaning (enigma-jhu): treat blank strings as missing, then substitute defaults.
blank_as_null = [
    when(col(column_name) == "", None).otherwise(col(column_name)).alias(column_name)
    for column_name in sparkDf.columns
]
sparkDf = sparkDf.select(*blank_as_null)

# Default replacement value for each column, grouped by default.
fill_values = {
    **dict.fromkeys(["fips", "admin2", "province_state", "country_region", "combined_key"], "Unknown"),
    **dict.fromkeys(["latitude", "longitude"], 0.0),
    **dict.fromkeys(["confirmed", "deaths", "recovered", "active"], 0),
}
sparkDf = sparkDf.na.fill(fill_values)

# Inspect the first 10 rows after filling.
sparkDf.show(10)


+-------+-------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+----------------+
|   fips| admin2|province_state|country_region|        last_update|latitude|longitude|confirmed|deaths|recovered|active|    combined_key|
+-------+-------+--------------+--------------+-------------------+--------+---------+---------+------+---------+------+----------------+
|Unknown|Unknown|         Anhui|         China|2020-01-22T17:00:00|  31.826|  117.226|        1|     0|        0|     0|    Anhui, China|
|Unknown|Unknown|       Beijing|         China|2020-01-22T17:00:00|  40.182|  116.414|       14|     0|        0|     0|  Beijing, China|
|Unknown|Unknown|     Chongqing|         China|2020-01-22T17:00:00|  30.057|  107.874|        6|     0|        0|     0|Chongqing, China|
|Unknown|Unknown|        Fujian|         China|2020-01-22T17:00:00|  26.079|  117.987|        1|     0|        0|     0|   Fujian, China|
|Unknown|Unknown|         Gansu|  

In [25]:
# Write the cleaned enigma-jhu data back to S3 as CSV.
# Each dataset gets its own prefix. The cleaned datasets in this notebook have
# different columns, and writing them all into one folder would mix CSV files with
# incompatible layouts, so any single table defined on that folder would be inconsistent.
# NOTE(review): presumably re-running this cell adds a new set of part files rather
# than replacing the old ones — confirm, and clear the prefix first if reruns are expected.
cleaned_dynamic_frame = DynamicFrame.fromDF(sparkDf, glueContext, "cleaned_dynamic_frame")

output_path = "s3://etlemrbucket/cleandata/enigma-jhu/"
glueContext.write_dynamic_frame.from_options(
    frame=cleaned_dynamic_frame,
    connection_type="s3",
    connection_options={"path": output_path},
    format="csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f83b6ddaaa0>


In [26]:
# Load the us_states table from the Glue Data Catalog.
dynamicFrameUSstates = glueContext.create_dynamic_frame.from_catalog(database="coviddatabase", table_name="us_states")

# Columns arrive as col0, col1, ... with the CSV header as a data row; rename them first.
mapped_dynamic_frame = dynamicFrameUSstates.apply_mapping([
    ('col0', 'string', 'date', 'string'),
    ('col1', 'string', 'state', 'string'),
    ('col2', 'string', 'fips', 'string'),
    ('col3', 'string', 'cases', 'string'),
    ('col4', 'string', 'deaths', 'string')
])

# Drop the header row(s) without collecting the whole table onto the driver.
# The first row is taken to be the header. Every row identical to it is removed,
# which also handles one header per file when the table spans several CSV files.
# eqNullSafe keeps rows that contain nulls from being filtered out by accident.
raw_us_states_df = mapped_dynamic_frame.toDF()
header = raw_us_states_df.first()
if header is None:
    raise ValueError("Table coviddatabase.us_states is empty; expected at least a header row")
header_matches = [col(c).eqNullSafe(v) for c, v in zip(raw_us_states_df.columns, header)]
is_header_row = header_matches[0]
for match in header_matches[1:]:
    is_header_row = is_header_row & match

# Name the columns positionally after the header values.
sparkDf = raw_us_states_df.filter(~is_header_row).toDF(*header)

# DynamicFrame handle for Glue-side APIs.
newdynamicFrameUSstates = DynamicFrame.fromDF(sparkDf, glueContext, "newdynamicFrameUSstates")

# Inspect the first 10 rows.
sparkDf.show(10)

+----------+----------+----+-----+------+
|      date|     state|fips|cases|deaths|
+----------+----------+----+-----+------+
|2020-01-21|Washington|  53|    1|     0|
|2020-01-22|Washington|  53|    1|     0|
|2020-01-23|Washington|  53|    1|     0|
|2020-01-24|  Illinois|  17|    1|     0|
|2020-01-24|Washington|  53|    1|     0|
|2020-01-25|California|  06|    1|     0|
|2020-01-25|  Illinois|  17|    1|     0|
|2020-01-25|Washington|  53|    1|     0|
|2020-01-26|   Arizona|  04|    1|     0|
|2020-01-26|California|  06|    2|     0|
+----------+----------+----+-----+------+
only showing top 10 rows


In [27]:
# Data cleaning (us_states): treat blank strings as missing, then substitute defaults.
blank_as_null = [
    when(col(column_name) == "", None).otherwise(col(column_name)).alias(column_name)
    for column_name in sparkDf.columns
]
sparkDf = sparkDf.select(*blank_as_null)

# Default replacement value for each column.
fill_values = dict(
    date="1970-01-01",
    state="Unknown",
    fips="000",
    cases=0,
    deaths=0,
)
sparkDf = sparkDf.na.fill(fill_values)

# Inspect the first 10 rows after filling.
sparkDf.show(10)

+----------+----------+----+-----+------+
|      date|     state|fips|cases|deaths|
+----------+----------+----+-----+------+
|2020-01-21|Washington|  53|    1|     0|
|2020-01-22|Washington|  53|    1|     0|
|2020-01-23|Washington|  53|    1|     0|
|2020-01-24|  Illinois|  17|    1|     0|
|2020-01-24|Washington|  53|    1|     0|
|2020-01-25|California|  06|    1|     0|
|2020-01-25|  Illinois|  17|    1|     0|
|2020-01-25|Washington|  53|    1|     0|
|2020-01-26|   Arizona|  04|    1|     0|
|2020-01-26|California|  06|    2|     0|
+----------+----------+----+-----+------+
only showing top 10 rows


In [28]:
# Write the cleaned us_states data back to S3 as CSV.
# Each dataset gets its own prefix. The cleaned datasets in this notebook have
# different columns, and writing them all into one folder would mix CSV files with
# incompatible layouts, so any single table defined on that folder would be inconsistent.
# NOTE(review): presumably re-running this cell adds a new set of part files rather
# than replacing the old ones — confirm, and clear the prefix first if reruns are expected.
cleaned_dynamic_frame = DynamicFrame.fromDF(sparkDf, glueContext, "cleaned_dynamic_frame")

output_path = "s3://etlemrbucket/cleandata/us_states/"
glueContext.write_dynamic_frame.from_options(
    frame=cleaned_dynamic_frame,
    connection_type="s3",
    connection_options={"path": output_path},
    format="csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f83d62e13c0>


In [31]:
# Load the us_counties table from the Glue Data Catalog.
dynamicFrameUScounties = glueContext.create_dynamic_frame.from_catalog(database="coviddatabase", table_name="us_counties")

# Columns arrive as col0, col1, ... with the CSV header as a data row; rename them first.
mapped_dynamic_frame = dynamicFrameUScounties.apply_mapping([
    ('col0', 'string', 'date', 'string'),
    ('col1', 'string', 'county', 'string'),
    ('col2', 'string', 'state', 'string'),
    ('col3', 'string', 'fips', 'string'),
    ('col4', 'string', 'cases', 'string'),
    ('col5', 'string', 'deaths', 'string')
])

# Drop the header row(s) without collecting the whole table onto the driver.
# The first row is taken to be the header. Every row identical to it is removed,
# which also handles one header per file when the table spans several CSV files.
# eqNullSafe keeps rows that contain nulls from being filtered out by accident.
raw_us_counties_df = mapped_dynamic_frame.toDF()
header = raw_us_counties_df.first()
if header is None:
    raise ValueError("Table coviddatabase.us_counties is empty; expected at least a header row")
header_matches = [col(c).eqNullSafe(v) for c, v in zip(raw_us_counties_df.columns, header)]
is_header_row = header_matches[0]
for match in header_matches[1:]:
    is_header_row = is_header_row & match

# Name the columns positionally after the header values.
sparkDf = raw_us_counties_df.filter(~is_header_row).toDF(*header)

# DynamicFrame handle for Glue-side APIs.
newdynamicFrameUScounties = DynamicFrame.fromDF(sparkDf, glueContext, "newdynamicFrameUScounties")

# Inspect the first 10 rows.
sparkDf.show(10)

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California|06059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-26|Los Angeles|California|06037|    1|     0|
+----------+-----------+----------+-----+-----+------+
only showing top 10 rows


In [35]:
# Data cleaning (us_counties): treat blank strings as missing, then substitute defaults.
blank_as_null = [
    when(col(column_name) == "", None).otherwise(col(column_name)).alias(column_name)
    for column_name in sparkDf.columns
]
sparkDf = sparkDf.select(*blank_as_null)

# Default replacement value for each column.
fill_values = dict(
    date="1970-01-01",
    county="Unknown",
    state="Unknown",
    fips="000",
    cases=0,
    deaths=0,
)
sparkDf = sparkDf.na.fill(fill_values)

# Inspect the first 10 rows after filling.
sparkDf.show(10)

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California|06059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-26|Los Angeles|California|06037|    1|     0|
+----------+-----------+----------+-----+-----+------+
only showing top 10 rows


In [36]:
# Write the cleaned us_counties data back to S3 as CSV.
# Each dataset gets its own prefix. The cleaned datasets in this notebook have
# different columns, and writing them all into one folder would mix CSV files with
# incompatible layouts, so any single table defined on that folder would be inconsistent.
# NOTE(review): presumably re-running this cell adds a new set of part files rather
# than replacing the old ones — confirm, and clear the prefix first if reruns are expected.
cleaned_dynamic_frame = DynamicFrame.fromDF(sparkDf, glueContext, "cleaned_dynamic_frame")

output_path = "s3://etlemrbucket/cleandata/us_counties/"
glueContext.write_dynamic_frame.from_options(
    frame=cleaned_dynamic_frame,
    connection_type="s3",
    connection_options={"path": output_path},
    format="csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f83c27c6b00>


In [37]:
# Load the us_daily table from the Glue Data Catalog.
dynamicFrameUSdaily = glueContext.create_dynamic_frame.from_catalog(database="coviddatabase", table_name="us_daily")

# Columns arrive as col0, col1, ... with the CSV header as a data row; rename them first.
mapped_dynamic_frame = dynamicFrameUSdaily.apply_mapping([
    ('col0', 'string', 'date', 'string'),
    ('col1', 'string', 'states', 'string'),
    ('col2', 'string', 'positive', 'string'),
    ('col3', 'string', 'negative', 'string'),
    ('col4', 'string', 'pending', 'string'),
    ('col5', 'string', 'hospitalizedCurrently', 'string'),
    ('col6', 'string', 'hospitalizedCumulative', 'string'),
    ('col7', 'string', 'inIcuCurrently', 'string'),
    ('col8', 'string', 'inIcuCumulative', 'string'),
    ('col9', 'string', 'onVentilatorCurrently', 'string'),
    ('col10', 'string', 'onVentilatorCumulative', 'string'),
    ('col11', 'string', 'dateChecked', 'string'),
    ('col12', 'string', 'death', 'string'),
    ('col13', 'string', 'hospitalized', 'string'),
    ('col14', 'string', 'totalTestResults', 'string'),
    ('col15', 'string', 'lastModified', 'string'),
    ('col16', 'string', 'recovered', 'string'),
    ('col17', 'string', 'total', 'string'),
    ('col18', 'string', 'posNeg', 'string'),
    ('col19', 'string', 'deathIncrease', 'string'),
    ('col20', 'string', 'hospitalizedIncrease', 'string'),
    ('col21', 'string', 'negativeIncrease', 'string'),
    ('col22', 'string', 'positiveIncrease', 'string'),
    ('col23', 'string', 'totalTestResultsIncrease', 'string'),
    ('col24', 'string', 'hash', 'string')
])

# Drop the header row(s) without collecting the whole table onto the driver.
# The first row is taken to be the header. Every row identical to it is removed,
# which also handles one header per file when the table spans several CSV files.
# eqNullSafe keeps rows that contain nulls from being filtered out by accident.
raw_us_daily_df = mapped_dynamic_frame.toDF()
header = raw_us_daily_df.first()
if header is None:
    raise ValueError("Table coviddatabase.us_daily is empty; expected at least a header row")
header_matches = [col(c).eqNullSafe(v) for c, v in zip(raw_us_daily_df.columns, header)]
is_header_row = header_matches[0]
for match in header_matches[1:]:
    is_header_row = is_header_row & match

# Name the columns positionally after the header values.
sparkDf = raw_us_daily_df.filter(~is_header_row).toDF(*header)

# DynamicFrame handle for Glue-side APIs.
newdynamicFrameUSdaily = DynamicFrame.fromDF(sparkDf, glueContext, "newdynamicFrameUSdaily")

# Inspect the first 10 rows.
sparkDf.show(10)

+--------+------+--------+--------+-------+---------------------+----------------------+--------------+---------------+---------------------+----------------------+--------------------+------+------------+----------------+--------------------+---------+-----+------+-------------+--------------------+----------------+----------------+------------------------+--------------------+
|    date|states|positive|negative|pending|hospitalizedCurrently|hospitalizedCumulative|inIcuCurrently|inIcuCumulative|onVentilatorCurrently|onVentilatorCumulative|         dateChecked| death|hospitalized|totalTestResults|        lastModified|recovered|total|posNeg|deathIncrease|hospitalizedIncrease|negativeIncrease|positiveIncrease|totalTestResultsIncrease|                hash|
+--------+------+--------+--------+-------+---------------------+----------------------+--------------+---------------+---------------------+----------------------+--------------------+------+------------+----------------+--------------

In [38]:
# Data cleaning (us_daily)
# Blank CSV fields come through as "" rather than null (compare the raw and filled
# enigma-jhu tables above), and na.fill only replaces nulls. Convert blanks to nulls
# first, as the other datasets' cleaning cells already do; otherwise the defaults
# below never apply to empty fields.
sparkDf = sparkDf.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in sparkDf.columns])

# Default replacement value for each column.
# NOTE(review): the `date` column in the output above is 8 characters wide, which
# suggests a compact format such as yyyymmdd. In that case the "1970-01-01"
# sentinel uses a different format from the real values — confirm.
fill_values = {
    "date": "1970-01-01",
    "states": "Unknown",
    "positive": 0,
    "negative": 0,
    "pending": 0,
    "hospitalizedCurrently": 0,
    "hospitalizedCumulative": 0,
    "inIcuCurrently": 0,
    "inIcuCumulative": 0,
    "onVentilatorCurrently": 0,
    "onVentilatorCumulative": 0,
    "dateChecked": "1970-01-01",
    "death": 0,
    "hospitalized": 0,
    "totalTestResults": 0,
    "lastModified": "1970-01-01",
    "recovered": 0,
    "total": 0,
    "posNeg": 0,
    "deathIncrease": 0,
    "hospitalizedIncrease": 0,
    "negativeIncrease": 0,
    "positiveIncrease": 0,
    "totalTestResultsIncrease": 0,
    "hash": "Unknown"
}

# Apply fill values for nulls
sparkDf = sparkDf.na.fill(fill_values)
# Inspect the first 10 rows after filling.
sparkDf.show(10)

+--------+------+--------+--------+-------+---------------------+----------------------+--------------+---------------+---------------------+----------------------+--------------------+------+------------+----------------+--------------------+---------+-----+------+-------------+--------------------+----------------+----------------+------------------------+--------------------+
|    date|states|positive|negative|pending|hospitalizedCurrently|hospitalizedCumulative|inIcuCurrently|inIcuCumulative|onVentilatorCurrently|onVentilatorCumulative|         dateChecked| death|hospitalized|totalTestResults|        lastModified|recovered|total|posNeg|deathIncrease|hospitalizedIncrease|negativeIncrease|positiveIncrease|totalTestResultsIncrease|                hash|
+--------+------+--------+--------+-------+---------------------+----------------------+--------------+---------------+---------------------+----------------------+--------------------+------+------------+----------------+--------------

In [39]:
# Write the cleaned us_daily data back to S3 as CSV.
# Each dataset gets its own prefix. The cleaned datasets in this notebook have
# different columns, and writing them all into one folder would mix CSV files with
# incompatible layouts, so any single table defined on that folder would be inconsistent.
# NOTE(review): presumably re-running this cell adds a new set of part files rather
# than replacing the old ones — confirm, and clear the prefix first if reruns are expected.
cleaned_dynamic_frame = DynamicFrame.fromDF(sparkDf, glueContext, "cleaned_dynamic_frame")

output_path = "s3://etlemrbucket/cleandata/us_daily/"
glueContext.write_dynamic_frame.from_options(
    frame=cleaned_dynamic_frame,
    connection_type="s3",
    connection_options={"path": output_path},
    format="csv"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7f83d62e2f80>
