In [1]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

In [2]:
class CustomTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, input_col: str = "input", output_col: str = "output"):
        super(CustomTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame):
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        # The custom action: concatenate the integer form of the doubles from the Vector
        transform_udf = F.udf(lambda x: '/'.join([str(int(y)) for y in x]), StringType())
        return df.withColumn(output_col, transform_udf(input_col))

In [4]:
import cml.data_v1 as cmldata

CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco
Hive Session ID = 6623bf11-46e2-47c4-8be0-79c98cb376fd


+--------------------+
|           namespace|
+--------------------+
|         01_car_data|
|           01_car_dw|
|             airline|
|          airline_dw|
|            airlines|
|        airlines_csv|
|       airlines_csv1|
|   airlines_csv_vish|
|    airlines_iceberg|
|   airlines_iceberg1|
|airlines_iceberg_...|
|          airquality|
|          atlas_demo|
|            bankdemo|
|              bhagan|
|             cdedemo|
|        cdp_overview|
|        cgsifacebook|
|               claim|
|           clev_bank|
+--------------------+
only showing top 20 rows



In [7]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

In [8]:
df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]),), (Vectors.dense([0.4, 0.9, 7.0]),)], ["numbers"])

In [9]:
elementwise_product = ElementwiseProduct(scalingVec=Vectors.dense([2.0, 3.0, 5.0]), inputCol="numbers", outputCol="product")
custom_transformer = CustomTransformer(input_col="product", output_col="results")
pipeline = Pipeline(stages=[elementwise_product, custom_transformer])
model = pipeline.fit(df)
results = model.transform(df)
results.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+--------------+-------+
|      numbers|       product|results|
+-------------+--------------+-------+
|[2.0,1.0,3.0]|[4.0,3.0,15.0]| 4/3/15|
|[0.4,0.9,7.0]|[0.8,2.7,35.0]| 0/2/35|
+-------------+--------------+-------+



                                                                                