# Analytics On AWS workshop

Take your time to read through the instructions provided in this notebook.

###### Learning Objectives
- Understand how to interactively author Glue ETL scripts using Glue Dev Endpoints & SageMaker notebooks
- Use boto3 to call Glue APIs to perform Glue administrative and operational activities

**Execute the code blocks one cell at a time**

###### Import Libraries 
- In this notebook we will be using the following classes and libraries. Here are some of the important ones:
    - SparkContext - Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
    - GlueContext - Wraps the Apache SparkSQL SQLContext object, and thereby provides mechanisms for interacting with the Apache Spark platform.
    - boto3 - AWS's Python SDK. We will be using this library to make calls to AWS APIs.
    - awsglue - AWS's PySpark library, which provides the Glue classes imported below, such as GlueContext, Job, and the built-in transforms.
    
    
# The data transform that we will perform

<img src="https://unnik.s3.amazonaws.com/public-files/unnik-lab-guides/aws-summit-2019/techfest/glue-generate-diagram.png" alt="drawing" width="600"/>


#### Execute Code 🔻

In [1]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1654136102606_0001,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



# Exploring your raw dataset
- In this step you will:
    - Create a dynamic frame for your 'raw' table from AWS Glue catalog
    - Explore the schema of the datasets
    - Count rows in raw table
    - View a sample of the data 

## Glue Dynamic Frames Basics

- AWS Glue's DynamicFrame is a powerful data structure.
- DynamicFrames provide a precise representation of the underlying semi-structured data, especially when dealing with columns or fields with varying types.
- They also provide powerful primitives to deal with nesting and unnesting.
- A dynamic record is a self-describing record: each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame.
- For ETL, we needed something more dynamic, hence we created the Glue DynamicFrame. A DynamicFrame is similar to a Spark DataFrame but relaxes the requirement of having a rigid schema. It's designed for semi-structured data.
- It maintains a schema per record, and it's easy to restructure, tag and modify.
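
To make the per-record schema idea concrete, here is a minimal plain-Python sketch. It does not use the Glue libraries; the sample records and the helper name `infer_field_types` are hypothetical and exist only for illustration. It shows that when every record carries its own fields and types, a single field can end up with more than one type across the dataset.

```python
from collections import defaultdict

# Hypothetical semi-structured records: device_temp is an int in one
# record, a string in another, and missing from the third.
records = [
    {"device_id": 45, "device_temp": 40},
    {"device_id": 37, "device_temp": "28"},
    {"device_id": 11, "activity_type": "Running"},
]


def infer_field_types(rows):
    """Collect the Python type names observed for each field."""
    observed = defaultdict(set)
    for row in rows:
        for field, value in row.items():
            observed[field].add(type(value).__name__)
    return {field: sorted(types) for field, types in observed.items()}


field_types = infer_field_types(records)
for field, types in field_types.items():
    print(f"{field}: {' | '.join(types)}")
```

A rigid-schema DataFrame would have to settle on one type for `device_temp` up front. A DynamicFrame can record both and let you resolve the ambiguity later, for example with its `resolveChoice` method.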


#### Read More : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

#### Execute Code 🔻


In [2]:

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session



# Create dynamic frame from Glue catalog
- In this block we are using GlueContext to create a new DynamicFrame from the Glue catalog

Other ways to create DynamicFrames in Glue:
- create_dynamic_frame_from_rdd
- create_dynamic_frame_from_catalog
- create_dynamic_frame_from_options

#### Read More:https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html

#### Execute Code 🔻


In [3]:
raw_data = glueContext.create_dynamic_frame.from_catalog(database="analyticsworkshopdb", table_name="raw")

reference_data = glueContext.create_dynamic_frame.from_catalog(database="analyticsworkshopdb", table_name="reference_data")


# View schema
- In this step we view the schema of the dynamic frame
- printSchema( ) – Prints the schema of the underlying DataFrame.

#### Execute Code 🔻

In [4]:
raw_data.printSchema()


root
|-- uuid: string
|-- device_ts: string
|-- device_id: int
|-- device_temp: int
|-- track_id: int
|-- activity_type: string
|-- partition_0: string
|-- partition_1: string
|-- partition_2: string
|-- partition_3: string

In [5]:
reference_data.printSchema()


root
|-- track_id: string
|-- track_name: string
|-- artist_name: string

# Count records
- In this step we will count the number of records in the dataframe
- count( ) – Returns the number of rows in the underlying DataFrame

#### Execute Code 🔻

In [6]:
print(f'raw_data (count) = {raw_data.count()}')
print(f'reference_data (count) = {reference_data.count()}')


raw_data (count) = 14000
reference_data (count) = 100

# Show sample records
- You can use the show() method to display a sample of records in the frame
- show() is a Spark DataFrame method, so the DynamicFrame is first converted with toDF()
- Here we are showing the top 5 records in the DataFrame


#### Execute Code 🔻

In [7]:
raw_data.toDF().show(5)


+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|partition_3|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|c02e3a10-9db1-496...|2022-06-02 01:45:...|       45|         40|      15|    Traveling|       2022|         06|         02|         01|
|c6d3341e-1f2f-460...|2022-06-02 01:45:...|       37|         28|      13|    Traveling|       2022|         06|         02|         01|
|08e87e81-1a57-421...|2022-06-02 01:45:...|       11|         32|      25|    Traveling|       2022|         06|         02|         01|
|480d03af-4c8c-499...|2022-06-02 01:45:...|       38|         28|      21|    Traveling|       2022|         06|         02|         01|
|a1085f7f-5c27-4a7...|2022-06-02 01:45:..

In [8]:
reference_data.toDF().show(5)


+--------+-----------+--------------------+
|track_id| track_name|         artist_name|
+--------+-----------+--------------------+
|       1| God's Plan|               Drake|
|       2|Meant To Be|Bebe Rexha & Flor...|
|       3|    Perfect|          Ed Sheeran|
|       4|    Finesse|Bruno Mars & Cardi B|
|       5|     Psycho|Post Malone Featu...|
+--------+-----------+--------------------+
only showing top 5 rows

# Using Spark SQL to explore data

- In Glue, you can leverage Spark's SQL engine to run SQL queries over your data
- If you have a DynamicFrame called my_dynamic_frame, you can convert it to a DataFrame with toDF(), register it as a temporary view, issue a SQL query, and then convert the result back to a DynamicFrame with DynamicFrame.fromDF()
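
The round trip described above can be sketched as a small helper. This is a hedged illustration rather than part of the workshop code: the function name `query_dynamic_frame` and its parameters are hypothetical, while `toDF()`, `createOrReplaceTempView()`, `spark_session.sql()` and `DynamicFrame.fromDF()` are the Glue and Spark calls it relies on. The `awsglue` import is done inside the function so that the definition itself loads outside a Glue environment.

```python
def query_dynamic_frame(glue_context, dynamic_frame, view_name, sql):
    """Run a SQL query over a DynamicFrame and return a new DynamicFrame."""
    # Imported lazily: awsglue is only available inside a Glue environment.
    from awsglue.dynamicframe import DynamicFrame

    # DynamicFrame -> Spark DataFrame, registered as a temporary view.
    dynamic_frame.toDF().createOrReplaceTempView(view_name)

    # Run the query with the Spark session that the GlueContext wraps.
    result_df = glue_context.spark_session.sql(sql)

    # Spark DataFrame -> DynamicFrame, named after the view (a naming
    # choice made for this sketch only).
    return DynamicFrame.fromDF(result_df, glue_context, view_name + "_result")
```

In this notebook it could be called as `query_dynamic_frame(glueContext, raw_data, "temp_raw_data", "select * from temp_raw_data where activity_type = 'Running'")`.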

### Spark SQL - Filtering & Counting - activity_type = Running
- In this block, we will filter & count the number of events with activity_type = Running

#### Execute Code 🔻

In [9]:
# Adding raw_data as a temporary table in sql context for spark

raw_data.toDF().createOrReplaceTempView("temp_raw_data")

# Run a SQL statement that keeps only the rows where activity_type = 'Running'
runningDF = spark.sql("select * from temp_raw_data where activity_type = 'Running'")
print(f'Running (count): {runningDF.count()}')

runningDF.show(5)



Running (count): 1376
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|partition_3|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|29d6c9af-c4a7-4cb...|2022-06-02 01:45:...|       13|         32|      23|      Running|       2022|         06|         02|         01|
|9b25e6b5-44be-4d8...|2022-06-02 01:45:...|       39|         34|      24|      Running|       2022|         06|         02|         01|
|ae9ae1c3-6d68-422...|2022-06-02 01:45:...|       34|         28|      29|      Running|       2022|         06|         02|         01|
|3ed7e3b9-9495-452...|2022-06-02 01:45:...|       41|         32|      19|      Running|       2022|         06|         02|         01|
|6be5a500-15e8-41c.

### Spark SQL - Filtering & Counting - activity_type = Working
- In this block, we will filter & count the number of events with activity_type = Working

#### Execute Code 🔻

In [10]:
# Run a SQL statement that keeps only the rows where activity_type = 'Working'
workingDF = spark.sql("select * from temp_raw_data where activity_type = 'Working'")
print(f'Working (count): {workingDF.count()}')

workingDF.show(5)



Working (count): 2779
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|                uuid|           device_ts|device_id|device_temp|track_id|activity_type|partition_0|partition_1|partition_2|partition_3|
+--------------------+--------------------+---------+-----------+--------+-------------+-----------+-----------+-----------+-----------+
|b664df51-555e-498...|2022-06-02 01:45:...|       29|         28|      26|      Working|       2022|         06|         02|         01|
|10444bd1-dc5b-46e...|2022-06-02 01:45:...|       45|         40|      30|      Working|       2022|         06|         02|         01|
|3fd228a3-84ec-410...|2022-06-02 01:45:...|       39|         32|      11|      Working|       2022|         06|         02|         01|
|569d43db-38e5-4de...|2022-06-02 01:45:...|       26|         34|      26|      Working|       2022|         06|         02|         01|
|b060f5f6-78e3-4c4.

### Glue Transforms - Filtering & Counting - activity_type = Running
- Now, let's perform the same operation using Glue's built-in transforms
- We will use the **Filter** transform
- Filter() - Selects records from a DynamicFrame and returns a filtered DynamicFrame.
- You specify a predicate function, either a named function or a lambda, which determines whether a record is output (function returns true) or not (function returns false).
- In this function, we are filtering on the condition activity_type == 'Running'

#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-filter.html#aws-glue-api-crawler-pyspark-transforms-filter-example

#### Execute Code 🔻

In [11]:

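def filter_function(dynamic_record):
    # Keep the record only when its activity_type is 'Running'
    return dynamic_record['activity_type'] == 'Running'

# Filter returns a new DynamicFrame holding the records for which f is True
runningDF = Filter.apply(frame=raw_data, f=filter_function)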

print(f'Running (count): {runningDF.count()}')


Running (count): 1376

### Glue Transforms - Filtering & Counting - activity_type = Working (Using python Lambda Expressions)
- Small anonymous functions can be created with the lambda keyword.
- Lambda functions can be used wherever function objects are required. They are syntactically restricted to a single expression. 
- Example: This function returns the sum of its two arguments: lambda a, b: a+b.
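
As a quick plain-Python illustration (no Glue involved), the sketch below shows that a named predicate and an inline lambda select the same records. The sample records and the function name `is_working` are hypothetical stand-ins for rows of raw_data.

```python
# Hypothetical sample records standing in for rows of raw_data.
records = [
    {"uuid": "a", "activity_type": "Working"},
    {"uuid": "b", "activity_type": "Running"},
    {"uuid": "c", "activity_type": "Working"},
]


def is_working(record):
    """Named predicate: True when the record's activity is 'Working'."""
    return record["activity_type"] == "Working"


# Filter once with the named function, once with an equivalent lambda.
named_result = list(filter(is_working, records))
lambda_result = list(
    filter(lambda record: record["activity_type"] == "Working", records)
)

print(named_result == lambda_result)

# The two-argument example from the text, called immediately.
total = (lambda a, b: a + b)(2, 3)
print(total)
```

The lambda form is convenient when the predicate is a single expression and is used only once; a named function is usually easier to read once the logic grows.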

#### Execute Code 🔻

In [12]:

workingDF = Filter.apply(frame=raw_data, f=lambda x: x['activity_type'] == 'Working')

print(f'Working (count): {workingDF.count()}')


Working (count): 2779

### Glue Transforms - Joining two dataframes 
- Performs an equality join on two DynamicFrames.
- This transform accepts the following arguments.
    - frame1 – The first DynamicFrame to join
    - frame2 – The second DynamicFrame to join
    - keys1 – The keys to join on for the first frame
    - keys2 – The keys to join on for the second frame
- In our case we will be joining these two frames: **raw_data** & **reference_data**
- We will be joining these two frames on the column **track_id**
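
To show what an equality join does, here is a minimal plain-Python sketch over lists of dictionaries. It is an illustration under assumptions, not Glue's implementation: the helper `equality_join` and the sample rows are hypothetical, and the sketch assumes inner-join behaviour (rows without a match are dropped). Unlike this sketch, the Glue transform keeps both key columns, as the joined schema later in this notebook shows.

```python
def equality_join(left_rows, right_rows, left_key, right_key):
    """Inner equality join of two lists of dicts on the given keys."""
    # Index the right-hand rows by key so each left row is matched quickly.
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)

    joined = []
    for left in left_rows:
        for right in index.get(left[left_key], []):
            # Merge both rows; on a name clash the left value wins here.
            joined.append({**right, **left})
    return joined


# Hypothetical sample rows modelled on raw_data and reference_data.
raw_rows = [
    {"uuid": "u1", "track_id": 1, "activity_type": "Running"},
    {"uuid": "u2", "track_id": 3, "activity_type": "Working"},
    {"uuid": "u3", "track_id": 99, "activity_type": "Sitting"},
]
reference_rows = [
    {"track_id": 1, "track_name": "God's Plan"},
    {"track_id": 3, "track_name": "Perfect"},
]

joined_rows = equality_join(raw_rows, reference_rows, "track_id", "track_id")
for row in joined_rows:
    print(row["uuid"], row["track_name"])
```

The row with `track_id` 99 has no partner in the reference rows, so it does not appear in the result.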

#### Read More: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-join.html

#### Execute Code 🔻

In [13]:

joined_data = Join.apply(raw_data, reference_data, 'track_id', 'track_id')



### View schema
- In this step we view the schema of the dynamic frame
- printSchema( ) – Prints the schema of the underlying DataFrame.

#### Execute Code 🔻

In [14]:
joined_data.printSchema()


root
|-- track_id: string
|-- partition_2: string
|-- activity_type: string
|-- .track_id: int
|-- partition_1: string
|-- device_temp: int
|-- track_name: string
|-- artist_name: string
|-- partition_3: string
|-- device_ts: string
|-- device_id: int
|-- partition_0: string
|-- uuid: string

###### Cleaning up the joined_data dynamicframe
- Other than the columns we are interested in, the frame still contains the partition columns
- These were generated by Firehose when placing the files in a yyyy/mm/dd/hh directory structure in S3
- We will use Glue's built-in **DropFields** transform to drop the partition columns

###### Read more about AWS Glue transforms here : https://docs.aws.amazon.com/glue/latest/dg/built-in-transforms.html

#### Execute Code 🔻

In [15]:

joined_data_clean = DropFields.apply(frame=joined_data, paths=['partition_0','partition_1','partition_2','partition_3'])



### View schema after DropFields transform
#### Execute Code 🔻

In [16]:
joined_data_clean.printSchema()


root
|-- track_id: string
|-- activity_type: string
|-- .track_id: int
|-- device_temp: int
|-- track_name: string
|-- artist_name: string
|-- device_ts: string
|-- device_id: int
|-- uuid: string

###### Sample data

In [17]:
joined_data_clean.toDF().show(5)


+--------+-------------+---------+-----------+----------+---------------+--------------------+---------+--------------------+
|track_id|activity_type|.track_id|device_temp|track_name|    artist_name|           device_ts|device_id|                uuid|
+--------+-------------+---------+-----------+----------+---------------+--------------------+---------+--------------------+
|      21|    Traveling|       21|         28|   Him & I|G-Eazy & Halsey|2022-06-02 01:45:...|       38|480d03af-4c8c-499...|
|      21|    Traveling|       21|         34|   Him & I|G-Eazy & Halsey|2022-06-02 01:45:...|       26|701c0e53-6db4-471...|
|      21|    Traveling|       21|         32|   Him & I|G-Eazy & Halsey|2022-06-02 01:45:...|       30|c38e32e0-8e7a-43f...|
|      21|    Traveling|       21|         28|   Him & I|G-Eazy & Halsey|2022-06-02 01:45:...|        4|a71ae55e-acc9-4f7...|
|      21|      Sitting|       21|         40|   Him & I|G-Eazy & Halsey|2022-06-02 01:45:...|       29|56c983f1-05c1-

# Final step of the transform - Writing transformed data to S3
- In this step we will be using Glue's write_dynamic_frame functionality to write transformed data to S3
- We will be storing the transformed data in a different directory & in parquet format
- Make sure you change the S3 bucket name **yourname-analytics-workshop-bucket** to reflect your bucket name


---
- Why Parquet format?
    - Apache Parquet is a columnar storage format that is optimized for fast retrieval of data and used in AWS analytical applications.
    - Columnar storage formats have the following characteristics that make them suitable for use with Athena:
        - Compression by column, with the compression algorithm selected for the column data type, saves storage space in Amazon S3 and reduces disk space and I/O during query processing.
        - Predicate pushdown in Parquet and ORC enables queries to fetch only the blocks they need, improving query performance. When a query obtains specific column values from your data, it uses statistics from data block predicates, such as max/min values, to determine whether to read or skip the block.
        - Splitting of data in Parquet allows analytics tools to split the reading of data across multiple readers and increase parallelism during query processing.
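
The block-skipping idea behind predicate pushdown can be sketched in plain Python. This is a toy model, not how Parquet is implemented: the `blocks` layout and the helper `scan_greater_than` are hypothetical, and real readers keep the statistics in file metadata rather than next to the values.

```python
# Hypothetical column blocks: each block stores min/max statistics
# for device_temp alongside the values themselves.
blocks = [
    {"min": 20, "max": 29, "values": [20, 25, 29]},
    {"min": 30, "max": 39, "values": [30, 34, 38]},
    {"min": 40, "max": 49, "values": [40, 41, 47]},
]


def scan_greater_than(column_blocks, threshold):
    """Return matching values and the number of blocks actually read."""
    matches, blocks_read = [], 0
    for block in column_blocks:
        # Skip the block when its max shows that no value can match.
        if block["max"] <= threshold:
            continue
        blocks_read += 1
        matches.extend(v for v in block["values"] if v > threshold)
    return matches, blocks_read


matches, blocks_read = scan_greater_than(blocks, 35)
print(matches, blocks_read)
```

For the query `device_temp > 35`, the first block is skipped on its statistics alone, so only two of the three blocks are read.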
    
#### Execute Code 🔻

In [18]:
try:
    datasink = glueContext.write_dynamic_frame.from_options(
        frame = joined_data_clean, connection_type="s3",
        connection_options = {"path": "s3://kinesis-analytics-workshop-bucket/data/processed-data/"},
        format = "parquet")
    print('Transformed data written to S3')
except Exception as ex:
    print('Something went wrong')
    print(ex)


Transformed data written to S3

# Using boto3 to run & automate AWS Glue 

- Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services. Boto provides an easy to use, object-oriented API, as well as low-level access to AWS services.


# Add transformed data set to glue catalog
- Now that you have written your transformed data to S3, we need to add it to the Glue catalog so you can query it using Athena
- This block of code takes close to 60 seconds to run; do not terminate or stop the execution


#### Execute Code 🔻

In [19]:

glueclient = boto3.client('glue', region_name='us-east-1')

response = glueclient.start_crawler(Name='AnalyticsworkshopCrawler')

print('---')

crawler_state = None
# Poll once per second until the crawler reports the STOPPING state
while crawler_state != 'STOPPING':
    response = glueclient.get_crawler(Name='AnalyticsworkshopCrawler')
    crawler_state = str(response['Crawler']['State'])
    time.sleep(1)

print('Crawler Stopped')
print('---')
time.sleep(3)



---
Crawler Stopped
---
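
The polling loop in the cell above has no upper bound on how long it waits. A hedged variant with a timeout might look like the sketch below. The helper name `wait_for_crawler` and its parameters are hypothetical; it relies only on the `get_crawler` call already used in this notebook. It assumes you want to wait for the `READY` state, which a crawler returns to once it has finished (Glue reports crawler states as `READY`, `RUNNING` or `STOPPING`).

```python
import time


def wait_for_crawler(glue_client, crawler_name, target_state="READY",
                     poll_seconds=5, timeout_seconds=600):
    """Poll get_crawler until the crawler reports target_state.

    Returns the final state, or raises TimeoutError if the state is not
    reached within timeout_seconds.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = glue_client.get_crawler(Name=crawler_name)
        state = response["Crawler"]["State"]
        if state == target_state:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Crawler {crawler_name} still {state} after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)
```

After `start_crawler`, it could be called as `wait_for_crawler(glueclient, 'AnalyticsworkshopCrawler')`.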

# Use boto to view the list of tables in analyticsworkshopdb database

#### Execute Code 🔻

In [20]:

print('** analyticsworkshopdb has following tables**')
response = glueclient.get_tables(
    DatabaseName='analyticsworkshopdb',
)

for table in response['TableList']:
    print(table['Name'])




** analyticsworkshopdb has following tables**
508c54a2_1eac_4af9_a180_3b6d1a38baca_csv
508c54a2_1eac_4af9_a180_3b6d1a38baca_csv_metadata
processed_data
raw
reference_data
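
A single `get_tables` call returns one page of results, which is enough for the handful of tables in this workshop. For larger databases, a sketch using boto3's paginator could look like this; the helper name `list_table_names` is hypothetical, and it takes the Glue client as a parameter so it makes no assumptions about region or credentials.

```python
def list_table_names(glue_client, database_name):
    """Return every table name in the database, following pagination."""
    paginator = glue_client.get_paginator("get_tables")
    names = []
    for page in paginator.paginate(DatabaseName=database_name):
        names.extend(table["Name"] for table in page["TableList"])
    return names
```

With the client created earlier, it could be called as `list_table_names(glueclient, 'analyticsworkshopdb')`.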

# 😎
=========================

### If you wish to take this notebook and its output back home, you can download / export it:

- In Jupyter's menu bar click **File**:
    - Download As: Notebook(.ipynb) (you can re-import it into Jupyter as a notebook in the future)
    - Download As: HTML (shows code + results in an easy to read format)


=========================

# NEXT Steps: Go back to the lab guide