# Helper Functions and Imports

In [None]:
import requests
from bs4 import BeautifulSoup
from dateutil import parser
import pandas as pd
import numpy as np
import datetime

def federal_reserve_datetime_conversion(date_string, time_string):
    """
    Convert a Federal Reserve press-release date and time to a UTC timestamp string.

    Args:
        date_string: Date in '%B %d, %Y' form, e.g. 'January 06, 2021'.
        time_string: Three whitespace-separated fields -- clock time, meridiem
            and timezone -- e.g. '2:00 p.m. EST'. Recognized timezones are
            'EST' (UTC-5), 'EDT' (UTC-4) and 'UTC'.

    Returns:
        The naive UTC datetime formatted as 'YYYY-MM-DD HH:MM:SS'. The date
        rolls over to the next day when the UTC offset pushes the time past
        midnight (e.g. 8:00 p.m. EST is 01:00 UTC on the following day).

    Raises:
        ValueError: if the time string is malformed, the timezone is not
            recognized, or the date does not match the expected format.
    """
    # Hours to add to local time to obtain UTC.
    utc_offset_hours = {'EST': 5, 'EDT': 4, 'UTC': 0}

    fields = time_string.split()
    if len(fields) != 3:
        raise ValueError(f'Time string not recognized: {time_string}')
    clock, meridiem, timezone = fields

    if timezone not in utc_offset_hours:
        raise ValueError(f'Timezone not recognized: {time_string}')

    hour_text, _, minute_text = clock.partition(':')
    hour, minute = int(hour_text), int(minute_text)

    # Convert the 12-hour clock to a 24-hour clock.
    if meridiem == 'p.m.' and hour != 12:
        hour += 12
    elif meridiem == 'a.m.' and hour == 12:
        hour = 0

    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError(f'Time of day out of range: {time_string}')

    # Apply the offset with timedelta arithmetic rather than by editing the
    # hour digits, so that times crossing midnight advance the date instead of
    # producing an invalid hour such as 25.
    local_midnight = datetime.datetime.strptime(date_string, '%B %d, %Y')
    utc_datetime = local_midnight + datetime.timedelta(
        hours=hour + utc_offset_hours[timezone],
        minutes=minute,
    )

    return str(utc_datetime)

# Quick manual check of the helper: 3:45 p.m. EDT is four hours behind UTC, so
# this evaluates to '2021-01-06 19:45:00'.
federal_reserve_datetime_conversion('January 06, 2021', '3:45 p.m. EDT')


In [None]:
import requests
from bs4 import BeautifulSoup
from dateutil import parser
import pandas as pd
import datetime

# Column layout of the scraped table. Passing it to the DataFrame constructor
# keeps the column order fixed and makes an empty scrape still produce a CSV
# with a header row.
df_columns = ['timestamp', 'url', 'title', 'text', 'summary', 'source', 'tags', 'type', 'language', 'country']

parse_urls = [
    {   'url': 'https://www.federalreserve.gov/newsevents/pressreleases/',
        'source': 'Federal Reserve',
        'tags': 'general',
        'type': 'press release',
        'language': 'en',
        'country': 'US'
    },
        ]

url_parsed_dict = dict()
regulatory_pr_df = pd.DataFrame(columns=df_columns)
debugging = True
start_date = datetime.date(2021, 1, 1)
end_date = datetime.date(2023, 1, 1)

# Timeout (seconds) for every HTTP request, so that one stalled connection
# cannot hang the whole scrape.
request_timeout = 30

# Rows are collected in a plain list and turned into a DataFrame once at the
# end; concatenating a one-row frame per press release copies the whole table
# on every iteration. If the loop fails part-way, the rows gathered so far are
# still available in this list.
scraped_rows = []

for parse_url in parse_urls:

    if parse_url['source'] != 'Federal Reserve':
        continue

    for n in range((end_date - start_date).days):
        day_stamp = (start_date + datetime.timedelta(n)).strftime('%Y%m%d')

        for category in ['monetary', 'other', 'orders', 'bcreg', 'enforcement']:

            # Page names are <category><YYYYMMDD><letter>.htm; probe the
            # letters a, b, c, ... until the first request that is not a 200.
            char_append = 'a'
            while True:
                response = requests.get(
                    parse_url['url'] + category + day_stamp + char_append + '.htm',
                    timeout=request_timeout,
                )
                if response.status_code != 200:
                    break

                char_append = chr(ord(char_append) + 1)

                if debugging:
                    print(f"Valid: {response.url}")

                # Skip pages that were already processed before doing any parsing.
                if response.url in url_parsed_dict:
                    continue
                url_parsed_dict[response.url] = True

                soup = BeautifulSoup(response.content, 'html.parser')
                date_string = soup.find('p', class_='article__time').text.strip()
                release_time = soup.find('p', class_='releaseTime')
                if 'immediate' in release_time.text:
                    # No clock time is given for immediate releases; use midnight UTC.
                    time_string = '00:00 a.m. UTC'
                else:
                    # Keep what follows the last 'at' up to the first 'T', then
                    # restore the 'T' that split() removed (e.g. '2:00 p.m. EST').
                    time_string = release_time.text.split('at')[-1].split('T')[0].strip() + 'T'

                time_string = time_string.replace('  ', ' ')

                processed_row = {
                    'timestamp': federal_reserve_datetime_conversion(date_string, time_string),
                    'url': response.url,
                    'title': soup.find('h3', class_='title').text.strip(),
                    'text': soup.find('div', class_='col-xs-12 col-sm-8 col-md-8').text.strip(),
                    'summary': '',
                    'source': parse_url['source'],
                    'tags': parse_url['tags'],
                    'type': parse_url['type'],
                    'language': parse_url['language'],
                    'country': parse_url['country'],
                }

                if debugging:
                    print(processed_row['timestamp'], date_string, time_string)
                    print(processed_row)

                scraped_rows.append(processed_row)

regulatory_pr_df = pd.DataFrame(scraped_rows, columns=df_columns)
regulatory_pr_df.to_csv('regulatory_pr_fed.csv', index=False)
regulatory_pr_df

In [None]:
# Load the saved press-release table and display it.
# NOTE(review): this reads 'regulatory_pr.csv', whereas the scraping cell just
# above writes 'regulatory_pr_fed.csv' -- confirm which file is intended here.
df = pd.read_csv('regulatory_pr.csv')
df

# Main

In [None]:
# Fields that are identical for every press-release source listed below.
_shared_source_fields = {'type': 'press release', 'language': 'en', 'country': 'US'}

# One entry per listing to scrape: (base or listing URL, source name, tag).
# The order matters to the scraping loop, which skips URLs it has already
# seen, so an article keeps the tag of the first listing that contains it.
parse_urls = [
    {'url': listing_url, 'source': source_name, 'tags': tag, **_shared_source_fields}
    for listing_url, source_name, tag in (
        ('https://www.federalreserve.gov/newsevents/pressreleases/', 'Federal Reserve', 'general'),
        ("https://www.fdic.gov/news/press-releases/2021/pr21", 'FDIC', 'general'),
        ("https://www.fdic.gov/news/press-releases/2022/pr22", 'FDIC', 'general'),
        ("https://www.sec.gov/news/pressreleases?aId=&combine=crypto&year=2021&month=All", 'SEC', 'crypto'),
        ("https://www.sec.gov/news/pressreleases?aId=&combine=&year=2021&month=All", 'SEC', 'general'),
        ("https://www.sec.gov/news/pressreleases?aId=&combine=crypto&year=2022&month=All", 'SEC', 'crypto'),
        ("https://www.sec.gov/news/pressreleases?aId=&combine=&year=2022&month=All", 'SEC', 'general'),
    )
]

# Scrape state and settings used by the loop that follows.
url_parsed_dict = {}                 # URLs already turned into a row
regulatory_pr_df = pd.DataFrame()    # accumulated press releases
debugging = False                    # print progress while scraping
start_date = datetime.date(year=2021, month=1, day=1)
end_date = datetime.date(year=2023, month=1, day=1)

# Timeout (seconds) for every HTTP request, so that one stalled connection
# cannot hang the whole scrape.
request_timeout = 30

# Rows are collected in a plain list and appended to regulatory_pr_df once at
# the end; concatenating a one-row frame per press release copies the whole
# table on every iteration. If the loop fails part-way, the rows gathered so
# far are still available in this list.
scraped_rows = []

for parse_url in parse_urls:

    # Fields copied unchanged from the source configuration into every row.
    row_metadata = {
        'summary': '',
        'source': parse_url['source'],
        'tags': parse_url['tags'],
        'type': parse_url['type'],
        'language': parse_url['language'],
        'country': parse_url['country'],
    }

    if parse_url['source'] == 'Federal Reserve':

        for n in range((end_date - start_date).days):
            day_stamp = (start_date + datetime.timedelta(n)).strftime('%Y%m%d')

            for category in ['monetary', 'other', 'orders', 'bcreg', 'enforcement']:

                # Page names are <category><YYYYMMDD><letter>.htm; probe the
                # letters a, b, c, ... until the first request that is not a 200.
                char_append = 'a'
                while True:
                    response = requests.get(
                        parse_url['url'] + category + day_stamp + char_append + '.htm',
                        timeout=request_timeout,
                    )
                    if response.status_code != 200:
                        break

                    char_append = chr(ord(char_append) + 1)

                    if debugging:
                        print(f"Valid: {response.url}")

                    # Skip pages that were already processed before parsing them.
                    if response.url in url_parsed_dict:
                        continue
                    url_parsed_dict[response.url] = True

                    soup = BeautifulSoup(response.content, 'html.parser')
                    date_string = soup.find('p', class_='article__time').text.strip()
                    release_time = soup.find('p', class_='releaseTime')
                    if 'immediate' in release_time.text:
                        # No clock time is given for immediate releases; use midnight UTC.
                        time_string = '00:00 a.m. UTC'
                    else:
                        # Keep what follows the last 'at' up to the first 'T', then
                        # restore the 'T' that split() removed (e.g. '2:00 p.m. EST').
                        time_string = release_time.text.split('at')[-1].split('T')[0].strip() + 'T'

                    time_string = time_string.replace('  ', ' ')

                    processed_row = {
                        'timestamp': federal_reserve_datetime_conversion(date_string, time_string),
                        'url': response.url,
                        'title': soup.find('h3', class_='title').text.strip(),
                        'text': soup.find('div', class_='col-xs-12 col-sm-8 col-md-8').text.strip(),
                        **row_metadata,
                    }

                    if debugging:
                        print(processed_row['timestamp'], date_string, time_string)
                        print(processed_row)

                    scraped_rows.append(processed_row)

    if parse_url['source'] == 'FDIC':

        # Releases are numbered 001, 002, ... after the configured URL prefix;
        # stop at the first number that does not return a 200.
        i = 1
        while True:
            page_url = parse_url['url'] + str(i).zfill(3) + '.html'
            response = requests.get(page_url, timeout=request_timeout)
            if response.status_code != 200:
                if debugging:
                    # Report the URL actually requested (the prefix differs per year).
                    print(f'{page_url} not found. Breaking loop.')
                break

            i += 1

            if response.url in url_parsed_dict:
                continue
            url_parsed_dict[response.url] = True

            soup = BeautifulSoup(response.content, 'html.parser')

            # The date appears either with a weekday ('%A, %B %d, %Y') or
            # without one ('%B %d, %Y'); the number of comma-separated parts
            # tells the two apart. Any other layout leaves the timestamp empty.
            date_string = soup.find('span', class_='prdate').text.strip()
            date_part_count = len(date_string.split(','))
            if date_part_count == 3:
                timestamp = datetime.datetime.strptime(date_string, '%A, %B %d, %Y').strftime('%Y-%m-%d %H:%M:%S')
            elif date_part_count == 2:
                timestamp = datetime.datetime.strptime(date_string, '%B %d, %Y').strftime('%Y-%m-%d %H:%M:%S')
            else:
                timestamp = None

            paragraphs = soup.find('article', class_='order-2').find_all('p')

            processed_row = {
                'timestamp': timestamp,
                'url': response.url,
                'title': soup.find_all('div', class_='prtitle')[0].find('h1').text.strip(),
                'text': ''.join(paragraph.text.strip() + '\n' for paragraph in paragraphs),
                **row_metadata,
            }

            if debugging:
                print(processed_row)

            scraped_rows.append(processed_row)

    if parse_url['source'] == 'SEC':

        response = requests.get(parse_url['url'], timeout=request_timeout)
        soup = BeautifulSoup(response.content, 'html.parser')

        articles = soup.find_all('tr', class_='pr-list-page-row')

        # Walk the listing bottom-up (presumably oldest first -- TODO confirm
        # the listing order on the site).
        for article in reversed(articles):

            row = article.find_all('td')
            link = row[2].find('a')

            article_url = "https://www.sec.gov" + link['href']
            # An article already collected from an earlier listing keeps the
            # tag of that listing and is not fetched again.
            if article_url in url_parsed_dict:
                continue
            url_parsed_dict[article_url] = True

            article_response = requests.get(article_url, timeout=request_timeout)
            article_soup = BeautifulSoup(article_response.content, 'html.parser')

            processed_row = {
                'timestamp': parser.parse(row[1].find('time')['datetime']).strftime('%Y-%m-%d %H:%M:%S'),
                'url': article_url,
                'title': link.text.strip(),
                'text': article_soup.find('div', class_='article-body').text.strip(),
                **row_metadata,
            }

            if debugging:
                print(processed_row)

            scraped_rows.append(processed_row)

if scraped_rows:
    regulatory_pr_df = pd.concat([regulatory_pr_df, pd.DataFrame(scraped_rows)], ignore_index=True, axis=0)

# Parse the timestamp strings, order the releases chronologically, keep only
# those inside the study window (both bounds inclusive), and renumber the rows.
regulatory_pr_df['timestamp'] = pd.to_datetime(regulatory_pr_df['timestamp'])
window_start, window_end = '2021-03-01 00:00:00', '2022-09-19 17:30:00'
regulatory_pr_df = (
    regulatory_pr_df
    .sort_values(by=['timestamp'])
    .loc[lambda frame: (frame['timestamp'] >= window_start) & (frame['timestamp'] <= window_end)]
    .reset_index(drop=True)
)
regulatory_pr_df


In [None]:
# Write the press-release table to 'regulatory_pr.csv' without the index column.
regulatory_pr_df.to_csv('regulatory_pr.csv', index=False)

# Load and use regulatory_pr

In [None]:
# Reload the saved press-release table so the cells below can run without
# repeating the scrape, then display it.
regulatory_pr_df = pd.read_csv('regulatory_pr.csv')
regulatory_pr_df

In [None]:
import re

crypto_keywords = ['cryptoinvestor',
                  'btc',
                  'cryptoexchange',
                  'ethereum',
                  'cryptocurrency',
                  'crypto',
                  'hodl',
                  'crypto currency',
                  'dogecoin',
                  'kraken',
                  'token',
                  'cryptocurrencymarket',
                  'gemini',
                  'coinbase',
                  'bitcoin',
                  'ico',
                  'altcoin',
                  'cryptocurrencies',
                  'binance',
                  'cryptonews',
                  'web3',
                  'cryptocurrencyexchange',
                  'doge',
                  'crypto mining',
                  '$eth',
                  'blockchain',
                  '$btc',
                  'coin',
                  'doge coin']

# One case-insensitive pattern that matches any keyword as a whole word or
# phrase. Comparing against whitespace-split tokens could never match the
# multi-word keywords ('crypto currency', 'crypto mining', 'doge coin') and
# missed keywords next to punctuation ('bitcoin,' or '(ICO)').
#   * the lookbehind/lookahead stop matches inside longer words, so 'coin'
#     does not fire on 'coincide';
#   * the words of a phrase may be separated by any run of whitespace.
crypto_keyword_pattern = re.compile(
    r'(?<![\w$])(?:'
    + '|'.join(
        r'\s+'.join(re.escape(word) for word in keyword.split())
        for keyword in crypto_keywords
    )
    + r')(?!\w)',
    re.IGNORECASE,
)

# 1 when the release text mentions a keyword, else 0. Text that is not a
# string (e.g. NaN for an empty CSV field) counts as not crypto related.
crypto_related = [
    1 if isinstance(text, str) and crypto_keyword_pattern.search(text) else 0
    for text in regulatory_pr_df['text']
]

regulatory_pr_df['crypto_related'] = crypto_related
# Take an explicit copy so that columns added to the crypto subset later do
# not operate on a slice of regulatory_pr_df.
crypto_regulatory_pr_df = regulatory_pr_df[regulatory_pr_df['crypto_related'] == 1].copy()
crypto_regulatory_pr_df.reset_index(drop=True, inplace=True)

In [None]:
# Number of rows in the crypto-related subset.
len(crypto_regulatory_pr_df)

# Use FinBERT

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# Pick the compute device in order of preference: CUDA, then Apple MPS, then
# CPU. Each backend is tested in its own branch so that a detected CUDA device
# is not replaced by 'cpu' when MPS is unavailable. getattr() covers torch
# builds that do not have the torch.backends.mps module.
mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = torch.device("cuda")
elif mps_backend is not None and mps_backend.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f'{device} is available')

# Load the FinBERT tokenizer and classifier from the same checkpoint and move
# the model to the selected device.
finbert_checkpoint = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(finbert_checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(finbert_checkpoint)
model.to(device)

# Sample sentence for trying out sentiment_analysis by hand.
text = "SVB bank has collapsed due to high interest rates"
def sentiment_analysis(text):
    """
    Score ``text`` with the loaded FinBERT model.

    Args:
        text: The string to classify. Input longer than 512 tokens is
            truncated by the tokenizer.

    Returns:
        A list of three probabilities ordered [negative, neutral, positive],
        which is the order the rest of this notebook indexes. The order is
        taken from the model's own ``config.id2label`` mapping rather than
        assumed from the raw logit positions. If the checkpoint does not use
        those three label names, the probabilities are returned in the
        model's raw output order.
    """
    # Truncate at the token level so an over-long input cannot exceed the
    # model's maximum sequence length.
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = model(**inputs)
    predictions = torch.softmax(outputs.logits, dim=1).tolist()[0]

    # Map label name -> output index using the checkpoint's configuration.
    index_of_label = {
        str(label).lower(): int(index)
        for index, label in model.config.id2label.items()
    }
    try:
        return [predictions[index_of_label[name]] for name in ('negative', 'neutral', 'positive')]
    except KeyError:
        return predictions

# predictions = sentiment_analysis(text)
# print(f"Negative: {predictions[0]:.2f}")
# print(f"Neutral: {predictions[1]:.2f}")
# print(f"Positive: {predictions[2]:.2f}")

In [None]:
# Score every press release and store two 0/1 flags per row. The scores are
# read as [negative, neutral, positive]; a release is flagged negative or
# positive only when that score is strictly greater than both others, and
# gets 0 in both columns otherwise.
positive_sentiment_list = []
negative_sentiment_list = []
for _, press_release in regulatory_pr_df.iterrows():
    # Title and body joined by a newline, cut to the first 512 characters.
    model_input = f"{press_release['title']}\n{press_release['text']}"[:512]
    scores = sentiment_analysis(model_input)

    negative_wins = scores[0] > scores[1] and scores[0] > scores[2]
    positive_wins = (not negative_wins) and scores[2] > scores[0] and scores[2] > scores[1]

    positive_sentiment_list.append(1 if positive_wins else 0)
    negative_sentiment_list.append(1 if negative_wins else 0)

regulatory_pr_df['positive_sentiment'] = positive_sentiment_list
regulatory_pr_df['negative_sentiment'] = negative_sentiment_list

In [None]:
# Score the crypto-related press releases and store two 0/1 flags per row.
# The scores are read as [negative, neutral, positive]; a release is flagged
# negative or positive only when that score is strictly greater than both
# others, and gets 0 in both columns otherwise.
positive_sentiment_list = []
negative_sentiment_list = []
for _, press_release in crypto_regulatory_pr_df.iterrows():
    # Title and body joined by a newline, cut to the first 512 characters.
    model_input = f"{press_release['title']}\n{press_release['text']}"[:512]
    scores = sentiment_analysis(model_input)

    negative_wins = scores[0] > scores[1] and scores[0] > scores[2]
    positive_wins = (not negative_wins) and scores[2] > scores[0] and scores[2] > scores[1]

    positive_sentiment_list.append(1 if positive_wins else 0)
    negative_sentiment_list.append(1 if negative_wins else 0)

crypto_regulatory_pr_df['positive_sentiment'] = positive_sentiment_list
crypto_regulatory_pr_df['negative_sentiment'] = negative_sentiment_list

## Compare sentiment of crypto news vs FMDD

In [None]:
import requests
from bs4 import BeautifulSoup
from dateutil import parser
import pandas as pd
import numpy as np
import datetime

# Reload the saved table and keep the rows flagged as crypto related.
# NOTE(review): this needs 'regulatory_pr.csv' to contain a 'crypto_related'
# column (and the plots below need the sentiment columns). The only save cell
# in this notebook runs before those columns are added, so the CSV must have
# been re-saved afterwards -- confirm.
regulatory_pr_df = pd.read_csv('regulatory_pr.csv')
is_crypto_related = regulatory_pr_df['crypto_related'] == 1
crypto_regulatory_pr_df = regulatory_pr_df[is_crypto_related]

In [None]:
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Plot price and Forward MDD on the primary axis and mark crypto-related press
# releases with positive / negative sentiment on the secondary axis.
price_data_df = pd.read_csv('../new_values.csv')
crypto_regulatory_pr_positive_df = crypto_regulatory_pr_df[crypto_regulatory_pr_df['positive_sentiment'] == 1]
crypto_regulatory_pr_negative_df = crypto_regulatory_pr_df[crypto_regulatory_pr_df['negative_sentiment'] == 1]

fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=price_data_df['timestamp'], y=price_data_df['Forward MDD'], name='Forward MDD'),
            secondary_y=False)
fig.add_trace(go.Scatter(x=price_data_df['timestamp'], y=price_data_df['close'], name='Price'),
            secondary_y=False)

# Both sentiment traces are built from one description each, so every trace
# takes its x, y and hover text from the same frame (the negative trace
# previously took its hover titles from the positive frame).
for events_df, flag_column, trace_name, trace_color in [
    (crypto_regulatory_pr_positive_df, 'positive_sentiment', 'Positive Sentiment', 'teal'),
    (crypto_regulatory_pr_negative_df, 'negative_sentiment', 'Negative Sentiment', 'red'),
]:
    fig.add_trace(go.Scatter(   x=events_df['timestamp'], y=events_df[flag_column],
                                name=trace_name, line_shape='hvh', mode='markers',
                                line=dict(color=trace_color, width=4),
                                text=events_df['title']),
                secondary_y=True)

fig.update_layout(title_text="Forward MDD vs Sentiment for Crypto related Press Releases")
fig.show()

In [None]:
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

# Plot price and Forward MDD on the primary axis and mark all press releases
# with positive / negative sentiment on the secondary axis.
price_data_df = pd.read_csv('../new_values.csv')
regulatory_pr_positive_df = regulatory_pr_df[regulatory_pr_df['positive_sentiment'] == 1]
regulatory_pr_negative_df = regulatory_pr_df[regulatory_pr_df['negative_sentiment'] == 1]

fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=price_data_df['timestamp'], y=price_data_df['Forward MDD'], name='Forward MDD'),
            secondary_y=False)
fig.add_trace(go.Scatter(x=price_data_df['timestamp'], y=price_data_df['close'], name='Price'),
            secondary_y=False)

# Both sentiment traces are built from one description each, so every trace
# takes its x, y and hover text from the same frame (the negative trace
# previously took its hover titles from the positive frame).
for events_df, flag_column, trace_name, trace_color in [
    (regulatory_pr_positive_df, 'positive_sentiment', 'Positive Sentiment', 'teal'),
    (regulatory_pr_negative_df, 'negative_sentiment', 'Negative Sentiment', 'red'),
]:
    fig.add_trace(go.Scatter(   x=events_df['timestamp'], y=events_df[flag_column],
                                name=trace_name, line_shape='hvh', mode='markers',
                                line=dict(color=trace_color, width=4),
                                text=events_df['title']),
                secondary_y=True)

fig.update_layout(title_text="Forward MDD vs Sentiment for all Press Releases")
fig.show()


## Analyze change in FMDD after press releases

In [None]:
# For each look-ahead window, compare the mean 'Forward MDD' and mean 'close'
# in the window after a crypto press release with their values at the last
# price row at or before the release, and report how often the move agrees
# with the release's sentiment flag.
for hours_delta in [1, 2, 4]:

    fmdd_increase = []
    price_increase = []
    positive_pr_fmmdd_increase = positive_pr_price_increase = 0
    negative_pr_fmmdd_increase = negative_pr_price_increase = 0
    positive_pr_count = negative_pr_count = 0

    for i, row in crypto_regulatory_pr_df.iterrows():
        # Baseline: the last price row whose timestamp is not after the release.
        baseline_row = price_data_df[price_data_df['timestamp'] <= row['timestamp']].iloc[-1]
        timestamp_start, price_start, fmdd_start = baseline_row[['timestamp', 'close', 'Forward MDD']].values

        # Window: rows after the baseline, up to and including baseline + hours_delta.
        # Timestamps are compared as strings in '%Y-%m-%d %H:%M:%S' form.
        window_length = datetime.timedelta(hours=hours_delta)
        timestamp_end = str(datetime.datetime.strptime(timestamp_start, '%Y-%m-%d %H:%M:%S') + window_length)
        after_start = price_data_df['timestamp'] > timestamp_start
        up_to_end = price_data_df['timestamp'] <= timestamp_end
        fmdd_mean = np.mean(price_data_df[after_start & up_to_end]['Forward MDD'].values)
        price_mean = np.mean(price_data_df[after_start & up_to_end]['close'].values)

        fmdd_increase.append(1 if fmdd_mean > fmdd_start else 0)
        price_increase.append(1 if price_mean > price_start else 0)

        # Tally the moves separately for positively and negatively flagged releases.
        if row['positive_sentiment'] == 1:
            positive_pr_count += 1
            positive_pr_fmmdd_increase += 1 if fmdd_increase[-1] == 1 else 0
            positive_pr_price_increase += 1 if price_increase[-1] == 1 else 0
        if row['negative_sentiment'] == 1:
            negative_pr_count += 1
            negative_pr_fmmdd_increase += 1 if fmdd_increase[-1] == 1 else 0
            negative_pr_price_increase += 1 if price_increase[-1] == 1 else 0

    # Turn the tallies into fractions; a group with no releases reports 0.
    if positive_pr_count > 0:
        positive_pr_fmmdd_increase = positive_pr_fmmdd_increase/positive_pr_count
        positive_pr_price_increase = positive_pr_price_increase/positive_pr_count
    else:
        positive_pr_fmmdd_increase = 0
        positive_pr_price_increase = 0
    if negative_pr_count > 0:
        negative_pr_fmmdd_increase = negative_pr_fmmdd_increase/negative_pr_count
        negative_pr_price_increase = negative_pr_price_increase/negative_pr_count
    else:
        negative_pr_fmmdd_increase = 0
        negative_pr_price_increase = 0

    # The "decrease" figures are 1 minus the increase share, so a window whose
    # mean is equal to (or not greater than) the baseline counts as a decrease.
    print(f'Crypto only Press Releases(PR) Num: {positive_pr_count+negative_pr_count} within {hours_delta} hours')
    print(f"Positive PR FMDD decrease: {round(100*(1-positive_pr_fmmdd_increase),2)}%")
    print(f"Negative PR FMDD increase: {round(100*(negative_pr_fmmdd_increase),2)}%")
    print(f"Positive PR Price increase: {round(100*(positive_pr_price_increase),2)}%")
    print(f"Negative PR Price decrease: {round(100*(1-negative_pr_price_increase),2)}%")