### 1. 环境准备

In [4]:
# 加载自定义模块路径
import sys
import os
sys.path.append(os.path.abspath(".."))

In [5]:
# Jupyter Magic
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [6]:
# Import basic libraries
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("GoogleClusterAnalysis") \
    .getOrCreate()

print("Spark Session created.")

Spark Session created.


### 2. 加载原始数据

In [7]:
# Import custom modules
from src.data_loader import load_data

# Load data
data_path = "../data/borg_traces_data.csv"  # Adjust path if needed
df = load_data(spark, data_path)

print("Data loaded successfully!")

Data loaded successfully!


### 3. 初步探索

In [8]:
# 查看列名
print("\nColumns in the dataset:")
print(df.columns)

# 查看前几行数据
print("\nSample data:")
df.show(5)

# 查看字段类型
print("\nSchema:")
df.printSchema()


Columns in the dataset:
['_c0', 'time', 'instance_events_type', 'collection_id', 'scheduling_class', 'collection_type', 'priority', 'alloc_collection_id', 'instance_index', 'machine_id', 'resource_request', 'constraint', 'collections_events_type', 'user', 'collection_name', 'collection_logical_name', 'start_after_collection_ids', 'vertical_scaling', 'scheduler', 'start_time', 'end_time', 'average_usage', 'maximum_usage', 'random_sample_usage', 'assigned_memory', 'page_cache_memory', 'cycles_per_instruction', 'memory_accesses_per_instruction', 'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution', 'cluster', 'event', 'failed']

Sample data:
+--------------------+--------------------+--------------------+-------------+----------------+---------------+--------+-------------------+--------------+----------------+--------------------+----------+-----------------------+--------------------+--------------------+-----------------------+--------------------------+-------------

### 4. 数据清洗

In [9]:
# Import new preprocessing module
from src.feature_engineering import clean_and_engineer_features

# Perform full cleaning and feature engineering
df_clean = clean_and_engineer_features(df)

print("\nData cleaning and feature engineering completed!")

# 查看清洗后的字段
print("\nCleaned data columns:")
print(df_clean.columns)

# 查看部分清洗后数据
df_clean.select(
    "time", "start_time", "end_time", "collection_id", 
    "requested_cpus", "used_average_cpus", "resource_efficiency", 
    "queueing_delay", "run_duration", "failed"
).show(5)


Data cleaning and feature engineering completed!

Cleaned data columns:
['_c0', 'time', 'instance_events_type', 'collection_id', 'scheduling_class', 'collection_type', 'priority', 'alloc_collection_id', 'instance_index', 'machine_id', 'resource_request', 'constraint', 'collections_events_type', 'user', 'collection_name', 'collection_logical_name', 'start_after_collection_ids', 'vertical_scaling', 'scheduler', 'start_time', 'end_time', 'average_usage', 'maximum_usage', 'random_sample_usage', 'assigned_memory', 'page_cache_memory', 'cycles_per_instruction', 'memory_accesses_per_instruction', 'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution', 'cluster', 'event', 'failed', 'requested_cpus', 'used_average_cpus', 'used_maximum_cpus', 'resource_efficiency', 'queueing_delay', 'run_duration']
+-----------------+-----------+-----------+-------------+-------------------+-------------------+-------------------+-----------------+------------+------+
|             time| start_t

### 5. 保存清洗后的数据

In [10]:
df_clean.write.mode('overwrite').parquet("../data/cleaned_borg_data.parquet")

print("\nData preparation completed. Ready for further analysis!")


Data preparation completed. Ready for further analysis!
