## 0. Review 
### 1. 파이프 만들기
- pipeline.pkl에 파이프라인 저장

In [1]:
import os
os.getcwd()

'c:\\Users\\owner\\Documents\\week3'

In [None]:
import pandas as pd
from sklearn.compose import ColumnTransformer, make_column_selector as selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
import pickle

# Scaling ->MinMax,  Imputing-> mean, pkl 파일 생성해주세요.

# CSV 읽기
csv_path = "data.csv"      # <- 파일명 수정
target_col = "Pass.Fail"      # <- 타깃 컬럼명 수정

df = pd.read_csv(csv_path)

X = df.drop(columns=[target_col]) #df.drop(target_col, axis=1)
y = df[target_col]

# 전처리(결측치 + 스케일링 + 원핫)
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

preprocess = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, selector(dtype_include="number")),
        ("cat", categorical_transformer, selector(dtype_exclude="number"))
    ],
    remainder="drop"
)

pipe = Pipeline(steps=[
    ("preprocess", preprocess)
])

with open("pipeline.pkl", "wb") as f:
    pickle.dump( { "pipeline": pipe},  f )
    
# 적용
X_piped = pipe.fit_transform(X)
X_processed = pd.DataFrame(X_piped, columns=X.columns)
X_processed[target_col] = y.values

### 2. API 만들기
- API URL을 호출하면 JSON으로 정해진 행들을 return

In [None]:
#.py 파일 
#pkl 읽으셔서 데이터프레임에 pipe로 전처리를 적용하신 후, 데이터프레임으로 만드셔서
#api로 서비스해보세요.
#with open("파일이름.pkl", "rb") as f:
# saved = pickle.load(f)
# saved["pipeline"]  
from fastapi import FastAPI
import pandas as pd
import numpy as np
app = FastAPI()
df = pd.read_csv("data.csv")
@app.get("/data")
def date_gen():
    x = df.sample( 5 )   
    return  x.to_dict(orient="records")      

In [5]:
import requests
import pandas as pd
r = requests.get("http://localhost:8000/data?row=2")
pd.DataFrame( r.json() )

Unnamed: 0,X20,X86,X87,X88,X113,X115,X116,X117,X119,X120,...,X527,X570,X571,X572,X573,X574,X575,X576,X577,Pass.Fail
0,1.4144,2.4021,0.985,1811.5779,0.9446,784.3665,0.9904,59.0527,0.9659,6.4008,...,7.5111,533.4909,1.63,8.6801,0.1303,3.0031,0.0397,1.627,7.9951,0
1,1.389,2.4398,0.9915,1791.7535,0.9467,637.5776,0.9906,58.9508,0.9791,6.296,...,8.7528,531.8455,2.2544,8.64,0.2994,3.0979,0.0888,1.6245,13.283,0


## DuckDB (덕DB): 분석(OLAP) 전용 임베디드 DB임, 컬럼 지향 구조라 대용량 집계·통계 처리 빠름, Parquet·CSV 파일을 로딩 없이 SQL로 바로 조회 가능함
- 단점: 다중 사용자 동시 접속 부적합함, 트랜잭션·권한 관리 기능 약함, 서비스용 운영 DB로 쓰기엔 부적절함
- 적합: EDA, 피처엔지니어링, 실험 분석용, ML 전처리·중간 결과 저장용

## SQLite: 파일 기반 경량 임베디드 DB임, 설치·설정 거의 필요 없음, 단일 파일로 관리·배포 쉬움
- 단점: 동시 쓰기 성능 매우 약함, 대용량 분석 성능 한계 있음, 사용자·권한·스키마 관리 기능 부족함
- 적합: 로컬 저장소, 소규모 단일 사용자 환경, 임시 결과 저장용

## PostgreSQL: 서버형 관계형 DB임, 다중 사용자·트랜잭션 처리에 강함, 권한·스키마·백업 체계 완성도 높음, 서비스 백엔드용으로 적합함
- 단점: 설치·운영 부담 있음, 로컬 단일 분석에는 과함, 대용량 분석 성능은 DuckDB보다 느릴 수 있음
- 적합: 운영 서비스 DB, 다중 사용자 시스템, 장기 데이터 관리

## 1. sqlite 사용

In [1]:
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "score": [0.8, 0.6, 0.9],
    "label": [1, 0, 1]
})


In [None]:
#api 결과를 데이터프레임으로 만드신 후에 sqlite에 apitable이라는 테이블에다가 저장
import sqlite3
from sqlalchemy import create_engine
import pandas as pd
import requests
r = requests.get("http://localhost:8000/data?row=2")
df=pd.DataFrame( r.json())   
engine = create_engine("sqlite:///example.db")
df.to_sql( "apitable", engine,  if_exists="append", index=False)
df_check = pd.read_sql_query( "SELECT * FROM apitable", engine)
print(df_check.head())    

In [None]:
from sqlalchemy.types import Integer, Float, Text
df.to_sql(
    "model_result",
    engine,
    if_exists="replace",
    index=False,
    dtype={
        "id": Integer(),
        "score": Float(),
        "label": Integer(),
        "comment": Text()
    }
)

3

In [4]:
df_check = pd.read_sql_query(
    "SELECT * FROM model_result",
    engine
)
print(df_check.head())

   id  score  label
0   1    0.8      1
1   2    0.6      0
2   3    0.9      1


## 2. duckdb 사용
- sqlalchemy보다는 connection 스타일 사용 권장

In [None]:
!pip install duckdb

In [None]:
#파일 기반으로 덕디비에 테이블을 생성해주세요
#creditset2.csv를 읽으셔서
#default10yr가 1인 레코드만 select해보세요.

import duckdb
con = duckdb.connect("example.duckdb")

In [15]:
#인메모리 기반
con = duckdb.connect(":memory:")

In [None]:
from datetime import datetime
datetime.now().strftime("%Y_%m_%d_%H_%M_%S")   
#API로 부터 데이터를 세 번 받아서 각각 df_2026_02_10_10_27_24.parquet 같은 이름으로 워킹디렉토리 밑 test라는 폴더에 파일들을 만드세요
import os
os.getcwd()
if os.path.exists("test") ==0:
    os.mkdir("test")
for i in range(4):
    df = pd.DataFrame( requests.get("http://localhost:8000/data?row=20").json())
    df.to_parquet(
        "test\\df"+datetime.now().strftime("%H_%M_%S")+".parquet")

In [36]:
con.execute("""CREATE TABLE model_result4 AS
SELECT * FROM 'test/*.parquet'""")
df_out = con.execute(""" SELECT * FROM model_result4""").df()
print(df_out)


            0         1         2         3         4         5         6  \
0   -0.139556 -0.519279  0.863004  1.354490 -0.034447  0.195597 -0.885449   
1   -2.083365 -1.187373 -0.158658  1.326465  1.237458 -2.225531  0.260611   
2    2.169757  0.834115  0.562515 -0.781065  0.135140 -0.424901  0.323409   
3    1.197852  1.278555  0.790887  0.235043  0.506112  0.553008  0.213513   
4    0.034889  0.834115  0.911083 -1.307305 -0.903583 -0.554909  0.339108   
..        ...       ...       ...       ...       ...       ...       ...   
155 -0.529551  0.173903  0.570773  0.671227  0.074266  0.439508  0.435393   
156  0.623375  1.040826  0.580555  0.081343 -0.620055  0.396110  0.363822   
157 -0.744233 -1.687737  0.463172 -1.586939 -0.171822 -1.837707  0.459250   
158 -1.873305  1.565815 -2.177937  0.978443 -1.402264 -0.009731  0.399607   
159  0.249668 -0.568681 -1.718188  0.176785 -1.384687 -0.834613  0.292250   

            7         8         9  ...        39        40        41  \
0  

- Insert

- api로 데이터를 가져오셔서 덕디비에 넣어주세요.
- api로 데이터 가져오셔서 parquet로 만드신 후에, 덕디비에서 읽어주세요.
- 데이터프레임.to_parquet("이름.parquet")

In [59]:
con.execute("""
INSERT INTO model_result
SELECT * FROM df
""")

<_duckdb.DuckDBPyConnection at 0x1c52d58c370>

- 파퀘, csv 읽기

In [None]:
df = con.execute("""
SELECT *
FROM '이름.parquet'
WHERE label = 1
""").df()


In [None]:
df = con.execute("""
SELECT *
FROM read_csv_auto('data.csv')
""").df()


- 테이블 등록 및 확인

In [60]:
con.register("tmp_df", df)
con.execute("CREATE TABLE t AS SELECT * FROM tmp_df")

<_duckdb.DuckDBPyConnection at 0x1c52d58c370>

In [61]:
df2 = con.execute("SELECT * FROM t").df()

In [64]:
con.execute("SHOW TABLES").df()

Unnamed: 0,name
0,t
1,tmp_df


In [63]:
con.execute("DROP TABLE IF EXISTS model_result")

<_duckdb.DuckDBPyConnection at 0x1c52d58c370>

- parquet + duckdb
 - Parquet: 컬럼 지향 파일 포맷 (빠름, 압축 좋음)
 - DuckDB: Parquet를 로드 없이 SQL로 바로 조회 가능한 분석용 DB, read_parquet()

In [25]:
#parquet 파일 생성
import pandas as pd

df = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "age": [23, 35, 29, 41],
    "score": [88.5, 72.0, 91.2, 65.3]
})

df.to_parquet("sample.parquet", engine="pyarrow")


In [26]:
#parquet에 대해 직접 쿼리
import duckdb

con = duckdb.connect()

result = con.execute("""
    SELECT 
        age,
        AVG(score) AS avg_score
    FROM read_parquet('sample.parquet')
    GROUP BY age
""").fetchdf()

print(result)


   age  avg_score
0   23       88.5
1   29       91.2
2   35       72.0
3   41       65.3


In [28]:
#폴더 내 여러 parquet 파일에 대해 쿼리
con.execute("""
    SELECT COUNT(*) 
    FROM read_parquet('*.parquet')
""").fetchone()


(4,)

In [29]:
#parquet 파일을 테이블로 생성 후 쿼리
con.execute("""
    CREATE TABLE users AS
    SELECT * FROM read_parquet('sample.parquet')
""")

con.execute("""
    SELECT age, COUNT(*) 
    FROM users
    GROUP BY age
""").fetchdf()


Unnamed: 0,age,count_star()
0,23,1
1,29,1
2,35,1
3,41,1


In [30]:
# Pandas → DuckDB
con.register("df_users", df)

con.execute("""
    CREATE TABLE users2 AS
    SELECT * FROM df_users
""")

# DuckDB → Parquet
con.execute("""
    COPY users2 TO 'users2.parquet' (FORMAT PARQUET)
""")


<_duckdb.DuckDBPyConnection at 0x238b9438af0>

## 3. Postgres SQL 접속을 위한 라이브러리 설치

In [None]:
!pip install psycopg2-binary sqlalchemy

In [90]:
DB_INFO = {
    "user": "postgres",
    "password": "12345",
    "host": "localhost",
    "port": 5432,
    "database": "postgres",
}

In [None]:
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**DB_INFO)
)


In [95]:
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "score": [0.8, 0.6, 0.9],
    "label": [1, 0, 1]
})


In [96]:
df.to_sql(
    name="test1",   # 테이블명
    con=engine,
    schema="public",       # 기본 스키마
    if_exists="replace",   # 'fail' | 'replace' | 'append'
    index=False
)


-1

In [None]:
from sqlalchemy import create_engine, inspect

engine = create_engine(
    "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**DB_INFO)
)

inspector = inspect(engine)

tables = inspector.get_table_names(schema="public")
print(tables)

['test1']


In [100]:
query = "SELECT * FROM test1"

df = pd.read_sql_query(query, engine)

print(df.head())

   id  score  label
0   1    0.8      1
1   2    0.6      0
2   3    0.9      1


In [99]:
from sqlalchemy import text

query = text("""
    SELECT *
    FROM test1
""")

df = pd.read_sql_query(query, engine)

- Insert

In [103]:
from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**DB_INFO)
)

sql = text("""
INSERT INTO test1 (id, score, label)
VALUES (:id, :score, :label)
""")

with engine.begin() as conn:
    conn.execute(sql, {
        "id": 1,
        "score": 0.85,
        "label": 1
    })


In [105]:
rows = [
    {"id": 1, "score": 0.8, "label": 1},
    {"id": 2, "score": 0.6, "label": 0},
    {"id": 3, "score": 0.9, "label": 1},
]

sql = text("""
INSERT INTO test1 (id, score, label)
VALUES (:id, :score, :label)
""")

with engine.begin() as conn:
    conn.execute(sql, rows)


In [None]:
with engine.connect() as conn:
    row_count = conn.execute(
        text("SELECT COUNT(*) FROM public.test1")
    ).scalar()

print(row_count)

In [106]:
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS test1"))

# 4. Feature Engineering - Dimension Reduction
- 주성분 분석(Principal Component Analysis)

In [12]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# df: 원본 데이터프레임
df = pd.read_csv("data.csv")
X = df.select_dtypes(include="number").dropna()  # 숫자형만 + 결측 제거(간단 버전)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

pca = PCA(n_components=2, random_state=42)
X_pca = pca.fit_transform(X_scaled)

pca_df = pd.DataFrame(X_pca, columns=["PC1", "PC2"], index=X.index)

# 분산 설명력
explained = pca.explained_variance_ratio_
print("Explained variance ratio:", explained)
print("Cumulative:", explained.cumsum())


Explained variance ratio: [0.1130588  0.09530394]
Cumulative: [0.1130588  0.20836274]


In [13]:
pca95 = PCA(n_components=0.95, random_state=42)  # 누적 설명력 95% 되게끔 자동 선택
X_pca95 = pca95.fit_transform(X_scaled)

print("선택된 주성분 개수:", pca95.n_components_)
print("누적 설명력:", pca95.explained_variance_ratio_.sum())


선택된 주성분 개수: 20
누적 설명력: 0.9605695116891404


In [14]:
loadings = pd.DataFrame(
    pca.components_.T,              # (변수 수, PC 수)
    index=X.columns,
    columns=[f"PC{i+1}" for i in range(pca.n_components_)]
)

loadings_abs_top = loadings.abs().sort_values("PC1", ascending=False).head(10)
print(loadings_abs_top)


           PC1       PC2
X576  0.404086  0.032521
X577  0.402648  0.035784
X572  0.401545  0.032451
X574  0.400945  0.032008
X573  0.391738  0.032990
X575  0.388812  0.031893
X570  0.158004  0.016067
X359  0.045681  0.061476
X221  0.043800  0.059928
X493  0.042446  0.059310


In [15]:
from sklearn.pipeline import Pipeline

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=2, random_state=42)),
])

X_pca = pipe.fit_transform(X)


In [16]:
X_pca

array([[-0.45063158,  0.50756164],
       [-0.69738425, -0.30283182],
       [-0.91839133,  0.42056117],
       ...,
       [-0.25819884,  0.18735053],
       [-0.29790915,  0.21797684],
       [ 0.36534702,  0.59866735]], shape=(1567, 2))

# 5. ETL 대시보드 연습
- streamlit_day3_ui.py를 참고해서 사이드바 구조로 ETL 대시보드를 만들기
- 기능1: 파일 업로드 후 데이터프레임 생성
- 기능2: 업로드 파일 EDA

In [None]:
#seaborn 활용

In [None]:
#.py 파일
import streamlit as st
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

st.set_page_config(page_title="Multi Page", layout="wide")

def page_home():
    st.title("Home")
    st.subheader("현황")    

#upload
def data_upload():   
    uploaded_file = st.file_uploader(label="Select a file", type=["csv"])
    if uploaded_file is not None:
        df = pd.read_csv(uploaded_file)
        st.write(df.head(10))
        st.session_state["df"] = df #다른 함수에서도 접근 가능하도록 세션 지정
        st.success("데이터 저장 완료")

#explore
def data_eda():  
    tmp_df = st.session_state["df"]
    g = sns.pairplot(data=tmp_df )
    st.pyplot(g.fig)  

def data_eda2():  
    tmp_df = st.session_state["df"]
    fig, ax = plt.subplots()
    sns.heatmap(data=tmp_df.corr() )
    st.pyplot(fig)  

pages = {
    "Home": page_home,
    "Data upload": data_upload,
    # 내용을 추가해보세요
}

st.sidebar.subheader("처리 선택")
choice = st.sidebar.selectbox("이동", list(pages.keys()))
pages[choice]()


# 6. ETL 대시보드 연습2
- 기능1: API로 부터 데이터 수집하여 api_data에 csv를 생성
- 기능2: api_data 폴더 내 csv를 읽어서 전처리 후 working directory에 하나의 csv 생성
- 파일 생성 시 현재 연월일시분초를 파일명에 반영
- from datetime import datetime
- datetime.now().strftime("%Y_%m_%d_%H_%M_%S")]

- 기능3: 수집한 csv들을 postgres db(설치 문제 있을 경우, sqlite)에 넣고, 처리된 csv는 별도 폴더(api_backup)로 이동
- 기능4: db 내 내용 조회