In [2]:
import pandas as pd

In [4]:
# Shared toy sample: one integer column, with negatives and a repeated value.
data = dict(raw_value=[2, 45, -23, 85, 28, 2, 35, -12])


# 1 Scaling with Pandas

## 1.1 Min-max normalization

In [13]:
# Wrap the shared sample values in a pandas DataFrame (one column per dict key) and preview it.
pdf = pd.DataFrame.from_dict(data)
pdf.head()

Unnamed: 0,raw_value
0,2
1,45
2,-23
3,85
4,28


In [14]:
# Input column and the name of the column that receives the scaled values.
source_col_name = "raw_value"
min_max_norme_col_name = "min_max_normalized"
# Min-max scaling onto [0, 1]: (x - min) / (max - min).
pdf[min_max_norme_col_name] = pdf[source_col_name].pipe(
    lambda s: (s - s.min()) / (s.max() - s.min())
)
pdf.head()

Unnamed: 0,raw_value,min_max_normalized
0,2,0.231481
1,45,0.62963
2,-23,0.0
3,85,1.0
4,28,0.472222


## 1.2 Z-score normalization

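The pandas `DataFrame.std()` / `Series.std()` method returns the *sample* standard deviation over the requested axis: by default the result is normalized by N-1 (`ddof=1`). The standard deviation quantifies the amount of variation, or dispersion, in a set of data values. The z-score of a value is then $z = (x - \bar{x}) / s$, where $\bar{x}$ is the mean and $s$ the sample standard deviation.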


In [16]:
# z = (x - mean) / std; pandas' std() defaults to the sample standard deviation (ddof=1).
pdf['z_normalized'] = pdf[source_col_name].pipe(lambda s: (s - s.mean()) / s.std())

In [17]:
# Preview the first rows with both scaled columns.
pdf.head(n=5)

Unnamed: 0,raw_value,min_max_normalized,z_normalized
0,2,0.231481,-0.518878
1,45,0.62963,0.703684
2,-23,0.0,-1.22967
3,85,1.0,1.840952
4,28,0.472222,0.220346


# 2 Scaling with Spark


In [31]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import col, min,max,lit,avg,stddev_samp
from pyspark.sql.types import StructType, StructField, IntegerType

In [2]:
# Set to False to run against the Kubernetes cluster instead of a local master.
local = True
if local:
    # Local master with 4 worker threads.
    spark = (
        SparkSession.builder
        .master("local[4]")
        .appName("Feature_scaling")
        .getOrCreate()
    )
else:
    # The driver service account and namespace are read from the environment;
    # a missing variable raises KeyError.
    spark = (
        SparkSession.builder
        .master("k8s://https://kubernetes.default.svc:443")
        .appName("Feature_scaling")
        .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:master")
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT'])
        .config("spark.executor.instances", "4")
        .config("spark.executor.memory", "2g")
        .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])
        .getOrCreate()
    )

# Eager evaluation: a DataFrame that is the last expression of a cell is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

22/07/21 15:17:45 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.184.146 instead (on interface ens33)
22/07/21 15:17:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/07/21 15:17:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [26]:
# Build the Spark input from the same values used in the pandas section
# (the `data` dict defined at the top), so both implementations are
# guaranteed to scale identical data instead of two hand-copied literals.
data1 = [(value,) for value in data["raw_value"]]
# One nullable integer column, matching the pandas column name.
schema = StructType([StructField("raw_value", IntegerType(), True)])
df = spark.createDataFrame(data1, schema)

In [9]:
# Print the raw input (show's default of 20 rows covers the whole sample).
df.show(n=20)

+---------+
|raw_value|
+---------+
|        2|
|       45|
|      -23|
|       85|
|       28|
|        2|
|       35|
|      -12|
+---------+



## 2.1 Min-max normalization

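In Spark, `min` and `max` can only be computed inside an aggregation, so we cannot write the same one-liner as in pandas. Instead, we first aggregate the bounds into Python values on the driver, then use them as literals in a column expression.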

In [21]:
target_val_name = "raw_value"  # column that all Spark scaling steps read from


In [15]:
# Compute both bounds in one aggregation. `min`/`max` here are the
# pyspark.sql.functions versions imported earlier, not the Python builtins.
row = df.agg(
    min(target_val_name).alias("min"),
    max(target_val_name).alias("max"),
).first()

In [17]:
# Unpack the aggregated bounds from the Row into plain Python values.
min_val, max_val = map(row.asDict().get, ("min", "max"))

In [18]:
# Echo the bounds that drive the min-max scaling below.
print(f"min: {min_val}, max: {max_val}")

min: -23, max: 85


In [28]:
# Min-max scale onto [0, 1] using the driver-side bounds as literals.
# Using lit() directly, instead of adding temporary "min"/"max" columns and
# dropping them afterwards, avoids overwriting and then deleting any existing
# columns that happen to be named "min" or "max".
df = df.withColumn(
    "min_max_normalization",
    (col(target_val_name) - lit(min_val)) / (lit(max_val) - lit(min_val)),
)

In [29]:
# Display the raw values next to their min-max scaled counterparts.
df.show(n=20)

+---------+---------------------+
|raw_value|min_max_normalization|
+---------+---------------------+
|        2|  0.23148148148148148|
|       45|   0.6296296296296297|
|      -23|                  0.0|
|       85|                  1.0|
|       28|   0.4722222222222222|
|        2|  0.23148148148148148|
|       35|   0.5370370370370371|
|      -12|  0.10185185185185185|
+---------+---------------------+



## 2.2 Z-score normalization

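Note: to reproduce the pandas result above, we use `stddev_samp` (the sample standard deviation, normalized by N-1) rather than the population standard deviation.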

In [32]:
# Mean and sample standard deviation in a single aggregation;
# stddev_samp matches the pandas std() default used in section 1.2.
row = df.agg(
    avg(target_val_name).alias("mean"),
    stddev_samp(target_val_name).alias("std"),
).first()

In [33]:
# Unpack the aggregated statistics from the Row into plain Python values.
mean_val, std_val = map(row.asDict().get, ("mean", "std"))

In [34]:
# Echo the statistics that drive the z-score scaling below.
print(f"mean: {mean_val}, std: {std_val}")

mean: 20.25, std: 35.17202622214746


In [35]:
# z-score: (x - mean) / std, using the driver-side statistics as literals.
# Using lit() directly, instead of adding temporary "mean"/"std" columns and
# dropping them afterwards, avoids overwriting and then deleting any existing
# columns that happen to be named "mean" or "std".
df = df.withColumn(
    "z_normalization",
    (col(target_val_name) - lit(mean_val)) / lit(std_val),
)

In [36]:
# Display raw, min-max scaled, and z-score scaled values side by side.
df.show(n=20)

+---------+---------------------+-------------------+
|raw_value|min_max_normalization|    z_normalization|
+---------+---------------------+-------------------+
|        2|  0.23148148148148148|-0.5188782666296367|
|       45|   0.6296296296296297| 0.7036842246073154|
|      -23|                  0.0| -1.229670412697632|
|       85|                  1.0|  1.840951658316108|
|       28|   0.4722222222222222|0.22034556528107857|
|        2|  0.23148148148148148|-0.5188782666296367|
|       35|   0.5370370370370371| 0.4193673661801173|
|      -12|  0.10185185185185185|-0.9169218684277141|
+---------+---------------------+-------------------+

