# AWS Glue Studio Notebook
"""
@Author: Jayesh Patil
@Date: 2024-11-25
@Last Modified by: Jayesh Patil
@Last Modified time: 2024-11-25
@Title: Glue Pipeline using Glue notebook
"""


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%connections redshift-demo-connection

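import sys
# Glue transforms such as DropNullFields.
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Note: this wildcard import shadows Python built-ins such as sum() and round()
# with their Spark column equivalents for the rest of the notebook.
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

# Reuse the session's SparkContext and wrap it in a GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)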

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Connections to be included:
redshift-demo-connection
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 66b826c7-8efa-47d7-aa4f-a3559aad923d
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 66b826c7-8efa-47d7-aa4f-a3559aad923d to get into ready status...
Session 66b826c7-8efa-47d7-aa4f-a3559aad923d has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog, drop null fields, and display its schema


In [2]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='salesdb', table_name='sales_records_csv')
dyf = DropNullFields.apply(frame=dyf)
dyf.printSchema()

null_fields []
root
|-- id: long
|-- region: string
|-- country: string
|-- item_type: string
|-- sales_channel: string
|-- order_priority: string
|-- order_date: string
|-- order_id: long
|-- ship_date: string
|-- units_sold: long
|-- unit_price: double
|-- unit_cost: double
|-- total_revenue: double
|-- total_cost: double
|-- total_profit: double


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [3]:
df = dyf.toDF()
df.show()

+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|      item_type|sales_channel|order_priority|order_date| order_id| ship_date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |      Baby Food|       Online|             M|12/20/2013|957081544| 1/11/2014|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|         Snacks|      Offline|             C|  7/5/2010|301644504| 7/26/2010|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe

#### Example: Perform data transformations


In [4]:
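# Use the legacy date parser so that non-zero-padded values such as 7/5/2010 match the 'MM/dd/yyyy' pattern.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
# Replace the string columns order_date and ship_date with proper date columns.
sales_df = df.withColumn("Order_Date", to_date(unix_timestamp(col('order_date'), 'MM/dd/yyyy').cast('timestamp'))) \
             .withColumn("Ship_Date", to_date(unix_timestamp(col('ship_date'), 'MM/dd/yyyy').cast('timestamp')))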

sales_df.show(10, True)


+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
| id|              region|             country|    item_type|sales_channel|order_priority|Order_Date| order_id| Ship_Date|units_sold|unit_price|unit_cost|total_revenue|total_cost|total_profit|
+---+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  1|Central America a...|Antigua and Barbuda |    Baby Food|       Online|             M|2013-12-20|957081544|2014-01-11|       552|    255.28|   159.42|    140914.56|  87999.84|    52914.72|
|  2|Central America a...|              Panama|       Snacks|      Offline|             C|2010-07-05|301644504|2010-07-26|      2167|    152.58|    97.44|    330640.86| 211152.48|   119488.38|
|  3|              Europe|      Cze
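
The transformation above turns strings such as `7/5/2010` into dates such as `2010-07-05`. As a rough way to sanity-check expected values outside Spark, the same parsing can be sketched in plain Python. `parse_us_date` is a hypothetical helper introduced here for illustration; it is not part of the notebook or of Glue.

```python
from datetime import date, datetime


def parse_us_date(text: str) -> date:
    """Parse a month/day/year string such as '7/5/2010' into a date."""
    # When parsing, %m and %d accept values with or without a leading zero.
    return datetime.strptime(text.strip(), "%m/%d/%Y").date()


# Sample strings taken from the order_date and ship_date columns shown above.
samples = ["12/20/2013", "7/5/2010", "1/11/2014"]
parsed = [parse_us_date(s) for s in samples]

for raw, value in zip(samples, parsed):
    print(f"{raw:>10} -> {value.isoformat()}")
```

Comparing a few values this way can help confirm that the Spark pattern and the source data agree on month/day order.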

#### Group by Region and Country and calculate aggregate metrics


In [5]:
aggregate_df = sales_df.groupBy("Region", "Country", year("order_date").alias('year'), quarter("order_date").alias('quarter')).agg(
    sum("Total_Revenue").alias("Total_Revenue_By_Region_Country"),
    sum("Total_Cost").alias("Total_Cost_By_Region_Country"),
    sum("Total_Profit").alias("Total_Profit_By_Region_Country")
)
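
To make the grouping logic concrete, here is a minimal plain-Python sketch of the same idea: group rows by region, country, year, and quarter, then sum the three money columns. The rows and the `quarter_of` helper are made up for illustration; the notebook itself does this with Spark's `groupBy` and `agg`.

```python
from collections import defaultdict
from datetime import date


def quarter_of(d: date) -> int:
    """Return the calendar quarter (1-4) for a date."""
    return (d.month - 1) // 3 + 1


# Illustrative rows only; these are not values from the sales table.
rows = [
    {"region": "Europe", "country": "Poland", "order_date": date(2015, 10, 3),
     "total_revenue": 100.0, "total_cost": 60.0, "total_profit": 40.0},
    {"region": "Europe", "country": "Poland", "order_date": date(2015, 12, 24),
     "total_revenue": 150.0, "total_cost": 90.0, "total_profit": 60.0},
    {"region": "Asia", "country": "Brunei", "order_date": date(2010, 2, 1),
     "total_revenue": 80.0, "total_cost": 50.0, "total_profit": 30.0},
]

# One accumulator per (region, country, year, quarter) key.
totals = defaultdict(lambda: {"Total_Revenue": 0.0, "Total_Cost": 0.0, "Total_Profit": 0.0})
for row in rows:
    d = row["order_date"]
    key = (row["region"], row["country"], d.year, quarter_of(d))
    totals[key]["Total_Revenue"] += row["total_revenue"]
    totals[key]["Total_Cost"] += row["total_cost"]
    totals[key]["Total_Profit"] += row["total_profit"]

for key in sorted(totals, key=lambda k: (k[2], k[3])):
    print(key, totals[key])
```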






#### Show the aggregated data (for demonstration purposes)


In [6]:
aggregate_df.orderBy("year","quarter").show()
aggregate_df.count()

+--------------------+--------------------+----+-------+-------------------------------+----------------------------+------------------------------+
|              Region|             Country|year|quarter|Total_Revenue_By_Region_Country|Total_Cost_By_Region_Country|Total_Profit_By_Region_Country|
+--------------------+--------------------+----+-------+-------------------------------+----------------------------+------------------------------+
|Central America a...|           Nicaragua|2010|      1|                       511987.3|                   291486.79|                     220500.51|
|Middle East and N...|                Oman|2010|      1|                     2680430.97|                  2015687.94|                     664743.03|
|Australia and Oce...|          East Timor|2010|      1|                     1271998.35|                  1099540.35|                      172458.0|
|  Sub-Saharan Africa|              Zambia|2010|      1|                      5837205.6|                  

#### Rename the columns and display the content sorted by year and quarter.

In [7]:
aggregate_df= aggregate_df.withColumnRenamed("Total_Revenue_By_Region_Country","Total_Revenue")\
                          .withColumnRenamed("Total_Cost_By_Region_Country","Total_Cost")\
                          .withColumnRenamed("Total_Profit_By_Region_Country","Total_Profit")
aggregate_df.orderBy("year","quarter").show()

+--------------------+--------------------+----+-------+------------------+------------------+------------------+
|              Region|             Country|year|quarter|     Total_Revenue|        Total_Cost|      Total_Profit|
+--------------------+--------------------+----+-------+------------------+------------------+------------------+
|                Asia|              Brunei|2010|      1|         408277.98|         260586.81|147691.16999999998|
|  Sub-Saharan Africa|              Zambia|2010|      1|         5837205.6|        4453417.91|        1383787.69|
|Middle East and N...|        Saudi Arabia|2010|      1|         179642.54|         124560.66|          55081.88|
|                Asia|            Cambodia|2010|      1| 5682062.319999999|4130533.3400000003|1551528.9800000002|
|Middle East and N...|             Somalia|2010|      1|        2485916.64|        1552431.96|         933484.68|
|Central America a...|Saint Kitts and N...|2010|      1|          245126.7|         1642
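
Values such as `147691.16999999998` in the output above are typical of summing binary floating-point numbers. The sketch below reproduces the effect in plain Python with made-up values and shows two common ways to tidy it: rounding the result, or summing with `Decimal`. In the notebook, wrapping each `sum(...)` in Spark's `round(..., 2)` would be one possible equivalent; that is a suggestion, not something the original notebook does.

```python
from decimal import Decimal

# Illustrative values only; these are not taken from the sales table.
values = [0.1, 0.2, 0.3]

# Accumulate left to right, as a naive floating-point sum would.
float_total = 0.0
for v in values:
    float_total += v

# Option 1: round the floating-point result to two decimal places.
rounded_total = round(float_total, 2)

# Option 2: sum exact decimal values instead of binary floats.
decimal_total = sum((Decimal(str(v)) for v in values), Decimal("0"))

print(float_total)
print(rounded_total)
print(decimal_total)
```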

#### Example: Convert the Spark DataFrame to a DynamicFrame

In [8]:
dyf = DynamicFrame.fromDF(aggregate_df, glueContext, "dynamic_frame")




#### Example: Display a sample of the DynamicFrame, then load it into our Amazon Redshift cluster


In [9]:
dyf.show()

{"Region": "Europe", "Country": "Luxembourg", "year": 2010, "quarter": 1, "Total_Revenue": 1123251.46, "Total_Cost": 662970.63, "Total_Profit": 460280.83}
{"Region": "Europe", "Country": "Switzerland", "year": 2014, "quarter": 1, "Total_Revenue": 4429651.8, "Total_Cost": 2873705.61, "Total_Profit": 1555946.19}
{"Region": "Central America and the Caribbean", "Country": "Dominica", "year": 2010, "quarter": 2, "Total_Revenue": 1255966.53, "Total_Cost": 1085682.13, "Total_Profit": 170284.4}
{"Region": "Australia and Oceania", "Country": "Federated States of Micronesia", "year": 2012, "quarter": 2, "Total_Revenue": 5588354.33, "Total_Cost": 4029198.36, "Total_Profit": 1559155.97}
{"Region": "Europe", "Country": "Poland", "year": 2015, "quarter": 4, "Total_Revenue": 5740416.15, "Total_Cost": 4627522.4, "Total_Profit": 1112893.75}
{"Region": "Sub-Saharan Africa", "Country": "Namibia", "year": 2016, "quarter": 1, "Total_Revenue": 1861809.39, "Total_Cost": 1500860.64, "Total_Profit": 360948.75}

In [12]:
redshift_output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dyf,
    catalog_connection="redshift-demo-connection",
    connection_options={"dbtable": "public.Regionalsales","database":"dev"},
    redshift_tmp_dir = "s3://aws-glue-assets-262136919150-us-east-1/temporary/",
    transformation_ctx = "redshift_output"
)



Py4JJavaError: An error occurred while calling o148.pyWriteDynamicFrame.
: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: G1AHNQZ3T2AFRK54; S3 Extended Request ID: 1MqkaG+GPfeqVv4MlgjQA9yyGDzRSrK+VRxC1ABgmc1Xr9kRFKPS9BVXnkUxJZstKc6U8m/iq0DmghTvIFZ0Aql+9lJKac2g; Proxy: null), S3 Extended Request ID: 1MqkaG+GPfeqVv4MlgjQA9yyGDzRSrK+VRxC1ABgmc1Xr9kRFKPS9BVXnkUxJZstKc6U8m/iq0DmghTvIFZ0Aql+9lJKac2g
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:303)
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:510)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1690)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:436)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRe
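
The `AccessDenied` (403) error above is raised while Glue lists the S3 location used for staging. One plausible cause, which the trace alone does not confirm, is that the session's IAM role lacks permissions on the `redshift_tmp_dir` path; a bucket policy or a mistyped bucket name could produce the same error. The sketch below only builds a candidate IAM policy document for that path so it can be reviewed. `temp_dir_policy` is a hypothetical helper, and the chosen actions are an assumption about what the staging step needs.

```python
import json
from urllib.parse import urlparse


def temp_dir_policy(s3_uri: str) -> dict:
    """Build an IAM policy document granting access to an S3 temp directory."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing is a bucket-level permission.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Object-level permissions, limited to the temp prefix (assumed set).
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"arn:aws:s3:::{bucket}/{prefix}*",
            },
        ],
    }


# The URI is the redshift_tmp_dir value used in the cell above.
policy = temp_dir_policy("s3://aws-glue-assets-262136919150-us-east-1/temporary/")
print(json.dumps(policy, indent=2))
```

Comparing this document with the policies attached to the session role (shown by `%status`) is one way to narrow down the cause before rerunning the write.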