In [1]:
import os
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import math
import urllib.parse

# ✅ .env에서 디코딩된 API 키 로드 후 인코딩
load_dotenv()
API_KEY = urllib.parse.quote(os.getenv("API_KEY"), safe='')

def fetch_total_count():
    """전체 충전소 개수를 가져오는 함수"""
    url = (
        f"http://apis.data.go.kr/B552584/EvCharger/getChargerInfo"
        f"?serviceKey={API_KEY}&pageNo=1&numOfRows=1&dataType=XML"
    )
    response = requests.get(url)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'lxml-xml')
        total = soup.find('totalCount')
        print(f"📦 totalCount = {total.text if total else 'N/A'}")
        return int(total.text) if total else 0
    else:
        print(f"[ERROR] 총 개수 요청 실패 - 상태코드 {response.status_code}")
        return 0

def fetch_charger_info(page=1, num_of_rows=9999):
    """한 페이지 분량의 충전소 정보를 가져오는 함수"""
    url = (
        f"http://apis.data.go.kr/B552584/EvCharger/getChargerInfo"
        f"?serviceKey={API_KEY}&pageNo={page}&numOfRows={num_of_rows}&dataType=XML"
    )
    try:
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml-xml')
            return soup.find_all('item')
        else:
            print(f"[ERROR] 요청 실패 - 상태코드 {response.status_code}")
            return []
    except Exception as e:
        print(f"[예외 발생] {e}")
        return []

def fetch_all_chargers():
    """전체 충전소 데이터를 수집하는 함수"""
    total_count = fetch_total_count()
    per_page = 9999
    total_pages = math.ceil(total_count / per_page)

    print(f"📊 총 {total_count}개의 충전소 → {total_pages}페이지 수집 예정")

    all_data = []
    for page in range(1, total_pages + 1):
        print(f"📥 {page}/{total_pages} 페이지 수집 중...")
        items = fetch_charger_info(page=page, num_of_rows=per_page)
        all_data.extend(items)

    print(f"✅ 전체 충전소 데이터 수집 완료! 총 {len(all_data)}건")
    return all_data

# ✅ 실행 시
if __name__ == "__main__":
    all_chargers = fetch_all_chargers() # 코드 실행하시면 됩니다.


📦 totalCount = 425565
📊 총 425565개의 충전소 → 43페이지 수집 예정
📥 1/43 페이지 수집 중...
📥 2/43 페이지 수집 중...
📥 3/43 페이지 수집 중...
📥 4/43 페이지 수집 중...
📥 5/43 페이지 수집 중...
📥 6/43 페이지 수집 중...
📥 7/43 페이지 수집 중...
📥 8/43 페이지 수집 중...
📥 9/43 페이지 수집 중...
📥 10/43 페이지 수집 중...
📥 11/43 페이지 수집 중...
📥 12/43 페이지 수집 중...
📥 13/43 페이지 수집 중...
📥 14/43 페이지 수집 중...
📥 15/43 페이지 수집 중...
📥 16/43 페이지 수집 중...
📥 17/43 페이지 수집 중...
📥 18/43 페이지 수집 중...
📥 19/43 페이지 수집 중...
📥 20/43 페이지 수집 중...
📥 21/43 페이지 수집 중...
📥 22/43 페이지 수집 중...
📥 23/43 페이지 수집 중...
📥 24/43 페이지 수집 중...
📥 25/43 페이지 수집 중...
📥 26/43 페이지 수집 중...
📥 27/43 페이지 수집 중...
📥 28/43 페이지 수집 중...
📥 29/43 페이지 수집 중...
📥 30/43 페이지 수집 중...
📥 31/43 페이지 수집 중...
📥 32/43 페이지 수집 중...
📥 33/43 페이지 수집 중...
📥 34/43 페이지 수집 중...
📥 35/43 페이지 수집 중...
📥 36/43 페이지 수집 중...
📥 37/43 페이지 수집 중...
📥 38/43 페이지 수집 중...
📥 39/43 페이지 수집 중...
📥 40/43 페이지 수집 중...
📥 41/43 페이지 수집 중...
📥 42/43 페이지 수집 중...
📥 43/43 페이지 수집 중...
✅ 전체 충전소 데이터 수집 완료! 총 425570건


In [2]:
def parse_charger_items(items): # 수집 완료되면 이 코드 실행하시면 됩니다.
    """BeautifulSoup item 리스트 → 딕셔너리 리스트로 정제"""
    data_list = []

    for item in items:
        data = {
            "statId": item.find("statId").text if item.find("statId") else None,
            "chgerId": item.find("chgerId").text if item.find("chgerId") else None,
            "statNm": item.find("statNm").text if item.find("statNm") else None,
            "addr": item.find("addr").text if item.find("addr") else None,
            "lat": float(item.find("lat").text) if item.find("lat") and item.find("lat").text.strip() else None,
            "lng": float(item.find("lng").text) if item.find("lng") and item.find("lng").text.strip() else None,
            "output": float(item.find("output").text) if item.find("output") and item.find("output").text.strip() else None,
            "year": int(item.find("year").text) if item.find("year") and item.find("year").text.strip() else None,
            "zcode": item.find("zcode").text if item.find("zcode") else None,
            "zscode": item.find("zscode").text if item.find("zscode") else None,
            "chgerType": item.find("chgerType").text if item.find("chgerType") else None,
            "stat": item.find("stat").text if item.find("stat") else None,
            "busiNm": item.find("busiNm").text if item.find("busiNm") else None,
            "statUpdDt": item.find("statUpdDt").text if item.find("statUpdDt") else None,
            "useTime": item.find("useTime").text if item.find("useTime") else None,
            "parkingFree": item.find("parkingFree").text if item.find("parkingFree") else None,
            "limitYn": item.find("limitYn").text if item.find("limitYn") else None,
            "limitDetail": item.find("limitDetail").text if item.find("limitDetail") else None,
            "note": item.find("note").text if item.find("note") else None,
            "delYn": item.find("delYn").text if item.find("delYn") else None,
        }
        data_list.append(data)

    return data_list


In [3]:
# 위 코드 실행 후 해당 코드 실행하시면 됩니다.
import mysql.connector
from mysql.connector import Error

config = {
    'host': 'localhost',
    'user': 'skn14',
    'password': 'skn14',
    'database': 'projectdb'
}

def insert_normalized_data(station_charger_list, config):
    """딕셔너리 리스트 데이터를 stations, chargers 테이블에 삽입"""
    try:
        conn = mysql.connector.connect(**config)
        cursor = conn.cursor()

        # ✅ 충전소 테이블 INSERT 문 (확장 필드 포함)
        station_sql = """
        INSERT INTO stations (
            statId, statNm, addr, lat, lng, busiNm, year,
            zcode, zscode, useTime, parkingFree, limitYn,
            limitDetail, note, delYn, statUpdDt
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            statNm=VALUES(statNm),
            addr=VALUES(addr),
            lat=VALUES(lat),
            lng=VALUES(lng),
            busiNm=VALUES(busiNm),
            year=VALUES(year),
            zcode=VALUES(zcode),
            zscode=VALUES(zscode),
            useTime=VALUES(useTime),
            parkingFree=VALUES(parkingFree),
            limitYn=VALUES(limitYn),
            limitDetail=VALUES(limitDetail),
            note=VALUES(note),
            delYn=VALUES(delYn),
            statUpdDt=VALUES(statUpdDt)
        """

        # ✅ 충전기 테이블 INSERT 문 그대로 유지
        charger_sql = """
        INSERT INTO chargers (
            statId, chgerId, chgerType, stat, output
        ) VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            chgerType=VALUES(chgerType),
            stat=VALUES(stat),
            output=VALUES(output)
        """

        for d in station_charger_list:
            cursor.execute(station_sql, (
                d.get('statId'), d.get('statNm'), d.get('addr'), d.get('lat'), d.get('lng'),
                d.get('busiNm'), d.get('year'), d.get('zcode'), d.get('zscode'),
                d.get('useTime'), d.get('parkingFree'), d.get('limitYn'),
                d.get('limitDetail'), d.get('note'), d.get('delYn'), d.get('statUpdDt')
            ))

            cursor.execute(charger_sql, (
                d.get('statId'), d.get('chgerId'), d.get('chgerType'), d.get('stat'), d.get('output')
            ))

        conn.commit()
        print(f"✅ {len(station_charger_list)}건 데이터 삽입 완료!")

    except Error as e:
        print(f"[DB 오류] {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()


# ✅ 실행

# 🔁 Step 1: 정제
cleaned_data = parse_charger_items(all_chargers)

# 🔁 Step 2: DB 삽입
insert_normalized_data(cleaned_data, config)

✅ 425570건 데이터 삽입 완료!


In [None]:
import csv
import mysql.connector
from datetime import datetime

# ✅ DB 접속 설정 (공통으로 사용)
config = {
    'host': 'localhost',
    'user': 'skn14',
    'password': 'skn14',
    'database': 'projectdb'
}

def insert_ev_latest_per_year(csv_path, config):
    """
    연도별 최신일 기준 전기차 등록 수를 추출하여 ev_registered_yearly 테이블에 저장
    """
    # {(zname, year): (latest_date, vehicle_count)} 형태의 딕셔너리
    latest_data = {}

    with open(csv_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            date = datetime.strptime(row['date'], "%Y-%m-%d")
            year = date.year

            for region in row:
                if region == 'date':
                    continue

                zname = region
                count = int(row[region]) if row[region] else 0

                key = (zname, year)
                if key not in latest_data or date > latest_data[key][0]:
                    latest_data[key] = (date, count)

    try:
        conn = mysql.connector.connect(**config)
        cursor = conn.cursor()

        create_table_sql = """
        CREATE TABLE IF NOT EXISTS ev_registered_yearly (
            id INT AUTO_INCREMENT PRIMARY KEY,
            zname VARCHAR(20),
            year INT,
            vehicle_count INT,
            UNIQUE (zname, year)
        )
        """
        cursor.execute(create_table_sql)

        insert_sql = """
        INSERT INTO ev_registered_yearly (zname, year, vehicle_count)
        VALUES (%s, %s, %s)
        ON DUPLICATE KEY UPDATE vehicle_count = VALUES(vehicle_count)
        """

        for (zname, year), (latest_date, count) in latest_data.items():
            cursor.execute(insert_sql, (zname, year, count))

        conn.commit()
        print("✅ 연도별 전기차 등록 수 데이터 삽입 완료!")

    except mysql.connector.Error as err:
        print(f"[DB 오류] {err}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# ✅ 실행
if __name__ == "__main__":
    insert_ev_latest_per_year("ev_vehicle_counts.csv", config)