"""
@Author: Shivraj Yelave
@Date: 2024-10-28
@Last Modified by: Shivraj Yelave
@Last Modified time: 2024-10-28
@Title: Glue Pipeline using Glue notebook
"""

# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Application name handed to the SparkSession builder
app_name = "MyGlueJob"

# Reuse the active SparkSession if one exists, otherwise create it under app_name
spark = SparkSession.builder.appName(app_name).getOrCreate()

# GlueContext built on top of the session's SparkContext
glueContext = GlueContext(spark.sparkContext)




Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 60e5f037-59e0-40d3-9f1d-b1994c57b2f6
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 60e5f037-59e0-40d3-9f1d-b1994c57b2f6 to get into ready status...
Session 60e5f037-59e0-40d3-9f1d-b1994c57b2f6 ha

In [5]:
# Create a new Glue job/session
job = Job(glueContext)

try:
    # Step 1: Read the source table from the Glue Data Catalog
    database_name = "project2-patabase"
    table_name = "project2_source_bucket"

    # Create a DynamicFrame from the Glue Catalog
    dyf = glueContext.create_dynamic_frame.from_catalog(database=database_name, table_name=table_name)
    dyf.printSchema()  # Print schema for reference

    # Convert DynamicFrame to DataFrame
    df = dyf.toDF()
    df.show()

    # Step 2: Count nulls in each column (one output column per input column)
    null_counts = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
    null_counts.show()

    # Step 3: Define numeric columns that are checked for outliers
    numeric_cols = ['temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']

    def replace_outliers_with_mean(df, col_name):
        """Replace IQR outliers in ``col_name`` with the column mean.

        Values below ``Q1 - 1.5 * IQR`` or above ``Q3 + 1.5 * IQR`` are
        overwritten with the mean of the column; all other values are kept.
        Quartiles are approximate (relative error 0.05).

        Args:
            df: Spark DataFrame containing ``col_name``.
            col_name: Name of the numeric column to clean.

        Returns:
            A DataFrame with ``col_name`` cleaned, or ``df`` unchanged when
            the quartiles cannot be computed.
        """
        # approxQuantile yields an empty list when the column holds no
        # non-null values; there is nothing to clean in that case.
        quantiles = df.approxQuantile(col_name, [0.25, 0.75], 0.05)
        if len(quantiles) < 2:
            return df

        Q1, Q3 = quantiles
        IQR = Q3 - Q1

        # Define outlier bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Mean of the column as it currently stands (outliers included)
        mean_value = df.select(F.mean(F.col(col_name))).collect()[0][0]

        # NOTE(review): the mean is a float, so integer columns are presumably
        # widened to double by this replacement -- confirm that is acceptable.
        is_outlier = (F.col(col_name) < lower_bound) | (F.col(col_name) > upper_bound)
        return df.withColumn(
            col_name,
            F.when(is_outlier, mean_value).otherwise(F.col(col_name))
        )

    # Step 4: Apply the function to each numeric column
    for col_name in numeric_cols:
        df = replace_outliers_with_mean(df, col_name)

    # Show the updated DataFrame
    df.show()
except Exception as e:
    # Report the failure, then re-raise: later cells depend on ``df`` and
    # must not run against missing or half-processed data.
    print(f"Error occurred while extracting or cleaning the data: {str(e)}")
    raise


root
|-- instant: long
|-- dteday: string
|-- season: long
|-- yr: long
|-- mnth: long
|-- hr: long
|-- holiday: long
|-- weekday: long
|-- workingday: long
|-- weathersit: long
|-- temp: double
|-- atemp: double
|-- hum: double
|-- windspeed: double
|-- casual: long
|-- registered: long
|-- cnt: long

+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|      1|2011-01-01|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|        13| 16|
|      2|2011-01-01|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     8|        32| 40|
|      3|2011-01-01|     1|  0|   1|  2|      0|      6|

In [14]:

# Step 5: Convert DataFrame to DynamicFrame
# Imported locally because no other import in this file brings DynamicFrame
# (or os) into scope.
import os

from awsglue.dynamicframe import DynamicFrame

dyf_processed = DynamicFrame.fromDF(df, glueContext, "dyf_processed")

# Step 6: Define Redshift connection options
# The password is read from the environment so that no secret is stored in
# the notebook; the user name may be overridden the same way.
redshift_password = os.environ.get("REDSHIFT_PASSWORD")
if not redshift_password:
    raise RuntimeError(
        "Environment variable REDSHIFT_PASSWORD must be set before writing to Redshift."
    )

redshift_options = {
    "url": "jdbc:redshift://project2-destination-cluster.cvkuuj2uqb2z.ap-south-1.redshift.amazonaws.com:5439/project2_redshift_database",
    "user": os.environ.get("REDSHIFT_USER", "project2_user"),
    "password": redshift_password,
    "dbtable": "processed_data",  # The target table in Redshift
    "redshiftTmpDir": "s3://project2-redshift-tmp/",  # Temp S3 location passed to the Redshift writer
}

# Step 7: Write the DynamicFrame to Redshift
try:
    glueContext.write_dynamic_frame.from_options(
        frame=dyf_processed,
        connection_type="redshift",
        connection_options=redshift_options
    )
    print("Data written to Redshift successfully.")
except Exception as e:
    # Report the failure and re-raise so a failed load is not mistaken for
    # a successful run.
    print(f"Error occurred while writing to Redshift: {str(e)}")
    raise
finally:
    # Stop the Spark session whether or not the write succeeded
    spark.stop()

Error occurred while writing to Redshift: An error occurred while calling o83.getSink.
: java.sql.SQLException: The connection attempt failed.
	at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
	at com.amazon.redshift.Driver.connect(Driver.java:319)
	at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectionProperties$5(JDBCUtils.scala:1122)
	at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$2(JDBCUtils.scala:1073)
	at scala.Option.getOrElse(Option.scala:189)
	at com.amazonaws.services.glue.util.JDBCWrapper$.$anonfun$connectWithSSLAttempt$1(JDBCUtils.scala:1073)
	at scala.Option.getOrElse(Option.scala:189)
	at com.amazonaws.services.glue.util.JDBCWrapper$.connectWithSSLAttempt(JDBCUtils.scala:1073)
	at com.amazonaws.services.glue.util.JDBCWrapper$.connectionProperties(JDBCUtils.scala:1118)
	at com.amazonaws.services.glue.util.JDBCWrapper.connectionProperties$lzycompute(JDBCUtils.scala:824)
	at com.amazonaws.