**Step 0:** Setting up environment

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install yahoo_fin --upgrade

In [None]:
import os

# Tell the tooling where the Java 8 JDK and the unpacked Spark distribution live.
# These must be set before findspark.init() runs.
os.environ.update({
  "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
  "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

# Local-mode configuration for this notebook.
spark_conf = (
  SparkConf()
  .setAppName("YourTest")
  .setMaster("local[*]")
)

# getOrCreate makes the cell safe to re-run: an existing context is reused.
sc = SparkContext.getOrCreate(spark_conf)

In [None]:
import yahoo_fin.stock_info as si

**Step 1:** Gathered the required data (diluted EPS and the analysts' 5-year growth estimate) for every company in our index of interest (S&P 500) from the yahoo_fin API into a Spark RDD.

In [None]:

# Basket of ticker symbols to value, fetched through yahoo_fin's stock_info module (`si`).
tickers = si.tickers_sp500() # replace with any basket of stocks

def get_info(ticker):
  """Fetch the valuation inputs for one ticker from yahoo_fin.

  Args:
    ticker: Ticker symbol passed to ``si.get_stats`` and ``si.get_analysts_info``.

  Returns:
    A tuple ``(eps, growth_rate, PE_ratio)`` where ``eps`` is the
    'Diluted EPS (ttm)' statistic, ``growth_rate`` is the
    'Next 5 Years (per annum)' growth estimate as a percentage number
    (e.g. ``10.0`` for "10.00%") and ``PE_ratio`` is ``growth_rate * 2``.
    The sentinel ``(0, 0, 0)`` is returned when either input is missing,
    cannot be parsed as a number, or is NaN.
  """
  import math  # stdlib; imported locally so the function does not rely on another cell's imports

  stats = si.get_stats(ticker)
  estimates = si.get_analysts_info(ticker)['Growth Estimates']

  try:
    # .iloc[0, 1] = first matching row, second column (the value column).
    eps_raw = stats[stats['Attribute'] == 'Diluted EPS (ttm)'].iloc[0, 1]
    growth_raw = estimates[estimates['Growth Estimates'] == 'Next 5 Years (per annum)'].iloc[0, 1]
    eps = float(eps_raw)
    # The estimate is a string such as "10.00%"; a missing value arrives as a float NaN,
    # which str() turns into "nan" so it is caught by the NaN check below.
    growth_rate = float(str(growth_raw).strip().rstrip('%'))
  except (IndexError, TypeError, ValueError):
    # Row not present, or value such as "N/A" that is not a number.
    return 0, 0, 0

  # NaN never compares equal to anything (not even itself), so `== float("nan")`
  # can never be True; math.isnan is the reliable test.
  if math.isnan(eps) or math.isnan(growth_rate):
    return 0, 0, 0
  PE_ratio = growth_rate * 2       # Can also be obtained from api, then take minimum of two, but it shouldn't be none!
  return eps, growth_rate, PE_ratio


# Distribute the ticker list across the local Spark workers.
rdd = sc.parallelize(tickers).cache()
# Fetch the inputs per ticker and drop the ones without usable data; get_info marks
# those with the (0, 0, 0) sentinel. The previous predicate `x == None` kept only
# elements equal to None, which no (ticker, info) tuple ever is, so the RDD was always empty.
stock_info = rdd.map(lambda x: (x, get_info(x))).filter(lambda x: x[1] is not None and tuple(x[1]) != (0, 0, 0)) # Format -- (Ticker, (info))



**Step 2:** Projected earnings per share (EPS) 10 years into the future at the expected growth rate and calculated the future stock price (projected EPS × P/E ratio).

In [None]:

# Projection horizon in years; read by get_future_growth and get_current_valuation.
YEARS = 10 # can be changed to look further into future

def get_future_growth(eps, growth_rate):
  """Compound ``eps`` once per year for ``YEARS`` years.

  Args:
    eps: Starting earnings per share.
    growth_rate: Annual growth as a percentage number (10 means 10%).

  Returns:
    The value of ``eps`` after ``YEARS`` rounds of compounding.
  """
  projected = eps
  for _year in range(YEARS):
    yearly_gain = projected * (growth_rate / 100)
    projected = projected + yearly_gain
  return projected

# Format -- (Ticker, (info), future_price)
# First map attaches the projected EPS; second map multiplies it by the PE ratio (info[2]).
future_growth = (
  stock_info
  .map(lambda rec: (rec[0], rec[1], get_future_growth(rec[1][0], rec[1][1])))
  .map(lambda rec: (rec[0], rec[1], rec[2]*rec[1][2]))
)


**Step 3:** Calculated the stock's discounted price today by discounting the future price back at the minimum expected rate of return, then applied a margin of safety.


In [None]:
# Both values are percentages; the code divides them by 100 where they are used.
MIN_RATE_RETURN = 15        # Return expected
MARGIN_OF_SAFETY = 50       # Increase for conservative investment, lower for riskier

def get_current_valuation(value):
  """Discount ``value`` back ``YEARS`` years at ``MIN_RATE_RETURN`` percent per year.

  Args:
    value: Projected future price.

  Returns:
    ``value`` divided by ``1 + MIN_RATE_RETURN/100`` once for each of the ``YEARS`` years.
  """
  discounted = value
  for _year in range(YEARS):
    yearly_divisor = 1 + (MIN_RATE_RETURN/100)
    discounted = discounted / yearly_divisor
  return discounted


# Format -- (Ticker, (info), intrinsic_price)
discounted_price_today = future_growth.map(lambda x: (x[0], x[1], get_current_valuation(x[2])))
# Format -- (Ticker,  intrinsic_margin_price)
# MARGIN_OF_SAFETY is the percentage knocked OFF the intrinsic price, so a larger margin
# gives a lower (more conservative) buy price. Multiplying by MARGIN_OF_SAFETY/100 did the
# opposite for any value other than 50; at the default of 50 both forms give the same result.
apply_margin_of_safety = discounted_price_today.map(lambda x: (x[0], x[2]*(1 - MARGIN_OF_SAFETY/100)))

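**Step 4:** Get the current valuation percentage (how far the margin-of-safety price lies above or below the live price, relative to the live price) and assign a valuation score. Then categorize into the following baskets: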

- (+3) Extremely Under-valued: more than 75%

- (+2) Moderately Under-valued: 25% to 75%

- (+1) Somewhat Under-valued: 0 to 25%

- (-1) Somewhat Over-valued: 0 to -25%

- (-2) Moderately Over-valued: -25% to -75%

- (-3) Extremely Over-valued: below -75%


In [None]:

# Format -- (Ticker,  intrinsic_price, live_price)
value_stocks = apply_margin_of_safety.map(
  lambda rec: (rec[0], rec[1], si.get_live_price(rec[0]))
)
# Format -- (Ticker, Valuation %) -- [Positive % == Undervalued ; Negative % == Overvalued]
valuation = value_stocks.map(
  lambda rec: (rec[0], (100 * (rec[1] - rec[2])) / rec[2])
)

def categorize_stocks(info):
  """Map a valuation percentage to a score between -3 and +3.

  Args:
    info: Pair ``(ticker, valuation_pct)``; positive means under-valued,
      negative means over-valued.

  Returns:
    ``(score, ticker)`` with
      +3 for pct > 75,  +2 for 25 < pct <= 75,  +1 for 0 < pct <= 25,
      -1 for -25 <= pct < 0,  -2 for -75 <= pct < -25,  -3 for pct < -75,
      and 0 when pct is exactly 0, NaN or None.
  """
  ticker, x = info
  if x is None:
    # Comparing None with a number raises TypeError, so handle it up front.
    return (0, ticker)
  if x > 75:
    return (3, ticker)
  if x > 25:
    return (2, ticker)
  if x > 0:
    return (1, ticker)
  if x < -75:
    return (-3, ticker)
  if x < -25:
    return (-2, ticker)
  if x < 0:
    return (-1, ticker)
  # Exactly zero, or NaN (every comparison above is False for NaN).
  # Return the ticker, not the valuation, so every result has the (score, ticker) shape.
  return (0, ticker)


scores = valuation.map(categorize_stocks) # Format -- (valuation_score, Ticker)
# NOTE(review): groupByKey() yields (key, iterable) pairs whose values do not print as a
# plain list; `.mapValues(list)` may be what is needed here -- confirm what "not working" meant.
# groups = scores.groupByKey() #Not working..


In [None]:
# take() is the first Spark action in the cells shown here; the transformations defined
# above are lazy, so this is where the data is actually fetched and scored.
print(scores.take(70))

# # print(si.tickers_sp500()) #BKR

# df1 = si.get_stats('bkr')
# eps = float(df1[df1['Attribute'] == 'Diluted EPS (ttm)'].iloc[0][1])

# print(type(eps))

