In [1]:
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
import requests, lxml
from lxml import html
import json
from decimal import *
import math
import datetime
# from retry import retry

In [2]:
# Load KEY=VALUE settings from config.env into a plain dict, then expose the
# Airtable credentials the rest of the notebook needs as module-level constants.
env_vars = {}

with open('config.env') as f:
    for line in f:
        entry = line.strip()
        # Skip blank lines and comments (including comments that are indented).
        if not entry or entry.startswith('#'):
            continue
        # Split on the FIRST '=' only, so values that themselves contain '='
        # (e.g. base64-padded secrets) do not break the two-way unpacking.
        key, value = entry.split('=', 1)
        env_vars[key.strip()] = value.strip()

AIRTABLE_TOKEN = env_vars["AIRTABLE_TOKEN"]
AIRTABLE_BASE_ID = env_vars["AIRTABLE_BASE_ID"]
AIRTABLE_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}"

In [3]:
class statistics:
    '''
    Helper around the Yahoo Finance "key-statistics" page of one stock symbol.

    Building an instance only assembles the URL and request headers; no
    network traffic happens until scrape_page() is called.
    '''
    yfinance_url = "https://finance.yahoo.com/"
    # Seconds to wait for Yahoo Finance before giving up instead of hanging.
    request_timeout = 30

    def __init__(self, symbol):
        '''

        :param symbol: stock symbol; it is upper-cased before use.
        please note that any Canadian TSX stocks are followed with ".TO"
        check the relevant URL for formatting.
        '''
        self.symbol = symbol.upper()
        # Build the path from the normalised symbol so that self.symbol,
        # self.path and self.url always agree with each other.
        self.path = "quote/{0}/key-statistics?p={0}".format(self.symbol)
        self.url = self.yfinance_url + self.path
        self.methods = ['scrape_page', 'label_stats']
        self.attributes = ['self.symbol', 'self.path', 'self.url',
                          'self.methods', 'self.hdrs','self.valuation']
        # Browser-like headers sent with the page request.
        self.hdrs = {"authority": "finance.yahoo.com",
                     "method": "GET",
                     "path": self.path,
                     "scheme": "https",
                     "accept": "text/html,application/xml;q=0.9",
                     "accept-encoding": "gzip, deflate, br",
                     "accept-language": "en-US,en;q=0.9",
                     "referer": self.yfinance_url,
                     "sec-fetch-mode": "navigate",
                     "sec-fetch-site": "same-origin",
                     "sec-fetch-user": "?1",
                     "upgrade-insecure-requests": "1",
                     "user-agent": "Mozilla/5.0 (Windows NT 10.0;)"}

    def scrape_page(self):
        '''

        Downloads self.url with the headers defined in the init function and
        parses the first <table> element found on the page.

        :return: the cell in the first row, second column of that table
                 (callers in this notebook treat it as the market cap,
                 e.g. "240.92B").
        :raises IndexError: when the page contains no <table> element.
        '''
        from io import StringIO

        page = requests.get(self.url, headers=self.hdrs,
                            timeout=self.request_timeout)
        soup = BeautifulSoup(page.content, 'lxml')
        first_table = soup.find('table')
        if first_table is None:
            # Same exception type the original indexing produced on an empty page.
            raise IndexError('no <table> found at {}'.format(self.url))
        # Only the first table is needed, so the remaining ones are not parsed.
        # The markup is wrapped in StringIO because passing literal HTML
        # strings to read_html is deprecated in recent pandas releases.
        frames = pd.read_html(StringIO(str(first_table)))
        market_cap = frames[0].iloc[0, 1]
        return market_cap

    def label_stats(self, table_list):
        '''
        :param table_list: a sequence whose items are themselves sequences
                           holding a two-column dataframe at position 0.
        :return: list of those dataframes after __indexLabel__ has labelled
                 their columns and set the index; the same list is also
                 stored on self.valuation and printed.

        '''
        first_frames = [tables[0] for tables in table_list]

        table_list = [self.__indexLabel__(frame) for frame in first_frames]

        self.valuation = table_list
        print(self.valuation)
        return table_list

    def __indexLabel__(self, df):
        '''

        :param df: Takes a two-column dataframe as input; its column labels
                   are overwritten in place.
        :return: returns a new dataframe with columns 'Measure'/'Value' and
                 'Measure' set as the index.

        '''
        df.columns = ['Measure', 'Value']
        df = df.set_index('Measure')
        return df

In [4]:
class RealTimeCurrencyConverter():
    '''
    Converts amounts into USD using a rates table downloaded once at
    construction time.
    '''
    def __init__(self, url, timeout=30):
        '''
        :param url: endpoint returning JSON with a 'rates' mapping and a 'date'.
        :param timeout: seconds to wait for the endpoint before giving up.
        :raises requests.HTTPError: when the endpoint answers with an error status.
        '''
        response = requests.get(url, timeout=timeout)
        # Fail loudly on an HTTP error instead of a confusing KeyError below.
        response.raise_for_status()
        self.data = response.json()
        self.rates = self.data['rates']
        self.date = self.data['date']

    def convert(self, amount, currency):
        '''
        :param amount: numeric amount expressed in `currency`.
        :param currency: currency code used as a key into self.rates.
        :return: `amount` unchanged for 'USD', otherwise amount divided by
                 the stored rate for `currency`.
        :raises KeyError: when `currency` is not in the rates table.
        '''
        if currency == 'USD':
            return amount
        # NOTE(review): dividing assumes rates are quoted as units of
        # `currency` per 1 USD - confirm against the endpoint used.
        conversion_rate = self.rates[currency]
        return amount / conversion_rate

In [5]:
# Exchange-rate endpoint; presumably returns rates quoted against USD.
exchange_rate_url = 'https://api.exchangerate-api.com/v4/latest/USD'
# Download the rates once; later cells reuse this shared converter instance.
converter = RealTimeCurrencyConverter(url=exchange_rate_url)

In [6]:
# Magnitude suffix -> numeric factor (thousand, million, billion, trillion).
unit_multiplier = {
    'K': 10 ** 3,
    'M': 10 ** 6,
    'B': 10 ** 9,
    'T': 10 ** 12,
}
def convert_mc_actual_number(mc_number, mc_number_unit):
    '''
    Scale an abbreviated figure to its full value.

    :param mc_number: numeric part of the figure, e.g. 240.92
    :param mc_number_unit: one of the keys of unit_multiplier
    :return: mc_number multiplied by the factor for mc_number_unit
    :raises KeyError: when mc_number_unit is not a known suffix
    '''
    factor = unit_multiplier[mc_number_unit]
    return mc_number * factor

In [7]:
def convert_formatted_mc_actual_number(usd_amount, multipliers=None):
    '''
    Format an amount as "<number with one decimal> <unit suffix>".

    The largest unit whose factor does not exceed the amount is chosen;
    amounts below every factor fall back to the smallest unit.

    :param usd_amount: numeric amount to abbreviate.
    :param multipliers: optional mapping of suffix -> factor; defaults to the
                        notebook-level unit_multiplier table.
    :return: string such as '1.1 T', '2.5 B', '3.0 M' or '0.5 K'.
    '''
    if multipliers is None:
        multipliers = unit_multiplier
    oneplace = Decimal(10) ** -1

    # Largest factor first, so the first match is the most compact unit.
    ranked = sorted(multipliers.items(), key=lambda item: item[1], reverse=True)
    unit, factor = ranked[-1]
    for candidate_unit, candidate_factor in ranked:
        if usd_amount >= candidate_factor:
            unit, factor = candidate_unit, candidate_factor
            break

    scaled = usd_amount / factor
    # Go through str() rather than Decimal(float): the latter exposes the
    # binary representation error (1.1 -> 1.1000000000000000888...), which
    # ROUND_UP would then turn into 1.2.
    # NOTE(review): ROUND_UP always rounds away from zero (1.11 -> 1.2);
    # kept as in the original - confirm this is the intended rounding.
    rounded = Decimal(str(scaled)).quantize(oneplace, rounding=ROUND_UP)
    return str(rounded) + ' ' + unit

In [8]:
def get_currency(stock_symbol):
    '''
    Infer the quote currency from the exchange suffix of a ticker.

    Only the suffix after the dot is considered, so plain tickers that merely
    contain the letters "HK", "SS" or "SZ" (e.g. "SSTK") are not misread as
    Hong Kong or mainland China listings.

    :param stock_symbol: ticker such as 'BABA', '3988.HK' or '600733.SS'
    :return: 'HKD' for '.HK', 'CNY' for '.SS' / '.SZ', otherwise 'USD'
    '''
    symbol = stock_symbol.strip().upper()
    if symbol.endswith('.HK'):
        return 'HKD'
    if symbol.endswith(('.SS', '.SZ')):
        return 'CNY'
    return 'USD'

In [9]:
def get_note(currency, time):
    '''
    Build the quoted explanatory note stored next to a valuation.

    :param currency: 'USD', 'HKD' or 'CNY'
    :param time: timestamp text inserted into the note
    :return: the note string, or None for any other currency
    '''
    prefix = '"Valuation is reported as market capitalization. '
    if currency == 'USD':
        return prefix + f'It was last updated on {time}."'

    if currency == 'HKD':
        source = currency
    elif currency == 'CNY':
        # The CNY note refers to the currency as RMB.
        source = 'RMB'
    else:
        return None
    return prefix + f'It was converted from {source} using the exchange rate from {time}."'

In [10]:
def save_in_json(input):
    '''
    Serialise `input` as 4-space-indented JSON and overwrite
    yfinance_data.json in the current working directory with it.

    :param input: any JSON-serialisable object
    '''
    with open('yfinance_data.json', mode='w', encoding='utf-8') as out_file:
        serialized = json.dumps(input, indent=4)
        out_file.write(serialized)

In [11]:
def get_yfinance_data(sheet_name, timeout=30):
    '''
    Request the records of one Airtable table.

    :param sheet_name: table name appended to AIRTABLE_URL.
    :param timeout: seconds to wait for Airtable before giving up, so a
                    stalled connection cannot hang the notebook forever.
    :return: the raw requests.Response; callers decode it with .json().

    NOTE(review): only a single request is made - if the table is paginated
    by the API, later pages are not fetched. Confirm table size.
    '''
    url = f"{AIRTABLE_URL}/{sheet_name}"
    headers = {
        'Authorization': f'Bearer {AIRTABLE_TOKEN}',
        'Content-Type': 'application/json'
    }

    return requests.get(url, headers=headers, timeout=timeout)

In [12]:
# @retry(ValueError, tries=3, delay=1, jitter=1)
def pull_from_yfinance_and_save_as_json(records):
    '''
    For every record, scrape the market cap of its preferred ticker, convert
    it to USD and write the formatted valuation plus an explanatory note back
    into record['fields']. All records are then saved with save_in_json.

    Records without a usable ticker, or whose scraped value is not a string
    ending in a known unit suffix, are left untouched (but still saved).

    :param records: list of dicts with a 'fields' dict; modified in place.
    '''
    for record in records:
        fields = record['fields']

        # The ticker field may be absent from a record (presumably when the
        # cell is empty - verify), and prioritize() returns None when no
        # supported exchange is listed; skip instead of crashing mid-run.
        unparsed_ticker = fields.get('_ticker_symbol TEST')
        stock_symbol = prioritize(unparsed_ticker) if unparsed_ticker else None
        if not stock_symbol:
            print('Skipping record without a supported ticker: ', unparsed_ticker)
            continue

        timestamp = datetime.datetime.now().strftime("%B %d, %Y %H:%M:%S")

        market_cap = statistics(stock_symbol).scrape_page()
        print(stock_symbol, ': ', market_cap)

        # Non-string values (e.g. NaN) and empty strings carry no unit suffix.
        if not (isinstance(market_cap, str) and market_cap
                and market_cap[-1] in unit_multiplier):
            continue

        mc_number = float(market_cap[:-1])
        mc_number_unit = market_cap[-1] # M for million, B for billion, T for trillion
        mc_actual_number = convert_mc_actual_number(mc_number, mc_number_unit)
        currency = get_currency(stock_symbol)
        usd_amount = converter.convert(mc_actual_number, currency)

        fields['company_valuation'] = 'USD ' + convert_formatted_mc_actual_number(usd_amount)
        fields['company_valuation_notes'] = get_note(currency, timestamp)

    save_in_json(records)

In [13]:
def prioritize(ticker):
    '''
    Pick one ticker out of a comma-separated list of listings.

    Listings are tried in the order NYSE, HK, SS, SZ. For NYSE the text after
    the first ':' is returned; for the others the whole entry is returned.
    Surrounding whitespace is stripped in both cases.

    :param ticker: e.g. 'NYSE: BABA, 9988.HK'
    :return: the chosen ticker, or None when no listing matches
    '''
    candidates = ticker.split(',')

    for marker in ('NYSE', 'HK', 'SS', 'SZ'):
        if marker not in ticker:
            continue
        # The marker cannot span a comma, so some entry is guaranteed to match.
        chosen = next(entry for entry in candidates if marker in entry)
        if marker == 'NYSE':
            return chosen.split(':')[1].strip()
        return chosen.strip()

    return None

In [15]:
# Fetch the 'Testing' table, then enrich its records and save them to JSON.
airtable_response = get_yfinance_data(sheet_name='Testing').json()
records = airtable_response['records']
pull_from_yfinance_and_save_as_json(records=records)

BABA :  240.92B
3988.HK :  1.11T
3908.HK :  168.95B
0031.HK :  1.53B
BGNE :  19.32B
1288.HK :  1.30T
000625.SZ :  71.15B
600733.SS :  28.64B
0939.HK :  1.48T
2020.HK :  252.64B
JOBS :  4.10B
CEA :  11.87B
1211.HK :  698.09B
BILI :  10.19B
2357.HK :  32.54B
9888.HK :  363.24B
1062.HK :  330.85M
3328.HK :  439.48B
300750.SZ :  1.05T
KRKR :  38.01M
