In [None]:
# 필요한 라이브러리 불러오기
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import koreanize_matplotlib
import seaborn as sns
import os
import time
from ast import literal_eval  # 문자열로 저장된 딕셔너리를 진짜 딕셔너리로 변환
import multiprocessing as mp # session_start / session_end 매칭 시 병렬처리
import glob


# # GCS 파일 경로에서 데이터 불러오기 위한 라이브러리
# from google.cloud import storage
# from google.oauth2 import service_account

# 데이터 불러오기 및 확인    
- device_df_filtered를 7개 그룹으로 쪼갠 df들 (group_4는 2개 파일로 나뉘어 있어 총 8개 파일)

In [3]:
# Load the per-group event frames.
# The directory is named once so that relocating the data means editing a single line.
DATA_DIR = '/home/codeit_project_vm/codeit_project/codeit-project-docker/data'

group_1_df = pd.read_parquet(f'{DATA_DIR}/group_1_df.parquet')
group_2_df = pd.read_parquet(f'{DATA_DIR}/group_2_df.parquet')
group_3_df = pd.read_parquet(f'{DATA_DIR}/group_3_df.parquet')
group_4_df_1 = pd.read_parquet(f'{DATA_DIR}/group_4_df_1.parquet')
group_4_df_2 = pd.read_parquet(f'{DATA_DIR}/group_4_df_2.parquet')
group_5_df = pd.read_parquet(f'{DATA_DIR}/group_5_df.parquet')
group_6_df = pd.read_parquet(f'{DATA_DIR}/group_6_df.parquet')
group_7_df = pd.read_parquet(f'{DATA_DIR}/group_7_df.parquet')

# Report the shape of every group (insertion order of the dict = print order).
loaded_groups = {
    'group_1_df': group_1_df,
    'group_2_df': group_2_df,
    'group_3_df': group_3_df,
    'group_4_df_1': group_4_df_1,
    'group_4_df_2': group_4_df_2,
    'group_5_df': group_5_df,
    'group_6_df': group_6_df,
    'group_7_df': group_7_df,
}
for group_name, group_frame in loaded_groups.items():
    print(f"{group_name}: {group_frame.shape}")

# The total is computed outside the f-string: a backslash line continuation inside
# a replacement field is a SyntaxError on Python versions before 3.12.
total_rows = sum(group_frame.shape[0] for group_frame in loaded_groups.values())
print(f"총합 확인: {total_rows}")


group_1_df: (1303382, 3)
group_2_df: (3234971, 3)
group_3_df: (8484426, 3)
group_4_df_1: (5122630, 3)
group_4_df_2: (5127168, 3)
group_5_df: (4797121, 3)
group_6_df: (3081471, 3)
group_7_df: (2155756, 3)
총합 확인: 33306925


In [3]:
# Count the distinct device_id values in each group
labelled_groups = (
    ('group_1_df', group_1_df),
    ('group_2_df', group_2_df),
    ('group_3_df', group_3_df),
    ('group_4_df_1', group_4_df_1),
    ('group_4_df_2', group_4_df_2),
    ('group_5_df', group_5_df),
    ('group_6_df', group_6_df),
    ('group_7_df', group_7_df),
)
for group_label, group_events in labelled_groups:
    print(f"{group_label}: {group_events['device_id'].nunique()}")

group_1_df: 390929
group_2_df: 179144
group_3_df: 190563
group_4_df_1: 60542
group_4_df_2: 60543
group_5_df: 37864
group_6_df: 18785
group_7_df: 9418


In [4]:

# 기본 정보 확인 함수 
def show_df_info(df, df_name):
    """
    Print an overview of a DataFrame: shape, first rows, info(), summary
    statistics and per-column null counts.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to inspect.
    df_name : str
        Label used in the printed headings.

    Returns
    -------
    None

    Notes
    -----
    Uses `display`, which is not imported in this notebook; it is expected to be
    supplied by the IPython/Jupyter session.
    """
    print(f"\n{df_name} 형태:")
    display(df.shape)
    print(f"\n{df_name} 첫 5행:")
    display(df.head())
    print(f"\n{df_name} 정보:")
    df.info()
    print(f"\n{df_name} 통계 요약:")
    display(df.describe())
    # describe(include='O') raises ValueError when the frame has no object
    # column, so the object summary is only shown when there is something to
    # summarise. Its heading now differs from the summary printed just above.
    if df.select_dtypes(include='O').shape[1] > 0:
        print(f"\n{df_name} 통계 요약 (object 컬럼):")
        display(df.describe(include='O'))
    print(f"\n{df_name} 결측치 확인:")
    display(df.isnull().sum())
    # Duplicate check (disabled by the author):
    # print(f"\n{df_name} 중복값 확인:")
    # print(df.iloc[:,1:].duplicated().sum())

# 세션 매칭 함수 적용

In [5]:
## 재재 수정본

# ▶︎ 1. 정확한 세션 매칭 함수: 가장 가까운 과거 start에 end 매칭
def match_sessions_buffer_corrected(group_df):
    """
    Pair `$session_start` and `$session_end` events one-to-one per device_id.

    Every end event is attached to the most recent start event that is not
    later than the end and has not been paired yet. Starts and ends that find
    no partner are kept, with NaT on the missing side.

    Parameters
    ----------
    group_df : pandas.DataFrame
        Must contain the columns `device_id`, `event_key` and `event_datetime`.
        Rows whose `event_key` is neither `$session_start` nor `$session_end`
        are ignored.

    Returns
    -------
    pandas.DataFrame
        Columns `device_id`, `start_time`, `end_time`, `session_duration_sec`.
        For each device the unmatched ends come first (start_time is NaT),
        followed by one row per start event in time order (end_time is NaT and
        the duration NaN when no end was found). When the input contains no
        device at all, the result is empty but still carries these columns.
    """
    session_columns = ['device_id', 'start_time', 'end_time', 'session_duration_sec']
    matched_sessions = []

    for device_id, group in group_df.groupby('device_id'):
        group = group.sort_values(by='event_datetime')

        # One pass over the two needed columns replaces two iterrows() passes.
        start_times = []
        end_times = []
        for event_key, event_time in zip(group['event_key'], group['event_datetime']):
            if event_key == '$session_start':
                start_times.append(event_time)
            elif event_key == '$session_end':
                end_times.append(event_time)

        paired_ends, orphan_ends = _pair_starts_with_ends(start_times, end_times)

        # Ends without a usable start are recorded first
        for end_time in orphan_ends:
            matched_sessions.append({
                'device_id': device_id,
                'start_time': pd.NaT,
                'end_time': end_time,
                'session_duration_sec': np.nan
            })

        # One row per start event, matched or not
        for start_time, end_time in zip(start_times, paired_ends):
            matched_sessions.append({
                'device_id': device_id,
                'start_time': start_time,
                'end_time': end_time,
                'session_duration_sec': (
                    (end_time - start_time).total_seconds() if pd.notna(end_time) else np.nan
                )
            })

    # Passing the columns explicitly keeps the schema stable for empty results
    return pd.DataFrame(matched_sessions, columns=session_columns)


def _pair_starts_with_ends(start_times, end_times):
    """
    Pair the start and end timestamps of a single device.

    Both lists must be in ascending time order (missing values last), which is
    what the caller's sort_values() produces.

    Returns
    -------
    (paired_ends, orphan_ends)
        paired_ends[i] is the end assigned to start_times[i], or NaT;
        orphan_ends holds the ends that found no start, in input order.
    """
    paired_ends = [pd.NaT] * len(start_times)
    orphan_ends = []
    open_starts = []  # positions of admitted, still unpaired starts (ascending in time)
    next_start = 0

    for end_time in end_times:
        # Admit every start that is not later than this end. Ends arrive in
        # ascending order, so an admitted start stays eligible for later ends.
        # Comparisons against NaT are False, so NaT starts are never admitted.
        while next_start < len(start_times) and start_times[next_start] <= end_time:
            open_starts.append(next_start)
            next_start += 1

        # A NaT end compares False against every start, so it can never be paired
        if pd.isna(end_time) or not open_starts:
            orphan_ends.append(end_time)
            continue

        # The latest open start sits at the end of the list. When several open
        # starts share that timestamp, the earliest row among them is taken.
        pick = len(open_starts) - 1
        while pick > 0 and start_times[open_starts[pick - 1]] == start_times[open_starts[pick]]:
            pick -= 1
        paired_ends[open_starts.pop(pick)] = end_time

    return paired_ends, orphan_ends


def split_by_device_id(df, n_splits):
    """
    Split a DataFrame into at most `n_splits` chunks without ever separating
    the rows of one device_id.

    Devices are taken in groupby order and packed into chunks of
    ceil(n_devices / n_splits) devices, so fewer than `n_splits` chunks can be
    returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a `device_id` column.
    n_splits : int
        Desired number of chunks; must be at least 1.

    Returns
    -------
    list of pandas.DataFrame
        Empty list when `df` holds no device.

    Raises
    ------
    ValueError
        If `n_splits` is smaller than 1.
    """
    if n_splits < 1:
        raise ValueError(f"n_splits must be a positive integer, got {n_splits}")

    device_groups = [device_rows for _, device_rows in df.groupby('device_id')]
    if not device_groups:
        # Without this guard the chunk size would be 0 and range() would raise
        return []

    chunk_size = int(np.ceil(len(device_groups) / n_splits))
    return [pd.concat(device_groups[offset:offset + chunk_size])
            for offset in range(0, len(device_groups), chunk_size)]


def match_sessions_parallel(df, n_cores=None):
    """
    Run the session matching on device_id-aligned chunks of `df` in parallel.

    Parameters
    ----------
    df : pandas.DataFrame
        Events with the columns `device_id`, `event_key` and `event_datetime`.
    n_cores : int, optional
        Upper bound on the number of worker processes; defaults to
        multiprocessing.cpu_count().

    Returns
    -------
    pandas.DataFrame
        Concatenation of the per-chunk results of
        match_sessions_buffer_corrected, re-indexed from 0. An empty input
        gives an empty frame with the columns `device_id`, `start_time`,
        `end_time`, `session_duration_sec`.
    """
    if n_cores is None:
        n_cores = mp.cpu_count()

    empty_result = pd.DataFrame(
        columns=['device_id', 'start_time', 'end_time', 'session_duration_sec']
    )
    if df.empty:
        # Nothing to split or match; avoids starting a pool for no work
        return empty_result

    # Sort the whole frame once before splitting it
    df = df.sort_values(by=['device_id', 'event_datetime']).reset_index(drop=True)

    # Split so that every device lives in exactly one chunk
    df_splits = split_by_device_id(df, n_cores)
    if not df_splits:
        return empty_result

    # Never start more workers than there are chunks to process
    n_workers = min(n_cores, len(df_splits))
    if n_workers == 1:
        results = [match_sessions_buffer_corrected(df_splits[0])]
    else:
        with mp.Pool(n_workers) as pool:
            results = pool.map(match_sessions_buffer_corrected, df_splits)

    return pd.concat(results, ignore_index=True)


In [None]:
# Start with the smallest one: group 7
matched_group_7_df = match_sessions_parallel(group_7_df, n_cores=3) # run time noted by the author: 3m 56.9s

In [8]:
# Persist the matched sessions of group 7 (plain string: the path has no placeholders)
matched_group_7_df.to_parquet("../data/matched_group_7_df.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 6
matched_group_6_df = match_sessions_parallel(group_6_df, n_cores=3) # run time noted by the author: 5 min 16 s

In [10]:
# Persist the matched sessions of group 6 (plain string: the path has no placeholders)
matched_group_6_df.to_parquet("../data/matched_group_6_df.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 5
matched_group_5_df = match_sessions_parallel(group_5_df, n_cores=3) # run time noted by the author: 10 min 13 s

In [12]:
# Persist the matched sessions of group 5 (plain string: the path has no placeholders)
matched_group_5_df.to_parquet("../data/matched_group_5_df.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 4_1
matched_group_4_df_1 = match_sessions_parallel(group_4_df_1, n_cores=3) # run time noted by the author: 17 min 12 s

In [14]:
# Persist the matched sessions of group 4_1 (plain string: the path has no placeholders)
matched_group_4_df_1.to_parquet("../data/matched_group_4_df_1.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 4_2: match sessions, then persist the result.
# Run time noted by the author for this cell: 17 min 1 s
matched_group_4_df_2 = match_sessions_parallel(group_4_df_2, n_cores=3)
matched_group_4_df_2.to_parquet("../data/matched_group_4_df_2.parquet", engine="pyarrow", compression="snappy")

## group_1 ~ 3 다시 쪼개기!!! 

In [None]:
############ 그룹 1,2,3은 다시 쪼개기! ############

In [6]:
def split_group_by_device_id(df, n_parts):
    """
    Cut a DataFrame into `n_parts` pieces along device_id.

    The distinct device_id values are divided with np.array_split and each
    piece receives all rows of the ids assigned to it, so a device never
    appears in two pieces.
    """
    device_column = df['device_id']
    pieces = []
    for id_chunk in np.array_split(device_column.unique(), n_parts):
        belongs_to_chunk = device_column.isin(id_chunk)
        pieces.append(df[belongs_to_chunk])
    return pieces


In [7]:
# Split group_1 into three parts
group_1_df_1, group_1_df_2, group_1_df_3 = split_group_by_device_id(group_1_df, n_parts=3)

# Split group_2 into two parts
group_2_df_1, group_2_df_2 = split_group_by_device_id(group_2_df, n_parts=2)

# Split group_3 into three parts
group_3_df_1, group_3_df_2, group_3_df_3 = split_group_by_device_id(group_3_df, n_parts=3)

# Check: shape and number of distinct device_id values of every part
resplit_parts = {
    'group_1_df': (group_1_df_1, group_1_df_2, group_1_df_3),
    'group_2_df': (group_2_df_1, group_2_df_2),
    'group_3_df': (group_3_df_1, group_3_df_2, group_3_df_3),
}
for parent_name, parts in resplit_parts.items():
    for part_no, part in enumerate(parts, start=1):
        print(f"{parent_name}_{part_no}: {part.shape}, device_id: {part['device_id'].nunique()}")


group_1_df_1: (433672, 3), device_id: 130310
group_1_df_2: (433121, 3), device_id: 130310
group_1_df_3: (436589, 3), device_id: 130309
group_2_df_1: (1617482, 3), device_id: 89572
group_2_df_2: (1617489, 3), device_id: 89572
group_3_df_1: (2827046, 3), device_id: 63521
group_3_df_2: (2828521, 3), device_id: 63521
group_3_df_3: (2828859, 3), device_id: 63521


**최종 device_df 분할**    
group_1_df_1: (433672, 3), device_id: 130310<br>
group_1_df_2: (433121, 3), device_id: 130310<br>
group_1_df_3: (436589, 3), device_id: 130309<br>
group_2_df_1: (1617482, 3), device_id: 89572<br>
group_2_df_2: (1617489, 3), device_id: 89572<br>
group_3_df_1: (2827046, 3), device_id: 63521<br>
group_3_df_2: (2828521, 3), device_id: 63521<br>
group_3_df_3: (2828859, 3), device_id: 63521<br>
group_4_df_1: (5122630, 3), device_id: 60542<br>
group_4_df_2: (5127168, 3), device_id: 60543<br>
group_5_df: (4797121, 3), device_id: 37864<br>
group_6_df: (3081471, 3), device_id: 18785<br>
group_7_df: (2155756, 3), device_id: 9418<br>
총합 확인: 33306925

In [None]:
# Group 3-1: match sessions, then persist the result.
# Run time noted by the author for this cell: 14 min 35 s
matched_group_3_df_1 = match_sessions_parallel(group_3_df_1, n_cores=3)
matched_group_3_df_1.to_parquet("../data/matched_group_3_df_1.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 3-2: match sessions, then persist the result.
# Run time noted by the author for this cell: 14 min 10 s
matched_group_3_df_2 = match_sessions_parallel(group_3_df_2, n_cores=3)
matched_group_3_df_2.to_parquet("../data/matched_group_3_df_2.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 3-3: match sessions, then persist the result.
# Run time noted by the author for this cell: 14 min 24 s
matched_group_3_df_3 = match_sessions_parallel(group_3_df_3, n_cores=3)
matched_group_3_df_3.to_parquet("../data/matched_group_3_df_3.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 2-1: match sessions, then persist the result.
# Run time noted by the author for this cell: 23 min
matched_group_2_df_1 = match_sessions_parallel(group_2_df_1, n_cores=3)
matched_group_2_df_1.to_parquet("../data/matched_group_2_df_1.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 2-2: match sessions, then persist the result.
# Run time noted by the author for this cell: 22 min 36 s
matched_group_2_df_2 = match_sessions_parallel(group_2_df_2, n_cores=3)
matched_group_2_df_2.to_parquet("../data/matched_group_2_df_2.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 1-1: match sessions, then persist the result.
# Run time noted by the author for this cell: 48 min 19 s
matched_group_1_df_1 = match_sessions_parallel(group_1_df_1, n_cores=3)
matched_group_1_df_1.to_parquet("../data/matched_group_1_df_1.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 1-2: match sessions, then persist the result.
# Run time noted by the author for this cell: 45 min 54 s
matched_group_1_df_2 = match_sessions_parallel(group_1_df_2, n_cores=3)
matched_group_1_df_2.to_parquet("../data/matched_group_1_df_2.parquet", engine="pyarrow", compression="snappy")

In [None]:
# Group 1-3: match sessions, then persist the result.
# Run time noted by the author for this cell: 46 min 26 s
matched_group_1_df_3 = match_sessions_parallel(group_1_df_3, n_cores=3)
matched_group_1_df_3.to_parquet("../data/matched_group_1_df_3.parquet", engine="pyarrow", compression="snappy")

In [19]:
# Preview the first rows of the matched sessions for group 1-3
matched_group_1_df_3.head()

Unnamed: 0,device_id,start_time,end_time,session_duration_sec
0,aa6d0849-b138-41c8-92a8-f1d13585c9ec,2023-05-24 04:23:47,NaT,
1,aa6d0849-b138-41c8-92a8-f1d13585c9ec,2023-05-24 04:29:11,NaT,
2,aa6d7143-b240-4377-b3ec-e1d1731e7fb9,2023-05-13 08:38:41,NaT,
3,aa6d9b87-0e49-45ed-8ec6-8c9a5ffade89,2023-07-10 00:25:38,NaT,
4,aa6daf24-0059-4eaa-907e-f677de29dccb,2023-05-12 06:13:02,2023-05-12 06:13:40,38.0


In [None]:
# # group_1 ~ group_3까지 분할된 세션 매칭 진행 및 중간 저장 /// 왜 한꺼번에 하려고만 하면 무한 실행이냐...

# group_dfs = [
#     group_1_df_1, group_1_df_2, group_1_df_3,
#     group_2_df_1, group_2_df_2,
#     group_3_df_1, group_3_df_2, group_3_df_3
# ]

# results = []
# save_dir = "../data"  # 저장 경로
# os.makedirs(save_dir, exist_ok=True)

# for i, group_df in enumerate(group_dfs, 1):
#     print(f"🚀 Processing group {i} with shape {group_df.shape}")

#     try:
#         result = match_sessions_parallel(group_df, n_cores=3)

#         # 결과 저장
#         save_path = os.path.join(save_dir, f"group_{i}_matched.parquet")
#         result.to_parquet(save_path, engine="pyarrow", compression="snappy")

#         print(f"✅ group_{i} 저장 완료: {result.shape}")
#         results.append(result)

#     except Exception as e:
#         print(f"❌ group_{i} 처리 중 에러 발생: {e}")


# matched_group_df 다시 하나로 병합

In [None]:
# Merge every matched_group_*.parquet part into a single frame

# Directory that holds the parts and receives the merged file
data_dir = "../data"

# Collect the part files. glob returns them in arbitrary order, so they are
# sorted to make the row order of the merged frame reproducible between runs.
matched_files = sorted(glob.glob(os.path.join(data_dir, "matched_group_*.parquet")))

# Stop early when there is nothing to merge
if not matched_files:
    raise FileNotFoundError("⚠️ matched_group_*.parquet 파일을 찾을 수 없습니다.")

# Read the parts; a part that cannot be read is reported and skipped
dfs = []
failed_files = []
for f in matched_files:
    try:
        df = pd.read_parquet(f)
        dfs.append(df)
    except Exception as e:
        print(f"❌ 오류 발생: {f} → {e}")
        failed_files.append(f)

matched_device_df = pd.concat(dfs, ignore_index=True)

# Save the merged frame
output_path = os.path.join(data_dir, "matched_device_df.parquet")
matched_device_df.to_parquet(output_path, engine="pyarrow", compression="snappy")

print(f"✅ 병합 및 저장 완료: {matched_device_df.shape}, 저장 위치 → {output_path}")

# Make an incomplete merge impossible to overlook next to the success message
if failed_files:
    print(f"⚠️ 읽지 못한 파일 {len(failed_files)}개는 병합에서 제외되었습니다: {failed_files}")


✅ 병합 및 저장 완료: (19984291, 4), 저장 위치 → ../data/matched_device_df.parquet


In [21]:
# Read the merged matched_device_df back from disk and inspect it
merged_df_test = pd.read_parquet("/home/codeit_project_vm/codeit_project/codeit-project-docker/data/matched_device_df.parquet")

print(merged_df_test.shape)
merged_df_test.head()

(19984291, 4)


Unnamed: 0,device_id,start_time,end_time,session_duration_sec
0,00018a1a-4204-4efe-8a7f-b4b591d7bb2d,2023-05-11 21:47:43,2023-05-11 21:48:05,22.0
1,00018a1a-4204-4efe-8a7f-b4b591d7bb2d,2023-05-11 21:48:06,2023-05-11 21:49:50,104.0
2,00018a1a-4204-4efe-8a7f-b4b591d7bb2d,2023-05-11 22:52:45,2023-05-12 00:49:43,7018.0
3,00018a1a-4204-4efe-8a7f-b4b591d7bb2d,2023-05-12 03:23:34,2023-05-12 03:24:09,35.0
4,00018a1a-4204-4efe-8a7f-b4b591d7bb2d,2023-05-12 03:26:37,2023-05-12 04:44:56,4699.0


In [None]:
merged_df_test['device_id'].nunique() # author's check: merged without losing any unique device_id

947788