In [1]:
item_nm = '방울토마토'

In [2]:
import pandas as pd
import glob
import os

# 기본 파일 경로
base_path = f"{item_nm}/유통공사_도매시장_{item_nm}.csv"

# retry 파일 목록 수집
retry_paths = glob.glob(f"{item_nm}/유통공사_retry_{item_nm}_*.csv")

# 기본 파일 읽기
df_list = [pd.read_csv(base_path, encoding="cp949", low_memory=False)]

# retry 파일들 읽기 (있을 경우에만)
for path in retry_paths:
    df_list.append(pd.read_csv(path, encoding="cp949", low_memory=False))

# 병합
df = pd.concat(df_list, ignore_index=True)

# 중복 제거 (전체 행 기준 또는 주요 열 기준)
df = df.drop_duplicates()

# plor_cd가 문자열이 아닐 가능성 대비
df['plor_cd'] = df['plor_cd'].fillna('').astype(str)

pattern = r'^[^0-9]+$'

condition = (
    df['totprc'].isna() | (df['totprc'] <= 0) |
    df['unit_tot_qty'].isna() | (df['unit_tot_qty'] <= 0) |
    df['plor_cd'].str.strip().isin(['0', '0.0']) |
    df['plor_cd'].str.match(pattern, na=False) |
    df['plor_nm'].isna() |
    (df['plor_nm'] == 0)
)

# 조건에 해당하는 행 추출
df_filtered = df[~condition]

# 결과 저장
df_filtered.to_csv(f"{item_nm}/유통공사_도매시장_{item_nm}_결측치제거.csv", index=False, encoding="cp949")

In [3]:
import pandas as pd

# 1. 파일 경로 설정
input_path = f"{item_nm}/유통공사_도매시장_{item_nm}_결측치제거.csv"
output_path = f"{item_nm}/유통공사_도매시장_{item_nm}_한글컬럼명.csv"

# 2. CSV 전체를 문자열로 불러오기 (최초 경고 방지)
df = pd.read_csv(input_path, encoding='cp949', dtype=str)

# 3. 한글 컬럼명 설정
new_columns = [
    '평균가격(원)', '법인코드', '법인이름', '상품 대분류 코드', '상품 대분류 이름',
    '상품 중분류 코드', '상품 중분류 이름', '상품 소분류 코드', '상품 소분류 이름',
    '등급코드', '등급이름', '최고가(원)', '최저가(원)', '포장코드', '포장이름',
    '산지코드', '산지이름', '크기코드', '크기이름', '총가격(원)', '날짜(YYYY-MM-DD)',
    '매매구분', '단위코드', '단위(kg)', '단위물량(kg)', '단위총물량(kg)', '도매시장코드', '도매시장이름'
]

if len(df.columns) != len(new_columns):
    raise ValueError("컬럼 수가 일치하지 않습니다.")

df.columns = new_columns

# 4. 날짜 컬럼 변환
df['날짜(YYYY-MM-DD)'] = pd.to_datetime(df['날짜(YYYY-MM-DD)'], errors='coerce')

# 5. 숫자 컬럼 변환
numeric_columns = ['평균가격(원)', '최고가(원)', '최저가(원)', '총가격(원)', '단위물량(kg)', '단위총물량(kg)']
for col in numeric_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# 6. 혼합 타입 경고 컬럼 처리: NaN을 빈 문자열로, 모두 문자열화
for col in ['포장이름', '산지이름', '크기이름']:
    df[col] = df[col].fillna('').astype(str)

# 7. CSV 저장 (완전 정리된 상태)
df.to_csv(output_path, index=False, encoding='cp949')

In [4]:
import pandas as pd

# 파일 경로
input_path = f"{item_nm}/유통공사_도매시장_{item_nm}_한글컬럼명.csv"
output_path = f"{item_nm}/유통공사_{item_nm}_요약데이터.csv"

# CSV 파일 불러오기
df = pd.read_csv(input_path, encoding='cp949')

selected_columns = ['날짜(YYYY-MM-DD)', '상품 중분류 이름', '등급이름', '총가격(원)', '단위총물량(kg)', '산지코드', '산지이름']
df_selected = df[selected_columns]

# 새 CSV로 저장
df_selected.to_csv(output_path, index=False, encoding='cp949')

  df = pd.read_csv(input_path, encoding='cp949')


In [5]:
import pandas as pd
import re

# CSV 파일 로드
df = pd.read_csv(f"{item_nm}/유통공사_{item_nm}_요약데이터.csv", encoding='cp949')

# 상품 중분류 이름 결측치 처리
df['상품 중분류 이름'] = df['상품 중분류 이름'].replace('', pd.NA).fillna(item_nm)

# 산지코드 정제 함수 (문자 → 0, 앞자리 0 제거, 6자리 맞춤)
def clean_origin_code_backpad(code):
    code_str = str(code).strip()
    replaced = re.sub(r'\D', '0', code_str)      # 문자 → 0
    stripped = replaced.lstrip('0')              # 앞쪽 0 제거
    if not stripped:  # 모두 0이거나 제거 결과 없음
        stripped = '0'
    return stripped[:6].ljust(6, '0')            # 앞 6자리 + 뒤 0 패딩

# 적용
df['산지코드'] = df['산지코드'].apply(clean_origin_code_backpad)

# 저장
output_path = f"{item_nm}/유통공사_{item_nm}_요약데이터_정제.csv"
df.to_csv(output_path, index=False, encoding='cp949')

print(f"정제된 데이터가 저장되었습니다: {output_path}")

  df = pd.read_csv(f"{item_nm}/유통공사_{item_nm}_요약데이터.csv", encoding='cp949')


정제된 데이터가 저장되었습니다: 방울토마토/유통공사_방울토마토_요약데이터_정제.csv


In [6]:
df = pd.read_csv(f"{item_nm}/유통공사_{item_nm}_요약데이터_정제.csv", encoding = 'cp949')

df.isna().sum()

날짜(YYYY-MM-DD)    0
상품 중분류 이름         0
등급이름              0
총가격(원)            0
단위총물량(kg)         0
산지코드              0
산지이름              0
dtype: int64

In [7]:
import pandas as pd

# 파일 경로 설정
file_path = f"{item_nm}/유통공사_{item_nm}_요약데이터_정제.csv"

# CSV 파일 읽기 (경고 방지를 위해 low_memory=False 사용)
df = pd.read_csv(file_path, encoding="cp949", low_memory=False)

# 산지코드가 문자열로 처리되도록 변환
df['산지코드'] = df['산지코드'].fillna('').astype(str)

# 필터링 조건: totprc 또는 unit_tot_qty가 NaN이거나 0인 경우
pattern = r'^[^0-9]+$'

condition = (
    (df['산지코드'] == '') |
    (df['산지코드'].str.strip() == "0") |
    (df['산지코드'].str.strip() == "0.0")
#     df['plor_cd'].str.match(pattern, na=False)) 
#     (df['plor_nm'].isna() | (df['plor_nm'] == 0))
)

# 조건에 해당하는 행 추출
df_filtered = df[condition]

# 결과 저장
df_filtered.to_csv(f"{item_nm}/결측치_{item_nm}.csv", index=False, encoding="cp949")

In [8]:
import pandas as pd

# 1. 파일 경로 설정
summary_path = f"{item_nm}/유통공사_{item_nm}_요약데이터_정제.csv"
region_code_path = "표준코드/산지코드_직팜.csv"

# 2. CSV 불러오기
summary_df = pd.read_csv(summary_path, encoding='cp949')
region_df = pd.read_csv(region_code_path, encoding='cp949')

# 3. 산지코드 범위 파싱 함수
def parse_code_range(code_str):
    if "~" in code_str:
        start, end = code_str.split("~")
    else:
        start = end = code_str
    return int(start), int(end)

# 4. 산지코드 범위 숫자 컬럼 생성
region_df[['start_code', 'end_code']] = region_df['산지코드'].apply(
    lambda x: pd.Series(parse_code_range(x))
)

# 5. 산지코드 → 직팜산지코드/이름 매핑 테이블 확장
expanded_rows = []
for _, row in region_df.iterrows():
    for code in range(row['start_code'], row['end_code'] + 1):
        expanded_rows.append({
            '산지코드': code,
            '직팜산지코드': row['직팜산지코드'],
            '직팜산지이름': row['직팜산지이름']
        })
expanded_map_df = pd.DataFrame(expanded_rows)

# 6. 요약 데이터 산지코드 정수형으로 변환
summary_df['산지코드'] = pd.to_numeric(summary_df['산지코드'], errors='coerce').astype('Int64')
summary_df = summary_df[summary_df['산지코드'].notna()].copy()
summary_df['산지코드'] = summary_df['산지코드'].astype(int)

# 7. 직팜 컬럼 제거 후 병합
summary_df = summary_df.drop(columns=['직팜산지코드', '직팜산지이름'], errors='ignore')
summary_df = pd.merge(summary_df, expanded_map_df, on='산지코드', how='left')

# 8. 산지이름이 누락된 경우 직팜산지이름으로 보완
summary_df['산지이름'] = summary_df['산지이름'].fillna(summary_df['직팜산지이름'])


# 9. 결과 저장
summary_df.to_csv(f"{item_nm}/{item_nm}요약데이터_직팜산지정리.csv", index=False, encoding='cp949')

In [9]:
df = pd.read_csv(f"{item_nm}/{item_nm}요약데이터_직팜산지정리.csv", encoding = 'cp949')

df.isna().sum()

날짜(YYYY-MM-DD)    0
상품 중분류 이름         0
등급이름              0
총가격(원)            0
단위총물량(kg)         0
산지코드              0
산지이름              0
직팜산지코드            0
직팜산지이름            0
dtype: int64

In [10]:
import pandas as pd

# CSV 파일 로드
df = pd.read_csv(f"{item_nm}/{item_nm}요약데이터_직팜산지정리.csv", encoding='cp949')

# 등급이름이 NaN 또는 공백이거나 '.'인 경우 '보통'으로 대체
df['등급이름'] = df['등급이름'].replace('.', '보통').fillna('보통')
df['등급이름'] = df['등급이름'].replace(r'^\s*$', '보통', regex=True)

# '특', '상', '보통' 외의 값은 모두 '하'로 변경
df['등급이름'] = df['등급이름'].apply(lambda x: x if x in ['특', '상', '보통'] else '하')

# 저장
output_path = f"{item_nm}/{item_nm}요약데이터_직팜정리.csv"
df.to_csv(output_path, index=False, encoding='cp949')

print(f"등급이름 수정 완료: {output_path}")

등급이름 수정 완료: 방울토마토/방울토마토요약데이터_직팜정리.csv


In [11]:
import pandas as pd

# 파일 경로와 인코딩으로 데이터 불러오기
df = pd.read_csv(f"{item_nm}/{item_nm}요약데이터_직팜정리.csv", encoding='cp949')

# 결측치가 하나라도 있는 행만 추출
df_na = df[df.isna().any(axis=1)]

# 결과 저장
output_path = f"{item_nm}/{item_nm}_결측치포함행.csv"
df_na.to_csv(output_path, index=False, encoding='cp949')

print(f"결측치 포함 행 {len(df_na)}개를 '{output_path}'로 저장했습니다.")

결측치 포함 행 0개를 '방울토마토/방울토마토_결측치포함행.csv'로 저장했습니다.


In [23]:
# # 누락 날짜 확인

# import pandas as pd

# # 1. 날짜 범위 생성
# full_dates = pd.date_range(start="2018-01-03", end="2025-05-31")

# # 2. CSV 파일에서 날짜 컬럼 읽기
# df = pd.read_csv(f"{item_nm}/{item_nm}요약데이터_직팜정리.csv", encoding='cp949')


# # 3. 날짜 컬럼 파싱 (예: 컬럼명이 '날짜'일 경우)
# df['날짜(YYYY-MM-DD)'] = pd.to_datetime(df['날짜(YYYY-MM-DD)'])

# # 4. 존재하는 날짜 목록
# existing_dates = pd.Series(df['날짜(YYYY-MM-DD)'].unique())

# # 5. 누락 날짜 계산
# missing_dates = full_dates[~full_dates.isin(existing_dates)]

# # 6. 결과 DataFrame (컬럼명 지정)
# missing_df = pd.DataFrame({'날짜(YYYY-MM-DD)': missing_dates.strftime("%Y-%m-%d")})

# # 7. CSV 저장 (cp949 인코딩)
# missing_df.to_csv(f"{item_nm}/누락날짜_{item_nm}.csv", index=False, encoding="cp949")

# print(f"[완료] 누락된 날짜 {len(missing_df)}건 → 누락날짜_{item_nm}.csv 저장됨")

[완료] 누락된 날짜 694건 → 누락날짜_배추.csv 저장됨


In [24]:
# # 실패 로그 기반 데이터 재수집
# import os
# import pandas as pd
# import requests
# import time
# from datetime import datetime
# from tqdm import tqdm
# import ssl
# import warnings
# import xml.etree.ElementTree as ET
# from requests.packages.urllib3.exceptions import InsecureRequestWarning
# from dotenv import load_dotenv

# # 환경 및 경고 설정
# load_dotenv()
# warnings.filterwarnings('ignore', category=InsecureRequestWarning)
# ssl._create_default_https_context = ssl._create_unverified_context

# # 상수
# API_KEY = os.getenv("DO_API_KEY")
# BASE_URL = 'http://apis.data.go.kr/B552845/katSale/trades'
# item_nm = '배추'
# ITEM_CODES = {f"{item_nm}": "1001"}
# max_retries = 2  # 재시도 최대 횟수 (1회 시도 + 0회 재시도)

# # 디렉토리 준비
# os.makedirs("logs", exist_ok=True)
# os.makedirs("data", exist_ok=True)
# os.makedirs("success", exist_ok=True)

# # 도매시장 코드 불러오기
# df_market = pd.read_csv("표준코드/도매시장_코드.csv", encoding="cp949", header=None)
# df_market[0] = df_market[0].astype(str)

# # 실패 로그 불러오기
# fail_df = pd.read_csv(f"{item_nm}/누락날짜_{item_nm}.csv", encoding="cp949")
# fail_dates = pd.to_datetime(fail_df['날짜(YYYY-MM-DD)']).dt.strftime("%Y-%m-%d")

# # 도매시장 리스트
# market_list = df_market[[0, 1]].values.tolist()  # (시장코드, 시장명)

# for item_name, code in ITEM_CODES.items():
#     LARGE = code[:2]
#     MID = code[2:]
#     data_list = []
#     cnt =0

#     print(f"\n📦 실패 항목 재시도 시작: {item_name}")
#     for date_str in tqdm(fail_dates, desc="재시도 날짜 진행"):
#         for mcode, market_name in market_list:
#             retry_count = 0
#             market_success = False

#             while retry_count < max_retries:
#                 page_no = 1
#                 cnt += 1
#                 try:
#                     while True:
#                         print(f"▶️ 요청 시도: {item_name} | 시장코드: {mcode} | 날짜: {date_str} | 페이지: {page_no} | 재시도: {retry_count + 1}")

#                         params = {
#                             'serviceKey': API_KEY,
#                             'pageNo': page_no,
#                             'numOfRows': 100,
#                             'cond[trd_clcln_ymd::EQ]': date_str,
#                             'cond[whsl_mrkt_cd::EQ]': mcode,
#                             'cond[gds_lclsf_cd::EQ]': LARGE,
#                             'cond[gds_mclsf_cd::EQ]': MID
#                         }

#                         response = requests.get(BASE_URL, params=params, verify=False, timeout=10)
#                         content_type = response.headers.get("Content-Type", "")
#                         time.sleep(1.0)
#                         response_preview = response.text[:500].strip()

#                         # 오류 체크
#                         if "LIMITED_" in response_preview:
#                             fail_reason = "❌ API 호출 제한 (LIMITED_ 응답)"
#                         elif "SERVICE ERROR" in response_preview:
#                             fail_reason = "❌ 서비스 오류 (SERVICE ERROR 응답)"
#                         elif "ERROR" in response_preview.upper():
#                             fail_reason = "❌ 기타 오류 포함 (ERROR 키워드 포함)"
#                         elif "TOO MANY REQUESTS" in response_preview.upper():
#                             fail_reason = "❌ 요청 과다로 인한 제한 (Too Many Requests)"
#                         else:
#                             fail_reason = None

#                         if fail_reason:
#                             print(f"⛔ {fail_reason} - 재시도 대기 중 (2분)")
#                             log_prefix = f"logs/retry_failed_{item_name}_{mcode}_{date_str}"
#                             with open(f"{log_prefix}.html", "w", encoding="utf-8") as f:
#                                 f.write(response.text)
#                             with open(f"{log_prefix}_info.txt", "w", encoding="utf-8") as f:
#                                 f.write(f"[오류] {fail_reason}\n{response_preview}")
#                             retry_count += 1
#                             if retry_count >= max_retries:
#                                 print(f"❗ 최대 재시도 {max_retries}회 초과 - 중단")
#                                 break
#                             time.sleep(60)
#                             continue

#                         # 응답 파싱
#                         if "application/json" in content_type:
#                             json_data = response.json()
#                             body = json_data.get("response", {}).get("body", {})
#                             items = body.get("items", {}).get("item", [])
#                             total_count = int(body.get("totalCount", 0))

#                         elif "application/xml" in content_type or response.text.strip().startswith("<"):
#                             root = ET.fromstring(response.text)
#                             total_count_el = root.find(".//totalCount")
#                             total_count = int(total_count_el.text) if total_count_el is not None else 0
#                             item_els = root.findall(".//item")
#                             items = [{el.tag: el.text for el in item} for item in item_els]
#                         else:
#                             raise ValueError(f"알 수 없는 응답 형식: {content_type}")

#                         if not items:
#                             print("⚠️ 거래 데이터 없음")
#                             market_success = True
#                             break

#                         data_list.extend(items)

#                         if cnt % 10000 == 0:
#                             print(f"🧪 중간 저장 시도: 현재 data_list 길이 = {len(data_list)}")
#                             mid_save_path = f"{item_nm}/유통공사_retry_{item_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_mid.csv"
#                             pd.DataFrame(data_list).to_csv(mid_save_path, encoding='cp949', index=False)
#                             print(f"💾 중간 저장 완료: {mid_save_path}")

#                         market_success = True
#                         if page_no * 100 >= total_count:
#                             print(f"✅ 마지막 페이지 도달 (totalCount: {total_count})")
#                             break
#                         if page_no > 10:
#                             print("🚨 페이지 10 초과 - 무한 루프 방지를 위해 중단")
#                             break

#                         page_no += 1
#                         time.sleep(1.0)

#                     if market_success:
#                         break
#                     else:
#                         retry_count += 1
#                         time.sleep(2 * retry_count)

#                 except Exception as e:
#                     retry_count += 1
#                     print(f"❗예외 발생: {e} (재시도 {retry_count}/{max_retries})")
#                     fail_log_prefix = f"logs/retry_failed_{item_name}_{mcode}_{date_str}_try{retry_count}"
#                     if 'response' in locals():
#                         with open(f"{fail_log_prefix}.txt", "w", encoding="utf-8") as f:
#                             f.write(response.text)
#                     with open(f"{fail_log_prefix}_info.txt", "w", encoding="utf-8") as f:
#                         f.write(f"[예외] {str(e)}\n")
#                     if retry_count >= max_retries:
#                         break
#                     time.sleep(2 * retry_count)

#             if not market_success:
#                 fail_log_path = f"logs/retry_failed_{item_name}_{mcode}_{date_str}.txt"
#                 with open(fail_log_path, "w", encoding="utf-8") as f:
#                     f.write(f"❌ {datetime.now()} - {item_name} {mcode} {date_str} 데이터 수집 실패\n")

#     # 저장
#     if data_list:
#         df = pd.DataFrame(data_list)
#         filename = f"{item_nm}/유통공사_retry_{item_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
#         df.to_csv(filename, encoding='cp949', index=False)
#         print(f"✅ 저장 완료: {filename}")
#     else:
#         print(f"⚠️ {item_name}: 재시도에서도 데이터 없음")



📦 실패 항목 재시도 시작: 배추


재시도 날짜 진행:   0%|                                                                        | 0/694 [00:00<?, ?it/s]

▶️ 요청 시도: 배추 | 시장코드: 도매시장코드 | 날짜: 2018-01-07 | 페이지: 1 | 재시도: 1
⚠️ 거래 데이터 없음
▶️ 요청 시도: 배추 | 시장코드: 110008 | 날짜: 2018-01-07 | 페이지: 1 | 재시도: 1
⚠️ 거래 데이터 없음
▶️ 요청 시도: 배추 | 시장코드: 210001 | 날짜: 2018-01-07 | 페이지: 1 | 재시도: 1


재시도 날짜 진행:   0%|                                                                        | 0/694 [00:03<?, ?it/s]

⚠️ 거래 데이터 없음
▶️ 요청 시도: 배추 | 시장코드: 210005 | 날짜: 2018-01-07 | 페이지: 1 | 재시도: 1





KeyboardInterrupt: 