## 데이터 전처리
### 1. 데이터 불러오기

In [None]:
import pandas as pd
import os

load_path = './dataset/df_processed.parquet'
output_dir = './dataset/divide'

os.makedirs(output_dir, exist_ok=True) # 출력 폴더 생성 (존재하지 않으면 생성)

print(f"Loading data from {load_path}...")

# 데이터 불러오기 & 필터링
df_processed = pd.read_parquet(load_path, engine='fastparquet', filters=[('Trigger', '!=', 'others')], columns=['day', 'minute', 'HashApp', 'HashFunction', 'invocations'])

print("Data loaded successfully.")

print(f"Data shape: {df_processed.shape}")
print(df_processed.head())

### 2. 4개의 파일로 분리하기

In [None]:
# 4개의 파일로 분리하기
num_parts = 4
part_size = len(df_processed) // num_parts
for i in range(num_parts):
    start_idx = i * part_size
    end_idx = (i + 1) * part_size if i < num_parts - 1 else len(df_processed)
    df_part = df_processed.iloc[start_idx:end_idx]
    part_path = f"{output_dir}/df_part_{i+1}.parquet"
    
    # Parquet 형식으로 저장
    df_part.to_parquet(part_path, engine='pyarrow', index=False)
    print(f"Saved part {i+1} to {part_path}, shape: {df_part.shape}")

Saved part 1 to ./Dataset/Divide/df_part_1.parquet, shape: (222676200, 5)
Saved part 2 to ./Dataset/Divide/df_part_2.parquet, shape: (222676200, 5)
Saved part 3 to ./Dataset/Divide/df_part_3.parquet, shape: (222676200, 5)
Saved part 4 to ./Dataset/Divide/df_part_4.parquet, shape: (222676200, 5)


### 3. 데이터 재구성 및 저장

In [None]:
import pandas as pd
import os
from tqdm import tqdm

# 병렬 처리로 데이터 재구성 및 저장

# 입력 Parquet 파일들이 있는 디렉토리
input_dir = './dataset/divide'
print(f"Reading Parquet files from {input_dir}...") 

# 4개의 Parquet 파일 경로 목록
num_parts = 4
file_paths = [os.path.join(input_dir, f'df_part_{i+1}.parquet') for i in range(num_parts)]

# 모든 파일에서 고유한 함수를 찾아 FunctionID 매핑 생성
print("3-1: Creating FunctionID map from all files...")
function_map = {}
next_function_id = 0

for path in file_paths:
    df_ids = pd.read_parquet(path, engine='fastparquet', columns=['HashApp', 'HashFunction'])

    for app, func in df_ids.drop_duplicates().itertuples(index=False): # 중복 제거 후 반복
        if (app, func) not in function_map:
            function_map[(app, func)] = next_function_id
            next_function_id += 1

print(f"Total unique functions found: {len(function_map)}")

Reading Parquet files from ./dataset/divide...
3-1: Creating FunctionID map from all files...
Total unique functions found: 72359


In [None]:
from joblib import Parallel, delayed

output_dir = './dataset/temp_results'
os.makedirs(output_dir, exist_ok=True)  # 출력 폴더 생성 (존재하지 않으면 생성)

def process_function_and_save(app_hash, func_hash, function_id, file_paths, output_dir):
    function_dfs = []
    for path in file_paths:
        try:
            df = pd.read_parquet(
                path,
                engine='fastparquet',
                filters=[('HashApp', '=', app_hash), ('HashFunction', '=', func_hash)],
                columns=['day', 'minute', 'invocations']
            )
            if not df.empty:
                df['FunctionID'] = function_id
                function_dfs.append(df)
        except Exception as e:
            print(f"Warning: Could not read {path} for function {(app_hash, func_hash)}. Error: {e}")
    
    if function_dfs:
        # 이 함수에 대한 모든 데이터를 하나로 합침
        reconstructed_df = pd.concat(function_dfs, ignore_index=True)

        # 결과 DataFrame을 디스크에 저장
        output_path = os.path.join(output_dir, f'func_{function_id}.parquet')
        reconstructed_df.to_parquet(output_path)

        # DataFrame 객체 대신 파일 경로(문자열)를 반환
        return output_path
    return None

# 함수 단위로 데이터 읽고 재구성 후 디스크에 저장
print("3-2: Reconstructing data for each function and saving to disk...")
unique_functions = list(function_map.keys())
result_paths = Parallel(n_jobs=-1, backend="loky")(
    delayed(process_function_and_save)(app_hash, func_hash, function_map[(app_hash, func_hash)], file_paths, output_dir)
    for app_hash, func_hash in tqdm(unique_functions, desc="Processing Functions")
)

## 데이터 분할 코드

In [None]:
import pandas as pd
import os
import glob
from tqdm import tqdm

# 원본 데이터가 있는 디렉토리
SOURCE_DIR = './dataset/temp_results'

# 분할된 데이터를 저장할 상위 디렉토리
OUTPUT_DIR = './split_dataset'

# 데이터 분할 기준 (day 컬럼 값)
# Train: 1-8일, Validation: 9-11일, Test: 12-14일
TRAIN_END_DAY = 8
VALIDATION_END_DAY = 11

# 출력 디렉토리 생성
train_path = os.path.join(OUTPUT_DIR, 'train')
val_path = os.path.join(OUTPUT_DIR, 'val')
test_path = os.path.join(OUTPUT_DIR, 'test')

os.makedirs(train_path, exist_ok=True)
os.makedirs(val_path, exist_ok=True)
os.makedirs(test_path, exist_ok=True)

print(f"Output directories created/ensured at: {OUTPUT_DIR}")

Output directories created/ensured at: ./split_dataset


### 데이터 분할 및 저장

In [None]:
# 원본 디렉토리에서 모든 func_...parquet 파일 목록 가져오기
all_files = glob.glob(os.path.join(SOURCE_DIR, 'func_*.parquet'))

print(f"Found {len(all_files)} function files. Starting the split process...")

# tqdm을 사용하여 진행 상황 표시
for file_path in tqdm(all_files, desc="Splitting data"):
    try:
        # 함수 파일 하나를 메모리로 로드
        df = pd.read_parquet(file_path)
        
        # 원본 파일 이름 유지
        filename = os.path.basename(file_path)
        
        # 'day' 컬럼을 기준으로 데이터프레임을 세 조각으로 분리
        train_df = df[df['day'] <= TRAIN_END_DAY]
        val_df = df[(df['day'] > TRAIN_END_DAY) & (df['day'] <= VALIDATION_END_DAY)]
        test_df = df[df['day'] > VALIDATION_END_DAY]
        
        # 각 조각을 해당하는 디렉토리에 저장
        # 데이터가 있는 경우에만 파일을 저장합니다.
        if not train_df.empty:
            train_df.to_parquet(os.path.join(train_path, filename), index=False)
            
        if not val_df.empty:
            val_df.to_parquet(os.path.join(val_path, filename), index=False)

        if not test_df.empty:
            test_df.to_parquet(os.path.join(test_path, filename), index=False)

    except Exception as e:
        print(f"\nError processing {file_path}. Error: {e}")

print("\n✅ Data splitting complete!")

### 데이터 샘플링 및 Z-정규화

In [1]:
# 설정
import pandas as pd
import numpy as np
import os
import glob
from tqdm import tqdm
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from joblib import Parallel, delayed

# TRAIN_DATA_DIR = './dataset/split_dataset/train'
# OUTPUT_DATASET_PATH = './dataset/preprocessed_training_dataset.npy'

TEST_DATA_DIR = './dataset/split_dataset/test'
OUTPUT_DATASET_PATH = './dataset/preprocessed_test_dataset.npy'

N_SAMPLES_PER_FUNCTION = 3
ALL_DAYS = list(range(12, 15)) # Train: 1-8, Validation: 9-11, Test: 12-14

In [2]:
# 단일 파일을 처리하는 함수
def process_single_file(file_path):
    """하나의 Parquet 파일을 읽어 4일치를 샘플링하고, wide format의 NumPy 배열로 반환"""
    try:
        df_func = pd.read_parquet(file_path)
        print(f"Processing {file_path}: initial shape {df_func.shape}")

        # 1) 중복된 (day, minute) 항목 합산
        # 동일한 (day, minute) 조합이 여러 번 나타날 수 있으므로, 이를 합산
        df_func_agg = df_func.groupby(['day', 'minute'], as_index=False).agg({'invocations': 'sum'})

        # 2) 파일에 실제로 존재하는 날짜 확인
        available_days = df_func['day'].unique()
        print(f"Available days in file: {available_days}")
        
        # 3) 파일에 최소 4일치 이상의 데이터가 있는지 확인
        if len(available_days) >= N_SAMPLES_PER_FUNCTION:
            # 4) 존재하는 날짜 '중에서' 4일을 무작위 샘플링
            sampled_days = np.random.choice(ALL_DAYS, size=N_SAMPLES_PER_FUNCTION, replace=False)
            # 이제 중복이 제거된 df_func_agg에서 샘플링된 날짜만 필터링
            df_sampled = df_func_agg[df_func_agg['day'].isin(sampled_days)]
            print(f"Days actually found after filtering: {df_sampled['day'].unique()}")

            # 5) 피봇 및 결측치 처리
            # NaN 값을 0으로 채워서 불완전한 데이터를 완전하게 만듭니다.
            # 즉, 호출 기록이 없는 minute는 0으로 간주합니다.
            df_wide = df_sampled.pivot(index='day', columns='minute', values='invocations').fillna(0)

            del df_func_agg, df_sampled, df_func  # 메모리 관리

        # 6) 조건 확인 및 반환
        if len(df_wide) == N_SAMPLES_PER_FUNCTION:
            return df_wide.to_numpy()
        
    except Exception as e:
        # 오류 발생 시 상세 내용을 출력하면 디버깅에 도움이 됨
        print(f"Error in process_single_file for {file_path}: {e}")
        return None
    
    return None

In [3]:
# 병렬 처리로 샘플링 실행
print(f"Step 1: Sampling {N_SAMPLES_PER_FUNCTION} days from each function in parallel...")
all_files = glob.glob(os.path.join(TEST_DATA_DIR, 'func_*.parquet'))

# n_jobs=-1: 사용 가능한 모든 CPU 코어를 사용
# delayed: 함수와 인자를 묶어 나중에 실행하도록 예약
results = Parallel(n_jobs=-1, verbose=1)(
    delayed(process_single_file)(path) for path in all_files
)

Step 1: Sampling 3 days from each function in parallel...


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 20 concurrent workers.
[Parallel(n_jobs=-1)]: Done  10 tasks      | elapsed:    1.4s
[Parallel(n_jobs=-1)]: Done 180 tasks      | elapsed:    2.3s
[Parallel(n_jobs=-1)]: Done 1140 tasks      | elapsed:    6.4s
[Parallel(n_jobs=-1)]: Done 2540 tasks      | elapsed:   12.2s
[Parallel(n_jobs=-1)]: Done 4340 tasks      | elapsed:   19.9s
[Parallel(n_jobs=-1)]: Done 6540 tasks      | elapsed:   29.4s
[Parallel(n_jobs=-1)]: Done 9140 tasks      | elapsed:   40.7s
[Parallel(n_jobs=-1)]: Done 12140 tasks      | elapsed:   54.2s
[Parallel(n_jobs=-1)]: Done 15540 tasks      | elapsed:  1.1min
[Parallel(n_jobs=-1)]: Done 19340 tasks      | elapsed:  1.4min
[Parallel(n_jobs=-1)]: Done 23540 tasks      | elapsed:  1.8min
[Parallel(n_jobs=-1)]: Done 28140 tasks      | elapsed:  2.1min
[Parallel(n_jobs=-1)]: Done 33140 tasks      | elapsed:  2.5min
[Parallel(n_jobs=-1)]: Done 38540 tasks      | elapsed:  2.9min
[Parallel(n_jobs=-1)]: Done 44340 ta

### 최종 데이터셋 생성 및 전처리

In [4]:
print("\nStep 2: Concatenating all samples into a single dataset using np.memmap...")

# 결과 리스트에서 None 값을 제거하고, 유효한 결과만 필터링
valid_results = [res for res in results if res is not None]
print(f"Valid results count: {len(valid_results)}")

# 빈 리스트 처리
if not valid_results:
    print("No valid patterns found. Exiting...")
    exit()

# 최종 데이터셋의 형태(shape) 계산
num_samples = sum(res.shape[0] for res in valid_results)
num_timesteps = valid_results[0].shape[1]  # 각 샘플의 길이 (1440)
final_shape = (num_samples, num_timesteps)

# 1) 디스크에 최종 배열을 위한 공간 미리 할당 (메모리 사용 거의 없음)
memmap_path = './dataset/final_dataset_raw.mmap'
final_dataset_raw = np.memmap(memmap_path, dtype='float32', mode='w+', shape=final_shape)
print(f"✅ Memory-mapped file created on disk. Shape: {final_shape}") # (282684, 1440)

# 2) 작은 조각들을 순차적으로 디스크의 배열에 채워 넣기
print("Filling the memory-mapped file with sampled data...")
start_idx = 0
for res in tqdm(valid_results, desc="Filling data"):
    num_rows_in_chunk = res.shape[0]
    # RAM에는 작은 조각(res)만 있고, 디스크의 큰 배열(final_dataset_raw)에 복사
    final_dataset_raw[start_idx : start_idx + num_rows_in_chunk] = res
    start_idx += num_rows_in_chunk

# 디스크에 변경 사항 최종 저장
final_dataset_raw.flush()
del valid_results  # 메모리 해제


Step 2: Concatenating all samples into a single dataset using np.memmap...
Valid results count: 70671
✅ Memory-mapped file created on disk. Shape: (212013, 1440)
Filling the memory-mapped file with sampled data...


Filling data: 100%|██████████| 70671/70671 [00:00<00:00, 80602.56it/s]


### Z-정규화 및 최종 저장

In [5]:
print("\nStep 3: Applying Z-Normalization and saving the final dataset...")

# 1) 메모리 맵 파일 다시 로드 (읽기 전용)
# 이전 단계에서 생성된 메모리 맵 파일 경로 및 정보
memmap_path = './dataset/final_dataset_raw.mmap'
dtype = 'float32'

# 2) Z-정규화
# mode='r'은 읽기 전용으로 파일을 엽니다.
raw_data_mmap = np.memmap(memmap_path, dtype=dtype, mode='r', shape=final_shape)

scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.)
print("Applying Z-Scaler...")
final_dataset_normalized = scaler.fit_transform(raw_data_mmap)

# 3) tslearn 모델 입력을 위한 3D 형태로 변환
# 이 연산은 데이터 복사본을 생성하므로 충분한 메모리가 필요할 수 있습니다.
print("Reshaping data for tslearn models...")
final_dataset_ready = final_dataset_normalized.reshape(
    final_dataset_normalized.shape[0], 
    final_dataset_normalized.shape[1], 
    1
)

# 4) 최종 결과물(.npy) 저장
print(f"Saving the final preprocessed dataset to {OUTPUT_DATASET_PATH}...")
np.save(OUTPUT_DATASET_PATH, final_dataset_ready)

# 5) 메모리 매핑 파일 핸들 해제 및 임시 파일 삭제
del raw_data_mmap, final_dataset_normalized, final_dataset_ready # 메모리에서 변수 해제

print("\n🎉 All preprocessing is complete!")
print(f"Final dataset is ready for training at: {OUTPUT_DATASET_PATH}")


Step 3: Applying Z-Normalization and saving the final dataset...
Applying Z-Scaler...
Reshaping data for tslearn models...
Saving the final preprocessed dataset to ./dataset/preprocessed_test_dataset.npy...

🎉 All preprocessing is complete!
Final dataset is ready for training at: ./dataset/preprocessed_test_dataset.npy
