# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
    %session_type       String        Specify a session_type to be used. Supported values: streaming, etl and glue_ray. 
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Session

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray session. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: f26fbff3-3ff9-43d7-b5d8-c14a59510795
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session f26fbff3-3ff9-43d7-b5d8-c14a59510795 to get into ready status...
Session f26fbff3-3ff9-43d7-b5d8-c14a59510795 ha

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [8]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='database_name', table_name='table_name')
dyf.printSchema()

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 9f9b6664-ee98-44c2-8394-ad27610164c2
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true


Following exception encountered while creating session: An error occurred (AlreadyExistsException) when calling the CreateSession operation: Session already created, sessionId=9f9b6664-ee98-44c2-8394-ad27610164c2 

Error message: Session already created, sessionId=9f9b6664-ee98-44c2-8394-ad27610164c2 

Traceback (most recent call last):
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/aws_glue_interactive_sessions_kernel/glue_kernel_utils/KernelGateway.py", line 100, in create_session
    response = self.glue_client.create_session(
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/botocore/client.py", line 553, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/jupyter-user/.local/lib/python3.9/site-packages/botocore/client.py", line 1009, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.AlreadyExistsException: An error occurred (AlreadyExistsException) when calling the CreateSession o

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract

# Get a Spark session (getOrCreate reuses an already active session if one exists)
spark = SparkSession.builder.appName('openaq_processing').getOrCreate()

# Read from S3 bucket
data_location = "s3://myawasbuckt-airquality/openaq/air_quality_data.json"
df = spark.read.json(data_location)

# Drop the original 'city' column and rename 'location' to 'city'
df = df.drop("city")
df = df.withColumnRenamed("location", "city")

# Filter out cities whose names look like station codes (e.g. MK0036A).
# rlike matches anywhere in the string, so any name containing one or more uppercase letters immediately followed by a digit is excluded.
df = df.filter(~col("city").rlike("[A-Z]+[0-9]+"))

# Show the schema and sample data
df.printSchema()
df.show(5, truncate=False)

root
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- country: string (nullable = true)
 |-- date: struct (nullable = true)
 |    |-- local: string (nullable = true)
 |    |-- utc: string (nullable = true)
 |-- entity: string (nullable = true)
 |-- isAnalysis: string (nullable = true)
 |-- isMobile: boolean (nullable = true)
 |-- city: string (nullable = true)
 |-- locationId: long (nullable = true)
 |-- parameter: string (nullable = true)
 |-- sensorType: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- value: double (nullable = true)

+--------------------------------------+-------+------------------------------------------------------+-------------------------+----------+--------+-------------------+----------+---------+---------------+-----+-----+
|coordinates                           |country|date                                                  |entity                   |is
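
To see which names the `rlike` filter in the cell above removes, the same pattern can be tried in plain Python. This is a minimal sketch, assuming that Python's `re.search` behaves like Spark's `rlike` for this simple pattern (both look for a match anywhere in the string). The helper name `looks_coded` and the sample name "A40 Roadside" are hypothetical and used only for illustration.

```python
import re

# Same pattern as the rlike filter: uppercase letters immediately followed by digits.
CODED_NAME = re.compile(r"[A-Z]+[0-9]+")

def looks_coded(city):
    """Return True if the name contains a code-like run such as 'MK0036'."""
    # re.search scans the whole string, so a match anywhere in the name counts.
    return city is not None and CODED_NAME.search(city) is not None

# "A40 Roadside" is a made-up name that happens to contain a letter+digit run.
names = ["MK0036A", "London Bloomsbury", "Ealing Horn Lane", "A40 Roadside"]
kept = [name for name in names if not looks_coded(name)]
print(kept)
```

The last sample name shows the trade-off: a genuine place name that contains a road number would be dropped along with the station codes. If such names occur in the data, an anchored pattern such as `^[A-Z]+[0-9]+[A-Z]*$` may be a safer choice.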

In [13]:
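# Transformation: label each measurement from its 'value' and 'parameter'.
# Apply with: df.withColumn("quality_label", quality_label_udf(col("value"), col("parameter")))
def quality_label(value, parameter):
    # Null measurements cannot be classified; unlisted parameters fall through and also yield None
    if value is None or parameter is None:
        return None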
    
    if parameter == 'nox':
        if value < 40:
            return 'Good'
        elif value < 80:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'pm25':
        if value < 12:
            return 'Good'
        elif value < 35.4:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'pm10':
        if value < 50:
            return 'Good'
        elif value < 150:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'no2':
        if value < 40:
            return 'Good'
        elif value < 80:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'so2':
        if value < 40:
            return 'Good'
        elif value < 80:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'o3':
        if value < 50:
            return 'Good'
        elif value < 100:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'no':
        if value < 40:
            return 'Good'
        elif value < 100:
            return 'Moderate'
        else:
            return 'Unhealthy'
    elif parameter == 'co':
        if value < 5:
            return 'Good'
        elif value < 10:
            return 'Moderate'
        else:
            return 'Unhealthy'

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

quality_label_udf = udf(quality_label, StringType())

# More data exploration: Count of data points per country
country_counts = df.groupBy("country").count().orderBy(col("count").desc())
country_counts.show()

# More data exploration: Average value per city
city_avg = df.groupBy("city").agg({"value": "avg"}).orderBy(col("avg(value)").desc())
city_avg.show()

# After exploration and transformation, the data can be written back to S3 if needed.
# The write step is left commented out for demonstration purposes; uncomment it to save the data.
# (Parquet is an assumed output format here; choose whichever suits downstream use.)
output_location = "s3://myawasbuckt-airquality/openaq/processed_data/"
# df.write.mode("overwrite").parquet(output_location)

+-------+-----+
|country|count|
+-------+-----+
|     GB| 4781|
|     NL| 2722|
|     CL| 1029|
|     US|  345|
|     BE|  171|
+-------+-----+

+--------------------+------------------+
|                city|        avg(value)|
+--------------------+------------------+
|           Talagante| 93.38121546961327|
|              Curicó| 68.82222222222222|
|            Punteras|44.899441340782126|
|    Ealing Horn Lane|31.385792349726778|
|London Haringey P...|29.820508105849584|
|   London Hillingdon| 27.08360950276243|
|           Cauquenes| 19.94743016759777|
|London Marylebone...| 19.13181081081081|
|   London Bloomsbury|18.189461848484846|
|London N. Kensington|17.498828573407202|
|   Haringey Roadside|17.214494535519123|
|    Zaanstad-Hemkade| 16.84562500000001|
|      Coya Población|15.467584269662916|
|   London Harlington|14.852347256410257|
|Hoek v. Holland-B...|14.196249999999997|
|Tower Hamlets Roa...|14.026051912568306|
|     Camden Kerbside|12.731212049335863|
|        Coyhai
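
The long if/elif chain in `quality_label` repeats one pattern: two cut-offs per pollutant. A table-driven version may be easier to audit and extend. This is a sketch rather than the notebook's own code: the names `THRESHOLDS` and `label_for` are hypothetical, and the cut-offs are copied from the function in the cell above, not taken from any official air-quality standard.

```python
# (good_below, moderate_below) cut-offs per parameter, copied from quality_label.
THRESHOLDS = {
    "nox": (40, 80),
    "pm25": (12, 35.4),
    "pm10": (50, 150),
    "no2": (40, 80),
    "so2": (40, 80),
    "o3": (50, 100),
    "no": (40, 100),
    "co": (5, 10),
}

def label_for(value, parameter):
    """Return 'Good', 'Moderate' or 'Unhealthy', or None if it cannot be classified."""
    # Missing values and unlisted parameters get no label.
    if value is None or parameter not in THRESHOLDS:
        return None
    good_below, moderate_below = THRESHOLDS[parameter]
    if value < good_below:
        return "Good"
    if value < moderate_below:
        return "Moderate"
    return "Unhealthy"

print(label_for(10.0, "pm25"))  # below the first pm25 cut-off (12)
print(label_for(93.4, "pm10"))  # between the pm10 cut-offs (50 and 150)
print(label_for(12.0, "co"))    # at or above the second co cut-off (10)
```

Like the original function, this can be wrapped with `udf(label_for, StringType())` and applied to the `value` and `parameter` columns. Keeping the cut-offs in one dictionary means adding a pollutant is a one-line change.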