# Query Data with AWS Data Wrangler

**AWS Data Wrangler** is an open-source Python library that extends the power of the Pandas library to AWS, connecting DataFrames with AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, Amazon QuickSight, etc.).

* https://github.com/awslabs/aws-data-wrangler
* https://aws-data-wrangler.readthedocs.io

Built on top of other open-source projects such as Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2, and PyMySQL, it offers abstracted functions for common ETL tasks, such as loading and unloading data from data lakes, data warehouses, and databases.

_Note that AWS Data Wrangler is simply a Python library that uses existing AWS services. AWS Data Wrangler is not a separate AWS service. You install AWS Data Wrangler with `pip install awswrangler`._

# _Prerequisite: Make Sure You Created an Athena Table for Both TSV and Parquet in Previous Notebooks_

In [1]:
%store -r ingest_create_athena_table_tsv_passed

In [2]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [3]:
print(ingest_create_athena_table_tsv_passed)

True


In [4]:
if not ingest_create_athena_table_tsv_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


In [5]:
%store -r ingest_create_athena_table_parquet_passed

In [6]:
try:
    ingest_create_athena_table_parquet_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [7]:
print(ingest_create_athena_table_parquet_passed)

True


In [8]:
if not ingest_create_athena_table_parquet_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]
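The TSV and Parquet checks above repeat the same try/except and truthiness pattern. A minimal sketch of a helper that folds both steps into one call, assuming the flag has already been restored with `%store -r` into the notebook namespace (`globals()`); `check_prerequisite` is a hypothetical name, not part of SageMaker or AWS Data Wrangler:

```python
def check_prerequisite(flag_name, namespace, message):
    """Hypothetical helper: return True if `flag_name` exists in `namespace` and is truthy.

    `namespace` is assumed to be a dict such as `globals()`, populated
    beforehand by `%store -r <flag_name>`. A missing flag counts as failure,
    which replaces the separate try/except NameError cell.
    """
    passed = bool(namespace.get(flag_name, False))
    if passed:
        print("[OK]")
    else:
        print("++++++++++++++++++++++++++++++++++++++++++++++")
        print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  " + message)
        print("++++++++++++++++++++++++++++++++++++++++++++++")
    return passed


# Example usage in this notebook, after the %store -r cells:
# check_prerequisite("ingest_create_athena_table_tsv_passed", globals(), "You did not register the TSV Data.")
# check_prerequisite("ingest_create_athena_table_parquet_passed", globals(), "You did not convert into Parquet data.")
```

Reading the flag with `dict.get` means a flag that was never stored and a flag stored as `False` both take the error branch, so each prerequisite needs only one cell.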


# Setup

In [9]:
import sagemaker
import boto3

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [10]:
import awswrangler as wr

# Query Parquet from S3 with Push-Down Filters

Read Apache Parquet file(s) from an S3 prefix or a list of S3 object paths.

The concept of a dataset goes beyond the simple idea of files and enables more complex features such as partitioning and catalog integration (AWS Glue Catalog):

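_dataset (bool)_ – If True, read a Parquet dataset instead of simple file(s), loading all related partitions as columns.

`partition_filter` is a callback applied to partition columns only (a push-down filter): it receives a dict mapping partition names to their values, which are always strings extracted from the S3 paths, and returns True to read that partition or False to skip it. It is ignored unless `dataset=True`.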
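To make partition pruning concrete, here is a minimal pure-Python sketch of the idea, not Wrangler's implementation. It assumes Hive-style `name=value` segments in the S3 keys; the object keys below are hypothetical, and the filter receives partition values as strings, like the `p_filter` used in this notebook:

```python
def partition_values(key, prefix):
    """Parse Hive-style `name=value` path segments below `prefix` into a dict of strings."""
    relative = key[len(prefix):] if key.startswith(prefix) else key
    values = {}
    for segment in relative.split("/")[:-1]:  # the last segment is the file name
        if "=" in segment:
            name, value = segment.split("=", 1)
            values[name] = value
    return values


def prune(keys, prefix, partition_filter):
    """Keep only the object keys whose partition values pass `partition_filter`."""
    return [k for k in keys if partition_filter(partition_values(k, prefix))]


# Hypothetical object keys under the dataset prefix.
prefix = "amazon-reviews-pds/parquet/"
keys = [
    prefix + "product_category=Digital_Software/part-00000.snappy.parquet",
    prefix + "product_category=Books/part-00000.snappy.parquet",
]
# Same condition as p_filter, under a different name to avoid shadowing it.
example_filter = lambda x: x["product_category"] == "Digital_Software"
print(prune(keys, prefix, example_filter))
```

Because the decision is made from the path alone, files in rejected partitions are never downloaded, which is what makes this filter cheaper than filtering rows after loading.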

In [11]:
p_filter = lambda x: x["product_category"] == "Digital_Software"

In [12]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
df_parquet_results = wr.s3.read_parquet(
    path, columns=["star_rating", "product_category", "review_body"], partition_filter=p_filter, dataset=True
)
df_parquet_results.shape

(102084, 3)

In [13]:
df_parquet_results.head(5)

|   | star_rating | review_body | product_category |
|---|---|---|---|
| 0 | 4 | QuickBooks is a good program but this version ... | Digital_Software |
| 1 | 1 | did not work. at all | Digital_Software |
| 2 | 1 | do not installe this it causes you computer to... | Digital_Software |
| 3 | 5 | So much of an improvement over the McAfee Anti... | Digital_Software |
| 4 | 3 | ok | Digital_Software |


# Query Parquet from S3 in Chunks

Batching (`chunked` argument, memory-friendly):

Setting `chunked` makes the function return an iterable of DataFrames instead of a single DataFrame.

There are two batching strategies in Wrangler:
* If `chunked=True`, a new DataFrame is returned for each file in your path/dataset.
* If `chunked=INTEGER`, Wrangler iterates over the data in DataFrames of INTEGER rows each (the last one may be smaller).

P.S. `chunked=True` is faster and uses less memory, while `chunked=INTEGER` gives precise control over the number of rows in each DataFrame.
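The two strategies can be sketched in plain Python, with lists standing in for Parquet files and batches standing in for DataFrames. This illustrates the batching semantics only and assumes nothing about Wrangler's internals; the function names are hypothetical:

```python
from itertools import chain, islice


def chunk_by_file(files):
    """chunked=True analogue: yield one batch per file, whatever its size."""
    for rows in files:
        yield list(rows)


def chunk_by_rows(files, n):
    """chunked=INTEGER analogue: re-batch rows across file boundaries into batches of n rows."""
    rows = chain.from_iterable(files)  # one flat stream of rows over all files
    while True:
        batch = list(islice(rows, n))
        if not batch:
            return
        yield batch


files = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]  # three hypothetical files with 3, 2 and 4 rows
print([len(b) for b in chunk_by_file(files)])     # [3, 2, 4]
print([len(b) for b in chunk_by_rows(files, 4)])  # [4, 4, 1]
```

Per-file batches can be handed over as soon as each file is read, whereas fixed-size batches must carry leftover rows across file boundaries; that extra bookkeeping is one plausible reason the per-file strategy is cheaper.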

In [14]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
chunk_iter = wr.s3.read_parquet(
    path,
    columns=["star_rating", "product_category", "review_body"],
    # filters=[("product_category", "=", "Digital_Software")],
    partition_filter=p_filter,
    dataset=True,
    chunked=True,
)

In [15]:
print(next(chunk_iter))

       star_rating                                        review_body  \
0                4  QuickBooks is a good program but this version ...   
1                1                               did not work. at all   
2                1  do not installe this it causes you computer to...   
3                5  So much of an improvement over the McAfee Anti...   
4                3                                                 ok   
...            ...                                                ...   
65531            5  Having just spent a MIND-BLOWING four days wit...   
65532            5  I've always been afraid to do my own taxes due...   
65533            5  It is exactly what I needed to get my photo's ...   
65534            5  This is refined software compared to earlier v...   
65535            2  If you need it on a immediate basis, do not or...   

       product_category  
0      Digital_Software  
1      Digital_Software  
2      Digital_Software  
3      Digital_Soft

# Query the Glue Catalog (i.e., Hive Metastore)
Get an iterator of tables.

In [16]:
database_name = "dsoaws"
table_name_tsv = "amazon_reviews_tsv"
table_name_parquet = "amazon_reviews_parquet"

In [17]:
for table in wr.catalog.get_tables(database=database_name):
    print(table["Name"])

amazon_reviews_parquet
amazon_reviews_tsv


# Query from Athena
Execute any SQL query on Amazon Athena and return the results as a Pandas DataFrame.


In [18]:
%%time
df = wr.athena.read_sql_query(sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet), database=database_name)

CPU times: user 598 ms, sys: 66.6 ms, total: 665 ms
Wall time: 4.18 s


In [19]:
df.head(5)

|   | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 10495702 | R3TKUASN3SL16O | B00MEUPSXS | 625882117 | QuickBooks Premier 2015 | 4 | 1 | 1 | N | Y | QuickBooks is a good program but this version ... | QuickBooks is a good program but this version ... | 2015 | 2015-08-18 | Digital_Software |
| 1 | US | 9682914 | RE85JCBUDBPKR | B00SG9ABQU | 303901870 | Pluto TV: 100+ Free Channels [Download] | 1 | 0 | 0 | N | Y | One Star | did not work. at all | 2015 | 2015-08-18 | Digital_Software |
| 2 | US | 49441252 | R2P2G6RSG65GCH | B00IT6HE5G | 228166066 | IObit Malware Fighter 2 Pro, 1 Year / 1 PC [Do... | 1 | 0 | 0 | N | Y | One Star | do not installe this it causes you computer to... | 2015 | 2015-08-18 | Digital_Software |
| 3 | US | 49409661 | R3IJOHVKU1YJZW | B00MHZ6Z64 | 249773946 | Norton Security | 5 | 0 | 0 | N | Y | Stellar Performance and Protection | So much of an improvement over the McAfee Anti... | 2015 | 2015-08-18 | Digital_Software |
| 4 | US | 19623397 | R1M7N70CZFLU3J | B00SOJ3GMS | 232333503 | Unlimited Free VPN by betternet [Download] | 3 | 0 | 0 | N | Y | Three Stars | ok | 2015 | 2015-08-18 | Digital_Software |


# Query from Athena in Chunks
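Retrieving results in chunks can help reduce memory requirements. When `chunksize` is an integer, Wrangler returns an iterator of DataFrames containing up to that many rows each.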

_This will take a few seconds._
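Chunking only saves memory if the consumer also processes one chunk at a time instead of concatenating them. A minimal sketch of that pattern with a running aggregate; `fake_chunks` is a hypothetical stand-in that yields lists of dicts rather than DataFrames:

```python
from collections import Counter


def count_star_ratings(chunks):
    """Aggregate star_rating counts chunk by chunk without concatenating the chunks."""
    totals = Counter()
    for chunk in chunks:  # only one chunk needs to be held in memory at a time
        totals.update(row["star_rating"] for row in chunk)
    return totals


def fake_chunks():
    """Hypothetical stand-in for the iterator returned by read_sql_query(..., chunksize=...)."""
    yield [{"star_rating": 5}, {"star_rating": 4}]
    yield [{"star_rating": 5}, {"star_rating": 1}]


print(count_star_ratings(fake_chunks()))
```

With the real iterator from `wr.athena.read_sql_query(..., chunksize=...)`, each chunk is a Pandas DataFrame, so the loop body would become something like `totals.update(chunk["star_rating"].tolist())`.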

In [20]:
%%time

chunk_iter = wr.athena.read_sql_query(
    sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet),
    database=database_name,
    chunksize=64_000,  # up to 64,000 rows per chunk (not 64 KB), so all 5,000 rows arrive in one chunk
)

CPU times: user 340 ms, sys: 46.5 ms, total: 386 ms
Wall time: 3.16 s


In [21]:
print(next(chunk_iter))

     marketplace customer_id       review_id  product_id product_parent  \
0             US    24371595  R27ZP1F1CD0C3Y  B004LLIL5A      346014806   
1             US    42489718   RJ7RSBCHUDNNE  B004LLIKVU      473048287   
2             US      861463  R1HVYBSKLQJI5S  B00IX1I3G6      926539283   
3             US    10670435  R15H8E7WD6XD29  B004KNWX6C      763371347   
4             US    48872127   RVN4P3RU4F8IE  BT00CTOYC0      506740729   
...          ...         ...             ...         ...            ...   
4995          US    38856386  R3OP3MS0XW63LE  B00XUUAB92      347974698   
4996          US    11539371   RO7XUKMRN9C4O  B00AF0K82U      938949631   
4997          US     5707810  R22Q4EAGD8800K  B00JDQLFZ6      775486538   
4998          US    20337354  R32N8AE4S166QO  B00H5BM3L6       40124087   
4999          US    11223211   RO15VF6YKA3VK  B00IX1I3G6      926539283   

                                          product_title  star_rating  \
0                         A

# Release Resources

In [22]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [23]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>