# 简单探索数据集

数据集：[Beijing PM2.5](https://archive.ics.uci.edu/dataset/381/beijing+pm2+5+data)

In [1]:
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PRSA-Analysis")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/09/30 09:53:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# 导入 CSV 文件
df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .option("nullValue", "NA")
    .csv("/user/root/PRSA_data_2010.1.1-2014.12.31.csv")
)

                                                                                

In [3]:
df.show(3)

+---+----+-----+---+----+-----+----+-----+------+----+----+---+---+
| No|year|month|day|hour|pm2.5|DEWP| TEMP|  PRES|cbwd| Iws| Is| Ir|
+---+----+-----+---+----+-----+----+-----+------+----+----+---+---+
|  1|2010|    1|  1|   0| null| -21|-11.0|1021.0|  NW|1.79|  0|  0|
|  2|2010|    1|  1|   1| null| -21|-12.0|1020.0|  NW|4.92|  0|  0|
|  3|2010|    1|  1|   2| null| -21|-11.0|1019.0|  NW|6.71|  0|  0|
+---+----+-----+---+----+-----+----+-----+------+----+----+---+---+
only showing top 3 rows



## 1. 表字段

In [4]:
df.printSchema()

root
 |-- No: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- pm2.5: integer (nullable = true)
 |-- DEWP: integer (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- PRES: double (nullable = true)
 |-- cbwd: string (nullable = true)
 |-- Iws: double (nullable = true)
 |-- Is: integer (nullable = true)
 |-- Ir: integer (nullable = true)



## 2. 数据预览

In [5]:
df.show(5, truncate=False)

+---+----+-----+---+----+-----+----+-----+------+----+-----+---+---+
|No |year|month|day|hour|pm2.5|DEWP|TEMP |PRES  |cbwd|Iws  |Is |Ir |
+---+----+-----+---+----+-----+----+-----+------+----+-----+---+---+
|1  |2010|1    |1  |0   |null |-21 |-11.0|1021.0|NW  |1.79 |0  |0  |
|2  |2010|1    |1  |1   |null |-21 |-12.0|1020.0|NW  |4.92 |0  |0  |
|3  |2010|1    |1  |2   |null |-21 |-11.0|1019.0|NW  |6.71 |0  |0  |
|4  |2010|1    |1  |3   |null |-21 |-14.0|1019.0|NW  |9.84 |0  |0  |
|5  |2010|1    |1  |4   |null |-20 |-12.0|1018.0|NW  |12.97|0  |0  |
+---+----+-----+---+----+-----+----+-----+------+----+-----+---+---+
only showing top 5 rows



## 3. 缺失值统计

In [6]:
df = df.withColumnRenamed("pm2.5", "pm25").withColumn(
    "date",
    F.to_date(
        F.format_string(
            "%04d-%02d-%02d", F.col("year"), F.col("month"), F.col("day")
        )
    ),
).withColumn(
    "timestamp",
    F.to_timestamp(
        F.format_string(
            "%04d-%02d-%02d %02d:00:00",
            F.col("year"),
            F.col("month"),
            F.col("day"),
            F.col("hour"),
        )
    ),
)

In [7]:
missing_cols = ["pm25", "DEWP", "TEMP", "PRES", "Iws", "Is", "Ir", "cbwd"]
missing_df = df.select(
    *[
        F.count(F.when(F.col(c).isNull(), 1)).alias(f"missing_{c}")
        for c in missing_cols
    ]
)
missing_df.show(truncate=False)

+------------+------------+------------+------------+-----------+----------+----------+------------+
|missing_pm25|missing_DEWP|missing_TEMP|missing_PRES|missing_Iws|missing_Is|missing_Ir|missing_cbwd|
+------------+------------+------------+------------+-----------+----------+----------+------------+
|2067        |0           |0           |0           |0          |0         |0         |0           |
+------------+------------+------------+------------+-----------+----------+----------+------------+



## 4. PM2.5 基础统计指标

In [8]:
df.select(
    F.count(F.col("pm25")).alias("count"),
    F.mean("pm25").alias("mean"),
    F.stddev("pm25").alias("stddev"),
    F.min("pm25").alias("min"),
    F.max("pm25").alias("max"),
).show(truncate=False)

+-----+-----------------+-----------------+---+---+
|count|mean             |stddev           |min|max|
+-----+-----------------+-----------------+---+---+
|41757|98.61321455085375|92.05038718924065|0  |994|
+-----+-----------------+-----------------+---+---+



## 5. PM2.5 月度均值

In [9]:
monthly = (
    df.groupBy("year", "month")
    .agg(F.mean("pm25").alias("avg_pm25"), F.count("*").alias("n"))
    .orderBy("year", "month")
)
monthly.show(24, truncate=False)

+----+-----+------------------+---+
|year|month|avg_pm25          |n  |
+----+-----+------------------+---+
|2010|1    |90.44257274119448 |744|
|2010|2    |97.23397913561848 |672|
|2010|3    |94.10014104372355 |744|
|2010|4    |80.0292479108635  |720|
|2010|5    |86.89959294436906 |744|
|2010|6    |109.00353982300885|720|
|2010|7    |123.64784946236558|744|
|2010|8    |97.60207100591715 |744|
|2010|9    |122.51068376068376|720|
|2010|10   |118.98247978436657|744|
|2010|11   |138.12048192771084|720|
|2010|12   |97.33333333333333 |744|
|2011|1    |44.89136904761905 |744|
|2011|2    |150.32142857142858|672|
|2011|3    |57.9184           |744|
|2011|4    |91.58582089552239 |720|
|2011|5    |65.32162921348315 |744|
|2011|6    |108.46694796061885|720|
|2011|7    |107.57219973009447|744|
|2011|8    |103.42456140350878|744|
|2011|9    |95.27260083449235 |720|
|2011|10   |145.22564935064935|744|
|2011|11   |109.63216783216784|720|
|2011|12   |108.51951547779274|744|
+----+-----+----------------

## 6. PM2.5 月度均值（不计年份）

In [10]:
df.groupBy("month").agg(F.mean("pm25").alias("avg_pm25")).orderBy(
    F.desc("avg_pm25")
).show(12, truncate=False)

+-----+------------------+
|month|avg_pm25          |
+-----+------------------+
|2    |125.73613993477616|
|10   |120.40150880134115|
|1    |115.05906078939786|
|11   |105.76263924592973|
|12   |98.20022592487997 |
|3    |97.75759280089989 |
|6    |96.5136563876652  |
|7    |94.33232794077324 |
|9    |85.20942094209421 |
|4    |83.70925110132158 |
|5    |80.11305070656691 |
|8    |79.99670362601138 |
+-----+------------------+



## 7. PM2.5 季度均值

In [11]:
season_expr = (
    F.when(F.col("month").isin(12, 1, 2), F.lit("Winter"))
    .when(F.col("month").isin(3, 4, 5), F.lit("Spring"))
    .when(F.col("month").isin(6, 7, 8), F.lit("Summer"))
    .otherwise(F.lit("Fall"))
)
season_avg = (
    df.withColumn("season", season_expr)
    .groupBy("season")
    .agg(F.mean("pm25").alias("avg_pm25"), F.count("*").alias("n"))
    .orderBy(F.desc("avg_pm25"))
)
season_avg.show(truncate=False)

+------+------------------+-----+
|season|avg_pm25          |n    |
+------+------------------+-----+
|Winter|112.77852672123255|10824|
|Fall  |104.21540382214539|10920|
|Summer|90.44258350178073 |11040|
|Spring|87.20756859035005 |11040|
+------+------------------+-----+



## 8. PM2.5 小时均值

In [12]:
df.groupBy("hour").agg(F.mean("pm25").alias("avg_pm25")).orderBy("hour").show(
    24, truncate=False
)

+----+------------------+
|hour|avg_pm25          |
+----+------------------+
|0   |113.39020172910662|
|1   |113.69856733524355|
|2   |110.32317423806785|
|3   |108.0429799426934 |
|4   |104.0802752293578 |
|5   |100.0132183908046 |
|6   |96.88275862068966 |
|7   |96.02472685451409 |
|8   |95.90794016110472 |
|9   |94.67280046003451 |
|10  |93.4475201845444  |
|11  |91.67863346844238 |
|12  |89.44887348353552 |
|13  |87.99028016009149 |
|14  |86.28324697754749 |
|15  |85.53421506612996 |
|16  |85.92227979274611 |
|17  |87.57118254879448 |
|18  |91.50770988006853 |
|19  |97.398392652124   |
|20  |104.61436781609196|
|21  |109.2498564043653 |
|22  |111.02125215393453|
|23  |111.88978185993112|
+----+------------------+



## 9. PM2.5 风向维度

In [13]:
df.groupBy("cbwd").agg(
    F.mean("pm25").alias("avg_pm25"), F.count("*").alias("n")
).orderBy(F.desc("avg_pm25")).show(truncate=False)

+----+------------------+-----+
|cbwd|avg_pm25          |n    |
+----+------------------+-----+
|cv  |126.15194543828265|9387 |
|SE  |110.82158786797503|15290|
|NE  |90.17767031118586 |4997 |
|NW  |70.12763274992584 |14150|
+----+------------------+-----+



## 10. 分位数

In [14]:
q50, q90, q99 = df.approxQuantile("pm25", [0.5, 0.9, 0.99], 0.01)
print(f"pm25 median={q50}, p90={q90}, p99={q99}")


pm25 median=72.0, p90=212.0, p99=994.0


## 11. 重污染特征

In [15]:
high = df.filter(F.col("pm25") >= F.lit(q99))
high_stats = high.agg(
    F.mean("TEMP").alias("mean_TEMP"),
    F.mean("DEWP").alias("mean_DEWP"),
    F.mean("PRES").alias("mean_PRES"),
    F.mean("Iws").alias("mean_Iws"),
    F.count("*").alias("count"),
)
high_stats.show(truncate=False)

+---------+---------+---------+--------+-----+
|mean_TEMP|mean_DEWP|mean_PRES|mean_Iws|count|
+---------+---------+---------+--------+-----+
|-12.0    |-24.0    |1032.0   |4.92    |1    |
+---------+---------+---------+--------+-----+



## 12. 重污染 top20

In [16]:
df.orderBy(F.desc("pm25")).select(
    "year",
    "month",
    "day",
    "hour",
    "pm25",
    "TEMP",
    "DEWP",
    "PRES",
    "Iws",
    "cbwd",
).limit(20).show(truncate=False)

+----+-----+---+----+----+-----+----+------+-----+----+
|year|month|day|hour|pm25|TEMP |DEWP|PRES  |Iws  |cbwd|
+----+-----+---+----+----+-----+----+------+-----+----+
|2012|1    |23 |1   |994 |-12.0|-24 |1032.0|4.92 |NW  |
|2010|2    |14 |1   |980 |-7.0 |-14 |1029.0|0.89 |cv  |
|2012|1    |23 |2   |972 |-12.0|-24 |1032.0|8.05 |NW  |
|2013|1    |12 |20  |886 |-7.0 |-8  |1023.0|1.34 |cv  |
|2013|1    |12 |22  |858 |-9.0 |-10 |1024.0|0.89 |cv  |
|2013|1    |12 |21  |852 |-8.0 |-9  |1023.0|0.89 |NE  |
|2013|1    |12 |16  |845 |-2.0 |-7  |1021.0|8.95 |SE  |
|2013|1    |12 |19  |824 |-7.0 |-8  |1022.0|0.89 |cv  |
|2013|1    |12 |17  |810 |-4.0 |-7  |1021.0|9.84 |SE  |
|2013|1    |12 |23  |805 |-9.0 |-10 |1024.0|1.79 |NW  |
|2013|1    |12 |15  |802 |-1.0 |-7  |1021.0|7.16 |SE  |
|2010|3    |22 |10  |784 |11.0 |-8  |1013.0|11.18|NW  |
|2013|1    |12 |18  |776 |-6.0 |-8  |1021.0|11.63|SE  |
|2010|3    |22 |11  |761 |12.0 |-10 |1013.0|18.33|NW  |
|2013|1    |13 |0   |744 |-8.0 |-9  |1024.0|2.68

## 13. 相关性（Pearson）

In [17]:
c_temp = df.stat.corr("pm25", "TEMP")
c_dewp = df.stat.corr("pm25", "DEWP")
c_pres = df.stat.corr("pm25", "PRES")
c_iws = df.stat.corr("pm25", "Iws")
print(f"corr(pm25, TEMP) = {c_temp:.4f}")
print(f"corr(pm25, DEWP) = {c_dewp:.4f}")
print(f"corr(pm25, PRES) = {c_pres:.4f}")
print(f"corr(pm25, Iws)  = {c_iws:.4f}")

corr(pm25, TEMP) = -0.0898
corr(pm25, DEWP) = 0.1582
corr(pm25, PRES) = -0.0456
corr(pm25, Iws)  = -0.2342
