## 팀프로젝트4

### 라이브러리

In [None]:
import pandas as pd
import os
from sqlalchemy import create_engine, inspect
from google.cloud import storage, bigquery

### MySQL에서 DB에서 table을 가져와서 확인

In [None]:
## mysql에서 데이터를 가져와서 확인만 하는 용도

# MySQL 연결 설정
db_user = ""
db_password = "!"
db_host = ""  # 또는 MySQL 호스트 주소
db_port = ""       # 기본 포트
databases = ["A", "B"]


# 결과 저장용 딕셔너리
all_dataframes = {}

# 데이터베이스별로 처리
for db_name in databases:
    print(f"Processing database: {db_name}")
    # SQLAlchemy 엔진 생성
    engine = create_engine(f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")
    
    try:
        # 테이블 이름 가져오기
        inspector = inspect(engine)
        table_names = inspector.get_table_names()
        
        # 각 테이블 데이터 가져오기
        db_dataframes = {}
        for table in table_names:
            query = f"SELECT * FROM {table}"
            with engine.connect() as connection:
                db_dataframes[table] = pd.read_sql(query, con=connection)
                print(f"Loaded table '{table}' from database '{db_name}'.")
        
        # 결과 저장
        all_dataframes[db_name] = db_dataframes
        
    except Exception as e:
        print(f"Error processing database {db_name}: {e}")
    finally:
        # 엔진 닫기
        engine.dispose()

# 결과 확인
for db_name, tables in all_dataframes.items():
    print(f"Database: {db_name}")
    for table_name, df in tables.items():
        print(f"Table: {table_name} (Rows: {len(df)})")
        print(df.head(), "\n")


### MySQL DB에서 데이터 추출 후 로컬에 저장

In [None]:
### mysql에서 로컬로 데이터

# MySQL 연결 설정
db_user = ""
db_password = "!"
db_host = ""  # 또는 MySQL 호스트 주소
db_port = ""       # 기본 포트
databases = ["A", "B"]

# 저장 경로 설정
output_dir = "./origin_data"
os.makedirs(output_dir, exist_ok=True)  # 저장 디렉토리 생성

# 데이터베이스별로 처리
for db_name in databases:
    print(f"Processing database: {db_name}")
    # SQLAlchemy 엔진 생성
    engine = create_engine(f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")
    
    try:
        # 테이블 이름 가져오기
        inspector = inspect(engine)
        table_names = inspector.get_table_names()
        
        # 각 테이블 데이터 가져오기
        for table in table_names:
            query = f"SELECT * FROM {table}"
            with engine.connect() as connection:
                df = pd.read_sql(query, con=connection)
                
                # Parquet 파일로 저장
                parquet_file = os.path.join(output_dir, f"{db_name}_{table}.parquet")
                df.to_parquet(parquet_file, index=False)
                print(f"Saved table '{table}' from database '{db_name}' to '{parquet_file}'.")

    except Exception as e:
        print(f"Error processing database {db_name}: {e}")
    finally:
        # 엔진 닫기
        engine.dispose()

print("\nData extraction and Parquet file saving completed.")

# 저장된 Parquet 파일 읽기
print("\nLoading Parquet files:")
parquet_files = [f for f in os.listdir(output_dir) if f.endswith(".parquet")]
for parquet_file in parquet_files:
    full_path = os.path.join(output_dir, parquet_file)
    df = pd.read_parquet(full_path)
    print(f"Loaded {parquet_file} (Rows: {len(df)})")
    print(df.head(), "\n")


### data 파일 용량 확인

In [1]:
# 파일 용량 확인
def get_csv_file_sizes(directory):
    file_sizes = {}
    for filename in os.listdir(directory):
        if filename.endswith('.parquet'):
            file_path = os.path.join(directory, filename)
            file_size_bytes = os.path.getsize(file_path)
            file_size_mb = file_size_bytes / (1024 * 1024)
            file_sizes[filename] = file_size_mb
    return file_sizes

In [None]:
import os

# directory = '데이터가 저장된 위치'
directory = './origin_data'
csv_file_sizes = get_csv_file_sizes(directory)
sorted_dict = dict(sorted(csv_file_sizes.items(), key=lambda item: item[1], reverse=True))
for filename, size in sorted_dict.items():
    print(f'{filename:<50}: {size:.2f} MB')

### 데이터 미리보기
- 지정된 dataset이 있는 폴더내의 parquet파일을 전부 불러서 확인.
- pyarrow engine이 인식속도가 가장 빠르다고 함.

In [None]:
import pandas as pd

chunk_size = 100000

# Parquet 파일을 청크 단위로 읽어오는 예제
for filename in sorted_dict.keys():
    each_path = directory + '/' + filename
    df = pd.read_parquet(each_path, engine='pyarrow')
    # 청크 단위로 처리할 필요 없이, Parquet 파일은 기본적으로 전체 데이터를 한 번에 읽어옵니다.
    print(each_path.split('/')[-1], df.shape)
    display(df.head(5))
    display(df.dtypes)
    print('\n\n')

### local(data 폴더에서) -> GCS(Bucket) 업로드

In [None]:

def upload_parquet_to_gcs(data_type, bucket_name, json_key_file, local_folder):
    """
    GCS에 parquet 파일 업로드
    :param bucket_name: GCS 버킷 이름
    :param source_folder: 업로드할 로컬 폴더 경로
    :param json_key_file: GCS 서비스 계정 JSON 키 파일 경로
    :param destination_folder: GCS 내 저장 폴더 경로 (선택)
    """
    # GCS 클라이언트 생성
    client = storage.Client.from_service_account_json(json_key_file)
    bucket = client.bucket(bucket_name)

    for file_name in os.listdir(local_folder):
        if file_name.endswith(".parquet"):
            # 데이터베이스 이름과 테이블 이름 추출
            base_name, _ = os.path.splitext(file_name)  # 확장자 제거
            db_name, table_name = base_name.split("_", 1)  # 첫 '_' 기준 분리

            # GCS 폴더 경로 지정
            destination_blob_name = f"{data_type}/{db_name}/{table_name}.parquet"

            # GCS 업로드
            blob = bucket.blob(destination_blob_name)
            blob.upload_from_filename(os.path.join(local_folder, file_name))

            print(f"Uploaded {file_name} to gs://{bucket_name}/{destination_blob_name}")

# 실행 예제
data_type = "A"
bucket_name = "Bucket"  # GCS 버킷 이름
source_folder = "./"      # 데이터가 저장된 폴더위치
json_key_file = "./config/gcp_key.json"   # JSON 인증 파일 경로
upload_parquet_to_gcs(data_type, bucket_name, json_key_file, source_folder)

### GCS에서 데이터 불러오기
- 불러온 데이터 .temp라는 임시폴더에 데이터 저장(로컬형태로 함!)

In [None]:

# 환경 변수에 JSON 키 파일 설정
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./config/gcp_key.json"

# GCS 클라이언트 및 BigQuery 클라이언트 초기화
storage_client = storage.Client()
bigquery_client = bigquery.Client()

def download_parquet_from_gcs(bucket_name, prefix):
    """
    GCS에서 Parquet 파일 다운로드 및 병합.
    :param bucket_name: GCS 버킷 이름
    :param prefix: 다운로드할 경로 (GCS 버킷 내부 폴더)
    :return: 병합된 Pandas 데이터프레임
    """
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)  # 지정된 경로의 파일 검색
    dfs = []  # 데이터프레임 저장 리스트

    for blob in blobs:
        if blob.name.endswith(".parquet"):
            print(f"Downloading: {blob.name}")
            local_file_path = f"./temp/{blob.name.split('/')[-1]}"  # 로컬 임시 저장 경로
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
            blob.download_to_filename(local_file_path)  # 파일 다운로드
            df = pd.read_parquet(local_file_path)  # Parquet 파일 읽기
            dfs.append(df)

    if not dfs:
        print(f"No Parquet files found at prefix: {prefix}")
        return pd.DataFrame()  # 빈 데이터프레임 반환

    return pd.concat(dfs, ignore_index=True)  # 데이터프레임 병합

# GCS에서 Parquet 데이터 다운로드
# 실행 예제
if __name__ == "__main__":
    bucket_name = "finalproject_sprint"
    prefix = "hackle"  # GCS 내부의 특정 경로(버킷에서 파일이 저장된 폴더 이름.)
    dataset_name = "your_dataset_name"
    table_name = "your_table_name"

    # GCS에서 Parquet 데이터 다운로드
    merged_df = download_parquet_from_gcs(bucket_name, prefix)

### 빅쿼리에 데이터 업로드

In [None]:

def upload_to_bigquery(dataframe, dataset_name, table_name):
    """
    Pandas 데이터프레임을 BigQuery 테이블에 업로드.
    :param dataframe: 업로드할 데이터프레임
    :param dataset_name: BigQuery 데이터셋 이름
    :param table_name: BigQuery 테이블 이름
    """
    table_id = f"{bigquery_client.project}.{dataset_name}.{table_name}"
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")  # 기존 데이터 덮어쓰기
    job = bigquery_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # 작업 완료 대기

    print(f"Uploaded {len(dataframe)} rows to {table_id}.")


In [None]:
## 빅쿼리 데이터 업로드 실행 코드
if __name__ == "__main__":
    bucket_name = "your_gcs_bucket_name"
    prefix = "dict"  # GCS 내부의 특정 경로
    dataset_name = "your_dataset_name"
    table_name = "your_table_name"

    # GCS에서 Parquet 데이터 다운로드
    merged_df = download_parquet_from_gcs(bucket_name, prefix)

    if not merged_df.empty:
        # BigQuery로 데이터 업로드
        upload_to_bigquery(merged_df, dataset_name, table_name)
    else:
        print("No data to upload.")

### GCS로 부터 parquet파일 받아오기

In [None]:
import os
import pandas as pd
from google.cloud import storage, bigquery

# 환경 변수에 JSON 키 파일 설정
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./config/gcp_key.json"

# GCS 클라이언트 및 BigQuery 클라이언트 초기화
storage_client = storage.Client()
bigquery_client = bigquery.Client()

def download_parquet_from_gcs(bucket_name, prefix):
    """
    GCS에서 Parquet 파일 다운로드 및 병합.
    :param bucket_name: GCS 버킷 이름
    :param prefix: 다운로드할 경로 (GCS 버킷 내부 폴더)
    :return: 파일별 데이터프레임 딕셔너리 (key: 파일 이름, value: 데이터프레임)
    """
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)  # 지정된 경로의 파일 검색
    dataframes = {}  # 파일별 데이터프레임 저장
    file_names = []

    for blob in blobs:
        if blob.name.endswith(".parquet"):
            file_name = blob.name.split("/")[-1].replace(".parquet", "")  # 파일 이름 추출
            print(f"Downloading: {blob.name}")
            # GCS에서 바로 메모리로 읽기
            with blob.open("rb") as file:
                df = pd.read_parquet(file, engine="pyarrow")
                dataframes[file_name] = df
                file_names.append(file_name)

    if not dataframes:
        print(f"No Parquet files found at prefix: {prefix}")
    return dataframes, file_names  # 파일 이름별 데이터프레임 딕셔너리 반환


### GCS에서 가져온 데이터 Bigquery로 이관

In [6]:

def upload_to_bigquery(dataframe, dataset_name, table_name):
    """
    Pandas 데이터프레임을 BigQuery 테이블에 업로드.
    :param dataframe: 업로드할 데이터프레임
    :param dataset_name: BigQuery 데이터셋 이름
    :param table_name: BigQuery 테이블 이름
    """
    table_id = f"{bigquery_client.project}.{dataset_name}.{table_name}"
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")  # 기존 데이터 덮어쓰기
    job = bigquery_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # 작업 완료 대기
    print(f"Uploaded {len(dataframe)} rows to {table_id}.")

In [None]:
## 실행, prefix, dataset_name은 동일하게 들어갈 것 같습니다.
if __name__ == "__main__":
    db_name = "H2"
    bucket_name = "finalproject_sprint" 
    prefix = db_name  # GCS 내부의 특정 경로(버킷에서 파일이 저장된 폴더 이름.)
    dataset_name = db_name  # 데이터셋 이름 지정

    # GCS에서 Parquet 데이터 다운로드
    dataframes = download_parquet_from_gcs(bucket_name, prefix)

    # 각 파일을 BigQuery로 업로드
    for file_name, df in dataframes.items():
        table_name = file_name  # 파일 이름을 테이블 이름으로 사용
        print(f"Uploading file '{file_name}' to BigQuery table '{table_name}'...")
        upload_to_bigquery(df, dataset_name, table_name)

In [None]:
## 그냥 데이터 다운로드만 하고 확인 할 때 사용.
## 실행, prefix, dataset_name은 동일하게 들어갈 것 같습니다.
if __name__ == "__main__":
    db_name = "H2"
    bucket_name = "finalproject_sprint"
    prefix = db_name  # GCS 내부의 특정 경로(버킷에서 파일이 저장된 폴더 이름.)
    dataset_name = db_name  # 데이터셋 이름 지정

    # GCS에서 Parquet 데이터 다운로드
    dataframes, file_names = download_parquet_from_gcs(bucket_name, prefix)
    print(f"DB name : {db_name}, table_name : {file_names}")

    # GCS에서 받은 parquet의 file이름으로 데이터 저장
    for file_name in file_names:
        globals()[file_name] = dataframes[f'{file_name}']

In [None]:
hackle_events = dataframes['hackle_events']

## 조금 중요한데 만약에 forloop를 이용해서 df의 len만큼 돌린다면 매우 큰 과부화가 걸릴 수 있음.
## 벡터화 연산을 이용하면 빠르게 변환이 가능. 혹은 np.arange로 동일하게 처리하는 방법을 사용!
hackle_events['id'] = range(1, len(hackle_events) + 1)

In [None]:
display(hackle_events.head())
print('hackle_events table shape is : ', hackle_events.shape)

In [10]:
hackle_events.to_parquet('parquet_data/hackle_hackle_events.parquet')

In [None]:
hackle_events['session_id'].value_counts()

In [None]:
device_properties

In [None]:
hackle_events

In [None]:
user_properties['user_id']

In [None]:
hackle_properties = dataframes['hackle_properties']
display(hackle_properties.head())
print("hackle_properties의 테이블 크기 : ", hackle_properties.shape)

In [None]:
display(set(hackle_properties['session_id']).intersection(set(hackle_properties['device_id'])))

len(set(hackle_properties['session_id']).intersection(set(hackle_properties['device_id'])))

In [None]:
hackle_properties['session_id'].value_counts()