
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0.                                                       |
| %security_configuration     |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |   Changes the session type to Glue ETL.                                                                                                                   |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |

In [None]:
%connections acc-day-glue-vpc-conn

In [None]:
import sys
import boto3
import logging
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import uuid
from datetime import datetime

In [None]:
# Reuse the active SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Job object constructed from the same GlueContext.
job = Job(glueContext)

In [None]:
# Logging setup: the root configuration gets a timestamped record layout,
# and the notebook's own named logger is opened up to INFO.
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
MSG_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(datefmt=DATETIME_FORMAT, format=MSG_FORMAT)

logger = logging.getLogger("default-logger")
logger.setLevel(logging.INFO)

In [None]:
# Start timestamps of all stopwatches created so far, keyed by stopwatch id.
startTimes = {}


def startStopwatch():
    """Start a new stopwatch and return its id.

    A random UUID4 string is generated and the current time is stored under
    it in ``startTimes``. The id and then the whole ``startTimes`` dict are
    printed before the id is returned.
    """
    watch_key = str(uuid.uuid4())
    startTimes[watch_key] = datetime.now()
    print(watch_key)
    print(startTimes)
    return watch_key

def stopStopwatch(uuid):
    """Return the time elapsed since the given stopwatch was started.

    Args:
        uuid: Stopwatch id; used as the lookup key in ``startTimes``.

    Returns:
        The difference between now and the recorded start time, converted
        to a string.

    Raises:
        KeyError: If ``uuid`` is not a key of ``startTimes``.
    """
    # Take the end timestamp first so the lookup and print are not timed.
    stopped_at = datetime.now()
    started_at = startTimes[uuid]
    print(f"found={started_at}")
    elapsed = stopped_at - started_at
    return str(elapsed)

def getSecret(name, version=None):
    """Fetch a secret value through a boto3 ``secretsmanager`` client.

    Args:
        name: Sent as ``SecretId``.
        version: Optional; when it is not ``None`` it is sent as
            ``VersionStage``.

    Returns:
        The ``get_secret_value`` response, unmodified.
    """
    client = boto3.client("secretsmanager")
    if version is None:
        request = {'SecretId': name}
    else:
        request = {'SecretId': name, 'VersionStage': version}
    return client.get_secret_value(**request)

def getParameter(name):
    """Fetch a parameter through a boto3 ``ssm`` client.

    Args:
        name: Sent as ``Name``.

    Returns:
        The ``get_parameter`` response, unmodified. The request is made with
        ``WithDecryption=True``.
    """
    return boto3.client("ssm").get_parameter(Name=name, WithDecryption=True)

In [None]:
# get secret for aurora
# The secret's name is read from an SSM parameter; the secret itself is
# fetched from Secrets Manager and its SecretString is parsed as JSON.
rdsSecretName = getParameter("acc-day-glue-tm-aurora-secret-name")["Parameter"]["Value"]
logger.info("getting secret for source db with name [%s]...", rdsSecretName)
secretsManagerEntry = getSecret(rdsSecretName)
secretString = secretsManagerEntry['SecretString']
secret = json.loads(secretString)
# Do not log or print secretString / secret: the parsed secret carries the
# database password (it is read later as secret['password']). Only the key
# names are reported, which is enough to confirm the secret was loaded.
logger.info("secret loaded; available keys: %s", sorted(secret))
logger.info("username")
print(secret['username'])
# end - get secret

In [None]:
import pandas as pd
from awsglue.dynamicframe import DynamicFrame

# One sample metrics row, converted pandas DataFrame -> Spark DataFrame ->
# Glue DynamicFrame.
data = [
    {
        "job_name": "Job 231",
        "duration": 10,
        "entry_time": datetime.now(),
    },
]
pandasDF = pd.DataFrame(data, columns=["job_name", "duration", "entry_time"])
sparkDF = spark.createDataFrame(pandasDF)
dynamicFrame = DynamicFrame.fromDF(sparkDF, glueContext, "dynamicFrame")

In [None]:
# Write dynamicFrame to the "perf_metrics" table over JDBC and print how long
# the write took. (No function is defined here; the statements run directly.)
# https://docs.aws.amazon.com/glue/latest/dg/connection-defining.html
print("loading data to aurora...")
# Start of the timed section; `duration` below is measured from here.
start=datetime.now()
# Name of the bucket holding the JDBC driver jar, read from SSM.
glueDriverBucketName = getParameter("acc-day-glue-driver-bucket-name")["Parameter"]["Value"]
# Connection URL assembled from the host, port and dbname fields of the secret.
jdbcURL = "jdbc:" + "postgresql" + "://" + secret['host'] + ":" + str(secret['port']) + "/" + secret['dbname']
connectionOptions = {
"url": jdbcURL,
"dbtable": "perf_metrics",
"user": secret['username'],
"password": secret['password'],
"customJdbcDriverS3Path": "s3://"+glueDriverBucketName+"/postgresql-42.4.0.jar",
# NOTE(review): the class name suggests a connection pool rather than the
# usual org.postgresql.Driver - TODO confirm this is the value Glue expects.
"customJdbcDriverClassName": "org.postgresql.jdbc3.Jdbc3ConnectionPool"}
logger.info("writing data to database...")
sinkFrame = glueContext.write_dynamic_frame.from_options(frame = dynamicFrame, connection_type="jdbc", connection_options=connectionOptions, transformation_ctx = "sinkFrame")
print("done loading data to postgre.")
# End of the timed section.
end=datetime.now()
duration=end-start
print("duration="+str(duration))

In [None]:
# Start a stopwatch and keep the id it returns.
# NOTE(review): assigning to `id` at top level rebinds the builtin of the same name.
id=startStopwatch()

In [None]:
# Look up the stopwatch stored in `id`; the call returns the elapsed time as a string.
stopStopwatch(id)

In [None]:
# Lab 31 - Merge Files

In [None]:
import boto3

def getParameter(name):
    """Fetch a parameter through a boto3 ``ssm`` client.

    Args:
        name: Sent as ``Name``.

    Returns:
        The ``get_parameter`` response, unmodified. The request is made with
        ``WithDecryption=True``.
    """
    return boto3.client("ssm").get_parameter(Name=name, WithDecryption=True)

In [None]:
# Accumulator for the merged text; starts empty.
fullContents = ""

# The source bucket name is read from SSM and printed.
s3Client = boto3.client('s3')
sourceBucketName = getParameter("acc-day-glue-trade-bucket-name")["Parameter"]["Value"]
print(sourceBucketName)

# Paginated list_objects_v2 listing restricted to the "huge-by-ccy/" prefix.
paginator = s3Client.get_paginator('list_objects_v2')
pages = paginator.paginate(
    Bucket=sourceBucketName,
    Prefix="huge-by-ccy/",
)

In [None]:
# Bare expression: evaluates `pages` without assigning or calling anything.
pages

In [None]:
# Exploratory pass over the listing: each page's "Contents" entry is looked
# up and the result is discarded, so nothing is stored or printed. The lookup
# raises KeyError for a page that has no "Contents" key.
for page in pages:
    page["Contents"]
    #page
    #if obj.key.endswith('txt'):
    

In [None]:
# Download every listed object, decode it as UTF-8 and append it, followed by
# a newline, to fullContents.
mergedParts = []
for page in pages:
    # A listing page with no matching objects has no "Contents" key; treat it
    # as empty instead of failing with KeyError.
    for obj in page.get('Contents', []):
        key = obj["Key"]
        print(key)
        s3Obj = s3Client.get_object(Bucket=sourceBucketName, Key=key)
        s3Contents = s3Obj["Body"].read().decode('utf-8')
        mergedParts.append(s3Contents + "\n")
# Join once at the end rather than re-copying a growing string per object.
fullContents = fullContents + "".join(mergedParts)

In [None]:
# write to s3
# Upload the accumulated text as a single object with key "merged.csv".
# NOTE(review): the bucket name comes from the same SSM parameter the source
# bucket was read from, so the merged file is written to that same bucket -
# confirm this is intended.
targetBucketName = getParameter("acc-day-glue-trade-bucket-name")["Parameter"]["Value"]
client = boto3.client('s3')
client.put_object(Body=fullContents, Bucket=targetBucketName, Key="merged.csv")
end=datetime.now()
# NOTE(review): `start` is not assigned in this cell; the only assignment
# visible in this file is in the Aurora load step, so the printed duration
# would be measured from that step rather than from the merge - confirm.
print("actual duration was ["+str(end-start)+"].")