In [None]:
import requests
from io import StringIO
import pandas as pd
import numpy as np
import sqlite3
sqlite3.register_adapter(np.int64, int)
import re
import math
from datetime import date
import matplotlib.pyplot as plt

# Widen pandas' display limits so large frames are printed in full
for option_name, option_value in {
    'display.max_rows': 2000,
    'display.max_columns': 500,
    'display.width': 1000,
    'display.max_colwidth': 500,
}.items():
    pd.set_option(option_name, option_value)

# Print floats with two decimals instead of scientific notation
pd.set_option('display.float_format', lambda x : '%.2f' % x)

# isolation_level=None puts the sqlite3 connection in autocommit mode
db = sqlite3.connect('./stock.db', isolation_level=None)

In [None]:
# Next dates on which the quarter list should be rolled: 3/31, 5/15, 8/14, 11/14
QUARTERS = ['2022q4', '2023q1', '2023q2', '2023q3']
# 'qN' suffix of every entry, and of the most recent entry
CURRENT_QUARTERS = [quarter[-2:] for quarter in QUARTERS]
CURRENT_QUARTER = QUARTERS[-1][-2:]
# Year part of the most recent entry
CURRENT_YEAR = int(QUARTERS[-1][:4])
LAST_YEAR_QUARTER = '2022q4'
CURRENT_YEARS = list(range(2018, 2023))

for label, value in (
    ("QUARTERS:", QUARTERS),
    ("CURRENT_QUARTER:", CURRENT_QUARTER),
    ("CURRENT_QUARTERS:", CURRENT_QUARTERS),
    ("CURRENT_YEAR:", CURRENT_YEAR),
    ("LAST_YEAR_QUARTER:", LAST_YEAR_QUARTER),
    ("CURRENT_YEARS:", CURRENT_YEARS),
):
    print(label, value)

In [None]:
# Load the whole stockIdName table (the later cells read id, name, market,
# industry and capital from it)
df_stock_id_name = pd.read_sql_query(sql="select * from stockIdName", con=db)
df_stock_id_name.head()

In [None]:
# `out` is the result frame that the following cells add columns to. Take an
# explicit copy so those assignments do not write into a slice of
# df_stock_id_name (which triggers pandas' SettingWithCopyWarning).
out = df_stock_id_name[['id', 'name', 'market', 'industry', 'capital']].copy()
out.head()

In [None]:
# Plain Python list of every stock id in `out`; the per-stock helpers below are
# evaluated once for each entry, in this order.
ids = out['id'].tolist()

# 用的到的table(加速運算時間,不用每個id都查db)

In [None]:
# Monthly revenue, ordered by date
monthly = pd.read_sql_query("select * from monthlyRevenue order by date", db)

# Daily closing data, ordered by date after loading
daily = pd.read_sql_query("select * from daily", db).sort_values(by=['date'])

# Cash-flow statements, ordered by date
cashflow = pd.read_sql_query("select * from cashflow order by date", db)

# Financial statements, ordered by date; missing values and '--' placeholders become 0
df_financial_statement = (
    pd.read_sql_query("select * from financialStatement order by date", db)
    .fillna(0)
    .replace('--', 0)
)

# Balance sheets, ordered by date
df_debt = pd.read_sql_query("select * from balanceSheet order by date", db)

# YoY > 0筆數

月營收yoy>0筆數

In [None]:
def countYoYGreaterThan0(id):
    """Count the rows of `monthly` for stock `id` whose `YoY` value is positive.

    Args:
        id: stock id compared against `monthly['id']`.

    Returns:
        int: number of matching rows (0 when the id has none).
    """
    # The original wrapped len() in a bare try/except that could never fire;
    # counting the True entries of the mask gives the same number directly.
    is_growth_row = (monthly['id'] == id) & (monthly['YoY'] > 0)
    return int(is_growth_row.sum())
    
# One count per stock, in the same order as `ids`
count = list(map(countYoYGreaterThan0, ids))
out['yoyIncreaseCount'] = count

In [None]:
# Show the 10 stocks with the highest yoyIncreaseCount
out.sort_values(by=['yoyIncreaseCount'], ascending=False).head(10)

# 月營收

In [None]:
# Revenue total and mean sumYoY over the latest 12 monthly rows
def TTMRevenueCal(id):
    """Summarise the last 12 rows of `monthly` for stock `id`.

    Args:
        id: stock id compared against `monthly['id']`.

    Returns:
        tuple: `(TTMRevenue, TTMYoY)` where TTMRevenue is the summed `revenue`
        divided by 100000 and TTMYoY is the mean `sumYoY`, both rounded to two
        decimals. On failure the result is `(np.nan, np.nan)` so callers can
        always index `[0]` and `[1]`; the original returned a bare `np.nan`,
        which made the callers' subscript raise TypeError.
    """
    df = monthly[(monthly['id']==id)].tail(12)

    try:
        TTMRevenue = round((df['revenue'].sum()/100000),2)
        TTMYoY = round(df['sumYoY'].mean() ,2)
        return TTMRevenue, TTMYoY
    except Exception:
        # Exception (not a bare except) so Ctrl-C still interrupts the long loop
        return np.nan, np.nan

# Evaluate TTMRevenueCal once per stock (the original filtered `monthly` twice
# per id) and split the (revenue, YoY) pairs into the two output columns.
ttm_pairs = [TTMRevenueCal(id) for id in ids]
out['TTMRevenue'] = [pair[0] for pair in ttm_pairs]
out['TTMYoY'] = [pair[1] for pair in ttm_pairs]

# TTMRevenueCal(2330)

In [None]:
# Spot-check the rows of `out` for two stock ids
out[out['id'].isin([2330,3037])]

# 毛利率(Gross Profit Margin)
# 營業利益率(Operating profit Margin)
# 淨利率(Net profit margin)

In [None]:
# Each margin column is the matching profit column divided by revenue, row by row
for margin_col, profit_col in (
    ('grossProfitMargin', 'grossProfit'),
    ('operatingProfitMargin', 'operatingIncome'),
    ('netProfitMargin', 'income'),
):
    df_financial_statement[margin_col] = (
        df_financial_statement[profit_col] / df_financial_statement['revenue']
    )

In [None]:
# Gross / operating / net margin for each of the latest four quarters
def writeToOut(id, date, col):
    """Look up one financial-statement value for a stock and quarter.

    Args:
        id: stock id compared against `df_financial_statement['id']`.
        date: quarter label compared for equality with the `date` column.
        col: name of the column to read.

    Returns:
        The first matching value rounded to two decimals, or `np.nan` when no
        row matches or the value cannot be rounded.
    """
    rows = df_financial_statement[(df_financial_statement['date']==date) & (df_financial_statement['id']==id)]
    if rows.empty:
        return np.nan
    try:
        return round(rows[col].iloc[0], 2)
    except Exception:
        # Exception rather than a bare except, so KeyboardInterrupt still
        # stops the per-stock loops that call this helper.
        return np.nan

cols = ['grossProfitMargin', 'operatingProfitMargin', 'netProfitMargin']

# One output column per (metric, quarter); the column name is the metric name
# followed by the quarter's 'qN' suffix.
for col in cols:
    for quarter, suffix in zip(QUARTERS, CURRENT_QUARTERS):
        out[f'{col}{suffix}'] = [writeToOut(id, quarter, col) for id in ids]

# writeToOut(2330, '2023q2', 'grossProfitMargin')

In [None]:
# Spot-check the rows of `out` for three stock ids
out[out['id'].isin([2330,2454,3037])]

In [None]:
# Year-end (q4) gross / operating / net margin for a given year
def findYearFinancialStatement(id, year, col):
    """Return the `col` value of stock `id` from the row whose date contains '<year>q4'.

    Args:
        id: stock id compared against `df_financial_statement['id']`.
        year: year whose 'q4' row is wanted (converted with `str`).
        col: name of the column to read.

    Returns:
        The first matching value rounded to two decimals, or `np.nan` when
        nothing matches or the lookup fails.
    """
    try:
        # Narrow to this stock first so the string match runs on a handful of
        # rows instead of the whole table on every call.
        rows = df_financial_statement[df_financial_statement['id'] == id]
        year_end = rows[rows['date'].str.contains(str(year) + 'q4')]
        return round(year_end[col].iloc[0], 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan

# for col in cols:
#     for year in CURRENT_YEARS:
#         out[f"{year}{col}"] = [findYearFinancialStatement(id, year, col) for id in ids]

# EPS

In [None]:
# EPS summed over the latest four quarters
def nearFourQuarterEPS(id):
    """Sum `qeps` over the last four rows (by `date`) of stock `id`.

    Args:
        id: stock id compared against `df_financial_statement['id']`.

    Returns:
        The sum rounded to two decimals, or `np.nan` if the lookup fails.
    """
    try:
        rows = df_financial_statement[df_financial_statement['id'] == id]
        latest_four = rows.sort_values(by=['date']).tail(4)
        return round(latest_four['qeps'].sum(), 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan

# nearFourQuarterEPS(4977)

# Trailing four-quarter EPS for every stock, in `ids` order
out['eps'] = list(map(nearFourQuarterEPS, ids))

In [None]:
# Previous year's EPS
def lastYearEPS(id):
    """Return `eps` of stock `id` from the row whose date contains LAST_YEAR_QUARTER.

    Args:
        id: stock id compared against `df_financial_statement['id']`.

    Returns:
        The unrounded `eps` value of the first matching row, or `np.nan` when
        nothing matches or the lookup fails.
    """
    try:
        # Filter by id first so the string match only touches this stock's rows
        rows = df_financial_statement[df_financial_statement['id'] == id]
        isLastYear = rows['date'].str.contains(LAST_YEAR_QUARTER)
        return rows[isLastYear]['eps'].iloc[0]
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan
    
# Previous-year EPS for every stock, in `ids` order
out['last_eps'] = list(map(lastYearEPS, ids))

# 收盤價

In [None]:
def recentClose(id):
    """Return the `close` value of the last row of `daily` for stock `id`.

    Args:
        id: stock id compared against `daily['id']`.

    Returns:
        The last `close` value, or `np.nan` when the stock has no rows or the
        lookup fails.
    """
    try:
        closes = daily.loc[daily['id'] == id, 'close']
        if closes.empty:
            return np.nan
        return closes.iloc[-1]
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan
    
# Latest closing price for every stock, in `ids` order
out['收盤價'] = list(map(recentClose, ids))

# 現金流量

In [None]:
# Free cash flow over the latest four quarters
def calFreeCashFlow(id):
    """Sum `qcashflow` over the last four `cashflow` rows of stock `id`.

    Args:
        id: stock id compared against `cashflow['id']`.

    Returns:
        The sum divided by 1e5 and floored to an int, or `np.nan` when the
        value cannot be floored or the lookup fails.
    """
    try:
        latest_four = cashflow[cashflow['id']==id].tail(4)
        return math.floor(latest_four['qcashflow'].sum() / 1e5)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan
    
# calFreeCashFlow(4977)
# Trailing four-quarter free cash flow for every stock, in `ids` order
out['freeCashFlow'] = list(map(calFreeCashFlow, ids))

In [None]:
# Average free cash flow over the latest five year-end rows
def FiveYearsFreeCashFlowAvg(id):
    """Average `operating + investing` over the last five 'q4' rows of stock `id`.

    Args:
        id: stock id compared against `cashflow['id']`.

    Returns:
        The mean divided by 1e5 and floored to an int, or `np.nan` when there
        is no 'q4' row (the mean is NaN and cannot be floored) or the lookup fails.
    """
    try:
        # Filter by id first so the string match only touches this stock's rows
        rows = cashflow[cashflow['id'] == id]
        year_end = rows[rows['date'].str.contains('q4')].tail(5)
        freeCash = year_end['operating'] + year_end['investing']
        return math.floor(freeCash.mean() / 1e5)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan
    
# Five-year average free cash flow for every stock, in `ids` order
out['freeCashFlow5Y'] = list(map(FiveYearsFreeCashFlowAvg, ids))

In [None]:
# Spot-check the rows of `out` for four stock ids
out[out['id'].isin([5483,2454,3037,9103])]

# 負債

In [None]:
def debt(id):
    """Return liabilities / asset from the last `df_debt` row of stock `id`.

    Args:
        id: stock id compared against `df_debt['id']`.

    Returns:
        The ratio rounded to two decimals, or `np.nan` when the stock has no
        row or the ratio cannot be computed.
    """
    latest = df_debt[df_debt['id']==id].tail(1)
    if latest.empty:
        return np.nan
    try:
        # Compute the ratio as its own Series instead of assigning a new column
        # into the `.tail(1)` slice, which raised SettingWithCopyWarning.
        debtRate = latest['liabilities'] / latest['asset']
        return round(debtRate.iloc[0], 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan

# Debt ratio for every stock, in `ids` order
out['debt'] = list(map(debt, ids))

# 市值

In [None]:
# Market cap = latest close x capital, rounded to two decimals
out['marketCap'] = (out['收盤價'] * out['capital']).round(2)

# Keep the 50 largest by market cap and show a compact set of columns
df_marketCap = out.sort_values(by=['marketCap'], ascending=False).head(50)
top_columns = ['id', 'name', 'industry', 'capital', 'marketCap', 'eps', '收盤價']
df_marketCap[top_columns]

# MA

In [None]:
def ma(id, day):
    """Mean `close` over the last `day` rows of `daily` for stock `id`.

    Args:
        id: stock id compared against `daily['id']`.
        day: number of trailing rows to average; if the stock has fewer rows,
            all of them are averaged.

    Returns:
        The mean rounded to two decimals (NaN when the stock has no rows), or
        `np.nan` if the computation fails.
    """
    df_close = daily[daily['id'] == id]
    try:
        return round(df_close.tail(day)['close'].mean(), 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan
# Moving averages over 20 / 60 / 120 rows, stored as ma20 / ma60 / ma120
for window in (20, 60, 120):
    out[f'ma{window}'] = [ma(id, window) for id in ids]

# True where the latest close is above the 20-row average
out['aboveMa20'] = out['收盤價'].gt(out['ma20'])

In [None]:
# Spot-check the rows of `out` for two stock ids
out[out['id'].isin([4977,2330])]

# PE, EPS_INCREASE, PEG

In [None]:
# pe: latest close divided by trailing four-quarter EPS
series_pe = (out['收盤價'] / out['eps']).round(2)
out['pe'] = series_pe

# eps_increase: percentage change of `eps` versus `last_eps`.
# The change is divided by the magnitude of the base. Dividing by the signed
# value (as the original did) flips the sign whenever last year's EPS was
# negative, e.g. -1 -> -2 was reported as +100% and -1 -> +1 as -200%.
out['eps_increase'] = round(100*(out['eps']-out['last_eps'])/out['last_eps'].abs(), 2)

# peg: pe divided by the EPS growth percentage
out['peg'] = round(out['pe']/out['eps_increase'], 2)

In [None]:
def cal_avg_pe(id):
    """Mean of the `pe` column over every `daily` row of stock `id`.

    Args:
        id: stock id compared against `daily['id']`.

    Returns:
        The mean rounded to two decimals (NaN when the stock has no rows), or
        `np.nan` if the computation fails.
    """
    df = daily[daily['id']==id]
    try:
        return round(df['pe'].mean(), 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan

# Average PE for every stock, in `ids` order
out['avg_pe'] = list(map(cal_avg_pe, ids))

## 筆數

In [None]:
def count_daily(id):
    """Return how many rows of `daily` belong to stock `id`."""
    belongs_to_stock = daily['id'] == id
    return int(belongs_to_stock.sum())

# Number of daily rows for every stock, in `ids` order
out['count'] = list(map(count_daily, ids))

## DCF

In [None]:
def epsDcf(eps, cagr = 0.02, r = 0.08, g = 0.02, years = 5):
    """Discounted-cash-flow value of a stream that starts from `eps`.

    The input grows by `cagr` per year for `years` years; each year's value is
    discounted at `r`, and a perpetual-growth terminal value is added.

    Args:
        eps: starting value; a number or a pandas Series (evaluated element-wise).
        cagr: yearly growth rate applied during the projection.
        r: discount rate (previously hard-coded to 0.08).
        g: perpetual growth rate for the terminal value (previously 0.02).
        years: number of projected years (previously 5).

    Returns:
        Sum of the discounted yearly values plus the discounted terminal value.

    Raises:
        ValueError: if `r` is not greater than `g`.
    """
    if r <= g:
        raise ValueError("discount rate r must be greater than perpetual growth rate g")

    EV = 0  # running total: discounted yearly values + terminal value
    FCF = None
    discount = 1
    for year in range(1, years + 1):
        FCF = round(eps * pow(1 + cagr, year), 2)
        discount = pow(1 + r, year)
        EV += round(FCF / discount, 2)

    if FCF is not None:
        # Terminal value is built from the final year's UNDISCOUNTED flow and
        # discounted once. The original started from the already-discounted
        # flow and then discounted again, i.e. the terminal value was divided
        # by (1 + r) ** years twice.
        TV = (FCF * (1 + g)) / (r - g)
        EV += round(TV / discount, 2)

    return EV

# DCF value from trailing EPS, computed for the whole column in one call
eps_series = out['eps']
out['epsDcf'] = epsDcf(eps_series)

In [None]:
def cashflowDcf(cashflow, cagr = 0.02, r = 0.08, g = 0.02, years = 5):
    """Discounted-cash-flow value of a stream that starts from `cashflow`.

    The input grows by `cagr` per year for `years` years; each year's value is
    discounted at `r`, and a perpetual-growth terminal value is added.

    Args:
        cashflow: starting cash-flow value (number or pandas Series). Inside
            this function the name refers to the argument, not to the
            module-level `cashflow` table.
        cagr: yearly growth rate applied during the projection.
        r: discount rate (previously hard-coded to 0.08).
        g: perpetual growth rate for the terminal value (previously 0.02).
        years: number of projected years (previously 5).

    Returns:
        Sum of the discounted yearly values plus the discounted terminal value.

    Raises:
        ValueError: if `r` is not greater than `g`.
    """
    if r <= g:
        raise ValueError("discount rate r must be greater than perpetual growth rate g")

    EV = 0  # running total: discounted yearly values + terminal value
    FCF = None
    discount = 1
    for year in range(1, years + 1):
        FCF = round(cashflow * pow(1 + cagr, year), 2)
        discount = pow(1 + r, year)
        EV += round(FCF / discount, 2)

    if FCF is not None:
        # Terminal value is built from the final year's UNDISCOUNTED flow and
        # discounted once. The original started from the already-discounted
        # flow and then discounted again, i.e. the terminal value was divided
        # by (1 + r) ** years twice.
        TV = (FCF * (1 + g)) / (r - g)
        EV += round(TV / discount, 2)

    return EV

def calCashflowDcf(id, cagr = 0.02):
    """Cash-flow DCF of stock `id` divided by its `capital`.

    Reads `freeCashFlow` from the stock's row in `out` and `capital` from its
    row in `df_stock_id_name`, and returns
    `cashflowDcf(freeCashFlow, cagr) / capital` rounded to two decimals.
    """
    free_cash = out[out['id'] == id]['freeCashFlow'].iloc[0]
    stock_capital = df_stock_id_name[df_stock_id_name['id'] == id]['capital'].iloc[0]

    value = cashflowDcf(free_cash, cagr)
    return round(value / stock_capital, 2)

# Per-stock cash-flow DCF, in `ids` order (default cagr)
out['cashflowDcf'] = list(map(calCashflowDcf, ids))

In [None]:
# Flag stocks whose latest close is below each DCF value
for flag_col, value_col in (('epsDcfClose', 'epsDcf'), ('cashflowDcfClose', 'cashflowDcf')):
    out[flag_col] = out['收盤價'] < out[value_col]

# ROE

In [None]:
def roe(id, ttm = True):
    """Income as a percentage of the latest shareholder equity for stock `id`.

    Args:
        id: stock id compared against `df_debt['id']` and
            `df_financial_statement['id']`.
        ttm: when True, use the MEAN of `qincome` over the last four
            financial-statement rows; when False, use only the last row.

    Returns:
        `income * 100 / shareholderEquity` rounded to two decimals, or
        `np.nan` when either table has no row for the stock or the
        computation fails.
    """
    # NOTE(review): with ttm=True this is an average-quarter figure (mean, not
    # sum, of four quarters) - confirm that is what 'TTMRoe' is meant to be.
    try:
        # Named `balance` so the local does not hide the module-level debt() helper
        balance = df_debt[df_debt['id']==id].tail(1)
        shareholderEquity = balance['shareholderEquity'].iloc[0]
        financial_statement = df_financial_statement[df_financial_statement['id']==id].tail(4)
        if ttm:
            income = financial_statement['qincome'].mean()
        else:
            income = financial_statement['qincome'].tail(1).iloc[0]

        return round(income*100/shareholderEquity , 2)
    except Exception:
        # Exception rather than a bare except, so Ctrl-C still interrupts
        return np.nan

# TTMRoe uses the four-row mode of roe(); roe uses only the latest row
for roe_col, use_ttm in (('TTMRoe', True), ('roe', False)):
    out[roe_col] = [roe(id, use_ttm) for id in ids]

In [None]:
# Score: one point for each flag that is True
# 1. close above the 20-row moving average (aboveMa20)
# 2. close below the EPS-based DCF value (epsDcfClose)
# 3. close below the cash-flow-based DCF value (cashflowDcfClose)
score_flags = ['aboveMa20', 'epsDcfClose', 'cashflowDcfClose']
out['score'] = out[score_flags].astype(int).sum(axis=1)

In [None]:
# Spot-check the rows of `out` for a handful of stock ids
out[out['id'].isin([4977,2330,3037,2454,2379,6245])]

In [None]:
# Column subset of `out` that is exported as the feature table
features = out[[
    'id', 'name', 'market', 'industry', 'capital', 'marketCap', 'debt',
    'TTMRevenue', 'TTMYoY', 'TTMRoe', 'roe',
    'freeCashFlow', 'freeCashFlow5Y',
    'last_eps', 'eps', 'eps_increase', 'yoyIncreaseCount',
    'pe', 'avg_pe', 'peg',
    'ma20', 'ma60', 'ma120', 'aboveMa20', '收盤價',
    'epsDcf', 'cashflowDcf', 'epsDcfClose', 'cashflowDcfClose', 'score',
]]
# Spot-check a handful of stock ids
features[features['id'].isin([4977,2330,3037,2454,2379,2404,6670,6245])]

# 統計數字

In [None]:
# Date stamp used in the exported file names
today = '20231203'

total_companies = len(out)
large_cap_companies = int((out['marketCap'] > 50).sum())
market_cap_total = round(out["marketCap"].sum()/1e4, 2)

print('總家數:', total_companies)
print('市值大於50億家數:', large_cap_companies)
print(f'市值加總: {market_cap_total}兆')

In [None]:
# Write the `features` column subset to features_<today>.csv without the index column
features.to_csv(f"features_{today}.csv", index = False)

In [None]:
# How many stocks close above their 60-row and 120-row moving averages
total_count = len(out)

upMa60 = int((out['收盤價'] > out['ma60']).sum())
percent = round(upMa60*100/total_count, 2)
print(f'在季線上家數: {upMa60} ({percent}%)')

upMa120 = int((out['收盤價'] > out['ma120']).sum())
percent120 = round(upMa120*100/total_count, 2)
print(f'在半年線上家數: {upMa120} ({percent120}%)')

In [None]:
# Spot-check the row of `out` for one stock id
out[out['id'].isin([4977])]

In [None]:
# Write the full `out` frame to features_<today>.csv without the index column.
# NOTE(review): an earlier cell writes `features` to this same path, so this
# call overwrites that file - confirm which frame is meant to be kept.
out.to_csv(f"features_{today}.csv", index = False)

# 選股1
* roe > 5
* 股價 < DCF 估值(epsDcf 與 cashflowDcf 皆需高於股價)
* 排除航運
* 5年free cashflow > 0

In [None]:
# Screen 1. The masks use *_mask names so they no longer overwrite the roe()
# helper or the builtin `filter` at notebook scope (re-running an earlier cell
# after this one used to fail because `roe` had become a Series).
roe_mask = (out['roe'] > 5)
dcf_mask = (out['收盤價'] < out['epsDcf']) & (out['收盤價'] < out['cashflowDcf'])
industry_mask = (out['industry'] != '航運業')
free_cash_5y_mask = (out['freeCashFlow5Y'] > 0)

filter2 = out[roe_mask & dcf_mask & industry_mask & free_cash_5y_mask]
print(len(filter2), "\n")
print("、".join([f"{row['id']}{row['name']}({row['收盤價']})" for index, row in filter2.iterrows()]))
filter2

# 選股2
* 本益比 < 15
* eps > 3
* marketCap(市值) > 50億
* yoyCount(yoy>0次數) > 0
* industry(產業) 排除生技醫療業、航運業
* grossmargin(毛利率) > 0.2
* profit(淨利率), operating(營業利益率) > 0
* cashflowSum(現金流) > 0
* 自由現金流為正 > 0
* count至少有一年資料 > 240

In [None]:
# Screen 2. The masks use *_mask names so they no longer overwrite the roe()
# helper, the earlier `count` list or the builtin `filter` at notebook scope.
industry_mask = (out['industry'] != '生技醫療業') & (out['industry'] != '航運業') & (out['industry'] != '文化創意業')
eps_mask = (out['eps'] > 3)
pe_mask = (out['pe'] <= 20) & (out['pe'] > 0)
free_cash_mask = (out['freeCashFlow'] > 0)
free_cash_5y_mask = (out['freeCashFlow5Y'] > 0)
count_mask = (out['count'] > 240)
roe_mask = (out['roe'] > 0)

# NOTE(review): these two masks were defined in the original cell but never
# applied to the final selection; kept unapplied here - confirm whether the
# market-cap and YoY-count criteria should be part of the screen.
market_cap_mask = (out['marketCap'] > 50)
yoy_count_mask = (out['yoyIncreaseCount'] > 0)

filter1 = out[eps_mask & pe_mask & count_mask & free_cash_mask & free_cash_5y_mask & roe_mask & industry_mask]

print(f'{len(filter1)} 筆')

print("、".join([f"{row['id']}{row['name']}({row['收盤價']})" for index, row in filter1.iterrows()]))

filter1

In [None]:
# 產業分布
# filter1.groupby(by=['industry']).agg({'id': len, 'name': ', '.join}).sort_values(by=['id'], ascending=False)

In [None]:
# Technical filter (bullish alignment): ma20 >= ma60 >= ma120 among screen-2 stocks
ma20, ma60, ma120 = filter1['ma20'], filter1['ma60'], filter1['ma120']

ma20_60 = ma20.ge(ma60)
ma60_120 = ma60.ge(ma120)
ma20_120 = ma20.ge(ma120)
is_long = ma20_60 & ma60_120 & ma20_120

filter_long = filter1[is_long]
print('多頭結構：', len(filter_long))
long_labels = [f"{row['id']}{row['name']}({row['收盤價']})" for index, row in filter_long.iterrows()]
print("、".join(long_labels))
filter_long

In [None]:
# Moving averages count as converged when every pair differs by at most `dev` percent
dev = 2
diff20_60 = ((ma20-ma60)*100/ma60).abs() <= dev
diff60_120 = ((ma60-ma120)*100/ma120).abs() <= dev
diff20_120 = ((ma20-ma120)*100/ma120).abs() <= dev
is_torn = diff20_60 & diff60_120 & diff20_120

filter_torn = filter1[is_torn]
print('均線糾結:', len(filter_torn))
torn_labels = [f"{row['id']}{row['name']}({row['收盤價']})" for index, row in filter_torn.iterrows()]
print("、".join(torn_labels))
torn_ids = filter_torn['id'].tolist()
filter_torn

## MA分佈機率

In [None]:
# Moving-average deviation helpers
max_days = 120
def dev_ma(id, days):
    """Rolling `days`-row mean of `close` for stock `id`, as a list.

    The first `max_days - 1` entries are dropped, so lists built with
    different `days` values for the same stock have the same length.
    """
    stock_rows = daily[daily['id'] == id][['date','id','name','close']]
    rolling_mean = stock_rows['close'].rolling(days).mean()
    return rolling_mean.iloc[max_days - 1:].tolist()

def cal_probability(list, threshold, now):
    """Share of values beyond `threshold`, formatted as a percentage string.

    Args:
        list: the values to test; any sequence accepted by `np.asarray`
            (the original only worked with numpy arrays, a plain Python list
            raised TypeError on the comparison).
        threshold: for `threshold >= 0` values greater than it are counted,
            otherwise values smaller than it.
        now: current value in percent; the result is flagged when `now / 100`
            lies strictly between `threshold` and `threshold + 0.05`.

    Returns:
        str: '<p>%' with p rounded to two decimals, with ' <---' appended when flagged.
    """
    values = np.asarray(list)
    if threshold >= 0:
        hits = np.sum(values > threshold)
    else:
        hits = np.sum(values < threshold)
    p = round(hits / len(values) * 100, 2)

    isInThreshold = ((threshold + 0.05) > now/100 > threshold)
    return f'{p}% <---' if isInThreshold else f'{p}%'

# 本益比分佈機率

In [None]:
# Only PE values strictly between these bounds are kept
PE_MIN = 0
PE_MAX = 100

def peDistribution(id):
    """List the `pe` values of stock `id` in `daily` that lie strictly between PE_MIN and PE_MAX."""
    stock_rows = daily[daily['id'] == id][['date', 'id', 'name', 'pe']]
    within_bounds = stock_rows['pe'].gt(PE_MIN) & stock_rows['pe'].lt(PE_MAX)
    return stock_rows.loc[within_bounds, 'pe'].tolist()

# Stock to inspect; named target_id so the builtin `id` is not overwritten at
# notebook scope.
target_id = 3037
pe = peDistribution(target_id)
# y: count per bin, x: bin edges (100 bins)
y, x, _ = plt.hist(pe, 100)
df_id = out[out['id']==target_id]
name = df_id['name'].iloc[0]
now_pe = df_id['pe'].iloc[0]
# Left edge of the first bin that holds the highest count
most_pe = round(x[np.argmax(y)], 2)
eps = df_id['eps'].iloc[0]

print(f"[{len(pe)}]", name)
print('[PE] :', '目前:', now_pe, 'most:', most_pe)
print('[EPS]:','目前:', eps)
# Dashed vertical line at the current PE
plt.axvline(now_pe, color='k', linestyle='dashed', linewidth=1)

plt.show()

In [None]:
TARGET_COLUMNS = ['count', 'id', 'name', 'now_pe','most_pe', 'avg_pe', 'eps', '收盤價', '偏離平均程度']

def _targetRow(id):
    """Build the df_target record for stock `id` from `out` and its PE distribution."""
    pe = peDistribution(id)
    # np.histogram yields the same 100-bin counts and edges that plt.hist
    # computes, without drawing one histogram per stock onto the current figure.
    y, x = np.histogram(pe, 100)
    df_id = out[out['id']==id]
    now_pe = df_id['pe'].iloc[0]
    # Left edge of the first bin that holds the highest count
    most_pe = round(x[np.argmax(y)], 2)
    avg_pe = df_id['avg_pe'].iloc[0]
    # Reference PE is the lower of the most frequent and the average PE
    cal_pe = min(most_pe, avg_pe)

    return {'count': len(pe),
            'id': id,
            'name': df_id['name'].iloc[0],
            'now_pe': now_pe,
            'most_pe': most_pe,
            'avg_pe': avg_pe,
            'eps': df_id['eps'].iloc[0],
            '收盤價': df_id['收盤價'].iloc[0],
            # Percentage by which the reference PE exceeds the current PE
            '偏離平均程度': round(100*(cal_pe/now_pe - 1), 2)
           }

def countTarget(id):
    """Append the record for stock `id` to the module-level `df_target` frame."""
    df_target.loc[len(df_target)] = _targetRow(id)

# Build the frame once from all records instead of growing it row by row
# inside a list comprehension that was used only for its side effects.
df_target = pd.DataFrame([_targetRow(id) for id in filter1['id'].tolist()], columns=TARGET_COLUMNS)

In [None]:
# The original first assigned an unused mask to `id`, which only overwrote the
# builtin `id` at notebook scope; that dead assignment is dropped.

# Keep stocks that have at least one PE observation, largest deviation first
df_target_filter = (
    df_target[df_target['count'] > 0]
    .sort_values(by=['偏離平均程度'], ascending=False)
    .reset_index(drop=True)
)

df_target_filter

In [None]:
# Stocks whose deviation value is above 20
bias = df_target_filter['偏離平均程度'].gt(20)
df_bias = df_target_filter[bias]
bias_labels = [f"{row['id']}{row['name']}({row['收盤價']})" for index, row in df_bias.iterrows()]
print("、".join(bias_labels))
df_bias