# Convert TSV Data To Parquet with Athena

In [9]:
import boto3
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

In [10]:
ingest_create_athena_table_parquet_passed = False

In [11]:
%store -r ingest_create_athena_table_tsv_passed

In [12]:
try:
    ingest_create_athena_table_tsv_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [13]:
print(ingest_create_athena_table_tsv_passed)

True


In [14]:
if not ingest_create_athena_table_tsv_passed:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN ALL PREVIOUS NOTEBOOKS.  You did not register the TSV Data.")
    print("++++++++++++++++++++++++++++++++++++++++++++++")
else:
    print("[OK]")

[OK]


# Import PyAthena

In [15]:
from pyathena import connect

# Create Parquet Files from TSV Table

Convert CSV to Parquet
Partition by year (based on age) → Since we don’t have a date column, we assume age can determine year of birth.
Upload to S3

In [16]:
# Set S3 path to Parquet data
s3_path_parquet = "s3://cardiovale-solutions-datascience-pipeline/parquet-data/cardio_train_cleaned"

# Set Athena parameters
database_name = "cardiovale_db"  # project database
table_name_tsv = "cardio_train_cleaned"  # Your existing TSV table
table_name_parquet = "cardio_train_parquet"  # New Parquet table


In [17]:
# Set S3 staging directory -- this is a temporary directory used for Athena queries
s3_staging_dir = "s3://cardiovale-solutions-datascience-pipeline/athena/staging"


In [18]:
conn = connect(region_name=region, s3_staging_dir=s3_staging_dir)

# Execute Statement
_This can take a few minutes.  Please be patient._

In [19]:
# Set Parquet table name
table_name_parquet = "cardio_train_parquet"

# Set S3 path for Parquet storage
s3_path_parquet = "s3://cardiovale-solutions-datascience-pipeline/raw-data/parquet/cardio_train/"

# Corrected SQL statement
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['gender']) AS
SELECT 
         id,
         age,
         height,
         weight,
         ap_hi,
         ap_lo,
         cholesterol,
         gluc,
         smoke,
         alco,
         active,
         cardio,
         gender  -- Move partition column LAST
FROM {}.{}""".format(
    database_name, table_name_parquet, s3_path_parquet, database_name, "cardio_train_cleaned"
)

print(statement)


CREATE TABLE IF NOT EXISTS cardiovale_db.cardio_train_parquet
WITH (format = 'PARQUET', external_location = 's3://cardiovale-solutions-datascience-pipeline/raw-data/parquet/cardio_train/', partitioned_by = ARRAY['gender']) AS
SELECT 
         id,
         age,
         height,
         weight,
         ap_hi,
         ap_lo,
         cholesterol,
         gluc,
         smoke,
         alco,
         active,
         cardio,
         gender  -- Move partition column LAST
FROM cardiovale_db.cardio_train_cleaned


In [20]:
import pandas as pd

pd.read_sql(statement, conn)

  pd.read_sql(statement, conn)


Unnamed: 0,rows


# Load partitions by running `MSCK REPAIR TABLE`

As a last step, we need to load the Parquet partitions. To do so, just issue the following SQL command: 

In [21]:
statement = "MSCK REPAIR TABLE {}.{}".format(database_name, table_name_parquet)

print(statement)

MSCK REPAIR TABLE cardiovale_db.cardio_train_parquet


In [22]:
import pandas as pd

df = pd.read_sql(statement, conn)
df.head(5)

  df = pd.read_sql(statement, conn)


# Show the Partitions

In [23]:
statement = "SHOW PARTITIONS {}.{}".format(database_name, table_name_parquet)

print(statement)

SHOW PARTITIONS cardiovale_db.cardio_train_parquet


In [24]:
df_partitions = pd.read_sql(statement, conn)
df_partitions.head(5)

  df_partitions = pd.read_sql(statement, conn)


Unnamed: 0,partition
0,gender=1
1,gender=2


# Show the Tables

In [25]:
statement = "SHOW TABLES in {}".format(database_name)

In [26]:
df_tables = pd.read_sql(statement, conn)
df_tables.head(5)

  df_tables = pd.read_sql(statement, conn)


Unnamed: 0,tab_name
0,cardio_train_cleaned
1,cardio_train_parquet
2,quitline_fixed


In [27]:
if table_name_parquet in df_tables.values:
    ingest_create_athena_table_parquet_passed = True

In [28]:
%store ingest_create_athena_table_parquet_passed

Stored 'ingest_create_athena_table_parquet_passed' (bool)


# Run Sample Query

In [29]:
gender_partition = 1  # Ensure this is an integer, not a string

statement = """SELECT * FROM {}.{}
    WHERE gender = {} LIMIT 100""".format(
    database_name, table_name_parquet, gender_partition
)

print(statement)  # Print query for debugging



SELECT * FROM cardiovale_db.cardio_train_parquet
    WHERE gender = 1 LIMIT 100


In [30]:
df = pd.read_sql(statement, conn)
df.head(5)

  df = pd.read_sql(statement, conn)


Unnamed: 0,id,age,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active,cardio,gender
0,1,20228,156,85.0,140,90,3,1,0,0,1,1,1
1,2,18857,165,64.0,130,70,3,1,0,0,0,1,1
2,4,17474,156,56.0,100,60,1,1,0,0,0,0,1
3,8,21914,151,67.0,120,80,2,2,0,0,0,0,1
4,9,22113,157,93.0,130,80,3,1,0,0,1,0,1


In [31]:
if not df.empty:
    print("[OK]")
else:
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOUR DATA HAS NOT BEEN CONVERTED TO PARQUET. LOOK IN PREVIOUS CELLS TO FIND THE ISSUE.")
    print("++++++++++++++++++++++++++++++++++++++++++++++++++++++")

[OK]


# Review the New Athena Table in the Glue Catalog

In [32]:
from IPython.core.display import display, HTML

display(
    HTML(
        '<b>Review <a target="top" href="https://console.aws.amazon.com/glue/home?region={}#">AWS Glue Catalog</a></b>'.format(
            region
        )
    )
)

  from IPython.core.display import display, HTML


# Store Variables for the Next Notebooks

In [33]:
%store

Stored variables and their in-db values:
autopilot_train_s3_uri                                -> 's3://sagemaker-us-east-1-424808199142/data/amazon
balanced_bias_data_jsonlines_s3_uri                   -> 's3://sagemaker-us-east-1-424808199142/bias-detect
balanced_bias_data_s3_uri                             -> 's3://sagemaker-us-east-1-424808199142/bias-detect
bias_data_s3_uri                                      -> 's3://sagemaker-us-east-1-424808199142/bias-detect
ingest_create_athena_db_passed                        -> True
ingest_create_athena_table_parquet_passed             -> True
ingest_create_athena_table_tsv_passed                 -> True
s3_private_path_tsv                                   -> 's3://sagemaker-us-east-1-424808199142/amazon-revi
s3_public_path_tsv                                    -> 's3://usd-mads-508/amazon-reviews-pds/tsv'
setup_dependencies_passed                             -> True
setup_iam_roles_passed                                -> True
setup_ins

# Release Resources

In [34]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [35]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>