In [1]:
import schedule
import time
from datetime import datetime, timedelta
import mysql.connector
import requests
import os
from dotenv import load_dotenv

# Populate the process environment from the local .env file.
load_dotenv()

# Read connection and API settings from environment variables.
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = int(os.environ.get("DB_PORT", 3306))  # 3306 is used when DB_PORT is unset
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_NAME = os.environ.get("DB_NAME")
SERVICE_KEY = os.environ.get("SERVICE_KEY")

# MySQL connection
def create_connection():
    """Open a MySQL connection from the module-level DB_* settings.

    The connection is requested with ``pool_name="mypool"`` and
    ``pool_size=5``.

    Returns:
        The object returned by ``mysql.connector.connect``, or ``None`` when
        a ``mysql.connector.Error`` is raised (the error is recorded through
        ``log_error``).
    """
    connect_kwargs = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "pool_name": "mypool",
        "pool_size": 5,
    }
    try:
        connection = mysql.connector.connect(**connect_kwargs)
    except mysql.connector.Error as exc:
        log_error(f"Error connecting to MySQL: {exc}")
        return None
    return connection

# Shared module-level connection; (re)established on demand by ensure_connection().
conn = None


def ensure_connection():
    """Make sure the global ``conn`` refers to a live MySQL connection.

    If ``conn`` is unset or reports that it is no longer connected, any stale
    connection object is closed first and a new one is requested from
    ``create_connection()``. Progress is reported with ``print``.

    After this call ``conn`` may still be ``None`` (or disconnected) when the
    reconnect attempt failed; callers should check before using it.
    """
    global conn
    if conn is not None and conn.is_connected():
        return

    print("Database connection is not active. Attempting to reconnect...")

    if conn is not None:
        # create_connection() asks for a connection from a 5-slot pool.
        # Dropping the stale object without close() never hands its slot
        # back, so repeated reconnects would eventually exhaust the pool.
        try:
            conn.close()
        except mysql.connector.Error as e:
            # Closing a dead connection can itself fail; that must not
            # prevent the reconnect attempt below.
            log_error(f"Error closing stale MySQL connection: {e}")
        conn = None

    conn = create_connection()
    if conn and conn.is_connected():
        print("Reconnected to the database.")
    else:
        print("Failed to reconnect to the database.")

# Error logging
def log_error(message):
    """Append a timestamped line to ``error_log.txt`` in the working directory.

    Args:
        message: Text to record. It is written after the current local
            timestamp as ``"<timestamp> - <message>"``.
    """
    # Explicit UTF-8: messages can carry non-ASCII text (e.g. server or API
    # error strings). With the locale-default encoding the write could raise
    # UnicodeEncodeError from inside an error handler.
    with open("error_log.txt", "a", encoding="utf-8") as log_file:
        log_file.write(f"{datetime.now()} - {message}\n")

# Fetch data from the API
def fetch_api_data(timeout=10):
    """Request the current observation from the marine weather API.

    Sends a GET request to the ``openWeatherNow.do`` endpoint with the
    module-level ``SERVICE_KEY`` and fixed ``mmaf``/``mmsi``/``dataType``
    query parameters, asking for a JSON result.

    Args:
        timeout: Seconds to wait for the server before giving up, passed
            straight to ``requests.get``. Defaults to 10.

    Returns:
        The decoded JSON body, or ``None`` when the request fails, the server
        answers with an HTTP error status, or the body is not valid JSON.
        Failures are recorded through ``log_error``.
    """
    url = "http://marineweather.nmpnt.go.kr:8001/openWeatherNow.do"
    params = {
        "serviceKey": SERVICE_KEY,
        "resultType": "json",
        "mmaf": "101",
        "mmsi": "994401597",
        "dataType": "2"
    }

    try:
        # Without a timeout a stalled server would block this call, and with
        # it the whole scheduler loop, indefinitely.
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        log_error(f"Failed to fetch API data: {e}")
    except ValueError as e:
        # Depending on the installed requests version, a non-JSON body may
        # surface as a plain ValueError subclass rather than a
        # RequestException.
        log_error(f"Failed to decode API response as JSON: {e}")
    return None

# Insert or update data
def _to_float_or_none(value):
    """Convert a measurement field to float; missing values become None.

    The API marks unavailable measurements with the string "미제공". ``None``
    and blank strings are treated the same way so that one absent value does
    not abort the whole batch.
    """
    if value is None:
        return None
    if isinstance(value, str) and value.strip() in ("", "미제공"):
        return None
    return float(value)


def _entry_to_row(entry):
    """Build the parameter tuple for one record, in INSERT column order."""
    return (
        datetime.strptime(entry["DATETIME"], "%Y%m%d%H%M%S"),
        entry["MMAF_CODE"],
        entry["MMAF_NM"],
        entry["MMSI_CODE"],
        entry["MMSI_NM"],
        _to_float_or_none(entry["WIND_DIRECT"]),
        _to_float_or_none(entry["WIND_SPEED"]),
        _to_float_or_none(entry["SURFACE_CURR_DRC"]),
        _to_float_or_none(entry["SURFACE_CURR_SPEED"]),
        _to_float_or_none(entry["AIR_TEMPERATURE"]),
        _to_float_or_none(entry["HUMIDITY"]),
        _to_float_or_none(entry["AIR_PRESSURE"]),
        _to_float_or_none(entry["WATER_TEMPER"]),
        _to_float_or_none(entry["SALINITY"]),
        round(float(entry["LATITUDE"]), 5),
        round(float(entry["LONGITUDE"]), 5),
    )


def _rollback_quietly():
    """Discard uncommitted statements on the shared connection, if usable."""
    try:
        if conn is not None and conn.is_connected():
            conn.rollback()
    except mysql.connector.Error as e:
        log_error(f"MySQL error during rollback: {e}")


def upsert_data(data):
    """Write the records of an API response into ``oceandatatest``.

    Each record under ``data["result"]["recordset"]`` is inserted; when the
    row collides with an existing key, the measurement columns are updated
    instead. All records are committed together.

    Args:
        data: Decoded API response. Records are read from
            ``data["result"]["recordset"]``; each record is indexed by the
            upper-case field names used in ``_entry_to_row``.

    Returns:
        None. Database and parsing errors are not raised; they are recorded
        through ``log_error`` and any pending statements are rolled back.
    """
    insert_query = """
        INSERT INTO oceandatatest (
            datetime, mmaf_code, mmaf_name, mmsi_code, mmsi_name, wind_direct, wind_speed,
            surface_curr_drc, surface_curr_speed, air_temperature, humidity, air_pressure,
            water_temperature, salinity, latitude, longitude
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            wind_direct = VALUES(wind_direct),
            wind_speed = VALUES(wind_speed),
            surface_curr_drc = VALUES(surface_curr_drc),
            surface_curr_speed = VALUES(surface_curr_speed),
            air_temperature = VALUES(air_temperature),
            humidity = VALUES(humidity),
            air_pressure = VALUES(air_pressure),
            water_temperature = VALUES(water_temperature),
            salinity = VALUES(salinity);
        """

    try:
        has_records = "result" in data and "recordset" in data["result"]
        records = data["result"]["recordset"] if has_records else None
        if not records:
            # Do not report success when there was nothing to write.
            print("No records found in API response; nothing to insert.")
            return

        # Parse everything before touching the database so a malformed
        # record cannot leave a half-written batch behind.
        rows = [_entry_to_row(entry) for entry in records]

        ensure_connection()
        if conn is None or not conn.is_connected():
            log_error("MySQL error: no active database connection; skipping upsert.")
            return

        with conn.cursor() as cursor:
            for row in rows:
                cursor.execute(insert_query, row)
        conn.commit()
        print("Data successfully inserted or updated.")
    except mysql.connector.Error as e:
        log_error(f"MySQL error: {e}")
        # Without a rollback, rows executed before the failure stay pending
        # and would be committed by the next successful run.
        _rollback_quietly()
    except Exception as e:
        log_error(f"Unexpected error: {e}")
        _rollback_quietly()

# Task scheduling
def register_task():
    """Register ``scheduled_task`` to run every 10 minutes, at most once.

    Looks through ``schedule.jobs`` for an existing registration and only
    adds a new job when none is found. The outcome is reported with
    ``print``.
    """
    task_tag = "ocean-data-upsert"

    def is_registered(job):
        # schedule stores the callable wrapped in functools.partial, so
        # comparing job.job_func to the function directly never matches;
        # unwrap .func first.
        target = getattr(job.job_func, "func", job.job_func)
        # The tag still identifies the job after scheduled_task has been
        # redefined (e.g. when a notebook cell is re-run).
        return target is scheduled_task or task_tag in job.tags

    if any(is_registered(job) for job in schedule.jobs):
        print("Task already registered.")
    else:
        schedule.every(10).minutes.do(scheduled_task).tag(task_tag)
        print("Task registered successfully.")

def scheduled_task():
    """Run one polling cycle: fetch the API response and store it.

    Prints the start time, calls ``fetch_api_data()`` and, when that returns
    a truthy result, hands it to ``upsert_data()``.
    """
    started_at = datetime.now()
    print(f"Running scheduled task at {started_at}")

    payload = fetch_api_data()
    if not payload:
        # Nothing was fetched; skip the database step for this cycle.
        return
    upsert_data(payload)

# Entry point
if __name__ == "__main__":
    ensure_connection()

    # Load one batch immediately instead of waiting for the first scheduled run.
    print("Fetching initial data...")
    api_data = fetch_api_data()
    if api_data:
        upsert_data(api_data)

    register_task()

    print("Scheduler is running... Press Ctrl+C to stop.")
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("Scheduler stopped. Closing database connection...")
    finally:
        # Close on every exit path, not only Ctrl+C: an exception escaping
        # the loop would otherwise leave the connection open, which matters
        # when the interpreter keeps running (e.g. a notebook kernel).
        if conn and conn.is_connected():
            conn.close()
            print("Database connection closed.")


Database connection is not active. Attempting to reconnect...
Reconnected to the database.
Fetching initial data...
Data successfully inserted or updated.
Task registered successfully.
Scheduler is running... Press Ctrl+C to stop.
Running scheduled task at 2025-01-20 13:33:44.273015
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 13:43:44.698851
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 13:53:45.132946
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 14:03:45.554066
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 14:13:46.115126
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 14:23:46.536909
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 14:33:47.137230
Data successfully inserted or updated.
Running scheduled task at 2025-01-20 14:43:47.564834
Data successfully inserted or updated.
Running scheduled task at 2025-01

In [2]:
import schedule

In [3]:
print(schedule.jobs)


[]
