In [1]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [2]:
df_pyspark=spark.read.csv('data\\teste_2.csv',header=True,inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [3]:
df_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [6]:
#thresh define o limite minimo para delettar, subset quais as colunas que o drop vai considerar
df_pyspark = df_pyspark.na.drop(thresh=2, subset=['Name', 'Experience'])
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [7]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+---+----------+------+-----------+------------------+--------------+
|     Name|age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+---+----------+------+-----------+------------------+--------------+
|    Krish| 31|        10| 30000|         31|                10|         30000|
|Sudhanshu| 30|         8| 25000|         30|                 8|         25000|
|    Sunny| 29|         4| 20000|         29|                 4|         20000|
|     Paul| 24|         3| 20000|         24|                 3|         20000|
|   Harsha| 21|         1| 15000|         21|                 1|         15000|
|  Shubham| 23|         2| 18000|         23|                 2|         18000|
+---------+---+----------+------+-----------+------------------+--------------+



### Modelo para fazer o fillNA pelo Pyspark é utilizando a Classe Inputer, o exemplo abaixo mostra em mais detalhes o passo a passo

In [8]:
#DataFrame com valores nulos em algumas das linhas nas colunas
df = spark.createDataFrame([
        (1.0, float("nan")),
        (2.0, float("nan")),
        (float("nan"), 3.0),
        (4.0, 4.0),
        (5.0, 5.0)
    ],
    ["a", "b"])

df.show()

+---+---+
|  a|  b|
+---+---+
|1.0|NaN|
|2.0|NaN|
|NaN|3.0|
|4.0|4.0|
|5.0|5.0|
+---+---+



In [17]:
'''Uitilizo a classe Inputer para especificar as colunas de entrada e as de saída,
 aonde as de entrada sao os colunas que terao os valores de referencia para que sejam atribuidos
 nas colunas de saida, o metodo getRelativeError mostra o erro estatístico do modelo.
'''
imputer.setInputCols(["a", "b"])
imputer.setOutputCols(["out_a", "out_b"])
imputer.getRelativeError()

0.001

In [19]:
#Pesquisar mais as diferencas entre somente o fit e o transform, mas basicamente ambos aplicam as configuracoes para fillNA no dataset
model = imputer.fit(df)
model.setInputCols(["a", "b"])

ImputerModel: uid=Imputer_6112fd9904a9, strategy=median, missingValue=NaN, numInputCols=2, numOutputCols=2

In [20]:
model.getStrategy()

'median'

In [21]:
model.surrogateDF.show()

+---+---+
|  a|  b|
+---+---+
|2.0|4.0|
+---+---+



In [22]:
model.transform(df).show()

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  2.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+

