In [None]:
import json
import os
import time

import boto3
import snowflake.connector
from dotenv import load_dotenv
load_dotenv()

# Connect to Snowflake.
# Creating a storage integration (later in this notebook) requires ACCOUNTADMIN
# or a role with the CREATE INTEGRATION privilege. ACCOUNTADMIN stays the
# default; set SNOWFLAKE_ROLE to use a narrower role that has that privilege.
conn = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASSWORD'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    role=os.getenv('SNOWFLAKE_ROLE', 'ACCOUNTADMIN'),
)

# Create a cursor
cur = conn.cursor()

# AWS session.
# Unset credential variables are passed as None, and boto3 then falls back to its
# default credential chain. AWS_SESSION_TOKEN is only needed for temporary
# credentials. The region matches the stack region used for CloudFormation, so
# clients created without an explicit region (e.g. S3) talk to the same region.
session = boto3.Session(
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    aws_session_token=os.getenv('AWS_SESSION_TOKEN'),
    region_name='eu-central-1',
)

In [None]:
# Deploy the CloudFormation stack and block until it has been created.
cf_client = session.client('cloudformation', region_name='eu-central-1')

stack_name = 'game-events-pipeline'
# A context manager closes the template file instead of leaking the handle.
with open('cf_kinesis_pipeline.yaml') as template_file:
    template_body = template_file.read()
capabilities = ['CAPABILITY_AUTO_EXPAND', 'CAPABILITY_NAMED_IAM']

response = cf_client.create_stack(
    StackName=stack_name,
    TemplateBody=template_body,
    Capabilities=capabilities,
    Parameters=[
        {
            'ParameterKey': 'KinesisShards',
            'ParameterValue': '1'
        },
        {
            'ParameterKey': 'FirehoseInterval',
            'ParameterValue': '60'
        }
    ]
)

# Pause between DescribeStacks calls. Polling in a tight loop hammers the
# CloudFormation API and gets throttled.
STACK_POLL_SECONDS = 10

cf_outputs = {}
while True:
    stack = cf_client.describe_stacks(StackName=stack_name)['Stacks'][0]
    stack_status = stack['StackStatus']
    if stack_status == 'CREATE_COMPLETE':
        # A template without an Outputs section yields no 'Outputs' key.
        cf_outputs = {
            output['OutputKey']: output['OutputValue']
            for output in stack.get('Outputs', [])
        }
        print('stack created')
        break
    if stack_status != 'CREATE_IN_PROGRESS':
        # Any other status (CREATE_FAILED, ROLLBACK_*, DELETE_*) means this create
        # will never complete. Matching only a couple of specific failure statuses
        # can miss a quick transition (e.g. straight to ROLLBACK_COMPLETE) and
        # poll forever.
        events = cf_client.describe_stack_events(StackName=stack_name)['StackEvents']
        # Events are returned newest first. The oldest CREATE_FAILED is normally
        # the root cause; later ones tend to be "Resource creation cancelled".
        # NOTE(review): only the first page of events is inspected.
        error_event = next(
            (event for event in reversed(events) if event['ResourceStatus'] == 'CREATE_FAILED'),
            None,
        )
        if error_event is None:
            error_message = f'no CREATE_FAILED event found (stack status {stack_status})'
        else:
            error_message = (
                f"{error_event['LogicalResourceId']}: "
                f"{error_event.get('ResourceStatusReason', 'no reason reported')}"
            )
        # Raise so Run All stops here, rather than failing later on a missing
        # cf_outputs key.
        raise RuntimeError('Stack status failed. Error message:\n' + error_message)
    time.sleep(STACK_POLL_SECONDS)

print(cf_outputs)

In [None]:
# Create a storage integration that lets Snowflake access the pipeline bucket
# through the IAM role created by the stack.
# NOTE: CREATE OR REPLACE issues a new external ID each time, so the IAM trust
# policy has to be updated again after every run of this cell.
s3_int_name = 's3_int_game_data'
query = f"""
CREATE OR REPLACE STORAGE INTEGRATION {s3_int_name}
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '{cf_outputs['RoleArn']}'
  STORAGE_ALLOWED_LOCATIONS = ('s3://{cf_outputs['BucketName']}');
"""
cur.execute(query)

# Retrieve the Snowflake IAM user ARN and external ID that the role's trust
# policy must allow. DESCRIBE INTEGRATION returns one row per property, with the
# property name in column 0 and its value in column 2. Indexing the dict raises
# a clear KeyError if a property is missing, instead of leaving the variable
# undefined or stale from an earlier run.
cur.execute(f"describe integration {s3_int_name}")
integration_props = {row[0]: row[2] for row in cur.fetchall()}
storage_aws_iam_user_arn = integration_props['STORAGE_AWS_IAM_USER_ARN']
storage_aws_external_id = integration_props['STORAGE_AWS_EXTERNAL_ID']

In [None]:
# Replace the IAM role's trust policy with one that lets both of these assume it:
#  - Snowflake's IAM user, scoped by the integration's external ID, and
#  - the lambda / firehose / kinesis service principals.
# NOTE(review): this overwrites the whole trust policy, so the service list here
# presumably has to match what the CloudFormation template granted. Confirm.
iam = session.client('iam')
# The role name is the final '/'-separated segment of the role ARN.
role_name = cf_outputs['RoleArn'].split('/')[-1]

snowflake_statement = {
    "Effect": "Allow",
    "Principal": {"AWS": storage_aws_iam_user_arn},
    "Action": "sts:AssumeRole",
    "Condition": {"StringEquals": {"sts:ExternalId": storage_aws_external_id}},
}
service_statement = {
    "Effect": "Allow",
    "Principal": {
        "Service": [
            "lambda.amazonaws.com",
            "firehose.amazonaws.com",
            "kinesis.amazonaws.com",
        ]
    },
    "Action": "sts:AssumeRole",
}
assume_role_policy_document = json.dumps(
    {"Version": "2012-10-17", "Statement": [snowflake_statement, service_statement]}
)

response = iam.update_assume_role_policy(
    RoleName=role_name,
    PolicyDocument=assume_role_policy_document,
)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    print('updated trust policy')

In [None]:
# Create the landing table that the pipe copies records into (no-op if it exists).
sf_table = "streaming_db.raw_dev.iap"
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {sf_table} (
  event_version VARCHAR(10),
  event_id VARCHAR(36),
  event_name VARCHAR(20),
  event_timestamp TIMESTAMP_NTZ,
  app_version VARCHAR(10),
  event_data VARIANT,
  server_received_time NUMBER
);
"""
cur.execute(create_table_sql)

In [None]:
# Create the external stage over the pipeline bucket.
# The session database/schema are set first because the stage name below is
# unqualified; the pipe creation and cleanup cells also use unqualified names.
for context_sql in ("USE DATABASE streaming_db;", "USE SCHEMA raw_dev;"):
    cur.execute(context_sql)

stage_name = 'iap_stage'
create_stage_sql = f"""
CREATE OR REPLACE STAGE {stage_name}
  URL = 's3://{cf_outputs['BucketName']}'
  STORAGE_INTEGRATION = {s3_int_name}
"""
cur.execute(create_stage_sql)

In [None]:
# Create an auto-ingest pipe that copies gzip-compressed JSON files from the
# stage into the landing table, matching JSON keys to column names
# case-insensitively.
pipe_name = "iap_pipe"
create_pipe_sql = f"""
CREATE OR REPLACE PIPE {pipe_name}
  AUTO_INGEST = TRUE
AS
COPY INTO {sf_table}
FROM @streaming_db.raw_dev.{stage_name}
FILE_FORMAT = (type ='JSON',
    compression=gzip,
    strip_outer_array=true)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
"""
cur.execute(create_pipe_sql)

In [None]:
# Retrieve the SQS queue ARN that Snowflake created for the auto-ingest pipe.
# It is looked up by column name (notification_channel). Scanning the row for any
# value starting with 'arn' leaves the variable undefined, or stale from an
# earlier run, when nothing matches.
cur.execute(f"describe pipe {pipe_name}")
pipe_row = cur.fetchone()
if pipe_row is None:
    raise RuntimeError(f'describe pipe {pipe_name} returned no rows')
pipe_columns = [column[0].lower() for column in cur.description]
arn_sqs_queue = pipe_row[pipe_columns.index('notification_channel')]
if not str(arn_sqs_queue).startswith('arn:'):
    raise RuntimeError(f'pipe {pipe_name} has no SQS notification channel: {arn_sqs_queue!r}')

# Send every object-created event in the bucket to the pipe's queue.
# put_bucket_notification is deprecated. put_bucket_notification_configuration
# takes a list of QueueConfigurations keyed by 'QueueArn', and it replaces the
# bucket's entire notification configuration.
s3 = session.client('s3', region_name='eu-central-1')  # same region as the stack
response = s3.put_bucket_notification_configuration(
    Bucket=cf_outputs['BucketName'],
    NotificationConfiguration={
        'QueueConfigurations': [
            {
                'Id': 'bucket_level_notification',
                'QueueArn': arn_sqs_queue,
                'Events': ['s3:ObjectCreated:*'],
            }
        ]
    },
)
# Check the response of this call itself, not one left over from another cell.
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    print('added bucket notification')

In [None]:
# Clean up the Snowflake objects.
# Dependants are dropped before the objects they reference: the pipe (copies from
# the stage into the table) goes first and the storage integration (used by the
# stage) goes last. IF EXISTS keeps the cell re-runnable after a partial cleanup.
cur.execute(f"drop pipe if exists {pipe_name}")
cur.execute(f"drop stage if exists {stage_name}")
cur.execute(f"drop table if exists {sf_table}")
cur.execute(f"drop integration if exists {s3_int_name}")
print('cleaned snowflake')

In [None]:
# Clean up AWS.
# CloudFormation cannot delete a non-empty S3 bucket, so empty it first.
# Deleting every object version and delete marker also covers versioned buckets;
# on an unversioned bucket each object is listed with version id "null".
# NOTE(review): if producers are still sending events, Firehose may write new
# objects after this runs; stop producers before cleaning up.
pipeline_bucket = session.resource('s3', region_name='eu-central-1').Bucket(cf_outputs['BucketName'])
pipeline_bucket.object_versions.all().delete()

response = cf_client.delete_stack(
    StackName=stack_name
)

# delete_stack only starts the deletion. Wait until CloudFormation reports
# DELETE_COMPLETE; the waiter raises if the stack ends in DELETE_FAILED.
cf_client.get_waiter('stack_delete_complete').wait(StackName=stack_name)
print('cleaned up AWS')

In [None]:
# Release the Snowflake cursor first, then the connection it was opened on.
for snowflake_handle in (cur, conn):
    snowflake_handle.close()