# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [5]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext if one is already active, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext and exposes the Glue readers/writers used
# by the cells below.
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# NOTE: the session is intentionally NOT terminated here -- every cell that
# follows depends on it staying alive. To end the session once you are
# finished, run the stop_session magic in its own cell.

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 18527fa6-7111-460f-afd2-cf79c8a06163
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 18527fa6-7111-460f-afd2-cf79c8a06163 to get into ready status...
Session 18527fa6-7111-460f-afd2-cf79c8a06163 has been created.



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [2]:
# Load the Data Catalog table as a DynamicFrame, then print its schema.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database='testdbjul2024',
    table_name='insurance_csv',
)
dyf.printSchema()

root
|-- patient_id: string
|-- insurance_company: string
|-- policy_number: string
|-- coverage_amount: double
|-- insurance_cost: double


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [4]:
# Convert the DynamicFrame to a Spark DataFrame so the DataFrame API can be
# used in the cells that follow.
df = dyf.toDF()

# Preview the rows, then report the total row count.
df.show()
df.count()

+----------+-----------------+-------------+---------------+--------------+
|patient_id|insurance_company|policy_number|coverage_amount|insurance_cost|
+----------+-----------------+-------------+---------------+--------------+
|     P0002| UnitedHealthcare|      aB04270|       10537.14|       5475.54|
|     P0003|            Aetna|      lc97160|       12755.05|       2746.27|
|     P0004|            Aetna|      ep06455|       25838.15|       6669.42|
|     P0005|       Blue Cross|      Wi27285|        43530.0|       6751.31|
|     P0006|Kaiser Permanente|      QX30416|       24558.51|       7501.82|
|     P0007|            Aetna|      ef92039|       46144.34|       7930.02|
|     P0008|Kaiser Permanente|      Qj02474|       14153.59|       1258.01|
|     P0009|       Blue Cross|      pF45404|       48381.47|       4665.04|
|     P0010|            Cigna|      oa45220|       48414.56|       2971.87|
|     P0011|       Blue Cross|      FD86064|       23933.08|       9643.23|
|     P0012|

#### Example: Select a subset of columns from the DataFrame


In [5]:
# Project the DataFrame down to the four columns listed here.
transformed_df = df.select(
    "patient_id",
    "insurance_company",
    "policy_number",
    "coverage_amount",
)

# Preview the projected rows, then report how many there are.
transformed_df.show()
transformed_df.count()

+----------+-----------------+-------------+---------------+
|patient_id|insurance_company|policy_number|coverage_amount|
+----------+-----------------+-------------+---------------+
|     P0002| UnitedHealthcare|      aB04270|       10537.14|
|     P0003|            Aetna|      lc97160|       12755.05|
|     P0004|            Aetna|      ep06455|       25838.15|
|     P0005|       Blue Cross|      Wi27285|        43530.0|
|     P0006|Kaiser Permanente|      QX30416|       24558.51|
|     P0007|            Aetna|      ef92039|       46144.34|
|     P0008|Kaiser Permanente|      Qj02474|       14153.59|
|     P0009|       Blue Cross|      pF45404|       48381.47|
|     P0010|            Cigna|      oa45220|       48414.56|
|     P0011|       Blue Cross|      FD86064|       23933.08|
|     P0012|       Blue Cross|      rT62510|       20605.81|
|     P0013|Kaiser Permanente|      yy35955|        28814.2|
|     P0014|            Aetna|      yP46617|       31544.14|
|     P0015| UnitedHealt

#### Example: Remove duplicate rows and rows with nulls, then write the data to a location in Amazon S3


In [7]:
# Two rows count as duplicates when they match on every one of these columns.
columns_to_check = [
    "patient_id",
    "insurance_company",
    "policy_number",
    "coverage_amount",
]

# Keep a single row per distinct combination of the columns above.
data_frame_no_duplicates = transformed_df.dropDuplicates(subset=columns_to_check)

# Preview the de-duplicated rows, then report how many remain.
data_frame_no_duplicates.show()
data_frame_no_duplicates.count()

+----------+-----------------+-------------+---------------+
|patient_id|insurance_company|policy_number|coverage_amount|
+----------+-----------------+-------------+---------------+
|     P0007|            Aetna|      ef92039|       46144.34|
|     P0017| UnitedHealthcare|      Vz85917|       37269.64|
|     P0020| UnitedHealthcare|      Hn88338|       40440.86|
|     P0027|            Cigna|      AZ13346|       31841.38|
|     P0032| UnitedHealthcare|      eu52783|       29794.73|
|     P0036| UnitedHealthcare|      qT68353|       38696.46|
|     P0041| UnitedHealthcare|      cc77511|       10453.15|
|     P0043|       Blue Cross|      jj70926|       44721.77|
|     P0045|Kaiser Permanente|      Aq93468|        22609.3|
|     P0047|Kaiser Permanente|      pj74267|       42698.72|
|     P0059|            Cigna|      IJ01882|       20651.86|
|     P0060|Kaiser Permanente|      xd12437|       32768.39|
|     P0062|            Aetna|      xn10311|       41593.65|
|     P0064|            

In [8]:
# Discard every row that has a null in at least one column.
df_no_nulls = data_frame_no_duplicates.dropna(how="any")

# Preview the remaining rows, then report how many are left.
df_no_nulls.show()
df_no_nulls.count()

+----------+-----------------+-------------+---------------+
|patient_id|insurance_company|policy_number|coverage_amount|
+----------+-----------------+-------------+---------------+
|     P0007|            Aetna|      ef92039|       46144.34|
|     P0017| UnitedHealthcare|      Vz85917|       37269.64|
|     P0020| UnitedHealthcare|      Hn88338|       40440.86|
|     P0027|            Cigna|      AZ13346|       31841.38|
|     P0032| UnitedHealthcare|      eu52783|       29794.73|
|     P0036| UnitedHealthcare|      qT68353|       38696.46|
|     P0041| UnitedHealthcare|      cc77511|       10453.15|
|     P0043|       Blue Cross|      jj70926|       44721.77|
|     P0045|Kaiser Permanente|      Aq93468|        22609.3|
|     P0047|Kaiser Permanente|      pj74267|       42698.72|
|     P0059|            Cigna|      IJ01882|       20651.86|
|     P0060|Kaiser Permanente|      xd12437|       32768.39|
|     P0062|            Aetna|      xn10311|       41593.65|
|     P0064|            

In [9]:
from awsglue.dynamicframe import DynamicFrame

# Wrap the cleaned Spark DataFrame in a DynamicFrame so it can be passed to the
# Glue writer below.
dynamic_frame = DynamicFrame.fromDF(df_no_nulls, glueContext, "dynamic_frame")

try:
    # Write the DynamicFrame to the S3 prefix in Parquet format.
    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": "s3://bdjul2024/Processed/"},
        format="parquet"  # Output format; other options include 'json' and 'csv'.
    )
    print("Write to S3 succeeded.")
except Exception as e:
    print(f"Error: {str(e)}")
    # Print the schema and contents of the DynamicFrame to help debug the failure.
    dynamic_frame.printSchema()
    dynamic_frame.show()
    # Re-raise so the cell is reported as failed; otherwise a failed write
    # would be indistinguishable from success to "Run All" or to a job built
    # from this notebook.
    raise


<awsglue.dynamicframe.DynamicFrame object at 0x7f2b42efcf10>
Write to S3 succeeded.
