### Spark setup

In [1]:
import os

import findspark

# Locate the Spark installation. An exported SPARK_HOME takes precedence so the
# notebook runs on machines other than the author's; when it is not set, fall
# back to the original local install path.
findspark.init(
    os.environ.get(
        "SPARK_HOME",
        "/Users/sharmilisrinivasan/spark-2.4.3-bin-hadoop2.7/",
    )
)

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): "spark://ip:7077" looks like a placeholder master URL — confirm
# the real host before running.
spark = (
    SparkSession.builder
    .master("spark://ip:7077")
    .appName("Exp")
    .getOrCreate()
)

### Data

In [4]:
from pyspark.sql import Row
from pyspark.sql.functions import col

In [5]:
# Row factory that fixes the column names and their order for every record.
record = Row("group_id", "row_id", "f1", "f2", "f3")

# Six sample rows in three groups: G1 has three rows, G2 two, G3 one.
sample_rows = [
    ("G1", "R1", 0.2, 60, 500),
    ("G1", "R2", 0.98, 70, 780),
    ("G1", "R3", 0.18, 78, 856),
    ("G2", "R4", 0.999, 160, 1500),
    ("G2", "R5", 0.2, 60, 500),
    ("G3", "R6", 0.2, 60, 500),
]
sample_df = spark.createDataFrame([record(*values) for values in sample_rows])

sample_df.show()

+--------+------+-----+---+----+
|group_id|row_id|   f1| f2|  f3|
+--------+------+-----+---+----+
|      G1|    R1|  0.2| 60| 500|
|      G1|    R2| 0.98| 70| 780|
|      G1|    R3| 0.18| 78| 856|
|      G2|    R4|0.999|160|1500|
|      G2|    R5|  0.2| 60| 500|
|      G3|    R6|  0.2| 60| 500|
+--------+------+-----+---+----+



In [6]:
# Columns that the processing loop rescales; group_id and row_id are not in
# this list and are therefore left untouched.
feature_cols = ["f1", "f2", "f3"]

### Processing: scale each feature by its per-group maximum

In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import max as spark_max, min as spark_min, col

In [8]:
# Window spanning all rows that share a group_id, so an aggregate evaluated
# over it is computed per group.
w = Window.partitionBy('group_id')

In [9]:
# Divide every feature by the maximum of that feature within the row's group.
# The windowed maximum is used directly inside the expression, so no temporary
# helper column is added and an existing column can never be clobbered.
# Note: this divides by the plain maximum, whereas MaxAbsScaler divides by the
# maximum absolute value; the two agree for non-negative features such as the
# sample data in this notebook.
for c in feature_cols:
    sample_df = sample_df.withColumn(c, col(c) / spark_max(col(c)).over(w))

In [10]:
# Display the scaled frame; within each group the largest value of every
# feature should now be 1.0.
sample_df.show()

+--------+------+-------------------+------------------+------------------+
|group_id|row_id|                 f1|                f2|                f3|
+--------+------+-------------------+------------------+------------------+
|      G2|    R4|                1.0|               1.0|               1.0|
|      G2|    R5| 0.2002002002002002|             0.375|0.3333333333333333|
|      G3|    R6|                1.0|               1.0|               1.0|
|      G1|    R1|0.20408163265306123|0.7692307692307693|0.5841121495327103|
|      G1|    R2|                1.0|0.8974358974358975|0.9112149532710281|
|      G1|    R3| 0.1836734693877551|               1.0|               1.0|
+--------+------+-------------------+------------------+------------------+



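### Verification with VectorAssembler and MaxAbsScaler

`MaxAbsScaler` is fitted separately on each group's already-scaled rows. If the window-based scaling above is correct, every feature's largest absolute value within a group is already 1, so `scaledFeatures` should equal `features`.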

In [11]:
from pyspark.ml.feature import VectorAssembler

In [12]:
# Pack the feature columns into the single vector column that MaxAbsScaler
# reads. feature_cols is reused so the verified columns cannot drift from the
# columns that were actually scaled.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features")

In [13]:
# Append the assembled "features" vector column to the scaled frame.
new_df = assembler.transform(sample_df)

In [14]:
from pyspark.ml.feature import MaxAbsScaler

In [15]:
# Unfitted scaler: once fitted, it rescales each dimension of "features" by the
# maximum absolute value seen during fit and writes the result to
# "scaledFeatures".
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

In [16]:
# One frame per group so the scaler can be fitted on each group in isolation.
g1_df, g2_df, g3_df = (
    new_df.filter(col("group_id") == group_name)
    for group_name in ("G1", "G2", "G3")
)

In [17]:
# Fit on group G1 only, so the scaler's statistics are per group, like the
# window-based scaling. The check passes when scaledFeatures equals features.
scalerModel = scaler.fit(g1_df)
scalerModel.transform(g1_df).collect()

[Row(group_id='G1', row_id='R1', f1=0.20408163265306123, f2=0.7692307692307693, f3=0.5841121495327103, features=DenseVector([0.2041, 0.7692, 0.5841]), scaledFeatures=DenseVector([0.2041, 0.7692, 0.5841])),
 Row(group_id='G1', row_id='R2', f1=1.0, f2=0.8974358974358975, f3=0.9112149532710281, features=DenseVector([1.0, 0.8974, 0.9112]), scaledFeatures=DenseVector([1.0, 0.8974, 0.9112])),
 Row(group_id='G1', row_id='R3', f1=0.1836734693877551, f2=1.0, f3=1.0, features=DenseVector([0.1837, 1.0, 1.0]), scaledFeatures=DenseVector([0.1837, 1.0, 1.0]))]

In [18]:
# Same check for group G2: refit on G2's rows only and compare scaledFeatures
# with features.
scalerModel = scaler.fit(g2_df)
scalerModel.transform(g2_df).collect()

[Row(group_id='G2', row_id='R4', f1=1.0, f2=1.0, f3=1.0, features=DenseVector([1.0, 1.0, 1.0]), scaledFeatures=DenseVector([1.0, 1.0, 1.0])),
 Row(group_id='G2', row_id='R5', f1=0.2002002002002002, f2=0.375, f3=0.3333333333333333, features=DenseVector([0.2002, 0.375, 0.3333]), scaledFeatures=DenseVector([0.2002, 0.375, 0.3333]))]

In [19]:
# Same check for group G3, which has a single row, so every scaled feature is
# expected to be 1.0.
scalerModel = scaler.fit(g3_df)
scalerModel.transform(g3_df).collect()

[Row(group_id='G3', row_id='R6', f1=1.0, f2=1.0, f3=1.0, features=DenseVector([1.0, 1.0, 1.0]), scaledFeatures=DenseVector([1.0, 1.0, 1.0]))]