API 설명: https://opendart.fss.or.kr/guide/detail.do?apiGrpCd=DS001&apiId=2019018

In [None]:
# @title 1) 종목 기본정보 (공시코드, 종목명, 종목코드) { display-mode: "form" }

import requests
import pandas as pd
import io
import zipfile

# Enter your Open DART API key here; the trailing `# @param` annotation
# exposes this assignment as an editable notebook form field.
crtfc_key = '여기에 당신의 Open DART API 인증키를 입력하세요.' # @param {type:"string"}

def get_dart_info(api_key):
    """Download Open DART's corp-code archive and return it as a DataFrame.

    Args:
        api_key: Open DART API key, sent as the ``crtfc_key`` query parameter.

    Returns:
        A DataFrame with one row per ``<list>`` element of ``CORPCODE.xml``,
        or ``None`` when the download, the ZIP extraction or the XML parsing
        fails (the error is printed instead of raised).
    """
    url = 'https://opendart.fss.or.kr/api/corpCode.xml'
    params = {'crtfc_key': api_key}
    try:
        # Without a timeout a stalled connection would block the cell forever;
        # a timeout surfaces as a RequestException and is reported below.
        results = requests.get(url, params=params, timeout=30)
        results.raise_for_status()  # raise on HTTP error statuses

        # The endpoint answers with a ZIP archive holding a single XML file.
        with zipfile.ZipFile(io.BytesIO(results.content), 'r') as zip_ref:
            xml_data = zip_ref.read('CORPCODE.xml')

        return pd.read_xml(io.BytesIO(xml_data), xpath='//list')
    except requests.RequestException as e:
        print(f"HTTP 요청 에러: {e}")
    except zipfile.BadZipFile:
        print("ZIP 파일 형식 에러")
    except Exception as e:
        print(f"기타 에러: {e}")
    # Every failure path ends here; callers test the result against None.
    return None

def process_dart_info(df_dart):
    """Reduce the raw corp-code table to one row per company name.

    Rows without a ``stock_code`` are discarded, and for every ``corp_name``
    only the row with the greatest ``modify_date`` is kept. The input frame is
    not modified.

    Returns:
        A DataFrame with the columns 공시코드, 종목명 and 종목코드 (renamed from
        ``corp_code``, ``corp_name`` and ``stock_code``).
    """
    column_labels = {
        'corp_code': '공시코드',
        'corp_name': '종목명',
        'stock_code': '종목코드',
    }

    listed = df_dart.dropna(subset=['stock_code']).copy()
    listed['modify_date'] = pd.to_datetime(listed['modify_date'])

    # Newest entry first within each name, so keep='first' retains it.
    newest_first = listed.sort_values(
        by=['corp_name', 'modify_date'],
        ascending=[True, False],
    )
    one_per_name = newest_first.drop_duplicates(subset='corp_name', keep='first')

    return one_per_name[list(column_labels)].rename(columns=column_labels)

# Download the corp-code table; get_dart_info returns None after printing
# an error message, in which case nothing is written.
df_dart_info = get_dart_info(crtfc_key)
if df_dart_info is not None:
    df_dart_info_processed = process_dart_info(df_dart_info)
    # Written as UTF-8 with a BOM (utf-8-sig), without the index column.
    df_dart_info_processed.to_csv("screening_01.csv", encoding="utf-8-sig", index=False)


In [None]:
# @title 2) 재무제표 분석 (시가총액, 연간 재무, 분기 재무, PER, PSR)
# @markdown <img src = "https://drive.google.com/uc?id=1uSbO7Qjaiiq0d_OsPvTOdyV8iFfgz9mi" height = 400 width = 600>
import asyncio
import aiohttp
import pandas as pd

async def fetch_page(session, url):
    """GET *url* through *session* and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
    return body

def _market_cap_to_eok(text):
    """Convert a market-cap label such as ``'1조 2,345억원'`` to an int in 억원.

    The 조 part and the 억 part are parsed separately (1조 = 10,000억), so a
    label like ``'1조 345억원'`` yields 10345 rather than the digits simply
    glued together.

    Raises:
        ValueError: if either part is not a plain integer.
    """
    compact = ''.join(str(text).split()).replace(',', '').replace('억원', '')
    jo, _, eok = compact.rpartition('조')
    return int(jo or 0) * 10_000 + int(eok or 0)


async def financial_statements(session, code):
    """Scrape one stock's Naver Finance main page.

    Args:
        session: HTTP session handed to ``fetch_page``.
        code: stock code inserted into the page URL.

    Returns:
        A tuple ``(annual_financial, quarter_financial, market_capitalization)``:
        the '최근 연간 실적' and '최근 분기 실적' column groups of the
        financial-summary table (indexed by '주요재무정보'), and the market cap
        as an int in 억원.

    Raises:
        Any exception raised while fetching or parsing; for example an
        IndexError/KeyError when the page does not contain the expected
        tables. The table positions 3 and 5 are fixed here and presumably
        depend on the current page layout.
    """
    # Local import: this cell does not import io itself.
    from io import StringIO

    URL = f"https://finance.naver.com/item/main.nhn?code={code}"
    html = await fetch_page(session, URL)

    # Parse the page once and reuse the tables; wrapping the text in StringIO
    # avoids pandas' deprecated literal-HTML code path.
    tables = pd.read_html(StringIO(html))

    # Annual and quarterly financial data
    df = tables[3]
    df.set_index(df.columns[0], inplace=True)
    df.index.rename('주요재무정보', inplace=True)
    df.columns = df.columns.droplevel(2)
    annual_financial = df.xs('최근 연간 실적', axis=1)
    quarter_financial = df.xs('최근 분기 실적', axis=1)

    # Market capitalisation
    summary = tables[5]
    raw_market_cap = summary.loc[summary[0] == '시가총액', 1].values[0]
    market_capitalization = _market_cap_to_eok(raw_market_cap)

    return annual_financial, quarter_financial, market_capitalization

def check_performance(df, target_col, target_rows):
    """Grade one period column of a financial table as '⭕' or '❌'.

    Only the rows named in *target_rows* are inspected. The column passes
    ('⭕') when 매출액, 영업이익 and 당기순이익 are each strictly greater than
    their threshold and 부채비율 is strictly below its threshold. A missing
    value among the target rows, or a value that cannot be converted to
    float, yields '❌'.
    """
    period = df.loc[target_rows][target_col]

    # @markdown 입력값 이상 (ex: 매출액, 영업이익, 당기순이익 0원 이상일 때에만 ⭕)
    매출액 = 0 # @param {type:"number"}
    영업이익 = 0 # @param {type:"number"}
    당기순이익 = 0 # @param {type:"number"}

    # @markdown 입력값 이하 (ex: 부채비율이 200 이하일 때에만 ⭕)
    부채비율 = 200 # @param {type:"number"}

    if period.isnull().values.any():
        return '❌'

    # Checked in this order; the first failing row decides the outcome.
    floors = (
        ('매출액', 매출액),
        ('영업이익', 영업이익),
        ('당기순이익', 당기순이익),
    )
    try:
        for row_name, floor in floors:
            if float(period[row_name]) <= floor:
                return '❌'
        if float(period['부채비율']) >= 부채비율:
            return '❌'
    except ValueError:
        # Non-numeric cell (e.g. a placeholder string)
        return "❌"
    return '⭕'

async def process_stock_code(session, df_dart_info, code, semaphore):
    """Fetch one stock's financials and record the screening result in place.

    For the row(s) of *df_dart_info* whose 종목코드 equals *code* this writes:
    시가총액, one ``q_<i>`` column per graded quarterly period, one ``y_<i>``
    column per graded annual period, and, when the most recent graded annual
    period passes, PER and PSR (market cap divided by 당기순이익 / 매출액 of
    that period, rounded to 2 decimals). The last column of each table is
    excluded from grading. A summary line is printed per stock.

    Args:
        session: HTTP session passed on to ``financial_statements``.
        df_dart_info: DataFrame with 종목코드 and 종목명 columns; mutated.
        code: stock code to process.
        semaphore: limits how many stocks are processed concurrently.

    Returns:
        None. Stocks whose page cannot be fetched or parsed are skipped
        silently and receive no columns.
    """
    async with semaphore:
        TARGET_ROWS = ['매출액', '당기순이익', '영업이익', '부채비율']

        try:
            df_an, df_qu, market_capitalization = await financial_statements(session, code)
        except Exception:
            # Best effort: a stock that cannot be scraped is simply left out
            # (callers later drop rows that have no 시가총액).
            return

        # Computed once; adding columns below does not change the row index.
        row_mask = df_dart_info['종목코드'] == code
        df_dart_info.loc[row_mask, '시가총액'] = market_capitalization

        df_qu_results = []
        for idx, col in enumerate(df_qu.columns[:-1]):
            df_qu_result = check_performance(df_qu, col, TARGET_ROWS)
            df_dart_info.loc[row_mask, f'q_{idx}'] = df_qu_result
            df_qu_results.append(df_qu_result)

        df_an_results = []
        for idx, col in enumerate(df_an.columns[:-1]):
            df_an_result = check_performance(df_an, col, TARGET_ROWS)
            df_dart_info.loc[row_mask, f'y_{idx}'] = df_an_result
            df_an_results.append(df_an_result)

        # Join however many results exist instead of indexing a fixed 3 + 5:
        # a table with fewer periods must not raise IndexError here, because
        # that would propagate out of asyncio.gather and abort the whole run.
        print('{:<7} {:<14} \t시가총액(억원): {:<10} \t연간 실적: {} \t분기 실적: {}'.format(
                code,
                df_dart_info.loc[row_mask, '종목명'].values[0],
                market_capitalization,
                ' '.join(f'{result:<3}' for result in df_an_results),
                ' '.join(f'{result:<3}' for result in df_qu_results)
        ))

        # PER/PSR use the most recent graded annual period, and only if it passed.
        if df_an_results and df_an_results[-1] == '⭕':
            latest_annual = df_an[df_an.columns[:-1][-1]]
            market_capitalization = df_dart_info.loc[row_mask, '시가총액'].values[0].astype(float)
            net_income = float(latest_annual['당기순이익'])
            revenue = float(latest_annual['매출액'])

            df_dart_info.loc[row_mask, 'PER'] = round(market_capitalization / net_income, 2)
            df_dart_info.loc[row_mask, 'PSR'] = round(market_capitalization / revenue, 2)

async def main(df):
    """Run ``process_stock_code`` for every 종목코드 in *df* and return *df*.

    All stocks share one HTTP session and one semaphore with 1500 slots; the
    frame is updated in place by the worker coroutines.
    """
    limiter = asyncio.Semaphore(1500)
    async with aiohttp.ClientSession() as http:
        await asyncio.gather(
            *(process_stock_code(http, df, stock_code, limiter) for stock_code in df['종목코드'])
        )

    return df

with open("screening_01.csv") as file :
    df_dart_info = pd.read_csv(file)

df_dart_info = df_dart_info.dropna(subset=['종목코드'])
df_dart_info['종목코드'] = df_dart_info['종목코드'].astype(str).str.split('.').str[0].str.zfill(6)

df_financial_statements = await main(df_dart_info)
df_financial_statements = df_financial_statements.dropna(subset=['시가총액'])
df_financial_statements.to_csv("screening_02.csv", encoding="utf-8-sig", index=False)

001500  현대차증권          	시가총액(억원): 3032       	연간 실적: ❌   ❌   ❌   	분기 실적: ❌   ❌   ❌   ❌   ❌  
057050  현대홈쇼핑          	시가총액(억원): 5568       	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ❌   ⭕   ⭕   ⭕  
008770  호텔신라           	시가총액(억원): 23745      	연간 실적: ❌   ❌   ❌   	분기 실적: ❌   ❌   ❌   ❌   ❌  
127980  화인써키트          	시가총액(억원): 1036       	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ❌   ⭕   ⭕   ⭕  
126640  화신정공           	시가총액(억원): 591        	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ⭕   ⭕   ⭕   ⭕  
353190  휴럼             	시가총액(억원): 438        	연간 실적: ❌   ❌   ❌   	분기 실적: ⭕   ❌   ⭕   ⭕   ⭕  
000850  화천기공           	시가총액(억원): 728        	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ⭕   ⭕   ⭕   ❌  
039610  화성밸브           	시가총액(억원): 578        	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ⭕   ⭕   ⭕   ⭕  
084110  휴온스글로벌         	시가총액(억원): 3136       	연간 실적: ⭕   ⭕   ❌   	분기 실적: ⭕   ❌   ⭕   ⭕   ⭕  
037440  희림             	시가총액(억원): 987        	연간 실적: ⭕   ⭕   ⭕   	분기 실적: ⭕   ⭕   ⭕   ⭕   ⭕  
028080  휴맥스홀딩스         	시가총액(억원): 552        	연간 실적: ❌   ❌   ❌   	분기 실

In [None]:
# @title 3) 공시 분석

import asyncio
import aiohttp
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
import numpy as np
import requests

# screening_02.csv is UTF-8 with a BOM; decode it explicitly instead of
# relying on the platform's default text encoding.
with open("screening_02.csv", encoding="utf-8-sig") as file:
    df_financial_statements = pd.read_csv(file)

# Convert the 공시코드 column to strings, left-padded with zeros to 8 characters
df_financial_statements['공시코드'] = df_financial_statements['공시코드'].astype(str).str.zfill(8)

# Disclosure search window: from five years ago up to today (YYYYMMDD)
today = datetime.now()
end_de = today.strftime("%Y%m%d")
bgn_de = (today - relativedelta(years=5)).strftime("%Y%m%d")

crtfc_key = '여기에 당신의 Open DART API 인증키를 입력하세요.' # @param {type:"string"}

def check_disclosure(df):
    """Return the first risky disclosure found in *df*, or "⭕" if none.

    Args:
        df: DataFrame of disclosures with ``report_nm`` (title) and
            ``rcept_dt`` (date) columns. An empty frame, or one without a
            ``report_nm`` column, is treated as "nothing risky found".

    Returns:
        ``'<rcept_dt>: <report_nm>'`` for the first row, in frame order, whose
        title contains any of the risk keywords; otherwise "⭕".
    """
    위험공시 = ['유상증자', '감자', '단기차입금', '전환사채', '전환청구권', '검토의견부적정', '파생상품거래손실', '횡령', '영업정지'] # @param {type:"string"}

    # A company with no disclosures yields a column-less frame; indexing
    # 'report_nm' on it would raise KeyError.
    if df.empty or 'report_nm' not in df.columns:
        return "⭕"

    # Walk titles and dates together instead of re-filtering the whole frame
    # to look up the date of the row we are already on.
    for report, disclosure_date in zip(df['report_nm'], df['rcept_dt']):
        if any(item in report for item in 위험공시):
            return f'{disclosure_date}: {report}'

    return "⭕"

async def fetch_page(session, url, params, max_retries=5):
    """GET *url* with *params* and return the decoded JSON body.

    An ``aiohttp.ClientError`` is retried up to *max_retries* attempts in
    total, sleeping ``3 ** attempt`` seconds between attempts; the error from
    the final attempt is re-raised.
    """
    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            async with session.get(url, params=params) as response:
                return await response.json()
        except aiohttp.ClientError:
            if attempt >= final_attempt:
                raise
            # Exponential back-off before the next try
            await asyncio.sleep(3 ** attempt)

async def process_page(session, url, params):
    """Fetch one result page and return its 'list' entries as a DataFrame.

    Rows in which every value is missing are dropped.
    """
    payload = await fetch_page(session, url, params)
    entries = payload.get('list')
    return pd.DataFrame(entries).dropna(how='all')

async def get_total_page(session, url, params):
    """Return the 'total_page' field of the JSON response (None if absent)."""
    payload = await fetch_page(session, url, params)
    return payload.get('total_page')

async def fetch_disclosure(code, semaphore):
    """Download one company's disclosures and screen them for risky filings.

    Args:
        code: Open DART corp code (공시코드) of the company.
        semaphore: limits how many companies are processed concurrently.

    Returns:
        ``(code, value)`` where *value* is the result of ``check_disclosure``,
        "⭕" when no disclosure rows were returned at all, or "Error" when any
        request fails (the error is printed).
    """
    async with semaphore, aiohttp.ClientSession() as session:
        url = 'https://opendart.fss.or.kr/api/list.json'
        params = {
            'crtfc_key': crtfc_key,
            'corp_code': code,
            'bgn_de': bgn_de,
            'end_de': end_de,
            'page_no': 0,
            'page_count': 100,
            'last_reprt_at': 'Y',
        }

        try:
            # First request only learns the page count; then all pages are
            # fetched concurrently.
            total_page = await get_total_page(session,url,params)
            tasks = [process_page(session, url, {**params, 'page_no': page}) for page in range(1, total_page+1)]
            pages = await asyncio.gather(*tasks)
        except Exception as e:
            print(f"Error processing {code}: {e}")
            return code, "Error"

        # Concatenate once rather than growing a frame inside a loop, and skip
        # empty pages so an empty result never reaches check_disclosure (this
        # call sits outside the try, so an exception here would abort the
        # whole gather).
        frames = [df_temp for df_temp in pages if df_temp is not None and not df_temp.empty]
        if frames:
            df_disclosure = pd.concat(frames, axis=0)
            disclosure_value = check_disclosure(df_disclosure)
        else:
            disclosure_value = "⭕"

        print('{:<7} {:<14} \t {:<100}'.format(
            code,
            df_financial_statements[df_financial_statements['공시코드'] == code]['종목명'].values[0],
            disclosure_value
        ))
        return code, disclosure_value

async def main():
    """Screen every 공시코드 in ``df_financial_statements`` and store the result.

    At most 5 companies are processed concurrently. Each result is written
    to the '공시' column of the matching row(s); the module-level frame is
    updated in place and returned.
    """
    limiter = asyncio.Semaphore(5)
    pending = [
        fetch_disclosure(corp_code, limiter)
        for corp_code in df_financial_statements['공시코드']
    ]

    for corp_code, verdict in await asyncio.gather(*pending):
        matching_rows = df_financial_statements['공시코드'] == corp_code
        df_financial_statements.loc[matching_rows, '공시'] = verdict

    return df_financial_statements

# Run the disclosure screening (top-level await) and persist the result as
# UTF-8 with a BOM, without the index column.
df_disclosure = await main()
df_disclosure.to_csv("screening_03.csv", encoding="utf-8-sig", index=False)

01203808 AP시스템          	 20190611: 단기차입금증가결정                                                                                 
00365387 AJ네트웍스         	 20230203: 유상증자결정(종속회사의주요경영사항)                                                                       
00378363 3S             	 20221115: [기재정정]주요사항보고서(유상증자결정)                                                                     
00296078 APS            	 20211223: 유상증자결정(종속회사의주요경영사항)                                                                       
00125080 AK홀딩스          	 20230925: 주요사항보고서(유상증자결정)(자회사의 주요경영사항)                                                              
00874803 AP위성           	 ⭕                                                                                                   
00656021 BF랩스           	 20240202: 전환청구권행사                                                                                   
00219097 BGF            	 20230814: 파생상품거래손실발생(자회사의 주요경영사항)                                                    

In [None]:
# @title 4) 최근 3개월 이내 거래량이 발생했는지 여부

from urllib import parse
from ast import literal_eval
from sklearn.ensemble import IsolationForest

# screening_03.csv is UTF-8 with a BOM; decode it explicitly instead of
# relying on the platform's default text encoding.
with open("screening_03.csv", encoding="utf-8-sig") as file:
    df_disclosure = pd.read_csv(file)

# Stock codes come back from the CSV without leading zeros; restore 6 characters.
df_disclosure['종목코드'] = df_disclosure['종목코드'].astype(str).str.zfill(6)

# Look-back window for the price/volume history
# @markdown 거래량 범위 설정 (※최소 3개월 이상으로 설정)
기간 = 1 # @param {type:"integer"}
단위 = '\uB144' # @param ["년", "월", "일"]

end_time = datetime.now().strftime("%Y%m%d")
# Map the selected unit (year / month / day) to the matching offset.
time_delta = {
    '년': relativedelta(years=기간),
    '월': relativedelta(months=기간),
    '일': relativedelta(days=기간)
}
start_time = (datetime.now() - time_delta[단위]).strftime("%Y%m%d")

def filter_recent_data(df):
    """Return the rows of *df* whose 날짜 is at most 90 days before ``today``.

    날짜 values are read as ``YYYYMMDD``; ``today`` is the module-level
    datetime defined elsewhere in the notebook.
    """
    def is_recent(raw_date):
        parsed = datetime.strptime(str(raw_date), '%Y%m%d')
        return (today - parsed).days <= 90

    recent_mask = df['날짜'].apply(is_recent)
    return df[recent_mask]

def calculate_top_volume(volume, df):
    """Return the rows of *df* whose 거래량 exceeds the *volume*-th percentile.

    Args:
        volume: percentile as a number of percent (e.g. 95).
        df: DataFrame with a 거래량 column.
    """
    volumes = df['거래량']
    summary = volumes.describe(percentiles=[volume / 100])
    # describe() labels the requested percentile as e.g. '95%'
    cutoff = summary[f'{volume}%']
    return df[volumes > cutoff]

def check_outlier(volume, df):
    """Report the latest recent day whose volume is above the percentile.

    Returns "⭕" when no row exceeds the *volume*-th percentile, or when none
    of those rows passes ``filter_recent_data``; otherwise the 날짜 of the last
    such row.
    """
    spikes = calculate_top_volume(volume, df)
    # Only filter by date when there is something to filter.
    recent_spikes = spikes if spikes.empty else filter_recent_data(spikes)
    if recent_spikes.empty:
        return "⭕"

    return recent_spikes['날짜'].values[-1]

def lowwer(df):
    """Return the minimum of the 저가 column as a one-element array.

    The (misspelled) name and the one-element array result are kept because
    callers index the result with ``[0]``. The reduction is done by NumPy
    instead of the builtin ``min`` over array rows, which compared
    one-element arrays pairwise in Python and gave an order-dependent result
    when NaN was present.

    Raises:
        ValueError: if the column is empty.
    """
    return df['저가'].values.reshape(-1, 1).min(axis=0)

def upper(df):
    """Return the maximum of the 고가 column as a one-element array.

    The one-element array result is kept because callers index it with
    ``[0]``. The reduction is done by NumPy instead of the builtin ``max``
    over array rows, which compared one-element arrays pairwise in Python and
    gave an order-dependent result when NaN was present.

    Raises:
        ValueError: if the column is empty.
    """
    return df['고가'].values.reshape(-1, 1).max(axis=0)

async def fetch_page(session, url):
    """GET *url* through *session* and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
    return body

async def process_page(session, code):
    """Fetch the daily price history for *code* as a DataFrame.

    The request covers the module-level ``start_time``..``end_time`` window.
    The response text is parsed as a Python literal: its first entry supplies
    the column names and the remaining entries the rows.
    """
    query = parse.urlencode({
        'symbol': code,
        'requestType': 1,
        'startTime': start_time,
        'endTime': end_time,
        'timeframe': 'day',
    })
    raw_text = await fetch_page(session, f"https://api.finance.naver.com/siseJson.naver?{query}")

    table = literal_eval(raw_text.strip())
    return pd.DataFrame(table[1:], columns=table[0])

async def fetch_volume(session, df, code):
    """Record volume-spike dates and the price range for one stock in *df*.

    For the row(s) whose 종목코드 equals *code*, writes the two
    '거래량 상위<n>%' columns (results of ``check_outlier``) plus
    '52주 최저가' and '52주 최고가' (lowest 저가 and highest 고가 of the fetched
    history), and prints a summary line. Any exception is caught and printed;
    nothing is returned.
    """
    try:
        # @markdown 거래량 범위 내 거래량 상위 n%가 3개월 이내 존재여부
        거래량_1 = 95 # @param {type:"slider", min:75, max:100, step:1}
        거래량_2 = 98 # @param {type:"slider", min:75, max:100, step:1}

        history = await process_page(session, code)
        volume_1 = check_outlier(거래량_1, history)
        volume_2 = check_outlier(거래량_2, history)
        lowwer_value = lowwer(history)[0]
        upper_value = upper(history)[0]

        row_mask = df['종목코드'] == code
        stock_name = df[row_mask]['종목명'].values[0]
        print('{:<7} {:<14} \t{:<6} \t{:<6}'.format(code, stock_name, volume_1, volume_2))

        # Written in this order so the resulting column order is stable.
        updates = (
            (f'거래량 상위{거래량_1}%', volume_1),
            (f'거래량 상위{거래량_2}%', volume_2),
            ('52주 최저가', lowwer_value),
            ('52주 최고가', upper_value),
        )
        for column, value in updates:
            df.loc[row_mask, column] = value

    except Exception as e:
        print(f"Error processing {code}: {e}")
        return

async def main():
    """Run ``fetch_volume`` for every 종목코드 in ``df_disclosure``.

    All requests share one HTTP session; the module-level frame is updated in
    place by the workers and returned.
    """
    async with aiohttp.ClientSession() as http:
        await asyncio.gather(
            *(fetch_volume(http, df_disclosure, stock_code) for stock_code in df_disclosure['종목코드'])
        )

    return df_disclosure

# Run the volume screening (top-level await) and persist the result as
# UTF-8 with a BOM, without the index column.
df_volume = await main()
df_volume.to_csv("screening_04.csv", encoding="utf-8-sig", index=False)

456440  DB금융스팩11호      	⭕      	⭕     
138930  BNK금융지주        	20240219 	20240219
069730  DSR제강          	⭕      	⭕     
000120  CJ대한통운         	20240202 	20231201
035760  CJ ENM         	20240215 	20240215
245620  EDGC           	20240131 	20240131
241520  DSC인베스트먼트      	20240220 	20240214
139130  DGB금융지주        	20240208 	20240202
054620  APS            	20240115 	⭕     
028300  HLB            	20240219 	20240129
011200  HMM            	20240112 	20231222
097950  CJ제일제당         	⭕      	⭕     
013720  CBI            	20240105 	20240105
900290  GRT            	⭕      	⭕     
180400  DXVX           	20240119 	20240116
017940  E1             	20240102 	20240102
060310  3S             	20240122 	20240112
099520  ITX-AI         	⭕      	⭕     
095570  AJ네트웍스         	20231219 	⭕     
078930  GS             	20240207 	20240202
403870  HPSP           	20240214 	20240214
115450  HLB테라퓨틱스       	20240220 	20240130
440290  HB인베스트먼트       	20240125 	20240125
006360  GS건설           	⭕      	⭕  