In [3]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# 1. Start (or reuse) the Spark session.
spark = SparkSession.builder.appName("ProjectFrame-HDFS").getOrCreate()

# 2. Input location on HDFS (the fast working area).
# A single station file is used to validate the framework end to end; set
# DATA_PATH = ALL_STATIONS_PATH to load every CSV once the pipeline is verified.
# NOTE(review): this notebook's execution counts start at In [3] and later
# progress bars show a separate 827-task Stage 4 still running — presumably a
# leftover job from an earlier run; restart the kernel before a clean re-run.
ALL_STATIONS_PATH = "hdfs:///project_data/*.csv"
SINGLE_STATION_PATH = "hdfs:///project_data/A0000453929.csv"  # <--- change to a file that exists on your HDFS
DATA_PATH = SINGLE_STATION_PATH

print("== Cell 1: 正在从 HDFS 加载单个文件 ==")
# inferSchema=True needs an extra pass over the input; the ISD measurement
# columns (TMP, DEW, ...) still arrive as strings and are parsed in Cell 3.
df = spark.read.csv(
    DATA_PATH,
    header=True,
    inferSchema=True
)

# 3. Cache the data in memory (HDFS -> memory) for the analysis cells below.
# cache() is lazy: without an action nothing is stored yet, so count() is run
# here to actually materialize the cache before reporting that it is done.
df.cache().count()

print("== Cell 1: 数据加载完成并已缓存 ==")

== Cell 1: 正在从 HDFS 加载单个文件 ==




== Cell 1: 数据加载完成并已缓存 ==




In [4]:
print("== Cell 2: 探索原始数据 ==")

# Column types as inferred by the CSV reader. The ISD measurement fields
# (e.g. TMP, DEW) come back as raw strings like "+0008,5" that still need parsing.
df.printSchema()

# First 10 raw TMP / DEW values, shown in full (no truncation).
df.select(["TMP", "DEW"]).show(n=10, truncate=False)

== Cell 2: 探索原始数据 ==
root
 |-- STATION: string (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- AA1: string (nullable = true)
 |-- AT1: string (nullable = true)
 |-- AT2: string (nullable = true)
 |-- AT3: string (nullable = true)
 |-- AT4: string (nullable = true)
 |-- AT5: string (nullable = true)
 |-- AT6: string (nullable = true)
 |-- AU1: string (nullable = true)
 |-- AU2: string (nullable = true)
 |-- AW1: string (nullable = true)
 |-- AW2:

[Stage 4:==>            (162 + 7) / 827][Stage 8:>                  (0 + 1) / 1]

+-------+-------+
|TMP    |DEW    |
+-------+-------+
|+0008,5|-0022,5|
|+0007,5|-0022,5|
|+0006,5|-0022,5|
|+0004,5|-0022,5|
|+0004,5|-0023,5|
|+0002,5|-0028,5|
|+0000,5|-0029,5|
|-0001,5|-0029,5|
|-0001,5|-0029,5|
|-0001,5|-0030,5|
+-------+-------+
only showing top 10 rows





In [5]:
print("== Cell 3: 正在清理数据 ==")

def clean_column(df, col_name, missing_value=9999, scale=10.0, bad_quality_codes=None):
    """Parse an ISD-style "value,flag" string column into a scaled numeric column.

    Each entry of ``col_name`` is expected to look like ``"+0008,5"``: a signed
    integer value, a comma, then a quality flag. The value part is cast to
    float and divided by ``scale``. The sentinel ``missing_value`` is mapped to
    null. The result is added as a new column ``f"{col_name}_clean"``.

    Args:
        df: Spark DataFrame that contains ``col_name``.
        col_name: Name of the raw string column (e.g. "TMP", "DEW").
        missing_value: Raw value that marks a missing observation. Defaults to
            9999, the sentinel used for TMP/DEW. Other ISD fields may use a
            different sentinel; check the ISD format document and pass it
            explicitly.
        scale: Divisor applied to the raw value. Defaults to 10.0.
        bad_quality_codes: Optional iterable of quality-flag strings (e.g.
            ``["3", "7"]``) whose values should also become null. ``None``
            (the default) ignores the flag, matching the original behaviour.

    Returns:
        A new DataFrame with the additional ``<col_name>_clean`` column.
    """
    # Split "Value,Flag" on the comma and parse the value part (e.g. "+0100").
    parts = F.split(df[col_name], ',')
    value_col = parts.getItem(0).cast('float')

    # Keep a value unless it is the "missing" sentinel.
    keep = value_col != missing_value

    if bad_quality_codes:
        # regexp_extract yields "" when there is no ",flag" part. Unlike
        # indexing the split array, it does not rely on a second element
        # existing. Null input makes isin() null; coalesce treats that as
        # "not a bad flag".
        flag_col = F.regexp_extract(df[col_name], r',([^,]*)', 1)
        bad_flag = F.coalesce(flag_col.isin(list(bad_quality_codes)), F.lit(False))
        keep = keep & ~bad_flag

    # Rows failing `keep` (including null values) become null.
    cleaned = F.when(keep, value_col / scale).otherwise(None)
    return df.withColumn(f"{col_name}_clean", cleaned)

# Parse the target column (TMP) first, then the first feature (DEW, dew point).
cleaned_df = clean_column(clean_column(df, 'TMP'), 'DEW')

# Put each raw string next to its parsed value to eyeball the conversion.
print("== Cell 3: 清理结果对比 ==")
cleaned_df.select(["TMP", "TMP_clean", "DEW", "DEW_clean"]).show(n=10, truncate=False)

== Cell 3: 正在清理数据 ==
== Cell 3: 清理结果对比 ==
+-------+---------+-------+---------+
|TMP    |TMP_clean|DEW    |DEW_clean|
+-------+---------+-------+---------+
|+0008,5|0.8      |-0022,5|-2.2     |
|+0007,5|0.7      |-0022,5|-2.2     |
|+0006,5|0.6      |-0022,5|-2.2     |
|+0004,5|0.4      |-0022,5|-2.2     |
|+0004,5|0.4      |-0023,5|-2.3     |
|+0002,5|0.2      |-0028,5|-2.8     |
|+0000,5|0.0      |-0029,5|-2.9     |
|-0001,5|-0.1     |-0029,5|-2.9     |
|-0001,5|-0.1     |-0029,5|-2.9     |
|-0001,5|-0.1     |-0030,5|-3.0     |
+-------+---------+-------+---------+
only showing top 10 rows





In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler

print("== Cell 4: 准备用于 ML 的 DataFrame ==")

# Keep just the target and the feature. The target is renamed to "label",
# Spark ML's conventional target column name (the estimators in later cells
# are also given labelCol="label" explicitly). Any row with a null is dropped.
model_df = cleaned_df.selectExpr("TMP_clean AS label", "DEW_clean").na.drop()

# Preview the prepared rows.
model_df.show(n=10)

== Cell 4: 准备用于 ML 的 DataFrame ==


[Stage 4:===>           (176 + 7) / 827][Stage 10:>                 (0 + 1) / 1]

+-----+---------+
|label|DEW_clean|
+-----+---------+
|  0.8|     -2.2|
|  0.7|     -2.2|
|  0.6|     -2.2|
|  0.4|     -2.2|
|  0.4|     -2.3|
|  0.2|     -2.8|
|  0.0|     -2.9|
| -0.1|     -2.9|
| -0.1|     -2.9|
| -0.1|     -3.0|
+-----+---------+
only showing top 10 rows





In [7]:
print("== Cell 5: 定义 Pipeline 阶段并拆分数据 ==")

# Step 1: pack the feature columns (currently only DEW_clean) into one vector.
assembler = VectorAssembler(
    inputCols=["DEW_clean"],
    outputCol="unscaled_features"
)

# Step 2: standardize, as the project requires ("standardizing the data").
# StandardScaler defaults to withMean=False / withStd=True, which only divides
# by the standard deviation. withMean=True also centers each feature, so the
# output is a true z-score. Centering densifies vectors, which is harmless
# here because the feature vector has a single element.
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
    withMean=True,
    withStd=True,
)

# Step 3: split 70% train / 30% test with a fixed seed for reproducibility.
(trainingData, testData) = model_df.randomSplit([0.7, 0.3], seed=42)

print(f"== Cell 5: 拆分完毕，训练集: {trainingData.count()} 行, 测试集: {testData.count()} 行 ==")

== Cell 5: 定义 Pipeline 阶段并拆分数据 ==


[Stage 4:===>           (188 + 8) / 827][Stage 16:>                 (0 + 0) / 1]

== Cell 5: 拆分完毕，训练集: 16956 行, 测试集: 7120 行 ==




In [8]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

print("== Cell 6: 训练模型 1 (LinearRegression) ==")

# Model 1: linear regression on the standardized features. regParam is left
# at its default, which triggers the "regParam is zero" warning.
lr = LinearRegression().setFeaturesCol("features").setLabelCol("label")

# Full pipeline (assemble -> scale -> regress), fitted on the training split only.
pipeline_lr = Pipeline().setStages([assembler, scaler, lr])
model_lr = pipeline_lr.fit(trainingData)

# Score the held-out test split.
predictions_lr = model_lr.transform(testData)

# RMSE evaluator; Cell 7 reuses this same object.
evaluator = (
    RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)
rmse_lr = evaluator.evaluate(predictions_lr)

print("=" * 30)
print(f"模型 1 (LinearRegression) 的 RMSE: {rmse_lr}")
print("=" * 30)

== Cell 6: 训练模型 1 (LinearRegression) ==


25/10/24 09:17:49 WARN Instrumentation: [8facddaa] regParam is zero, which might cause numerical instability and overfitting.

模型 1 (LinearRegression) 的 RMSE: 5.489006811081679




In [9]:
from pyspark.ml.regression import DecisionTreeRegressor

print("== Cell 7: 训练模型 2 (DecisionTreeRegressor) ==")

# Model 2: a regression tree with default hyperparameters.
dt = DecisionTreeRegressor().setFeaturesCol("features").setLabelCol("label")

# Same assemble -> scale stages as Cell 6, with the tree as the final stage.
pipeline_dt = Pipeline().setStages([assembler, scaler, dt])
model_dt = pipeline_dt.fit(trainingData)

# Score the same test split and reuse the RMSE evaluator from Cell 6.
predictions_dt = model_dt.transform(testData)
rmse_dt = evaluator.evaluate(predictions_dt)

print("=" * 30)
print(f"模型 2 (DecisionTreeRegressor) 的 RMSE: {rmse_dt}")
print("=" * 30)
print("== 框架搭建完毕！ ==")

== Cell 7: 训练模型 2 (DecisionTreeRegressor) ==




模型 2 (DecisionTreeRegressor) 的 RMSE: 5.549243089672489
== 框架搭建完毕！ ==


