In [1]:
# Defining a custom error

class NotFittedError(Exception):
    """Raised when an estimator is asked to transform data before being fitted."""

# Defining the desired class

class StandardScaler:
    """Minimal mean-centering estimator with a fit/transform style API.

    Despite its name, this class only subtracts the learned mean from each
    element; it does not divide by a standard deviation.
    """

    # Mean learned by the last fit()/fit_transform() call; None until fitted.
    mean = None

    def fit(self, iterable):
        """Learn and store the arithmetic mean of ``iterable``.

        Args:
            iterable: Any iterable of numbers (list, tuple, generator, ...).

        Raises:
            ZeroDivisionError: If ``iterable`` is empty.
        """
        # Materialise once so one-shot iterables (e.g. generators) work too.
        values = list(iterable)
        self.mean = sum(values) / len(values)

    def transform(self, iterable):
        """Return a list with the learned mean subtracted from every element.

        Args:
            iterable: Iterable of numbers to centre.

        Returns:
            A new list ``[element - mean, ...]``.

        Raises:
            NotFittedError: If no mean has been learned yet.
        """
        # Identity check: `!= None` would defer to the operand's own __ne__.
        if self.mean is None:
            raise NotFittedError("You first have to fit the estimator!")
        return [element - self.mean for element in iterable]

    def fit_transform(self, iterable):
        """Learn the mean of ``iterable`` and return its centred values.

        Equivalent to calling ``fit`` then ``transform`` on the same data.

        Args:
            iterable: Any iterable of numbers.

        Returns:
            A new list ``[element - mean, ...]``.

        Raises:
            ZeroDivisionError: If ``iterable`` is empty.
        """
        # The data is consumed twice (fit, then transform), so snapshot it first.
        values = list(iterable)
        self.fit(values)
        return self.transform(values)

# Quick manual check of the scaler: fit once, then centre a few lists.
mean_scaler = StandardScaler()
mean_scaler.fit([1, 2, 3])
# Centring the fitted data itself should give a list with zero mean.
print(mean_scaler.transform([1, 2, 3]))
# fit_transform should return a zero-mean list and re-learn the mean (now 1).
print(mean_scaler.fit_transform([1, 1, 1]))
# The mean learned just above is reused here, so every element becomes 1.
print(mean_scaler.transform([2, 2, 2]))

[-1.0, 0.0, 1.0]
[0.0, 0.0, 0.0]
[1.0, 1.0, 1.0]


In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import VectorAssembler

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session configured with the `local[*]` master.
spark = (
    SparkSession.builder
    .appName("Cours de Spark")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Tiny demo DataFrame: distribute three tuples as an RDD, then name the columns.
rdd = sc.parallelize([(1, 2), (3, 4), (5, 6)])
df = rdd.toDF(["A", "B"])
df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  3|  4|
|  5|  6|
+---+---+



In [3]:
# Pack columns A and B into a single vector column C.
va = VectorAssembler(inputCols=["A", "B"], outputCol="C")
new_df = va.transform(df)

# printSchema() writes the schema to stdout itself and returns None, so it must
# not be wrapped in print(): doing so emits a stray "None" after the type.
print(type(new_df))
new_df.printSchema()
new_df.show()


root
 |-- A: long (nullable = true)
 |-- B: long (nullable = true)
 |-- C: vector (nullable = true)

<class 'pyspark.sql.dataframe.DataFrame'> None
+---+---+---------+
|  A|  B|        C|
+---+---+---------+
|  1|  2|[1.0,2.0]|
|  3|  4|[3.0,4.0]|
|  5|  6|[5.0,6.0]|
+---+---+---------+



In [16]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Three-column demo DataFrame used to compute a correlation matrix.
rdd = sc.parallelize([(1, 2, 3), (2, 4, 6), (4, 8, 10)])
df = rdd.toDF(["A", "B", "C"])
df.show()

# Correlation.corr works on a single vector column, so assemble A, B, C into D.
va = VectorAssembler(
    inputCols=["A", "B", "C"],
    outputCol="D",
)
new_df = va.transform(df)
new_df.show()

# The result is itself a one-row DataFrame whose only cell holds the matrix.
Corr = Correlation.corr(new_df, "D", "pearson")
print(type(Corr))
Corr.show()

# Pull the matrix out of that single cell to print it in full.
first_row = Corr.collect()[0]
print('##\n\n', str(first_row[0]))


+---+---+---+
|  A|  B|  C|
+---+---+---+
|  1|  2|  3|
|  2|  4|  6|
|  4|  8| 10|
+---+---+---+

+---+---+---+--------------+
|  A|  B|  C|             D|
+---+---+---+--------------+
|  1|  2|  3| [1.0,2.0,3.0]|
|  2|  4|  6| [2.0,4.0,6.0]|
|  4|  8| 10|[4.0,8.0,10.0]|
+---+---+---+--------------+

<class 'pyspark.sql.dataframe.DataFrame'>
+--------------------+
|          pearson(D)|
+--------------------+
|1.0              ...|
+--------------------+

##

 DenseMatrix([[1.        , 1.        , 0.99419163],
             [1.        , 1.        , 0.99419163],
             [0.99419163, 0.99419163, 1.        ]])


In [17]:
from pyspark.ml.stat import Summarizer

# Metrics to compute, per vector component, over the assembled column.
summarizer = Summarizer.metrics("min", "mean", "max", "numNonZeros")

# Demo data containing zeros so that numNonZeros is informative.
rdd = sc.parallelize([(0, 0, 3), (0, 4, 6), (4, 8, 10)])
df = rdd.toDF(["A", "B", "C"])

va = VectorAssembler(
    inputCols=["A", "B", "C"],
    outputCol="features",
)
new_df = va.transform(df)

# One aggregate row holding all four requested metrics; shown untruncated.
metrics_column = summarizer.summary(new_df["features"])
new_df.select(metrics_column).show(truncate=False)


[Stage 51:>                                                         (0 + 2) / 2]

+------------------------------------------------------------------------------------------+
|aggregate_metrics(features, 1.0)                                                          |
+------------------------------------------------------------------------------------------+
|{[0.0,0.0,3.0], [1.3333333333333333,4.0,6.333333333333334], [4.0,8.0,10.0], [1.0,2.0,3.0]}|
+------------------------------------------------------------------------------------------+



                                                                                

In [18]:
# Display the raw three-column demo DataFrame (before vector assembly).
df.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  0|  3|
|  0|  4|  6|
|  4|  8| 10|
+---+---+---+



In [25]:
# Load the penguins CSV: the first line is the header, column types are
# inferred, and a double quote is used as the escape character.
penguins_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("escape", "\"")
    .csv("penguins.csv")
)

# printSchema() prints the schema itself and returns None; wrapping it in
# print() only adds a stray "None" line to the output.
penguins_df.printSchema()
penguins_df.show()


root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- bill_length_mm: double (nullable = true)
 |-- bill_depth_mm: double (nullable = true)
 |-- flipper_length_mm: integer (nullable = true)
 |-- body_mass_g: integer (nullable = true)
 |-- sex: string (nullable = true)

None
+-------+---------+--------------+-------------+-----------------+-----------+------+
|species|   island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+--------------+-------------+-----------------+-----------+------+
| Adelie|Torgersen|          39.1|         18.7|              181|       3750|  MALE|
| Adelie|Torgersen|          39.5|         17.4|              186|       3800|FEMALE|
| Adelie|Torgersen|          40.3|         18.0|              195|       3250|FEMALE|
| Adelie|Torgersen|          NULL|         NULL|             NULL|       NULL|  NULL|
| Adelie|Torgersen|          36.7|         19.3|              193|       3450|FEMALE|
| Ad

In [26]:
# Descriptive statistics per column; the per-column counts reveal missing values.
penguins_df.summary().show()


[Stage 68:>                                                         (0 + 1) / 1]

+-------+-------+---------+-----------------+------------------+------------------+-----------------+------+
|summary|species|   island|   bill_length_mm|     bill_depth_mm| flipper_length_mm|      body_mass_g|   sex|
+-------+-------+---------+-----------------+------------------+------------------+-----------------+------+
|  count|    344|      344|              342|               342|               342|              342|   333|
|   mean|   NULL|     NULL|43.92192982456142|17.151169590643278|200.91520467836258|4201.754385964912|  NULL|
| stddev|   NULL|     NULL|5.459583713926537|1.9747931568167807|14.061713679356952|801.9545356980949|  NULL|
|    min| Adelie|   Biscoe|             32.1|              13.1|               172|             2700|FEMALE|
|    25%|   NULL|     NULL|             39.2|              15.6|               190|             3550|  NULL|
|    50%|   NULL|     NULL|             44.4|              17.3|               197|             4050|  NULL|
|    75%|   NULL|  

                                                                                

In [27]:

from pyspark.ml.feature import Imputer

# Bill and flipper measurements: missing values are replaced by the column mean.
impute_mean = Imputer(
    strategy="mean",
    inputCols=["bill_length_mm", "bill_depth_mm", "flipper_length_mm"],
    outputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f"],
)

# Body mass: missing values are replaced by the column median.
impute_median = Imputer(
    strategy="median",
    inputCol="body_mass_g",
    outputCol="body_mass_g_f",
)

# Both imputers learn their fill values from the raw penguins DataFrame.
# NOTE(review): they are fitted on the full dataset, i.e. before any
# train/test split.
model_mean = impute_mean.fit(penguins_df)
model_median = impute_median.fit(penguins_df)

# Apply the median imputer first, then the mean imputer on its result.
penguins_median_filled = model_median.transform(penguins_df)
penguins_no_na = model_mean.transform(penguins_median_filled)

imputed_columns = ["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f", "body_mass_g_f"]
penguins_no_na.select(imputed_columns).show()

+-----------------+------------------+-------------------+-------------+
| bill_length_mm_f|   bill_depth_mm_f|flipper_length_mm_f|body_mass_g_f|
+-----------------+------------------+-------------------+-------------+
|             39.1|              18.7|                181|         3750|
|             39.5|              17.4|                186|         3800|
|             40.3|              18.0|                195|         3250|
|43.92192982456142|17.151169590643278|                200|         4050|
|             36.7|              19.3|                193|         3450|
|             39.3|              20.6|                190|         3650|
|             38.9|              17.8|                181|         3625|
|             39.2|              19.6|                195|         4675|
|             34.1|              18.1|                193|         3475|
|             42.0|              20.2|                190|         4250|
|             37.8|              17.1|             

In [28]:
from pyspark.ml.feature import StringIndexer

# Turn the three string columns into numeric indices; `species` becomes the
# training label. handleInvalid="keep" is set so problematic values are kept
# rather than rejected (the displayed sex_f column shows an extra index 2.0).
index = StringIndexer(
    inputCols=["species", "sex", "island"],
    outputCols=["label", "sex_f", "island_f"],
    handleInvalid="keep",
)

model_index = index.fit(penguins_no_na)
penguins_indexed = model_index.transform(penguins_no_na)

indexed_columns = ["label", "sex_f", "island_f"]
penguins_indexed.select(indexed_columns).show()

+-----+-----+--------+
|label|sex_f|island_f|
+-----+-----+--------+
|  0.0|  0.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  2.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  0.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  0.0|     2.0|
|  0.0|  2.0|     2.0|
|  0.0|  2.0|     2.0|
|  0.0|  2.0|     2.0|
|  0.0|  2.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  0.0|     2.0|
|  0.0|  0.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  0.0|     2.0|
|  0.0|  1.0|     2.0|
|  0.0|  0.0|     2.0|
+-----+-----+--------+
only showing top 20 rows



In [29]:
# Treat index 2.0 in sex_f as "missing" and overwrite it, in place, with the
# most frequent value of the column.
# NOTE(review): 2.0 appears to be the index the StringIndexer gave to NULL sex
# values; confirm it still holds if the set of sex labels changes.
impute_mode = Imputer(
    strategy="mode",
    missingValue=2.0,
    inputCol="sex_f",
    outputCol="sex_f",
)

model_mode = impute_mode.fit(penguins_indexed)
penguins_indexed_f = model_mode.transform(penguins_indexed)

# Show the original string next to the imputed index to check the result.
penguins_indexed_f.select("sex", "sex_f").show()


+------+-----+
|   sex|sex_f|
+------+-----+
|  MALE|  0.0|
|FEMALE|  1.0|
|FEMALE|  1.0|
|  NULL|  0.0|
|FEMALE|  1.0|
|  MALE|  0.0|
|FEMALE|  1.0|
|  MALE|  0.0|
|  NULL|  0.0|
|  NULL|  0.0|
|  NULL|  0.0|
|  NULL|  0.0|
|FEMALE|  1.0|
|  MALE|  0.0|
|  MALE|  0.0|
|FEMALE|  1.0|
|FEMALE|  1.0|
|  MALE|  0.0|
|FEMALE|  1.0|
|  MALE|  0.0|
+------+-----+
only showing top 20 rows



In [30]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the island index into a sparse vector column.
encoder = OneHotEncoder(
    inputCol="island_f",
    outputCol="island_encoded",
)

model_encoder = encoder.fit(penguins_indexed_f)
penguins_encoded = model_encoder.transform(penguins_indexed_f)

penguins_encoded.select("island_encoded").show()

+--------------+
|island_encoded|
+--------------+
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
| (3,[2],[1.0])|
+--------------+
only showing top 20 rows



In [31]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Gather the four imputed numeric columns into one vector column.
va = VectorAssembler(
    inputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f", "body_mass_g_f"],
    outputCol="numerical_features",
)
penguins_scaled_ass = va.transform(penguins_encoded)
penguins_scaled_ass.select("numerical_features").show()

# No withMean/withStd arguments are passed, so the scaler runs with its defaults.
scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="scaled_features",
)
scalerModel = scaler.fit(penguins_scaled_ass)
penguins_scaled = scalerModel.transform(penguins_scaled_ass)

# Compare the raw and the scaled vectors side by side.
penguins_scaled.select("numerical_features", "scaled_features").show()


+--------------------+
|  numerical_features|
+--------------------+
|[39.1,18.7,181.0,...|
|[39.5,17.4,186.0,...|
|[40.3,18.0,195.0,...|
|[43.9219298245614...|
|[36.7,19.3,193.0,...|
|[39.3,20.6,190.0,...|
|[38.9,17.8,181.0,...|
|[39.2,19.6,195.0,...|
|[34.1,18.1,193.0,...|
|[42.0,20.2,190.0,...|
|[37.8,17.1,186.0,...|
|[37.8,17.3,180.0,...|
|[41.1,17.6,182.0,...|
|[38.6,21.2,191.0,...|
|[34.6,21.1,198.0,...|
|[36.6,17.8,185.0,...|
|[38.7,19.0,195.0,...|
|[42.5,20.7,197.0,...|
|[34.4,18.4,184.0,...|
|[46.0,21.5,194.0,...|
+--------------------+
only showing top 20 rows

+--------------------+--------------------+
|  numerical_features|     scaled_features|
+--------------------+--------------------+
|[39.1,18.7,181.0,...|[7.18268959235864...|
|[39.5,17.4,186.0,...|[7.25616979279197...|
|[40.3,18.0,195.0,...|[7.40313019365864...|
|[43.9219298245614...|[8.06848051731928...|
|[36.7,19.3,193.0,...|[6.74180838975862...|
|[39.3,20.6,190.0,...|[7.21942969257530...|
|[38.9,17.8,181.0,...|[7.1

In [33]:
# Same comparison as above, but with truncate=False so full vectors are visible.
penguins_scaled.select("numerical_features", "scaled_features").show(truncate=False)


+---------------------------------------------------+----------------------------------------------------------------------------+
|numerical_features                                 |scaled_features                                                             |
+---------------------------------------------------+----------------------------------------------------------------------------+
|[39.1,18.7,181.0,3750.0]                           |[7.18268959235864,9.497074906915868,12.909363540496079,4.689278807334335]   |
|[39.5,17.4,186.0,3800.0]                           |[7.256169792791977,8.836850448146317,13.26597579299597,4.751802524765459]   |
|[40.3,18.0,195.0,3250.0]                           |[7.403130193658649,9.14156942911688,13.907877847495774,4.06404163302309]    |
|[43.92192982456142,17.151169590643278,200.0,4050.0]|[8.06848051731928,8.710478200190202,14.264490099995667,5.064421111921082]   |
|[36.7,19.3,193.0,3450.0]                           |[6.741808389758622,9.801793887

In [34]:
# Concatenate the one-hot island vector, the scaled numeric vector and the sex
# index into the single `features` column expected by the classifier.
final_va = VectorAssembler(
    inputCols=["island_encoded", "scaled_features", "sex_f"],
    outputCol="features",
)

# Keep only what the model needs: the feature vector and the label.
penguins_assembled = final_va.transform(penguins_scaled)
penguins_final = penguins_assembled.select("features", "label")
penguins_final.show()


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,1.0,7.18...|  0.0|
|[0.0,0.0,1.0,7.25...|  0.0|
|[0.0,0.0,1.0,7.40...|  0.0|
|[0.0,0.0,1.0,8.06...|  0.0|
|[0.0,0.0,1.0,6.74...|  0.0|
|[0.0,0.0,1.0,7.21...|  0.0|
|[0.0,0.0,1.0,7.14...|  0.0|
|[0.0,0.0,1.0,7.20...|  0.0|
|[0.0,0.0,1.0,6.26...|  0.0|
|[0.0,0.0,1.0,7.71...|  0.0|
|[0.0,0.0,1.0,6.94...|  0.0|
|[0.0,0.0,1.0,6.94...|  0.0|
|[0.0,0.0,1.0,7.55...|  0.0|
|[0.0,0.0,1.0,7.09...|  0.0|
|[0.0,0.0,1.0,6.35...|  0.0|
|[0.0,0.0,1.0,6.72...|  0.0|
|[0.0,0.0,1.0,7.10...|  0.0|
|[0.0,0.0,1.0,7.80...|  0.0|
|[0.0,0.0,1.0,6.31...|  0.0|
|[0.0,0.0,1.0,8.45...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [35]:
# Show the complete feature vectors (truncate=False) to inspect their layout.
penguins_final.show(truncate=False)

+--------------------------------------------------------------------------------------------+-----+
|features                                                                                    |label|
+--------------------------------------------------------------------------------------------+-----+
|[0.0,0.0,1.0,7.18268959235864,9.497074906915868,12.909363540496079,4.689278807334335,0.0]   |0.0  |
|[0.0,0.0,1.0,7.256169792791977,8.836850448146317,13.26597579299597,4.751802524765459,1.0]   |0.0  |
|[0.0,0.0,1.0,7.403130193658649,9.14156942911688,13.907877847495774,4.06404163302309,1.0]    |0.0  |
|[0.0,0.0,1.0,8.06848051731928,8.710478200190202,14.264490099995667,5.064421111921082,0.0]   |0.0  |
|[0.0,0.0,1.0,6.741808389758622,9.801793887886433,13.765232946495818,4.314136502747588,1.0]  |0.0  |
|[0.0,0.0,1.0,7.219429692575308,10.462018346655984,13.551265594995883,4.5642313724720855,0.0]|0.0  |
|[0.0,0.0,1.0,7.1459494921419715,9.039996435460026,12.909363540496079,4.532969513756524,1.0

In [36]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression with explicit iteration cap, regularisation strength and
# elastic-net mixing parameter.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# NOTE(review): fitted on the whole dataset here; no hold-out set at this point.
lrModel = lr.fit(penguins_final)


In [37]:
# Score the same DataFrame the model was fitted on; the result gains the
# rawPrediction, probability and prediction columns shown in the output.
penguins_predict = lrModel.transform(penguins_final)
penguins_predict.show()


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,0.0,1.0,7.18...|  0.0|[1.58875008949583...|[0.63772122064009...|       0.0|
|[0.0,0.0,1.0,7.25...|  0.0|[1.55648011267446...|[0.59867134949241...|       0.0|
|[0.0,0.0,1.0,7.40...|  0.0|[1.49194015903172...|[0.57846427961242...|       0.0|
|[0.0,0.0,1.0,8.06...|  0.0|[1.19974118031367...|[0.46883296562379...|       0.0|
|[0.0,0.0,1.0,6.74...|  0.0|[1.78236995042404...|[0.67034990632772...|       0.0|
|[0.0,0.0,1.0,7.21...|  0.0|[1.57261510108515...|[0.64553243203912...|       0.0|
|[0.0,0.0,1.0,7.14...|  0.0|[1.60488507790651...|[0.62875276475007...|       0.0|
|[0.0,0.0,1.0,7.20...|  0.0|[1.58068259529049...|[0.61526950047541...|       0.0|
|[0.0,0.0,1.0,6.26...|  0.0|[1.99212479976294...|[0.69727180569744...|       0.0|
|[0.0,0.0,1.0,7.

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()

# First the evaluator's default metric, then the same predictions scored on
# accuracy by overriding metricName for that single call.
default_metric_score = evaluator.evaluate(penguins_predict)
accuracy_score = evaluator.evaluate(
    penguins_predict,
    {evaluator.metricName: "accuracy"},
)
print(default_metric_score)
print(accuracy_score)


0.730188079806837
0.8052325581395349


In [39]:
# Reproducible 70/30 split of the prepared data.
train, test = penguins_final.randomSplit([0.7, 0.3], seed=12345)

# Fit on the training part only, then predict the held-out part.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lrModel = lr.fit(train)
test_predict = lrModel.transform(test)

# Report the evaluator's default metric followed by accuracy.
evaluator = MulticlassClassificationEvaluator()
default_metric_score = evaluator.evaluate(test_predict)
accuracy_score = evaluator.evaluate(
    test_predict,
    {evaluator.metricName: "accuracy"},
)
print(default_metric_score)
print(accuracy_score)


0.6483238016339061
0.7472527472527473


In [40]:
# Inspect which metric the evaluator uses when none is specified.
evaluator.getMetricName()

'f1'

In [42]:
# List every evaluator parameter with its description and default value.
print(evaluator.explainParams())

beta: The beta value used in weightedFMeasure|fMeasureByLabel. Must be > 0. The default value is 1. (default: 1.0)
eps: log-loss is undefined for p=0 or p=1, so probabilities are clipped to max(eps, min(1 - eps, p)). Must be in range (0, 0.5). The default value is 1e-15. (default: 1e-15)
labelCol: label column name. (default: label)
metricLabel: The class whose metric will be computed in truePositiveRateByLabel|falsePositiveRateByLabel|precisionByLabel|recallByLabel|fMeasureByLabel. Must be >= 0. The default value is 0. (default: 0.0)
metricName: metric name in evaluation (f1|accuracy|weightedPrecision|weightedRecall|weightedTruePositiveRate| weightedFalsePositiveRate|weightedFMeasure|truePositiveRateByLabel| falsePositiveRateByLabel|precisionByLabel|recallByLabel|fMeasureByLabel| logLoss|hammingLoss) (default: f1)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-cali

In [43]:
from pyspark.ml.tuning import ParamGridBuilder

# Estimator to tune; created without arguments.
lr = LogisticRegression()

# Fix the label/prediction column names, then build a 2 x 2 grid over
# regParam and maxIter (four parameter maps in total).
output = (
    ParamGridBuilder()
    .baseOn({lr.labelCol: 'label'})
    .baseOn([lr.predictionCol, 'prediction'])
    .addGrid(lr.regParam, [1.0, 2.0])
    .addGrid(lr.maxIter, [1, 5])
    .build()
)


from pyspark.ml.tuning import CrossValidator

# 3-fold cross-validation of `lr` over the parameter maps in `output`, scored
# with the `evaluator` created in an earlier cell.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=output,
    evaluator=evaluator,
    numFolds=3,
)


In [44]:
# Run the cross-validated search on the training split, then predict the
# held-out split with the resulting model.
cvModel = cv.fit(train)
test_predict = cvModel.transform(test)

# Report the evaluator's default metric followed by accuracy.
evaluator = MulticlassClassificationEvaluator()
default_metric_score = evaluator.evaluate(test_predict)
accuracy_score = evaluator.evaluate(
    test_predict,
    {evaluator.metricName: "accuracy"},
)
print(default_metric_score)
print(accuracy_score)


0.6684941965664856
0.7472527472527473


In [45]:
# Bare expression: the notebook echoes the fitted cross-validation model's repr.
cvModel

CrossValidatorModel_1d4a1fdd6a27

In [46]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Imputer, OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession

# Request error-level logging and no console progress bars.
conf = (
    SparkConf()
    .set("spark.log.level", "error")
    .set("spark.ui.showConsoleProgress", "false")
)

sc = SparkContext.getOrCreate(conf=conf)

spark = (
    SparkSession.builder
    .master("local")
    .appName("Introduction au DataFrame")
    .getOrCreate()
)

# Raw data: header row, inferred column types, double quote as escape character.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("escape", "\"")
    .csv("penguins.csv")
)

# Split BEFORE any fitting so that every stage below learns from `train` only.
train, test = df.randomSplit([0.9, 0.1], seed=12345)

# --- Pipeline stages, declared in the order they are executed ---------------

# Mean imputation for bill and flipper measurements.
impute_mean = Imputer(
    strategy="mean",
    inputCols=["bill_length_mm", "bill_depth_mm", "flipper_length_mm"],
    outputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f"],
)

# Median imputation for body mass.
impute_median = Imputer(
    strategy="median",
    inputCol="body_mass_g",
    outputCol="body_mass_g_f",
)

# String columns -> numeric indices; `species` becomes the label.
index = StringIndexer(
    inputCols=["species", "sex", "island"],
    outputCols=["label", "sex_f", "island_f"],
    handleInvalid="keep",
)

# Index 2.0 in sex_f is treated as missing and replaced, in place, by the mode.
impute_mode = Imputer(
    strategy="mode",
    missingValue=2.0,
    inputCol="sex_f",
    outputCol="sex_f",
)

# One-hot encoding of the island index.
encoder = OneHotEncoder(
    inputCol="island_f",
    outputCol="island_encoded",
)

# Numeric columns -> one vector, which is then scaled.
va = VectorAssembler(
    inputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f", "body_mass_g_f"],
    outputCol="numerical_features",
)
scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="scaled_features",
)

# Final feature vector consumed by the classifier.
final_va = VectorAssembler(
    inputCols=["island_encoded", "scaled_features", "sex_f"],
    outputCol="features",
)

lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

stages = [impute_mean, impute_median, index, impute_mode, encoder, va, scaler, final_va, lr]
pipeline = Pipeline(stages=stages)

# Fit every stage on the training rows, then run the held-out rows through it.
pipelineModel = pipeline.fit(train)
test_predict = pipelineModel.transform(test)

# Report the evaluator's default metric followed by accuracy.
evaluator = MulticlassClassificationEvaluator()
default_metric_score = evaluator.evaluate(test_predict)
accuracy_score = evaluator.evaluate(
    test_predict,
    {evaluator.metricName: "accuracy"},
)
print(default_metric_score)
print(accuracy_score)

0.871545593662796
0.9130434782608695


In [47]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Imputer, OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession

# Request error-level logging and no console progress bars.
conf = (
    SparkConf()
    .set("spark.log.level", "error")
    .set("spark.ui.showConsoleProgress", "false")
)

sc = SparkContext.getOrCreate(conf=conf)

spark = (
    SparkSession.builder
    .master("local")
    .appName("Introduction au DataFrame")
    .getOrCreate()
)

# Raw data: header row, inferred column types, double quote as escape character.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("escape", "\"")
    .csv("penguins.csv")
)

# Split BEFORE any fitting so that the search below only ever sees `train`.
train, test = df.randomSplit([0.9, 0.1], seed=12345)

# --- Pipeline stages, declared in the order they are executed ---------------

# Mean imputation for bill and flipper measurements.
impute_mean = Imputer(
    strategy="mean",
    inputCols=["bill_length_mm", "bill_depth_mm", "flipper_length_mm"],
    outputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f"],
)

# Median imputation for body mass.
impute_median = Imputer(
    strategy="median",
    inputCol="body_mass_g",
    outputCol="body_mass_g_f",
)

# String columns -> numeric indices; `species` becomes the label.
index = StringIndexer(
    inputCols=["species", "sex", "island"],
    outputCols=["label", "sex_f", "island_f"],
    handleInvalid="keep",
)

# Index 2.0 in sex_f is treated as missing and replaced, in place, by the mode.
impute_mode = Imputer(
    strategy="mode",
    missingValue=2.0,
    inputCol="sex_f",
    outputCol="sex_f",
)

# One-hot encoding of the island index.
encoder = OneHotEncoder(
    inputCol="island_f",
    outputCol="island_encoded",
)

# Numeric columns -> one vector, which is then scaled.
va = VectorAssembler(
    inputCols=["bill_length_mm_f", "bill_depth_mm_f", "flipper_length_mm_f", "body_mass_g_f"],
    outputCol="numerical_features",
)
scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="scaled_features",
)

# Final feature vector consumed by the classifier.
final_va = VectorAssembler(
    inputCols=["island_encoded", "scaled_features", "sex_f"],
    outputCol="features",
)

lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

stages = [impute_mean, impute_median, index, impute_mode, encoder, va, scaler, final_va, lr]
pipeline = Pipeline(stages=stages)

evaluator = MulticlassClassificationEvaluator()

# Fixed label/prediction columns plus a 2 x 2 x 2 grid (eight parameter maps)
# over regParam, elasticNetParam and maxIter of the logistic regression stage.
output = (
    ParamGridBuilder()
    .baseOn({lr.labelCol: 'label'})
    .baseOn([lr.predictionCol, 'prediction'])
    .addGrid(lr.regParam, [0.3, 1.3])
    .addGrid(lr.elasticNetParam, [0.6, 0.8])
    .addGrid(lr.maxIter, [5, 10])
    .build()
)

# The whole pipeline is the estimator, so all stages are refitted in each of
# the 3 folds.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=output,
    evaluator=evaluator,
    numFolds=3,
)

cvModel = cv.fit(train)
test_predict = cvModel.transform(test)

# Report the evaluator's default metric followed by accuracy.
evaluator = MulticlassClassificationEvaluator()
default_metric_score = evaluator.evaluate(test_predict)
accuracy_score = evaluator.evaluate(
    test_predict,
    {evaluator.metricName: "accuracy"},
)
print(default_metric_score)
print(accuracy_score)

1.0
1.0
