In [1]:

## kosis 현재 인구 데이터 가공

import pandas as pd
import _config
import _codes
import _util

def popExcelToCsv(file_nm, year):
    sido_cd_map = {info['sido_nm']:info['sido_cd'] for info in _codes.sido_codes}
    sido_sgg_cd_map = {}

    df = pd.read_csv(_config.pop_read_path + f'\\{file_nm}.csv', encoding=_config.euc_kr)
    keyword = '년'

    sido_cd = None
    #sido_cd row마다 부여
    for i in range(len(df)):
        df.loc[i, '년도'] = year
        sgg_nm = df.loc[i, '행정구역(시군구)별'].strip()

        # 시도 코드 매핑
        sido_cd = sido_cd_map.get(sgg_nm, sido_cd)
        df.loc[i, 'sido_cd'] = f'`{str(sido_cd)}'  # 시도 코드 할당

        #해당년도 해당 변수 불러오기
        sgg_cd = _util.getSggCd(sido_cd, sgg_nm, year, 1)
        df.loc[i, 'sgg_cd'] = f'`{str(sgg_cd)}'  # 시군구 코드 할당
            
    # year 부여
    df['year'] = f'`{year}'

    _cols = {}
    for col in df.columns:
        if keyword in col and f'{keyword}도' not in col:
            _cols[col] = 'cnt'
            continue
        if col.count('시군구') > 0: 
            _cols[col] = 'sgg_nm'
            continue
        if col.count('성별') > 0: 
            _cols[col] = 'gender'
            continue
        if '6' not in col: _cols[col] = col

    df.rename(columns=_cols, inplace=True)
    
    # 로우를 컬럼으로 변경
    df = df.pivot_table(
        index=['sido_cd', 'sgg_cd', 'sgg_nm', 'gender', 'year'],
        columns='연령별',
        values='cnt',
        aggfunc='sum',
        fill_value=0
    )
    df.reset_index(inplace=True)

    # 열 이름 목록 생성
    _list = []
    for col in df.columns:
        if '85세 이상' in col or '80세 이상' in col: continue
        if '계' in col: continue
        _list.append(col)
    
    df = df[_list]
    
    import math
    # 나이를 기준으로 정렬
    _cols = {col: int(_util.extract_number(col)) for col in df.columns if not math.isinf(_util.extract_number(col))}
    _notCols = {col: col for col in df.columns if math.isinf(_util.extract_number(col))}
    _cols = sorted(_cols.items(), key=lambda x: x[1])
    cols = {}
    for col, num in _cols:
        if num < 100: cols[col] = f'{num}y'
        elif num == 100: cols[col] = f'{num}y_over'
    
    df.rename(columns=cols, inplace=True)
    _notCols.update(cols)
    df = df[list(_notCols.values())]

    try: df.to_csv(_config.pop_write_path + f'\\{file_nm}.csv', encoding=_config.euc_kr, index=False)
    except: 
        print(f'해당 파일쓰기 실행 중 에러 발생 {file_nm}')
        return False
    
    print(f'{file_nm}.csv 파일 생성완료')

    return True

suc_cnt = 0
fail_cnt = 0
years = list(range(2000, 2024))
for year in years:
    year_str = str(year)[2:4]
    if popExcelToCsv(year_str + '년도 인구', year): suc_cnt += 1
    else: fail_cnt += 1

print('성공', suc_cnt, '실패', fail_cnt)


KeyboardInterrupt: 

In [None]:
# kosis 추세 인구 데이터 가공

import _config
import _util
import _codes
import pandas as pd

df = pd.read_csv(_config.pop_read_path + '\\' + '성_및_연령별_추계인구_1세별__5세별____전국_20240524100134' + '.csv', encoding=_config.euc_kr)

number_cols = [col for col in df.columns if str(col).isnumeric()]
not_number_cols = [col for col in df.columns if not str(col).isnumeric()]

df = pd.melt(
    df
    , id_vars=not_number_cols
    , value_vars=number_cols
    , var_name='year'
    , value_name='cnt'
)

_list = []
for col in df.columns:
    if col != '가정별': _list.append(col)

df = df[_list]
df = pd.DataFrame(df)

_cols = {}
for col in df.columns:
    if col.count('성별'): _cols[col] = 'gender'
df.rename(columns=_cols, inplace=True)

df['sido_cd'] = '`00'
df['sgg_cd'] = '`None'
df['sgg_nm'] = '전국'
df['year'] = '`' + df['year'].astype(str)  # 이스케이프 문제 해결

df = df.pivot_table(
    index=['sido_cd', 'sgg_cd', 'sgg_nm', 'gender', 'year']
    , columns='연령별'
    , values='cnt'
    , aggfunc='sum'
    , fill_value=0
    # , dropna=False
)
df.reset_index(inplace=True)

#age sort
import math
_cols = {col: _util.extract_number(col) for col in df.columns if not math.isinf(_util.extract_number(col))}
_notCols = {col: col for col in df.columns if math.isinf(_util.extract_number(col))}
_cols = sorted(_cols.items(), key=lambda x: x[1]) #sort
cols = {}
for col, num in _cols:
    if num < 100: cols[col] = f'{num}y'
    elif num == 100: cols[col] = f'{num}y_over'
df.rename(columns=cols, inplace=True)
_notCols.update(cols)
df = df[list(_notCols.values())]

df.to_csv(_config.pop_write_path + '\\' + '24-72년도 인구' + '.csv', encoding=_config.euc_kr, index=False)


In [6]:
# 특정년도 시군구 연령별 증감률 구하기

import _util
import _codes
import _config
import pandas as pd
import math
import re
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
from openpyxl.styles import Font

patterns = {
    'sgg_cd': re.compile(r'sgg_cd'),
    'sgg_nm': re.compile(r'sgg_nm'),
    'sido_cd': re.compile(r'^sido_cd$'),
    'gender': re.compile(r'^gender$'),
    'year': re.compile(r'^year$'),
    'geom': re.compile(r'geom')
}

def makePopCR(year):
    pop_cols = _util.getTableColumns('pop')
    if pop_cols is None: return
    pop_cols_v1 = [f'"{col[0]}"' for col in pop_cols]
    pop_cols_v2 = [f'"{col[0]}"' for col in pop_cols if col[0] and not any(p.search(col[0]) for p in patterns.values())]
    pop_cols_v3 = [f'{col[0]}' for col in pop_cols if col[0] and not any(p.search(col[0]) for p in patterns.values())]  # 인구 나이 컬럼만

    gis_year = _util.makeGisYear(year)
    g_cols = _util.getTableColumns(f'gujaesi_sgg_{gis_year}_5179')
    if g_cols is None: return
    g_cols = [col[0] for col in g_cols if not patterns['geom'].search(col[0])]
    ng_cols = _util.getTableColumns(f'nogujaesi_sgg_{gis_year}_5179')
    if ng_cols is None: return
    ng_cols = [col[0] for col in ng_cols if not patterns['geom'].search(col[0])]

    def getPopData(year):
        gis_year = _util.makeGisYear(year)
        sql = f'''
            with g as (
                select {', '.join(g_cols)}
                from gujaesi_sgg_{gis_year}_5179
            )
            , ng as (
                select {', '.join(ng_cols)}
                from nogujaesi_sgg_{gis_year}_5179
            )
            , p as (
                select {', '.join(pop_cols_v1)}
                from pop
                where "year" = '{year}' and gender = '계'
                order by sgg_cd, gender
            )
            select *
            from (
                select coalesce(p.sido_cd, substring(g.sgg_cd, 1, 2)) as sido_cd, g.sgg_cd, g.sgg_nm, p.gender, p."year", {', '.join([f'coalesce(sum({col}), 0) as {col}' for col in pop_cols_v2])}
                from g
                left join p on substring(g.sgg_cd, 1, 4) = substring(p.sgg_cd, 1, 4)
                group by g.sgg_cd, g.sgg_nm, p.sido_cd, p."year", p.gender
                order by g.sgg_cd, p."year"
            ) union all (
                select p.sido_cd, ng.sgg_cd, ng.sgg_nm, p.gender, p."year", {', '.join(pop_cols_v2)}
                from ng
                left join p on ng.sgg_cd = p.sgg_cd
                order by ng.sgg_cd, p."year"
            )
        '''

        result = _util.execute_sql(sql)
        if result is None: return
        _list = []
        for row in result: _list.append({pop_cols[idx][0]: row[idx] for idx in range(len(pop_cols))})
        _dic = {row['sgg_cd']: row for row in _list}
        return _dic

    if year == 2023: bf_df = getPopData(year - 3)
    else: bf_df = getPopData(year - 5)
    ct_df = getPopData(year)

    _list = []
    for key in ct_df.keys():
        ct_row = ct_df.get(key, None)
        if key == '29010' and year < 2020: continue # 세종시
        if ct_row is None:
            print('ct_row', key, ct_row)
            break
        
        if year == 2023: key = _util.beforeSggCdMapping(key, year - 3)
        else: key = _util.beforeSggCdMapping(key, year - 5)
        
        bf_row = bf_df.get(key, None)
        if bf_row is None:
            print(year, 'ct_row', key, ct_row)
            print(year, 'bf_row', key, bf_row)
            break

        _dict = {
            'sido_cd': ct_row['sido_cd'],
            'sgg_cd': ct_row['sgg_cd'],
            'sgg_nm': ct_row['sgg_nm'],
            'gender': ct_row['gender'],
            'year': ct_row['year']
        }

        _dict.update({
            col: f'{round(_util.objDivide(ct_row[col], bf_row[col]) * 100, 1) - 100}%' for col in pop_cols_v3
        })

        _list.append(_dict)

    df = pd.DataFrame(_list)
    df = df.sort_values(by='sgg_cd')

    # 헤더 설정 부분
    header = [
        {
            'sido_cd': '시도코드'
            , 'sgg_cd': '시군구코드'
            , 'sgg_nm': '시군구명'
            , 'gender': '성별'
            , 'year': '년도'
        }
        , {col[0]: col[0] for col in pop_cols}
    ]
    header[0].update(
        {col: f'{col.replace("y", "세").replace("y_over", "세 이상")} 증감률' for col in pop_cols_v3}
    )

    # 헤더를 DataFrame으로 변환
    header_df = pd.DataFrame(header)
    df = pd.concat([header_df, df], ignore_index=False)

    # 엑셀 파일로 저장
    excel_path = _config.pop_write_path + '\\' + f'pop_{year}_CR' + '.xlsx'
    df.to_excel(excel_path, index=False, header=False)

    # 엑셀 파일 열기
    workbook = load_workbook(excel_path)
    sheet = workbook.active

    # # 특정 값을 가진 셀에 색상 부여
    # for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, min_col=6, max_col=sheet.max_column):
    #     for cell in row:
    #         if isinstance(cell.value, (int, float)):  # 값이 숫자인 경우만 처리
    #             if cell.value < 0:
    #                 cell.fill = PatternFill(start_color='FF0000', end_color='FF0000', fill_type='solid')  # 빨간색
    #             elif cell.value > 0:
    #                 cell.fill = PatternFill(start_color='0000FF', end_color='0000FF', fill_type='solid')  # 파란색

    # 특정 값을 가진 셀에 색상 부여
    for row in sheet.iter_rows(min_row=2, max_row=sheet.max_row, min_col=6, max_col=sheet.max_column):
        for cell in row:
            val = round(_util.objToFloat(cell.value.replace('%', '')), 1)
            if isinstance(val, (int, float)):  # 값이 숫자인 경우만 처리
                if math.isinf(val): continue
                if val < -60:
                    cell.value = f'{round(val, 1)}%'
                    cell.font = Font(color='00008B')  # 진한파란색  
                elif val < -30:
                    cell.font = Font(color='0000CD')  # 중간파란색  
                    cell.value = f'{round(val, 1)}%'
                elif val < 0: 
                    cell.font = Font(color='87CEFA')  # 연한파란색  
                    cell.value = f'{round(val, 1)}%'
                    
                elif val > 60: 
                    cell.font = Font(color='8B0000')  # 진한빨간색
                    cell.value = f'+{round(val, 1)}%'
                elif val > 30: 
                    cell.font = Font(color='FF4500')  # 중간빨간색
                    cell.value = f'+{round(val, 1)}%'
                elif val > 0: 
                    cell.font = Font(color='FFA07A')  # 연한빨간색
                    cell.value = f'+{round(val, 1)}%'

    # 수정된 엑셀 파일 저장
    workbook.save(excel_path)

for year in _config.layer_years:
    if year == 2000: continue
    print(f'{year} 파일 생성 시작')
    makePopCR(year)
    print(f'{year} 파일 생성 종료')


2005 파일 생성 시작
2005 파일 생성 종료
2010 파일 생성 시작
2010 파일 생성 종료
2015 파일 생성 시작
2015 파일 생성 종료
2020 파일 생성 시작
2020 파일 생성 종료
2023 파일 생성 시작
2023 파일 생성 종료


In [11]:
# 24 ~ 72년도 전국 인구 데이터의 연령별 인구 증감률을 구하기
import _util
import _codes
import _config
import pandas as pd
import math
import re

# 23년도 인구 데이터 -> 전국 데이터만 뽑아오기 위한 전처리

patterns = {
    '00': re.compile(r'00')
    , 'all': re.compile(r'^(계|전체)$')
    , 'None': re.compile(r'None')
}

df_23 = pd.read_csv(_config.pop_write_path + '\\' + '23년도 인구' + '.csv', encoding=_config.euc_kr)
df_all = pd.read_csv(_config.pop_write_path + '\\' + '24-72년도 인구' + '.csv', encoding=_config.euc_kr)

df_list = [
    df_23
    , df_all
]
df = pd.concat(df_list, ignore_index=False)
_list = [row for idx, row in df.iterrows() if patterns['00'].search(row['sido_cd']) and patterns['all'].search(row['gender'])]
# _cols = {col: col for col in _cols if col }
df = pd.DataFrame(_list)
_list = None

# 23년 ~ 72년 인구 자료
df.to_csv(_config.pop_write_path + '\\' + 'pop_fcr' + '.csv', encoding=_config.euc_kr, index=False)

_list = [row for idx, row in df_23.iterrows() if patterns['None'].search(row['sgg_cd'])]
df_23

# 2024년 부터의 인구 증감률 데이터 만들기
# 연령 컬럼 뽑아내기 위한 과정
_cols = [col for col in df.columns if not math.isinf(_util.extract_number(col))]
_notCols = [col for col in df.columns if math.isinf(_util.extract_number(col))]

# 전국 데이터를 토대로 증감률 구하기

# rst = []
# rst.append(df_23)

crs = {}
for idx in range(len(df)):
    if idx == 0: continue
    if idx > 1: break
    bf_row = df.iloc[idx - 1]
    ct_row = df.iloc[idx]

    # 연령별로 나이 증감률 나누기
    crs = {col: round(ct_row[col] / bf_row[col], 3) for col in _cols}

# 새로운 데이터프레임 생성
# start와 end

def make_pop_projection(df, year):
    df = pd.DataFrame(df)
    new_df = df.copy()
    for idx, row in new_df.iterrows():
        for col in _cols: new_df.at[idx, col] = round(_util.objMultiple(new_df.at[idx, col], crs[col]))

    # 헤더 설정 부분
    header = [
        {
            'sido_cd': '시도코드'
            , 'sgg_cd': '시군구코드'
            , 'sgg_nm': '시군구명'
            , 'gender': '성별'
            , 'year': '년도'
        }
        , {col: col for col in df_23.columns}
    ]
    header[0].update(
        {col: f'{col.replace("y", "세").replace("y_over", "세 이상")}' for col in _cols}
    )

    # 헤더를 DataFrame으로 변환
    header_df = pd.DataFrame(header)
    df_24 = pd.concat([header_df, new_df], ignore_index=False)
    df_24.to_csv(_config.pop_write_path + '\\' + f'{year[0:2]}년도 인구' + '.csv', encoding=_config.euc_kr, index=False, header=False)


    # _list = []
    # for idx in range(len(df_23)):
    #     row = df.iloc[idx]
    #     print(row)
# print(crs)
# for cr in crs:


In [None]:
# 현재 인구 데이터 가공 파일 + 추계 인구 데이터 가공 파일 => 병합 파일 생성 

import _config
import _util
import _codes
import pandas as pd
import re

patterns = {
    'test2': re.compile(r'test2')
    , 'pop_': re.compile(r'pop_')
}

files = _util.get_files(_config.pop_write_path)
files = [file for file in files if not any(p.search(file) for p in patterns.values())]
print(files)
_list = []
for f in files:
    df = pd.read_csv(_config.pop_write_path + '\\' + f, encoding=_config.euc_kr)
    _list.append(df) 

df = pd.concat(_list, ignore_index=True)
df.to_csv(_config.pop_write_path + '\\' + 'test2' + '.csv', encoding=_config.euc_kr, index=False)

In [None]:
# 병합 파일 => db에 insert하기

import _config
import _util
import _codes
import pandas as pd

print('파일 불러오는 중')
df = pd.read_csv(_config.pop_write_path + '\\' + 'test2' + '.csv', encoding=_config.euc_kr)

_cols = []
for col in df.columns:
    if col.count('sido_cd') > 0: val = f'\"{col}\" varchar(2)'
    elif col.count('sgg_cd') > 0: val = f'\"{col}\" varchar(5)'
    elif col.count('sgg_nm') > 0: val = f'\"{col}\" varchar(10)'
    elif col.count('gender') > 0: val = f'\"{col}\" varchar(3)'
    elif col.count('year') > 0: val = f'\"{col}\" varchar(4)'
    else: val = f'\"{col}\" bigint'

    _cols.append(val)

sql = f'''
    do $$
    begin
        if exists (select 1 from pg_tables where tablename ='pop') then
        drop table pop cascade;
        end if;
        if not exists (select 1 from pg_tables where tablename ='pop') then
        create table pop(
            {', '.join(_cols)}
        );
        end if;
    end $$;
'''
print('데이터 불러오는 중')
_util.execute_sql(sql)

columns = df.columns.tolist()
for idx in range(len(columns)): columns[idx] = f'"{columns[idx]}"'

# 데이터프레임의 모든 행을 리스트로 변환
data = df.values.tolist()
# 데이터 배치로 나누기
import math
print('데이터 전처리 및 데이터 삽입 시작')
for chunk in _util.chunker(data, 1000):
    values = []
    for row in chunk:
        for idx in range(len(columns)):
            if str(row[idx]).count('`') > 0:
                val = str(row[idx]).replace('`', '')
                row[idx] = f'\'{val}\''
            elif isinstance(row[idx], (int, float)): row[idx] = f'{_util.objToInt(row[idx])}'
            else:
                row[idx] = f'\'{str(row[idx])}\''
            if row[idx] == 'None':
                row[idx] = '\'\''

        values.append(f"({', '.join(row)})")
    
    sql = f'''
        do $$
        begin
            if exists (select 1 from pg_tables where tablename = 'pop') then
                insert into public.pop(
                    {','.join(columns)}
                ) values {', '.join(values)};
            end if;
        end $$;
    '''
    
    _util.execute_sql(sql)

print("데이터 삽입 완료")


In [None]:
# 특정년도 시군구 매핑 데이터 파일로 만들기
# 참고) 구제시는 구제시로 통합하여 만듬

import _util
import pandas as pd
import _config

year = 2023
file_nm = f'pop_{year}'

cols = _util.getTableColumns('pop')
col_nms = [row[0] for row in cols]
_cols = list(col_nms)
cols = [
    f's.\"{col}\"' if col == 'sgg_cd' else f's.\"{col}\"' if col == 'sgg_nm' else f'p.\"{col}\"' for col in _cols
]

table_nm = f'v_simple_pop_{year}'
sql = f'''
    do $$
    begin
        if exists (select 1 from pg_matviews where matviewname = '{table_nm}') then
            drop materialized view {table_nm};
        end if;
        if exists (select 1 from pg_tables where tablename = 'pop') then
        create materialized view {table_nm} as
        with p_{year} as (
            select *
            from pop
            where "year" = '{year}'
        ) 
        select row_number() over() as fid
        , {', '.join(cols)}, s.geom
        from simple_sgg_{year} s
        left join p_{year} p on p.sgg_cd = s.sgg_cd
        ;
        end if;
    end $$;
'''

_util.execute_sql(sql)

patterns = {
    'y': re.compile(r'y$')
    , 'y_over': re.compile(r'.+y.+')
}

cols = _util.getTableColumns('pop') #table columns 가져오기

if cols is not None:
    cols = [row[0] for row in cols]
    header = [
        {
            'sido_cd': '시도코드'
            , 'sgg_cd': '시군구코드'
            , 'sgg_nm': '시군구명'
            , 'gender': '성별'
            , 'year': '년도'
        }
        , {col: col for col in cols}
    ]
    header[0].update(
        {f"{col}": f"{col.replace('y', '').replace('y_over', '')}년" if col != '100' else "100년 이상" for col in cols if any(p.search(col) for p in patterns.values()) or col == '100'}
    )
    header = pd.DataFrame(header)

    _cols = list(cols)
    cols = [f's.\"{col}\"' for col in _cols]
    sql = f'''
        select {', '.join(cols)}
        from v_simple_pop_{year} s
    '''

    result = _util.execute_sql(sql)

    df = pd.DataFrame(result)

    # df_cols = [col for col in df.columns if col != 'Unnamed: 0']
    cols = {}
    for idx in range(len(_cols)): cols[idx] = _cols[idx]
    df = pd.DataFrame(list(result))
    df.rename(columns=cols, inplace=True)

    df = pd.concat([header, df], ignore_index=True)
    df.to_csv(_config.pop_write_path + '\\' + file_nm + '.csv', encoding=_config.euc_kr, index=False, header=False)

    # sql = f'''
    #     do $$
    #     begin
    #         if exists (select 1 from pg_matviews where matviewname = '{table_nm}') then
    #         drop materialized view {table_nm} cascade;
    #         end if;
    #     end $$;
    # '''

    # _util.execute_sql(sql)

