#  Data Catalog with Glue Crawler
AWS Glue Crawler is an automated tool offered by AWS Glue for discovering and cataloging metadata about data sources, which enables services like Glue and Athena to query data directly from different sources. When you point the crawler at a data source, whether a database or a data lake, it scans the source and extracts the schema, data types, and other relevant metadata. This metadata is stored as table definitions in a database in the AWS Glue Data Catalog, which serves as a centralized repository for understanding and managing your data assets. In this lab, you will create a crawler that populates the Glue Data Catalog with the newly transformed data in S3. In the final part, you will query that data directly from S3 using Athena.

Start by checking the databases in the data catalog using AWS SDK for pandas (`awswrangler`):

In [None]:
import datetime as dt
import gzip
import json
from typing import Dict

import awswrangler as wr
import boto3
import pandas as pd
import smart_open
import warnings

In [None]:
databases = wr.catalog.databases()
print(databases)
DATABASE_NAME = "de-c3w2lab1-aws-reviews"
if DATABASE_NAME not in databases.values:
    wr.catalog.create_database(DATABASE_NAME)
    print(wr.catalog.databases())
else:
    print(f"Database {DATABASE_NAME} already exists")

In [None]:
glue_client = boto3.client('glue', region_name="us-east-1")
configuration = {"Version": 1.0, "Grouping": {"TableGroupingPolicy": "CombineCompatibleSchemas"}}
CRAWLER_NAME = "de-c3w2lab1-crawler"
response = glue_client.create_crawler(
    Name=CRAWLER_NAME,
    ### START CODE HERE ### (12 lines of code)
    Role='Cloud9-de-c3w2lab1-glue-role',
    DatabaseName=DATABASE_NAME,
    Description='Amazon Reviews for toys',
    Targets={
        'S3Targets': [
            {
                'Path': f's3://{BUCKET_NAME}/processed_data/snappy/partition_by_year_month/toys_reviews/',
            },
            {
                'Path': f's3://{BUCKET_NAME}/processed_data/snappy/partition_by_sales_category/toys_metadata/',
            }
        ]
    }
    ### END CODE HERE ###
)

In [None]:
response = glue_client.list_crawlers()
print(response['CrawlerNames'])
response = glue_client.start_crawler(
    Name= CRAWLER_NAME
)
print(response)

The crawler will start scanning the S3 targets that you defined. The first run should take around 3 minutes.
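
Instead of waiting a fixed amount of time, you could poll the crawler until it is idle again. The sketch below assumes that the `State` field returned by the Glue `get_crawler` call goes back to `READY` once a run has finished. The helper name `wait_for_crawler` and its default intervals are illustrative choices, not part of the lab.

```python
import time


def wait_for_crawler(client, name, poll_seconds=15, timeout_seconds=600):
    """Poll a Glue crawler until it reports the READY (idle) state.

    `client` is a boto3 Glue client, or any object exposing `get_crawler`.
    Returns the number of seconds spent waiting.
    """
    waited = 0
    while True:
        state = client.get_crawler(Name=name)["Crawler"]["State"]
        if state == "READY":
            return waited
        if waited >= timeout_seconds:
            raise TimeoutError(f"Crawler {name} is still {state} after {waited}s")
        time.sleep(poll_seconds)
        waited += poll_seconds
```

In this notebook, the call would look like `wait_for_crawler(glue_client, CRAWLER_NAME)`, placed right after `start_crawler`.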

After around 3 minutes, check that two tables were created, one for each processed dataset, using the following command:

In [None]:
wr.catalog.tables(database=DATABASE_NAME)

#  Data Querying with Athena

In [None]:
warnings.simplefilter('ignore', FutureWarning)
sql = "SELECT * FROM toys_reviews LIMIT 5"
df = wr.athena.read_sql_query(sql, database=DATABASE_NAME, s3_output=f's3://{BUCKET_NAME}/athena_output/')
df.head()

In [None]:
# Next, run this test query to find the top 5 products with the most reviews. Return the product name and the review count.
# Ignore the products with an empty title in the metadata table.
sql = """
SELECT met.title, count(distinct toy.reviewerid) as review_count
FROM toys_metadata met 
LEFT JOIN toys_reviews toy
ON met.asin = toy.asin
WHERE met.title <> ''
GROUP BY met.title
ORDER BY count(distinct toy.reviewerid) DESC
LIMIT 5"""
df = wr.athena.read_sql_query(sql, database=DATABASE_NAME, s3_output=f's3://{BUCKET_NAME}/athena_output/')
df.head()


# Now run the next test query to find the top 10 products by average rating, considering only products with more than 1000 reviews.
# Return the title, sales category and the average rating.
sql = """
SELECT met.title, met.sales_category, avg(toy.overall) as review_avg
FROM toys_metadata met 
LEFT JOIN toys_reviews toy
ON met.asin = toy.asin
GROUP BY met.title, met.sales_category
HAVING count(distinct toy.reviewerid) > 1000
ORDER BY avg(toy.overall) DESC
LIMIT 10"""
df = wr.athena.read_sql_query(sql, database=DATABASE_NAME, s3_output=f's3://{BUCKET_NAME}/athena_output/')
df.head()

# Next, run this query to determine the average rating for each brand and the number of products each brand has in the database.
# Only show the top 10 brands with the highest product counts in the database.
sql = """
SELECT met.brand, count(distinct met.asin) as product_count, avg(toy.overall) as review_avg
FROM toys_metadata met 
LEFT JOIN toys_reviews toy
ON met.asin = toy.asin
WHERE met.brand <> ''
GROUP BY met.brand
ORDER BY count(distinct met.asin) DESC
LIMIT 10"""
df = wr.athena.read_sql_query(sql, database=DATABASE_NAME, s3_output=f's3://{BUCKET_NAME}/athena_output/')
df.head()

# For the last test query, look at a sample of 25 reviews that gave a rating of 5 to a toy car product. Return the product title, description, review text, and overall score.
# Ignore products that have an empty title, and use the LIKE operator to find reviews that mention "toy car" in their text.
sql = """
SELECT met.title, met.description, toy.reviewtext, toy.overall
FROM toys_reviews toy
LEFT JOIN toys_metadata met
ON toy.asin = met.asin
WHERE toy.reviewtext like '%toy car%' and toy.overall = 5.0 and met.title <> '' 
LIMIT 25"""
df = wr.athena.read_sql_query(sql, database=DATABASE_NAME, s3_output=f's3://{BUCKET_NAME}/athena_output/')
df.head()

<a name='7'></a>
## Comparing Partitioning and Compression Features of Parquet Format

**Compression**

The Parquet format, commonly used in data lake architectures, supports various [compression codecs](https://parquet.apache.org/docs/file-format/data-pages/compression/) such as Snappy, Gzip, and LZO. These codecs play a crucial role in reducing the storage space needed by Parquet files. However, they come with a trade-off: while they reduce storage costs and optimize resource utilization, they can also impact processing speeds during data ingestion, transformation, and querying. To test this with a practical example, you will store the same transformed data using three different compression options: `UNCOMPRESSED`, `SNAPPY` and `GZIP`: 

- `UNCOMPRESSED`: Data is left uncompressed.
- `SNAPPY`: Snappy is a fast compression/decompression library that works well with Parquet files. It provides good compression ratios and is optimized for speed. Snappy compression is often a good choice for balancing compression efficiency and query performance.
- `GZIP`: Gzip is a widely used compression algorithm that provides high compression ratios. However, it tends to be slower in both compression and decompression compared to Snappy. Gzip compression can achieve higher levels of compression but may result in slower query performance.
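
The trade-off between compressed size and compression time can be seen on a small scale with Python's standard library. This is only a sketch under stated assumptions: Snappy is not in the standard library, so gzip at level 1 (favoring speed) and level 9 (favoring ratio) stand in for a "fast codec versus high-ratio codec" comparison. The records are synthetic, and the timings will vary by machine.

```python
import gzip
import json
import time

# Synthetic, repetitive records standing in for product metadata (made-up values).
records = [
    {"asin": f"B{i:09d}", "brand": "ExampleBrand", "sales_category": "Toys & Games"}
    for i in range(20_000)
]
raw = "\n".join(json.dumps(r) for r in records).encode("utf-8")


def measure(level):
    """Compress `raw` with gzip at the given level; return (size_in_bytes, seconds)."""
    start = time.perf_counter()
    compressed = gzip.compress(raw, compresslevel=level)
    elapsed = time.perf_counter() - start
    # Round-trip check: decompression must give back the original bytes.
    assert gzip.decompress(compressed) == raw
    return len(compressed), elapsed


results = {level: measure(level) for level in (1, 9)}
print(f"uncompressed: {len(raw):>10,d} bytes")
for level, (size, seconds) in results.items():
    print(f"gzip level {level}: {size:>10,d} bytes in {seconds:.3f}s")
```

Parquet applies its codec per column chunk rather than to a whole file, so the numbers here illustrate the direction of the trade-off, not what you should expect from the lab data.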

**Partitioning**

Partitioning is a technique for organizing a Parquet dataset into separate directories and files based on the values of one or more partition keys. When a query filters on a partition key, the query engine can skip the partitions that cannot match, which reduces the amount of data scanned during query execution.
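
The idea of skipping partitions can be sketched without any AWS service. The example below assumes a Hive-style layout, where each partition is a `key=value` directory. The object keys, the `year` and `month` column names, and the helper functions are hypothetical and only illustrate how a filter on a partition key narrows down the files that need to be read.

```python
from pathlib import PurePosixPath

# Hypothetical object keys laid out in Hive style (key=value directories).
keys = [
    "toys_reviews/year=2013/month=12/part-0.snappy.parquet",
    "toys_reviews/year=2014/month=1/part-0.snappy.parquet",
    "toys_reviews/year=2014/month=2/part-0.snappy.parquet",
    "toys_reviews/year=2015/month=1/part-0.snappy.parquet",
]


def partition_values(key):
    """Extract partition values, e.g. {'year': '2014', 'month': '1'}, from a key."""
    segments = PurePosixPath(key).parts
    return dict(s.split("=", 1) for s in segments if "=" in s)


def prune(keys, **filters):
    """Keep only the files whose partition values match every filter."""
    return [
        k
        for k in keys
        if all(partition_values(k).get(col) == str(val) for col, val in filters.items())
    ]


to_scan = prune(keys, year=2014)
print(f"scan {len(to_scan)} of {len(keys)} files")  # prints "scan 2 of 4 files"
```

A filter on a column that is not a partition key gives the engine no directories to skip, which is why the choice of partition column matters for the queries you expect to run.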

**Experiments**

To explore the compression features, you will process the product metadata three times, each time with a different compression option and no partitioning, as shown in the left table of the figure below. You will then compare "uncompressed versus Snappy" and "Snappy versus Gzip" for this data.

To explore the partitioning features, you will process the review dataset three times, each time with a different partitioning option and Snappy compression. You will then compare "partitioning versus no partitioning" and "partitioning by year and month" versus "partitioning by the product ID (asin)" for this data.

<img src="images/experiment.png" width="700"/>
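
One quantity you might compare across these experiments is the storage footprint of each output. The helper below is a minimal sketch that sums object sizes under an S3 prefix using the `list_objects_v2` paginator from boto3. The function name `prefix_size_bytes` is an illustrative choice and is not part of the lab.

```python
def prefix_size_bytes(s3_client, bucket, prefix):
    """Sum the sizes, in bytes, of all objects under s3://bucket/prefix."""
    paginator = s3_client.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page when no objects match.
        for obj in page.get("Contents", []):
            total += obj["Size"]
    return total
```

For example, `prefix_size_bytes(boto3.client("s3"), BUCKET_NAME, "processed_data/snappy/partition_by_sales_category/toys_metadata/")` would report the size of the Snappy-compressed metadata written earlier. Running it once per experiment output gives numbers that can be compared side by side.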

# Clean Up

In [None]:
try:
    response = glue_client.delete_crawler(
        Name= CRAWLER_NAME
    )
    print("Crawler deleted successfully")
except glue_client.exceptions.EntityNotFoundException:
    print("Crawler does not exist or has already been deleted")

try:
    wr.catalog.delete_database(name=DATABASE_NAME)
    print(f"Database {DATABASE_NAME} deleted successfully")
except Exception as e:
    print(f"Error deleting database: {e}")
    
response = glue_client.list_crawlers()
print("Remaining crawlers:", response['CrawlerNames'])

databases = wr.catalog.databases()
print("Remaining databases:", databases)