In [98]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, quarter, col, hour, minute, when, split

In [99]:
# Start (or reuse) the Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .appName("Project")
    .getOrCreate()
)

# Read the station CSV with a header row, letting Spark infer column types.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("2014/01001099999.csv")
)



In [107]:
# Re-read the raw CSV so the transformations below always start from
# untransformed data, even when this cell is run more than once.
df = spark.read.csv("2014/01001099999.csv", header=True, inferSchema=True)

# Feature engineering: break the observation timestamp into calendar parts.
df = (
    df.withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("day", dayofmonth("date"))
    .withColumn("quarter", quarter("date"))
    .withColumn("hour", hour("date"))
    .withColumn("minute", minute("date"))
)

# Drop the raw DATE and NAME columns.
df = df.drop('DATE')
df = df.drop('NAME')

# Replace SOURCE values equal to 9 with null.
df = df.withColumn("SOURCE", when(col("SOURCE") == 9, None).otherwise(col("SOURCE")))

# NOTE: latitude/longitude do not match the dataset documentation; their
# missing values have not been checked yet.
# Replace the 9999 / 99999 placeholder values with null.
df = df.withColumn("ELEVATION", when(col("ELEVATION") == 9999, None).otherwise(col("ELEVATION")))
df = df.withColumn("REPORT_TYPE", when(col("REPORT_TYPE") == '99999', None).otherwise(col("REPORT_TYPE")))
df = df.withColumn("CALL_SIGN", when(col("CALL_SIGN") == 99999, None).otherwise(col("CALL_SIGN")))

# WND is a comma-separated string; build the split expression once and pull
# out its five components by position.
wnd_parts = split(col("WND"), ",")
df = (
    df.withColumn("wind_dir", wnd_parts.getItem(0))
    .withColumn("wind_dir_quality", wnd_parts.getItem(1))
    .withColumn("wind_type", wnd_parts.getItem(2))
    .withColumn("wind_speed", wnd_parts.getItem(3))
    .withColumn("wind_speed_quality", wnd_parts.getItem(4))
)

# The split pieces are strings; make direction and speed numeric.
df = (
    df.withColumn("wind_dir", col("wind_dir").cast("int"))
    .withColumn("wind_speed", col("wind_speed").cast("double"))
)

df = (
    # 999 / "9" / 9999 are placeholder codes: turn them into null.
    df.withColumn("wind_dir", when(col("wind_dir") == 999, None).otherwise(col("wind_dir")))
    .withColumn("wind_type", when(col("wind_type") == "9", None).otherwise(col("wind_type")))
    # Non-placeholder speeds are divided by 10.
    .withColumn("wind_speed", when(col("wind_speed") == 9999, None).otherwise(col("wind_speed") / 10.0))
    # A type "V" observation with no direction is labelled "Variable". The
    # column then holds text next to numeric directions, so it is cast to
    # string explicitly instead of relying on implicit int/string coercion.
    .withColumn(
        "wind_dir",
        when((col("wind_type") == "V") & col("wind_dir").isNull(), "Variable")
        .otherwise(col("wind_dir").cast("string")),
    )
    # A missing wind type combined with zero speed is recorded as calm ("C").
    # The previous version assigned None here, which left the column
    # unchanged. NOTE(review): "C" follows the ISD convention that type 9
    # with speed 0000 means calm winds; confirm against the format document.
    .withColumn(
        "wind_type",
        when(col("wind_type").isNull() & (col("wind_speed") == 0), "C")
        .otherwise(col("wind_type")),
    )
)

Collecting jupyter_contrib_nbextensions
  Using cached jupyter_contrib_nbextensions-0.7.0-py2.py3-none-any.whl
Installing collected packages: jupyter-contrib-nbextensions
Successfully installed jupyter-contrib-nbextensions-0.7.0


In [108]:
# Spot-check the first five parsed wind speeds.
df.select(col("wind_speed")).show(n=5)

+----------+
|wind_speed|
+----------+
|       6.1|
|       5.1|
|       3.5|
|       1.9|
|       0.8|
+----------+
only showing top 5 rows



In [109]:
# Preview the frame using show()'s default settings, spelled out explicitly.
df.show(n=20, truncate=True)

+----------+------+----------+----------+---------+-----------+---------+---------------+--------------+-----------+------------+-------+-------+-------+-----------+-----------+----+----+--------+--------+------------------+------------------+------------------+--------------------+--------------------+----+-------------+-------------+---------------+----------------+----+------+---------------+----+---------------+----------+----------------+----+-----+---+-------+----+------+--------+----------------+---------+----------+------------------+
|   STATION|SOURCE|  LATITUDE| LONGITUDE|ELEVATION|REPORT_TYPE|CALL_SIGN|QUALITY_CONTROL|           WND|        CIG|         VIS|    TMP|    DEW|    SLP|        AA1|        AA2| AA3| AJ1|     AY1|     AY2|               GA1|               GA2|               GA3|                 GE1|                 GF1| IA1|          KA1|          KA2|            MA1|             MD1| MW1|   OC1|            OD1| SA1|            UA1|       REM|             EQD|year

In [93]:
# Print the column names, inferred types and nullability of the current frame.
df.printSchema()

root
 |-- STATION: integer (nullable = true)
 |-- SOURCE: integer (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: integer (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- AA1: string (nullable = true)
 |-- AA2: string (nullable = true)
 |-- AA3: string (nullable = true)
 |-- AJ1: string (nullable = true)
 |-- AY1: string (nullable = true)
 |-- AY2: string (nullable = true)
 |-- GA1: string (nullable = true)
 |-- GA2: string (nullable = true)
 |-- GA3: string (nullable = true)
 |-- GE1: string (nullable = true)
 |-- GF1: string (nullable = true)
 |-- IA1: string (nullable = true)
 |-- KA1: string (nullable = tru

In [94]:
# Preview the frame using show()'s default settings, spelled out explicitly.
df.show(n=20, truncate=True)

+----------+------+----------+----------+---------+-----------+---------+---------------+--------------+-----------+------------+-------+-------+-------+-----------+-----------+----+----+--------+--------+------------------+------------------+------------------+--------------------+--------------------+----+-------------+-------------+---------------+----------------+----+------+---------------+----+---------------+----------+----------------+----+-----+---+-------+----+------+--------+----------------+---------+----------+------------------+
|   STATION|SOURCE|  LATITUDE| LONGITUDE|ELEVATION|REPORT_TYPE|CALL_SIGN|QUALITY_CONTROL|           WND|        CIG|         VIS|    TMP|    DEW|    SLP|        AA1|        AA2| AA3| AJ1|     AY1|     AY2|               GA1|               GA2|               GA3|                 GE1|                 GF1| IA1|          KA1|          KA2|            MA1|             MD1| MW1|   OC1|            OD1| SA1|            UA1|       REM|             EQD|year