In [110]:
# 테이블 별 한글명 및 영문명 매칭

# 1. 상품 정보 테이블

goods_info_table = {
    '제목' : 'title',
    '상품가격' : 'price',
    '판매처URL' : 'connect_url',
    '상품타입' : 'type',
    '상품상태' : 'status',
    '브랜드' : 'brand',
}


category_table = {
    '상위카테고리명' : 'top_category',
    '하위카테고리명' : 'bottom_category' 
}


option_table = {
    '색깔' : 'color',
    '사이즈' : 'size'
}

image_tabel = {
    '상품이미지URL' : 'product_url'
}

In [111]:
try:
    import pymysql
except:
    print('you need to install pymysql\n$ : python -m pip install pymysql')
    exit()
import traceback

from typing import Iterable
from typing import Union

class MariaDB:
    """
    MariaDB
    """

    def __init__(self, db_config:dict, cursor_type="tuple") -> None:
        """
        생성자 메서드
        인스턴스 생성 시 db_config를 전달받아 DB에 연결합니다.
        
        **db_config**
            host=database host (localhost)
            port=port (3306)
            user=username (root)
            password=password (1q2w3e)
            database=database name (testdb)
            charset=charcter encoding (utf8mb4)
        """

        db_config['port'] = int(db_config.get('port', '3306'))
        self.DB = pymysql.connect(**db_config)

        if cursor_type == 'dict':
            self.cursor_type = pymysql.cursors.DictCursor
        else:
            self.cursor_type = None
            
        return
    
    def __del__(self):
        """
        인스턴스 소멸 시 DB 연결 해제
        """
        self.DB.close()

    def select(self, column_qry:str, table:str, limit=None, offset=None, order_by=None, where_condition=[]) -> tuple:
        """
        Select

        ex) 
        column_qry = "*" : 모든열
        column_qry = "id, name, email" : 열이름
        table = "esg_project" : 테이블이름

        """

        sql_qr = "SELECT {0} FROM {1}".format(column_qry, table)
        if order_by:
            sql_qr += ' ORDER BY {}'.format(order_by)
        if limit:
            sql_qr += ' LIMIT {}'.format(limit)
        if offset:
            sql_qr += ' OFFSET {}'.format(offset)
        if where_condition:
            for i, (col, eq, val) in enumerate(where_condition):
                is_equal = '=' if eq else '!='
                is_multiple = ' AND' if i > 1 else ' WHERE'
                sql_qr += f'{is_multiple} {col}{is_equal}{val}'


        with self.DB.cursor() as cur:
            cur.execute(sql_qr)
            return cur.fetchall()
        

    def insert(self, table:str, columns: str, value: tuple) -> Union[int, bool]:
        """
        Insert Data
        
        example)
        table = "Students"
        columns = "name, email, phone"
        values = ('이름', '이메일', '번호')
        """

        sql_qr = f"INSERT INTO {table}({columns}) " \
                  "VALUES (" +','.join(["%s"]*len(value)) +")"
        # args = values
        
        try:
            with self.DB.cursor() as cur:
                cur.execute(sql_qr, value)
                self.DB.commit()
            return cur.lastrowid
        except:
            traceback.print_exc()
            self.DB.rollback()
            return False
    
    # values는 list 형식으로 넣었음, args로 함
    def insert_many(self, table: str, columns: str, values: list) -> bool:
        """
        Insert Many Datas
        
        example)
        table = "테이블이름"
        columns = "name, email, phone" 열이름들
        values = [
            ('hong gildong', 'hgd123@gmail.com', '01012345678'),
            ...
        ] 각 밸류 값들
        """
        num_columns = len(columns.split(','))
        sql = f"INSERT INTO {table} ({columns}) VALUES (" + ','.join(["%s"] * num_columns) + ");"
        try:
            with self.DB.cursor() as cur:
                cur.executemany(sql, values)
                self.DB.commit()
            return True
        except Exception as e:
            print(f"An error occurred: {e}")
            traceback.print_exc()
            self.DB.rollback()
            return False


    def update(self, table:str, set_columns:list[str], set_values:list[str], where_column:str, where_value) -> bool:
        """
        Update
        
        example)
        table = "Students"
        set_column = ["name"]
        set_value = ["jason"]
        where_column = "id"
        where_value = "1"
        """

        set_statement = ', '.join('{}="{}"'.format(sc, sv) for sc, sv in zip(set_columns, set_values))
        sql = "UPDATE {0} " \
            "SET {1} " \
            "WHERE {2}={3};".format(table, set_statement, where_column, where_value)
        try:
            with self.DB.cursor() as cur:
                cur.execute(sql)
                self.DB.commit()
            return True
        except:
            traceback.print_exc()
            self.DB.rollback()
            return False
        

    def truncate(self, table:str, forcing=True) -> bool:
        """
        truncate table
        """
        try:
            with self.DB.cursor() as cursor:
                cursor.execute(f'SET FOREIGN_KEY_CHECKS = {int(forcing)}; TRUNCATE TABLE {table};')
                self.DB.commit()
                return True
        except:
            return False

In [1]:
import pandas as pd

In [112]:
# 무신사
musinsa_df = pd.read_csv('./db_data_to_insert_csv/musinsa_data.csv',na_values='null')
musinsa_df['size'] = 'NaN'
musinsa_df.loc[musinsa_df['status'] == 's', 'status'] = '새 상품'

# 에이블리
ably_df = pd.read_csv('./db_data_to_insert_csv/ably_data.csv', na_values='null')
ably_df.loc[ably_df['status'] == 's', 'status'] = '새 상품'

# 세컨드웨어
secondwear_df = pd.read_csv('./db_data_to_insert_csv/secondwear_data.csv',na_values='null')
secondwear_df['connect_url'] = [f'https://www.hellomarket.com/item/{connect_url.split("/")[-1]}?viewLocation=search_result' for connect_url in secondwear_df['connect_url']]

# 통합과정

# 칼럼 이름 정렬
columns = sorted(musinsa_df.columns)

# 각 데이터프레임의 칼럼 이름 정렬
musinsa_df = musinsa_df[columns]
ably_df = ably_df[columns]
secondwear_df = secondwear_df[columns]

# 데이터프레임 합치기
all_df = pd.concat([musinsa_df, ably_df, secondwear_df], ignore_index=True)
all_df['price'] = all_df['price'].astype(int)
all_df['type'] = all_df['type'].astype(int)

# 열 순서 정렬 및 결과 확인
all_df = all_df[['top_category','bottom_category','title','price','connect_url','product_url','type','status','brand','color','size']]


In [113]:
df_shuffled = all_df.sample(frac=1).reset_index(drop=True)
df_sample = df_shuffled[:100]

In [None]:
db_config = {
    'host' : 'localhost',
    'port' : '3306',
    'user' : 'closet',
    'password' : 'Y7&hK!2zV9$pL3#x',
    'database' : 'db_closet',
    'charset' : 'utf8mb4',
}


maria_db = MariaDB(db_config)

In [None]:
# 이미지 테이블 -> product_url 
table = 'product_image'
columns = 'product_url'
values = list(df_sample['product_url'])
success = maria_db.insert_many(table, columns, values)
print("Insert successful:", success)


# table = 'product_category'
# columns = 'top_category, bottom_category'
# values = [category_composition for category_composition in set(zip(all_df['top_category'], all_df['bottom_category']))]
# success = maria_db.insert_many(table, columns, values)
# print("Insert successful:", success)




table = 'product_option'
columns = 'color, size'
values = [option_composition for option_composition in set(zip(df_sample['color'], df_sample['size']))]
success = maria_db.insert_many(table, columns, values)
print("Insert successful:", success)
  



table = 'product'
columns = 'title, price, connect_url, type, status, brand'
values = [product_composition for product_composition in zip(df_sample['title'], df_sample['price'],df_sample['connect_url'], df_sample['type'],df_sample['status'], df_sample['brand'])]
success = maria_db.insert_many(table, columns, values)
print("Insert successful:", success)