# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [6]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5
%connections futures-historical-glue-redshift-connection

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 50
Connections to be included:
futures-historical-glue-redshift-connection


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
  
# Session handles shared by the cells below.
# Reuse the SparkContext that is already running, or create one if there is none.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it to read and write DynamicFrames.
glueContext = GlueContext(sc)
# Spark session exposed by the GlueContext.
spark = glueContext.spark_session
# Job handle; job.commit() is called once after the load loop further down.
job = Job(glueContext)

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::378248803116:role/Admin-All-Access-Glue
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 50
Session ID: e4d595e6-474c-4604-88e1-96580a57e722
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session e4d595e6-474c-4604-88e1-96580a57e722 to get into ready status...
Session e4d595e6-474c-4604-88e1-96580a57e722 has been created.



#### Load each Parquet file from S3, normalize its columns, and merge it into a per-file Amazon Redshift table


In [2]:
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
import boto3




In [3]:
# Root S3 location of the Parquet exports (hard-coded for this notebook).
S3bucket = "s3://futures-historical-data"

# Split "s3://<bucket>/<optional/prefix>" into the bucket name and the key prefix.
s3 = boto3.resource("s3")
bucket_name, _, prefix = S3bucket[5:].partition("/")

# List every object under the prefix and rebuild its full s3:// URI.
s3_files = [
    f"s3://{bucket_name}/{obj.key}"
    for obj in s3.Bucket(bucket_name).objects.filter(Prefix=prefix)
]

# Only the Parquet objects are loaded by the cells below.
parquet_files = list(filter(lambda uri: uri.endswith(".parquet"), s3_files))




In [16]:
# Load every Parquet file into Redshift.
#
# For each file: read it from S3, rename/cast the "<...>" columns, build a single
# `datetime` timestamp from the Date and Time strings, bulk-load the rows into a
# staging table, and MERGE the staging table into a per-file target table keyed on
# `datetime`. The staging table is dropped again in the post-actions.

# Staging table each file is written to before being merged into its target table.
STAGING_TABLE = "futures_temp_7a8472"

# Column order shared by the target and staging tables. The MERGE's INSERT VALUES list
# has no explicit column list, so it must follow the same order as the CREATE TABLE.
TABLE_COLUMNS = [
    "price_open",
    "price_high",
    "price_low",
    "price_close",
    "tickvol",
    "vol",
    "spread",
    "instrument",
    "datetime",
]
COLUMN_DDL = (
    "price_open DOUBLE PRECISION, price_high DOUBLE PRECISION, "
    "price_low DOUBLE PRECISION, price_close DOUBLE PRECISION, "
    "tickvol BIGINT, vol BIGINT, spread BIGINT, instrument varchar, datetime TIMESTAMP"
)

# File-independent pieces of the MERGE statement, built once outside the loop.
MERGE_SET_CLAUSE = ", ".join(f"{c} = {STAGING_TABLE}.{c}" for c in TABLE_COLUMNS)
MERGE_VALUES_CLAUSE = ", ".join(f"{STAGING_TABLE}.{c}" for c in TABLE_COLUMNS)

for file in parquet_files:
    # Generate a unique table name based on the Parquet file name.
    # The name is spliced into raw SQL below, so refuse anything that is not a plain
    # identifier (letters, digits, "_" or "$") instead of sending broken or injected SQL.
    file_stem = file.split("/")[-1].split(".")[0]
    if not all(ch.isalnum() or ch in "_$" for ch in file_stem):
        raise ValueError(
            f"Cannot derive a safe Redshift table name from {file!r}: "
            f"{file_stem!r} contains characters other than letters, digits, '_' or '$'"
        )
    full_table_name = "public.futures_" + file_stem
    table = full_table_name.split('.')[1]

    # Load the Parquet file into a DynamicFrame
    dynamic_frame = glueContext.create_dynamic_frame_from_options(
        connection_type="s3",
        format="parquet",
        connection_options={
            "paths": [file]
        },
        format_options={
            "compression": "snappy"
        }
    )

    # Script generated for node Adjust Column Names
    AdjustColumnNames_node2 = ApplyMapping.apply(
        frame=dynamic_frame,
        mappings=[
            ("<DATE>", "string", "Date", "varchar"),
            ("<TIME>", "string", "Time", "varchar"),
            ("<OPEN>", "double", "price_open", "double"),
            ("<HIGH>", "double", "price_high", "double"),
            ("<LOW>", "double", "price_low", "double"),
            ("<CLOSE>", "double", "price_close", "double"),
            ("<TICKVOL>", "bigint", "tickvol", "bigint"),
            ("<VOL>", "bigint", "vol", "bigint"),
            ("<SPREAD>", "bigint", "spread", "bigint"),
            ("<INSTRUMENT>", "string", "instrument", "varchar")
        ],
        transformation_ctx="AdjustColumnNames_node2",
    )

    # Script generated for node Concatenate Columns
    # "Date" and "Time" are joined with a space so the result matches the
    # 'yyyy.MM.dd HH:mm:ss' pattern parsed in the next step.
    ConcatenateColumns_node1681862381775 = DynamicFrame.fromDF(
        AdjustColumnNames_node2.toDF().withColumn("datetime", concat_ws(' ', 'Date', 'Time')),
        glueContext,
        'ConcatenateColumns_node1681862381775')

    # Script generated for node Convert to DateTime
    ConverttoDateTime_node1681856295074 = DynamicFrame.fromDF(
        ConcatenateColumns_node1681862381775.toDF().withColumn("datetime", to_timestamp(col('datetime'), 'yyyy.MM.dd HH:mm:ss')),
        glueContext,
        'ConverttoDateTime_node1681856295074')

    # Script to drop Date and Time columns (now redundant with `datetime`)
    DropDateAndTimeColumns = DynamicFrame.fromDF(
        ConverttoDateTime_node1681856295074.toDF().drop('Date', 'Time'),
        glueContext,
        'DropDateAndTimeColumns')

    # Pre-actions: make sure the target table exists and recreate an empty staging table.
    preactions = (
        f"CREATE TABLE IF NOT EXISTS {full_table_name} ({COLUMN_DDL}); "
        f"DROP TABLE IF EXISTS public.{STAGING_TABLE}; "
        f"CREATE TABLE public.{STAGING_TABLE} ({COLUMN_DDL});"
    )

    # Post-actions: upsert the staged rows into the target table (matched on datetime),
    # then drop the staging table.
    postactions = (
        f"BEGIN; MERGE INTO {full_table_name} USING public.{STAGING_TABLE} "
        f"ON {table}.datetime = {STAGING_TABLE}.datetime "
        f"WHEN MATCHED THEN UPDATE SET {MERGE_SET_CLAUSE} "
        f"WHEN NOT MATCHED THEN INSERT VALUES ({MERGE_VALUES_CLAUSE}); "
        f"DROP TABLE public.{STAGING_TABLE}; END;"
    )

    # Write data to Redshift Table
    AmazonRedshift_node3 = glueContext.write_dynamic_frame.from_options(
        frame=DropDateAndTimeColumns,
        connection_type="redshift",
        connection_options={
            "postactions": postactions,
            "redshiftTmpDir": "s3://aws-glue-assets-378248803116-us-east-1/temporary/",
            "useConnectionProperties": "true",
            "dbtable": f"public.{STAGING_TABLE}",
            "connectionName": "futures-historical-glue-redshift-connection",
            "preactions": preactions,
        },
        transformation_ctx="AmazonRedshift_node3",
    )
job.commit()

{"<DATE>": "2001.04.01", "<TIME>": "17:30:00", "<OPEN>": 0.876, "<HIGH>": 0.876, "<LOW>": 0.876, "<CLOSE>": 0.876, "<TICKVOL>": 1, "<VOL>": 11, "<SPREAD>": 0, "<INSTRUMENT>": "6E"}


In [3]:
### THIS IS TO TEST THE REDSHIFT CONNECTION ###
### THIS WILL TAKE A WHILE TO LOAD BASED ON THE TABLE SIZE ###
# NOTE: intentionally left commented out; uncomment to run the check manually.
# SECURITY: this cell used to embed the Redshift password as a string literal. The
# credentials are now read from the Glue connection attached to the session, so no
# secret is stored in the notebook. The previously exposed password should be rotated.

# from pyspark.sql import SparkSession

# # Connection details: the JDBC URL is specific to this cluster; user and password are
# # taken from the Glue connection instead of being typed into the notebook.
# jdbc_url = "jdbc:redshift://futures-historical-cluster.cxdeoy4msdli.us-east-1.redshift.amazonaws.com:5439/futures-historical-redshift-database"
# jdbc_conf = glueContext.extract_jdbc_conf("futures-historical-glue-redshift-connection")
# user = jdbc_conf["user"]
# password = jdbc_conf["password"]

# # Create a SparkSession and set the JDBC connection properties
# spark = SparkSession.builder.appName("myApp").getOrCreate()
# df = spark.read \
#   .format("jdbc") \
#   .option("url", jdbc_url) \
#   .option("dbtable", "public.futures") \
#   .option("user", user) \
#   .option("password", password) \
#   .option("driver", "com.amazon.redshift.jdbc.Driver") \
#   .load()

# # Print the DataFrame schema and show a sample of rows
# df.printSchema()
# df.show(5)


root
 |-- price_open: double (nullable = true)
 |-- price_high: double (nullable = true)
 |-- price_low: double (nullable = true)
 |-- price_close: double (nullable = true)
 |-- tickvol: integer (nullable = true)
 |-- vol: integer (nullable = true)
 |-- spread: integer (nullable = true)
 |-- instrument: string (nullable = true)
 |-- datetime: timestamp (nullable = true)

+----------+----------+---------+-----------+-------+----+------+----------+-------------------+
|price_open|price_high|price_low|price_close|tickvol| vol|spread|instrument|           datetime|
+----------+----------+---------+-----------+-------+----+------+----------+-------------------+
|    0.8756|    0.8756|   0.8756|     0.8756|   null|null|  null|        6E|2001-04-01 17:31:00|
|    0.8756|    0.8756|   0.8754|     0.8754|   null|null|  null|        6E|2001-04-01 17:50:00|
|    0.8764|    0.8764|   0.8764|     0.8764|   null|null|  null|        6E|2001-04-01 18:20:00|
|     0.876|     0.876|    0.876|      0.876

hello
