# Create Athena Tables

***

## Libraries

In [2]:
import boto3
import sagemaker
!pip install --disable-pip-version-check -q PyAthena==2.1.0
from pyathena import connect
import pandas as pd

[0m

## Variables

In [3]:
# --- Athena / S3 configuration shared by every cell below ---
db_name = "sdpd"
Bucket = 'sdpd-bucket'

# Region comes from the default boto3 session.
region = boto3.Session().region_name

# Athena writes query results under this S3 prefix.
s3_staging_dir = f"s3://{Bucket}/athena/staging"
conn = connect(s3_staging_dir=s3_staging_dir, region_name=region)

role = sagemaker.get_execution_role()
s3 = boto3.client('s3')  # S3 client (used by Header to fetch CSV header lines)
s3_resource = boto3.resource('s3')  # S3 resource handle

# Echo the resolved settings.
print('S3_staging_dir - > ', s3_staging_dir)
print('Conn - > ', conn)
print('Region - > ', region)


S3_staging_dir - >  s3://sdpd-bucket/athena/staging
Conn - >  <pyathena.connection.Connection object at 0x7f47875dc250>
Region - >  us-east-1


## Functions

In [4]:
def Header(file):
    """Return the column names found on the first line of a CSV object in S3.

    The object is fetched from the module-level ``Bucket`` using the
    module-level ``s3`` client.

    Parameters
    ----------
    file : str
        Key of the CSV object inside ``Bucket``.

    Returns
    -------
    list[str]
        The comma-separated fields of the first line. An empty object
        yields ``['']``.

    Raises
    ------
    UnicodeDecodeError
        If the first line is not valid UTF-8.
    """
    obj = s3.get_object(Bucket=Bucket, Key=file)
    body = obj['Body']
    buffer = b""
    try:
        # Keep pulling 1000-byte chunks until the first newline has arrived,
        # so a header wider than a single chunk is not silently truncated.
        while b"\n" not in buffer:
            chunk = body.read(1000)
            if not chunk:
                break
            buffer += chunk
    finally:
        # Only part of the object is consumed; release the stream explicitly.
        body.close()

    # Decode just the first line: decoding an arbitrary byte slice could cut a
    # multi-byte character in half. 'utf-8-sig' drops a leading BOM, and the
    # rstrip removes the '\r' left behind by CRLF line endings -- either would
    # otherwise end up inside a column name.
    first_line = buffer.split(b"\n", 1)[0]
    header = first_line.decode("utf-8-sig").rstrip("\r")
    header_list = header.split(',')
    return header_list

In [5]:
def SQL_Tail(location):
    """Build the storage clauses that finish a CREATE EXTERNAL TABLE statement.

    The returned text declares a comma-delimited LazySimpleSerDe text table
    at ``location`` whose first line is skipped as a header.

    Parameters
    ----------
    location : str
        Value placed inside the ``LOCATION '...'`` clause.

    Returns
    -------
    str
        The clause block, starting with a newline and ending with ``;``
        followed by a newline and indentation.
    """
    clauses = (
        "",
        "    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'",
        "    WITH SERDEPROPERTIES ('field.delim' = ',')",
        "    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
        f"    LOCATION '{location}'",
        "    TBLPROPERTIES (",
        "      'classification' = 'csv',",
        "      'skip.header.line.count' = '1'",
        "    );",
        "    ",
    )
    return "\n".join(clauses)


In [6]:
def SQL_Table_Create(db_name,table_name,location,file):
    """Build a CREATE EXTERNAL TABLE statement for a CSV stored in ``Bucket``.

    Column names are taken from the CSV's header line (via ``Header``) and
    every column is declared as ``string``. The storage clauses come from
    ``SQL_Tail``.

    Parameters
    ----------
    db_name : str
        Database that will own the table.
    table_name : str
        Name of the table to create.
    location : str
        S3 prefix (inside ``Bucket``) holding the table's data. Leading and
        trailing slashes are optional.
    file : str
        Name of the CSV under ``location`` whose header defines the columns.

    Returns
    -------
    str
        The complete DDL statement.
    """
    # Normalise the prefix once. Callers pass values such as 'Call_Data/', and
    # appending another '/' used to produce 's3://bucket/Call_Data//' -- an
    # empty path segment in the table LOCATION. Stripping also lets callers
    # omit the trailing slash without breaking the header key.
    prefix = location.strip('/')
    file_name = file.lstrip('/')
    if prefix:
        header_key = f"{prefix}/{file_name}"
        table_location = f's3://{Bucket}/{prefix}/'
    else:
        header_key = file_name
        table_location = f's3://{Bucket}/'

    # join() avoids the append-then-trim-trailing-comma dance.
    column_defs = ",".join(f"`{col}` string" for col in Header(header_key))
    create_table_sql = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{db_name}`.`{table_name}` ({column_defs})"
    )
    return create_table_sql + SQL_Tail(table_location)

## Create

In [7]:
# Create the database if it does not exist yet (DDL is sent through the same
# pandas/PyAthena path used for queries).
statement = "CREATE DATABASE IF NOT EXISTS {}".format(db_name)
pd.read_sql(statement, conn)

# List the databases so the create can be verified.
statement = "SHOW DATABASES"

df_show = pd.read_sql(statement, conn)
print(df_show.head(5))

# Always assign the flag. Previously it was only set on success, so a failed
# create left it undefined on a fresh kernel (or stale from an earlier run).
ingest_create_athena_db_passed = db_name in df_show.values
    

  database_name
0       default
1        dsoaws
2          sdpd
3       watersd


## Tables

### Calls and Stops

In [8]:
# DDL for the source tables: one Athena table per CSV, located by its S3 prefix.
Call_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Call_Data', location='Call_Data/', file='SDPD_Calls.csv'
)
Type_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Type_Data', location='Type_Data/', file='SDPD_Type.csv'
)
Dispo_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Dispo_Data', location='Dispo_Data/', file='SDPD_Dispo.csv'
)
Stops_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Stops_Data', location='Stops_Data/', file='SDPD_Stops.csv'
)
Stops_Dic_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Stops_Dic_Data', location='Stops_Dic_Data/', file='SDPD_Stops_Dic.csv'
)

# DDL for the hourly tables stored under the Calls/ and Stops/ prefixes.
Calls_Hr_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Calls_Hour', location='Calls/Calls_Hour/', file='Calls_Hour.csv'
)
Stops_HR_SQL = SQL_Table_Create(
    db_name=db_name, table_name='Stops_Hourly', location='Stops/Stops_Hourly/', file='Stops_Hourly.csv'
)

In [9]:
# Execute each CREATE EXTERNAL TABLE statement, in the same order as before.
for ddl in (Call_SQL, Type_SQL, Dispo_SQL, Stops_SQL, Stops_Dic_SQL, Calls_Hr_SQL):
    pd.read_sql(ddl, conn)

# Kept as the cell's trailing expression so the cell output is unchanged.
pd.read_sql(Stops_HR_SQL, conn)

In [10]:
# Preview query: the 100 rows of Stops_Hourly with the greatest date_time values.
statement = (
    f"SELECT * FROM {db_name}.Stops_Hourly\n"
    "    order by date_time desc\n"
    "     LIMIT 100\n"
    "     "
)

print(statement)

SELECT * FROM sdpd.Stops_Hourly
    order by date_time desc
     LIMIT 100
     


In [11]:
# Run the preview query and show its first five rows.
df = pd.read_sql(sql=statement, con=conn)
df.head(n=5)

Unnamed: 0,date_time,count
0,2022-12-31 23:00:00+00:00,3
1,2022-12-31 22:00:00+00:00,8
2,2022-12-31 21:00:00+00:00,7
3,2022-12-31 20:00:00+00:00,11
4,2022-12-31 19:00:00+00:00,10


<div class="alert alert-block alert-success">
<b>End:</b> Athena Tables Created
</div>

In [12]:
# Row-count query for the Stops_Hourly table.
Stops_Hourly_count = f"SELECT count(*) FROM {db_name}.Stops_Hourly\n     "

print(Stops_Hourly_count)

SELECT count(*) FROM sdpd.Stops_Hourly
     


In [13]:
# NOTE: despite its name, `calls` holds the result of the Stops_Hourly count query.
calls = pd.read_sql(sql=Stops_Hourly_count, con=conn)

In [14]:
# Display the result frame stored in `calls`.
calls

Unnamed: 0,_col0
0,39480
