# **CORTEX AI FINANCIAL SERVICES HANDS-ON LAB**
## Authors: John Heisler, Garrett Frere

### Intention of This Lab
This notebook is a deep dive into industrializing the functionality in the companion AI Pipeline Demo in this repository, FSI_Cortex_AI_Pipeline.ipynb. Please run through the AI Pipeline notebook first to understand the functional elements of this lab.

### AI Pipeline
We will build some Python functions to automatically download the latest FOMC documents and load them directly into our stage (no need to land them locally and then PUT them). Then we will use a combination of stored procedures, streams, and tasks to get that data into Snowflake tables, ready to query. As part of that pipeline, we will automatically generate a hawkish, dovish, or neutral sentiment signal. In this way, we maximize the value of our work by building it into a common dataset: end users will not need to invoke any additional logic and can simply query the results in plain SQL. ***Good design is invisible!***

# 🛑 --> BEFORE YOU START <-- 🛑

1. **Run the 1_SQL_SETUP_FOMC.sql script.**
2. **Enable our External Access**
    1. In the top right corner of Snowsight click the three vertically aligned dots.
    2. In the context menu, select Notebook Settings.
    3. Select the External Access tab.
    4. Select the toggle switch to the right of FED_RESERVE_ACCESS_INTEGRATION.
3. **Install the appropriate packages in the notebook**
    1. Click the Packages drop-down in the top right of Snowsight.
    2. Select the following packages:
        a. bs4
        b. joblib
        c. json5
        d. pandas
        e. python-dotenv
        f. requests
        g. snowflake
        h. snowflake-ml-python
        i. snowflake-snowpark-python


## Download PDF Stored Procedure

Let's build the logic to download the PDFs from the Federal Reserve website to our stage. We'll put that logic in a Python stored procedure, which we will automate with a task.

In [None]:
CREATE OR REPLACE PROCEDURE get_fed_pdfs()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python', 'requests', 'bs4', 'snowflake')
EXTERNAL_ACCESS_INTEGRATIONS = ("FED_RESERVE_ACCESS_INTEGRATION")
HANDLER='main'
EXECUTE AS CALLER
AS $$

import snowflake.snowpark as snowpark
import requests 
from bs4 import BeautifulSoup
from io import BytesIO

def get_all_fomc_pdfs():
    """Return absolute URLs for every PDF linked from the FOMC calendar page."""
    try:
        response = requests.get('https://www.federalreserve.gov/monetarypolicy/fomccalendars.htm', timeout=60)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        pdf_links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            if href.endswith('.pdf'):
                # relative links need the site prefix
                if not href.startswith('http'):
                    href = 'https://www.federalreserve.gov' + href
                pdf_links.append(href)
        return pdf_links
    except requests.RequestException as e:
        print(f"Error fetching the FOMC statement page: {e}")
        # return an empty list so main() has nothing to download
        return []

def main(session):
    database = 'GEN_AI_FSI'
    schema = 'FOMC'
    stage = 'FED_PDF'

    #return all the files on the website
    pdfs = get_all_fomc_pdfs()
    #return all the files in your stage
    query = f'LIST @{database}.{schema}.{stage}'
    stage_files = session.sql(query).collect()
    stage_files = [row['name'] for row in stage_files]
    #set the stage
    stage = f'@{database}.{schema}.{stage}/'
    
    #Download the new files
    for pdf in pdfs:
        filename = pdf.split('/')[-1]
        if 'fed_pdf/' + filename not in stage_files:
            response = requests.get(pdf)
            full_file_name = stage+filename
            file_data = response.content
            buffer = BytesIO(file_data)
            session.file.put_stream(buffer, full_file_name, auto_compress=False, overwrite=False)
            print(filename,':\t downloading')
        else:
            print(filename,':\t already downloaded')
    return str('all data has been downloaded')
$$;
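The link-harvesting and de-duplication logic in the procedure can be exercised locally without Snowflake. The sketch below is an illustration, not the lab's code: it swaps BeautifulSoup for the standard library's `html.parser` and uses `urllib.parse.urljoin` to absolutize relative links. The sample HTML and file names are made up, and the `fed_pdf/` prefix mirrors the stage-listing check in the procedure above.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

BASE_URL = "https://www.federalreserve.gov"


class PdfLinkParser(HTMLParser):
    """Collect href values of <a> tags that point at PDF files."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if href and href.endswith(".pdf"):
            # urljoin leaves absolute URLs untouched and resolves relative ones
            self.links.append(urljoin(BASE_URL, href))


def extract_pdf_links(html):
    parser = PdfLinkParser()
    parser.feed(html)
    # dict.fromkeys drops duplicate links while preserving page order
    return list(dict.fromkeys(parser.links))


def new_files(pdf_links, stage_names, stage_prefix="fed_pdf/"):
    """Return (filename, url) pairs whose file is not yet in the stage listing."""
    staged = set(stage_names)
    result = []
    for url in pdf_links:
        filename = url.split("/")[-1]
        if stage_prefix + filename not in staged:
            result.append((filename, url))
    return result


sample_html = """
<a href="/monetarypolicy/files/monetary20240131a1.pdf">Statement</a>
<a href="https://www.federalreserve.gov/monetarypolicy/files/fomcminutes20240131.pdf">Minutes</a>
<a href="/monetarypolicy/files/monetary20240131a1.pdf">Duplicate</a>
<a href="/monetarypolicy/fomccalendars.htm">Not a PDF</a>
"""

links = extract_pdf_links(sample_html)
todo = new_files(links, ["fed_pdf/monetary20240131a1.pdf"])
print(todo)
```

De-duplicating the link list up front means each file is requested at most once per run, whereas the procedure relies on `overwrite=False` to avoid re-staging a file that appears twice on the page.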

# Create File Extraction Function

We need to extract the text from the PDFs. We will do that with a new Python user-defined function (UDF).

In [None]:
--create a function to extract a PDF's text as a single line
create or replace function PDF_TEXT_EXTRACTOR(file_url string)
returns varchar
language python
runtime_version = '3.9'
handler = 'read_pdf'
packages = ('snowflake-snowpark-python','PyPDF2')
as
$$
from snowflake.snowpark.files import SnowflakeFile
import PyPDF2, io
import logging


def read_pdf(file_url: str) -> str:

    logger = logging.getLogger("udf_logger")
    logger.info(f"Opening file {file_url}")

    with SnowflakeFile.open(file_url, 'rb') as f:
        buffer = io.BytesIO(f.readall())

    reader = PyPDF2.PdfReader(buffer)
    text = ""
    for page_number, page in enumerate(reader.pages, start=1):
        try:
            # flatten each page to one line: newlines and NUL characters become spaces
            text += (page.extract_text() or "").replace('\n', ' ').replace('\0', ' ')
        except Exception:
            # skip the bad page instead of discarding the text already extracted
            logger.warning(f"Unable to extract from file {file_url}, page {page_number}")

    return text if text else "Unable to Extract"
$$;
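The per-page cleanup in `read_pdf`, plus the single-quote stripping that the load task later applies with `REPLACE(..., '''', '')`, can be checked in plain Python. In this sketch, a list of zero-argument callables stands in for PyPDF2's `page.extract_text()` (a hypothetical test harness), and the page texts are made up. The sketch skips pages that fail and only falls back to the `"Unable to Extract"` sentinel when nothing at all was extracted.

```python
def clean_page(raw):
    """Flatten one page: newlines and NUL characters become spaces, as in read_pdf."""
    return raw.replace("\n", " ").replace("\0", " ")


def join_pages(extractors):
    """Concatenate cleaned page text, skipping pages whose extraction fails.

    `extractors` is a list of zero-argument callables standing in for
    PyPDF2's page.extract_text (hypothetical, for local testing only).
    """
    parts = []
    failed = []
    for page_number, extract in enumerate(extractors, start=1):
        try:
            parts.append(clean_page(extract() or ""))
        except Exception:
            failed.append(page_number)
    text = "".join(parts)
    return (text if text else "Unable to Extract"), failed


def strip_single_quotes(text):
    """Python equivalent of the task's REPLACE(text, '''', '') call."""
    return text.replace("'", "")


def broken_page():
    raise ValueError("corrupt content stream")


text, failed = join_pages([
    lambda: "The Committee's decision\nwas unanimous.",
    broken_page,
    lambda: "\0Inflation remains elevated.",
])
print(repr(text), failed)
print(strip_single_quotes(text))
```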

# Create and Register generate_prompt Function

As we load data into our system, we want to automatically generate a signal. To do so, we need to call an LLM and pass it our prompt.

Below, we package our prompt engineering as a Python function and register it as a permanent UDF so we can reuse it when loading data.

In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.types import *

session = get_active_session() 

def generate_prompt(document_text):
    prompt = f"""
        <Role> You are an experienced Senior Economist deeply knowledgeable on Federal Reserve guidance including FOMC or Federal Open Market Committee meeting minutes and communications.
        You are an expert in interpreting Hawkish and Dovish signals from the Fed or Federal Reserve. Such signals are derived from guidance conveyed in FOMC meeting notes and communications.
        
        As an analyst, you excel at discerning macroeconomic trends for each FOMC meeting notes and communications published by the Federal Reserve.
        The signal or trend is either Hawkish or Dovish based on the growth outlook and inflation outlook of the Fed. The Federal Reserve has a long-term
        objective of keeping inflation around 2% and unemployment low. Hawkish sentiment could imply 
        the Federal Reserve intends to raise interest rates to increase the cost of borrowing and slow economic activity. 
        The Fed typically increases interest rates when inflation is high or rising, or when the unemployment 
        rate is low or falling. Conversely, dovish sentiment could imply the Federal Reserve intends to lower interest 
        rates to allow easier access to borrowing and lower the cost of money to stimulate economic activity. The Fed 
        typically decreases interest rates when inflation is low or falling, or when the unemployment rate is high or rising.
        
        Signal categories known as Economic Policy Stances:
        Hawkish stance or attitude for economic policy
        -characterized by a focus on combating inflation and often involves advocating for higher interest rates and tolerant to higher levels of unemployment.
        -concerned about rising inflation. Hawkish stance believes higher interest rates can help keep inflation in check, even if it slows down economic growth or increases unemployment.
        
        Dovish stance or attitude for economic policy
        -characterized by a focus on prioritizing stimulating economic growth, reducing unemployment, and tolerant to higher levels of inflation.
        -concerned with boosting economic activity, reducing unemployment and, for this reason, lower interest rates are preferred to create economic growth and employment.
        
        Neutral stance or attitude for economic policy
        -characterized by a focus on balance between combating inflation and supporting economic growth, with no strong inclination toward either side.
        -concerned with maintaining a steady economic environment without significant deviations. They seek to neither overly stimulate the economy nor excessively tighten it.
        </Role>
        
        <Data> 
        You are provided the text of a Federal Reserve Guidance or FOMC meeting notes as context. These generally are released before the Federal Reserve takes action on economic policy. 
        </Data>

        <FOMC_meeting_notes>
        {document_text}
        </FOMC_meeting_notes>
        
        <Task>: Follow these instructions,
        1) Review the provided FOMC communication or meeting notes text. Then,
        2) Consider the tone and sentiment of the FOMC (Committee) members around economic conditions. Then,
        3) Consider specific guidance and stated conditions that validate the tone and signal FOMC members convey concerning current macroeconomic conditions. Then,
        4) Based on this sentiment, classify whether the FOMC communication text indicates a Hawkish, Dovish, or Neutral outlook for the economy. Be critical and do not categorize sentiment as "Neutral" unless necessary. This will be output as [Signal].
        5) Summarize a concise and accurate rationale for classifying the sentiment as Hawkish, Neutral, or Dovish. This will be output as [Signal_Summary].
        </Task>
        
        <Output> 
        Produce valid JSON. Absolutely do not include any additional text before or following the JSON. Output should use the following JSON_format.
        </Output>
        
        <JSON_format>
        {{
            "Signal": (A trend sentiment classification of Hawkish, Neutral or Dovish),
            "Signal_Summary": (A concise summary of sentiment trend)
        }}
        </JSON_format>"""
    return prompt

session.add_packages("snowflake-snowpark-python", "snowflake-ml-python", "snowflake")

session.udf.register(
  func = generate_prompt
, return_type = StringType()
, input_types = [ StringType()]
, is_permanent = True
, name = 'generate_prompt'
, replace = True
, stage_location = '@fed_logic')
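Because `generate_prompt` asks the model for bare JSON with `Signal` and `Signal_Summary` keys, downstream consumers will usually want to validate what comes back. The sketch below is a hypothetical local helper (not part of the lab) that parses a response string, tolerates stray text around the JSON object, and checks the label against the three allowed stances. Note that `json.loads` rejects trailing commas, so the model's output must be strictly valid JSON. The sketch also shows why the prompt template doubles its braces: inside an f-string, `{{` and `}}` render as literal braces.

```python
import json

ALLOWED_SIGNALS = {"Hawkish", "Dovish", "Neutral"}


def parse_signal(response_text):
    """Parse a model response into (signal, summary), or return None if invalid."""
    if response_text is None:  # TRY_COMPLETE returns NULL instead of raising on failure
        return None
    start = response_text.find("{")
    end = response_text.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        payload = json.loads(response_text[start:end + 1])
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    # normalize casing so "hawkish" and "HAWKISH" both map to "Hawkish"
    signal = str(payload.get("Signal", "")).strip().capitalize()
    if signal not in ALLOWED_SIGNALS:
        return None
    return signal, payload.get("Signal_Summary", "")


# Doubled braces in an f-string become literal braces, as in the prompt template.
label = "Hawkish"
template = f'{{"Signal": "{label}"}}'
print(template)

print(parse_signal('Here is the JSON: {"Signal": "hawkish", "Signal_Summary": "Inflation is elevated."}'))
print(parse_signal('{"Signal": "Bullish", "Signal_Summary": "n/a"}'))
```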

# Build Data Pipeline
We have created our data structure and all of the logic we need to download and parse our unstructured data and generate a custom prompt. Next, let's automate the pipeline with streams and tasks.

We will follow most of the steps outlined in our documentation examples: https://docs.snowflake.com/en/user-guide/data-load-dirtables-pipeline.

![image info](https://docs.snowflake.com/en/_images/data-lake-dirtable-pipeline.png)

## Automate Ingestion of PDFs
1. Create a task to run every hour that will execute the stored procedure we just created.
2. Create a stream to check for new PDFs in our stage.
3. Create a task to ingest text by calling our PDF_TEXT_EXTRACTOR function directly in SQL.

## 1. Create a task to execute the stored procedure that downloads the PDFs
This task executes the get_fed_pdfs stored procedure we created earlier. Remember, that procedure only downloads the PDFs to our stage.

In [None]:
CREATE OR REPLACE TASK download_fed_pdf_to_stage
	WAREHOUSE=GEN_AI_FSI_WH
	SCHEDULE='60 minute' --every hour
	COMMENT='Download new FOMC files to our stage from the fed website.'
	AS 
        CALL get_fed_pdfs();

## 2. Create a stream to check for new PDFs in our stage
* Streams record change data capture (CDC) information; we'll use this stream to track changes to our stage's directory table.
    * https://docs.snowflake.com/en/user-guide/streams-intro
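To build intuition for how the stream behaves, here is a deliberately simplified in-memory model. The `ToyStream` class is hypothetical and is not how Snowflake implements streams. It captures two documented properties of Snowflake streams: selecting from a stream does not move its offset, while consuming it in a committed DML statement advances the offset past every change recorded so far, whether or not the statement used all of those rows. The second property matters for the `TOP 1` clause in the load task: the remaining new files are not offered again on the next run.

```python
class ToyStream:
    """Simplified model of a stream over a stage's directory table (illustrative only)."""

    def __init__(self):
        self._log = []     # every change ever recorded: (action, relative_path)
        self._offset = 0   # index of the first change not yet consumed

    def record_insert(self, relative_path):
        """Called when a new file lands in the stage and the directory is refreshed."""
        self._log.append(("INSERT", relative_path))

    def has_data(self):
        """Analogue of SYSTEM$STREAM_HAS_DATA."""
        return self._offset < len(self._log)

    def select(self):
        """Plain SELECT: returns pending changes without moving the offset."""
        return list(self._log[self._offset:])

    def consume(self, limit=None):
        """DML consumption: returns up to `limit` rows, then advances past ALL pending rows."""
        pending = self.select()
        self._offset = len(self._log)
        return pending if limit is None else pending[:limit]


stream = ToyStream()
for name in ["a.pdf", "b.pdf", "c.pdf"]:
    stream.record_insert(name)

print(stream.select())          # all three changes pending; offset unchanged
print(stream.consume(limit=1))  # like TOP 1: only a.pdf is loaded
print(stream.has_data())        # False: b.pdf and c.pdf were skipped
```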

In [None]:
CREATE OR REPLACE STREAM GEN_AI_FSI.FOMC.FOMC_STREAM on DIRECTORY(@FED_PDF);

## 3. Create a task to ingest text 
Tasks allow us to schedule particular actions in Snowflake. In this case, we're scheduling the ingestion of the FOMC PDF text and enriching it with an AI-generated signal. Remember, the data is moving from our stage into our Snowflake tables.
    
Learn more about tasks: https://docs.snowflake.com/en/user-guide/tasks-intro


### 🤯 🧠 CHECK IT OUT! 🧠 🤯 
* We're calling our PDF text extractor Python logic in SQL! (see the `pdf_text_extractor(...)` call)
* We're calling our signal logic upon ingestion! (see the `SNOWFLAKE.CORTEX.TRY_COMPLETE(...)` call)
* We'll only load data from our stage to our table when the stream has a new record (remember the stream fills up with records of new data in the stage)

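NOTE: We’re only bringing in the top 1 result (the `SELECT TOP 1` clause). This speeds up the initial load by restricting it to a single file. Because inserting from the stream advances its offset past every pending record, the other new files will not be picked up by later runs. If you would like to load all of the PDF data into a table, simply delete or comment out that clause.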

In [None]:
CREATE OR REPLACE TASK LOAD_FED_PDFS_STAGE_TO_TABLE
	WAREHOUSE=GEN_AI_FSI_WH
	SCHEDULE='720 minute' --check twice a day, will only execute if the stream has data
	COMMENT='Process new FOMC files on the stage and insert their data into the PDF_FULL_TEXT table.'
	WHEN SYSTEM$STREAM_HAS_DATA('FOMC_STREAM')
	AS INSERT INTO PDF_FULL_TEXT (
    WITH CTE AS ( 
        SELECT TOP 1
                FED_PDF_FULL_TEXT_SEQUENCE.NEXTVAL as ID,
                RELATIVE_PATH,
                SIZE,
                LAST_MODIFIED,
                MD5, 
                ETAG,
                FILE_URL,
                REPLACE(pdf_text_extractor(build_scoped_file_url('@FED_PDF', relative_path)),'''' ,'') as FILE_TEXT, 
                TRY_TO_DATE(REGEXP_SUBSTR (relative_path, '\\d{8}'),'YYYYMMDD') as FILE_DATE
            FROM 
                FOMC_STREAM
            WHERE 
                METADATA$ACTION='INSERT'
        )
        SELECT
            ID, 
            RELATIVE_PATH,
            SIZE,
            LAST_MODIFIED,
            MD5, 
            ETAG,
            FILE_URL,
            FILE_TEXT, 
            FILE_DATE,
            SNOWFLAKE.CORTEX.TRY_COMPLETE('mistral-large2', generate_prompt(FILE_TEXT)) AS SIGNAL_mis
        FROM
            CTE
    );
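The `FILE_DATE` expression pulls the first run of eight digits out of `RELATIVE_PATH` and parses it as `YYYYMMDD`, returning NULL when that fails. A rough Python equivalent, handy for checking which file names will get a date, is sketched below. The sample file names are illustrative, and Python's `strptime` may not match Snowflake's date parsing in every edge case.

```python
import re
from datetime import date, datetime
from typing import Optional

EIGHT_DIGITS = re.compile(r"\d{8}")


def file_date(relative_path: str) -> Optional[date]:
    """Mimic TRY_TO_DATE(REGEXP_SUBSTR(relative_path, '\\d{8}'), 'YYYYMMDD')."""
    match = EIGHT_DIGITS.search(relative_path)
    if match is None:
        return None  # REGEXP_SUBSTR returns NULL, so TRY_TO_DATE does too
    try:
        return datetime.strptime(match.group(0), "%Y%m%d").date()
    except ValueError:
        return None  # TRY_TO_DATE returns NULL instead of raising


for path in ["monetary20240131a1.pdf", "fomcminutes20231213.pdf", "fomc_calendar.pdf", "doc99999999.pdf"]:
    print(path, file_date(path))
```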

## Manual Task Execution
Our scheduled tasks won't run right away, and newly created tasks start out suspended until you run `ALTER TASK <name> RESUME`. Let's execute the download task manually so the PDFs land in our stage now; then we'll check the stream and run the load task.

In [None]:
EXECUTE TASK download_fed_pdf_to_stage;

### Monitor Execution
You can check the task executions by following these instructions: https://docs.snowflake.com/en/user-guide/ui-snowsight-tasks#view-and-manage-individual-tasks. 

Once you see that the task is running, you should be able to query that stage location and see some files landing in the stage with the following SQL.

In [None]:
ALTER STAGE FED_PDF REFRESH;
SELECT * FROM DIRECTORY (@FED_PDF);

### Check the Stream
Remember, the stream is monitoring for new files in our stage location. Let's take a look at the stream to see if it has any data in it.

In [None]:
--first, let's check the stream.
ALTER STAGE FED_PDF REFRESH;
SELECT * FROM FOMC_STREAM WHERE METADATA$ACTION='INSERT';

### Manual Task Execution
Great, the data is in our internal stage and our stream has captured that new data. Let's manually execute the LOAD_FED_PDFS_STAGE_TO_TABLE task to do the real magic and bring in the PDF text along with our hawkish, dovish, or neutral sentiment.


In [None]:
--now let's manually execute the task
--!! remember: the task only runs if the stream query above returned rows.
EXECUTE TASK LOAD_FED_PDFS_STAGE_TO_TABLE;

### Inspect Output
Looking at all of the data, we can see the signal column containing the JSON object returned by the LLM.

In [None]:
SELECT * FROM PDF_FULL_TEXT;