In [1]:
import OpenDartReader
from dotenv import load_dotenv
import os 
import pandas as pd

load_dotenv()
api_key =  os.environ.get('OpenDartReader_key')
dart = OpenDartReader(api_key)

AWS_KEY = os.environ.get('AWS_KEY')
AWS_SECRET = os.environ.get('AWS_SECRET')
tableau_key = os.environ.get('tableau_red')

In [2]:
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_rows', 20)

In [3]:
def readFromPrestoDB(sql_command):
    ''' Hive DB에서 데이터 로딩 '''
    try:
        from pyhive import presto
        import pandas as pd

        presto_conn = presto.connect(
            host = "presto-internal.dp.zigbang.net",
            port = 80,
            username = "biglabred", catalog = "hive", schema = ""
        )

        df = pd.read_sql(sql_command, presto_conn)

        presto_conn.close()
        return df
    
    except:
        print('Failed to read data from hive DB')

In [4]:
def get_sql(AWS_KEY, AWS_SECRET, sql_command):
    import cryptocode
    import pymysql
    import s3fs
    # 암호화된 태블로 DB 접속키 로딩
    # 암호화된 태블로 DB 접속키 로딩
    fs = s3fs.S3FileSystem(anon=False, key=AWS_KEY, secret=AWS_SECRET)
    with fs.open("s3://zigbang-mlops/models/red/red_encrypted.txt", mode="r") as f:
        key = f.readline()

    conn = pymysql.connect(
        #host="biglab.c3svvjp5iqfn.ap-northeast-1.rds.amazonaws.com", port=3306,
        host="rds-red-w.zigbang.io", port=3306,
        user="zigbang_tableau", passwd=tableau_key, #cryptocode.decrypt(key, AWS_SECRET),
        db="tableau", charset="utf8"
    )
    cursor = conn.cursor() 
    cursor.execute(sql_command)
    result = pd.DataFrame(cursor.fetchall())

    num_fileds = len(cursor.description)
    field_names = [i[0] for i in cursor.description]
    result.columns = field_names

    conn.close()
        
    return result

In [5]:
def load_xml_to_dataframe(response, keyword):
    import bs4 as bs
    import urllib.request
    soup = bs.BeautifulSoup(response_xml,'xml')
    
    rows = soup.find_all(keyword)
    columns = rows[0].find_all()
    
    rowList = []
    nameList = []
    columnList = []

    rowsLen = len(rows)
    columnsLen = len(columns)

    for i in range(0, rowsLen):
        columns = rows[i].find_all()

        for j in range(0, columnsLen):
            if i == 0:
                nameList.append(columns[j].name) #header
            eachColumn = columns[j].text #value
            columnList.append(eachColumn)
        rowList.append(columnList)
        columnList = []    # 다음 row의 값을 넣기 위해 비워준다

    result = pd.DataFrame(rowList, columns=nameList)
    return result

In [6]:
def ExportDataFrameGspread(googlespread_link, sheetname, dataframe, AWS_KEY, AWS_SECRET):    
    import gspread
    from oauth2client.service_account import ServiceAccountCredentials
    import s3fs
    import numpy as np

    scope = ['https://spreadsheets.google.com/feeds',
     'https://www.googleapis.com/auth/drive']
    fs = s3fs.S3FileSystem(anon=False, key=AWS_KEY, secret=AWS_SECRET)
    key_file_name = "gspreadkey.json"
    fs.download(f"s3://zigbang-mlops/models/red/{key_file_name}", key_file_name)

    credentials = ServiceAccountCredentials.from_json_keyfile_name(key_file_name, scope)
    gc = gspread.authorize(credentials)
    spreadsheet_url= googlespread_link
    doc = gc.open_by_url(spreadsheet_url)

    worksheet = doc.worksheet(sheetname)
    worksheet.clear()  #clear selected worksheet before export data frame
    
    dataframe.replace([np.inf, -np.inf], np.nan, inplace=True)
    dataframe.fillna('', inplace=True)
    worksheet.update([dataframe.columns.values.tolist()] + dataframe.values.tolist())
           
    if os.path.exists("gspreadkey.json"):
        os.remove("gspreadkey.json")

In [544]:
def push_into_tableau_DB(data, AWS_KEY, AWS_SECRET, table_name, sql_create, replace=False):
    import cryptocode
    import pymysql
    import s3fs
    
    # 암호화된 태블로 DB 접속키 로딩
    fs = s3fs.S3FileSystem(anon=False, key=AWS_KEY, secret=AWS_SECRET)
    with fs.open("s3://zigbang-mlops/models/red/red_encrypted.txt", mode="r") as f:
        key = f.readline()
    
    # 태블로 DB에 접속하는 코드
    tableau_conn = pymysql.connect(
        host="rds-red-w.zigbang.io", port=3306,
        user="zigbang_tableau", passwd=cryptocode.decrypt(key, AWS_SECRET),
        db="tableau", charset="utf8"
    )
    cur = tableau_conn.cursor()

    # table_name이 존재하는지 체크하는 코드
    exist = cur.execute(f"SHOW TABLES LIKE '{table_name}'")
    
    if not exist:
        # table_name이 없다면 data 기반으로 생성하는 코드
        cur.execute(sql_create)
        
    # NULL 데이터 처리
    data = data.where(pd.notnull(data),"")
    
    # run_date 추가
    #data["run_date"] = datetime.now(timezone('Asia/Seoul')).strftime("%Y-%m-%d %H:%M:%S")
    
    # Replace할 경우 기존 데이터 삭제
    if replace:
        cur.execute(f"DELETE FROM {table_name}")
    
    # 데이터를 INSERT 하는 코드
    columns = ", ".join(data.columns)
    values = '('+', '.join(['%s']*len(data.columns))+')'
    
    print(columns, values)
    statement = "INSERT INTO " + table_name + " (" + columns + ") VALUES " + values
    insert = [tuple(x) for x in data.values]
    cur.executemany(statement, insert)
    
    # Commit 및 접속 종료
    cur.execute("COMMIT")
    tableau_conn.close()

In [8]:
def readFromGspread(googlespread_link, sheetname, AWS_KEY, AWS_SECRET):
    ''' 구글 스프레드시트에서 데이터 로딩 '''
    import gspread
    from oauth2client.service_account import ServiceAccountCredentials
    import s3fs
    
    scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']
    fs = s3fs.S3FileSystem(anon=False, key=AWS_KEY, secret=AWS_SECRET)
    key_file_name = "gspreadkey.json"
    fs.download(f"s3://zigbang-mlops/models/red/{key_file_name}", key_file_name)
    #with fs.open("s3://zigbang-mlops/models/red/{key_file_name}}",key_file_name, mode="r") as f:
    #    key = f.readline()
    
    credentials = ServiceAccountCredentials.from_json_keyfile_name(key_file_name, scope)
    gc = gspread.authorize(credentials)
    spreadsheet_url= googlespread_link
    doc = gc.open_by_url(spreadsheet_url)
    
    df = pd.DataFrame.from_records(doc.worksheet(sheetname).get_all_values())
    
    df.columns = df.iloc[0]
    df = df[1:]
    
    import os
    os.remove(key_file_name)
              
    return df

## 데이터 불러오기

#### 기업개요 및 주소 정보

In [346]:
df_company = readFromPrestoDB('''
select * from 
(
select "고유번호", "정식명칭", "영문명칭", "종목명", "종목코드","법인구분", "법인등록번호",
"사업자등록번호", "주소", "업종코드", "업종명", "설립일",
"결산월", "created_at", "updated_at" from hive.ods.biglab_disclosure_company_overview
where "법인등록번호" !=''  and  "종목코드" != ''
) X
left join 
(
select * from
hive.ods.biglab_disclosure_company_address 
) Y
on X."주소" = Y."주소" 
'''
)

#법인등록번호가 null값인 것은 해외소재주소지로 제외 
#종목코드가 있는 상장사로 필터

#### 기업 분류 기준

In [347]:
url_industry_categorization = 'https://docs.google.com/spreadsheets/d/1N8iOHx1M8PLqlGiA2tgOKl-AFoCNw3Ew9bwHMMhBIIU/edit#gid=201266420'
df_industry_categorization = readFromGspread(url_industry_categorization, '표준산업대분류', AWS_KEY, AWS_SECRET)
df_industry_categorization = df_industry_categorization[['산업대분류', 'range_start', 'range_end']]

#df_industry_categorization = readFromGspread(url_industry_categorization, '표준산업분류', AWS_KEY, AWS_SECRET) 
#df_industry_categorization  = df_industry_categorization.CODE.astype(str).str.zfill(2)

#### 직원현황 공시정보

In [624]:
df_employee.정규직수.str

<pandas.core.strings.accessor.StringMethods at 0x7f812fbe9a90>

In [635]:
df_employee = readFromPrestoDB('''
select * from  hive.ods.biglab_disclosure_business_report_employee_summary    
where "보고서코드" ='11011'                         
''')
cols = ['정규직수', '합계', '연간급여총액', '1인평균급여액']
for col in cols:
    df_employee[col] = np.where(df_employee[col]=='-', 0, df_employee[col])
    df_employee[col] = pd.to_numeric(df_employee[col], errors='coerce')
df_employee.dropna()   #데이터 중 숫자가 아닌데 숫자로 들어간 데이터는 제외한다. 예시: 연간급여총액=="####################"으로 되어있는경우 위의 to_numeric coerce로 nan값을 주게 되고 이를 레코드에서 드랍 시킨다.
  
    

정규직수
합계
연간급여총액
1인평균급여액


Unnamed: 0,id,고유번호,사업연도,보고서코드,접수번호,법인구분,법인명,사업부문,성별,개정전직원수정규직,개정전직원수계약직,개정전직원수기타,정규직수,정규직단시간근로자수,계약직수,계약직단시간근로자수,합계,평균근속연수,연간급여총액,1인평균급여액,비고,created_at,updated_at


#### 사업보고서 공시 자료 api 호출 중. (추후 태블로 DB에서 가져올 예정)

df_financial_stat = readFromPrestoDB('''
select * from hive.ods.biglab_disclosure_financial_statement
where "재무제표종류"='재무상태표, 유동/비유동법-연결재무제표'  -- '재무상태표, 유동/비유동법-별도재무제표'
and "보고서종류"='사업보고서'
and "항목코드"='ifrs-full_Assets' or "항목코드"= 'ifrs_Assets' -- 자산총계
''')

In [409]:
%%time

api_key =  os.environ.get('OpenDartReader_key')
dart = OpenDartReader(api_key)
df = pd.DataFrame()
for yr in range(2018, 2020):
    for stock_code in df_company.종목코드:
        try:
            row = dart.finstate(corp=stock_code, bsns_year=yr , reprt_code='11011')
            if  row.shape!=(0,0):  #013 :조회된 데이타가 없습니다 일 경우에는 쉐입이 (0,0) 이 됨..
                df = df.append(row)
            else:
                print("해당보고서가 없습니다: ",stock_code, "  year:", yr)
                continue 
        except:
            row = []
            continue 


{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  055000   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  032600   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  062730   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  034660   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  035010   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  054010   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  093820   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  031800   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  003020   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  045820   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  058550   year: 2018
{'status': '013', 'message': '조회된 데이타가 없습니다.'}

해당보고서가 없습니다:  047600   year: 2018
{'status': '013'

df_org = df.copy()

df_org = df.copy()

df_backup= df.copy()

In [None]:
df_company2 = pd.DataFrame()
for stock_code in df_company.종목코드.unique():
    row = pd.DataFrame(dart.company(stock_code), index=[0])
    if row['status'].values=='000':   #status 100 일경우 안나오는 기업. 000이면 정상적으로 호출이 되는것.
        df_company2 = df_company2.append(row)
    else:
        print(stock_code)
        continue


#### 업종코드 api호출된 기업개황에 결합

In [None]:
df_company2['induty_code_cat'] = df_company2['induty_code'].astype(str).str[:2].astype(int)

df_industry_categorization[['range_start', 'range_end']] = df_industry_categorization[['range_start', 'range_end']].apply(pd.to_numeric, errors='coerce')
s = pd.IntervalIndex.from_arrays(df_industry_categorization.range_start,
                                 df_industry_categorization.range_end, 'both')
df_mapping_industry = df_company2.assign(industry = df_industry_categorization.set_index(s).loc[df_company2.induty_code_cat].산업대분류.values )[['corp_code', 'industry']]

In [421]:
df_company = df_company.merge(df_mapping_industry, left_on='고유번호', right_on='corp_code', how='left')

#### 공시자료에서 연결재무재표 중 영업이익, 자산총계, 매출만 사용

import sweetviz as sv
analyze_report = sv.analyze(df_org)
analyze_report.show_html('dart_report.html', open_browser=False)

In [None]:
import numpy as np
df['thstrm_amount'] = np.where(df['thstrm_amount']=='-', 0, df['thstrm_amount']  )    #'-' 가 금액으로 들어간경우 숫자 0으로 대체

In [577]:
df = df[((df.fs_div=='CFS') & (df.account_nm.str.contains('영업이익')) ) |
   ((df.fs_div=='CFS') & (df.account_nm.str.contains('자산총계')) ) |
   ((df.fs_div=='CFS') & (df.account_nm.str.contains('매출')) )
   ]

In [578]:
df = df[df.currency=='KRW']

## 데이터 결합

In [579]:
df = df.pivot_table(index=['rcept_no',	'reprt_code',	'bsns_year',	'corp_code', 'stock_code'], 
               columns='account_nm'	, values='thstrm_amount', aggfunc=sum).fillna(0).reset_index()

In [639]:
df_employee = df_employee.groupby(["고유번호", "사업연도" ,"보고서코드", "법인명" ])[['정규직수', '합계', '연간급여총액']].sum().reset_index()

In [641]:
df_merged = df.merge(df_employee, how='inner', left_on=['bsns_year', 'corp_code'], right_on=['사업연도', '고유번호']) 
df_merged = df_merged.merge(df_company[['종목코드', 'industry', '주소', '시도', '시군구', '읍면동', '번지', '지역코드', '위도', '경도' ]], 
                how='left',   left_on ='stock_code', right_on='종목코드')
df_merged.rename(columns={"합계":"종업원수"}, inplace= True)




In [642]:
df_merged['연간평균급여'] = (df_merged.연간급여총액/df_merged.종업원수).fillna(0)

In [643]:
df_merged.rename(columns={"industry_x":"industry"}, inplace= True)

df_merged.head(2)

In [644]:

df_merged = df_merged[['rcept_no', 'bsns_year', '매출액', '영업이익', '자산총계', '고유번호','stock_code', '법인명', 'industry' , '정규직수', '종업원수', '연간평균급여',
                       '시도', '시군구', '읍면동', '번지', '지역코드', '위도', '경도' ]]
df_merged.rename(columns={'industry':'업종명'}, inplace=True, errors="ignore")


In [None]:

df_merged.연간평균급여 = df_merged.연간평균급여.replace([np.inf, -np.inf], 0)

cols =['매출액', '영업이익', '자산총계', '연간평균급여']
for col in cols:
    print(col)
    df_merged[col] = df_merged[col].str.replace(',', '').apply(pd.to_numeric, errors='coerce').fillna(0)


In [665]:
df_merged[df_merged.stock_code=='003620']#['매출액']

Unnamed: 0,rcept_no,bsns_year,매출액,영업이익,자산총계,고유번호,stock_code,법인명,업종명,정규직수,종업원수,연간평균급여,시도,시군구,읍면동,번지,지역코드,위도,경도
796,20190401003283,2018,3494637644343,-65275806226,0,138242,3620,쌍용자동차,,4972,5003,88686790.0,경기,평택시,칠괴동,580,4122010400,37.028952,127.095083
2011,20200330002307,2019,3704793546043,-64175997221,0,138242,3620,쌍용자동차,,4972,5003,85735560.0,경기,평택시,칠괴동,580,4122010400,37.028952,127.095083
4294,20210323000883,2020,3623882266268,-281905360065,0,138242,3620,쌍용자동차,,4840,4869,65383240.0,경기,평택시,칠괴동,580,4122010400,37.028952,127.095083


In [646]:
table_name = 'dart_disclosure_reported'
sql_create = f"""
    CREATE TABLE {table_name} (
    rcept_no VARCHAR(100),
    bsns_year VARCHAR(4),
    stock_code VARCHAR(6) ,
    고유번호 VARCHAR(10) ,
    법인명 VARCHAR(100) ,
    업종명 VARCHAR(100) ,
    정규직수 int, 
    종업원수 int, 
    연간평균급여 bigint,
    매출액 bigint,
    영업이익 bigint,
    자산총계 bigint,
    시도	 VARCHAR(10),
    시군구	VARCHAR(10),
    읍면동	VARCHAR(20),
    번지	VARCHAR(20),
    지역코드	VARCHAR(10),
    위도	double,
    경도    double
    ) CHARSET=utf8;
"""

In [662]:
push_into_tableau_DB(df_merged,  AWS_KEY, AWS_SECRET, table_name , sql_create, replace=True)

rcept_no, bsns_year, 매출액, 영업이익, 자산총계, 고유번호, stock_code, 법인명, 업종명, 정규직수, 종업원수, 연간평균급여, 시도, 시군구, 읍면동, 번지, 지역코드, 위도, 경도 (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)


In [661]:
df_merged

Unnamed: 0,rcept_no,bsns_year,매출액,영업이익,자산총계,고유번호,stock_code,법인명,업종명,정규직수,종업원수,연간평균급여,시도,시군구,읍면동,번지,지역코드,위도,경도
0,20180628000160,2018,26486287918,-2421224869,53706903697,00378363,060310,3S,,67,67,4.897531e+07,서울,금천구,가산동,345-24,1154510100,37.472852,126.883273
1,20180628000221,2018,13689574860,378892285,83733936170,00241209,033200,모아텍,,105,105,4.424355e+07,인천,남동구,구월동,292,2820010100,37.448752,126.714593
2,20180629000180,2018,10617309716,-1125362235,36717349367,00155586,024850,피에스엠씨,,274,275,0.000000e+00,경기,화성시,향남읍 구문천리,928,4159025925,37.084418,126.908098
3,20180629000420,2018,107804655124,10147395343,235954858759,00106614,092440,기신정기,,407,413,4.737300e+07,인천,남동구,논현동,439,2820011000,37.408791,126.703613
4,20180629000437,2018,492941408564,17149669339,492848698913,00118008,018500,동원금속,,683,684,6.815351e+07,경북,경산시,진량읍 양기리,330-1,4729025327,35.890239,128.826578
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4841,20220831001680,2020,30443293632,2121292624,51118782194,00255433,042510,라온시큐어,,189,198,4.815009e+07,서울,강남구,역삼동,646-15,1168010100,37.500713,127.035193
4842,20220908000088,2018,372059611808,33942609831,350605916672,00456218,080160,모두투어,,1089,1237,4.431043e+07,서울,중구,을지로1가,188-3,1114010400,37.565634,126.979433
4843,20220908000100,2019,364973001111,16610715303,334585105230,00456218,080160,모두투어,,1047,1158,4.441364e+07,서울,중구,을지로1가,188-3,1114010400,37.565634,126.979433
4844,20220923000276,2020,53365357987,-21892260198,172007119266,00530556,122640,예스티,,239,239,4.491632e+07,경기,평택시,진위면 마산리,0,4122031022,37.086498,127.104186


df_merged[df_merged.종업원수<=0]