# 09. 통합 CAN SLIM 스크리닝 시스템

## 목표
- 모든 CAN SLIM 지표를 통합한 자동 스크리닝 시스템 구축
- 종합 점수 산출 및 순위 결정
- 최종 투자 후보 종목 선정

## CAN SLIM 전체 기준
- **C**: 분기 EPS 25% 이상 성장
- **A**: 연간 EPS 25% 이상 성장, ROE 17% 이상
- **N**: 52주 신고가 85% 이상
- **S**: 적절한 수급 상황
- **L**: 상대강도 80 이상
- **I**: 기관 매수 증가
- **M**: 시장 상승 추세

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pykrx import stock
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import time
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# 날짜 설정
end_date = datetime.now()
start_date = end_date - timedelta(days=365*2)

start_str = start_date.strftime('%Y%m%d')
end_str = end_date.strftime('%Y%m%d')

print(f"스크리닝 기간: {start_str} ~ {end_str}")

## 1. 통합 데이터 수집 클래스

In [None]:
class CANSLIMAnalyzer:
    """
    CAN SLIM 전체 지표 분석 클래스
    """
    
    def __init__(self, ticker, start_date, end_date):
        self.ticker = ticker
        self.start_date = start_date
        self.end_date = end_date
        self.name = stock.get_market_ticker_name(ticker)
        self.data = {}
        self.scores = {}
        
    def collect_data(self):
        """모든 필요 데이터 수집"""
        try:
            # 주가 데이터
            self.data['price'] = stock.get_market_ohlcv_by_date(
                self.start_date, self.end_date, self.ticker
            )
            
            # 재무 데이터
            self.data['fundamental'] = stock.get_market_fundamental_by_date(
                self.start_date, self.end_date, self.ticker
            )
            
            # 투자자별 매매
            self.data['investor'] = stock.get_market_trading_value_by_date(
                self.start_date, self.end_date, self.ticker, detail=True
            )
            
            return True
        except Exception as e:
            print(f"Error collecting data for {self.ticker}: {e}")
            return False
    
    def calculate_c_score(self):
        """C - Current Earnings 점수"""
        df = self.data['fundamental']
        if df.empty or len(df) < 252:
            return 0
        
        # 분기 EPS 성장률 (YoY)
        current_eps = df['EPS'].iloc[-1]
        year_ago_eps = df['EPS'].iloc[-252] if len(df) > 252 else df['EPS'].iloc[0]
        
        eps_growth = ((current_eps / year_ago_eps) - 1) * 100 if year_ago_eps != 0 else 0
        
        score = 0
        if eps_growth >= 50:
            score = 100
        elif eps_growth >= 25:
            score = 75
        elif eps_growth >= 15:
            score = 50
        elif eps_growth >= 0:
            score = 25
        
        self.scores['C'] = score
        self.data['eps_growth'] = eps_growth
        return score
    
    def calculate_a_score(self):
        """A - Annual Earnings 점수"""
        df = self.data['fundamental']
        if df.empty:
            return 0
        
        # ROE 계산
        roe = df['EPS'].iloc[-1] / df['BPS'].iloc[-1] if df['BPS'].iloc[-1] != 0 else 0
        
        # 연간 EPS CAGR (가능한 데이터로)
        if len(df) >= 252:
            years = len(df) / 252
            start_eps = df['EPS'].iloc[0]
            end_eps = df['EPS'].iloc[-1]
            if start_eps > 0:
                cagr = (pow(end_eps / start_eps, 1/years) - 1) * 100
            else:
                cagr = 0
        else:
            cagr = 0
        
        score = 0
        # ROE 점수 (50점)
        if roe >= 0.20:
            score += 50
        elif roe >= 0.17:
            score += 35
        elif roe >= 0.10:
            score += 20
        
        # CAGR 점수 (50점)
        if cagr >= 25:
            score += 50
        elif cagr >= 15:
            score += 35
        elif cagr >= 10:
            score += 20
        
        self.scores['A'] = score
        self.data['roe'] = roe
        self.data['eps_cagr'] = cagr
        return score
    
    def calculate_n_score(self):
        """N - New Highs 점수"""
        df = self.data['price']
        if df.empty or len(df) < 252:
            return 0
        
        # 52주 최고가 대비
        high_52w = df['고가'].rolling(window=252, min_periods=1).max().iloc[-1]
        current_price = df['종가'].iloc[-1]
        price_ratio = current_price / high_52w if high_52w != 0 else 0
        
        # 거래량 증가
        recent_volume = df['거래량'].tail(5).mean()
        avg_volume = df['거래량'].tail(60).mean()
        volume_ratio = recent_volume / avg_volume if avg_volume != 0 else 1
        
        score = 0
        # 가격 위치 점수 (70점)
        if price_ratio >= 0.95:
            score += 70
        elif price_ratio >= 0.85:
            score += 50
        elif price_ratio >= 0.75:
            score += 30
        
        # 거래량 점수 (30점)
        if volume_ratio >= 1.5:
            score += 30
        elif volume_ratio >= 1.2:
            score += 20
        elif volume_ratio >= 1.0:
            score += 10
        
        self.scores['N'] = score
        self.data['price_52w_ratio'] = price_ratio
        self.data['volume_ratio'] = volume_ratio
        return score
    
    def calculate_s_score(self):
        """S - Supply and Demand 점수"""
        # 시가총액과 유동성
        df = self.data['price']
        if df.empty:
            return 0
        
        # 거래 회전율
        avg_volume = df['거래량'].tail(20).mean()
        
        # 간단한 점수 산정 (거래량 기반)
        score = 50  # 기본 점수
        if avg_volume > 1000000:
            score = 70
        elif avg_volume > 500000:
            score = 60
        
        self.scores['S'] = score
        return score
    
    def calculate_l_score(self):
        """L - Leader or Laggard 점수"""
        df = self.data['price']
        if df.empty:
            return 0
        
        # 상대강도 (간단 버전: 3개월 수익률)
        if len(df) >= 60:
            returns_3m = (df['종가'].iloc[-1] / df['종가'].iloc[-60] - 1) * 100
        else:
            returns_3m = 0
        
        score = 0
        if returns_3m >= 30:
            score = 100
        elif returns_3m >= 20:
            score = 75
        elif returns_3m >= 10:
            score = 50
        elif returns_3m >= 0:
            score = 25
        
        self.scores['L'] = score
        self.data['returns_3m'] = returns_3m
        return score
    
    def calculate_i_score(self):
        """I - Institutional Sponsorship 점수"""
        df = self.data['investor']
        if df.empty:
            return 0
        
        # 최근 20일 기관 순매수
        inst_buying = df['기관'].tail(20).sum() if '기관' in df.columns else 0
        foreign_buying = df.get('외국인계', df.get('외국인', pd.Series([0]))).tail(20).sum()
        
        score = 50  # 기본 점수
        if inst_buying > 0 and foreign_buying > 0:
            score = 100
        elif inst_buying > 0 or foreign_buying > 0:
            score = 75
        elif inst_buying == 0 and foreign_buying == 0:
            score = 50
        else:
            score = 25
        
        self.scores['I'] = score
        self.data['inst_buying_20d'] = inst_buying
        self.data['foreign_buying_20d'] = foreign_buying
        return score
    
    def calculate_m_score(self):
        """M - Market Direction 점수"""
        # 시장 지수 확인 (KOSPI)
        try:
            kospi = stock.get_index_ohlcv_by_date(self.start_date, self.end_date, '1001')
            if len(kospi) >= 20:
                ma20 = kospi['종가'].rolling(window=20).mean().iloc[-1]
                current = kospi['종가'].iloc[-1]
                
                if current > ma20 * 1.02:  # 20일 이평선 2% 이상
                    score = 100
                elif current > ma20:
                    score = 75
                elif current > ma20 * 0.98:
                    score = 50
                else:
                    score = 25
            else:
                score = 50
        except:
            score = 50  # 기본 점수
        
        self.scores['M'] = score
        return score
    
    def calculate_total_score(self):
        """전체 CAN SLIM 점수 계산"""
        weights = {
            'C': 0.20,
            'A': 0.15,
            'N': 0.15,
            'S': 0.10,
            'L': 0.15,
            'I': 0.15,
            'M': 0.10
        }
        
        total = sum(self.scores.get(key, 0) * weight 
                   for key, weight in weights.items())
        
        self.scores['TOTAL'] = total
        return total
    
    def analyze(self):
        """전체 분석 실행"""
        if not self.collect_data():
            return None
        
        self.calculate_c_score()
        self.calculate_a_score()
        self.calculate_n_score()
        self.calculate_s_score()
        self.calculate_l_score()
        self.calculate_i_score()
        self.calculate_m_score()
        self.calculate_total_score()
        
        return self.get_summary()
    
    def get_summary(self):
        """분석 결과 요약"""
        return {
            'ticker': self.ticker,
            'name': self.name,
            'C_Score': self.scores.get('C', 0),
            'A_Score': self.scores.get('A', 0),
            'N_Score': self.scores.get('N', 0),
            'S_Score': self.scores.get('S', 0),
            'L_Score': self.scores.get('L', 0),
            'I_Score': self.scores.get('I', 0),
            'M_Score': self.scores.get('M', 0),
            'Total_Score': self.scores.get('TOTAL', 0),
            'EPS_Growth': self.data.get('eps_growth', 0),
            'ROE': self.data.get('roe', 0),
            '52W_Ratio': self.data.get('price_52w_ratio', 0),
            'Returns_3M': self.data.get('returns_3m', 0)
        }

# 테스트
analyzer = CANSLIMAnalyzer('005930', start_str, end_str)
result = analyzer.analyze()
if result:
    print("삼성전자 CAN SLIM 분석:")
    for key, value in result.items():
        if 'Score' in key:
            print(f"{key}: {value:.1f}")
        elif isinstance(value, float):
            print(f"{key}: {value:.2f}")
        else:
            print(f"{key}: {value}")

## 2. 대량 종목 스크리닝

In [None]:
def screen_stocks(tickers, start_date, end_date):
    """
    여러 종목 CAN SLIM 스크리닝
    """
    results = []
    
    for ticker in tqdm(tickers, desc="종목 스크리닝"):
        try:
            analyzer = CANSLIMAnalyzer(ticker, start_date, end_date)
            result = analyzer.analyze()
            
            if result:
                results.append(result)
            
            time.sleep(0.5)  # API 제한 방지
            
        except Exception as e:
            print(f"\nError with {ticker}: {e}")
            continue
    
    return pd.DataFrame(results)

# 주요 대형주 스크리닝
major_stocks = [
    '005930',  # 삼성전자
    '000660',  # SK하이닉스
    '035420',  # 네이버
    '035720',  # 카카오
    '051910',  # LG화학
    '006400',  # 삼성SDI
    '005490',  # POSCO
    '005380',  # 현대차
    '000270',  # 기아
    '068270',  # 셀트리온
]

screening_results = screen_stocks(major_stocks, start_str, end_str)
print("\n스크리닝 완료!")
print(f"분석 종목 수: {len(screening_results)}")

## 3. 결과 분석 및 시각화

In [None]:
# 종합 점수 기준 정렬
screening_results = screening_results.sort_values('Total_Score', ascending=False)

print("\n📊 CAN SLIM 종합 순위")
print("="*80)
display_cols = ['name', 'Total_Score', 'C_Score', 'A_Score', 'N_Score', 
                'L_Score', 'I_Score', 'EPS_Growth', 'ROE', '52W_Ratio']
print(screening_results[display_cols].to_string(index=False))

# 상위 종목
top_stocks = screening_results[screening_results['Total_Score'] >= 60]
print(f"\n✅ 투자 적합 종목 (점수 60 이상): {len(top_stocks)}개")
for _, row in top_stocks.iterrows():
    print(f"  - {row['name']} ({row['ticker']}): {row['Total_Score']:.1f}점")

In [None]:
# 레이더 차트로 시각화
def create_radar_chart(df, top_n=5):
    """
    상위 종목의 CAN SLIM 지표 레이더 차트
    """
    categories = ['C', 'A', 'N', 'S', 'L', 'I', 'M']
    
    fig = go.Figure()
    
    for i in range(min(top_n, len(df))):
        row = df.iloc[i]
        values = [row[f'{cat}_Score'] for cat in categories]
        
        fig.add_trace(go.Scatterpolar(
            r=values,
            theta=categories,
            fill='toself',
            name=f"{row['name']} ({row['Total_Score']:.1f})"
        ))
    
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, 100]
            )
        ),
        showlegend=True,
        title="CAN SLIM 지표 비교 (상위 5개 종목)",
        height=500
    )
    
    return fig

# 레이더 차트 생성
radar_fig = create_radar_chart(screening_results)
radar_fig.show()

In [None]:
# 히트맵으로 전체 점수 표시
def create_heatmap(df):
    """
    전체 종목의 CAN SLIM 점수 히트맵
    """
    score_cols = ['C_Score', 'A_Score', 'N_Score', 'S_Score', 
                  'L_Score', 'I_Score', 'M_Score']
    
    heatmap_data = df[score_cols].T
    heatmap_data.columns = df['name'].values
    
    fig = px.imshow(
        heatmap_data,
        labels=dict(x="종목", y="지표", color="점수"),
        x=heatmap_data.columns,
        y=['C', 'A', 'N', 'S', 'L', 'I', 'M'],
        color_continuous_scale='RdYlGn',
        aspect="auto",
        title="CAN SLIM 지표별 점수 히트맵"
    )
    
    fig.update_layout(height=400)
    return fig

heatmap_fig = create_heatmap(screening_results)
heatmap_fig.show()

## 4. 투자 포트폴리오 구성

In [None]:
def create_portfolio(screening_df, min_score=60, max_stocks=10):
    """
    CAN SLIM 기준으로 포트폴리오 구성
    """
    # 기준 충족 종목 필터링
    qualified = screening_df[screening_df['Total_Score'] >= min_score].copy()
    
    # 상위 종목 선택
    portfolio = qualified.head(max_stocks)
    
    # 가중치 계산 (점수 기반)
    total_score = portfolio['Total_Score'].sum()
    portfolio['Weight'] = portfolio['Total_Score'] / total_score
    
    # 리스크 분산을 위한 최대 가중치 제한
    max_weight = 0.25
    portfolio['Adjusted_Weight'] = portfolio['Weight'].clip(upper=max_weight)
    portfolio['Adjusted_Weight'] = portfolio['Adjusted_Weight'] / portfolio['Adjusted_Weight'].sum()
    
    return portfolio

# 포트폴리오 생성
portfolio = create_portfolio(screening_results)

print("\n💼 추천 포트폴리오")
print("="*60)
for _, row in portfolio.iterrows():
    print(f"{row['name']:15} ({row['ticker']}): {row['Adjusted_Weight']*100:5.1f}% | 점수: {row['Total_Score']:.1f}")

print(f"\n포트폴리오 종목 수: {len(portfolio)}")
print(f"평균 CAN SLIM 점수: {portfolio['Total_Score'].mean():.1f}")

## 5. 백테스팅 프레임워크

In [None]:
def backtest_strategy(portfolio_df, start_date, end_date):
    """
    간단한 백테스팅
    """
    returns = []
    
    for _, row in portfolio_df.iterrows():
        try:
            # 주가 데이터
            price_data = stock.get_market_ohlcv_by_date(start_date, end_date, row['ticker'])
            
            if not price_data.empty:
                # 단순 Buy & Hold 수익률
                start_price = price_data['종가'].iloc[0]
                end_price = price_data['종가'].iloc[-1]
                stock_return = (end_price / start_price - 1) * 100
                
                returns.append({
                    'ticker': row['ticker'],
                    'name': row['name'],
                    'weight': row['Adjusted_Weight'],
                    'return': stock_return,
                    'weighted_return': stock_return * row['Adjusted_Weight']
                })
        except:
            continue
    
    returns_df = pd.DataFrame(returns)
    portfolio_return = returns_df['weighted_return'].sum()
    
    # KOSPI 벤치마크
    kospi = stock.get_index_ohlcv_by_date(start_date, end_date, '1001')
    kospi_return = (kospi['종가'].iloc[-1] / kospi['종가'].iloc[0] - 1) * 100
    
    print("\n📈 백테스팅 결과 (최근 6개월)")
    print("="*50)
    print(returns_df[['name', 'weight', 'return', 'weighted_return']].to_string(index=False))
    print(f"\n포트폴리오 수익률: {portfolio_return:.2f}%")
    print(f"KOSPI 수익률: {kospi_return:.2f}%")
    print(f"초과 수익률: {portfolio_return - kospi_return:.2f}%")
    
    return returns_df

# 백테스팅 실행 (최근 6개월)
backtest_start = (datetime.now() - timedelta(days=180)).strftime('%Y%m%d')
backtest_end = end_str

if len(portfolio) > 0:
    backtest_results = backtest_strategy(portfolio, backtest_start, backtest_end)

## 6. 실시간 모니터링 대시보드

In [None]:
def create_monitoring_dashboard(portfolio_df):
    """
    포트폴리오 모니터링 대시보드 생성
    """
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('포트폴리오 구성', 'CAN SLIM 점수 분포', 
                       '주요 지표', '리스크 지표'),
        specs=[[{'type': 'pie'}, {'type': 'bar'}],
               [{'type': 'scatter'}, {'type': 'bar'}]]
    )
    
    # 1. 포트폴리오 구성
    fig.add_trace(
        go.Pie(labels=portfolio_df['name'], values=portfolio_df['Adjusted_Weight'],
               hole=0.3),
        row=1, col=1
    )
    
    # 2. CAN SLIM 점수
    fig.add_trace(
        go.Bar(x=portfolio_df['name'], y=portfolio_df['Total_Score'],
               marker_color='lightgreen'),
        row=1, col=2
    )
    
    # 3. 주요 지표
    fig.add_trace(
        go.Scatter(x=portfolio_df['EPS_Growth'], y=portfolio_df['ROE'],
                   mode='markers+text', text=portfolio_df['name'],
                   textposition='top center',
                   marker=dict(size=portfolio_df['Total_Score']/2, 
                              color=portfolio_df['Total_Score'],
                              colorscale='Viridis')),
        row=2, col=1
    )
    
    # 4. 리스크 지표
    risk_metrics = portfolio_df[['name', '52W_Ratio']].copy()
    fig.add_trace(
        go.Bar(x=risk_metrics['name'], y=risk_metrics['52W_Ratio'],
               marker_color='lightcoral'),
        row=2, col=2
    )
    
    fig.update_layout(height=800, showlegend=False,
                     title_text="CAN SLIM 포트폴리오 모니터링 대시보드")
    
    fig.update_xaxes(title_text="EPS 성장률(%)", row=2, col=1)
    fig.update_yaxes(title_text="ROE", row=2, col=1)
    fig.update_yaxes(title_text="52주 최고가 대비", row=2, col=2)
    
    return fig

if len(portfolio) > 0:
    dashboard = create_monitoring_dashboard(portfolio)
    dashboard.show()

## 7. 결과 저장

In [None]:
# 결과 저장
import os
os.makedirs('results', exist_ok=True)

# 전체 스크리닝 결과
screening_results.to_csv('results/canslim_screening_results.csv', index=False, encoding='utf-8-sig')
print("스크리닝 결과 저장: results/canslim_screening_results.csv")

# 포트폴리오
if len(portfolio) > 0:
    portfolio.to_csv('results/canslim_portfolio.csv', index=False, encoding='utf-8-sig')
    print("포트폴리오 저장: results/canslim_portfolio.csv")

# 요약 리포트
with open('results/canslim_report.txt', 'w', encoding='utf-8') as f:
    f.write("CAN SLIM 스크리닝 리포트\n")
    f.write("="*50 + "\n")
    f.write(f"분석 일자: {datetime.now().strftime('%Y-%m-%d')}\n")
    f.write(f"분석 종목 수: {len(screening_results)}\n")
    f.write(f"투자 적합 종목: {len(screening_results[screening_results['Total_Score'] >= 60])}\n")
    f.write(f"\n상위 5개 종목:\n")
    for i, row in screening_results.head(5).iterrows():
        f.write(f"  {i+1}. {row['name']} ({row['ticker']}): {row['Total_Score']:.1f}점\n")

print("리포트 저장: results/canslim_report.txt")

## 마무리

### 구현 완료 사항
1. ✅ 모든 CAN SLIM 지표 통합 분석
2. ✅ 자동화된 스크리닝 시스템
3. ✅ 종합 점수 산출 및 순위화
4. ✅ 포트폴리오 구성 및 가중치 배분
5. ✅ 백테스팅 프레임워크
6. ✅ 시각화 대시보드

### 실전 활용 가이드
1. **매주 실행**: 주말에 전체 스크리닝 실행
2. **점수 기준**: 60점 이상 종목 주목, 70점 이상 우선 투자
3. **분산 투자**: 최소 5개, 최대 15개 종목
4. **리밸런싱**: 월 1회 포트폴리오 재조정
5. **손절 기준**: 7-8% 손실 시 기계적 손절

### 추가 개선 사항
- 실시간 데이터 연동
- 자동 매매 시스템 연결
- 리스크 관리 모듈 강화
- 섹터별 분석 추가
- 머신러닝 예측 모델 통합

**투자에 성공하시길 바랍니다! 📈**