# Query data with Athena

### Retrieve finalcsv df from Store

In [78]:
%store -r merged_df

### Define Bucket

In [79]:
import sagemaker
bucket = sagemaker.Session().default_bucket()
print(bucket)

sagemaker-us-east-1-418795740224


### Connect to Athena

In [80]:
#!pip install pyathena
import boto3
from pyathena import connect
region = boto3.Session().region_name
stagingdir = "s3://{}/athena/staging/".format(bucket)
conn = connect(region_name=region, s3_staging_dir=stagingdir)

# Create Database 
### Define db name and sql script

In [81]:
import pandas as pd
dbname = "viewboost"
sql = "CREATE DATABASE IF NOT EXISTS {}".format(dbname)
pd.read_sql(sql, conn)

### Verify Database creation

In [82]:
sql2 = "SHOW DATABASES"
pd.read_sql(sql2, conn)

Unnamed: 0,database_name
0,default
1,viewboost
2,youtubedb


# Create Table

### Define tablename and final destination path

In [83]:
tablename = 'test'
s3_private_path_tsv = "s3://{}/athena/files/".format(bucket)

In [84]:
merged_df.columns

Index(['location', 'title', 'channelTitle', 'tags', 'view_count', 'likes',
       'dislikes', 'comment_count', 'comments_disabled', 'ratings_disabled',
       'description_length', 'timepublished', 'trending_month',
       'trending_year', 'category'],
      dtype='object')

### Create table statement

In [85]:
pd.read_sql("DROP TABLE IF EXISTS {}.{}".format(dbname,tablename), conn)
statement = """CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(
                location string,
                title string,
                channelTitle string,
                tags string,
                view_count int,
                likes int,
                dislikes int,	
                comment_count int,
                comments_disabled boolean,
                ratings_disabled boolean,
                description_length int,
                timepublished string,
                trending_month string,
                trending_year string,
                category string)
                
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ( 
  'escapeChar'='\"\"', 
  'quoteChar'='\"', 
  'separatorChar'=',') 
LOCATION '{}'
TBLPROPERTIES ("skip.header.line.count"="1");""".format(dbname, tablename, s3_private_path_tsv)

pd.read_sql(statement, conn)

### Verify table creation using test query

In [86]:
statement = "SELECT * FROM {}.{} LIMIT 3".format(dbname, tablename)
pd.read_sql(statement, conn)

Unnamed: 0,location,title,channeltitle,tags,view_count,likes,dislikes,comment_count,comments_disabled,ratings_disabled,description_length,timepublished,trending_month,trending_year,category
0,US,How To Make a Curried Egg Sandwich,HowToBasic,how to make a curried egg sandwich|curried egg...,1238677,104736,3736,13876,False,False,788,18,8,2020,Howto & Style
1,US,Trump takes executive action to address econom...,ABC News,president|trump|donald|executive|orders|stimul...,1090847,10922,2517,9876,False,False,147,1,8,2020,News & Politics
2,US,JUDAS AND THE BLACK MESSIAH - Official Trailer,Warner Bros. Pictures,warner bros|warner brothers|wb|fred hampton|wi...,971704,23311,1987,3240,False,False,4342,23,8,2020,Entertainment


# Create a Parquet Table

### Define tablename_parquet and final destination path

In [87]:
tablename_parquet = 'test_parquet'
s3_private_path_parquet = "s3://{}/parquet".format(bucket)

### Create table statement
##### Ensure the partitioning column is the last listed column name

In [88]:
pd.read_sql("DROP TABLE IF EXISTS {}.{}".format(dbname,tablename_parquet), conn)
statement = """CREATE TABLE IF NOT EXISTS {}.{}
WITH (format = 'PARQUET', external_location = '{}', partitioned_by = ARRAY['category']) AS
SELECT location,
                title,
                channelTitle,
                tags,
                view_count,
                likes,
                dislikes,	
                comment_count,
                comments_disabled,
                ratings_disabled,
                description_length,
                timepublished,
                trending_month,
                trending_year,
                category
FROM {}.{}""".format(
    dbname, tablename_parquet, s3_private_path_parquet, dbname, tablename
)
pd.read_sql(statement, conn)

Unnamed: 0,rows


### Repair Partitions

In [89]:
statement = "MSCK REPAIR TABLE {}.{}".format(dbname, tablename_parquet)
df = pd.read_sql(statement, conn)
df.head(5)

### Show partitions

In [90]:
statement = "SHOW PARTITIONS {}.{}".format(dbname, tablename_parquet)
df_partitions = pd.read_sql(statement, conn)
df_partitions

Unnamed: 0,partition
0,category=Film & Animation
1,category=Sports
2,category=Entertainment
3,category=__HIVE_DEFAULT_PARTITION__
4,category=Music
5,category=People & Blogs
6,category=Comedy
7,category=News & Politics
8,category=Travel & Events
9,category=Science & Technology


### Verify Table was made by showing Tables

In [91]:
statement = "SHOW TABLES in {}".format(dbname)

In [92]:
df_tables = pd.read_sql(statement, conn)
df_tables.head(5)

Unnamed: 0,tab_name
0,jsonus
1,test
2,test_parquet
3,us_json
4,usjson


# Release Resources

In [93]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [94]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}

<IPython.core.display.Javascript object>