# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [4]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

You are already connected to a glueetl session 898cd97a-f72f-47df-bd35-d79a94558412.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 898cd97a-f72f-47df-bd35-d79a94558412.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 4.0


You are already connected to a glueetl session 898cd97a-f72f-47df-bd35-d79a94558412.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 898cd97a-f72f-47df-bd35-d79a94558412.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [6]:
dyf = glueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options={"paths": ["s3://s3-bkt-us/sample.csv"]}, format="csv",format_options={"withHeader": True})
dyf.printSchema()

root
|-- Name: string
|-- Age: string
|-- Gender: string
|-- Blood Type: string
|-- Medical Condition: string
|-- Date of Admission: string
|-- Doctor: string
|-- Hospital: string
|-- Insurance Provider: string
|-- Billing Amount: string
|-- Room Number: string
|-- Admission Type: string
|-- Discharge Date: string
|-- Medication: string
|-- Test Results: string


#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [7]:
df = dyf.toDF()
df.show()

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       17-11-2022|    Patrick Parker|    Wallace-Hamilton|          Medicare|   37490.98336|        146|      Elective|    01-12-2022|    Aspirin|Inconclusive|
|         Ruben Burns| 35|  Male|        O+|           Asthma|       01-06-2023|     Diane Jackson|Burke, Griffi

# ETL Batch Id and ETL Batch Date.

In [18]:
from pyspark.sql.functions import col,lit,current_date
from datetime import datetime

etlbatch_id = datetime.now().strftime("%Y%m%d%H%M%S%f")[:17]
df = df.withColumn("ETL_batch_id",lit(etlbatch_id))
df = df.withColumn("ETL_batch_date", lit(current_date()))
df.show()

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# Convert NA, Blanks, None to Null.

In [10]:
dyf = glueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options={"paths": ["s3://s3-bkt-us/sample.csv"]}, format="csv",format_options={"withHeader": True})

df = dyf.toDF()
df.show(55)

df = df.na.replace("", None)\
       .na.replace("NA",None)\
       .na.replace("None",None)
df.show(55)

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       17-11-2022|    Patrick Parker|    Wallace-Hamilton|          Medicare|   37490.98336|        146|      Elective|    01-12-2022|    Aspirin|Inconclusive|
|         Ruben Burns| 35|  Male|        O+|           Asthma|       01-06-2023|     Diane Jackson|Burke, Griffi

# Remove Duplicate records.

In [19]:
df.show(55)
print(df.count())

df_no_duplicates = df.dropDuplicates()

df_no_duplicates.show(55) 
print(df_no_duplicates.count())

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# If Complete Row has NA, Blanks, Null . Remove that row.

In [20]:
df.show(55)
print(df.count())

df_cleaned = df.dropna(how='all',subset = None)

df_cleaned.show(55)
print(df_cleaned.count())

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# Duration of Stay using DOA and DOD.

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import datediff, to_date
 
df = df.withColumn('Date of Admission', to_date(df['Date of Admission'], 'dd-MM-yyyy')) 
df = df.withColumn('Discharge Date', to_date(df['Discharge Date'], 'dd-MM-yyyy')) 
df = df.withColumn('Admission Days', datediff(df['Discharge Date'], df['Date of Admission']))
 
df.show(55)

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# RiskScore.

In [22]:
from pyspark.sql.functions import udf,lit,col
from pyspark.sql.types import FloatType

def calculate_risk_score(age, max_age, medical_condition, billing_amount, max_billing_amount):
    weight_age = 0.4
    weight_medical_condition = 0.3
    weight_billing_amount = 0.3
    age_score = (float(age) / float(max_age)) * weight_age
    diabetes_score = float((1 if medical_condition == "Diabetes" else 0)) * weight_medical_condition
    billing_score = round(float(billing_amount),2) / round(float(max_billing_amount),2) * weight_billing_amount
    answer=age_score + diabetes_score + billing_score
    answer=round(answer,2)
    return answer
 
calculate_risk_score_udf = udf(calculate_risk_score, FloatType())

max_age = df.agg({"Age": "max"}).collect()[0][0]
max_billing_amount = df.agg({"Billing Amount": "max"}).collect()[0][0]
 
df = df.withColumn("Risk Score", calculate_risk_score_udf(
    col("Age"),
    lit(max_age),
    col("Medical Condition"),
    col("Billing Amount"),
    lit(max_billing_amount)
))

df.show()

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# Adding Record Using UUID.

In [23]:
import uuid
def generate_uuid():
    return str(uuid.uuid4())

generate_uuid_udf = udf(generate_uuid)
 
df = df.withColumn('UUID', generate_uuid_udf())
df.show()

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|                UUID|     ETL_batch_id|ETL_batch_date|
+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+--------------------+-----------------+--------------+
|     Tiffany Ramirez| 81|Female|        O-|         Diabetes|       2022-11-17|    Patrick Pa

# Convert the datatypes based on respective column.

In [24]:
df.printSchema()
df.show(55)
# Define function to cast values to desired data types
def cast_column_values(column_name, data_type):
    return col(column_name).cast(data_type)
 
# Define mapping of column names to data types
column_data_types = {
    "ETL_batch_id": "integer",
    "ETL_batch_date": "date",
    "UUID": "string",
    "Name": "string",
    "Age": "integer",
    "Gender": "string",
    "Blood Type": "string",
    "Medical Condition": "string",
    "Date of Admission": "date",
    "Doctor": "string",
    "Hospital": "string",
    "Insurance Provider": "string",
    "Billing Amount": "float",
    "Room Number": "integer",
    "Admission Type": "string",
    "Discharge Date": "date",
    "Medication": "string",
    "Test Results": "string",
    "Admission Days": "integer",
    "Risk Score": "float"
    # Add more columns and respective data types as needed
}
 
# Apply function to convert data types for each column
for column, data_type in column_data_types.items():
    df = df.withColumn(column, cast_column_values(column, data_type))
df.printSchema()
df.show()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Blood Type: string (nullable = true)
 |-- Medical Condition: string (nullable = true)
 |-- Date of Admission: date (nullable = true)
 |-- Doctor: string (nullable = true)
 |-- Hospital: string (nullable = true)
 |-- Insurance Provider: string (nullable = true)
 |-- Billing Amount: string (nullable = true)
 |-- Room Number: string (nullable = true)
 |-- Admission Type: string (nullable = true)
 |-- Discharge Date: date (nullable = true)
 |-- Medication: string (nullable = true)
 |-- Test Results: string (nullable = true)
 |-- Admission Days: integer (nullable = true)
 |-- Risk Score: float (nullable = true)
 |-- UUID: string (nullable = true)
 |-- ETL_batch_id: string (nullable = false)
 |-- ETL_batch_date: date (nullable = false)

+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+-

# Rearranging columns

In [25]:
desired_columns = ['ETL_batch_id', 'ETL_batch_date','UUID','Name','Age','Gender','Blood Type','Medical Condition','Date of Admission','Doctor','Hospital','Insurance Provider',
                   'Billing Amount','Room Number', 'Admission Type','Discharge Date','Medication','Test Results','Admission Days','Risk Score']


# Reorder columns
df = df.select(desired_columns + [col for col in df.columns if col not in desired_columns])

df.show(55)

+------------+--------------+--------------------+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+
|ETL_batch_id|ETL_batch_date|                UUID|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|            Doctor|            Hospital|Insurance Provider|Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|Admission Days|Risk Score|
+------------+--------------+--------------------+--------------------+---+------+----------+-----------------+-----------------+------------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+--------------+----------+
|        null|    2024-03-27|68f6265f-d007-494...|     Tiffany Ramirez| 81|Female|        O-|         Diabete

# Dropping csv to s3.

In [31]:
from awsglue.dynamicframe import DynamicFrame
dyf_new=DynamicFrame.fromDF(df, glueContext, "final_df")




In [33]:
glueContext.write_dynamic_frame_from_options(frame=dyf_new, 
                                             connection_type = "dynamodb", 
                                             connection_options = {"tableName": "med"})

<awsglue.dynamicframe.DynamicFrame object at 0x7f87b147ff40>
