
# Glue Studio Notebook
You are now running a **Glue Studio** notebook. Before you can use it, you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0)                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.35 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::519852036875:role/service-role/AWSGlueServiceRole-PA-crawler
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: a58bde3d-bab0-404e-b6af-550a3a0a5f9b
Applying the following default arguments:
--glue_kernel_version 0.35
--enable-glue-datacatalog true
Waiting for session a58bde3d-bab0-404e-b6af-550a3a0a5f9b to get into ready status...
Session a58bde3d-bab0-404e-b6af-550a3a0a5f9b has been created

UnknownMagic: unknown magic command 'number_of_workers'


In [1]:
# Create data sources from the crawled catalog tables
abc_out = glueContext.create_dynamic_frame.from_catalog(
    database = "db-combo-practice",
    table_name = "abc_out"
)
ref1 = glueContext.create_dynamic_frame.from_catalog(
    database = "db-combo-practice",
    table_name = "ref1"
)
ref2 = glueContext.create_dynamic_frame.from_catalog(
    database = "db-combo-practice",
    table_name = "ref2"
)
ref3 = glueContext.create_dynamic_frame.from_catalog(
    database = "db-combo-practice",
    table_name = "ref3"
)
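
The four catalog reads above differ only in the table name, so they could also be written as one loop. The following is a minimal sketch, not part of the original notebook: `load_tables` is a hypothetical helper name, and it assumes only that the context object exposes `create_dynamic_frame.from_catalog(database=..., table_name=...)` as used above.

```python
def load_tables(glue_context, database, table_names):
    """Read several Data Catalog tables into a dict keyed by table name.

    `glue_context` is expected to behave like the notebook's `glueContext`.
    """
    return {
        name: glue_context.create_dynamic_frame.from_catalog(
            database=database,
            table_name=name,
        )
        for name in table_names
    }
```

In this notebook it might be called as `frames = load_tables(glueContext, "db-combo-practice", ["abc_out", "ref1", "ref2", "ref3"])`, after which `frames["ref1"]` plays the role of `ref1`.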




In [2]:
# Convert each DynamicFrame to a Spark DataFrame
sparkDf_abc_out = abc_out.toDF()
sparkDf_ref1 = ref1.toDF()
sparkDf_ref2 = ref2.toDF()
sparkDf_ref3 = ref3.toDF()
# concat accepts any number of columns, so no nesting is needed;
# lit("-") supplies the literal separator between the parts.
from pyspark.sql.functions import concat, lit
visit_key = concat("subnum", lit("-"), "visitid", lit("-"), "visitseq")
sparkDf_abc_out = sparkDf_abc_out.withColumn("com", visit_key)
# Drop "statusid_dec": it is only partially filled in abc_out and
# duplicates a column that comes from another table.
sparkDf_abc_out = sparkDf_abc_out.drop("statusid_dec")
sparkDf_ref1 = sparkDf_ref1.withColumn("com1", visit_key)
sparkDf_ref2 = sparkDf_ref2.withColumn("com1", visit_key)
sparkDf_ref3 = sparkDf_ref3.withColumn("com1", visit_key)
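
One property of these keys matters for the joins that follow: Spark's `concat` returns NULL when any input is NULL, whereas `concat_ws` skips NULL inputs. The sketch below models both behaviours in plain Python (it is not Spark code; the function names and sample rows are illustrative) to show why a row with a missing `visitseq` ends up without a key and so cannot match in an inner join.

```python
def concat_model(*parts):
    """Model of Spark's concat: any None input makes the result None."""
    if any(part is None for part in parts):
        return None
    return "".join(str(part) for part in parts)


def concat_ws_model(sep, *parts):
    """Model of Spark's concat_ws: None inputs are skipped."""
    return sep.join(str(part) for part in parts if part is not None)


# Hypothetical rows as (subnum, visitid, visitseq); the second lacks visitseq.
rows = [("1001", "V2", "1"), ("1002", "V3", None)]

for subnum, visitid, visitseq in rows:
    key = concat_model(subnum, "-", visitid, "-", visitseq)
    key_ws = concat_ws_model("-", subnum, visitid, visitseq)
    print(key, key_ws)
```

For the second row the `concat` model yields `None`, while the `concat_ws` model yields `1002-V3`. Which behaviour is appropriate depends on whether rows with incomplete keys should take part in the join at all.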




In [3]:
# Keep only the needed column and the join key from each reference table
up_ref1 = sparkDf_ref1.select("eamnum", "com1")
up_ref2 = sparkDf_ref2.select("aenum", "com1")
up_ref3 = sparkDf_ref3.select("statusid_dec", "com1")
# Join abc_out with ref1
combo = sparkDf_abc_out.join(up_ref1, sparkDf_abc_out.com == up_ref1.com1, "inner")
# Join the result with ref2
combo1 = combo.join(up_ref2, combo.com == up_ref2.com1, "inner")
# Join the result with ref3
combo2 = combo1.join(up_ref3, combo1.com == up_ref3.com1, "inner")
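
Because all three reference tables are joined the same way, the chain could also be written as a fold over a list of DataFrames. This is a sketch under assumptions rather than the notebook's method: `join_all` is a hypothetical helper, and it joins on shared column names (for example `subnum`, `visitid`, `visitseq`) through `DataFrame.join(other, on=[...], how=...)` instead of the concatenated `com`/`com1` keys, so each reference frame would have to keep those columns in its `select`.

```python
from functools import reduce


def join_all(base, others, keys, how="inner"):
    """Join `base` with each DataFrame in `others` on the shared `keys`.

    Joining on a list of column names keeps a single copy of each key
    column in the result, so no helper key column has to be dropped later.
    """
    return reduce(
        lambda left, right: left.join(right, on=list(keys), how=how),
        others,
        base,
    )
```

A call might look like `join_all(sparkDf_abc_out, [ref1_cols, ref2_cols, ref3_cols], ["subnum", "visitid", "visitseq"])`, where the `*_cols` names stand for reference frames that still contain the three key columns.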




In [5]:
# Drop the com1 join-key columns, then build the final identifier
df_final = combo2.drop("com1")
df_final = df_final.withColumn("IDENTIFIER", concat("com", lit("-"), "aenum"))





In [8]:
# Keep only the columns needed in the output
Result = df_final.select("IDENTIFIER", "RELDEVA_DEC", "RELDEVB_DEC", "RELDEVC_DEC",
                         "RELPROCA_DEC", "RELPROCB_DEC", "RELPROCC_DEC",
                         "SUBNUM", "VISITSEQ", "VISITID", "AENUM", "STATUSID_DEC", "EAMNUM")





In [12]:
# Import the DynamicFrame class
from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back to a Glue DynamicFrame
Result_conv = DynamicFrame.fromDF(Result, glueContext, "convert")
# Use a single partition so the output is written as one file
retransform = Result_conv.repartition(1)

glueContext.write_dynamic_frame.from_options(
    frame=retransform,
    connection_type="s3",
    connection_options={"path": "s3://aa-bucket-for-custom-jobs/output/"},
    format="csv",
    format_options={"separator": ","},
    transformation_ctx="datasink01",
)
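# Rename the part file written above to RESULT.csv (copy, then delete)
import boto3

client = boto3.client("s3")
BUCKET_NAME = "aa-bucket-for-custom-jobs"
PREFIX = "output/"
TARGET_KEY = PREFIX + "RESULT.csv"

response = client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=PREFIX)
# Skip folder markers and any RESULT.csv left over from an earlier run;
# otherwise the first listed key may not be the newly written part file.
part_keys = [
    obj["Key"]
    for obj in response.get("Contents", [])
    if obj["Key"] != TARGET_KEY and not obj["Key"].endswith("/")
]
name = part_keys[0]
client.copy_object(
    Bucket=BUCKET_NAME,
    CopySource={"Bucket": BUCKET_NAME, "Key": name},
    Key=TARGET_KEY,
)
client.delete_object(Bucket=BUCKET_NAME, Key=name)
job.commit()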

{'ResponseMetadata': {'RequestId': 'RD5A1MDWC3QYZCF5', 'HostId': 'soh8XEeATmrHmrZQk1Tzese3CXceOGkG2WKQ35KStdRqniHFeXRAjUhtZfzgHhXBag7KK6B3wwlh4JNKASJvOQ==', 'HTTPStatusCode': 204, 'HTTPHeaders': {'x-amz-id-2': 'soh8XEeATmrHmrZQk1Tzese3CXceOGkG2WKQ35KStdRqniHFeXRAjUhtZfzgHhXBag7KK6B3wwlh4JNKASJvOQ==', 'x-amz-request-id': 'RD5A1MDWC3QYZCF5', 'date': 'Mon, 31 Oct 2022 07:29:43 GMT', 'server': 'AmazonS3'}, 'RetryAttempts': 0}}
