In [1]:
# 欠落したデータの対処

# 欠落に対する最も簡単な方法は、そうしてもデータに問題が生じない場合に、欠落を含むレコード全体を削除すること。
# 行を削除した後に残ったデータセットが非常に小さくなっていたり、データサイズが50%以下になってしまったりした場合、
# どういた列が最も欠落しているから調べ、それらの列を丸ごと削除するという選択肢もある。

# 欠落値があるデータを扱うもう一つの方法は、None となっているところに何らかの値を補うこと。
# これはデータの種類によっていくつかの選択肢がある。

# データが論理値の場合、第三の分類として Missing を加えることでカテゴリ値にする。
# データがもともとカテゴリ値の場合、Missing を加えることで単にカテゴリ値を増やす。
# 順序や数値のデータの場合、平均値や中央値、あるいはその他の事前に定義された値
#（たとえばデータの分布の様子に応じて25パーセンタイルや75パーセンタイルの値など）を補う

In [2]:
# 行について分析した場合
# ID3 の行が持っている有益な情報は height だけ
# ID6 の行では欠けている値は age だけ

# 列について分析した場合
# income 列は、公開するにはきわめて個人的な内容であることから、ほとんどの値が欠落している
# weight と gender 列で欠落している値はそれぞれ一つ
# age 列には2つの値が欠落している
df = spark.createDataFrame([
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

In [3]:
# 行ごとの欠落値の数を知る
df.rdd.map(
    lambda row: (row['id'], sum(column == None for column in row))
).collect()
# Out を見ると ID3 の行では4つの値が欠落していることが確認できる
# どの値が欠けているのか調べて列ごとの欠落値の数を数え、
# そうした値をまとめて削除するのか、何らかの値を補うのか判断する

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [4]:
# 列ごとの欠落値のパーセンテージを調べる
import pyspark.sql.functions as fn

expressions = [
    (1 - (fn.count(column) / fn.count('*'))).alias(column + '_missing')
    for column in df.columns
]
df.agg(*expressions).show()

+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+



In [5]:
# income 列はほとんどの値が欠落しているので削除とする
df = df.select([
    column for column in df.columns if column != 'income'
])

In [7]:
# 閾値を超える欠落数の行を削除する .dropna()
# ID3 は欠落数3 なので削除されない
df.dropna(thresh=3).show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [17]:
# 欠落値を補う
# 数値はデータ全体の平均値で、文字列は missing で補う
# .toPandas() は RDD に対して .collect() と同様の動作をする
# ワーカーからすべての情報を収集しドライバに持ってくる。
# そのため 1000 * 1000 のようなデータになると速度問題が生じてくる。

means = df.agg(*[
    fn.mean(column).alias(column)
    for column in df.columns if column != 'gender'
]).toPandas().to_dict('records')[0]
means['gender'] = 'missing'
df.fillna(means).show()

+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+

