# 04. 재무제표 API 이용해보기

### dart-fss를 사용하기 위한 세팅 후, 기업 코드 불러오기

In [None]:
!pip install dart-fss -q

In [None]:
# pandas와 dart_fss모듈을 import합니다.
import pandas as pd
import dart_fss

from google.colab import drive
import os

from datetime import datetime

### dart-fss를 사용하기 위한 세팅 후, 기업 코드 불러오기

In [None]:
API_KEY = '69db60f6b16e7a7e91ae38ced61b62c7f914f789'
FS_DAY = '재무제표기준일'
REPORT_DAY = '레포트기준일'

In [None]:
dart_fss.set_api_key(api_key=API_KEY)

corp_list = dart_fss.corp.get_corp_list()
# 회사 이름으로 기업 코드 찾기
corp_code = corp_list.find_by_corp_name('삼성전자', exactly=True)[0].corp_code

### 전자공시 시스템 DART에서 분기별 보고서 다운로드

In [None]:
# 현재 작업경로 확인
print(os.getcwd())

# drive모듈을 이용해서 드라이브 마운트하기
drive.mount('/content/drive')

# 다운받은 보고서 파일을 저장할 경로 지정
path = "/content/drive/MyDrive/temp2"

### DART의 재무제표 추출 메서드로 재무제표 다운로드 후 저장하고 로드하기

In [None]:
# 분기 키워드를 리스트로 저장, 이후 순회하며 사용
periods = ['annual', 'half', 'quarter']

# 주기별 보고서를 다운받아 로컬에 저장 후 dataframe으로 반환
def dir_exists(dir_path):
    if not os.path.exists(dir_path):
        os.makedirs(dir_path, exist_ok=True)

def save_report_if_not_exists(corp_code, period, start_date, dir_path):
    file_name = f"{corp_code}_{period}_{start_date[:4]}.xlsx"
    file_path = os.path.join(dir_path, file_name)

    if not os.path.exists(file_path):
        report = dart_fss.fs.extract(corp_code, start_date, separate=False, report_tp=[period])
        report.save(file_name, dir_path)

    return file_path

def download_reports(corp_code, start_date, dir_path):
    reports = {}
    dir_exists(dir_path)

    for period in periods:
        file_path = save_report_if_not_exists(corp_code, period, start_date, dir_path)
        reports[period] = pd.read_excel(file_path, sheet_name=None)

    return reports

In [None]:
start_yr = 2021
start_date = str(start_yr) + '0101'
fs_reports = download_reports(corp_code, start_date, path)

### 엑셀에 포함되어 있던 여러 시트 중 필요한 시트만 추려내기

In [None]:
# 보고서 데이터프레임에서 필요한 데이터가 있는 시트만 추려내는 함수
def extract_data_sheets(reports):
  # 매개변수 reports는 각 분기의 레포트를 받음(fs_df['annual'])
  return{
      # 필요한 시트를 딕셔너리로 반환 {보고서이름: 보고서내용}
      sheet_name[-2:]: sheet_data
      for sheet_name, sheet_data in reports.items()
      if sheet_name.startswith('Data') and sheet_name[-3] == '_'
  }

In [None]:
# 필요한 시트를 분기 키워드에 맞게 딕셔너리로 반환
def get_fs_dict(fs_reports):
  return {
      # 분기별 시트를 딕셔너리로 반환 {분기: 보고서딕셔너리}
      period: extract_data_sheets(reports)
      for period, reports in fs_reports.items()
  }

In [None]:
# 분기별 재무제표 데이터를 딕셔너리로 정리
fs_df = get_fs_dict(fs_reports)

# 호출 방법 (bs: 재무상태표, is: 손익계산서, cf: 현금흐름표)
fs_df['annual']['bs'].head()

### 데이터프레임의 인덱스와 컬럼 정리하기

In [None]:
# 재무제표 타입마다 다른 라벨의 수를 구하는 함수.
def get_last_label_position(col_list):
  end_idx, bef = 0, 0
  for i in range(1, len(col_list)):
    front_letters = col_list[i][:2]
    if ('Un' == front_letters) and bef == 0:
      end_idx += 1
      bef = 1
    elif ('Un' == front_letters) and bef == 1:
      end_idx += 1
    elif ('Un' != front_letters) and bef == 0:
      end_idx += 1
      continue
    else:
      break
  return end_idx

In [None]:
def get_last_label_position(col_list):
  last_idx = 0
  for i in range(1, len(col_list)):
    if col_list[i][:2] == 'Un':
      last_idx += 1
    elif last_idx > 0:
      # 이미 'Un'을 만난 후 다른 값을 만나면 중단
      break
    else:
      last_idx += 1
  return last_idx

In [None]:
# 불필요한 라벨 삭제, 행열 전치
def arrange_labels(df):
  last_label_idx = get_last_label_position(df.columns)

  # 칼럼 이름 바꾸기
  df.rename(columns={df.columns[idx]: df.iloc[0, idx] for idx in range(last_label_idx + 1)}, inplace=True)

  # 라벨의 라벨 역할을 하는 0, 1번째 행 삭제
  df.drop([0,1], axis=0, inplace=True)

  # 사용하지 않는 라벨 칼럼 삭제
  df.drop(df.columns[0:2], axis=1, inplace=True)
  df.drop(df.columns[1:last_label_idx-1], axis=1, inplace=True)

  # 인덱스 설정 및 전치
  df.set_index(df.columns[0], inplace=True)
  df = df.transpose()

  return df

In [None]:
fs_types = fs_df['annual'].keys()

for period in periods:
  for fs_type in fs_types:
    fs_df[period][fs_type] = arrange_labels(fs_df[period][fs_type])

### 재무제표 종류별로 필요한 데이터만 필터링

In [None]:
# 재무제표별로 필요한 데이터를 리스트로 보관
type_labels = {'bs': ['자산총계', '부채총계', '자본총계', '유동자산', '비유동자산', '유동부채', '비유동부채'],
               'is_annual': ['영업수익', '매출총이익', '영업이익', '당기순이익(손실)'],
               'is_half': ['수익(매출액)', '매출총이익', '영업이익', '당기순이익(손실)'],
               'is_quarter': ['매출액', '매출총이익', '영업이익'],
               'cf': ['영업활동현금흐름', '투자활동현금흐름', '재무활동현금흐름'],
               'cf_': ['영업활동 현금흐름', '투자활동 현금흐름', '재무활동 현금흐름']}

In [None]:
for period in periods:
  for fs_type in fs_types:
    if (fs_type=='cf') and not (type_labels['cf'][0] in fs_df[period][fs_type].columns):
      # 지표에 공백이 들어간 현금흐름표의 경우 알맞은 지표로 필터링
      fs_df[period][fs_type] = fs_df[period][fs_type].loc[:, type_labels['cf_']]
    elif (fs_type=='is'):
      type_key = 'is_' + period
      fs_df[period][fs_type] = fs_df[period][fs_type].loc[:, type_labels[type_key]]
    else:
      fs_df[period][fs_type] = fs_df[period][fs_type].loc[:, type_labels[fs_type]]

In [None]:
# 손익계산서에서 주기마다 다른 '영업수익'의 이름 통일
fs_df['half']['is'].rename(columns={'수익(매출액)':'영업수익'}, inplace=True)
fs_df['quarter']['is'].rename(columns={'매출액':'영업수익'}, inplace=True)

### 시계열 피처 만들기

In [None]:
# 재무제표 데이터의 기준일을 index에서 시계열 피처로
for period in periods:
  for fs_type in fs_types:
    fs_df[period][fs_type].reset_index(inplace=True)
    fs_df[period][fs_type].rename(columns={'index':REPORT_DAY}, inplace=True)

### 분기별로, 재무제표별로 나눠진 데이터셋을 하나로 병합하기 위한 기초 데이터프레임 만들기

In [None]:
# 지정된 기간동안의 재무제표 데이터에 대한 기준날짜를 시계열 피처로 하는 빈 데이터프레임 생성
def create_quarter_df(start_yr):
  start_yr_of_data = start_yr - 1
  current_year = datetime.now().year
  current_month = datetime.now().month
  terms_of_yr = ['0331', '0630', '0930', '1231']
  terms_list = []

  terms_list.append(pd.to_datetime(str(start_yr_of_data)+'1231'))
  for yr in range(start_yr_of_data + 1, current_year + 1):
    for term in terms_of_yr:
      if (yr < current_year or (yr == current_year and int(term[:2]) < current_month - 3)):
        terms_list.append(pd.to_datetime(str(yr) + term))
      elif (yr == current_year):
        break

  df = pd.DataFrame({FS_DAY: terms_list})
  return df

In [None]:
df = create_quarter_df(start_yr)

### 각 재무제표마다 지정된 년도 이후의 데이터만 남기기

In [None]:
def get_year(x):
  return x.year

for period in periods:
  is_valid = pd.to_datetime(fs_df[period]['bs'][REPORT_DAY]).apply(get_year) >= start_yr
  if (period == 'annual'):
    is_valid = pd.to_datetime(fs_df[period]['bs'][REPORT_DAY]).apply(get_year) >= (start_yr - 1)
  fs_df[period]['bs'] = fs_df[period]['bs'][is_valid]
  fs_df[period]['bs'].insert(0, FS_DAY, pd.to_datetime(fs_df[period]['bs'][REPORT_DAY]))

### 기초 데이터프레임에 BS 데이터 병합하기

In [None]:
# 1, 3, 4분기 bs데이터 from quarter 병합
df = pd.merge(df, fs_df['quarter']['bs'], on=FS_DAY, how='outer')

# 4분기 bs데이터 from annual 병합 후 중복 삭제
df = pd.merge(df, fs_df['annual']['bs'], how='outer').sort_values(FS_DAY)
df = df.drop_duplicates(subset=FS_DAY, keep='last')

# 인덱스 리셋
df = df.reset_index(drop=True)

# 2분기 bs데이터 from half 병합 => 2분기 날짜가 중복생성됨
df = pd.merge(df, fs_df['half']['bs'].sort_values(FS_DAY).reset_index(drop=True), how='outer').sort_values(FS_DAY)

# 중복 날짜 중 새로 추가된 데이터를 남김
df = df.drop_duplicates(subset=FS_DAY, keep='last')

# 인덱스 리셋
df = df.reset_index(drop=True)

# 레포트 기준일 삭제
df = df.drop(REPORT_DAY, axis=1)

### IS 데이터 병합하기

In [None]:
def get_start_month(x):
  return x[5]

def get_end_date(x):
  return x[9:]

def get_partial_fs(fs_df, period, report_type):
  # 손익계산서 기준기간이 1월 1일부터인 항목만 필터링
  is_from_jan = (fs_df[period][report_type][REPORT_DAY].apply(get_start_month)=='1')
  partial_is = fs_df[period][report_type][is_from_jan]

  # 기준기간의 종료일(=레포트기준일)만 찾아서 재무제표기준일 피처로
  partial_is.insert(0, FS_DAY, pd.to_datetime(partial_is[REPORT_DAY].apply(get_end_date)))

  # 재무제표기준일이 분석 시작년도 이후인 데이터만 필터링
  year_to_get = start_yr
  if (period == 'annual'):
    year_to_get = start_yr -1
  partial_is = partial_is[partial_is[FS_DAY].apply(get_year) >= year_to_get]

  return partial_is

In [None]:
is_13 = get_partial_fs(fs_df, 'quarter', 'is')
is_2 = get_partial_fs(fs_df, 'half', 'is')
is_4 = get_partial_fs(fs_df, 'annual', 'is')

# 각 분기 IS 데이터 병합, 정렬, 재인덱스
is_combined = pd.concat([is_13, is_2, is_4]).sort_values(FS_DAY).reset_index(drop=True)
is_combined = is_combined.drop(REPORT_DAY, axis=1)
is_combined

df = pd.merge(df, is_combined, how='outer')

### CF 데이터 병합하기

In [None]:
# 반기 보고서만 피처이름이 다른 것 보정
fs_df['half']['cf'].rename(columns={'영업활동 현금흐름':'영업활동현금흐름',
                                    '투자활동 현금흐름':'투자활동현금흐름',
                                    '재무활동 현금흐름':'재무활동현금흐름'}, inplace=True)

In [None]:
def get_partial_cf(period, fs_df):
  # 손익계산서 기준기간이 1월 1일부터인 항목만 필터링
  cf_from_jan = (fs_df[period]['cf'][REPORT_DAY].apply(get_start_month)=='1')
  partial_cf = fs_df[period]['cf'][cf_from_jan]

  # 기준기간의 종료일(=레포트기준일)만 찾아서 재무제표기준일 피처로
  partial_cf.insert(0, FS_DAY, pd.to_datetime(partial_cf[REPORT_DAY].apply(get_end_date)))

  year_to_get = start_yr
  if (period == 'annual'):
    year_to_get = start_yr -1
  partial_cf = partial_cf[partial_cf[FS_DAY].apply(get_year) >= year_to_get]

  return partial_cf

In [None]:
cf_13 = get_partial_fs(fs_df, 'quarter', 'cf').sort_values(FS_DAY)
cf_2 = get_partial_fs(fs_df, 'half', 'cf').sort_values(FS_DAY)
cf_4 = get_partial_fs(fs_df, 'annual', 'cf').sort_values(FS_DAY)

# 각 분기 CF 데이터 병합, 정렬, 재인덱스
cf_combined = pd.concat([cf_13, cf_2, cf_4]).sort_values(FS_DAY).reset_index(drop=True)
cf_combined = cf_combined.drop(REPORT_DAY, axis=1)
cf_combined

df = pd.merge(df, cf_combined, how='outer')

### 결측치 처리하기: 당기순이익

In [None]:
fs_reports = download_reports(corp_code, start_date, path)

quarter_is = fs_reports['quarter']['Data_is'].copy()

# 컬럼 이름 바꾸기
last_label = 6
for idx in range(0, last_label):
  old_nm = quarter_is.columns[idx]
  new_nm = quarter_is.iloc[0, idx]
  quarter_is.rename(columns={old_nm:new_nm}, inplace=True)

# 0, 1번째 행 삭제
quarter_is = quarter_is.iloc[2:]

# 라벨 칼럼 삭제
quarter_is = quarter_is.iloc[:, 2:]
quarter_is = quarter_is.drop(quarter_is.columns[1:last_label-1], axis=1)

# 라벨을 인덱스로해서 행열전환
quarter_is.set_index('label_ko', inplace=True)
quarter_is = quarter_is.transpose()

quarter_is = quarter_is.loc[:, ['법인세비용차감전순이익(손실)', '법인세비용(수익)']]
quarter_is.reset_index(inplace=True)

 # 손익계산서 기준기간이 1월 1일부터인 항목만 필터링
is_from_jan = (quarter_is['index'].apply(get_start_month)=='1')
quarter_is_partial = quarter_is[is_from_jan]

# 기준기간의 종료일(=레포트기준일)만 찾아서 재무제표기준일 피처로
quarter_is_partial.insert(0, FS_DAY, pd.to_datetime(quarter_is_partial['index'].apply(get_end_date)))
quarter_is_partial.drop(['index'], axis=1, inplace=True)

# 재무제표기준일이 분석 시작년도 이후인 데이터만 필터링
quarter_is_partial = quarter_is_partial[quarter_is_partial[FS_DAY].apply(get_year) >= start_yr]

quarter_is_partial['당기순이익(손실)'] = quarter_is_partial['법인세비용차감전순이익(손실)'] - quarter_is_partial['법인세비용(수익)']
quarter_is_partial.drop(['법인세비용차감전순이익(손실)', '법인세비용(수익)'], axis=1, inplace=True)

# 당기순이익을 채울 데이터프레임을 재무제표기준일 순으로 정렬한 후
quarter_is_partial.sort_values(FS_DAY, inplace=True)

# df의 '당기순이익(손실)'열에 결측치가 있는 행의 index를 가져와서 re-index한다.
quarter_is_partial = quarter_is_partial.set_index(df[df['당기순이익(손실)'].isna()].index, drop=True)

# 기존 데이터프레임의 결측치 채우기
# 결측치를 채운다.
df['당기순이익(손실)'] = df['당기순이익(손실)'].fillna(quarter_is_partial['당기순이익(손실)'])

### 결측치 처리하기: 비어있는 특정일자

In [None]:
# 결측치가 있는 행들의 인덱스를 리스트로 반환하고, 결측치가 있는 칼럼을 출력하는 함수
def get_na_idx(df):
  # na값 찾기
  na_values = df.isna()

  # na값이 있는 칼럼 찾아서 출력: 어떤 항목을 계산해야하는지 파악하기 위함
  cols_with_na = na_values.any(axis=0)
  cols_with_na_names = df.columns[cols_with_na].tolist()
  print("결측치가 있는 칼럼:", cols_with_na_names)

  # na값이 있는 행의 index 찾아서 리스트로 반환
  rows_with_na = na_values.any(axis=1)
  rows_with_na_indices = df[rows_with_na].index.tolist()
  return rows_with_na_indices

In [None]:
# 총계항목(whole)에 대한 특정부분(part) 비율의 평균을 구해 총계항목의 값을 채우는 함수
def fill_whole_w_mean(df, na_idx, part, rest, whole):
  mean_ratio = (df[part] / df[whole]).mean()
  df[whole][na_idx] = round(df[part][na_idx] / mean_ratio)
  df[rest][na_idx] = df[whole][na_idx] - df[part][na_idx]

# 총계항목(whole)에 대한 특정부분(part) 비율의 평균을 구해 특정부분의 값을 채우는 함수
def fill_part_w_mean(df, na_idx, part, rest, whole):
  mean_ratio = (df[part] / df[whole]).mean()
  df[part][na_idx] = round(df[whole][na_idx] / mean_ratio)
  df[rest][na_idx] = df[whole][na_idx] - df[part][na_idx]

# 결측치가 있는 행의 인덱스를 parameter로 받아 결측치를 채우는 함수
def fill_bs_na(df, na_idx):
  # 자본총계에 대한 자산총계 비율의 평균값으로 결측치 채우기
  fill_whole_w_mean(df, na_idx, '자본총계', '부채총계', '자산총계')

  # 유동자산에 대한 자산총계 비율의 평균값으로 결측치 채우기
  fill_part_w_mean(df, na_idx, '유동자산', '비유동자산', '자산총계')

  # 유동부채에 대한 부채총계 비율의 평균값으로 결측치 채우기
  fill_part_w_mean(df, na_idx, '유동부채', '비유동부채', '부채총계')

In [None]:
# 결측치가 있는 행들에 대해 작성해둔 함수 적용
na_idxs = get_na_idx(df)
for idx in na_idxs:
  fill_bs_na(df, idx)

In [None]:
df.head()

# 05. 주가 데이터 라이브러리 이용해보기

In [None]:
!pip install finance-datareader -q

In [None]:
import FinanceDataReader as fdr

### DART에서 기업의 주식종목코드 가져온 후 주가데이터 가져오기

In [None]:
stock_code = corp_list.find_by_corp_name('삼성전자', exactly=True)[0].stock_code
samsung_stock_history = fdr.DataReader(stock_code, start=start_date)
samsung_stock_history.reset_index(inplace=True)
samsung_stock_history.head()

# 06. 불러온 재무제표와 주가데이터를 하나의 데이터셋으로 만들어 구글 드라이브에 저장하기

### 주가 데이터와 재무제표 데이터 merge 하기

In [None]:
# prompt: stock의 Index를 참고하여 재무제표기준일 컬럼을 추가해줘.
# 재무제표기준일 컬럼은 분기 기준으로 값이 부여되어야해.
# 2020-12-31, 2021-03-31, 2021-06-30, 2021-09-30, 2021-12-31, 2022-03-31..2024-03-31 처럼 말이야
# 만약 stock의 Index가 2021-02-28 이라면 해당 날짜의 바로 이전 분기인 2020-12-31 값을 부여해줘
# merge한 후의 rows 수와 merge 전의 samsung_stock_history의 rows 수는 같아야해

# 재무제표 기준일 컬럼 생성
samsung_stock_history[FS_DAY] = samsung_stock_history.Date.apply(lambda x: pd.to_datetime(str(x.year - 1) + '-12-31') if x.month <= 3 else
                                                                               pd.to_datetime(str(x.year) + '-03-31') if x.month <= 6 else
                                                                               pd.to_datetime(str(x.year) + '-06-30') if x.month <= 9 else
                                                                               pd.to_datetime(str(x.year) + '-09-30'))

# 주가 데이터와 재무제표 데이터 merge
df_merged = pd.merge(samsung_stock_history, df, on=FS_DAY, how='left')

# merge 전후 행 수 확인
print("Merge 전 samsung_stock_history 행 수:", len(samsung_stock_history))
print("Merge 후 df_merged 행 수:", len(df_merged))


### 재무제표 부여가 불가능한 주식 데이터 row 삭제

In [None]:
print("Drop 전 df_merged 행 수:", len(df_merged))
df_merged = df_merged.dropna()
print("Drop 후 df_merged 행 수:", len(df_merged))

### merge한 데이터 저장 및 불러와보기

In [None]:
stock_path = os.path.join(path, f"{stock_code}_{start_yr}.csv")
df_merged.to_csv(stock_path, index=False)

In [None]:
pd.read_csv(stock_path)

In [None]:
88800 * 0.071170 + 82900