## This notebook will:

- Create the required Agent tools - functions
- Create the sample data & upload in Delta Tables


%md
## Create UC-Functions to access the yahoo finance API

Get access to :
https://www.financeapi.net/

Create your own API key (the free key has limitations; please check the dashboard, link below).

Access the Yahoo Finance API dashboard:
 https://www.financeapi.net/dashboard

 

Yahoofinance alternative 

https://site.financialmodelingprep.com/developer/docs#balance-sheet-statements-financial-statements


Only reports from one specific research provider, "Argus Research", are used, to limit the number of tokens returned.

In [0]:
# Remove any previous definition of the finance_insight_api Unity Catalog function.
# NOTE(review): the next cell issues this same DROP before its CREATE OR REPLACE,
# so this cell looks redundant — confirm before removing it.
spark.sql('DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.finance_insight_api;')

In [0]:
# Drop the existing function if it exists to avoid conflicts
spark.sql('DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.finance_insight_api;')

# Create or replace the Unity Catalog function for financial insights.
#   Input : query - the ticker symbol to look up (e.g. AAPL).
#   Output: a JSON-encoded list holding at most one "Argus Research" report summary,
#           or a string starting with "Error calling ..." if anything goes wrong.
spark.sql('''
CREATE OR REPLACE FUNCTION sarbanimaiti_catalog.agent_demo.finance_insight_api (query STRING)
RETURNS STRING
LANGUAGE PYTHON
COMMENT 'Returns financial insights for a given ticker symbol.'
AS
$$
try:
    # Imports live inside the try so that a missing module is also reported as an error string
    import json
    import requests

    # Yahoo Finance API endpoint for research insights
    url = "https://yfapi.net/ws/insights/v1/finance/insights"

    # Placeholder: substitute your own key; do not commit a real key to source control
    headers = {
        'x-api-key': "<your own api-key>"
    }

    # A timeout keeps the UDF from hanging indefinitely when the API does not answer
    response = requests.get(url, headers=headers, params={"symbol": query}, timeout=30)
    # Surface HTTP failures (invalid key, quota exceeded, ...) instead of failing later on a missing JSON key
    response.raise_for_status()

    data = response.json()

    # Keep only the first Argus Research summary to limit the number of tokens returned
    summaries = []
    for report in data['finance']['result']['reports']:
        if report['provider'] == "Argus Research":
            summaries.append(report['summary'])
            break

    # The function is declared RETURNS STRING, so serialise the list rather than returning it raw
    return json.dumps(summaries)
except Exception as e:
    # Single braces so that format() really substitutes the underlying error text
    return "Error calling Yahoo Finance insights API: {e}".format(e=str(e))
$$;
''')

In [0]:
# Smoke-test finance_insight_api: run it for ticker 'AAPL' and pull the rows back to the driver
insight_test_query = "SELECT sarbanimaiti_catalog.agent_demo.finance_insight_api('AAPL')"
result = spark.sql(insight_test_query).collect()

In [0]:
# Display the rows collected from the finance_insight_api call in the previous cell
result

In [0]:
# No separate DROP is needed here: CREATE OR REPLACE below overwrites any existing definition.

# Define the Unity Catalog function that returns the quote / financial summary for a ticker.
#   Input : query - one ticker symbol (sent to the API as the "symbols" parameter).
#   Output: the API response as a JSON string,
#           or a string starting with "Error calling ..." if anything goes wrong.
spark.sql('''
CREATE OR REPLACE FUNCTION sarbanimaiti_catalog.agent_demo.finance_stock_quote (query STRING)
RETURNS STRING
LANGUAGE PYTHON
COMMENT 'Returns financial summary of ticker.'
AS
$$
try:
    # Imports live inside the try so that a missing module is also reported as an error string
    import json
    import requests

    # API endpoint for fetching finance quotes
    url = "https://yfapi.net/v6/finance/quote"

    # Placeholder: substitute your own key; do not commit a real key to source control
    headers = {
        'x-api-key': "<your own api-key>"
    }

    # A timeout keeps the UDF from hanging indefinitely when the API does not answer
    response = requests.get(url, headers=headers, params={"symbols": query}, timeout=30)
    # Surface HTTP failures (invalid key, quota exceeded, ...) as an error string
    response.raise_for_status()

    # The function is declared RETURNS STRING, so serialise the parsed payload rather than returning a dict
    return json.dumps(response.json())
except Exception as e:
    # Single braces so that format() really substitutes the underlying error text
    return "Error calling Yahoo Finance quote API: {e}".format(e=str(e))
$$;
''')

In [0]:
# Smoke-test finance_stock_quote: fetch the real-time price / financial summary for 'AAPL'
# and pull the rows back to the driver
quote_test_query = "SELECT sarbanimaiti_catalog.agent_demo.finance_stock_quote('AAPL')"
result = spark.sql(quote_test_query).collect()

In [0]:
# Display the rows collected from the finance_stock_quote call in the previous cell
result

## Create the stock historical price tables

##### Upload the stock historical files from "sample_stock_hist_files" folder to your Unity catalog Volume. 
In real case these files can be sourced in various ways.

##### Then  run the below code to :
- Merge all the files in one spark data frame
- Then write the merged DataFrame to a Spark Delta table, which the UC function (Agent Tool) will query

In [0]:
from functools import reduce

from pyspark.sql.functions import input_file_name, col, regexp_extract

# Unity Catalog Volume folder holding one "<TICKER>_prices.csv" file per stock
STOCK_HIST_DIR = "/Volumes/sarbanimaiti_catalog/agent_demo/stocks_historical_price/stock_hist_price"
# Tickers to load; add a symbol here after uploading its CSV file
STOCK_TICKERS = ["TSLA", "AAPL", "GOOGL", "JNJ", "JPM", "AMZN"]


def rename_columns(df):
    """Return `df` with every space in its column names replaced by an underscore.

    For example "Adj Close" becomes "Adj_Close". Column order and values are unchanged.
    """
    return df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])


def load_stock_prices(ticker):
    """Read `<STOCK_HIST_DIR>/<ticker>_prices.csv` and tag each row with its ticker.

    The `stock_name` column is taken from the file name: everything before the
    "_prices.csv" suffix. This keeps the full symbol whatever its length; a
    fixed-width substring would store GOOGL as "GOOG".
    No schema is supplied or inferred, so the CSV columns are read as strings.
    """
    return rename_columns(
        spark.read.option("header", "true")
        .csv(f"{STOCK_HIST_DIR}/{ticker}_prices.csv")
        .withColumn("stock_name", regexp_extract(input_file_name(), r'([^/]+)_prices\.csv$', 1))
    )


# One DataFrame per ticker, keyed by symbol
stock_dfs = {ticker: load_stock_prices(ticker) for ticker in STOCK_TICKERS}

# Union all dataframes (positional union: every file is expected to share the same column layout)
merged_df = reduce(lambda left, right: left.union(right), stock_dfs.values())

# Write to a Delta table
merged_df.write.format("delta").mode("overwrite").saveAsTable("sarbanimaiti_catalog.agent_demo.merged_stock_prices")

In [0]:
%sql
-- Sanity check: list each distinct ticker that landed in the merged table
SELECT DISTINCT
  stock_name
FROM
  sarbanimaiti_catalog.agent_demo.merged_stock_prices

## Create Agent Tool : Extract stock closing price from history table

In [0]:
%sql
-- Drop existing definitions to avoid conflicts.
-- The tool is created in the next cell as get_hist_closing_price, so that name is dropped here.
-- get_historical_closing_price is still dropped as well (presumably an earlier name for the
-- same tool) so that no stale copy is left behind.
DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.get_historical_closing_price;
DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.get_hist_closing_price;

In [0]:
%sql
-- Agent tool: historical closing price and volume for one stock.
-- Takes a stock ticker and a duration in years (e.g. 3), reads the merged historical price
-- table and returns the Date, Close and Volume rows that fall inside that look-back window,
-- oldest first.
-- The ticker is upper-cased and trimmed before matching, so aapl and AAPL both work
-- (the stored stock_name values come from the upper-case CSV file names).
-- NOTE(review): the window is computed as 400 days per year, not 365, so it is somewhat
-- wider than the requested number of years. Kept as-is; confirm whether the slack is intended.
CREATE OR REPLACE FUNCTION sarbanimaiti_catalog.agent_demo.get_hist_closing_price (
  stock_ticker STRING COMMENT 'Stock ticker symbol to look up, for example AAPL',
  yr_duration  BIGINT COMMENT 'Duration in years for which stock volume and closing price will be extracted, for example 3'
)
RETURNS TABLE (Date DATE, Close FLOAT, Volume INT)
COMMENT 'Returns the historical daily closing price and volume of the given stock ticker for roughly the last yr_duration years, ordered by date ascending.'
RETURN
(
  SELECT
    Date,
    Close,
    Volume
  FROM
    sarbanimaiti_catalog.agent_demo.merged_stock_prices
  WHERE
    stock_name = upper(trim(stock_ticker))
    AND datediff(CURRENT_DATE(), Date) <= 400 * yr_duration
  ORDER BY
    Date ASC
)


#### Test the tool 

In [0]:

%sql
-- Smoke test: closing price and volume rows for JPM with a 1-year duration
SELECT
  *
FROM
  sarbanimaiti_catalog.agent_demo.get_hist_closing_price('JPM', 1)

## Create table to load customer investment preference data

Steps:



In [0]:
%pip install Faker

In [0]:
import pandas as pd
import random
import json
from faker import Faker

# Seed both random sources so that Restart & Run All regenerates the same customers
SEED = 42
random.seed(SEED)
Faker.seed(SEED)

# Initialize Faker for generating synthetic data
fake = Faker()

# Define ranges and options for generating data
income_ranges = ["low", "medium", "high"]
risk_tolerances = ["low", "medium", "high"]
investment_horizons = ["short-term", "medium-term", "long-term"]
investment_goals = ["retirement", "wealth accumulation", "education", "savings"]
industries = ["tech", "healthcare", "energy", "finance", "consumer goods"]
stock_types = ["growth", "dividend", "blue-chip", "value", "penny"]
trading_frequencies = ["daily", "weekly", "monthly"]
holding_periods = ["days", "weeks", "months", "years"]
# The empty strings make "no special requirement" the most likely outcome (3 of 5 options)
special_requirements_options = ["ESG-focused investments", "no tobacco stocks", "", "", ""]

num_records = 100  # Number of records to generate

# Draw the ids without replacement: independent random draws from 1000-9999 can repeat,
# which would give two different customers the same customer_id
customer_ids = random.sample(range(1000, 10000), k=num_records)

# Generate synthetic data, one row per customer
data = []
for customer_id in customer_ids:
    age = fake.random_int(min=18, max=75)
    gender = fake.random_element(elements=("Male", "Female"))
    income_range = fake.random_element(elements=income_ranges)
    risk_tolerance = fake.random_element(elements=risk_tolerances)
    investment_horizon = fake.random_element(elements=investment_horizons)
    investment_goal = fake.random_element(elements=investment_goals)
    # Between one and three distinct industries, kept as a Python list
    preferred_industries = random.sample(industries, k=random.randint(1, 3))
    preferred_stock_type = fake.random_element(elements=stock_types)
    average_investment_amount = round(random.uniform(1000, 100000), 2)
    trading_frequency = fake.random_element(elements=trading_frequencies)
    holding_period = fake.random_element(elements=holding_periods)
    # Stored as a JSON string, not as a number
    historical_performance = json.dumps({
        "avg_return": round(random.uniform(-10, 20), 2),
        "volatility": round(random.uniform(0.5, 3.0), 2)
    })
    special_requirements = fake.random_element(elements=special_requirements_options)

    # Add record to data list (order must match `columns` below)
    data.append([
        customer_id, age, gender, income_range, risk_tolerance, investment_horizon,
        investment_goal, preferred_industries, preferred_stock_type,
        average_investment_amount, trading_frequency, holding_period,
        historical_performance, special_requirements
    ])

# Create DataFrame
columns = [
    "customer_id", "age", "gender", "income_range", "risk_tolerance", "investment_horizon",
    "investment_goals", "preferred_industries", "preferred_stock_types",
    "average_investment_amount", "trading_frequency", "holding_period",
    "historical_performance", "special_requirements"
]
df_synthetic = pd.DataFrame(data, columns=columns)

# Save to CSV if needed
#df_synthetic.to_csv("customer_stock_preferences_synthetic.csv", index=False)

# Report what was actually produced: the frame is kept in memory, no CSV is written
print(f"Synthetic data generated: {len(df_synthetic)} customer records in df_synthetic (not saved to CSV).")


#### Upload Customer Stock Preferences data to spark delta table 

In [0]:
# Convert the pandas frame to a Spark DataFrame, then overwrite the customer preferences Delta table with it
preferences_table = "sarbanimaiti_catalog.agent_demo.customer_investment_preferences"
df_spark = spark.createDataFrame(df_synthetic)
(
    df_spark.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(preferences_table)
)

## Create Agent Tool : Extract Customer investment preference

In [0]:
%sql
-- Drop any function named customer_investment_preferences, if one exists.
-- NOTE(review): this is the name the preferences table was saved under earlier in this notebook;
-- the tool itself is created as cust_investment_pref in the next cell, which has its own DROP.
-- Confirm whether this statement is still needed.
DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.customer_investment_preferences;


In [0]:
%sql
-- Drop any earlier definition of the tool before recreating it
DROP FUNCTION IF EXISTS sarbanimaiti_catalog.agent_demo.cust_investment_pref;

-- Agent tool: investment preference profile of one customer.
-- Takes a customer id and returns the matching row(s) of the customer_investment_preferences table.
-- trading_frequency, holding_period and historical_performance are declared STRING because the
-- generated data stores text in them ("daily", "weeks", a JSON document); declaring them as
-- BIGINT / DOUBLE would turn every value into NULL or a cast error.
CREATE OR REPLACE FUNCTION sarbanimaiti_catalog.agent_demo.cust_investment_pref (
  customerID BIGINT COMMENT 'Customer ID'
)
RETURNS TABLE (
  customer_id BIGINT,
  age BIGINT,
  gender STRING,
  income_range STRING,
  risk_tolerance STRING,
  investment_horizon STRING,
  investment_goals STRING,
  preferred_industries STRING,
  preferred_stock_types STRING,
  average_investment_amount DOUBLE,
  trading_frequency STRING,
  holding_period STRING,
  historical_performance STRING,
  special_requirements STRING
)
COMMENT 'Returns the investment preference profile (risk tolerance, horizon, goals, preferred industries and stock types, trading habits) of the given customer ID.'
RETURN
(
  SELECT
    customer_id,
    age,
    gender,
    income_range,
    risk_tolerance,
    investment_horizon,
    investment_goals,
    preferred_industries,
    preferred_stock_types,
    average_investment_amount,
    trading_frequency,
    holding_period,
    historical_performance,
    special_requirements
  FROM
    sarbanimaiti_catalog.agent_demo.customer_investment_preferences
  WHERE
    customer_id = customerID
)


In [0]:
%sql
-- Smoke test: fetch the preference row for one customer id.
-- Customer ids are generated randomly earlier in this notebook, so 3584 may not exist;
-- substitute an id that is present in the table if this returns no rows.
SELECT
  *
FROM
  sarbanimaiti_catalog.agent_demo.cust_investment_pref(3584)
