# Query Data with AWS Data Wrangler

**AWS Data Wrangler** is an open-source Python library that extends the power of the Pandas library to AWS, connecting DataFrames with AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon EMR, Amazon QuickSight, etc.).

* https://github.com/awslabs/aws-data-wrangler
* https://aws-data-wrangler.readthedocs.io

Built on top of other open-source projects like Pandas, Apache Arrow, Boto3, s3fs, SQLAlchemy, Psycopg2 and PyMySQL, it offers abstracted functions for common ETL tasks such as loading and unloading data from data lakes, data warehouses, and databases.

_Note that AWS Data Wrangler is simply a Python library that uses existing AWS services; it is not a separate AWS service. You install it with `pip install awswrangler`._

# _Pre-Requisite: Make Sure You Created an Athena Table for Both TSV and Parquet in Previous Notebooks_

In [1]:
%store -r ingest_create_athena_table_tsv_passed

In [2]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [3]:
print(ingest_create_athena_table_tsv_passed)

True


In [4]:
if not ingest_create_athena_table_tsv_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


In [5]:
%store -r ingest_create_athena_table_parquet_passed

In [6]:
try:
    ingest_create_athena_table_parquet_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [7]:
print(ingest_create_athena_table_parquet_passed)

True


In [8]:
if not ingest_create_athena_table_parquet_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not convert into Parquet data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


# Setup

In [9]:
import sagemaker
import boto3

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

In [10]:
import awswrangler as wr

# Query Parquet from S3 with Push-Down Filters

Read Apache Parquet file(s) from a received S3 prefix or list of S3 object paths.

The concept of a dataset goes beyond the simple idea of files and enables more complex features such as partitioning and catalog integration (AWS Glue Catalog):

_dataset (bool)_: If True, read a Parquet dataset instead of simple file(s), loading all the related partitions as columns.
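With `dataset=True`, the `partition_filter` callable acts as a push-down filter on partition columns. The awswrangler documentation describes it as receiving a dict of partition names to partition values (always strings, extracted from the S3 path) and returning `True` to read that partition or `False` to ignore it. The pure-Python sketch below illustrates that pruning step, assuming Hive-style `name=value` key segments; the object keys and the helpers `parse_partitions` and `prune` are hypothetical, not part of awswrangler.

```python
# Minimal sketch of Hive-style partition pruning (not awswrangler's implementation).
# Assumes keys laid out as .../<partition_name>=<value>/<file>; keys below are made up.
from typing import Callable, Dict, List


def parse_partitions(key: str) -> Dict[str, str]:
    """Extract {name: value} pairs from path segments like 'product_category=Books'."""
    parts: Dict[str, str] = {}
    for segment in key.split("/"):
        if "=" in segment:
            name, value = segment.split("=", 1)
            parts[name] = value  # values stay strings, as documented for partition_filter
    return parts


def prune(keys: List[str], partition_filter: Callable[[Dict[str, str]], bool]) -> List[str]:
    """Keep only files whose partition values pass the filter; the rest are never read."""
    return [k for k in keys if partition_filter(parse_partitions(k))]


def p_filter(partitions: Dict[str, str]) -> bool:
    # Same predicate as the notebook's lambda, written as a named function.
    return partitions["product_category"] == "Digital_Software"


keys = [
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-0.parquet",
    "amazon-reviews-pds/parquet/product_category=Books/part-0.parquet",
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-1.parquet",
]
selected = prune(keys, p_filter)
print(selected)  # only the two Digital_Software keys
```

Because the decision is made from the path alone, excluded partitions cost no S3 reads, which is what makes this filter cheaper than filtering rows after loading.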

In [11]:
p_filter = lambda x: x["product_category"] == "Digital_Software"

In [12]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
df_parquet_results = wr.s3.read_parquet(
    path, columns=["star_rating", "product_category", "review_body"], partition_filter=p_filter, dataset=True
)
df_parquet_results.shape


(102084, 3)

In [13]:
df_parquet_results.head(5)

|   | star_rating | review_body | product_category |
|---|---|---|---|
| 0 | 4 | So far so good | Digital_Software |
| 1 | 3 | Needs a little more work..... | Digital_Software |
| 2 | 1 | Please cancel. | Digital_Software |
| 3 | 5 | Works as Expected! | Digital_Software |
| 4 | 4 | I've had Webroot for a few years. It expired a... | Digital_Software |


# Query Parquet from S3 in Chunks

Batching (`chunked` argument, memory friendly):

Setting `chunked` makes the function return an iterable of DataFrames instead of a single DataFrame.

There are two batching strategies in Wrangler:
* If `chunked=True`, one or more DataFrames are returned for each file in your path/dataset, depending on file size; rows from different files are not mixed.
* If `chunked=INTEGER`, Wrangler iterates over the data in DataFrames of INTEGER rows each.

P.S. `chunked=True` is faster and uses less memory, while `chunked=INTEGER` is more precise about the number of rows in each DataFrame.
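To make the difference between the two strategies concrete, here is a minimal sketch that uses plain Python lists of rows in place of DataFrames. The file contents and the helper names `chunk_per_file` and `chunk_by_rows` are hypothetical; they only mimic the batching behavior described above, not Wrangler's actual implementation.

```python
# Minimal sketch of the two batching strategies, with lists of dict rows standing
# in for DataFrames. The "files" below are hypothetical.
from itertools import chain, islice
from typing import Dict, Iterator, List

Row = Dict[str, int]


def chunk_per_file(files: List[List[Row]]) -> Iterator[List[Row]]:
    """chunked=True analogue: batches follow file boundaries (the real library may
    split a large file into several batches, but never mixes files)."""
    for rows in files:
        yield rows


def chunk_by_rows(files: List[List[Row]], size: int) -> Iterator[List[Row]]:
    """chunked=INTEGER analogue: re-slice the combined row stream into `size`-row batches."""
    stream = chain.from_iterable(files)
    while True:
        batch = list(islice(stream, size))
        if not batch:
            return
        yield batch


files = [[{"star_rating": 5}] * 3, [{"star_rating": 4}] * 5]
print([len(b) for b in chunk_per_file(files)])    # [3, 5]
print([len(b) for b in chunk_by_rows(files, 3)])  # [3, 3, 2]
```

Per-file batching needs no re-slicing, so it is cheaper, but batch sizes depend on how the data was written; row-count batching stitches rows across files to give predictable sizes, with only the final batch possibly smaller.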

In [14]:
path = "s3://{}/amazon-reviews-pds/parquet/".format(bucket)
chunk_iter = wr.s3.read_parquet(
    path,
    columns=["star_rating", "review_body"],
    # filters=[("product_category", "=", "Digital_Software")],
    partition_filter=p_filter,
    dataset=True,
    chunked=True,
)

In [15]:
print(next(chunk_iter))

       star_rating                                        review_body  \
0                4                                     So far so good   
1                3                      Needs a little more work.....   
2                1                                     Please cancel.   
3                5                                 Works as Expected!   
4                4  I've had Webroot for a few years. It expired a...   
...            ...                                                ...   
65531            4  I have used Quicken since it was originally re...   
65532            1  The previous Norton I had installed used many ...   
65533            5  I do my own taxes all the time.  This was the ...   
65534            5  Turbotax makes preparing my own return painles...   
65535            4  I've used this program for a long time, but wa...   

       product_category  
0      Digital_Software  
1      Digital_Software  
2      Digital_Software  
3      Digital_Soft

# Query the Glue Catalog (i.e., Hive Metastore)
Get an iterator of tables.

In [16]:
database_name = "dsoaws"
table_name_tsv = "amazon_reviews_tsv"
table_name_parquet = "amazon_reviews_parquet"

In [17]:
for table in wr.catalog.get_tables(database=database_name):
    print(table["Name"])

amazon_reviews_parquet
amazon_reviews_tsv


# Query from Athena
Execute any SQL query on Amazon Athena and return the results as a Pandas DataFrame.


In [18]:
%%time
df = wr.athena.read_sql_query(sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet), database=database_name)

CPU times: user 1.2 s, sys: 311 ms, total: 1.51 s
Wall time: 4.63 s


In [19]:
df.head(5)

|   | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 17747349 | R2EI7QLPK4LF7U | B00U7LCE6A | 106182406 | CCleaner Free [Download] | 4 | 0 | 0 | N | Y | Four Stars | So far so good | 2015 | 2015-08-31 | Digital_Software |
| 1 | US | 10956619 | R1W5OMFK1Q3I3O | B00HRJMOM4 | 162269768 | ResumeMaker Professional Deluxe 18 | 3 | 0 | 0 | N | Y | Three Stars | Needs a little more work..... | 2015 | 2015-08-31 | Digital_Software |
| 2 | US | 13132245 | RPZWSYWRP92GI | B00P31G9PQ | 831433899 | Amazon Drive Desktop [PC] | 1 | 1 | 2 | N | Y | One Star | Please cancel. | 2015 | 2015-08-31 | Digital_Software |
| 3 | US | 15696503 | R2HGGCCZSSNUCB | B00M9GTJLY | 103182180 | Intuit Quicken Rental Property Manager 2015 | 1 | 0 | 0 | N | Y | Horrible! Would not upgrade previous version f... | Horrible! Would not upgrade previous version ... | 2015 | 2015-08-31 | Digital_Software |
| 4 | US | 9723928 | REEE4LHSVPRV9 | B00H9A60O4 | 608720080 | Avast Free Antivirus 2015 [Download] | 1 | 0 | 0 | N | Y | One Star | Waste of time . | 2015 | 2015-08-31 | Digital_Software |


# Query from Athena in Chunks
Retrieving in chunks can help reduce memory requirements.  
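The memory saving comes from processing each chunk and then discarding it, rather than holding the full result. A minimal sketch of that pattern follows, assuming a hypothetical `fake_chunks` generator that stands in for the iterator returned when `chunksize` is set; it computes a mean star rating while holding at most one chunk at a time.

```python
# Minimal sketch of streaming aggregation over chunks. fake_chunks() is a
# hypothetical stand-in for the iterator wr.athena.read_sql_query returns
# when chunksize is set; the ratings are made-up sample values.
from typing import Iterator, List


def fake_chunks(ratings: List[int], chunksize: int) -> Iterator[List[int]]:
    """Yield the ratings in slices of at most `chunksize` rows."""
    for start in range(0, len(ratings), chunksize):
        yield ratings[start:start + chunksize]


def mean_rating(chunks: Iterator[List[int]]) -> float:
    """Running mean: only the totals and the current chunk live in memory."""
    total, count = 0, 0
    for chunk in chunks:
        total += sum(chunk)
        count += len(chunk)
    return total / count if count else float("nan")


ratings = [4, 3, 1, 5, 4, 1, 5, 5]
print(mean_rating(fake_chunks(ratings, chunksize=3)))  # 3.5
```

With the real iterator each chunk is a pandas DataFrame, so the loop body would use `chunk["star_rating"].sum()` and `len(chunk)` instead.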

_This will take a few seconds._

In [20]:
%%time

chunk_iter = wr.athena.read_sql_query(
    sql="SELECT * FROM {} LIMIT 5000".format(table_name_parquet),
    database=database_name,
    chunksize=64_000,  # up to 64,000 rows per DataFrame (with LIMIT 5000, one chunk holds all rows)
)

CPU times: user 790 ms, sys: 235 ms, total: 1.03 s
Wall time: 3.87 s


In [21]:
print(next(chunk_iter))

     marketplace customer_id       review_id  product_id product_parent  \
0             US    44887421  R18EZEJVFBS0MM  B00G4IWCO4      324887621   
1             US    12735059  R2J8WEJK8UAOGR  B00A48G1X8      891045636   
2             US    52427998   RUZOPQC0W65JS  B0091JKU5Q      340283309   
3             US    24926273  R1P8XF3FKBRWPP  B0062ONKN2      986867139   
4             US    48472340  R2KQ4FQJ6BDSAE  B004KNWX3U      326036796   
...          ...         ...             ...         ...            ...   
4995          US     1637436  R3LQFUTR01QNMK  B004LLIKVU      473048287   
4996          US    43728564  R2LUM9IKRGCGEM  B0091JL1I6      349029296   
4997          US    12106398   RYV1BN2JIEDGN  B005ESMGGY      379368939   
4998          US    36469502  R15RQLMKXWWXSP  B00IX1I3G6      926539283   
4999          US    27877106  R1HNZT4LZ2GV2I  B00I5426SM      887841728   

                                          product_title  star_rating  \
0     Amazon Gift Card - Pr

# Release Resources

In [22]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>



In [None]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}