In [2]:

#Import packages and secrets
import json
import os
import sys
from datetime import datetime

import boto3
import botocore
import pandas as pd
import pyarrow
import requests
import snowflake.connector
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig
from snowflake.connector.pandas_tools import write_pandas

from config import api_key_secret, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY

############################################################################
###### SET API TARGET, PARAMETERS, DESTINATION BUCKET AND LOGGING ##########
############################################################################

# API target: the pieces below are concatenated into the request URL inside the seed loop
base_url = "https://api.domain.com.au"
version = "v2"
api_name = "suburbPerformanceStatistics"
year = "2016"  # only used as a component of the output file name

# Query-string parameters sent with every request
property_types = ["House", "Unit"]
period_size = "Quarters"
total_periods = "100"

# Authentication: the API key travels in the X-API-Key request header
header = {"X-API-Key": api_key_secret}

# Destination S3 client and bucket names
s3 = boto3.client("s3")
jsonbucket = "sbx-apidomainonline-injest-json"
parquetbucket = "sbx-apidomainonline-injest-parquet"

# Collects one list per API call (success or error) made by the seed loop
loop_log = []

############################################################################
############### GET LIST OF SEEDS FOR API CALL FROM SNOWFLAKE ##############
############################################################################

# Read the Snowflake credentials from AWS Secrets Manager through the caching client
client = botocore.session.get_session().create_client('secretsmanager')
cache_config = SecretCacheConfig()
cache = SecretCache(config=cache_config, client=client)
secrets = json.loads(cache.get_secret_string('snowflake-creds'))  # JSON string -> dictionary

# Connection settings: user, database and schema are fixed here,
# everything else comes from the 'snowflake-creds' secret
connection_settings = {
    'user': 'NICKLILLEYMAN',
    'password': secrets['password'],
    'account': secrets['account'],
    'region': secrets['region'],
    'warehouse': secrets['warehouse'],
    'database': 'SBX_RAW',
    'schema': 'PUBLIC',
}

# Initialize connection to Snowflake
ctx = snowflake.connector.connect(**connection_settings)

# Cursor used to run the seed query
cur = ctx.cursor()

# Select the WA seeds from the suburb dimension, leaving out every seed whose
# '<api_name>-<DIM_SUBURB_SK>' key is already present in SBX_RAW.PUBLIC.API_CALL_LOG
seed_table = "SBX_ANALYTICS.DBT_NLILLEYMAN_COMMON.DIM_SUBURB_GEOGRAPHY"
sql_template = """
SELECT
    DIM_SUBURB_SK
    ,SUBURB_ID
    ,SUBURB
    ,POSTCODE
    ,STATE
FROM %(seed_table)s
WHERE
    STATE = 'WA'
    AND  CONCAT('%(api_name)s','-',DIM_SUBURB_SK) NOT IN (SELECT SEED_KEY FROM SBX_RAW.PUBLIC.API_CALL_LOG)
  --AND SUBURB in ('Willetton','Brentwood','Harrisdale','Leeming')
ORDER BY DIM_SUBURB_SK
"""
# Both substituted values are constants defined in this script
sql = sql_template % dict(seed_table=seed_table, api_name=api_name)
cur.execute(sql)

# Only reports the empty case; the script carries on regardless
no_seeds_found = cur.rowcount == 0
if no_seeds_found:
    print("No seeds to process")

# Load the query result into a dataframe and number the rows 0..n-1
df = cur.fetch_pandas_all()
df['api_call_target_id'] = pd.RangeIndex(len(df))

# Keep only a window of rows so a single run makes a limited number of API calls.
# NOTE(review): Series.between is inclusive on both ends, so this window keeps up to
# 11 rows (ids 0..10), not 10 - confirm that is the intended batch size.
start_iterating = 0
stop_iterating = start_iterating + 10
in_window = df["api_call_target_id"].between(start_iterating, stop_iterating)
df = df[in_window]

# One list per column for the seed loop; ids and postcodes are converted to strings
api_call_target_ids = df["api_call_target_id"].astype(str).to_list()
dim_suburb_sks = df["DIM_SUBURB_SK"].to_list()
states = df["STATE"].to_list()
suburbs = df["SUBURB"].to_list()
postcodes = df["POSTCODE"].astype(str).to_list()


############################################################################
############### LOOP THROUGH SEEDS AND CALL API FOR EACH SEED ##############
############################################################################

# For every property type and every seed: build the request URL, call the API,
# store the response in S3 as parquet and as JSON, and append one record to loop_log.
# A failure on one seed is logged with outcome "Error" and the loop moves on.
for property_type in property_types:  # separate name: the list being iterated is not rebound
    for api_call_target_id, state, suburb, postcode, dim_suburb_sk in zip(
            api_call_target_ids, states, suburbs, postcodes, dim_suburb_sks):
        # Metadata is defined before the try block so the log record written after a
        # failure always describes the current seed, never the previous one
        api_call_datetime = datetime.now()
        seed_key = api_name + '-' + str(dim_suburb_sk)  # str(): the key column may not be text
        full_url = (
            base_url + "/" + version + "/" + api_name
            + "/" + str(state) + "/" + str(suburb) + "/" + str(postcode)
            + "?propertyCategory=" + str(property_type)
            + "&periodSize=" + str(period_size)
            + "&totalPeriods=" + str(total_periods)
        )
        api_status_code = None
        api_status_reason = None

        try:
            # Call API and capture the status code and reason alongside the payload
            response = requests.get(full_url, headers=header)
            api_status_code, api_status_reason = response.status_code, response.reason
            payload = response.json()

            # File name shared by the parquet and JSON outputs
            file_name = "_".join([
                str(state), str(suburb), str(postcode), year,
                str(api_status_code), api_name, str(property_type),
            ])
            parquet_file = file_name + ".parquet"
            json_file = file_name + ".json"

            # Separate name so the seed dataframe `df` is not overwritten
            response_df = pd.DataFrame(payload)

            # Write parquet locally, upload it, and always remove the local copy
            try:
                response_df.to_parquet(parquet_file)
                s3.upload_file(parquet_file, parquetbucket, parquet_file)
            finally:
                if os.path.exists(parquet_file):
                    os.remove(parquet_file)

            # Write JSON locally, upload it to the JSON bucket (this upload was
            # missing before: the file was written and deleted without being sent)
            try:
                response_df.to_json(json_file)
                s3.upload_file(json_file, jsonbucket, json_file)
            finally:
                if os.path.exists(json_file):
                    os.remove(json_file)

            loop_outcome = "Success"
        except Exception as exc:
            # Exception (not a bare except) so Ctrl-C / SystemExit still stop the run
            loop_outcome = "Error"
            print("Error processing seed", seed_key, "-", repr(exc))

        # Logging - one record per API call, whatever the outcome
        loop_log.append([
            api_name, seed_table, seed_key, full_url,
            api_status_code, api_status_reason, loop_outcome, api_call_datetime,
        ])

# Turn the accumulated loop_log records into a dataframe, one column per logged field
log_columns = [
    'API_NAME',
    'SEED_TABLE',
    'SEED_KEY',
    'TARGET_URL',
    'API_STATUS_CODE',
    'API_STATUS_REASON',
    'LOOP_OUTCOME',
    'API_CALL_DATETIME',
]
loop_log_df = pd.DataFrame(loop_log, columns=log_columns)

# The call timestamp is converted to text before loading
loop_log_df['API_CALL_DATETIME'] = loop_log_df['API_CALL_DATETIME'].astype(str)

# Write loop_log_df to the Snowflake API_CALL_LOG table, then show what was logged
success, nchunks, nrows, _ = write_pandas(ctx, loop_log_df, 'API_CALL_LOG')
print(loop_log_df)


                       API_NAME  \
0   suburbPerformanceStatistics   
1   suburbPerformanceStatistics   
2   suburbPerformanceStatistics   
3   suburbPerformanceStatistics   
4   suburbPerformanceStatistics   
5   suburbPerformanceStatistics   
6   suburbPerformanceStatistics   
7   suburbPerformanceStatistics   
8   suburbPerformanceStatistics   
9   suburbPerformanceStatistics   
10  suburbPerformanceStatistics   
11  suburbPerformanceStatistics   
12  suburbPerformanceStatistics   
13  suburbPerformanceStatistics   
14  suburbPerformanceStatistics   
15  suburbPerformanceStatistics   
16  suburbPerformanceStatistics   
17  suburbPerformanceStatistics   
18  suburbPerformanceStatistics   
19  suburbPerformanceStatistics   
20  suburbPerformanceStatistics   
21  suburbPerformanceStatistics   

                                           SEED_TABLE  \
0   SBX_ANALYTICS.DBT_NLILLEYMAN_COMMON.DIM_SUBURB...   
1   SBX_ANALYTICS.DBT_NLILLEYMAN_COMMON.DIM_SUBURB...   
2   SBX_ANALYTICS.DBT_N