<a href="https://colab.research.google.com/github/morrowbord/Spark/blob/main/Custom_Estimator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark pyarrow

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 32 kB/s 
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 35.7 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805911 sha256=e7ccfb04045029025faa3188b9fb7ce3047ecea80bb369579c7fc0f80a493cd6
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
from pyspark.sql import SparkSession

# Local Spark session for Colab, using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab_pyspark")
    .config("spark.ui.port", "4050")
    .config("spark.executor.memory", "3g")
    # Spark 3.x spelling of the Arrow switch; the older
    # 'spark.sql.execution.arrow.enabled' key is deprecated.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [None]:
from pyspark.ml import Estimator
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

In [None]:
X, y = make_classification(random_state=5757)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=5757)

In [None]:
X_train.shape

(80, 20)

In [None]:
est = LogisticRegression(random_state=5757)
est.fit(X_train, y_train)

LogisticRegression(random_state=5757)

## Способ №1, правильный, но не работает :(

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.linalg import VectorUDT, DenseVector

schema = StructType(fields=[
    StructField("features", VectorUDT()),
    StructField("label", IntegerType())
])

In [None]:
df_test = spark.createDataFrame(zip(map(DenseVector, X_test), map(int, y_test)), schema=schema)

In [None]:
df_test.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
df_test.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.16713128673558...|    0|
|[2.50158633810562...|    0|
|[-0.7699917522248...|    0|
|[-0.9514898817744...|    0|
|[1.32598926517448...|    0|
|[-0.5091954518113...|    0|
|[-0.9442948966018...|    1|
|[0.53537784380006...|    1|
|[-1.9316295195085...|    0|
|[-0.8788162423442...|    1|
|[0.47792785291472...|    1|
|[-0.7537969085613...|    1|
|[-0.1103342853056...|    0|
|[0.21903394455633...|    1|
|[-0.7010314631582...|    1|
|[-0.4024860125426...|    1|
|[1.91508615582208...|    1|
|[1.03957987692160...|    1|
|[-1.9962336859493...|    1|
|[0.03293939843484...|    1|
+--------------------+-----+



In [None]:
est_broadcast = spark.sparkContext.broadcast(est)

In [None]:
import pyspark.sql.functions as F

In [None]:
@F.udf(FloatType())
def predict(values):
    """Predict the label of a single row with the broadcast scikit-learn model.

    Args:
        values: one row's feature vector; anything ``numpy.array`` can turn
            into a 1-D numeric array.

    Returns:
        The predicted label as a built-in ``float`` (matches ``FloatType``).
    """
    # Imported locally so the UDF does not depend on a later notebook cell
    # having already executed `import numpy as np`.
    import numpy as np

    # scikit-learn wants a 2-D input, so wrap the single sample in a list.
    predictions = est_broadcast.value.predict([np.array(values)])
    # A row-at-a-time UDF must hand back a plain Python value; the numpy scalar
    # that `predict` yields is not accepted as a FloatType result.
    return float(predictions[0])

In [None]:
import numpy as np
df_test.withColumn("prediction", predict("features")).show()

In [None]:
@F.pandas_udf(FloatType())
def predict(series):
    """Vectorized prediction with the broadcast scikit-learn model.

    Args:
        series: pandas Series whose elements are per-row feature sequences.

    Returns:
        pandas Series holding one predicted label per input row.
    """
    # Imported locally: at this point in the notebook pandas has not been
    # imported yet, so a module-level `pd` would be undefined when this runs.
    import pandas as pd

    # NOTE(review): the next cell applies this UDF to the `features` vector
    # column; Arrow-backed pandas UDFs presumably cannot accept VectorUDT input,
    # so the column likely has to be converted to an array column first - confirm.
    # tolist() turns the Series of per-row sequences into a list of rows, which
    # scikit-learn accepts as 2-D input.
    predictions = est_broadcast.value.predict(series.tolist())
    return pd.Series(predictions)

In [None]:
df_test.withColumn("prediction", predict("features")).show()

## Способ №2, неправильный, но работает :/

In [None]:
import pandas as pd
import numpy as np

In [None]:
@F.udf(returnType=ArrayType(FloatType()))
def vectorToArray(row):
    """Turn one vector cell into a plain list of Python floats."""
    components = []
    for component in row:
        components.append(float(component))
    return components

In [None]:
df_test

DataFrame[features: vector, label: int]

In [None]:
df_test = df_test.withColumn("features_array", vectorToArray("features"))

In [None]:
df_test.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- features_array: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [None]:
@F.pandas_udf(FloatType())
def predict(series):
    """Score a batch of rows with the broadcast scikit-learn model.

    Args:
        series: pandas Series in which every element is one row's feature list.

    Returns:
        pandas Series with one prediction per input row.
    """
    # The input arrives as pd.Series(list), so it has to be converted to a
    # list of rows before it is handed to the model.
    feature_rows = series.tolist()
    model = est_broadcast.value
    return pd.Series(model.predict(feature_rows))

In [None]:
df_test.withColumn("prediction", predict("features_array")).show()

+--------------------+-----+--------------------+----------+
|            features|label|      features_array|prediction|
+--------------------+-----+--------------------+----------+
|[0.16713128673558...|    0|[0.16713129, 2.28...|       0.0|
|[2.50158633810562...|    0|[2.5015864, 1.394...|       0.0|
|[-0.7699917522248...|    0|[-0.76999176, 0.1...|       0.0|
|[-0.9514898817744...|    0|[-0.95148987, 1.6...|       0.0|
|[1.32598926517448...|    0|[1.3259892, 1.287...|       0.0|
|[-0.5091954518113...|    0|[-0.50919545, 2.0...|       0.0|
|[-0.9442948966018...|    1|[-0.94429487, 1.2...|       1.0|
|[0.53537784380006...|    1|[0.53537786, 0.08...|       1.0|
|[-1.9316295195085...|    0|[-1.9316295, -1.3...|       0.0|
|[-0.8788162423442...|    1|[-0.87881625, -0....|       1.0|
|[0.47792785291472...|    1|[0.47792786, 1.28...|       1.0|
|[-0.7537969085613...|    1|[-0.75379694, 1.9...|       1.0|
|[-0.1103342853056...|    0|[-0.110334285, 3....|       1.0|
|[0.21903394455633...|  

In [None]:
import pickle

In [None]:
with open("logistic_model.pk", "wb") as f:
    pickle.dump(est, f)

In [None]:
from pyspark import keyword_only
from pyspark.ml import Model
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol

In [None]:
class SKLogisticRegressionModel(Model, HasFeaturesCol, HasLabelCol, HasPredictionCol):
    """Spark ML ``Model`` that scores rows with a pickled scikit-learn estimator.

    The estimator is unpickled from ``model_file`` when the model is
    constructed, and ``_transform`` uses that estimator to fill the
    prediction column.
    """

    model_file = Param(Params._dummy(), "model_file",
                      "path to pickled scikit-learn logistic regression model",
                      typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, model_file=None, featuresCol="features", labelCol="label", predictionCol="prediction"):
        """Load the pickled estimator and record the column parameters.

        Args:
            model_file: path to the pickled scikit-learn model (required).
            featuresCol: name of the input column with per-row feature arrays.
            labelCol: name of the label column.
            predictionCol: name of the column that ``transform`` adds.

        Raises:
            ValueError: if ``model_file`` is not given.
        """
        super(SKLogisticRegressionModel, self).__init__()
        if model_file is None:
            raise ValueError("model_file must be specified!")
        # NOTE(security): pickle.load can execute arbitrary code embedded in the
        # file - only point this at model files you produced yourself.
        with open(model_file, "rb") as f:
            self.estimator = pickle.load(f)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _transform(self, dataset):
        """Return ``dataset`` with the prediction column appended.

        Predictions come from the estimator loaded from ``model_file``, not
        from any notebook-global UDF or broadcast variable.
        """
        # Bind the estimator to a local so the UDF closure captures only the
        # scikit-learn object rather than this whole Spark ML instance.
        estimator = self.estimator

        @F.pandas_udf(FloatType())
        def predict_batch(series):
            # Each element is one row's feature list; tolist() gives sklearn
            # the list-of-rows layout it accepts as 2-D input.
            return pd.Series(estimator.predict(series.tolist()))

        return dataset.withColumn(self.getPredictionCol(), predict_batch(self.getFeaturesCol()))

In [None]:
# Wrap the previously pickled scikit-learn model as a Spark ML transformer.
spark_est = SKLogisticRegressionModel(
    model_file="logistic_model.pk",
    featuresCol="features_array")

In [None]:
spark_est.transform(df_test).show()

In [None]:
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[
    spark_est
])

In [None]:
pipeline_model = pipeline.fit(df_test)

In [None]:
pipeline_model.transform(df_test).show()

+--------------------+-----+--------------------+----------+
|            features|label|      features_array|prediction|
+--------------------+-----+--------------------+----------+
|[0.16713128673558...|    0|[0.16713129, 2.28...|       0.0|
|[2.50158633810562...|    0|[2.5015864, 1.394...|       0.0|
|[-0.7699917522248...|    0|[-0.76999176, 0.1...|       0.0|
|[-0.9514898817744...|    0|[-0.95148987, 1.6...|       0.0|
|[1.32598926517448...|    0|[1.3259892, 1.287...|       0.0|
|[-0.5091954518113...|    0|[-0.50919545, 2.0...|       0.0|
|[-0.9442948966018...|    1|[-0.94429487, 1.2...|       1.0|
|[0.53537784380006...|    1|[0.53537786, 0.08...|       1.0|
|[-1.9316295195085...|    0|[-1.9316295, -1.3...|       0.0|
|[-0.8788162423442...|    1|[-0.87881625, -0....|       1.0|
|[0.47792785291472...|    1|[0.47792786, 1.28...|       1.0|
|[-0.7537969085613...|    1|[-0.75379694, 1.9...|       1.0|
|[-0.1103342853056...|    0|[-0.110334285, 3....|       1.0|
|[0.21903394455633...|  

In [None]:
class SKLogisticRegression(Estimator, HasFeaturesCol, HasPredictionCol, HasLabelCol):
    """Spark ML ``Estimator`` that trains a scikit-learn ``LogisticRegression``.

    Fitting pulls the selected columns to the driver as a pandas DataFrame,
    trains the scikit-learn model there, pickles it to disk and returns a
    ``SKLogisticRegressionModel`` pointing at that file.
    """

    @keyword_only
    def __init__(self, featuresCol="features", predictionCol="prediction", labelCol="label"):
        """Record the column-name parameters passed as keyword arguments."""
        super().__init__()
        self._set(**self._input_kwargs)

    def _fit(self, dataset):
        """Train on ``dataset`` and return the fitted model wrapper.

        Args:
            dataset: Spark DataFrame containing the features and label columns.

        Returns:
            A ``SKLogisticRegressionModel`` configured with this estimator's
            column names and the path of the freshly written pickle file.
        """
        features_col = self.getFeaturesCol()
        label_col = self.getLabelCol()

        # toPandas() collects the two columns onto the driver.
        training_frame = dataset.select(features_col, label_col).toPandas()

        self.est = LogisticRegression()
        self.est.fit(training_frame[features_col].tolist(), training_frame[label_col])

        # Persist the fitted estimator so the model wrapper can load it by path.
        self.model_file = "logistic_regression.pk"
        with open(self.model_file, "wb") as sink:
            pickle.dump(self.est, sink)

        return SKLogisticRegressionModel(
            model_file=self.model_file,
            featuresCol=features_col,
            labelCol=label_col,
            predictionCol=self.getPredictionCol(),
        )

In [None]:
+-------------------+--------+-------+------------+--------+-----------------------------+----------------------------+---------------+
|           datesold|postcode|  price|propertyType|bedrooms|avg_price_for_10_deals_before|avg_price_for_10_deals_after|last_deal_price|
+-------------------+--------+-------+------------+--------+-----------------------------+----------------------------+---------------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|                     327000.0|                   706681.82|         900000|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|                     558500.0|                   692590.91|         625000|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|                    647333.33|                   703954.55|         620000|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|                     564250.0|                   741681.82|         580000|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|                     509900.0|                   792818.18|         445000|
|2008-05-30 00:00:00|    2600| 329000|        unit|       2|                     479750.0|                   859045.45|         357000|
|2008-06-19 00:00:00|    2600| 765000|       house|       5|                     520500.0|                   816772.73|         362000|
|2008-07-29 00:00:00|    2600| 927000|       house|       4|                     571312.5|                   775681.82|         365000|
|2008-09-02 00:00:00|    2600|1380000|       house|       5|                    661166.67|                   754772.73|         541000|
|2008-09-08 00:00:00|    2600| 740000|       house|       3|                     669050.0|                   739772.73|         360500|
|2008-09-17 00:00:00|    2600| 720000|       house|       3|                    673681.82|                   726863.64|         420000|
|2008-09-22 00:00:00|    2600| 690000|       house|       4|                    706681.82|                   744136.36|         685000|
|2008-11-18 00:00:00|    2600| 635000|       house|       3|                    692590.91|                   723681.82|         540000|
|2008-11-18 00:00:00|    2600| 950000|       house|       3|                    703954.55|                   729590.91|         635000|
|2008-11-21 00:00:00|    2600| 730000|       house|       3|                    741681.82|                   728863.64|         500000|
|2008-12-22 00:00:00|    2600| 855000|       house|       3|                    792818.18|                   679318.18|         630000|
|2008-12-24 00:00:00|    2600|1057500|       house|       4|                    859045.45|                   638636.36|         499000|
|2009-01-06 00:00:00|    2600| 300000|        unit|       2|                    816772.73|                   669545.45|         450000|
|2009-01-12 00:00:00|    2600| 475000|        unit|       2|                    775681.82|                   687727.27|         750000|
|2009-01-20 00:00:00|    2600|1150000|       house|       4|                    754772.73|                   674090.91|         500000|
|2009-01-22 00:00:00|    2600| 575000|       house|       3|                    739772.73|                   655909.09|         512000|
|2009-02-03 00:00:00|    2600| 578000|        unit|       2|                    726863.64|                   707909.09|         365000|
|2009-02-13 00:00:00|    2600| 880000|       house|       4|                    744136.36|                    697000.0|         450000|
|2009-02-27 00:00:00|    2600| 410000|        unit|       1|                    723681.82|                   760636.36|         476000|
|2009-03-17 00:00:00|    2600|1015000|       house|       4|                    729590.91|                    742000.0|         580000|
|2009-03-28 00:00:00|    2600| 722000|       house|       4|                    728863.64|                   758181.82|         690000|
|2009-03-30 00:00:00|    2600| 310000|        unit|       1|                    679318.18|                   803818.18|         568500|
|2009-03-31 00:00:00|    2600| 610000|       house|       3|                    638636.36|                   835636.36|         580000|
|2009-04-16 00:00:00|    2600| 640000|        unit|       2|                    669545.45|                   844272.73|         372000|
|2009-05-08 00:00:00|    2600| 675000|       house|       3|                    687727.27|                    854000.0|         615000|
+-------------------+--------+-------+------------+--------+-----------------------------+----------------------------+---------------+

In [None]:
spark_est = SKLogisticRegression(featuresCol="features_array")

In [None]:
spark_est_model = spark_est.fit(df_test)

In [None]:
spark_est

SKLogisticRegression_3da6464bb5e8

In [None]:
spark_est_model

SKLogisticRegreesionModel_1d029893e47c

In [None]:
spark_est_model.transform(df_test).show()

+--------------------+-----+--------------------+----------+
|            features|label|      features_array|prediction|
+--------------------+-----+--------------------+----------+
|[0.16713128673558...|    0|[0.16713129, 2.28...|       0.0|
|[2.50158633810562...|    0|[2.5015864, 1.394...|       0.0|
|[-0.7699917522248...|    0|[-0.76999176, 0.1...|       0.0|
|[-0.9514898817744...|    0|[-0.95148987, 1.6...|       0.0|
|[1.32598926517448...|    0|[1.3259892, 1.287...|       0.0|
|[-0.5091954518113...|    0|[-0.50919545, 2.0...|       0.0|
|[-0.9442948966018...|    1|[-0.94429487, 1.2...|       1.0|
|[0.53537784380006...|    1|[0.53537786, 0.08...|       1.0|
|[-1.9316295195085...|    0|[-1.9316295, -1.3...|       0.0|
|[-0.8788162423442...|    1|[-0.87881625, -0....|       1.0|
|[0.47792785291472...|    1|[0.47792786, 1.28...|       1.0|
|[-0.7537969085613...|    1|[-0.75379694, 1.9...|       1.0|
|[-0.1103342853056...|    0|[-0.110334285, 3....|       1.0|
|[0.21903394455633...|  

In [None]:
pipeline = Pipeline(stages=[
    spark_est
])

In [None]:
pipeline_model = pipeline.fit(df_test)

In [None]:
pipeline_model.transform(df_test).show()

+--------------------+-----+--------------------+----------+
|            features|label|      features_array|prediction|
+--------------------+-----+--------------------+----------+
|[0.16713128673558...|    0|[0.16713129, 2.28...|       0.0|
|[2.50158633810562...|    0|[2.5015864, 1.394...|       0.0|
|[-0.7699917522248...|    0|[-0.76999176, 0.1...|       0.0|
|[-0.9514898817744...|    0|[-0.95148987, 1.6...|       0.0|
|[1.32598926517448...|    0|[1.3259892, 1.287...|       0.0|
|[-0.5091954518113...|    0|[-0.50919545, 2.0...|       0.0|
|[-0.9442948966018...|    1|[-0.94429487, 1.2...|       1.0|
|[0.53537784380006...|    1|[0.53537786, 0.08...|       1.0|
|[-1.9316295195085...|    0|[-1.9316295, -1.3...|       0.0|
|[-0.8788162423442...|    1|[-0.87881625, -0....|       1.0|
|[0.47792785291472...|    1|[0.47792786, 1.28...|       1.0|
|[-0.7537969085613...|    1|[-0.75379694, 1.9...|       1.0|
|[-0.1103342853056...|    0|[-0.110334285, 3....|       1.0|
|[0.21903394455633...|  