In [1]:
import findspark
findspark.init()

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.ml import Estimator
from pyspark.sql import functions as F
from pyspark.ml import Pipeline

from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one, otherwise start it.
spark = SparkSession.builder.getOrCreate()

# Small (id, price) frame used as training data throughout the notebook.
data = spark.createDataFrame(
    data=[(4, 13.87), (5, 23.5), (6, 45.10), (7, 30.78)],
    schema=["id", "price"],
)
data.show()

:: loading settings :: url = jar:file:/Users/agundrod/spark-3.3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/agundrod/.ivy2/cache
The jars for the packages stored in: /Users/agundrod/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d54e18bf-8233-41f2-a245-7a8de68cae60;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 150ms :: artifacts dl 8ms
	:: modules in use:
	io.delta#delta-core_2.12;2.2.0 from central in [default]
	io.delta#delta-storage;2.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0  

23/04/06 11:19:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/06 11:19:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


[Stage 0:>                                                          (0 + 1) / 1]

+---+-----+
| id|price|
+---+-----+
|  4|13.87|
|  5| 23.5|
|  6| 45.1|
|  7|30.78|
+---+-----+



                                                                                

# Custom SparkML

![SparkML](sparkml.webp)

Rodrigo Agundez - 06 April 2023 - [Tutorial on Medium](https://medium.com/@rragundez/easily-build-custom-sparkml-transformers-and-estimators-16ba70414abe)

- [StackOverflow: Create a custom Transformer in PySpark ML](https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml)
- [StackOverflow: Serialize a custom transformer using python to be used within a Pyspark ML pipeline](https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel/44377489#44377489)

# Transformer: Divide column by a number

The hard way

In [2]:
class DivideColumnTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Divide ``inputCol`` by a constant and store the result in ``outputCol``.

    The ``DefaultParamsReadable``/``DefaultParamsWritable`` mixins make the
    stage persistable as part of a saved pipeline.

    Params:
        inputCol: name of the column to divide.
        outputCol: name of the column receiving the result (may equal ``inputCol``).
        value: the divisor; converted to ``float``, defaults to ``1.0``.
    """

    # Declared once, at class level. ``Params.__init__`` binds a copy of every
    # class-level Param to the instance, so there is no need to re-create it in
    # ``__init__`` -- doing so would replace it with a Param that has no type
    # converter and silently skip the float coercion.
    value = Param(Params._dummy(), "value", "value", typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, value=None):
        """Create the transformer; only explicitly passed keywords are set."""
        super().__init__()
        self._setDefault(value=1.0)
        # ``keyword_only`` stores the explicitly passed keyword arguments here.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, value=None):
        """Set any subset of the params by keyword and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """Set the divisor (coerced with ``float``) and return ``self``."""
        return self._set(value=float(value))

    def getValue(self):
        """Return the divisor, falling back to the default when unset."""
        return self.getOrDefault(self.value)

    def setInputCol(self, value):
        """Set the name of the column to divide and return ``self``."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Set the name of the result column and return ``self``."""
        return self._set(outputCol=value)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` set to ``inputCol / value``."""
        dataset = dataset.withColumn(self.getOutputCol(), F.col(self.getInputCol()) / self.getValue())
        return dataset

# Let's transform some data

In [3]:
# Show the untouched input rows, for comparison with the transformed output below.
data.show()

+---+-----+
| id|price|
+---+-----+
|  4|13.87|
|  5| 23.5|
|  6| 45.1|
|  7|30.78|
+---+-----+



In [4]:
# Halve "price" in place: the input and output column are the same.
col_transformer = DivideColumnTransformer(
    inputCol="price",
    outputCol="price",
    value=2,
)
col_transformer.transform(data).show()

+---+-----+
| id|price|
+---+-----+
|  4|6.935|
|  5|11.75|
|  6|22.55|
|  7|15.39|
+---+-----+



# Estimator: Normalize by the mean

The hard way

In [5]:
class MeanNormalizerEstimator(Estimator, HasInputCol, HasOutputCol):
    """Estimator that learns the mean of ``inputCol`` and normalizes by it.

    Fitting returns a ``DivideColumnTransformer`` whose divisor is the learned
    mean.

    Params:
        inputCol: name of the column whose mean is computed.
        outputCol: optional name of the column the fitted transformer writes
            to. When it is not set, the input column is overwritten.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        """Create the estimator; only explicitly passed keywords are set."""
        super().__init__()
        # ``keyword_only`` stores the explicitly passed keyword arguments here.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set any subset of the params by keyword and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, value):
        """Set the name of the column to normalize and return ``self``."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Set the name of the result column and return ``self``."""
        return self._set(outputCol=value)

    def _fit(self, dataset):
        """Compute the mean of ``inputCol`` and return the fitted transformer.

        Raises:
            ValueError: if the mean is null (no rows, or only null values),
                because dividing by it would turn the whole column into nulls.
        """
        input_col = self.getInputCol()
        # Only an explicitly set outputCol is honoured; the generated default
        # name is ignored so the input column keeps being overwritten.
        output_col = self.getOutputCol() if self.isSet(self.outputCol) else None
        if output_col is None:
            output_col = input_col

        mean, = dataset.agg(F.mean(input_col)).first()
        if mean is None:
            raise ValueError(
                f"Cannot compute the mean of column '{input_col}': "
                "the dataset is empty or the column contains only nulls."
            )
        return DivideColumnTransformer(inputCol=input_col, outputCol=output_col, value=mean)

## Fit

In [6]:
# Wrap the estimator in a single-stage Pipeline; fitting it on `data` yields a
# pipeline model whose stages[0] is the transformer returned by the estimator.
# Note: despite its name, `column_transformer` holds the (unfitted) estimator.
column_transformer = MeanNormalizerEstimator(inputCol="price")
pipeline = Pipeline(stages=[column_transformer])
fitted_pipeline = pipeline.fit(data)

## Transform

In [7]:
# stages[0] is the DivideColumnTransformer built by the estimator's _fit; its
# `value` param holds the mean of "price" computed on `data`.
print("Mean:", fitted_pipeline.stages[0].getValue())
fitted_pipeline.transform(data).show()

Mean: 28.3125
+---+------------------+
| id|             price|
+---+------------------+
|  4|0.4898896247240618|
|  5|0.8300220750551877|
|  6|1.5929359823399558|
|  7|1.0871523178807947|
+---+------------------+



# Save the pipeline

In [8]:
# Persist the fitted pipeline under a relative path; overwrite() replaces any
# previous save at that location.
fitted_pipeline.write().overwrite().save("fitted_pipeline.pipeline")

# Load the fitted pipeline and transform

In [9]:
# Clear the interactive namespace to mimic a fresh session: everything needed
# to load and use the saved pipeline must be imported or defined again.
%reset -f
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
# getOrCreate() hands back the already running session when there is one.
spark = SparkSession.builder.getOrCreate()
# Rows that were not part of the data the pipeline was fitted on.
test = spark.createDataFrame([(4, 34.87), (5, 33.5), (6, 15.10), (7, 20.78)], ["id", "price"])
test.show()

+---+-----+
| id|price|
+---+-----+
|  4|34.87|
|  5| 33.5|
|  6| 15.1|
|  7|20.78|
+---+-----+



In [10]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.sql import functions as F
from pyspark.ml import Pipeline

# Training rows, rebuilt here because the earlier `%reset -f` wiped the
# namespace; they are used again when fitting the pipeline further down.
data = spark.createDataFrame([(4, 13.87), (5, 23.5), (6, 45.10), (7, 30.78)], ["id", "price"])

class DivideColumnTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Divide ``inputCol`` by a constant and store the result in ``outputCol``.

    Re-declared after the namespace reset so that the saved pipeline, which
    contains a stage of this class, can be loaded again.

    Params:
        inputCol: name of the column to divide.
        outputCol: name of the column receiving the result (may equal ``inputCol``).
        value: the divisor; converted to ``float``, defaults to ``1.0``.
    """

    # Declared once, at class level. ``Params.__init__`` binds a copy of every
    # class-level Param to the instance, so it is not re-created in
    # ``__init__``: that would swap in a Param without the float converter.
    value = Param(Params._dummy(), "value", "value", typeConverter=TypeConverters.toFloat)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, value=None):
        """Create the transformer; only explicitly passed keywords are set."""
        super().__init__()
        self._setDefault(value=1.0)
        # ``keyword_only`` stores the explicitly passed keyword arguments here.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, value=None):
        """Set any subset of the params by keyword and return ``self``."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """Set the divisor (coerced with ``float``) and return ``self``."""
        return self._set(value=float(value))

    def getValue(self):
        """Return the divisor, falling back to the default when unset."""
        return self.getOrDefault(self.value)

    def setInputCol(self, value):
        """Set the name of the column to divide and return ``self``."""
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        """Set the name of the result column and return ``self``."""
        return self._set(outputCol=value)

    def _transform(self, dataset):
        """Return ``dataset`` with ``outputCol`` set to ``inputCol / value``."""
        dataset = dataset.withColumn(self.getOutputCol(), F.col(self.getInputCol()) / self.getValue())
        return dataset

In [11]:
# Reload the pipeline saved before the reset. The custom stage class is defined
# again in the cell above, presumably so the loader can rebuild the stage.
fitted_pipeline = PipelineModel.load('fitted_pipeline.pipeline')
# The divisor is the mean learned at fit time, not a statistic of `test`.
print("Mean:", fitted_pipeline.stages[0].getValue())
fitted_pipeline.transform(test).show()

Mean: 28.3125
+---+------------------+
| id|             price|
+---+------------------+
|  4|1.2316114790286974|
|  5|1.1832229580573952|
|  6|0.5333333333333333|
|  7|0.7339514348785873|
+---+------------------+



# The easy way

[SparkML Base Classes on PyPI](https://pypi.org/project/sparkml-base-classes/)

In [12]:
!pip uninstall sparkml-base-classes -y
!pip install sparkml-base-classes --no-cache-dir -v

Found existing installation: sparkml-base-classes 0.1.6
Uninstalling sparkml-base-classes-0.1.6:
  Successfully uninstalled sparkml-base-classes-0.1.6
Using pip 22.0.4 from /Users/agundrod/.pyenv/versions/3.9.11/lib/python3.9/site-packages/pip (python 3.9)
Collecting sparkml-base-classes
  Downloading sparkml_base_classes-0.1.6-py3-none-any.whl (6.1 kB)
Installing collected packages: sparkml-base-classes
Successfully installed sparkml-base-classes-0.1.6
You should consider upgrading via the '/Users/agundrod/.pyenv/versions/3.9.11/bin/python3.9 -m pip install --upgrade pip' command.

# Transformer: Divide column by a number (The easy way)

In [22]:
# Show the rows that the transformer defined below is applied to.
test.show()

+---+-----+
| id|price|
+---+-----+
|  4|34.87|
|  5| 33.5|
|  6| 15.1|
|  7|20.78|
+---+-----+



In [13]:
from pyspark import keyword_only
from pyspark.sql import functions as F
from sparkml_base_classes import TransformerBaseClass

class DivideColumnTransformer(TransformerBaseClass):
    """Divide a column by a fixed number, overwriting that column.

    The constructor only declares its keyword arguments; the transform reads
    them back as ``self._column_name`` and ``self._value`` (presumably set by
    ``TransformerBaseClass`` -- confirm against the package documentation).
    """

    @keyword_only
    def __init__(self, column_name=None, value=None):
        super().__init__()

    def _transform(self, ddf):
        """Return ``ddf`` with the configured column divided by the value."""
        target = self._column_name
        scaled = F.col(target) / self._value
        return ddf.withColumn(target, scaled)

In [14]:
# Halve the "price" column of the test rows and display the result.
col_transformer = DivideColumnTransformer(
    column_name="price",
    value=2,
)
col_transformer.transform(test).show()

+---+------+
| id| price|
+---+------+
|  4|17.435|
|  5| 16.75|
|  6|  7.55|
|  7| 10.39|
+---+------+



# Estimator: Normalize by the mean

The easy way

In [15]:
from sparkml_base_classes import EstimatorBaseClass

class MeanNormalizerEstimator(EstimatorBaseClass):
    """Learn the mean of a column and return a transformer that divides by it.

    ``self._column_name`` is read back from the constructor keyword
    (presumably set by ``EstimatorBaseClass`` -- confirm against the package
    documentation).
    """

    @keyword_only
    def __init__(self, column_name=None):
        super().__init__()

    def _fit(self, ddf):
        """Aggregate the column mean over ``ddf`` and build the fitted stage."""
        first_row = ddf.agg(F.mean(self._column_name)).first()
        # The aggregate yields a single-field row; unpack its only value.
        (mean,) = first_row
        return DivideColumnTransformer(column_name=self._column_name, value=mean)

In [16]:
# Single-stage pipeline around the estimator; it is fitted in the next cell.
# Note: despite its name, `column_transformer` holds the (unfitted) estimator.
column_transformer = MeanNormalizerEstimator(column_name="price")
pipeline = Pipeline(stages=[column_transformer])

## Fit

In [17]:
# Fit on the training rows; stages[0] is the DivideColumnTransformer returned
# by the estimator's _fit, and `_value` is the mean it was constructed with.
fitted_pipeline = pipeline.fit(data)
print("Mean:", fitted_pipeline.stages[0]._value)
fitted_pipeline.transform(data).show()

Mean: 28.3125
+---+------------------+
| id|             price|
+---+------------------+
|  4|0.4898896247240618|
|  5|0.8300220750551877|
|  6|1.5929359823399558|
|  7|1.0871523178807947|
+---+------------------+



# Save the fitted pipeline

In [18]:
# Persist the fitted pipeline under a relative path; overwrite() replaces any
# previous save at that location.
fitted_pipeline.write().overwrite().save("custom_fitted_pipeline.pipeline")

# Load the fitted pipeline and transform

In [19]:
%reset -f
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
spark = SparkSession.builder.getOrCreate()
test = spark.createDataFrame([(4, 34.87), (5, 33.5), (6, 15.10), (7, 20.78)], ["id", "price"])
test.show()

+---+-----+
| id|price|
+---+-----+
|  4|34.87|
|  5| 33.5|
|  6| 15.1|
|  7|20.78|
+---+-----+



In [20]:
from pyspark import keyword_only
from pyspark.sql import functions as F
from sparkml_base_classes import TransformerBaseClass

class DivideColumnTransformer(TransformerBaseClass):
    """Column-by-constant division stage, re-declared after the namespace reset.

    Reads ``self._column_name`` and ``self._value``, which mirror the
    constructor keywords (presumably set by ``TransformerBaseClass`` --
    confirm against the package documentation).
    """

    @keyword_only
    def __init__(self, column_name=None, value=None):
        super().__init__()

    def _transform(self, ddf):
        """Overwrite the configured column with itself divided by the value."""
        divided = F.col(self._column_name) / self._value
        ddf = ddf.withColumn(self._column_name, divided)
        return ddf

In [21]:
# Reload the pipeline saved before the reset. The custom stage class is defined
# again in the cell above, presumably so the loader can rebuild the stage.
fitted_pipeline = PipelineModel.load('custom_fitted_pipeline.pipeline')
# `_value` is the mean learned at fit time, not a statistic of `test`.
print("Mean:", fitted_pipeline.stages[0]._value)
fitted_pipeline.transform(test).show()

Mean: 28.3125
+---+------------------+
| id|             price|
+---+------------------+
|  4|1.2316114790286974|
|  5|1.1832229580573952|
|  6|0.5333333333333333|
|  7|0.7339514348785873|
+---+------------------+



# [Example: OneHot Encoding in aditrade](http://aditrade.aws.3stripes.net/tmp_source/aditrade.etl.one_hot_estimator.html)

# [Example: PCA in aditrade](http://aditrade.aws.3stripes.net/tmp_source/aditrade.etl.principal_component_analysis.html)


# [Source Code of SparkML Base Classes](https://github.com/rragundez/sparkml-base-classes)