# Convert TSV Data To Parquet with Athena

In this notebook, we show how to convert the Amazon Customer Reviews data, which is already registered in Athena as a TSV table, into the Apache Parquet file format.

In [1]:
import boto3
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

In [2]:
%store -r ingest_create_athena_table_tsv_passed

In [3]:
print(ingest_create_athena_table_tsv_passed)

True


# Import PyAthena

In [4]:
from pyathena import connect

# Create Parquet Files from TSV Table

As you can see from the query below, we also add a new `year` column to the dataset: we convert the `review_date` string to a date, extract the year from that date, and cast it to an integer. We also partition the Parquet data by `product_category`. Athena requires partition columns to appear last in the `SELECT` list of a `CREATE TABLE AS` query, which is why `product_category` is the final column.
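To make the transformation concrete, here is a minimal local sketch in plain Python of what the `year` expression and the partitioning produce for a single row. It assumes `review_date` strings use the `YYYY-MM-DD` format seen in the notebook's sample output, and that Athena writes each partition into a Hive-style `product_category=<value>/` folder below `external_location`. The helper names `derive_year` and `partition_prefix` and the bucket `my-bucket` are hypothetical.

```python
from datetime import date


def derive_year(review_date: str) -> int:
    """Local equivalent of CAST(YEAR(DATE(review_date)) AS INTEGER).

    Assumes review_date is an ISO date string such as "2008-02-11".
    """
    return date.fromisoformat(review_date).year


def partition_prefix(base_path: str, product_category: str) -> str:
    """Build the Hive-style (column=value) folder a partition's files are assumed to land in."""
    return "{}/product_category={}/".format(base_path.rstrip("/"), product_category)


print(derive_year("2008-02-11"))
# 2008
print(partition_prefix("s3://my-bucket/amazon-reviews-pds/parquet", "Digital_Software"))
# s3://my-bucket/amazon-reviews-pds/parquet/product_category=Digital_Software/
```

Because the partition value is encoded in the folder name, the Parquet files themselves do not need to store `product_category`.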

In [5]:
# Set S3 path to Parquet data
s3_path_parquet = "s3://{}/amazon-reviews-pds/parquet".format(bucket)

# Set Athena parameters
database_name = "dsoaws"
table_name_tsv = "amazon_reviews_tsv"
table_name_parquet = "amazon_reviews_parquet"

In [6]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://{0}/athena/staging".format(bucket)

In [7]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

# Execute Statement
_This can take a few minutes.  Please be patient._

In [8]:
# SQL statement to execute
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM {}.{}""".format(
    database_name, table_name_parquet, s3_path_parquet, database_name, table_name_tsv
)

print(statement)

CREATE TABLE IF NOT EXISTS dsoaws.amazon_reviews_parquet
WITH (format = 'PARQUET', external_location = 's3://sagemaker-us-east-1-117859797117/amazon-reviews-pds/parquet', partitioned_by = ARRAY['product_category']) AS
SELECT marketplace,
         customer_id,
         review_id,
         product_id,
         product_parent,
         product_title,
         star_rating,
         helpful_votes,
         total_votes,
         vine,
         verified_purchase,
         review_headline,
         review_body,
         CAST(YEAR(DATE(review_date)) AS INTEGER) AS year,
         DATE(review_date) AS review_date,
         product_category
FROM dsoaws.amazon_reviews_tsv


In [9]:
import pandas as pd

pd.read_sql(statement, conn)

|   | rows |
|---|------|


# Load partitions by running `MSCK REPAIR TABLE`

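As a last step, we load the Parquet partitions into the catalog. `MSCK REPAIR TABLE` scans the table's S3 location for Hive-style partition folders (for example, `product_category=Digital_Software/`) and registers any partitions that are missing from the catalog. To do so, issue the following SQL command: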
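Conceptually, the repair step walks the object keys under the table location and turns each `column=value` folder into a partition. The sketch below mimics that idea on a plain list of keys; it illustrates the concept and is not how Athena implements `MSCK REPAIR TABLE`. The function name `discover_partitions` and the sample keys are hypothetical; in practice the keys could come from boto3's `list_objects_v2` paginator.

```python
from typing import Dict, Iterable, List


def discover_partitions(keys: Iterable[str], table_prefix: str) -> List[Dict[str, str]]:
    """Return the distinct Hive-style partitions (col=value folders) found under table_prefix."""
    partitions: List[Dict[str, str]] = []
    for key in keys:
        if not key.startswith(table_prefix):
            continue  # object belongs to a different table location
        # Folder segments between the table prefix and the file name,
        # e.g. ["product_category=Gift Card"]
        folders = key[len(table_prefix):].strip("/").split("/")[:-1]
        partition = dict(folder.split("=", 1) for folder in folders if "=" in folder)
        if partition and partition not in partitions:
            partitions.append(partition)
    return partitions


# Hypothetical object keys (file names are made up for illustration)
keys = [
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-0.parquet",
    "amazon-reviews-pds/parquet/product_category=Digital_Software/part-1.parquet",
    "amazon-reviews-pds/parquet/product_category=Gift Card/part-0.parquet",
]
print(discover_partitions(keys, "amazon-reviews-pds/parquet/"))
# [{'product_category': 'Digital_Software'}, {'product_category': 'Gift Card'}]
```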

In [10]:
statement = "MSCK REPAIR TABLE {}.{}".format(database_name, table_name_parquet)

print(statement)

MSCK REPAIR TABLE dsoaws.amazon_reviews_parquet


In [11]:
import pandas as pd

df = pd.read_sql(statement, conn)
df.head(5)

# Show the Partitions

In [12]:
statement = "SHOW PARTITIONS {}.{}".format(database_name, table_name_parquet)

print(statement)

SHOW PARTITIONS dsoaws.amazon_reviews_parquet


In [13]:
df_partitions = pd.read_sql(statement, conn)
df_partitions.head(5)

|   | partition |
|---|-----------|
| 0 | product_category=Digital_Video_Games |
| 1 | product_category=Digital_Software |
| 2 | product_category=Gift Card |


# Show the Tables

In [14]:
statement = "SHOW TABLES IN {}".format(database_name)

In [15]:
df_tables = pd.read_sql(statement, conn)
df_tables.head(5)

|   | tab_name |
|---|----------|
| 0 | amazon_reviews_parquet |
| 1 | amazon_reviews_tsv |


In [16]:
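# Default to False so the %store below never fails with a NameError
ingest_create_athena_table_parquet_passed = False
if table_name_parquet in df_tables.values:
    ingest_create_athena_table_parquet_passed = True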

In [17]:
%store ingest_create_athena_table_parquet_passed

Stored 'ingest_create_athena_table_parquet_passed' (bool)


# Run Sample Query

In [18]:
product_category = "Digital_Software"

statement = """SELECT * FROM {}.{}
    WHERE product_category = '{}' LIMIT 100""".format(
    database_name, table_name_parquet, product_category
)

print(statement)

SELECT * FROM dsoaws.amazon_reviews_parquet
    WHERE product_category = 'Digital_Software' LIMIT 100
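The statement above splices `product_category` into the SQL with `str.format`, so a category value containing a single quote would break the query. A minimal hardening, assuming Athena's standard SQL rule that a single quote inside a string literal is escaped by doubling it, is to quote the value explicitly. The helpers `sql_string_literal` and `build_category_query` are hypothetical; PyAthena's cursor also accepts bound parameters in `pyformat` style (for example `%(category)s`), which may be the cleaner option when executing through the cursor directly.

```python
def sql_string_literal(value: str) -> str:
    """Quote a Python string as a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_category_query(database: str, table: str, product_category: str, limit: int = 100) -> str:
    """Build the sample query; database and table are trusted identifiers, not user input."""
    return "SELECT * FROM {}.{} WHERE product_category = {} LIMIT {:d}".format(
        database, table, sql_string_literal(product_category), limit
    )


print(build_category_query("dsoaws", "amazon_reviews_parquet", "Digital_Software"))
# SELECT * FROM dsoaws.amazon_reviews_parquet WHERE product_category = 'Digital_Software' LIMIT 100
```

The resulting string can be passed to `pd.read_sql(...)` exactly like `statement`.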


In [19]:
df = pd.read_sql(statement, conn)
df.head(5)

|   | marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | year | review_date | product_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | US | 41754720 | R19OFJV91M7D8X | B000YMR61A | 141393130 | TurboTax Deluxe Federal + State 2007 | 2 | 12 | 13 | N | N | Easy to use, 1 comment 1 serious problem | I chose the deluxe version CD because of mortg... | 2008 | 2008-02-11 | Digital_Software |
| 1 | US | 51669529 | R1I6G894K5AGG5 | B000YMR61A | 141393130 | TurboTax Deluxe Federal + State 2007 | 4 | 6 | 9 | N | N | Schedule C IS for business- figures it would ... | Schedule C IS for business, so figures it wou... | 2008 | 2008-02-08 | Digital_Software |
| 2 | US | 24731012 | R17OE43FFEP81I | B000YMR5X4 | 234295632 | TurboTax Premier Federal + State 2007 | 2 | 9 | 16 | N | N | Hassel to download | I wish that companies can test several scenari... | 2008 | 2008-02-05 | Digital_Software |
| 3 | US | 16049580 | R15MGDDK63B52Z | B000YMR61A | 141393130 | TurboTax Deluxe Federal + State 2007 | 3 | 14 | 14 | N | N | beware of vista | i just installed turbotax deluxe 2007. If you ... | 2008 | 2008-02-05 | Digital_Software |
| 4 | US | 46098046 | R1GGJJA2R68033 | B000YMNI2Q | 847631772 | TurboTax Basic 2007 | 1 | 54 | 60 | N | N | don't waste your money | The description mentions that you can use this... | 2008 | 2008-01-26 | Digital_Software |


# Review the New Athena Table in the Glue Catalog

In [20]:
from IPython.display import display, HTML

display(
    HTML(
        '<b>Review <a target="top" href="https://console.aws.amazon.com/glue/home?region={}#">AWS Glue Catalog</a></b>'.format(
            region
        )
    )
)

In just a few steps, we have set up Amazon Athena to query our Amazon Customer Reviews TSV files and converted them into the Apache Parquet file format.

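You might have noticed that the sample query on the Parquet table finished in a fraction of the time of the sample query we ran earlier on the TSV table. Two things contribute to the speedup: Parquet is a columnar format, so Athena can read only the columns a query references, and partitioning by `product_category` lets Athena skip every partition that a `WHERE` filter on that column excludes.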
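Partition pruning can be pictured with a toy model: each partition value maps to the files stored under it, and a filter on the partition column means only the matching entry is ever opened. The sketch below is a conceptual illustration under that assumption, not Athena's query planner; the function `prune_partitions` and the file layout are hypothetical.

```python
from typing import Dict, List


def prune_partitions(partitions: Dict[str, List[str]], product_category: str) -> Dict[str, List[str]]:
    """Keep only the partition matching a WHERE product_category = '...' filter.

    partitions maps each product_category value to the data files stored under it.
    """
    return {cat: files for cat, files in partitions.items() if cat == product_category}


# Hypothetical layout: file names and counts are made up for illustration
partitions = {
    "Digital_Software": ["a.parquet", "b.parquet"],
    "Digital_Video_Games": ["c.parquet", "d.parquet", "e.parquet"],
    "Gift Card": ["f.parquet"],
}
selected = prune_partitions(partitions, "Digital_Software")
files_read = sum(len(files) for files in selected.values())
files_total = sum(len(files) for files in partitions.values())
print("{} of {} files read".format(files_read, files_total))
# 2 of 6 files read
```

A query without a filter on `product_category` gets no such benefit and still has to read every partition.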


# Store Variables for the Next Notebooks

In [21]:
%store

Stored variables and their in-db values:
balance_dataset                                       -> True
balanced_bias_data_jsonlines_s3_uri                   -> 's3://sagemaker-us-east-1-117859797117/bias-detect
balanced_bias_data_s3_uri                             -> 's3://sagemaker-us-east-1-117859797117/bias-detect
bias_data_s3_uri                                      -> 's3://sagemaker-us-east-1-117859797117/bias-detect
experiment_name                                       -> 'Amazon-Customer-Reviews-BERT-Experiment-161761858
feature_group_name                                    -> 'reviews-feature-group-1617618587'
feature_store_offline_prefix                          -> 'reviews-feature-store-1617618587'
ingest_create_athena_db_passed                        -> True
ingest_create_athena_table_parquet_passed             -> True
ingest_create_athena_table_tsv_passed                 -> True
max_seq_length                                        -> 64
model_ab_endpoint_name             