## 설명
* '만개의레시피'에서 제공한 무료 레시피 데이터 전처리

## 전처리
* 2024년 데이터에만 있는 RCP_IMG_URL(레시피 이미지) 컬럼 삭제
* 불필요한 FIRST_REG_DT(최초등록일시), RGTR_ID(등록자ID), RGTR_NM(등록자명) 컬럼 삭제
* 내용이 다소 많은 CKG_IPDC(요리소개), RCP_TTL(레시피제목) 컬럼 삭제

## revision
* 1 : 3개 파일 병합. 불필요 컬럼 삭제 - RCP_IMG_URL(레시피 이미지), FIRST_REG_DT(최초등록일시), RGTR_ID(등록자ID), RGTR_NM(등록자명)
* 2 : 불필요 컬럼 삭제 - CKG_IPDC(요리소개), RCP_TTL(레시피제목)
* 3 : 요리재료내용 (CKG_MTRL_CN) 에서 재료수 (MATERIAL_CNT), 양념수 (SEASONING_CNT) 추출

In [1]:
import pandas as pd
import chardet
import time
from datetime import datetime
import math

함수 : CSV 파일을 읽어서 지정된 불필요 컬럼(RCP_IMG_URL, FIRST_REG_DT, RGTR_ID, RGTR_NM, CKG_IPDC, RCP_TTL) 삭제

In [2]:
def proc_csv_remove_needless_column(input_filename: str, output_filename: str) -> None:
    """
    CSV 파일을 읽어서 지정된 컬럼들을 삭제한 후 새로운 파일로 저장하는 함수

    Parameters:
    input_filename (str): 읽을 CSV 파일의 경로
    output_filename (str): 저장할 CSV 파일의 경로

    Returns:
    None
    """
    try:
        # CSV 파일의 인코딩 정보 감지
        with open(input_filename, 'rb') as rawdata:
            result = chardet.detect(rawdata.read(100000))  # 처음 100KB 읽어서 인코딩 확인
        csv_enc = result['encoding']

        if csv_enc is None:
            raise ValueError("CSV 파일의 인코딩을 감지할 수 없습니다.")

        # CSV 파일 읽기
        df = pd.read_csv(input_filename, encoding=csv_enc)

        # 지정된 컬럼 삭제
        columns_to_drop = ['RCP_IMG_URL', 'FIRST_REG_DT', 'RGTR_ID', 'RGTR_NM', 'CKG_IPDC', 'RCP_TTL']
        df = df.drop(columns=columns_to_drop, errors='ignore')

        # 결과를 새로운 CSV 파일로 저장
        df.to_csv(output_filename, index=False, encoding='utf-8-sig')
        print(f"파일이 성공적으로 처리되어 {output_filename}로 저장되었습니다.")

    except FileNotFoundError:
        print(f"에러: {input_filename} 파일을 찾을 수 없습니다.")
    except Exception as e:
        print(f"에러 발생: {str(e)}")

함수 : CSV 파일 병합

In [3]:
def merge_csv_files(csv_files, output_filename):
    """
    여러 CSV 파일을 하나로 병합하는 함수

    Args:
        csv_files (list): 병합할 CSV 파일 경로 목록
        output_filename (str): 병합 결과를 저장할 파일 경로
    """
    try:
        # 빈 DataFrame 생성
        merged_df = pd.DataFrame()

        # 각 CSV 파일을 읽어서 병합
        for file in csv_files:
            try:
                df = pd.read_csv(file, encoding='utf-8-sig')
                merged_df = pd.concat([merged_df, df], ignore_index=True)
            except FileNotFoundError:
                print(f"경고: 파일 '{file}'을 찾을 수 없습니다. 병합에서 제외합니다.")
            except pd.errors.ParserError:
                print(f"경고: 파일 '{file}'을 파싱하는데 문제가 발생했습니다. 병합에서 제외합니다.")

        # 병합된 DataFrame을 새로운 CSV 파일로 저장
        merged_df.to_csv(output_filename, index=False, encoding='utf-8-sig')
        print(f"파일이 성공적으로 병합되어 '{output_filename}'로 저장되었습니다.")

    except Exception as e:
        print(f"에러 발생: {str(e)}")

메인 실행 : 업로드된 3개 파일에 대해 불필요한 컬럼 삭제 후 병합

In [4]:
def main():
    """
    메인 실행 함수
    """
    try:
        start_time = datetime.now()
        print(f"수집 시작: {start_time}")

        # 2022년도 레시피 처리
        input_file = "TB_RECIPE_SEARCH-220701-utf8.csv"
        output_file = "TB_RECIPE_2022.csv"

        proc_csv_remove_needless_column(input_file, output_file)

        # 2023년도 레시피 처리
        input_file = "TB_RECIPE_SEARCH-20231130-utf8.csv"
        output_file = "TB_RECIPE_2023.csv"

        proc_csv_remove_needless_column(input_file, output_file)

        # 2024년도 레시피 처리
        input_file = "TB_RECIPE_SEARCH_241226-utf8.csv"
        output_file = "TB_RECIPE_2024.csv"

        proc_csv_remove_needless_column(input_file, output_file)

        merge_csv_files(["TB_RECIPE_2022.csv", "TB_RECIPE_2023.csv", "TB_RECIPE_2024.csv"], "TB_RECIPE.csv")

        ##### 통계 출력
        end_time = datetime.now()
        duration = end_time - start_time

        print(f"\n처리 완료!")
        print(f"시작 시간: {start_time}")
        print(f"종료 시간: {end_time}")
        print(f"소요 시간: {duration}")
        print(f"저장 파일명: {output_file}")

    except Exception as e:
        print(f"프로그램 실행 중 오류 발생: {str(e)}")

if __name__ == "__main__":
    main()

수집 시작: 2025-01-08 12:13:17.435704
파일이 성공적으로 처리되어 TB_RECIPE_2022.csv로 저장되었습니다.
파일이 성공적으로 처리되어 TB_RECIPE_2023.csv로 저장되었습니다.
파일이 성공적으로 처리되어 TB_RECIPE_2024.csv로 저장되었습니다.
파일이 성공적으로 병합되어 'TB_RECIPE.csv'로 저장되었습니다.

처리 완료!
시작 시간: 2025-01-08 12:13:17.435704
종료 시간: 2025-01-08 12:13:32.810587
소요 시간: 0:00:15.374883
저장 파일명: TB_RECIPE_2024.csv


함수 : 여러 재료와 양념이 기재된 문자열에서 재료 수와 양념 수 카운팅

In [5]:
# prompt: 문자열을 입력받아 '[재료]' 단어부터 '[양념]' 단어 사이에 있는 문자열에서 '|'로 구분된 단어수를 'material_cnt'라는 딕셔너리 항목으로
# '[양념]' 단어 뒤 문자열에서 '|' 로 구분된 단어수를 'seasoning_cnt' 라는 딕셔너리 항목으로 반환하는 함수 작성

def count_material_and_seasoning(str_value):
    """
    입력 문자열에서 재료와 양념의 개수를 계산하는 함수

    Args:
        str_value: 분석할 문자열

    Returns:
        재료와 양념의 개수를 담은 딕셔너리
    """
    try:
        if len(str(str_value)) == 0:
            return {'material_cnt': 0, 'seasoning_cnt': 0}

        start_idx = str_value.find('[재료]')  # '[재료]' 문자열의 끝 위치를 시작으로

        if start_idx == -1:
            start_idx = 0
            end_idx = str_value.find('[')    # 첫 '[' 문자열의 시작 위치
        else:
            start_idx += 4
            end_idx = str_value.find('[', start_idx)     # 4번째 뒤 '[' 문자열의 시작 위치

        if end_idx == -1:
            end_idx = len(str_value)

        # print('len(text): ', len(str_value))
        # print('start_idx: ', start_idx)
        # print('end_idx: ', end_idx)

        material_part = str_value[start_idx:end_idx].strip()
        seasoning_part = str_value[end_idx:len(str_value)].strip()

        # print(f'material_part: \'{material_part}\'')
        # print(f'seasoning_part: \'{seasoning_part}\'')

        if '|' in material_part:
            material_cnt = len(material_part.split('|'))
        elif '|' not in material_part:
            if len(material_part) > 0:
                material_cnt = 1
            else:
                material_cnt = 0
        else:
            material_cnt = 0

        if len(seasoning_part) == 0:
            seasoning_cnt = 0
        elif seasoning_part.count('[') > 1:
            seasoning_cnt = seasoning_part.count('[') + seasoning_part.count('|')
        else:
            seasoning_cnt = len(seasoning_part.split('|'))

#        if '|' in seasoning_part:
#            seasoning_cnt = len(seasoning_part.split('|'))
#        elif '|' not in seasoning_part:
#            if len(seasoning_part) > 0:
#                if seasoning_part.count('[') > 1:
#                    # 각 양념 파트가 1개씩 존재하는 경우 '[패티양념] 후추 [소스] 소금'
#                    seasoning_cnt = seasoning_part.count('[')
#                else:
#                    seasoning_cnt = 1
#            else:
#                seasoning_cnt = 0
#        else:
#            seasoning_cnt = 0

        return {'material_cnt': material_cnt, 'seasoning_cnt': seasoning_cnt}

    except ValueError:
        return {'material_cnt': 0, 'seasoning_cnt': 0}

### 문자열 파싱 테스트

In [6]:
# [재료] 가래떡 1개| 호두 15알 정도 [양념] 깨 3T
# [재료] 가래떡 1개| 호두 15알 정도 [양념] 깨 3T | 간장
# [재료] 모닝빵 3개| 새우 10마리 [패티양념] 다진양파 1/2개| 빵가루| 밀가루| 계란 [소스] 소금| 후추
counts = count_material_and_seasoning('[재료] 모닝빵 3개| 새우 10마리 [패티양념] 다진양파 1/2개| 빵가루| 밀가루| 계란 [소스] 소금| 후추')
print('material_cnt:', counts['material_cnt'])
print('seasoning_cnt', counts['seasoning_cnt'])

material_cnt: 2
seasoning_cnt 6


추가 실행 : 재료 수와 양념 수 센 뒤 컬럼 추가

In [8]:
# prompt: TB_RECIPE.csv 파일의 CKG_MTRL_CN 컬럼 문자열을 위 count_material_and_seasoning 함수에 사용한 결과 딕셔너리 중
# seasoning_cnt 는 SEASONING_CNT 컬럼에 저장
# material_cnt 는 MATERIAL_CNT 컬럼에 저장
# 하여 새로운 TB_RECIPE_CAT.csv 파일로 저장

def process_and_save_recipe_data(input_file, output_file):
    """
    TB_RECIPE.csv 파일을 읽어 CKG_MTRL_CN 컬럼을 처리하고 새로운 CSV 파일을 생성합니다.
    """
    try:
        df = pd.read_csv(input_file, encoding='utf-8-sig')

        # 새로운 컬럼 추가
        df['MATERIAL_CNT'] = 0
        df['SEASONING_CNT'] = 0

        # CKG_MTRL_CN 컬럼의 각 값에 대해 함수 실행
        for index, row in df.iterrows():
            if not pd.isna(row['CKG_MTRL_CN']):  # NaN 값 확인 추가
              try:
                  counts = count_material_and_seasoning(row['CKG_MTRL_CN'])
                  df.loc[index, 'MATERIAL_CNT'] = counts['material_cnt']
                  df.loc[index, 'SEASONING_CNT'] = counts['seasoning_cnt']
              except Exception as e:
                  print(f"Error processing row {index}: {e}")
                  df.loc[index, 'MATERIAL_CNT'] = 0
                  df.loc[index, 'SEASONING_CNT'] = 0
            else:
              df.loc[index, 'MATERIAL_CNT'] = 0
              df.loc[index, 'SEASONING_CNT'] = 0

        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        print(f"파일이 성공적으로 처리되어 {output_file}로 저장되었습니다.")
    except FileNotFoundError:
        print(f"에러: {input_file} 파일을 찾을 수 없습니다.")
    except Exception as e:
        print(f"에러 발생: {str(e)}")


def main():
    # ... (기존 main 함수 코드)

    # 새로운 함수 호출
    process_and_save_recipe_data("TB_RECIPE.csv", "TB_RECIPE_CAT.csv")

if __name__ == "__main__":
    main()

파일이 성공적으로 처리되어 TB_RECIPE_CAT.csv로 저장되었습니다.
