## Mockdb 관련 스크립트

### Postgresql 연결 설정

In [None]:
import os
from dotenv import load_dotenv

# 데이터베이스 관련
from sqlalchemy import create_engine, text

load_dotenv('.env')

# PostgreSQL 설정 로드
PG_HOST = os.getenv('PG_HOST')
PG_PORT = os.getenv('PG_PORT')
PG_DATABASE = os.getenv('PG_DATABASE')
PG_USER = os.getenv('PG_USER')
PG_PASSWORD = os.getenv('PG_PASSWORD')

print(f"PostgreSQL 연결 정보:")
print(f"   Host: {PG_HOST}")
print(f"   Port: {PG_PORT}")

# SQLAlchemy 연결 문자열 생성
POSTGRES_URL = f"postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DATABASE}"

print(f"\n✅ PostgreSQL 설정 로드 완료")

PostgreSQL 연결 정보:
   Host: dev-rubicon-postgresql.postgres.database.azure.com
   Port: 5432
   Database: postgres
   User: rubicon

✅ PostgreSQL 설정 로드 완료


### PostgreSQL 연결 및 테이블 생성

In [None]:
def postgresql_connection_and_create_tables():
    """PostgreSQL 연결 및 테이블 생성"""
    
    print("=" * 60)
    print("PostgreSQL 연결 및 테이블 생성")
    print("=" * 60)
    
    try:
        # SQLAlchemy 엔진 생성
        engine = create_engine(POSTGRES_URL)
        
        # 연결 테스트
        with engine.connect() as conn:
            # 기본 연결 테스트
            result = conn.execute(text("SELECT version();"))
            version = result.fetchone()[0]
            print(f"✅ PostgreSQL 연결 성공!")
            print(f"   버전: {version}")
            
            # 현재 데이터베이스 정보
            result = conn.execute(text("SELECT current_database(), current_user;"))
            db_info = result.fetchone()
            print(f"   현재 DB: {db_info[0]}")
            print(f"   사용자: {db_info[1]}")
            
            existing_tables = [row[0] for row in result.fetchall()]
            print(f"\n📊 기존 테이블 수: {len(existing_tables)}개")
            if existing_tables:
                for table in existing_tables:
                    print(f"   • {table}")
        
        with engine.connect() as conn:
            # 트랜잭션 시작
            trans = conn.begin()
            
            try:
                # 1. 고객 정보 테이블
                conn.execute(text("""
                    CREATE TABLE IF NOT EXISTS sr_merged_product (
                        disp_lv1 VARCHAR(1000),
                        disp_lv2 VARCHAR(1000),
                        disp_lv3 VARCHAR(1000),
                        product_category_lv1 VARCHAR(1000),
                        product_category_lv2 VARCHAR(1000),
                        product_category_lv3 VARCHAR(1000),
                        model_name VARCHAR(1000),
                        mdl_code VARCHAR(1000),
                        goods_id VARCHAR(1000),
                        goods_nm VARCHAR(1000),
                        color VARCHAR(1000),
                        release_date DATE,
                        aisc_yn VARCHAR(1),
                        sc_yn VARCHAR(1),
                        gc_yn VARCHAR(1),
                        div_pay_apl_yn VARCHAR(1),
                        show_yn VARCHAR(1),
                        pd_url TEXT,
                        dlgt_img_url TEXT,
                        site_cd VARCHAR(10),
                        usp_desc VARCHAR,
                        review_num INT4,
                        estm_score NUMERIC(5,2),
                        sale_prc1 NUMERIC(10),
                        sale_prc2 NUMERIC(10),
                        sale_prc3 NUMERIC(10),
                        review_content TEXT,
                        disp_clsf_nm_list VARCHAR,
                        spec JSONB,
                        on_sale TEXT,
                        web_cd_dc_amt NUMERIC(10),
                        stock_qty INT4,
                        ctg_rank_recommend INT4,
                        ctg_rank_quantity INT4,
                        ctg_rank_rating INT4,
                        cstrt_mdl_codes VARCHAR,
                        card_promotion JSONB,
                        sale_prc NUMERIC(10)
                    );
                """))
                print("   ✅ sr_merged_product 테이블 생성")
                
                
                # 인덱스 생성
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_sr_merged_product_mdl_code ON sr_merged_product(mdl_code);"))
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_sr_merged_product_site_cd ON sr_merged_product(site_cd);"))
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_sr_merged_product_mdl_site ON sr_merged_product(mdl_code, site_cd);"))
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_sr_merged_product_goods_id ON sr_merged_product(goods_id);"))
                conn.execute(text("CREATE INDEX IF NOT EXISTS idx_sr_merged_product_goods_nm ON sr_merged_product(goods_nm);"))
                print("   ✅ 인덱스 생성 완료")


                # 커멘트 생성
                conn.execute(text("COMMENT ON TABLE sr_merged_product IS '상품 통합 정보 테이블';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.disp_lv1 IS '전시 대분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.disp_lv2 IS '전시 중분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.disp_lv3 IS '전시 소분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.product_category_lv1 IS '카테고리 대분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.product_category_lv2 IS '카테고리 중분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.product_category_lv3 IS '카테고리 소분류';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.model_name IS '모델 명(모델 코드 상위 집합)';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.mdl_code IS '모델 코드';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.goods_id IS '상품 아이디';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.goods_nm IS '상품 명';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.color IS '색상';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.release_date IS '출시일';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.aisc_yn IS 'ai구독 대상 여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.sc_yn IS 'ai구독 스마트 대상 여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.gc_yn IS '갤럭시 클럽 대상 여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.div_pay_apl_yn IS '나눠서 결제 대상 여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.show_yn IS '노출 여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.pd_url IS 'pd URL';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.dlgt_img_url IS '대표 이미지 URL';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.site_cd IS '채널코드';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.usp_desc IS '모델 카드 3줄 장점';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.review_num IS '해당 모델의 리뷰 개수';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.estm_score IS '해당 모델의 리뷰 점수';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.sale_prc1 IS '해당 모델의 기준가';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.sale_prc2 IS '해당 모델의 회원가';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.sale_prc3 IS '해당 모델의 혜택가';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.review_content IS '각 제품에 대한 리뷰들 모음';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.disp_clsf_nm_list IS '전시 분류명 모음';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.spec IS 'json 형태의 spec 모음';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.on_sale IS '판매여부';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.web_cd_dc_amt IS '웹 쿠폰 할인 가격';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.stock_qty IS '상품 재고';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.ctg_rank_recommend IS '전시 소분류내에서 우선 추천순 순위';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.ctg_rank_quantity IS '전시 소분류내에서 우선 판매량순 순위';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.ctg_rank_rating IS '전시 소분류내에서 우선 별점순 순위';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.cstrt_mdl_codes IS 'set상품의 경우 구성 제품 mdl_code';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.card_promotion IS 'card 할인 정보';"))
                conn.execute(text("COMMENT ON COLUMN sr_merged_product.sale_prc IS '최종가격';"))
                print("   ✅ 커멘트 생성 완료")

                # 트랜잭션 커밋
                trans.commit()
                print("\n✅ 모든 테이블 생성 완료!")
                
            except Exception as e:
                trans.rollback()
                print(f"❌ 테이블 생성 실패: {e}")
                return None
        
        return engine
        
    except Exception as e:
        print(f"❌ PostgreSQL 연결 실패: {e}")
        return None

# PostgreSQL 연결 및 테이블 생성
pg_engine = postgresql_connection_and_create_tables()

PostgreSQL 연결 및 테이블 생성
✅ PostgreSQL 연결 성공!
   버전: PostgreSQL 17.5 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 11.2.0, 64-bit
   현재 DB: postgres
   사용자: rubicon

📊 기존 테이블 수: 0개
   ✅ sr_merged_product 테이블 생성
   ✅ 인덱스 생성 완료
   ✅ 커멘트 생성 완료

✅ 모든 테이블 생성 완료!


### 로컬 파일의 데이터를 postgresql에 업로드

In [3]:
# 로컬 파일의 데이터를 postgresql에 업로드

import pandas as pd
import json
from typing import List, Dict, Any
import re

import psycopg2
from psycopg2.extras import execute_batch
from datetime import datetime

from sqlalchemy import Engine, text

PG_UPLOAD_FILE_PATH = os.getenv('PG_UPLOAD_FILE_PATH')

def parse_json_field(value):
    """JSON 필드 파싱 헬퍼 함수"""
    if pd.isna(value) or value == '' or value is None:
        return None
        
    # 이미 dict인 경우
    if isinstance(value, dict):
        return json.dumps(value, ensure_ascii=False)
        
    # 문자열인 경우
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
            
        try:
            # 중괄호로 시작하는 JSON 문자열
            if value.startswith('{'):
                # 먼저 그대로 파싱 시도
                try:
                    parsed = json.loads(value)
                    return json.dumps(parsed, ensure_ascii=False)
                except:
                    # 특수한 형식 처리 (예: {key: value} 형태)
                    # JSON 형식으로 변환 시도
                    value_cleaned = re.sub(r'(\w+):', r'"\1":', value)
                    value_cleaned = value_cleaned.replace('null', '"null"')
                    parsed = json.loads(value_cleaned)
                    return json.dumps(parsed, ensure_ascii=False)
            else:
                return None
        except Exception as e:
            print(f"JSON 파싱 실패: {value[:100]}... - {e}")
            return None
    
    return None

def prepare_data_from_csv(df: pd.DataFrame) -> List[tuple]:
    """
    CSV 데이터프레임을 PostgreSQL 삽입용 데이터로 변환
    
    Args:
        df: 원본 데이터프레임
        
    Returns:
        삽입용 튜플 리스트
    """
    # CSV 헤더와 테이블 컬럼 매핑
    column_mapping = {
        'disp_lv1': 'disp_lv1',
        'disp_lv2': 'disp_lv2',
        'disp_lv3': 'disp_lv3',
        'product_category_lv1': 'product_category_lv1',
        'product_category_lv2': 'product_category_lv2',
        'product_category_lv3': 'product_category_lv3',
        'model_name': 'model_name',
        'mdl_code': 'mdl_code',
        'goods_id': 'goods_id',
        'goods_nm': 'goods_nm',
        'color': 'color',
        'release_date': 'release_date',  
        'aisc_yn': 'aisc_yn',
        'sc_yn': 'sc_yn',
        'gc_yn': 'gc_yn',
        'div_pay_apl_yn': 'div_pay_apl_yn',
        'show_yn': 'show_yn',
        'pd_url': 'pd_url',
        'dlgt_img_url': 'dlgt_img_url',
        'site_cd': 'site_cd',
        'usp_desc': 'usp_desc',
        'review_num': 'review_num',
        'estm_score': 'estm_score',
        'sale_prc1': 'sale_prc1',
        'sale_prc2': 'sale_prc2',
        'sale_prc3': 'sale_prc3',
        'review_content': 'review_content',
        'disp_clsf_nm_list': 'disp_clsf_nm_list',
        'spec': 'spec',
        'on_sale': 'on_sale',
        'web_cp_dc_amt': 'web_cd_dc_amt',  # CSV의 web_cp_dc_amt -> 테이블의 web_cd_dc_amt
        'stock_qty': 'stock_qty',
        'ctg_rank_recommend': 'ctg_rank_recommend',
        'ctg_rank_quantity': 'ctg_rank_quantity',
        'ctg_rank_rating': 'ctg_rank_rating',
        'cstrt_mdl_codes': 'cstrt_mdl_codes',
        'card_promotion': 'card_promotion',
        'sale_prc': 'sale_prc'
    }
    
    prepared_data = []
    
    for idx, row in df.iterrows():
        try:
            record = []
            
            # 테이블 컬럼 순서대로 처리
            table_columns = [
                'disp_lv1', 'disp_lv2', 'disp_lv3',
                'product_category_lv1', 'product_category_lv2', 'product_category_lv3',
                'model_name', 'mdl_code', 'goods_id', 'goods_nm', 'color',
                'release_date', 'aisc_yn', 'sc_yn', 'gc_yn', 'div_pay_apl_yn',
                'show_yn', 'pd_url', 'dlgt_img_url', 'site_cd', 'usp_desc',
                'review_num', 'estm_score', 'sale_prc1', 'sale_prc2', 'sale_prc3',
                'review_content', 'disp_clsf_nm_list', 'spec', 'on_sale',
                'web_cd_dc_amt', 'stock_qty', 'ctg_rank_recommend',
                'ctg_rank_quantity', 'ctg_rank_rating', 'cstrt_mdl_codes',
                'card_promotion', 'sale_prc'
            ]
            
            # 역매핑 생성 (테이블 컬럼 -> CSV 컬럼)
            reverse_mapping = {v: k for k, v in column_mapping.items()}
            
            for table_col in table_columns:
                # CSV 컬럼명 찾기
                csv_col = reverse_mapping.get(table_col, table_col)
                
                # 데이터 가져오기
                if csv_col in df.columns:
                    value = row[csv_col]
                else:
                    value = None
                
                # NULL 처리
                if pd.isna(value) or value == '':
                    record.append(None)
                # DATE 처리 (release_date)
                elif table_col == 'release_date':
                    if value:
                        try:
                            # 다양한 날짜 형식 처리
                            date_val = pd.to_datetime(value)
                            record.append(date_val.date())
                        except Exception as e:
                            print(f"날짜 변환 실패 (행 {idx}): {value} - {e}")
                            record.append(None)
                    else:
                        record.append(None)
                # JSONB 처리 (spec, card_promotion)
                elif table_col in ['spec', 'card_promotion']:
                    json_value = parse_json_field(value)
                    record.append(json_value)
                # usp_desc 처리 (JSON 배열을 문자열로)
                elif table_col == 'usp_desc':
                    if value and isinstance(value, str) and value.startswith('{'):
                        # JSON 배열을 일반 문자열로 변환
                        try:
                            # 중괄호 제거하고 쉼표로 구분
                            cleaned = value.strip('{}')
                            # 따옴표 제거
                            cleaned = cleaned.replace('""', '"').replace('"', '')
                            record.append(cleaned)
                        except:
                            record.append(str(value) if value else None)
                    else:
                        record.append(str(value) if value else None)
                # NUMERIC 처리
                elif table_col in ['estm_score', 'sale_prc1', 'sale_prc2', 'sale_prc3', 'web_cd_dc_amt', 'sale_prc']:
                    if value is not None and value != '':
                        try:
                            record.append(float(value))
                        except:
                            record.append(None)
                    else:
                        record.append(None)
                # INT 처리
                elif table_col in ['review_num', 'stock_qty', 'ctg_rank_recommend', 'ctg_rank_quantity', 'ctg_rank_rating']:
                    if value is not None and value != '':
                        try:
                            record.append(int(float(value)))
                        except:
                            record.append(None)
                    else:
                        record.append(None)
                # VARCHAR(1) 처리 (Y/N 값들)
                elif table_col in ['aisc_yn', 'sc_yn', 'gc_yn', 'div_pay_apl_yn', 'show_yn']:
                    if value and str(value).upper() in ['Y', 'N']:
                        record.append(str(value).upper())
                    else:
                        record.append(None)
                # 나머지 VARCHAR/TEXT 처리
                else:
                    if value is not None and value != '':
                        record.append(str(value))
                    else:
                        record.append(None)
            
            prepared_data.append(tuple(record))
            
        except Exception as e:
            print(f"행 {idx} 처리 중 오류: {e}")
            continue
            
    print(f"총 {len(prepared_data)}개 레코드 준비 완료")
    return prepared_data

def insert_data_with_psycopg2(data: List[tuple], batch_size: int = 1000):
    """
    psycopg2를 직접 사용하여 데이터 삽입
    
    Args:
        data: 삽입할 데이터 튜플 리스트
        batch_size: 배치 크기
    """
    # psycopg2 직접 연결
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DATABASE,
        user=PG_USER,
        password=PG_PASSWORD
    )
    
    insert_query = """
    INSERT INTO sr_merged_product (
        disp_lv1, disp_lv2, disp_lv3,
        product_category_lv1, product_category_lv2, product_category_lv3,
        model_name, mdl_code, goods_id, goods_nm, color,
        release_date, aisc_yn, sc_yn, gc_yn, div_pay_apl_yn,
        show_yn, pd_url, dlgt_img_url, site_cd, usp_desc,
        review_num, estm_score, sale_prc1, sale_prc2, sale_prc3,
        review_content, disp_clsf_nm_list, spec, on_sale,
        web_cd_dc_amt, stock_qty, ctg_rank_recommend,
        ctg_rank_quantity, ctg_rank_rating, cstrt_mdl_codes,
        card_promotion, sale_prc
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s
    ) ON CONFLICT DO NOTHING
    """
    
    try:
        cur = conn.cursor()
        
        total_records = len(data)
        inserted_count = 0
        failed_count = 0
        
        # 배치 단위로 삽입
        for i in range(0, total_records, batch_size):
            batch = data[i:i + batch_size]
            try:
                execute_batch(cur, insert_query, batch, page_size=batch_size)
                conn.commit()
                inserted_count += len(batch)
                print(f"삽입 진행중: {inserted_count}/{total_records} 레코드")
            except Exception as batch_error:
                print(f"배치 삽입 실패: {batch_error}")
                conn.rollback()
                
                # 배치 실패 시 개별 삽입 시도
                for record in batch:
                    try:
                        cur.execute(insert_query, record)
                        conn.commit()
                        inserted_count += 1
                    except Exception as record_error:
                        failed_count += 1
                        print(f"개별 레코드 삽입 실패: {record_error}")
                        conn.rollback()
        
        print(f"삽입 완료 - 성공: {inserted_count}개, 실패: {failed_count}개")
        
    except Exception as e:
        print(f"데이터 삽입 실패: {e}")
        raise
    finally:
        cur.close()
        conn.close()

try:
    print(f"CSV 파일 읽기 시작: {PG_UPLOAD_FILE_PATH}")
    
    encoding = 'utf-8'
    delimiter = ','

    # CSV 파일 읽기 (큰따옴표 처리 포함)
    df = pd.read_csv(
        PG_UPLOAD_FILE_PATH, 
        encoding=encoding, 
        delimiter=delimiter,
        quotechar='"',
        quoting=1  # QUOTE_MINIMAL
    )
    
    print(f"총 {len(df)}개 레코드 읽기 완료")
    print(f"컬럼 목록: {df.columns.tolist()}")
    
    # 데이터 준비
    prepared_data = prepare_data_from_csv(df)
    
    if prepared_data:
        # psycopg2를 사용한 데이터 삽입
        insert_data_with_psycopg2(prepared_data)
    else:
        print("삽입할 데이터가 없습니다")
    
except Exception as e:
    print(f"CSV 처리 실패: {e}")
    raise

CSV 파일 읽기 시작: /Users/toby/prog/kt/rubicon/data/sr_merged_product_202509231550.tsv
총 3496개 레코드 읽기 완료
컬럼 목록: ['disp_lv1', 'disp_lv2', 'disp_lv3', 'product_category_lv1', 'product_category_lv2', 'product_category_lv3', 'model_name', 'mdl_code', 'goods_id', 'goods_nm', 'color', 'release_date', 'aisc_yn', 'sc_yn', 'gc_yn', 'div_pay_apl_yn', 'show_yn', 'pd_url', 'dlgt_img_url', 'site_cd', 'created_at', 'usp_desc', 'review_num', 'estm_score', 'sale_prc1', 'sale_prc2', 'sale_prc3', 'review_content', 'disp_clsf_nm_list', 'spec', 'on_sale', 'web_cp_dc_amt', 'stock_qty', 'ctg_rank_recommend', 'ctg_rank_quantity', 'ctg_rank_rating', 'cstrt_mdl_codes', 'card_promotion', 'sale_prc']
총 3496개 레코드 준비 완료
삽입 진행중: 1000/3496 레코드
삽입 진행중: 2000/3496 레코드
삽입 진행중: 3000/3496 레코드
삽입 진행중: 3496/3496 레코드
삽입 완료 - 성공: 3496개, 실패: 0개


### Mongodb 연결


In [1]:
# Azure Cosmos DB for MongoDB 설정 (Microsoft 공식 방식)
import os
import sys
from dotenv import load_dotenv
import pymongo

load_dotenv('.env')

# Microsoft 공식 방식: COSMOS_CONNECTION_STRING 또는 MONGODB_CONNECTION_STRING 사용
CONNECTION_STRING = os.getenv('COSMOS_CONNECTION_STRING') or os.getenv('MONGODB_CONNECTION_STRING')

print("=" * 60)
print("Azure Cosmos DB for MongoDB 연결 설정 (MS 공식 방식)")
print("=" * 60)

# 연결 문자열 확인
if CONNECTION_STRING:
    print("✅ 연결 문자열 로드 완료")
    # 보안을 위해 일부만 출력
    if "mongodb://" in CONNECTION_STRING or "mongodb+srv://" in CONNECTION_STRING:
        parts = CONNECTION_STRING.split('@')
        if len(parts) > 1:
            host_info = parts[1].split('?')[0] if '?' in parts[1] else parts[1].split('/')[0]
            print(f"   Host: {host_info}")
else:
    print("❌ 연결 문자열을 찾을 수 없습니다.")
    print("   .env 파일에 COSMOS_CONNECTION_STRING 또는 MONGODB_CONNECTION_STRING을 설정해주세요.")
    print("\n💡 .env 파일 예시:")
    print("   COSMOS_CONNECTION_STRING=mongodb://username:password@host:port/database?ssl=true&replicaSet=globaldb&retryWrites=false")

Azure Cosmos DB for MongoDB 연결 설정 (MS 공식 방식)
✅ 연결 문자열 로드 완료
   Host: dev-rubicon-mongodb.mongocluster.cosmos.azure.com/


In [2]:
# MongoDB 연결 (Microsoft 공식 방식)
def mongodb_connection_ms_official():
    """Microsoft 공식 방식으로 Azure Cosmos DB for MongoDB 연결"""
    
    print("=" * 60)
    print("MongoDB 연결 테스트 (MS 공식 방식)")
    print("=" * 60)
    
    if not CONNECTION_STRING:
        print("❌ 연결 문자열이 없습니다.")
        return None
    
    try:
        # Microsoft 공식 방식: 단순한 MongoClient 생성
        client = pymongo.MongoClient(CONNECTION_STRING)
        print(CONNECTION_STRING)
        
        # 클라이언트 옵션 확인 (디버깅용)
        print("🔍 클라이언트 옵션:")
        for prop, value in vars(client.options).items():
            if value is not None:  # None이 아닌 값만 표시
                print(f"   {prop}: {value}")
        
        print("\n🔗 연결 검증 중...")
        
        # Microsoft 공식 방식: server_info()로 연결 검증
        try:
            server_info = client.server_info()
            print("✅ 연결 성공!")
            print(f"   서버 버전: {server_info.get('version', 'N/A')}")
            
            # 추가 연결 정보
            if 'buildInfo' in server_info:
                build_info = server_info.get('buildInfo', {})
                print(f"   빌드 정보: {build_info.get('gitVersion', 'N/A')}")
            
        except (pymongo.errors.OperationFailure, 
                pymongo.errors.ConnectionFailure, 
                pymongo.errors.ExecutionTimeout) as err:
            print(f"❌ 연결 실패: {err}")
            return None
        
        # 데이터베이스 목록 조회 (선택적)
        try:
            db_list = client.list_database_names()
            print(f"\n📊 접근 가능한 데이터베이스 ({len(db_list)}개):")
            for db in db_list[:10]:  # 최대 10개만 표시
                print(f"   • {db}")
        except Exception as db_error:
            print(f"   ℹ️  데이터베이스 목록 조회 제한: {db_error}")
        
        print("\n✅ Azure Cosmos DB for MongoDB 연결 완료!")
        return client
        
    except Exception as err:
        print(f"❌ 전체적인 연결 오류: {err}")
        
        # 상세한 오류 정보 제공
        error_type = type(err).__name__
        print(f"\n🔍 오류 유형: {error_type}")
        
        if "timeout" in str(err).lower():
            print("💡 타임아웃 문제 - 네트워크 연결 또는 방화벽 확인")
        elif "authentication" in str(err).lower():
            print("💡 인증 문제 - 사용자명/비밀번호 확인")
        elif "ssl" in str(err).lower():
            print("💡 SSL 문제 - 연결 문자열의 SSL 설정 확인")
        
        return None

# Microsoft 공식 방식으로 연결 시도
mongo_client = mongodb_connection_ms_official()

MongoDB 연결 테스트 (MS 공식 방식)
mongodb+srv://rubicon:fnqlzhs135!!@dev-rubicon-mongodb.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000
🔍 클라이언트 옵션:
   _ClientOptions__options: {'tls': True, 'authMechanism': 'SCRAM-SHA-256', 'retrywrites': False, 'maxIdleTimeMS': 120.0, 'document_class': <class 'dict'>, 'tz_aware': False, 'connect': True}
   _ClientOptions__codec_options: CodecOptions(document_class=dict, tz_aware=False, uuid_representation=UuidRepresentation.UNSPECIFIED, unicode_decode_error_handler='strict', tzinfo=None, type_registry=TypeRegistry(type_codecs=[], fallback_encoder=None), datetime_conversion=DatetimeConversion.DATETIME)
   _ClientOptions__local_threshold_ms: 15
   _ClientOptions__server_selection_timeout: 30
   _ClientOptions__pool_options: <pymongo.pool_options.PoolOptions object at 0x10780fa00>
   _ClientOptions__read_preference: Primary()
   _ClientOptions__write_concern: WriteConcern()
   _ClientOptions__read_conce

  client = pymongo.MongoClient(CONNECTION_STRING)
  self._resolve_srv()


✅ 연결 성공!
   서버 버전: 8.0.0

📊 접근 가능한 데이터베이스 (1개):
   • rubicon

✅ Azure Cosmos DB for MongoDB 연결 완료!


In [3]:
# MongoDB 컬렉션(테이블) 생성 및 인덱스 설정 - PostgreSQL 구조 기반
def create_mongodb_collections():
    """MongoDB 컬렉션 생성 및 인덱스 설정 (PostgreSQL sr_merged_product 구조 기반)"""
    
    print("=" * 60)
    print("MongoDB 컬렉션 생성 (PostgreSQL 구조 기반)")
    print("=" * 60)
    
    if not mongo_client:
        print("❌ MongoDB 클라이언트가 없습니다. 연결을 먼저 수행하세요.")
        return None
    
    try:
        # rubicon 데이터베이스 선택 또는 생성
        db_name = "rubicon"
        db = mongo_client[db_name]
        print(f"📚 데이터베이스 '{db_name}' 선택/생성")
        
        # sr_merged_product 컬렉션 생성 (PostgreSQL 테이블과 동일한 이름)
        collection_name = "sr_merged_product"
        sr_merged_product = db[collection_name]
        
        # 기존 컬렉션 확인
        existing_collections = db.list_collection_names()
        if collection_name in existing_collections:
            print(f"   ℹ️  '{collection_name}' 컬렉션이 이미 존재합니다.")
            doc_count = sr_merged_product.count_documents({})
            print(f"      현재 문서 수: {doc_count}개")
            
            # 기존 데이터 삭제 여부 확인 (선택적)
            # sr_merged_product.drop()
            # print(f"   ✅ 기존 컬렉션 삭제 후 재생성")
        else:
            print(f"   ✅ '{collection_name}' 컬렉션 생성")
        
        # PostgreSQL과 동일한 인덱스 생성
        print("\n📍 인덱스 생성 (PostgreSQL 구조 기반):")
        
        # 단일 필드 인덱스
        sr_merged_product.create_index("mdl_code", name="idx_sr_merged_product_mdl_code")
        print("   ✅ mdl_code 인덱스 생성")
        
        sr_merged_product.create_index("site_cd", name="idx_sr_merged_product_site_cd")
        print("   ✅ site_cd 인덱스 생성")
        
        sr_merged_product.create_index("goods_id", name="idx_sr_merged_product_goods_id")
        print("   ✅ goods_id 인덱스 생성")
        
        sr_merged_product.create_index("goods_nm", name="idx_sr_merged_product_goods_nm")
        print("   ✅ goods_nm 인덱스 생성")
        
        # 복합 인덱스
        sr_merged_product.create_index(
            [("mdl_code", 1), ("site_cd", 1)],
            name="idx_sr_merged_product_mdl_site"
        )
        print("   ✅ mdl_code + site_cd 복합 인덱스 생성")
        
        # 추가 성능 최적화 인덱스
        sr_merged_product.create_index("release_date", name="idx_release_date")
        print("   ✅ release_date 인덱스 생성")
        
        sr_merged_product.create_index("sale_prc", name="idx_sale_prc")
        print("   ✅ sale_prc 인덱스 생성")
        
        # 텍스트 검색 인덱스 (MongoDB 특화)
        try:
            sr_merged_product.create_index(
                [("goods_nm", "text"), ("model_name", "text"), ("usp_desc", "text")],
                name="text_search_index",
                default_language="korean"  # 한국어 텍스트 검색 지원
            )
            print("   ✅ 텍스트 검색 인덱스 생성 (goods_nm, model_name, usp_desc)")
        except Exception as text_idx_error:
            print(f"   ⚠️  텍스트 인덱스 생성 실패 (이미 존재할 수 있음): {text_idx_error}")
        
        # 스키마 검증 규칙 정의 (MongoDB 3.6+)
        # PostgreSQL 테이블 구조를 MongoDB 검증 규칙으로 변환
        validation_rules = {
            "$jsonSchema": {
                "bsonType": "object",
                "title": "상품 통합 정보",
                "description": "PostgreSQL sr_merged_product 테이블과 동일한 구조",
                "properties": {
                    # 전시 카테고리
                    "disp_lv1": {"bsonType": "string", "description": "전시 대분류"},
                    "disp_lv2": {"bsonType": "string", "description": "전시 중분류"},
                    "disp_lv3": {"bsonType": "string", "description": "전시 소분류"},
                    
                    # 제품 카테고리
                    "product_category_lv1": {"bsonType": "string", "description": "카테고리 대분류"},
                    "product_category_lv2": {"bsonType": "string", "description": "카테고리 중분류"},
                    "product_category_lv3": {"bsonType": "string", "description": "카테고리 소분류"},
                    
                    # 제품 정보
                    "model_name": {"bsonType": "string", "description": "모델 명(모델 코드 상위 집합)"},
                    "mdl_code": {"bsonType": "string", "description": "모델 코드"},
                    "goods_id": {"bsonType": "string", "description": "상품 아이디"},
                    "goods_nm": {"bsonType": "string", "description": "상품 명"},
                    "color": {"bsonType": "string", "description": "색상"},
                    "release_date": {"bsonType": ["date", "string"], "description": "출시일"},
                    
                    # 플래그 필드 (VARCHAR(1))
                    "aisc_yn": {"bsonType": "string", "maxLength": 1, "description": "ai구독 대상 여부"},
                    "sc_yn": {"bsonType": "string", "maxLength": 1, "description": "ai구독 스마트 대상 여부"},
                    "gc_yn": {"bsonType": "string", "maxLength": 1, "description": "갤럭시 클럽 대상 여부"},
                    "div_pay_apl_yn": {"bsonType": "string", "maxLength": 1, "description": "나눠서 결제 대상 여부"},
                    "show_yn": {"bsonType": "string", "maxLength": 1, "description": "노출 여부"},
                    
                    # URL 및 텍스트
                    "pd_url": {"bsonType": "string", "description": "pd URL"},
                    "dlgt_img_url": {"bsonType": "string", "description": "대표 이미지 URL"},
                    "site_cd": {"bsonType": "string", "maxLength": 10, "description": "채널코드"},
                    "usp_desc": {"bsonType": "string", "description": "모델 카드 3줄 장점"},
                    
                    # 숫자 필드
                    "review_num": {"bsonType": ["int", "long"], "description": "해당 모델의 리뷰 개수"},
                    "estm_score": {"bsonType": ["double", "decimal"], "description": "해당 모델의 리뷰 점수"},
                    "sale_prc1": {"bsonType": ["double", "decimal"], "description": "해당 모델의 기준가"},
                    "sale_prc2": {"bsonType": ["double", "decimal"], "description": "해당 모델의 회원가"},
                    "sale_prc3": {"bsonType": ["double", "decimal"], "description": "해당 모델의 혜택가"},
                    "sale_prc": {"bsonType": ["double", "decimal"], "description": "최종가격"},
                    
                    # 텍스트 필드
                    "review_content": {"bsonType": "string", "description": "각 제품에 대한 리뷰들 모음"},
                    "disp_clsf_nm_list": {"bsonType": "string", "description": "전시 분류명 모음"},
                    "on_sale": {"bsonType": "string", "description": "판매여부"},
                    
                    # 추가 숫자 필드
                    "web_cd_dc_amt": {"bsonType": ["double", "decimal"], "description": "웹 쿠폰 할인 가격"},
                    "stock_qty": {"bsonType": ["int", "long"], "description": "상품 재고"},
                    "ctg_rank_recommend": {"bsonType": ["int", "long"], "description": "전시 소분류내에서 우선 추천순 순위"},
                    "ctg_rank_quantity": {"bsonType": ["int", "long"], "description": "전시 소분류내에서 우선 판매량순 순위"},
                    "ctg_rank_rating": {"bsonType": ["int", "long"], "description": "전시 소분류내에서 우선 별점순 순위"},
                    "cstrt_mdl_codes": {"bsonType": "string", "description": "set상품의 경우 구성 제품 mdl_code"},
                    
                    # JSON 필드 (PostgreSQL JSONB -> MongoDB Object)
                    "spec": {"bsonType": "object", "description": "json 형태의 spec 모음"},
                    "card_promotion": {"bsonType": "object", "description": "card 할인 정보"}
                }
            }
        }
        
        # 스키마 검증 설정 (선택적 - Cosmos DB에서 지원하는 경우)
        try:
            db.command({
                "collMod": collection_name,
                "validator": validation_rules,
                "validationLevel": "moderate"  # 기존 문서는 검증 안함, 신규/수정만 검증
            })
            print("\n✅ 스키마 검증 규칙 설정 완료")
        except Exception as validation_error:
            print(f"\n⚠️  스키마 검증 설정 실패 (Cosmos DB 버전 확인): {validation_error}")
        
        # 인덱스 정보 출력
        print("\n📊 생성된 인덱스 목록:")
        indexes = sr_merged_product.list_indexes()
        for idx in indexes:
            print(f"   • {idx['name']}: {idx['key']}")
        
        print("\n✅ MongoDB 컬렉션 및 인덱스 생성 완료!")
        print(f"   데이터베이스: {db_name}")
        print(f"   컬렉션: {collection_name}")
        
        # 생성된 컬렉션 정보 반환
        return {
            "database": db,
            "sr_merged_product": sr_merged_product
        }
        
    except Exception as e:
        print(f"❌ 컬렉션 생성 실패: {e}")
        import traceback
        traceback.print_exc()
        return None

# 컬렉션 생성 실행
collections = create_mongodb_collections()

MongoDB 컬렉션 생성 (PostgreSQL 구조 기반)
📚 데이터베이스 'rubicon' 선택/생성
   ✅ 'sr_merged_product' 컬렉션 생성

📍 인덱스 생성 (PostgreSQL 구조 기반):
   ✅ mdl_code 인덱스 생성
   ✅ site_cd 인덱스 생성
   ✅ goods_id 인덱스 생성
   ✅ goods_nm 인덱스 생성
   ✅ mdl_code + site_cd 복합 인덱스 생성
   ✅ release_date 인덱스 생성
   ✅ sale_prc 인덱스 생성
   ⚠️  텍스트 인덱스 생성 실패 (이미 존재할 수 있음): unsupported language: "korean" for text index version 3, full error: {'ok': 0.0, 'errmsg': 'unsupported language: "korean" for text index version 3', 'code': 67, 'codeName': 'CannotCreateIndex'}

⚠️  스키마 검증 설정 실패 (Cosmos DB 버전 확인): validator not supported yet, full error: {'ok': 0.0, 'errmsg': 'validator not supported yet', 'code': 115, 'codeName': 'CommandNotSupported'}

📊 생성된 인덱스 목록:
   • _id_: SON([('_id', 1)])
   • idx_sr_merged_product_mdl_code: SON([('mdl_code', 1)])
   • idx_sr_merged_product_site_cd: SON([('site_cd', 1)])
   • idx_sr_merged_product_goods_id: SON([('goods_id', 1)])
   • idx_sr_merged_product_goods_nm: SON([('goods_nm', 1)])
   • idx_sr_merged_produ

In [4]:
# PostgreSQL 데이터 파일을 읽어서 MongoDB용 데이터로 변환
import pandas as pd
from datetime import datetime
import json
import os
from dotenv import load_dotenv

load_dotenv('.env')
PG_UPLOAD_FILE_PATH = os.getenv('PG_UPLOAD_FILE_PATH')

def analyze_file_structure(file_path):
    """파일 구조 분석"""
    print("=" * 50)
    print("파일 구조 분석")
    print("=" * 50)
    
    try:
        # 파일 첫 몇 줄 읽기
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = [f.readline().strip() for _ in range(5)]
        
        print(f"파일 첫 5줄:")
        for i, line in enumerate(lines):
            print(f"  [{i}] {line[:100]}{'...' if len(line) > 100 else ''}")
        
        # 구분자 자동 감지
        first_line = lines[0] if lines else ""
        tab_count = first_line.count('\t')
        comma_count = first_line.count(',')
        semicolon_count = first_line.count(';')
        
        print(f"\n구분자 분석:")
        print(f"  탭(\\t): {tab_count}개")
        print(f"  쉼표(,): {comma_count}개")
        print(f"  세미콜론(;): {semicolon_count}개")
        
        # 최적 구분자 결정
        if tab_count > comma_count and tab_count > semicolon_count:
            delimiter = '\t'
            delimiter_name = 'TAB'
        elif comma_count > semicolon_count:
            delimiter = ','
            delimiter_name = 'COMMA'
        else:
            delimiter = ';'
            delimiter_name = 'SEMICOLON'
        
        print(f"  권장 구분자: {delimiter_name}")
        
        return delimiter, lines
        
    except Exception as e:
        print(f"파일 분석 실패: {e}")
        return None, []

def parse_json_field_safe(value):
    """JSON 필드 안전 파싱"""
    if pd.isna(value) or value == '' or value is None:
        return {}
        
    # 이미 dict인 경우
    if isinstance(value, dict):
        return value
        
    # 문자열인 경우
    if isinstance(value, str):
        value = value.strip()
        if not value or value in ['{}', 'null', 'None']:
            return {}
            
        try:
            # JSON 문자열 파싱
            if value.startswith('{') and value.endswith('}'):
                parsed = json.loads(value)
                return parsed if isinstance(parsed, dict) else {}
        except json.JSONDecodeError:
            # JSON 파싱 실패시 빈 딕셔너리 반환
            return {}
    
    return {}

def load_and_transform_pg_data():
    """PostgreSQL CSV/TSV 파일을 MongoDB용 데이터로 변환"""
    
    print("=" * 60)
    print("PostgreSQL 데이터 파일 로드 및 변환")
    print("=" * 60)
    
    if not PG_UPLOAD_FILE_PATH:
        print("❌ PG_UPLOAD_FILE_PATH가 설정되지 않았습니다.")
        print("   .env 파일에 PG_UPLOAD_FILE_PATH를 확인하세요.")
        return None
    
    try:
        print(f"📁 파일 경로: {PG_UPLOAD_FILE_PATH}")
        
        # 파일 존재 확인
        if not os.path.exists(PG_UPLOAD_FILE_PATH):
            print(f"❌ 파일을 찾을 수 없습니다: {PG_UPLOAD_FILE_PATH}")
            return None
        
        # 파일 크기 확인
        file_size = os.path.getsize(PG_UPLOAD_FILE_PATH)
        print(f"📏 파일 크기: {file_size:,} bytes ({file_size/1024:.1f} KB)")
        
        # 파일 구조 분석
        delimiter, sample_lines = analyze_file_structure(PG_UPLOAD_FILE_PATH)
        
        if delimiter is None:
            print("❌ 파일 구조를 분석할 수 없습니다.")
            return None
        
        # 여러 구분자로 시도
        delimiters_to_try = [delimiter, '\t', ',', ';']
        df = None
        
        for delim in delimiters_to_try:
            try:
                print(f"\n🔄 구분자 '{delim}' 시도...")
                
                # 파일 읽기
                test_df = pd.read_csv(
                    PG_UPLOAD_FILE_PATH,
                    encoding='utf-8',
                    delimiter=delim,
                    quotechar='"',
                    quoting=1,  # QUOTE_MINIMAL
                    na_values=['', 'NULL', 'null', 'None', 'NaN'],
                    keep_default_na=True,
                    nrows=5  # 처음 5행만 테스트
                )
                
                # 성공적으로 파싱되었는지 확인
                if len(test_df.columns) > 1 and len(test_df) > 0:
                    print(f"   ✅ 성공: {len(test_df.columns)}개 컬럼, {len(test_df)}개 행")
                    print(f"   컬럼명: {list(test_df.columns)[:10]}")
                    
                    # 전체 파일 읽기
                    df = pd.read_csv(
                        PG_UPLOAD_FILE_PATH,
                        encoding='utf-8',
                        delimiter=delim,
                        quotechar='"',
                        quoting=1,
                        na_values=['', 'NULL', 'null', 'None', 'NaN'],
                        keep_default_na=True
                    )
                    print(f"   전체 파일 로드: {len(df)}개 레코드")
                    break
                else:
                    print(f"   ❌ 실패: {len(test_df.columns)}개 컬럼만 인식")
                    
            except Exception as e:
                print(f"   ❌ 구분자 '{delim}' 실패: {e}")
        
        if df is None or df.empty:
            print("❌ 모든 구분자로 파싱 실패")
            return None
        
        print(f"\n✅ 파일 파싱 성공!")
        print(f"   총 레코드: {len(df)}개")
        print(f"   총 컬럼: {len(df.columns)}개")
        print(f"   컬럼 목록: {list(df.columns)}")
        
        # 필수 컬럼 확인
        required_cols = ['mdl_code', 'goods_id', 'goods_nm']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            print(f"⚠️  필수 컬럼 누락: {missing_cols}")
            print(f"   실제 컬럼: {list(df.columns)[:10]}")
        
        # 데이터 샘플 확인
        print(f"\n📋 원본 데이터 샘플 (첫 3행):")
        for idx in range(min(3, len(df))):
            print(f"  행 {idx}:")
            for col in list(df.columns)[:5]:  # 처음 5개 컬럼만
                value = df.iloc[idx][col]
                print(f"    {col}: {value}")
        
        # MongoDB 문서로 변환
        products_data = []
        conversion_errors = 0
        
        print(f"\n📦 데이터 변환 시작...")
        
        for idx, row in df.iterrows():
            try:
                # MongoDB 문서 생성
                product = {}
                
                # 모든 컬럼을 처리
                for col_name in df.columns:
                    value = row[col_name]
                    
                    # NaN이나 None 값 처리
                    if pd.isna(value):
                        continue
                    
                    # 빈 문자열 제외
                    if isinstance(value, str) and value.strip() == '':
                        continue
                    
                    # 컬럼별 특수 처리
                    if col_name == 'spec':
                        # JSON 필드 처리
                        parsed_spec = parse_json_field_safe(value)
                        if parsed_spec:
                            product['spec'] = parsed_spec
                    
                    elif col_name == 'card_promotion':
                        # JSON 필드 처리
                        parsed_promo = parse_json_field_safe(value)
                        if parsed_promo:
                            product['card_promotion'] = parsed_promo
                    
                    elif col_name == 'release_date':
                        # 날짜 필드 처리
                        try:
                            date_obj = pd.to_datetime(value)
                            product['release_date'] = date_obj.isoformat()
                        except:
                            product['release_date'] = str(value)
                    
                    elif col_name in ['review_num', 'stock_qty', 'ctg_rank_recommend', 
                                      'ctg_rank_quantity', 'ctg_rank_rating']:
                        # 정수 필드 처리
                        try:
                            product[col_name] = int(float(value))
                        except (ValueError, TypeError):
                            product[col_name] = 0
                    
                    elif col_name in ['sale_prc', 'sale_prc1', 'sale_prc2', 'sale_prc3', 
                                      'web_cd_dc_amt', 'web_cp_dc_amt', 'estm_score']:
                        # 실수 필드 처리
                        try:
                            # web_cp_dc_amt는 web_cd_dc_amt로 변환
                            field_name = 'web_cd_dc_amt' if col_name == 'web_cp_dc_amt' else col_name
                            product[field_name] = float(value)
                        except (ValueError, TypeError):
                            pass
                    
                    elif col_name in ['aisc_yn', 'sc_yn', 'gc_yn', 'div_pay_apl_yn', 'show_yn']:
                        # Y/N 필드 처리
                        str_value = str(value).strip().upper()
                        if str_value in ['Y', 'N']:
                            product[col_name] = str_value
                    
                    else:
                        # 기본 문자열 처리
                        product[col_name] = str(value).strip()
                
                # 추가 메타데이터
                product.update({
                    "imported_from": "PostgreSQL",
                    "imported_at": datetime.now().isoformat(),
                    "source_file": os.path.basename(PG_UPLOAD_FILE_PATH),
                    "record_index": idx
                })
                
                # 필수 필드 확인 (더 관대하게)
                has_required = any(key in product and product[key] for key in ['mdl_code', 'goods_id', 'goods_nm'])
                
                if has_required or len(product) > 4:  # 메타데이터 4개 + 실제 데이터
                    products_data.append(product)
                else:
                    print(f"   ⚠️  행 {idx}: 유효한 데이터 없음 (필드 수: {len(product)})")
                    conversion_errors += 1
                
                # 처음 몇 개 변환 결과 출력
                if idx < 3:
                    print(f"   행 {idx} 변환 결과: {len(product)}개 필드")
                    for key, val in list(product.items())[:5]:
                        print(f"      {key}: {val}")
                
            except Exception as e:
                conversion_errors += 1
                if conversion_errors <= 5:
                    print(f"   ⚠️  행 {idx} 변환 실패: {e}")
        
        print(f"\n✅ 변환 완료:")
        print(f"   성공: {len(products_data)}개 문서")
        print(f"   실패: {conversion_errors}개 문서")
        
        return {
            "products": products_data,
            "customers": [],
            "orders": []
        }
        
    except Exception as e:
        print(f"❌ 데이터 로드 실패: {e}")
        import traceback
        traceback.print_exc()
        return None

# PostgreSQL 데이터 로드 및 변환
pg_data = load_and_transform_pg_data()

# 결과 요약
if pg_data and pg_data['products']:
    print(f"\n🎉 최종 결과:")
    print(f"   변환된 제품 수: {len(pg_data['products'])}개")
    
    if pg_data['products']:
        sample = pg_data['products'][0]
        print(f"   첫 번째 제품 필드 수: {len(sample)}개")
        print(f"   주요 필드: {[k for k in sample.keys() if not k.startswith('imported')][:10]}")
else:
    print(f"\n❌ 데이터 변환 실패")
    pg_data = None

PostgreSQL 데이터 파일 로드 및 변환
📁 파일 경로: /Users/dhseo/Downloads/sr_merged_product_202509231550.tsv
📏 파일 크기: 15,190,640 bytes (14834.6 KB)
파일 구조 분석
파일 첫 5줄:
  [0] "disp_lv1","disp_lv2","disp_lv3","product_category_lv1","product_category_lv2","product_category_lv3...
  [1] 생활가전,세탁기,BESPOKE 그랑데 AI 원바디 Top-Fit,ELECTRIC WASHER,FWM,Bespoke Washer,WH90F2120GBHS,WH90F2120GBHS,G...
  [2] 모바일,갤럭시 스마트폰,갤럭시 S,HHP,NEW RADIO MOBILE (5G SMARTPHONE),Galaxy S25,SM-S931N,SM-S931NZREKOO,G20024558...
  [3] 생활가전,세탁기,BESPOKE 그랑데 AI 원바디 Top-Fit,ELECTRIC WASHER,FWM,Bespoke Washer,WH90F2520BBHS,WH90F2520BBHS,G...
  [4] 생활가전,세탁기,그랑데 세탁기 AI,ELECTRIC WASHER,FWM,Washer,WF25DG8650BE,WF25DG8650BE,G200245752,AI 세탁기 25kg,그레이지...

구분자 분석:
  탭(\t): 0개
  쉼표(,): 38개
  세미콜론(;): 0개
  권장 구분자: COMMA

🔄 구분자 ',' 시도...
   ✅ 성공: 39개 컬럼, 5개 행
   컬럼명: ['disp_lv1', 'disp_lv2', 'disp_lv3', 'product_category_lv1', 'product_category_lv2', 'product_category_lv3', 'model_name', 'mdl_code', 'goods_id', 'goods_nm']
   전체 파일 로드: 3496개 레코드

✅ 파일 파

In [5]:
pg_data

{'products': [{'disp_lv1': '생활가전',
   'disp_lv2': '세탁기',
   'disp_lv3': 'BESPOKE 그랑데 AI 원바디 Top-Fit',
   'product_category_lv1': 'ELECTRIC WASHER',
   'product_category_lv2': 'FWM',
   'product_category_lv3': 'Bespoke Washer',
   'model_name': 'WH90F2120GBHS',
   'mdl_code': 'WH90F2120GBHS',
   'goods_id': 'G200252235',
   'goods_nm': 'Bespoke AI 원바디 21/20kg (177.8mm LCD)',
   'release_date': '2025-07-01T00:00:00',
   'div_pay_apl_yn': 'N',
   'show_yn': 'Y',
   'pd_url': 'https://familynet.samsung.com/washing-machine/WH90F2120GBHS/WH90F2120GBHS/',
   'dlgt_img_url': 'https://images.samsung.com/kdp/goods/2025/05/21/854cbf3a-3a70-4eb6-ae24-607bc7a8fb0f.png',
   'site_cd': 'FN',
   'created_at': '2025-09-16 10:29:11.878 +0900',
   'usp_desc': '{"AI로 세탁부터 건조까지 알아서 AI 맞춤+","손끝에서 시작하고 음성으로 완성되는 AI 홈","강력한 직수로 열교환기 자동 세척 직수 파워 오토 클린"}',
   'review_num': 0,
   'estm_score': 0.0,
   'sale_prc1': 3798000.0,
   'sale_prc2': 3798000.0,
   'sale_prc3': 2932000.0,
   'disp_clsf_nm_list': '{건조기,세탁기,

In [6]:
# MongoDB에 PostgreSQL 데이터 삽입 (sr_merged_product)
def insert_pg_data_to_mongodb(collections, pg_data):
    """PostgreSQL 데이터를 MongoDB sr_merged_product 컬렉션에 삽입"""
    
    print("=" * 60)
    print("PostgreSQL 데이터를 MongoDB에 삽입")
    print("=" * 60)
    
    if collections is None:
        print("❌ 컬렉션 정보가 없습니다.")
        return False
    
    if pg_data is None:
        print("❌ 삽입할 데이터가 없습니다.")
        return False
    
    try:
        # sr_merged_product 컬렉션 가져오기
        sr_merged_product = collections.get("sr_merged_product")
        
        # MongoDB Collection 객체는 None과 비교해야 함
        if sr_merged_product is None:
            print("❌ sr_merged_product 컬렉션을 찾을 수 없습니다.")
            return False
        
        # products 데이터 확인 및 삽입
        if pg_data.get("products") and len(pg_data["products"]) > 0:
            # 기존 PostgreSQL 데이터 삭제 (선택적)
            try:
                delete_result = sr_merged_product.delete_many({"imported_from": "PostgreSQL"})
                if delete_result.deleted_count > 0:
                    print(f"   ℹ️  기존 PostgreSQL 데이터 {delete_result.deleted_count}개 삭제")
            except Exception as delete_error:
                print(f"   ⚠️  기존 데이터 삭제 실패 (계속 진행): {delete_error}")
            
            # 배치 크기 설정 (Cosmos DB는 작은 배치가 효율적)
            batch_size = 50  # Cosmos DB 최적화를 위해 작은 배치 사용
            total_products = len(pg_data["products"])
            inserted_count = 0
            failed_count = 0
            failed_samples = []
            
            print(f"\n📤 데이터 삽입 시작 ({total_products}개 문서)")
            
            # 배치 단위로 삽입
            for i in range(0, total_products, batch_size):
                batch = pg_data["products"][i:i + batch_size]
                try:
                    result = sr_merged_product.insert_many(batch, ordered=False)
                    inserted_count += len(result.inserted_ids)
                    
                    # 진행 상황 표시
                    progress = min(i + batch_size, total_products)
                    print(f"   진행중: {progress}/{total_products} ({progress*100//total_products}%)")
                    
                except Exception as batch_error:
                    error_msg = str(batch_error)
                    if "duplicate key" in error_msg.lower():
                        print(f"   ⚠️  배치에 중복 키 존재 - 개별 삽입 시도")
                    else:
                        print(f"   ⚠️  배치 삽입 일부 실패: {error_msg[:100]}")
                    
                    # 개별 문서 삽입 시도
                    for doc in batch:
                        try:
                            sr_merged_product.insert_one(doc)
                            inserted_count += 1
                        except Exception as doc_error:
                            failed_count += 1
                            if failed_count <= 3:  # 처음 3개 오류만 샘플로 저장
                                failed_samples.append({
                                    'mdl_code': doc.get('mdl_code', 'Unknown'),
                                    'error': str(doc_error)[:100]
                                })
            
            print(f"\n✅ 데이터 삽입 완료:")
            print(f"   성공: {inserted_count}개 문서")
            print(f"   실패: {failed_count}개 문서")
            
            if failed_samples:
                print(f"\n⚠️  실패 샘플:")
                for sample in failed_samples:
                    print(f"   • {sample['mdl_code']}: {sample['error']}")
            
            # 삽입 후 통계
            print("\n📊 컬렉션 통계:")
            try:
                total_docs = sr_merged_product.count_documents({})
                pg_docs = sr_merged_product.count_documents({"imported_from": "PostgreSQL"})
                print(f"   전체 문서: {total_docs}개")
                print(f"   PostgreSQL 데이터: {pg_docs}개")
                
                # spec 필드가 있는 문서 확인
                spec_docs = sr_merged_product.count_documents({
                    "spec": {"$exists": True, "$ne": {}}
                })
                print(f"   spec 필드 보유: {spec_docs}개")
                
                # card_promotion 필드가 있는 문서 확인
                card_promo_docs = sr_merged_product.count_documents({
                    "card_promotion": {"$exists": True, "$ne": {}}
                })
                print(f"   card_promotion 필드 보유: {card_promo_docs}개")
            except Exception as stat_error:
                print(f"   ⚠️  통계 조회 실패: {stat_error}")
            
            # 샘플 문서 확인
            try:
                sample = sr_merged_product.find_one({
                    "imported_from": "PostgreSQL",
                    "spec": {"$exists": True, "$ne": {}}
                })
                
                if sample:
                    print("\n📋 삽입된 데이터 샘플:")
                    print(f"   모델코드: {sample.get('mdl_code')}")
                    print(f"   상품명: {sample.get('goods_nm')}")
                    print(f"   사이트: {sample.get('site_cd')}")
                    
                    sale_prc = sample.get('sale_prc')
                    if sale_prc is not None:
                        print(f"   가격: {sale_prc:,}원")
                    
                    if isinstance(sample.get('spec'), dict):
                        spec_keys = list(sample.get('spec', {}).keys())
                        print(f"   spec 필드 키: {spec_keys[:5]}")
                    
                    if isinstance(sample.get('card_promotion'), dict):
                        promo_keys = list(sample.get('card_promotion', {}).keys())
                        print(f"   card_promotion 키: {promo_keys}")
            except Exception as sample_error:
                print(f"   ⚠️  샘플 조회 실패: {sample_error}")
            
            # 인덱스 통계 (선택적)
            try:
                index_stats = sr_merged_product.index_information()
                print(f"\n📍 활성 인덱스: {len(index_stats)}개")
            except:
                pass
            
            return True
        else:
            print("❌ Products 데이터가 비어있습니다.")
            return False
            
    except Exception as e:
        print(f"❌ 데이터 삽입 실패: {e}")
        import traceback
        traceback.print_exc()
        return False

# PostgreSQL 데이터 삽입 실행
if collections is not None and pg_data is not None:
    insert_result = insert_pg_data_to_mongodb(collections, pg_data)
else:
    print("컬렉션 또는 데이터가 없습니다.")
    print("다음을 확인하세요:")
    print("1. MongoDB 연결이 성공했는지")
    print("2. 컬렉션 생성이 완료되었는지")
    print("3. PostgreSQL 데이터가 로드되었는지")

PostgreSQL 데이터를 MongoDB에 삽입

📤 데이터 삽입 시작 (3496개 문서)
   진행중: 50/3496 (1%)
   진행중: 100/3496 (2%)
   진행중: 150/3496 (4%)
   진행중: 200/3496 (5%)
   진행중: 250/3496 (7%)
   진행중: 300/3496 (8%)
   진행중: 350/3496 (10%)
   진행중: 400/3496 (11%)
   진행중: 450/3496 (12%)
   진행중: 500/3496 (14%)
   진행중: 550/3496 (15%)
   진행중: 600/3496 (17%)
   진행중: 650/3496 (18%)
   진행중: 700/3496 (20%)
   진행중: 750/3496 (21%)
   진행중: 800/3496 (22%)
   진행중: 850/3496 (24%)
   진행중: 900/3496 (25%)
   진행중: 950/3496 (27%)
   진행중: 1000/3496 (28%)
   진행중: 1050/3496 (30%)
   진행중: 1100/3496 (31%)
   진행중: 1150/3496 (32%)
   진행중: 1200/3496 (34%)
   진행중: 1250/3496 (35%)
   진행중: 1300/3496 (37%)
   진행중: 1350/3496 (38%)
   진행중: 1400/3496 (40%)
   진행중: 1450/3496 (41%)
   진행중: 1500/3496 (42%)
   진행중: 1550/3496 (44%)
   진행중: 1600/3496 (45%)
   진행중: 1650/3496 (47%)
   진행중: 1700/3496 (48%)
   진행중: 1750/3496 (50%)
   진행중: 1800/3496 (51%)
   진행중: 1850/3496 (52%)
   진행중: 1900/3496 (54%)
   진행중: 1950/3496 (55%)
   진행중: 2000/3496 (57%)
   진행중: 2050/3

In [7]:
# MongoDB sr_merged_product 데이터 조회 및 검증
def query_mongodb_sr_merged_product(collections):
    """MongoDB sr_merged_product 컬렉션에서 데이터 조회"""
    
    print("=" * 60)
    print("MongoDB sr_merged_product 데이터 조회")
    print("=" * 60)
    
    if collections is None:
        print("❌ 컬렉션 정보가 없습니다.")
        return
    
    try:
        sr_merged_product = collections.get("sr_merged_product")
        
        # MongoDB Collection 객체는 None과 비교
        if sr_merged_product is None:
            print("❌ sr_merged_product 컬렉션을 찾을 수 없습니다.")
            return
        
        # 1. 기본 통계
        total_docs = sr_merged_product.count_documents({})
        pg_docs = sr_merged_product.count_documents({"imported_from": "PostgreSQL"})
        
        print("\n📊 1. 컬렉션 통계:")
        print(f"   전체 문서 수: {total_docs:,}")
        print(f"   PostgreSQL 데이터: {pg_docs:,}")
        
        if total_docs == 0:
            print("❌ 데이터가 없습니다. 데이터 삽입을 먼저 실행해주세요.")
            return
        
        # 2. 샘플 데이터 조회
        print("\n📋 2. 데이터 샘플 (처음 3개):")
        samples = list(sr_merged_product.find().limit(3))
        
        for idx, product in enumerate(samples, 1):
            print(f"\n   [{idx}] 제품 정보:")
            print(f"       상품명: {product.get('goods_nm', 'N/A')}")
            print(f"       모델코드: {product.get('mdl_code', 'N/A')}")
            print(f"       상품ID: {product.get('goods_id', 'N/A')}")
            print(f"       사이트: {product.get('site_cd', 'N/A')}")
            print(f"       카테고리: {product.get('disp_lv1', '')}/{product.get('disp_lv2', '')}/{product.get('disp_lv3', '')}")
            
            if product.get('sale_prc') is not None:
                print(f"       가격: {product.get('sale_prc'):,}원")
            
            if product.get('stock_qty') is not None:
                print(f"       재고: {product.get('stock_qty'):,}개")
            
            # spec 정보
            if product.get('spec') and isinstance(product.get('spec'), dict):
                spec_keys = list(product.get('spec').keys())[:3]
                print(f"       spec 정보: {spec_keys}")
        
        # 3. 가격 범위별 분포
        print("\n💰 3. 가격 범위별 제품 분포:")
        price_pipeline = [
            {"$match": {"sale_prc": {"$exists": True, "$ne": None}}},
            {"$bucket": {
                "groupBy": "$sale_prc",
                "boundaries": [0, 500000, 1000000, 2000000, 5000000, float('inf')],
                "default": "기타",
                "output": {
                    "count": {"$sum": 1},
                    "avg_price": {"$avg": "$sale_prc"}
                }
            }}
        ]
        
        try:
            price_results = list(sr_merged_product.aggregate(price_pipeline))
            price_labels = ["~50만원", "50~100만원", "100~200만원", "200~500만원", "500만원~"]
            
            for idx, result in enumerate(price_results):
                if result['_id'] != "기타":
                    label = price_labels[min(idx, len(price_labels)-1)]
                    print(f"   {label}: {result['count']:,}개 (평균: {result['avg_price']:,.0f}원)")
        except Exception as price_error:
            print(f"   ⚠️  가격 분포 분석 실패: {price_error}")
        
        # 4. 사이트별 통계
        print("\n📈 4. 사이트별 제품 통계:")
        site_pipeline = [
            {"$match": {"site_cd": {"$exists": True, "$ne": None}}},
            {"$group": {
                "_id": "$site_cd",
                "count": {"$sum": 1},
                "avg_price": {"$avg": "$sale_prc"},
                "max_price": {"$max": "$sale_prc"},
                "min_price": {"$min": "$sale_prc"},
                "avg_stock": {"$avg": "$stock_qty"}
            }},
            {"$sort": {"count": -1}},
            {"$limit": 10}
        ]
        
        try:
            site_results = list(sr_merged_product.aggregate(site_pipeline))
            for result in site_results:
                print(f"   사이트 '{result['_id']}':")
                print(f"      제품 수: {result['count']:,}개")
                if result.get('avg_price') is not None:
                    print(f"      평균 가격: {result['avg_price']:,.0f}원")
                if result.get('avg_stock') is not None:
                    print(f"      평균 재고: {result['avg_stock']:.0f}개")
        except Exception as site_error:
            print(f"   ⚠️  사이트별 통계 실패: {site_error}")
        
        # 5. 카테고리별 통계 (disp_lv1 기준)
        print("\n📦 5. 대분류별 제품 통계 (상위 10개):")
        category_pipeline = [
            {"$match": {"disp_lv1": {"$exists": True, "$ne": ""}}},
            {"$group": {
                "_id": "$disp_lv1",
                "count": {"$sum": 1},
                "avg_price": {"$avg": "$sale_prc"},
                "brands": {"$addToSet": "$disp_lv2"}
            }},
            {"$sort": {"count": -1}},
            {"$limit": 10}
        ]
        
        try:
            category_results = list(sr_merged_product.aggregate(category_pipeline))
            for result in category_results:
                print(f"   {result['_id']}: {result['count']:,}개")
                if result.get('avg_price') is not None:
                    print(f"      평균 가격: {result['avg_price']:,.0f}원")
                if result.get('brands'):
                    print(f"      하위 카테고리: {len(result['brands'])}개")
        except Exception as cat_error:
            print(f"   ⚠️  카테고리 통계 실패: {cat_error}")
        
        # 6. spec 필드 분석
        print("\n🔧 6. spec 필드 분석:")
        try:
            spec_count = sr_merged_product.count_documents({
                "spec": {"$exists": True, "$ne": {}}
            })
            print(f"   spec 필드 보유 제품: {spec_count:,}개 ({spec_count*100//total_docs if total_docs else 0}%)")
            
            # spec 키 통계
            if spec_count > 0:
                spec_sample = sr_merged_product.find_one({"spec": {"$exists": True, "$ne": {}}})
                if spec_sample and spec_sample.get('spec'):
                    print(f"   spec 샘플 키: {list(spec_sample['spec'].keys())[:10]}")
        except Exception as spec_error:
            print(f"   ⚠️  spec 분석 실패: {spec_error}")
        
        # 7. 날짜 필드 분석
        print("\n📅 7. 출시일 분석:")
        try:
            date_count = sr_merged_product.count_documents({
                "release_date": {"$exists": True, "$ne": None}
            })
            print(f"   출시일 정보 보유: {date_count:,}개 ({date_count*100//total_docs if total_docs else 0}%)")
        except Exception as date_error:
            print(f"   ⚠️  날짜 분석 실패: {date_error}")
        
        # 8. 재고 상태 분석
        print("\n📦 8. 재고 상태:")
        stock_pipeline = [
            {"$match": {"stock_qty": {"$exists": True}}},
            {"$group": {
                "_id": None,
                "total_stock": {"$sum": "$stock_qty"},
                "avg_stock": {"$avg": "$stock_qty"},
                "max_stock": {"$max": "$stock_qty"},
                "out_of_stock": {"$sum": {"$cond": [{"$eq": ["$stock_qty", 0]}, 1, 0]}},
                "low_stock": {"$sum": {"$cond": [{"$and": [{"$gt": ["$stock_qty", 0]}, {"$lte": ["$stock_qty", 10]}]}, 1, 0]}}
            }}
        ]
        
        try:
            stock_results = list(sr_merged_product.aggregate(stock_pipeline))
            if stock_results:
                result = stock_results[0]
                print(f"   총 재고: {result.get('total_stock', 0):,}개")
                print(f"   평균 재고: {result.get('avg_stock', 0):.1f}개")
                print(f"   최대 재고: {result.get('max_stock', 0):,}개")
                print(f"   품절 상품: {result.get('out_of_stock', 0):,}개")
                print(f"   재고 부족(10개 이하): {result.get('low_stock', 0):,}개")
        except Exception as stock_error:
            print(f"   ⚠️  재고 분석 실패: {stock_error}")
        
        # 9. 검색 예시
        print("\n🔍 9. 텍스트 검색 예시:")
        
        # 텍스트 검색 (인덱스가 있는 경우)
        try:
            search_results = sr_merged_product.find(
                {"$text": {"$search": "갤럭시"}},
                {"score": {"$meta": "textScore"}}
            ).sort([("score", {"$meta": "textScore"})]).limit(5)
            
            search_count = 0
            for product in search_results:
                search_count += 1
                print(f"   • {product.get('goods_nm')} (점수: {product.get('score', 0):.2f})")
            
            if search_count == 0:
                raise Exception("텍스트 인덱스 검색 결과 없음")
                
        except:
            # 정규식 검색으로 대체
            try:
                regex_results = sr_merged_product.find(
                    {"goods_nm": {"$regex": "갤럭시", "$options": "i"}}
                ).limit(5)
                
                for product in regex_results:
                    print(f"   • {product.get('goods_nm')} ({product.get('mdl_code')})")
            except Exception as search_error:
                print(f"   ⚠️  검색 실패: {search_error}")
        
        # 10. 복잡한 쿼리 예시 - 고가 제품 중 재고가 있는 제품
        print("\n💎 10. 고가 제품 (200만원 이상, 재고 있음):")
        try:
            premium_products = sr_merged_product.find({
                "sale_prc": {"$gte": 2000000},
                "stock_qty": {"$gt": 0}
            }).sort("sale_prc", -1).limit(5)
            
            premium_count = 0
            for product in premium_products:
                premium_count += 1
                print(f"   • {product.get('goods_nm')}")
                print(f"      가격: {product.get('sale_prc'):,}원")
                print(f"      재고: {product.get('stock_qty'):,}개")
                print(f"      사이트: {product.get('site_cd')}")
            
            if premium_count == 0:
                print("   해당 조건의 제품이 없습니다.")
        except Exception as premium_error:
            print(f"   ⚠️  고가 제품 조회 실패: {premium_error}")
        
    except Exception as e:
        print(f"❌ 조회 실패: {e}")
        import traceback
        traceback.print_exc()

# 데이터 조회 실행
if collections is not None:
    query_mongodb_sr_merged_product(collections)
else:
    print("컬렉션 정보가 없습니다. MongoDB 연결과 컬렉션 생성을 먼저 실행하세요.")

MongoDB sr_merged_product 데이터 조회

📊 1. 컬렉션 통계:
   전체 문서 수: 3,496
   PostgreSQL 데이터: 3,496

📋 2. 데이터 샘플 (처음 3개):

   [1] 제품 정보:
       상품명: Bespoke AI 원바디 21/20kg (177.8mm LCD)
       모델코드: WH90F2120GBHS
       상품ID: G200252235
       사이트: FN
       카테고리: 생활가전/세탁기/BESPOKE 그랑데 AI 원바디 Top-Fit
       가격: 2,932,000.0원
       재고: 6개
       spec 정보: ['스마트', '앱 연결', 'EMC/RF모듈']

   [2] 제품 정보:
       상품명: 갤럭시 S25 자급제 (스페셜 컬러)
       모델코드: SM-S931NZREKOO
       상품ID: G200245586
       사이트: FN
       카테고리: 모바일/갤럭시 스마트폰/갤럭시 S
       가격: 1,155,000.0원
       재고: 138개
       spec 정보: ['센서', '연결', '추가']

   [3] 제품 정보:
       상품명: 2025 NEW Bespoke AI 원바디 25/20kg (177.8mm LCD)
       모델코드: WH90F2520BBHS
       상품ID: G200252226
       사이트: FN
       카테고리: 생활가전/세탁기/BESPOKE 그랑데 AI 원바디 Top-Fit
       가격: 3,465,300.0원
       재고: 5개
       spec 정보: ['추가', '스마트', '앱 연결']

💰 3. 가격 범위별 제품 분포:
   ~50만원: 181개 (평균: 9,090,456원)
   50~100만원: 193개 (평균: 773,533원)
   100~200만원: 676개 (평균: 1,502,806원)
   200~500만원: 1,088개