In [242]:
import pyspark
import pandas as pd
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType, IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Starting spark session


In [243]:
spark = SparkSession.builder \
    .appName("example") \
    .getOrCreate()

In [244]:
spark

# Reading files

In [245]:
df_spark = spark.read.csv("Dataset1.csv", header=True, inferSchema=True)


In [246]:
df_spark.printSchema()

root
 |-- N: integer (nullable = true)
 |-- P: string (nullable = true)
 |-- K: integer (nullable = true)
 |-- pH: double (nullable = true)
 |-- EC: double (nullable = true)
 |-- OC: string (nullable = true)
 |-- S: double (nullable = true)
 |-- Zn: double (nullable = true)
 |-- Fe: double (nullable = true)
 |-- Cu: double (nullable = true)
 |-- Mn: double (nullable = true)
 |-- B: double (nullable = true)
 |-- OM: double (nullable = true)
 |-- Fertility: integer (nullable = true)



In [247]:
df_spark.select(["N"]) 

DataFrame[N: int]

In [248]:
df_spark.describe().show()

+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|                 N|                 P|                 K|                 pH|                 EC|                OC|                S|                Zn|                Fe|                Cu|               Mn|                 B|                OM|         Fertility|
+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|               885|               885|               885|                885|                885|               885|              885|               885|          

Data types

In [249]:
df_spark.dtypes

[('N', 'int'),
 ('P', 'string'),
 ('K', 'int'),
 ('pH', 'double'),
 ('EC', 'double'),
 ('OC', 'string'),
 ('S', 'double'),
 ('Zn', 'double'),
 ('Fe', 'double'),
 ('Cu', 'double'),
 ('Mn', 'double'),
 ('B', 'double'),
 ('OM', 'double'),
 ('Fertility', 'int')]

In [250]:
df_spark = df_spark.withColumn("OC", F.col("OC").cast(FloatType()))
df_spark = df_spark.withColumn("P", F.col("P").cast(FloatType()))

Filling NA

In [251]:
from pyspark.ml.feature import Imputer

cols = df_spark.columns

imput = Imputer(
    inputCols=cols,
    outputCols=["{}_filled".format(c) for c in cols],
    strategy="mean")

In [252]:
df_spark = imput.fit(df_spark).transform(df_spark)
df_spark.show()

+---+----+---+----+----+----+----+----+----+----+----+----+------+---------+--------+---------+--------+---------+---------+----------+--------+---------+---------+---------+---------+--------+---------+----------------+
|  N|   P|  K|  pH|  EC|  OC|   S|  Zn|  Fe|  Cu|  Mn|   B|    OM|Fertility|N_filled| P_filled|K_filled|pH_filled|EC_filled| OC_filled|S_filled|Zn_filled|Fe_filled|Cu_filled|Mn_filled|B_filled|OM_filled|Fertility_filled|
+---+----+---+----+----+----+----+----+----+----+----+----+------+---------+--------+---------+--------+---------+---------+----------+--------+---------+---------+---------+---------+--------+---------+----------------+
|138| 8.6|560|7.46|0.62| 0.7| 5.9|0.24|0.31|0.77|8.71|0.11| 1.204|        0|     138|      8.6|     560|     7.46|     0.62|       0.7|     5.9|     0.24|     0.31|     0.77|     8.71|    0.11|    1.204|               0|
|213| 7.5|338|7.62|0.75|1.06|25.4| 0.3|0.86|1.54|2.89|2.29|1.8232|        0|     213|      7.5|     338|     7.62|  

Quartiles and statistics

In [253]:
df_spark.summary().show()

+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|                 N|                 P|                 K|                 pH|                 EC|                OC|                S|                Zn|                Fe|                Cu|               Mn|                 B|                OM|         Fertility|          N_filled|          P_filled|          K_filled|          pH_filled|          EC_filled|         OC_filled|         S_filled|         Zn_filled|         Fe_filled|         Cu

Filters

In [254]:
df_filtered = df_spark.filter((df_spark['N'] >= 201) & (df_spark["Fertility"] == 1) & ~(df_spark["OC"] > 0.8))

In [255]:
df_filtered.show()

+---+---+---+----+----+----+----+----+----+----+-----+----+------+---------+--------+--------+--------+---------+---------+---------+--------+---------+---------+---------+---------+--------+---------+----------------+
|  N|  P|  K|  pH|  EC|  OC|   S|  Zn|  Fe|  Cu|   Mn|   B|    OM|Fertility|N_filled|P_filled|K_filled|pH_filled|EC_filled|OC_filled|S_filled|Zn_filled|Fe_filled|Cu_filled|Mn_filled|B_filled|OM_filled|Fertility_filled|
+---+---+---+----+----+----+----+----+----+----+-----+----+------+---------+--------+--------+--------+---------+---------+---------+--------+---------+---------+---------+---------+--------+---------+----------------+
|289|8.6|560|7.58|0.44|0.67| 7.3|0.63|0.66|0.94| 2.43|1.79|1.1524|        1|     289|     8.6|     560|     7.58|     0.44|     0.67|     7.3|     0.63|     0.66|     0.94|     2.43|    1.79|   1.1524|               1|
|201|7.7|676|7.39|0.77|0.72| 9.7|0.58|0.47|1.02| 3.77|2.56|1.2384|        1|     201|     7.7|     676|     7.39|     0.77| 

aggregation & group by

In [256]:
df_spark.groupBy("Fertility").sum().show()

+---------+------+------------------+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------+-------------+------------------+-------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------+
|Fertility|sum(N)|            sum(P)|sum(K)|           sum(pH)|           sum(EC)|           sum(OC)|            sum(S)|           sum(Zn)|           sum(Fe)|           sum(Cu)|           sum(Mn)|            sum(B)|           sum(OM)|sum(Fertility)|sum(N_filled)|     sum(P_filled)|sum(K_filled)|    sum(pH_filled)|    sum(EC_filled)|    sum(OC_filled)|     sum(S_filled)|    sum(Zn_filled)|    sum(Fe_filled)|    sum(Cu_filled)|    sum(Mn_filled)|     sum(B_filled)|    sum(OM_filled)|sum(F

In [257]:
df_spark.groupBy("Fertility").min("P").show()

+---------+------+
|Fertility|min(P)|
+---------+------+
|        1|   2.9|
|        2|   5.3|
|        0|   3.9|
+---------+------+



In [258]:
df_spark.agg( {'Fertility':"count"}).show()

+----------------+
|count(Fertility)|
+----------------+
|             885|
+----------------+



MLib

In [259]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['N_filled', 'P_filled', 'K_filled', 'OC_filled'], outputCol='features')

In [260]:
output = assembler.transform(df_spark)

In [261]:
output.show()

+---+----+---+----+----+----+----+----+----+----+----+----+------+---------+--------+---------+--------+---------+---------+----------+--------+---------+---------+---------+---------+--------+---------+----------------+--------------------+
|  N|   P|  K|  pH|  EC|  OC|   S|  Zn|  Fe|  Cu|  Mn|   B|    OM|Fertility|N_filled| P_filled|K_filled|pH_filled|EC_filled| OC_filled|S_filled|Zn_filled|Fe_filled|Cu_filled|Mn_filled|B_filled|OM_filled|Fertility_filled|            features|
+---+----+---+----+----+----+----+----+----+----+----+----+------+---------+--------+---------+--------+---------+---------+----------+--------+---------+---------+---------+---------+--------+---------+----------------+--------------------+
|138| 8.6|560|7.46|0.62| 0.7| 5.9|0.24|0.31|0.77|8.71|0.11| 1.204|        0|     138|      8.6|     560|     7.46|     0.62|       0.7|     5.9|     0.24|     0.31|     0.77|     8.71|    0.11|    1.204|               0|[138.0,8.60000038...|
|213| 7.5|338|7.62|0.75|1.06|25.

In [262]:
modelData = output.select("Fertility_filled","features")

In [263]:
modelData.show()

+----------------+--------------------+
|Fertility_filled|            features|
+----------------+--------------------+
|               0|[138.0,8.60000038...|
|               0|[213.0,7.5,338.0,...|
|               0|[163.0,9.60000038...|
|               0|[157.0,6.80000019...|
|               1|[270.0,9.89999961...|
|               0|[220.0,8.60000038...|
|               0|[220.0,7.19999980...|
|               0|[207.0,7.0,401.0,...|
|               2|[333.0,14.8999996...|
|               1|[289.0,8.60000038...|
|               0|[138.0,8.10000038...|
|               1|[151.0,8.10000038...|
|               0|[144.0,7.19999980...|
|               2|[138.0,5.30000019...|
|               0|[144.0,8.30000019...|
|               1|[201.0,7.69999980...|
|               0|[182.0,14.5552663...|
|               0|[238.0,7.5,771.0,...|
|               1|[270.0,8.10000038...|
|               0|[213.0,6.09999990...|
+----------------+--------------------+
only showing top 20 rows



In [264]:
from pyspark.ml.classification import LogisticRegression

train_data,test_data = modelData.randomSplit([0.7,0.3], seed=1234)

regressor = LogisticRegression(featuresCol='features', labelCol='Fertility_filled')

regressor = regressor.fit(train_data)

pred_results = regressor.evaluate(test_data)

Evaluation of classification

In [265]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

evaluator = MulticlassClassificationEvaluator(labelCol="Fertility_filled", predictionCol="prediction", metricName="f1")

evaluator.evaluate(pred_results.predictions)


0.8356527813696564

Transform data using function

In [266]:
df_spark.columns

['N',
 'P',
 'K',
 'pH',
 'EC',
 'OC',
 'S',
 'Zn',
 'Fe',
 'Cu',
 'Mn',
 'B',
 'OM',
 'Fertility',
 'N_filled',
 'P_filled',
 'K_filled',
 'pH_filled',
 'EC_filled',
 'OC_filled',
 'S_filled',
 'Zn_filled',
 'Fe_filled',
 'Cu_filled',
 'Mn_filled',
 'B_filled',
 'OM_filled',
 'Fertility_filled']

In [267]:
def translate(fertility):
    if fertility == 0:
        return "not ferile"
    elif fertility == 1:
        return "ferile"
    elif fertility == 2:
        return "high fertility"
    else:
        return "Unknown"


translate_udf = udf(translate, StringType())

df_spark = df_spark.withColumn("Fertility_filled", df_spark["Fertility_filled"].cast(IntegerType()))

df_spark = df_spark.withColumn("TraductedFertility",translate_udf(df_spark['Fertility_filled']))