In [68]:
import seaborn as sns
sns.set()

In [69]:
import boto3
import mycredentials

# AWS Glue

We have already used AWS Glue to generate metadata and extract a schema from our S3 data lakes. However, AWS Glue can be even more powerful by enabling us to create large scale Extraction-Transformation-Load (ETL) pipelines that grab, parse, and store data in an efficient manner. Glue is simply a serverless ETL tool that helps us do the following:

* **Extract**: Data extraction from several sources, such as business systems, sensors, databanks, or applications.
* **Transform**: This extracted data is converted into a format that various applications can use. 
* **Load**: The now transformed data then gets loaded into a target data storage system.

There are a few component terms associated with AWS Glue that we should consider before we cover its implementation.

* **Tables**: These are not your typical relational database tables, but metadata table definitions of data sources, not the data itself. A table essentially records where the data is located.
* **Database**: A database in the AWS Glue Catalog is a container that holds tables. We use this to organize our tables into respective categories.
* **Data Catalog**: The data catalog holds the metadata which summarizes basic information about the processed data. This is where the tables and database live.
* **Crawler**: The crawler is used to retrieve data from the source using custom or built-in classifiers. This is what *crawls* data sources to generate a schema through a certain set of rules. 
* **Classifier**: A classifier is used to match data for the crawler. This is where data specifications can be made. 
* **Job**: The business logic that executes the ETL tasks.
* **Trigger**: Starts the ETL job at a specific time or event.

Ultimately, what AWS Glue allows us to do is create efficient ETL pipelines allowing us to store/access data in a variety of places (Redshift, S3, DynamoDB, etc.). 

![Mini Glue Architecture](https://dataincubator-course.s3.amazonaws.com/miniprojects/images/dwa_mini_glue_architecture.png)

Glue is also very comfortable handling streaming data through Spark Streaming. Jobs created with Glue run continuously as they consume data from streaming sources like Apache Kafka or Kinesis Data Streams. After a job has extracted and transformed the data, it can be loaded into a data lake or database.

## Kinesis

[AWS Kinesis](https://aws.amazon.com/kinesis/) is a tool that makes it easy to collect, process, or analyze real-time streaming data. This tool behaves similarly to Kafka. These data handling capabilities are provided by three modalities of AWS Kinesis:

* **Kinesis Data Streams**: A scalable streaming service that captures real-time data.
* **Kinesis Data Firehose**: Built off of data streams, Firehose can capture, transform, and load data streams into AWS data stores.
* **Kinesis Data Analytics**: Reading from Data Streams or Firehose, this tool enables real-time data analytic processing with SQL or Apache Flink.

This data will be requested and handled through a Kinesis Data Stream producer that you will create.

### The Real-Time Data

We are going to use a Kinesis Stream in this example and get information about subreddits from the [Reddit API](https://www.reddit.com/dev/api/). You will need to generate an API key and create an application which can be done on this [Reddit page](https://www.reddit.com/login/?dest=https%3A%2F%2Fwww.reddit.com%2Fprefs%2Fapps). In order to get access you will need to specify an OAuth token through the `requests` library.

The goal of this project is to utilize the [AWS Glue `DynamicFrameReader`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame-reader.html) to read the streaming data being generated by Kinesis, format it, then upload it to an S3 data lake and a DynamoDB [Sink](https://en.wikipedia.org/wiki/Sink_(computing)#:~:text=In%20computing%2C%20a%20sink%2C%20event,from%20another%20object%20or%20function.&text=The%20word%20sink%20has%20been,and%20output%20in%20the%20industry.). There will be two separate jobs to handle both of these tasks but they will utilize the same Glue Schema. 

In [82]:
import requests
import pprint
import pandas as pd

We will primarily be dealing with new posts to the python subreddit. This will provide us with a stream of posts as they're made in real-time. This stream will need to be handled by a [Kinesis Producer](https://aws.amazon.com/blogs/big-data/snakes-in-the-stream-feeding-and-eating-amazon-kinesis-streams-with-python/) - similar to a Kafka Producer. 

When handling the information we are primarily concerned with the following data:
* `subreddit`
* `title`
* `selftext`
* `upvote_ratio`
* `ups`
* `downs`
* `score`

We will generate the following columns with the Glue job:
* `python_mentions` **INTEGER**: Number of times 'python' mentioned in `title` and `selftext`
* `title_length` **INTEGER**: Length of `title`
* `selftext_length` **INTEGER**: Length of `selftext`
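
As a plain-Python reference for checking the Glue job's output, the three derived columns can be sketched as follows. The helper name `derive_columns` is hypothetical, and the case-insensitive, non-overlapping count is an assumption about how a "mention" should be counted.

```python
def derive_columns(post):
    """Compute the three derived columns for one post given as a dict."""
    # Treat missing or null text as an empty string
    title = post.get("title") or ""
    selftext = post.get("selftext") or ""
    # Case-insensitive, non-overlapping count of 'python' in both fields
    mentions = title.lower().count("python") + selftext.lower().count("python")
    return {
        "python_mentions": mentions,
        "title_length": len(title),
        "selftext_length": len(selftext),
    }


example = derive_columns(
    {"title": "Python tips", "selftext": "I love python and PYTHON."}
)
print(example)
```

In the pipeline itself these values are produced by the Glue job, not by the producer; the sketch is only meant for spot-checking a few rows by hand.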

These are the items we will persist to the S3 data lake and DynamoDB after they have been handled by a Glue job. It is important to realize that with this stream we do not want to keep pushing data to AWS Kinesis that has already been seen. This filtering should be handled early on, in the API request; here is an article that describes a bit of the [process](https://towardsdatascience.com/how-to-use-the-reddit-api-in-python-5e05ddfd1e5c).
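
One way to guard against re-sending posts, in addition to filtering in the API request, is to remember the ids that have already been pushed. The following is a minimal in-memory sketch; `filter_unseen` is a hypothetical helper, and a long-running producer would probably need to bound or persist the set.

```python
def filter_unseen(posts, seen_ids):
    """Return only posts whose id has not been sent yet and record their ids."""
    fresh = []
    for post in posts:
        if post["id"] not in seen_ids:
            seen_ids.add(post["id"])
            fresh.append(post)
    return fresh


seen = set()
first = filter_unseen([{"id": "a1"}, {"id": "b2"}], seen)
# 'b2' was already sent in the first batch, so only 'c3' survives
second = filter_unseen([{"id": "b2"}, {"id": "c3"}], seen)
print(len(first), second)
```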

The Kinesis data stream should only be handling the raw format of these. It should not handle any of the transformations because we want to reserve that for the Glue ETL process - take a second to think about why that may be the case. 

#### Here we create the kinesis stream

In [87]:
session = boto3.session.Session( 
    region_name=mycredentials.amazonregion,
    aws_access_key_id=mycredentials.amazonid,
    aws_secret_access_key=mycredentials.amazonkey
)

In [None]:
def create_kinesis(session, kinesisname):
    """Create a single-shard Kinesis data stream with the given name."""
    try:
        response = session.client('kinesis').create_stream(
            StreamName=kinesisname,  # pass the variable, not the literal string 'kinesisname'
            ShardCount=1,
        )
        return response
    except Exception as e:
        print(e)

kinesisname = "redditstream"
response = create_kinesis(session, kinesisname)
response

####  redditstream.py
This is the producer for the Kinesis stream

In [None]:
import requests
import pandas as pd
from datetime import datetime, timezone
import mycredentials
import json
import time
from kinesis_helper import KinesisStream
import uuid

# we use this function to convert responses to dataframes
def df_from_response(res):
    # collect one dict per post, then build the dataframe once
    # (DataFrame.append was removed in pandas 2.0)
    rows = []

    # loop through each post pulled from res and add it to rows
    for post in res.json()['data']['children']:
        rows.append({
            'subreddit': post['data']['subreddit'],
            'title': post['data']['title'],
            'selftext': post['data']['selftext'],
            'upvote_ratio': post['data']['upvote_ratio'],
            'ups': post['data']['ups'],
            'downs': post['data']['downs'],
            'score': post['data']['score'],
            # created_utc is a UTC epoch timestamp, so format it in UTC to match the 'Z' suffix
            'created_utc': datetime.fromtimestamp(post['data']['created_utc'], tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'id': post['data']['id'],
            'kind': post['kind']
        })

    return pd.DataFrame(rows)

# authenticate API
# authorization after creating reddit application
auth = requests.auth.HTTPBasicAuth(mycredentials.redditclient, mycredentials.redditsecret)

# provide information for access token
data = {
    'grant_type': 'password',
    'username': mycredentials.reddituser,
    'password': mycredentials.redditpass
}
headers = {
    'User-Agent': 'streamtest/0.0.1',
}

# send our request for access token
res = (requests.post('https://www.reddit.com/api/v1/access_token', auth=auth,
    headers=headers, data=data))
# convert response to JSON and pull 'access_token' value
TOKEN = res.json()['access_token']
# add authorization to header
headers = {**headers, **{'Authorization': f'bearer {TOKEN}'}}

# Token is valid for two hours - keep it that way
requests.get('https://oauth.reddit.com/api/v1/me', headers=headers)

# initialize dataframe and parameters for pulling data in loop
new_df = pd.DataFrame()
params = {'limit': 100}
stream = KinesisStream('redditstream')


# we stream the newest posts
while True:

    # make request
    res = requests.get("https://oauth.reddit.com/r/python/new",
                       headers=headers,
                       params=params)

    # get dataframe from response
    new_df = df_from_response(res)
    if not new_df.empty:
        # take the first row (newest entry)
        row = new_df.iloc[0]
        # create fullname
        fullname = row['kind'] + '_' + row['id']
        # add/update fullname in params
        #we use before to get newest submissions
        params['before'] = fullname
        new_df['Data'] = new_df.apply(lambda x: json.dumps(x.to_dict(), separators= (',',':')), axis=1)
        new_df['PartitionKey'] = [str(uuid.uuid4()) for _ in range(len(new_df.index))]
        datadf = new_df[['Data','PartitionKey']].to_dict(orient="records")
        result = stream.send_stream(data=datadf)
        print(datadf, result)
        #add sleep time to limit api calls and prevent timeouts
        time.sleep(1)
    #wait for new posts.
    else:
        time.sleep(10)
        print("waiting for new posts")

#### kinesis_helper.py
This has the producer class for kinesis

In [None]:
import json, uuid, boto3
import mycredentials

class KinesisStream(object):

    def __init__(self, stream):
        self.stream = stream

    def _connected_client(self):
        """ Connect to Kinesis Streams """
        return boto3.client('kinesis',
                            region_name=mycredentials.amazonregion,
                            aws_access_key_id=mycredentials.amazonid,
                            aws_secret_access_key=mycredentials.amazonkey)

    def send_stream(self, data, partition_key=None):

        #we send list of records to kinesis

        client = self._connected_client()
        response = client.put_records(
            Records=data,
            StreamName=self.stream
        )

        return response
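
A single `put_records` call accepts at most 500 records, so a producer that ever sends larger lists would need to split them first. The following is a minimal sketch of that splitting; `chunk_records` is a hypothetical helper that is not part of `kinesis_helper.py`.

```python
MAX_RECORDS_PER_CALL = 500  # PutRecords accepts at most 500 records per request


def chunk_records(records, size=MAX_RECORDS_PER_CALL):
    """Yield consecutive slices of records with at most `size` items each."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


# Dummy records in the same shape the producer sends
records = [{"Data": "{}", "PartitionKey": str(i)} for i in range(1200)]
batch_sizes = [len(batch) for batch in chunk_records(records)]
print(batch_sizes)
```

Inside `send_stream`, each batch could then be passed to `client.put_records(Records=batch, StreamName=self.stream)`. Checking `FailedRecordCount` in each response is a sensible next step, since `put_records` can partially fail.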


### Create Data Catalog

Instead of using a Glue Crawler, create a Glue Data Catalog, Database, and Table that contains the metadata of the data being provided from the Kinesis stream you created. It is important to keep the types consistent and ensure that the types defined in Glue are the format we want those data columns to take. 
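
To keep the types consistent, it can help to declare the schema once and generate the Glue column definitions from it. The following is a minimal sketch; `SCHEMA`, `build_glue_columns`, and `build_paths` are hypothetical names, and the types mirror the ones used for the table in this notebook.

```python
# Column name -> Glue type, declared once so every definition agrees
SCHEMA = {
    "subreddit": "string",
    "title": "string",
    "selftext": "string",
    "upvote_ratio": "double",
    "ups": "double",
    "downs": "double",
    "score": "double",
    "created_utc": "string",
    "id": "string",
    "kind": "string",
}


def build_glue_columns(schema):
    """Build the 'Columns' list expected in a Glue StorageDescriptor."""
    return [{"Name": name, "Type": glue_type, "Comment": ""}
            for name, glue_type in schema.items()]


def build_paths(schema):
    """Build the comma-separated 'paths' value for the JSON SerDe parameters."""
    return ",".join(sorted(schema))


columns = build_glue_columns(SCHEMA)
paths = build_paths(SCHEMA)
print(columns[0], paths)
```

The resulting `columns` list can be passed as `StorageDescriptor['Columns']` and `paths` as the SerDe `paths` parameter in `create_table`.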

In [92]:
def create_glue_db(session, glue_db_name):
    try:
        glue_client = session.client('glue')
        response = glue_client.create_database(
            DatabaseInput={
                'Name': glue_db_name
            }
        )
        return response
    
    except Exception as e:
        print(e)   

# Look up the ARN of the stream so it can be attached to the Glue table below
kin = session.client('kinesis')
kin_arn = kin.describe_stream(StreamName=kinesisname)['StreamDescription']['StreamARN']


In [93]:
# Create Glue Data Catalog and Database
dbname = "reddit"
response = create_glue_db(session, dbname)
response

An error occurred (AlreadyExistsException) when calling the CreateDatabase operation: Database already exists.


In [101]:
# Attach your Reddit Kinesis Data Stream to AWS Glue
def create_glue_kinesis_table(session,kinesisarn):
    try:
        response = session.client('glue').create_table(
            DatabaseName='reddit',
            TableInput={
                'Name': 'redditstream',
                'Owner': 'string',
                'Retention': 123,
                "StorageDescriptor": {
                    "Columns": [
                            {
                                "Name": "subreddit",
                                "Type": "string",
                                "Comment": ""
                            },
                            {
                                "Name": "title",
                                "Type": "string",
                                "Comment": ""
                            },
                            {
                                "Name": "selftext",
                                "Type": "string",
                                "Comment": ""
                            },
                            {
                                "Name": "upvote_ratio",
                                "Type": "double",
                                "Comment": ""
                            },
                            {
                                "Name": "ups",
                                "Type": "double",
                                "Comment": ""
                            },
                            {
                                "Name": "downs",
                                "Type": "double",
                                "Comment": ""
                            },
                            {
                                "Name": "score",
                                "Type": "double",
                                "Comment": ""
                            },
                            {
                                "Name": "created_utc",
                                "Type": "string",
                                "Comment": ""
                            },
                            {
                                "Name": "id",
                                "Type": "string",
                                "Comment": ""
                            },
                            {
                                "Name": "kind",
                                "Type": "string",
                                "Comment": ""
                            }
                        ],
                    "Location": "redditstream",
                    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                    "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                    "Compressed": False,
                    "NumberOfBuckets": 0,
                    "SerdeInfo": {
                        "Name": "json",
                        "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
                        "Parameters": {
                            "paths": "created_utc,downs,id,kind,score,selftext,subreddit,title,ups,upvote_ratio"
                        }
                    },
                    "BucketColumns": [],
                    "SortColumns": [],
                    "Parameters": {
                        "streamARN": kinesisarn,
                        "typeOfData": "kinesis"
                    },
                    "SkewedInfo": {},
                    "StoredAsSubDirectories": False
                },
                "Parameters": {
                    "classification": "json"
                }
            }
                    
        )
        return response
    except Exception as e:
        print(e) 
response = create_glue_kinesis_table(session,kin_arn)
response

An error occurred (AlreadyExistsException) when calling the CreateTable operation: Table already exists.


### S3 Glue Job 

Now that Glue knows the format the streaming data is going to take, we can begin creating a job that will transform the data and eventually load it into an S3 data lake and a DynamoDB table.

Remember, we are going to need an IAM role that has access to the AWS resources that are being interacted with - follow the principle of least privilege.
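
As an illustration of least privilege, a policy can be scoped to one bucket and one stream instead of granting full access. The following is a minimal sketch, not a complete policy: `build_glue_job_policy`, the bucket name, and the stream ARN are all hypothetical, and a real Glue job would likely need further permissions (for example for the Data Catalog and CloudWatch Logs).

```python
import json


def build_glue_job_policy(bucket_name, stream_arn):
    """Build a policy document limited to one S3 bucket and one Kinesis stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # read and write objects inside the bucket
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
            {   # list the bucket itself
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {   # read records from the stream
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            },
        ],
    }


# Placeholder bucket and ARN, for illustration only
policy = build_glue_job_policy(
    "my-example-bucket",
    "arn:aws:kinesis:us-east-1:123456789012:stream/example",
)
policy_json = json.dumps(policy, indent=2)
print(policy_json)
```

A document like this could be attached to a role with the IAM client's `put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=policy_json)`.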

An S3 bucket will be used to store Glue scripts and logs - create one.

In [102]:
# Create s3 bucket
def create_bucket(session, bucket_name):
    """Creates an AWS S3 bucket in the default region"""
    s3_resource = session.resource('s3')
    try:
        response = s3_resource.create_bucket(
            Bucket=bucket_name,
            #CreateBucketConfiguration={'LocationConstraint': 'us-east-1'}
        )
        
    except Exception as e:
        print(e)
    
    else:
        bucket = s3_resource.Bucket(bucket_name)
        return bucket
bucket_name = 'datalakeminirs'
bucket = create_bucket(session, bucket_name)
s3_resource = session.resource('s3')
bucket = s3_resource.Bucket(bucket_name)

In [103]:
# Create Role
import json

def create_glue_role(session, role_name, policies):
    try:
        iam_client = session.client('iam')
        policy_document = json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                  "Effect": "Allow",
                  "Principal": {
                    "Service": "glue.amazonaws.com"
                  },
                  "Action": "sts:AssumeRole"
                }
            ]
        })
        role = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=policy_document,
        )
        for policy in policies:
            response = iam_client.attach_role_policy(
                RoleName=role_name, 
                PolicyArn=policy
            ) 
        return role    
            
    except Exception as e:
        print(e)
role_name = 'myGlueS3'
policies = [
    'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    'arn:aws:iam::aws:policy/AWSLakeFormationDataAdmin',
    'arn:aws:iam::aws:policy/CloudWatchFullAccess',
    'arn:aws:iam::aws:policy/AmazonKinesisFullAccess',
    'arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess',
    'arn:aws:iam::aws:policy/AwsGlueDataBrewFullAccessPolicy',
]

role = create_glue_role(session, role_name, policies)
print(role)

An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name myGlueS3 already exists.
None


##### Create S3 Glue Job Script

We are going to create an AWS Glue pipeline that will hydrate a data lake that is stored on S3. This Glue Script should accomplish the following:

* Process data through a [`GlueContext`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html), [`DynamicFrame`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html), and Spark Streaming.
* Utilize `ApplyMapping` to correctly type each data column.
* Count total number of times python was mentioned in the `title` and `selftext`.
* Count total length of `title` and `selftext`.
* Create an S3 path with the ingestion year, month, day, and hour for the S3 Data Lake.

***Bonus***: Figure out how to batch the data together for this S3 data stream
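
To make the target layout concrete, the partitioned S3 prefix for a given ingestion time can be sketched in plain Python. `ingestion_prefix` and the bucket name are hypothetical, and the zero-padded Hive-style `key=value` folders are an assumption; the exact formatting Glue writes may differ.

```python
from datetime import datetime, timezone


def ingestion_prefix(base_path, when):
    """Build a Hive-style partition prefix from an ingestion timestamp."""
    base = base_path.rstrip("/")
    return (
        f"{base}/ingest_year={when:%Y}/ingest_month={when:%m}/"
        f"ingest_day={when:%d}/ingest_hour={when:%H}/"
    )


prefix = ingestion_prefix(
    "s3://example-bucket/",
    datetime(2022, 5, 3, 14, 30, tzinfo=timezone.utc),
)
print(prefix)
```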

In [None]:
# here is the gluejob script
import sys
from pyspark.sql.types import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
from pyspark.sql.functions import col, size, split, lower
from pyspark.sql import DataFrame, Row
import datetime
from awsglue import DynamicFrame

# TempDir is read below for the checkpoint location, so it must be resolved here too
args = getResolvedOptions(sys.argv, ["JOB_NAME", "TempDir"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)



# read kinesis stream from data catalog
dataframe_KinesisStream_node1 = glueContext.create_data_frame.from_catalog(
    database="reddit",
    table_name="redditstream",
    additional_options={"startingPosition": "earliest", "inferSchema": "false"},
    transformation_ctx="dataframe_KinesisStream_node1",
)

def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        #add additional columns
        df1 = (data_frame.withColumn("length_title", F.length("title"))
            .withColumn("length_selftext", F.length("selftext"))
            .withColumn('python_mentions', F.size(F.split(F.lower(F.col("title")), r"python")) - 1 +
                F.size(F.split(F.lower(F.col("selftext")), r"python")) - 1)
            )
        KinesisStream_node1 = DynamicFrame.fromDF(
            df1, glueContext, "from_data_frame"
        )
        #apply correct data types
        apply_mapping = ApplyMapping.apply(frame = KinesisStream_node1, mappings = [ \
            ("subreddit", "string", "subreddit", "string"), \
            ("title", "string", "title", "string"), \
            ("selftext", "string", "selftext", "string"), \
            ("upvote_ratio", "double", "upvote_ratio", "double"), \
            ("ups", "double", "ups", "integer"), \
            ("downs", "double", "downs", "integer"), \
            ("score", "double", "score", "integer"), \
            ("created_utc", "string", "created_utc", "string"), \
            ("id", "string", "id", "string"), \
            ("kind", "string", "kind", "string"), \
            ("length_title", "integer", "length_title", "integer"), \
            ("length_selftext", "integer", "length_selftext", "integer"), \
            ("python_mentions", "integer", "python_mentions", "integer")],\
            transformation_ctx = "apply_mapping")
        df2 = apply_mapping.toDF()
        #add ingestion time to separate into s3 ingestion folders
        KinesisStream_node2 =(DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(df2, "hour"),
            glueContext,
            "from_data_frame",)
            )

        S3bucket_node3_path = "s3://datalakeminirs/"
        #create Data sink
        S3bucket_node3 = glueContext.getSink(
            path=S3bucket_node3_path,
            connection_type="s3",
            updateBehavior="LOG",
            partitionKeys=["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
            enableUpdateCatalog=True,
            transformation_ctx="S3bucket_node3",
        )
        
        S3bucket_node3.setCatalogInfo(
            catalogDatabase="reddit", catalogTableName="output"
        )
        S3bucket_node3.setFormat("json")
        S3bucket_node3.writeFrame(KinesisStream_node2)


glueContext.forEachBatch(
    frame=dataframe_KinesisStream_node1,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
job.commit()

In [None]:
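# here we create the glue job
glue = session.client('glue')
glue_job_name = 'r5'
script_path = 's3://datalakeminirs/gluejob.py'

response = glue.create_job(
    Name=glue_job_name,
    Description='kinesis stream',
    Role=role_name,  # create_job expects the role name or ARN, not the create_role response
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'gluestreaming',
        'ScriptLocation': script_path,
        'PythonVersion': '3'
    },
    # the script reads args["TempDir"] for its checkpoint location;
    # the temp/ prefix is an assumed location inside the bucket
    DefaultArguments={
        '--TempDir': 's3://datalakeminirs/temp/'
    },
    MaxRetries=0,
    Timeout=1440,
    GlueVersion='3.0',
    NumberOfWorkers=2,
    WorkerType='G.1X'
)
response = glue.start_job_run(
    JobName=glue_job_name
)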

The Kinesis data will be handled using Spark, so make sure to specify Spark Streaming (the `gluestreaming` command name) in the [boto3 `create_job` call](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job) when creating the Glue job.

The function will also need to allow the specification of an `outputPath` - the S3 path where the final aggregations are persisted. Think about how you can pass these arguments to our S3 script via boto3.
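
One possible approach is to pass job arguments as a dictionary whose keys start with `--`. The following is a minimal sketch that runs outside Glue: `build_job_arguments` and `read_option` are hypothetical helpers, the bucket path is a placeholder, and `read_option` is only a local stand-in for `getResolvedOptions`, not the real implementation.

```python
def build_job_arguments(output_path):
    """Glue job arguments are passed as a dict whose keys start with '--'."""
    return {"--outputPath": output_path}


def read_option(argv, name):
    """Local stand-in for awsglue's getResolvedOptions, for illustration only."""
    flag = "--" + name
    return argv[argv.index(flag) + 1]


arguments = build_job_arguments("s3://example-bucket/aggregations/")
# Simulate how the arguments would show up in the script's sys.argv
fake_argv = ["gluejob.py"] + [part for pair in arguments.items() for part in pair]
output_path = read_option(fake_argv, "outputPath")
print(fake_argv, output_path)
```

With boto3, the dictionary would go into `glue.start_job_run(JobName=glue_job_name, Arguments=arguments)`, and the script would read it with `getResolvedOptions(sys.argv, ["JOB_NAME", "outputPath"])`.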

## Athena & QuickSight

Athena, as you have seen before, is a fast and easy query engine that utilizes the AWS Glue Data Catalog to store and retrieve metadata. We can also use QuickSight on top of Athena as our [consumption layer](https://blog.starburst.io/technical-blog/consumption-layer-101).

QuickSight will use the information gathered from Athena and allow us to build a layered dashboard with a variety of visualization tools. This consumption layer can act as the intermediary between data engineers and analysts.

In [91]:
# Connect to Athena
import time

def athena_query(client, params):
    response = client.start_query_execution(
        QueryString=params['query'],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/output'
        }
    )
    return response


def athena_fetch_query(session, params):
    athena = session.client('athena')
    execution = athena_query(athena, params)
    execution_id = execution['QueryExecutionId']

    # poll until the query reaches a terminal state
    while True:
        response = athena.get_query_execution(
            QueryExecutionId=execution_id
        )

        state = response['QueryExecution']['Status']['State']
        print('Query state:', state)
        if state in ('FAILED', 'CANCELLED'):
            print('Request Failed')
            return None
        elif state == 'SUCCEEDED':
            s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
            # keep only the file name at the end of the S3 path
            filename = s3_path.rsplit('/', 1)[-1]
            return filename
        # pause between polls instead of hammering the API
        time.sleep(1)

def get_s3_file(session, params, file_path):
    try:
        s3 = session.resource('s3')
        file = s3.Bucket(params['bucket']).Object(file_path).get()
        return file
    
    except Exception as e:
        print(e)

params = {
    'database': "reddit",
    'bucket': "datalakeminirs",
    'query': 'SELECT * FROM output;'
}

file_name = athena_fetch_query(session, params)
file_path = 'output/' + str(file_name)
results_file = get_s3_file(session, params, file_path)

Query state: QUEUED
Query state: QUEUED
Query state: QUEUED
Query state: QUEUED
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: RUNNING
Query state: SUCCEEDED
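
Athena writes query results to S3 as a CSV file, so the object returned above can be turned into rows with the standard library. The following is a minimal sketch; `rows_from_csv_bytes` is a hypothetical helper and the sample bytes are made up for illustration, not real query output.

```python
import csv
import io


def rows_from_csv_bytes(raw_bytes):
    """Parse CSV bytes into a list of dicts keyed by the header row."""
    text = raw_bytes.decode("utf-8")
    return list(csv.DictReader(io.StringIO(text)))


# Made-up sample in the quoted CSV style, for illustration only
sample = b'"subreddit","score"\n"Python","12"\n"Python","3"\n'
rows = rows_from_csv_bytes(sample)
print(rows)
```

With the real result, the bytes would come from `results_file['Body'].read()`. All values arrive as strings, so numeric columns still need to be converted.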


In [None]:
# Develop a realtime QuickSight Dashboard
# https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/f9cfc4e6-0bce-4aaa-b179-736cc387ed84/views/e4331ea7-2564-45f4-be57-2cbbdde9d9b9

## Sharing the Results

After meaningful insights have been generated from the Reddit data you have interacted with, share a QuickSight Dashboard link in the Slack Channel.

# Understanding Glue ETL vs EMR

These are some of the best ETL tools on the market, but they are not interchangeable. AWS Glue has gained more popularity over the last year due to its ease of use and additional features. However, this doesn't mean it should be your first choice when developing an ETL pipeline. AWS Glue is serverless, while EMR requires a bit more infrastructure to begin handling big data operations. Because Glue is serverless, it tends to cost more than EMR, which is cheaper when running comparable workloads.

EMR is less elastic than Glue since it runs on a cluster that you size and manage yourself. If you have more flexible requirements, Glue is probably a better option. But if there are fixed requirements for your setup, then EMR is the better choice because of costs.

One last comparison is temporary file storage: EMR can keep intermediate files in HDFS or S3, which cannot be done on Glue due to its inherent serverless infrastructure. This limits the performance of Glue, while it allows EMR to run jobs faster and enhances the overall system performance.

For more information regarding the comparison of these ETL tools check out [this link](https://medium.com/swlh/aws-glue-vs-emr-433b53872b30).

*Copyright &copy; 2022 Pragmatic Institute. This content is licensed solely for personal use. Redistribution or publication of this material is strictly prohibited.*