In [2]:
import os
import csv
import time
import socket
import requests
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime

# 1) CSV에서 코드↔이름 매핑 생성
def load_station_map(csv_path: str) -> (dict[str, str], set[str]):
    df = pd.read_csv(csv_path, dtype=str)
    code2name = dict(zip(df['code'], df['name']))
    name2code = dict(zip(df['name'], df['code']))
    codes = set(df['code'])
    return code2name, name2code, codes

# 2) JSON/HTTP 요청 + 페이징
def fetch_asos_data(
    service_key: str,
    start: str,
    end: str,
    station_id: str,
    max_retries: int = 3,
    per_page: int = 999
) -> pd.DataFrame:
    url = 'http://apis.data.go.kr/1360000/AsosHourlyInfoService/getWthrDataList'
    all_records: list[dict] = []
    page_no = 1
    while True:
        params = {
            'serviceKey': service_key,
            'pageNo': str(page_no),
            'numOfRows': str(per_page),
            'dataType': 'JSON',
            'dataCd': 'ASOS',
            'dateCd': 'HR',
            'startDt': start,
            'startHh': '00',
            'endDt': end,
            'endHh': '23',
            'stnIds': station_id
        }
        for attempt in range(1, max_retries + 1):
            try:
                resp = requests.get(url, params=params, timeout=10)
                resp.raise_for_status()
                data = resp.json()
                break
            except (requests.RequestException, socket.error, ValueError) as e:
                if attempt < max_retries:
                    time.sleep(1)
                    continue
                else:
                    print(f"❌ 요청 실패: station_id={station_id}, page={page_no}, error={e}")
                    return pd.DataFrame()
        items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
        if not items:
            break
        all_records.extend(items)
        if len(items) < per_page:
            break
        page_no += 1
    df = pd.DataFrame(all_records)
    if not df.empty:
        # 컬럼명 매핑
        df = df.rename(columns={
            'tm': 'time',
            'stnId': 'station_id',
            'stnNm': 'station_name',
            'ta': 'temperature',
            'ws': 'wind_speed',
            'wd': 'wind_direction',
            'hm': 'humidity',
            'pv': 'precipitation',
            'td': 'dew_point',
            'pa': 'pressure',
            'ps': 'sea_pressure',
            'dsnw': 'snow_depth',
            'ts': 'ground_temp'
        })
        df['time'] = pd.to_datetime(df['time'])
    return df

# 3) 사용자 지정 복수 지역 데이터 수집
def select_data(
    region_keys: list[str],
    start: str,
    end: str,
    csv_path: str = '/home/solutionhkn/kma-data/Kma-data-crawling-Webpage/data/asos.csv',
    exclude: set[str] = None
) -> pd.DataFrame:
    code2name, name2code, codes = load_station_map(csv_path)
    service_key = "iCNxo2r0TdZnnV63/ItO+QrOUqJakXCxx/m20BsCp53DGZzJMDd1/7jOGLYQE+Sn+1EQeSeIhUsTIyQ5dYgy4Q=="
    if not service_key:
        raise RuntimeError('환경변수 SERVICE_KEY가 설정되어 있지 않습니다.')

    result_df = pd.DataFrame()
    exclude = exclude or set()
    for key in region_keys:
        if key in exclude:
            print(f"⚠️ 제외된 지역: {key}")
            continue
        if key in name2code:
            station_id = name2code[key]
        elif key in codes:
            station_id = key
        else:
            print(f"⚠️ 잘못된 지역: {key}")
            continue

        print(f"▶️ 수집 시작: station_id={station_id} ({key}), {start}~{end}")
        df = fetch_asos_data(service_key, start, end, station_id)
        if df.empty:
            print(f"❌ 데이터 없음: {station_id} ({key})")
        else:
            df['region_key'] = key
            result_df = pd.concat([result_df, df], ignore_index=True)
            print(f"✅ 수집 완료: {station_id} ({key}), rows={len(df)}")
        time.sleep(1)
    return result_df

# 4) 스크립트 실행 예시
if __name__ == '__main__':
    regions_input = input('지역명/코드를 쉼표로 구분해 입력: ')
    region_keys = [r.strip() for r in regions_input.split(',') if r.strip()]
    start_date = input('시작일 (YYYYMMDD): ')
    end_date = input('종료일 (YYYYMMDD): ')
    exclude_codes = {'102','115','169'}

    df_all = select_data(region_keys, start_date, end_date, exclude=exclude_codes)
    if not df_all.empty:
        out_file = f"collected_{start_date}_{end_date}_{'_'.join(region_keys)}.csv"
        df_all.to_csv(out_file, index=False, encoding='utf-8-sig')
        print(f"▶️ 저장 완료: {out_file}")
    else:
        print('수집된 데이터가 없습니다.')


▶️ 수집 시작: station_id=90 (90), 20250101~20250101
✅ 수집 완료: 90 (90), rows=24
▶️ 저장 완료: collected_20250101_20250101_90.csv


In [None]:
#!/usr/bin/env python3
# disaster_weather_text.py

import requests
import pandas as pd
from io import StringIO

# — 호출 가능한 변수와 컬럼명 매핑
VARIABLES = {
    "TA": ["tm","stn","ta","ta_hmi","ta_avg","ta_qcm","ta_max","ta_max_mi","ta_min","ta_min_mi"],
    "WD": ["tm","stn","wd","ws","wd_hmi","ws_hmi","ws1","wd1","ws1_mi","qc1"],  # 예시, 실제 컬럼 개수/순서 확인 필요
    "RN": ["tm","stn","rn","rn_hmi","rn_avg","rn_qcm","rn_max","rn_max_mi","rn_min","rn_min_mi"],
    "HM": ["tm","stn","hm","hm_hmi","hm_avg","hm_qcm","hm_max","hm_max_mi","hm_min","hm_min_mi"],
    "PS": ["tm","stn","ps","ps_hmi","ps_avg","ps_qcm","ps_max","ps_max_mi","ps_min","ps_min_mi"],
    None: None  # 정시자료는 동적 처리
}

BASE_URL = "https://apihub.kma.go.kr/api/typ01/url/awsh.php"

def fetch_disaster_data(auth_key: str, tm: str, var: str = None) -> pd.DataFrame:
    """
    방재 기상 API 호출 후, 고정폭 텍스트 응답을 whitespace-delimited CSV처럼 읽어서 DataFrame으로 반환.
     - auth_key: 발급받은 인증키
     - tm: 'YYYYMMDDhhmm' 형식 (예: '201508121500')
     - var: 'TA','WD','RN','HM','PS' 또는 None(정시자료)
    """
    # 1) GET 요청 (help 파라미터 제거)
    params = {"tm": tm, "authKey": auth_key}
    if var:
        params["var"] = var
    resp = requests.get(BASE_URL, params=params)
    resp.raise_for_status()
    text = resp.text

    # 2) comment("#") 라인 제거 → 실제 데이터 라인만 남김
    data_lines = [line for line in text.splitlines() if not line.startswith("#")]
    if not data_lines:
        raise RuntimeError(f"{var or '정시자료'} 데이터가 없습니다.")
    data_str = "\n".join(data_lines)

    # 3) pandas 로딩
    #    - delim_whitespace=True: 공백을 구분자로 사용
    #    - header=None: 첫 줄도 데이터로 인식
    df = pd.read_csv(StringIO(data_str), 
                     delim_whitespace=True, 
                     header=None)

    # 4) 컬럼명 지정
    col_names = VARIABLES.get(var)
    if col_names:
        if len(col_names) != df.shape[1]:
            raise RuntimeError(f"컬럼 수 불일치: 기대={len(col_names)}, 실제={df.shape[1]}")
        df.columns = col_names
    else:
        # 정시자료: 첫 두 컬럼은 'tm','stn', 나머지는 자동 이름 부여
        names = ["tm","stn"] + [f"col{i}" for i in range(2, df.shape[1])]
        df.columns = names

    # 5) 타입 변환
    # tm → datetime
    df["tm"] = pd.to_datetime(df["tm"].astype(str), format="%Y%m%d%H%M")
    # 나머지 숫자 컬럼 float
    for c in df.columns:
        if c != "tm":
            df[c] = pd.to_numeric(df[c], errors="ignore")

    return df

def main():
    AUTH_KEY = "Ek1C8O-0RwSNQvDvtAcEpw"
    TM       = "201508121500"

    for var in ["TA","WD","RN","HM","PS", None]:
        label = var if var else "ROUTINE"
        print(f"\n=== {label} ===")
        try:
            df = fetch_disaster_data(AUTH_KEY, TM, var)
            print(df.head().to_string(index=False))
        except Exception as e:
            print(f"오류: {e}")

if __name__ == "__main__":
    main()


FileNotFoundError: [Errno 2] No such file or directory: '/home/solutionhkn/kma-data/Kma-data-crawling-Webpage/data/임시.html'

In [None]:
import re
import csv
from bs4 import BeautifulSoup

def parse_stations(html):
    """
    HTML 문자열에서 <div title="..."> 요소를 찾아
    "이름(코드)" 형태로 파싱하여 리스트로 반환.
    반환 형식: [{'name': str, 'code': str}, ...]
    """
    soup = BeautifulSoup(html, 'html.parser')
    pattern = re.compile(r"^(.+)\((\d+)\)$")
    stations = []

    for div in soup.find_all('div', title=True):
        # title 속성에서 '|' 앞부분만 취함 (예: "가거도(303)")
        head = div['title'].split('|', 1)[0]
        m = pattern.match(head)
        if m:
            name, code = m.group(1), m.group(2)
        else:
            # fallback: 마지막 '(' 기준으로 분리
            if '(' in head and head.endswith(')'):
                name, code_part = head.rsplit('(', 1)
                code = code_part.rstrip(')')
            else:
                # 파싱 불가 시 스킵
                continue
        stations.append({'name': name, 'code': code})
    return stations


def save_to_csv(stations, path='stations.csv'):
    """
    파싱된 stations 리스트를 CSV 파일로 저장
    """
    if not stations:
        print('저장할 데이터가 없습니다.')
        return
    fieldnames = ['name', 'code']
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(stations)
    print(f'저장 완료: {path} ({len(stations)}개)')


if __name__ == '__main__':
    # HTML 파일 경로를 직접 지정
    html_path = '/home/solutionhkn/kma-data/Kma-data-crawling-Webpage/data/임시.html'
    # 결과 CSV 파일 경로
    output_csv = 'stations.csv'

    # HTML 읽기
    with open(html_path, encoding='utf-8') as f:
        html_content = f.read()

    # 파싱
    station_list = parse_stations(html_content)

    # CSV 저장
    save_to_csv(station_list, output_csv)

    # 콘솔에 일부 출력
    print('추출된 관측소 예시:')
    for station in station_list[:10]:
        print(f"{station['name']} → {station['code']}")


저장 완료: stations.csv (572개)
추출된 관측소 예시:
가거도 → 303
가곡 → 647
가덕 → 641
가덕도 → 921
가산 → 824
가야산 → 311
가파도 → 855
가평북면 → 531
가평조종 → 505
간동 → 681


In [1]:
#!/usr/bin/env python3
# fetch_single_disaster.py

import re
import time
import requests
import pandas as pd
from io import StringIO

# —————————————————————
# 1) 설정
# —————————————————————
AUTH_KEY = 'Ek1C8O-0RwSNQvDvtAcEpw'  # 발급받은 인증키
BASE_URL = 'https://apihub.kma.go.kr/api/typ01/url/awsh.php'
RETRIES = 10         # 재시도 횟수
WAIT_SEC = 1         # 재시도 간 대기 시간(초)

# —————————————————————
# 2) 방재기상 API 호출 및 컬럼 매핑 함수
# —————————————————————
def fetch_disaster_data(auth_key: str, tm: str, var: str, stn: str, disp: int = 0) -> pd.DataFrame:
    """
    방재기상 API 호출 후 고정폭 텍스트 응답을 파싱해
    pandas.DataFrame으로 반환하며, 컬럼명을 한글 설명으로 매핑합니다.
    """
    params = {'tm': tm, 'authKey': auth_key, 'disp': disp}
    if var:
        params['var'] = var
    if stn:
        params['stn'] = stn
    resp = requests.get(BASE_URL, params=params, timeout=10)
    resp.raise_for_status()
    text = resp.text

    # 주석('#') 제거 → 데이터 라인만
    lines = [ln for ln in text.splitlines() if not ln.startswith('#')]
    if not lines:
        return pd.DataFrame()
    data_str = "\n".join(lines)

    # 고정폭 텍스트 → DataFrame
    df = pd.read_csv(StringIO(data_str), sep='\s+', header=None)
    # 기본 컬럼명 설정
    base_cols = ['tm', 'stn', var]
    other_cols = [f'col{i}' for i in range(3, df.shape[1])]
    df.columns = base_cols + other_cols
    # tm 컬럼 datetime 변환
    df['tm'] = pd.to_datetime(df['tm'].astype(str), format='%Y%m%d%H%M')

    # 변수별 세부 컬럼명 매핑
    if var == 'TA':
        df.columns = [
            'tm','stn',
            '정시 기온 (°C)', '기온 시각차 (분)', '60분 평균 기온 (°C)',
            '기온 자료수', '60분 최고 기온 (°C)', '최고 기온 시각차 (분)',
            '60분 최저 기온 (°C)', '최저 기온 시각차 (분)'
        ][:df.shape[1]]
    elif var == 'PS':
        df.columns = [
            'tm','stn',
            '정시 해면기압 (hPa)', '해면기압 시각차 (분)', '60분 해면기압 평균 (hPa)',
            '해면기압 자료수', '60분 최고 해면기압 (hPa)', '최고 해면기압 시각차 (분)',
            '60분 최저 해면기압 (hPa)', '최저 해면기압 시각차 (분)'
        ][:df.shape[1]]
    elif var == 'HM':
        df.columns = [
            'tm','stn',
            '정시 습도 (%)', '습도 시각차 (분)', '60분 평균 습도 (%)',
            '습도 자료수', '60분 최고 습도 (%)', '최고 습도 시각차 (분)',
            '60분 최저 습도 (%)', '최저 습도 시각차 (분)'
        ][:df.shape[1]]
    elif var == 'RN':
        df.columns = [
            'tm','stn',
            '60분 강수 유무 합계', '60분 강수 유무 자료수',
            '일강수량 (mm)', '일강수량 시각차 (분)',
            '1시간 강수량 (mm)', '1시간 누적 시작 시각차 (분)',
            '60분 이동 누적 강수 최대 (mm)', '최대 누적 강수 시각차 (분)',
            '60분 강수 자료수', '15분 이동 누적 강수 최대 (mm)',
            '15분 누적 강수 시각차 (분)', '15분 강수 자료수'
        ][:df.shape[1]]
    elif var == 'WD':
        df.columns = [
            'tm','stn',
            '정시 10분 평균 풍향 (degree)', '정시 10분 평균 풍속 (m/s)', '바람 시각차 (분)',
            '60분 최대 풍향 (degree)', '60분 최대 풍속 (m/s)', '최대 풍속 시각차 (분)',
            '10분 풍속 자료수', '60분 평균 풍속 (m/s)', '1분 최대 풍향 (degree)',
            '1분 최대 풍속 (m/s)', '1분 최대 풍속 시각차 (분)', '1분 풍속 자료수',
            '순간 최대 풍향 (degree)', '순간 최대 풍속 (m/s)', '순간 최대 풍속 시각차 (분)', '순간 풍속 자료수'
        ][:df.shape[1]]

    return df

# —————————————————————
# 3) main: 단일 지점 데이터 조회
# —————————————————————
def main():
    # 사용자 입력
    tm = input('조회 기준 시각을 입력하세요 (YYYYMMDDhhmm): ').strip()
    if not re.match(r'^\d{12}$', tm):
        print('잘못된 형식입니다.')
        return
    stn = input('지점 코드 입력 (예: 303): ').strip()
    if not stn.isdigit():
        print('지점 코드는 숫자여야 합니다.')
        return
    var = input('변수코드 입력 (PS, HM, TA, RN, WD): ').strip().upper()
    if var not in ('PS','HM','TA','RN','WD'):
        print('유효한 변수코드가 아닙니다.')
        return
    disp = input('도움말 포함? (0/1): ').strip()
    disp = 1 if disp == '1' else 0

    # API 호출 및 재시도
    df = pd.DataFrame()
    for i in range(1, RETRIES+1):
        try:
            df = fetch_disaster_data(AUTH_KEY, tm, var, stn, disp)
            break
        except Exception as e:
            print(f"[{stn},{var}] 시도 {i}/{RETRIES} 실패: {e}")
            if i < RETRIES:
                time.sleep(WAIT_SEC)
    if df.empty:
        print('데이터가 없습니다.')
        return

    # 결과 출력 및 저장
    print(df.to_string(index=False))
    if input('CSV로 저장하시겠습니까? (y/n): ').strip().lower() == 'y':
        fname = f'{stn}_{var}_{tm}.csv'
        df.to_csv(fname, index=False, encoding='utf-8-sig')
        print(f'저장완료: {fname}')

if __name__ == '__main__':
    main()


[303,TA] 시도 1/10 실패: HTTPSConnectionPool(host='apihub.kma.go.kr', port=443): Read timed out. (read timeout=10)
                 tm  stn  정시 기온 (°C)  기온 시각차 (분)  60분 평균 기온 (°C)  기온 자료수  60분 최고 기온 (°C)  최고 기온 시각차 (분)  60분 최저 기온 (°C)  최저 기온 시각차 (분)
2025-01-01 01:00:00  303         5.5           0             5.6      60             5.9              0             5.5              0


In [None]:
#!/usr/bin/env python3
# fetch_multiple_disaster.py

import re
import time
import requests
import pandas as pd
from io import StringIO
from datetime import datetime, timedelta

# —————————————————————
# 1) 설정
# —————————————————————
AUTH_KEY = 'Ek1C8O-0RwSNQvDvtAcEpw'  # 발급받은 인증키
BASE_URL = 'https://apihub.kma.go.kr/api/typ01/url/awsh.php'
RETRIES = 10         # 재시도 횟수
WAIT_SEC = 1         # 재시도 간 대기 시간(초)

# —————————————————————
# 2) 방재기상 API 호출 및 컬럼 매핑 함수
#    (기존 로직 변경 없음)
# —————————————————————
def fetch_disaster_data(auth_key: str, tm: str, var: str, stn: str, disp: int = 0) -> pd.DataFrame:
    params = {'tm': tm, 'authKey': auth_key, 'disp': disp}
    if var:
        params['var'] = var
    if stn:
        params['stn'] = stn
    resp = requests.get(BASE_URL, params=params, timeout=10)
    resp.raise_for_status()
    text = resp.text

    # 주석('#') 제거 → 데이터 라인만
    lines = [ln for ln in text.splitlines() if not ln.startswith('#')]
    if not lines:
        return pd.DataFrame()
    data_str = "\n".join(lines)

    # 고정폭 텍스트 → DataFrame
    df = pd.read_csv(StringIO(data_str), sep='\s+', header=None)
    base_cols = ['tm', 'stn', var]
    other_cols = [f'col{i}' for i in range(3, df.shape[1])]
    df.columns = base_cols + other_cols
    df['tm'] = pd.to_datetime(df['tm'].astype(str), format='%Y%m%d%H%M')

    # 기존 변수별 컬럼 매핑 로직 (생략)
    # ...
    return df

# —————————————————————
# 3) main: 다중 tm 범위, 다중 변수, 다중 지역 조회
# —————————————————————
def main():
    # 1) 다중 범위 입력
    raw_ranges = input(
        '조회 기준 시각 범위들을 입력하세요 (YYYYMMDDhhmm-YYYYMMDDhhmm 으로 양식을 작성하세요): '
    ).strip()
    if not raw_ranges:
        print('입력이 없습니다.')
        return
    ranges = []
    for part in raw_ranges.split(','):
        try:
            start_str, end_str = part.split('-')
            if not (re.match(r'^\d{12}$', start_str) and re.match(r'^\d{12}$', end_str)):
                raise ValueError
            ranges.append((start_str, end_str))
        except ValueError:
            print(f'잘못된 범위 형식: "{part}"')
            return

    # 2) 다중 지역 입력
    raw_stns = input('지점 코드 입력 (예: 303,105,내이름허용): 쉼표로 구분: ').strip()
    stn_list = [s.strip() for s in raw_stns.split(',') if s.strip()]
    if not stn_list:
        print('적어도 하나의 지점 코드를 입력하세요.')
        return

    # 3) 다중 변수 입력
    raw_vars = input('변수코드 입력 (PS, HM, TA, RN, WD), 쉼표로 구분: ').strip().upper()
    var_list = [v.strip() for v in raw_vars.split(',') if v.strip()]
    valid_vars = {'PS','HM','TA','RN','WD'}
    if not var_list or not all(v in valid_vars for v in var_list):
        print('유효한 변수코드만 입력하세요 (PS, HM, TA, RN, WD)')
        return

    # 4) disp 옵션
    disp_input = input('도움말 포함? (0/1): ').strip()
    disp = 1 if disp_input == '1' else 0

    # 5) 조회 및 재시도 반복
    results = []
    for start_tm, end_tm in ranges:
        cur = datetime.strptime(start_tm, '%Y%m%d%H%M')
        end = datetime.strptime(end_tm, '%Y%m%d%H%M')
        while cur <= end:
            tm = cur.strftime('%Y%m%d%H%M')
            for stn in stn_list:
                for var in var_list:
                    print(f'[{stn},{var}] 조회: {tm}')
                    df = pd.DataFrame()
                    for i in range(1, RETRIES+1):
                        try:
                            df = fetch_disaster_data(AUTH_KEY, tm, var, stn, disp)
                            break
                        except Exception as e:
                            print(f'  시도 {i}/{RETRIES} 실패: {e}')
                            if i < RETRIES:
                                time.sleep(WAIT_SEC)
                    if not df.empty:
                        df['variable'] = var
                        df['station'] = stn
                        results.append(df)
                    else:
                        print(f'  → {tm} {stn} {var} 데이터 없음 또는 실패')
            cur += timedelta(hours=1)

    if not results:
        print('모든 범위, 지역, 변수에서 데이터가 없습니다.')
        return

    # 6) 결과 합치기
    full_df = pd.concat(results, ignore_index=True)
    print(full_df.to_string(index=False))

    # 7) CSV 저장
    if input('CSV로 저장하시겠습니까? (y/n): ').strip().lower() == 'y':
        ranges_str = '_'.join(f"{s}-{e}" for s, e in ranges)
        vars_str = '_'.join(var_list)
        stns_str = '_'.join(stn_list)
        fname = f'{stns_str}_{vars_str}_{ranges_str}.csv'
        full_df.to_csv(fname, index=False, encoding='utf-8-sig')
        print(f'저장완료: {fname}')

if __name__ == '__main__':
    main()

[303,TA] 조회: 202508010000
[303,RN] 조회: 202508010000
[647,TA] 조회: 202508010000
[647,RN] 조회: 202508010000
[303,TA] 조회: 202508010100
[303,RN] 조회: 202508010100
[647,TA] 조회: 202508010100
[647,RN] 조회: 202508010100
[303,TA] 조회: 202508010200
[303,RN] 조회: 202508010200
[647,TA] 조회: 202508010200
[647,RN] 조회: 202508010200
[303,TA] 조회: 202508010300
[303,RN] 조회: 202508010300
[647,TA] 조회: 202508010300
[647,RN] 조회: 202508010300
[303,TA] 조회: 202508010400
[303,RN] 조회: 202508010400
[647,TA] 조회: 202508010400
[647,RN] 조회: 202508010400
[303,TA] 조회: 202508010500
[303,RN] 조회: 202508010500
[647,TA] 조회: 202508010500
[647,RN] 조회: 202508010500
[303,TA] 조회: 202508010600
[303,RN] 조회: 202508010600
[647,TA] 조회: 202508010600
[647,RN] 조회: 202508010600
[303,TA] 조회: 202508010700
[303,RN] 조회: 202508010700
[647,TA] 조회: 202508010700
[647,RN] 조회: 202508010700
[303,TA] 조회: 202508010800
[303,RN] 조회: 202508010800
[647,TA] 조회: 202508010800
[647,RN] 조회: 202508010800
[303,TA] 조회: 202508010900
[303,RN] 조회: 202508010900
[647,TA] 조회: