* This notebook is a minimal sample showing how to call external APIs from Snowflake notebooks.
* Reference blog post: https://www.clearpeaks.com/automating-data-ingestion-from-an-external-api-with-snowflake/



In [None]:
import requests
import pandas as pd
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session

def main(
    session: snowpark.Session,
    url: str = "https://api.exchangerate-api.com/v4/latest/USD",
    timeout: float = 30,
):
    """Fetch the latest exchange rates from ``url`` and load them into Snowflake.

    Args:
        session: Active Snowpark session used to write the table.
        url: Endpoint returning a JSON object whose ``rates`` member maps
            currency codes to rates. Defaults to the USD-based endpoint.
        timeout: Seconds to wait for the HTTP response before giving up.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.Timeout: if the API does not respond within ``timeout``.
    """
    # Step 1: Call the API.
    # Without a timeout, a stalled endpoint would hang the cell forever.
    # raise_for_status surfaces HTTP errors here instead of as a confusing
    # KeyError on 'rates' further down.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    data = response.json()

    # Step 2: Transform data with Pandas, producing one row per (currency, rate) pair.
    df = pd.DataFrame(list(data["rates"].items()), columns=["currency", "rate"])

    # Step 3: Write data into Snowflake, replacing any previous contents.
    # NOTE(review): write_pandas quotes identifiers by default, so this most
    # likely creates a case-sensitive lowercase "exchange_rates" table — confirm.
    session.write_pandas(
        df,
        table_name="exchange_rates",
        auto_create_table=True,
        overwrite=True,
    )

In [None]:
# Run the exchange-rate ingestion once, using this notebook's active Snowpark session.
session = get_active_session()
main(session=session)

In [None]:
# basic test UDF

import _snowflake
import requests
import logging

# Shared HTTP session, so that repeated calls within one Python process can
# reuse pooled connections instead of opening a new one per request.
rsession = requests.Session()

# Seconds to wait for the external API before giving up on a request.
REQUEST_TIMEOUT_SECONDS = 30


def get_external_api_sp(api_url):
    """GET ``api_url`` with an OAuth bearer token and return the decoded JSON.

    The token is read with ``_snowflake.get_oauth_access_token('some_token')``.
    When this runs as a Snowflake UDF, a secret must therefore be bound to
    the alias ``'some_token'`` at registration time.

    Args:
        api_url: Absolute URL of the API endpoint to request.

    Returns:
        The decoded JSON body on success. If obtaining the token, the HTTP
        request, or JSON decoding fails, returns the string
        ``"Error: <message>"`` instead of raising. Callers must check for
        this string.
    """
    try:
        access_token = _snowflake.get_oauth_access_token('some_token')
    except Exception as e:
        logging.error("Error: %s", e)
        return f"Error: {str(e)}"

    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    try:
        # Use the shared session (it was previously created but never used).
        # Bound the wait so a stalled endpoint cannot hang the UDF.
        response = rsession.get(api_url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        logging.debug("Body from API: %s in get external_api", response.text)
        # raise_for_status only rejects 4xx/5xx, so other non-200 codes
        # (e.g. 204) still reach this point.
        if response.status_code != 200:
            logging.warning("Unexpected response from API: %s in get external_api", response.status_code)
        return response.json()
    except Exception as e:
        logging.error("Error: %s", e)
        return f"Error: {str(e)}"

In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.types import VariantType, StringType

session = get_active_session()

# TODO: replace these placeholders with the real object names in your account.
# The secret is exposed to the handler under the alias 'some_token'. That is
# the name get_external_api_sp passes to _snowflake.get_oauth_access_token().
OAUTH_SECRET_NAME = "FAKESTORE.DATAPRODUCTS.API_OAUTH_TOKEN"  # TODO: confirm
EXTERNAL_ACCESS_INTEGRATION_NAME = "FAKESTORE_API_INTEGRATION"  # TODO: confirm

# Register the Python function as a permanent UDF.
# The returned UDF object is bound to a *new* name. Rebinding
# get_external_api_sp would make a re-run of this cell pass the UDF object,
# rather than the Python function, as `func`.
# NOTE(review): extract_data_to_products calls
# FAKESTORE.DATAPRODUCTS.GET_EXTERNAL_API_FAKESTORE, not 'get_external_api_sp'.
# Align the two names.
# NOTE(review): 'NB_TEST_STAG' looks truncated — confirm the stage name.
get_external_api_udf = session.udf.register(
    func=get_external_api_sp,
    return_type=VariantType(),
    input_types=[StringType()],
    name='get_external_api_sp',
    replace=True,
    is_permanent=True,
    stage_location='NB_TEST_STAG',
    secrets={'some_token': OAUTH_SECRET_NAME},
    external_access_integrations=[EXTERNAL_ACCESS_INTEGRATION_NAME],
    packages=["snowflake-snowpark-python", "requests", "snowflake-telemetry-python"],
)

In [None]:
-- Target table for extract_data_to_products, which reads this column list
-- from INFORMATION_SCHEMA (schema DATAPRODUCTS) and maps API fields onto it.
-- NOTE(review): an empty column list is invalid DDL. The columns below are an
-- assumed shape for the Fake Store API products payload — confirm against the
-- real response. Unquoted names are stored upper-case.
CREATE OR REPLACE TABLE PRODUCTS (
    ID          NUMBER,
    TITLE       VARCHAR,
    PRICE       FLOAT,
    DESCRIPTION VARCHAR,
    CATEGORY    VARCHAR,
    IMAGE       VARCHAR,
    RATING      VARIANT
)

In [None]:
import json
import datetime
import time
import logging

from snowflake.snowpark.types import VariantType, StringType, IntegerType, VariantType, BooleanType, TimestampType
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import call_function
from opentelemetry import trace

def extract_data_to_products(session):
    """Stored proc that loads API records into the PRODUCTS table.

    Flow:
    1. Read the PRODUCTS column list (schema DATAPRODUCTS) from
       INFORMATION_SCHEMA, in ordinal order.
    2. Call the FAKESTORE.DATAPRODUCTS.GET_EXTERNAL_API_FAKESTORE UDF and
       parse its VARIANT result as a JSON array of objects.
    3. Map up to ``max_records`` objects onto the columns. Keys are matched
       exactly first, then case-insensitively, because unquoted Snowflake
       column names are upper-case.
    4. Truncate PRODUCTS and append the new rows.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.

    Returns:
        "OK" once the table has been refreshed.

    Raises:
        ValueError: if PRODUCTS has no columns in DATAPRODUCTS, or the API
            response is not a JSON array.
        Exception: any error while writing the rows is logged and re-raised,
            so the CALL fails instead of reporting "OK".
    """
    tracer = trace.get_tracer(__name__)
    max_records = 10  # cap on API items loaded per run (sample limit)

    # Fetch columns dynamically. String literals need single quotes:
    # double quotes denote identifiers in Snowflake SQL.
    columns_df = session.sql("""
        SELECT COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = 'PRODUCTS'
            AND TABLE_SCHEMA = 'DATAPRODUCTS'
        ORDER BY ORDINAL_POSITION
    """).collect()

    # Column names used to process the response from the API.
    columns = [row["COLUMN_NAME"] for row in columns_df]
    if not columns:
        raise ValueError("No columns found for DATAPRODUCTS.PRODUCTS; create the table first")

    # Make the API call using the UDF. The VARIANT result arrives as JSON text.
    response_df = session.sql(
        "SELECT FAKESTORE.DATAPRODUCTS.GET_EXTERNAL_API_FAKESTORE('') AS RESPONSE"
    )
    raw_response = response_df.collect()[0]["RESPONSE"]
    json_data = json.loads(raw_response) if raw_response is not None else None
    if not isinstance(json_data, list):
        # e.g. the UDF returned an error message string instead of records
        raise ValueError(f"Expected a JSON array from the API, got: {str(json_data)[:200]}")

    logging.info("Extracting site data from JSON")
    records = []
    for item in json_data[:max_records]:
        if not isinstance(item, dict):
            logging.warning("Skipping non-object item in API response: %r", item)
            continue
        with tracer.start_as_current_span("insert_site_data"):
            upper_keys = {str(key).upper(): value for key, value in item.items()}
            records.append(tuple(
                item[col] if col in item else upper_keys.get(col.upper())
                for col in columns
            ))

    # Truncate only after the API data has been fetched and parsed, so a failed
    # call does not leave the table empty.
    session.sql("TRUNCATE TABLE PRODUCTS").collect()
    logging.info("PRODUCTS table truncated")

    if records:
        try:
            table_schema = session.table("PRODUCTS").schema
            df = session.create_dataframe(records, schema=table_schema)
            # Append into the freshly truncated table. "overwrite" would replace
            # the table definition itself, not just its rows.
            df.write.mode("append").save_as_table("PRODUCTS")
        except Exception:
            logging.exception("Error when inserting")
            raise

    return "OK"
            

In [None]:
-- NOTE(review): ACCOUNTADMIN is far more privilege than this grant needs.
-- Prefer the role that owns the function.
use role accountadmin;

-- Grant on the UDF that extract_data_to_products calls
-- (FAKESTORE.DATAPRODUCTS.GET_EXTERNAL_API_FAKESTORE); the name was misspelled
-- "facestore". Granting per function (and as ACCOUNTADMIN) is not good
-- practice. A grant ON FUTURE FUNCTIONS IN SCHEMA can be set up ahead of time
-- instead.
GRANT USAGE ON FUNCTION fakestore.dataproducts.get_external_api_fakestore(STRING) TO ROLE xyz;

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Register extract_data_to_products as a permanent stored procedure (not a UDF).
# Because is_permanent=True, the handler code is uploaded to `stage_location`,
# so the procedure outlives this notebook session and can be invoked with CALL.
# TODO: "@some_stage_name" is a placeholder — point it at a real stage.
extract_data_to_products_proc = session.sproc.register(
    extract_data_to_products,
    name="extract_data_to_products",
    input_types=[],
    return_type=StringType(),
    stage_location="@some_stage_name",
    packages=[
        "snowflake-snowpark-python",
        "snowflake-telemetry-python",
        "opentelemetry-api",
    ],
    replace=True,
    is_permanent=True,
)

In [None]:
-- Run the stored procedure registered above. Its handler returns 'OK' when it completes.
CALL extract_data_to_products();

* Drawback: this approach needs one notebook per table/endpoint, which is not good practice.
* Each endpoint gets its own stored procedure, so that parsing and handling of its data can be customized.