<a href="https://colab.research.google.com/github/usshaa/SMBDA/blob/main/C-5.8%3A%20Data_Transformations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Transformations


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import (VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, MinMaxScaler, PCA)
import pyspark.pandas as ps

1. **Initialization and Sample DataFrame**:
   - Create a Spark session and a sample DataFrame.

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Comprehensive Data Processing") \
    .getOrCreate()

In [None]:
# Sample DataFrame
data = [
    (1, "Alice", 29, 12000.0, "2020-01-01", "M", None),
    (2, "Bob", 35, None, "2019-07-01", "F", 3000.0),
    (3, "Charlie", None, 5000.0, "2018-05-20", "M", 1200.0),
    (4, "David", 40, 15000.0, None, "F", 4500.0),
    (5, "Eva", 50, 7000.0, "2021-12-12", None, 3400.0)
]

In [None]:
columns = ["id", "name", "age", "salary", "join_date", "gender", "bonus"]

df = spark.createDataFrame(data, columns)
# df.to_pandas_on_spark()
df.pandas_api()

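|   | id | name | age | salary | join_date | gender | bonus |
|---|---|---|---|---|---|---|---|
| 0 | 1 | Alice | 29.0 | 12000.0 | 2020-01-01 | M | |
| 1 | 2 | Bob | 35.0 | | 2019-07-01 | F | 3000.0 |
| 2 | 3 | Charlie | | 5000.0 | 2018-05-20 | M | 1200.0 |
| 3 | 4 | David | 40.0 | 15000.0 | | F | 4500.0 |
| 4 | 5 | Eva | 50.0 | 7000.0 | 2021-12-12 | | 3400.0 |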


2. **Data Preprocessing**:
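   - Fill missing values with per-column placeholder defaults using `na.fill`. These placeholders flow into every later step: a missing `age` becomes 0, which the feature-engineering step then labels as `young`.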

In [None]:
# Fill missing values
df = df.na.fill({"age": 0, "salary": 0, "join_date": "1900-01-01", "gender": "Unknown", "bonus": 0})
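
The effect of a per-column fill can be sketched in plain Python, without Spark. This is only an illustration of the idea: the helper `fill_missing` is hypothetical, and the two rows are copied from the sample data above.

```python
# Plain-Python illustration of per-column default filling (no Spark needed).
rows = [
    {"id": 1, "name": "Alice", "age": 29, "salary": 12000.0,
     "join_date": "2020-01-01", "gender": "M", "bonus": None},
    {"id": 3, "name": "Charlie", "age": None, "salary": 5000.0,
     "join_date": "2018-05-20", "gender": "M", "bonus": 1200.0},
]
defaults = {"age": 0, "salary": 0, "join_date": "1900-01-01",
            "gender": "Unknown", "bonus": 0}


def fill_missing(row, defaults):
    """Replace None with the column's default; leave other values untouched."""
    return {
        key: defaults[key] if value is None and key in defaults else value
        for key, value in row.items()
    }


filled = [fill_missing(row, defaults) for row in rows]
print(filled)
```

Only the missing cells change: Alice's `bonus` and Charlie's `age` both become 0, while every other value is passed through as-is.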

3. **Feature Engineering**:
   - Create new features such as `age_group` and `total_compensation`.

In [None]:
# Feature Engineering: Create new features
df = df.withColumn("age_group",
                   when(col("age") < 30, "young")
                   .when((col("age") >= 30) & (col("age") <= 50), "middle-aged")
                   .otherwise("old"))

df = df.withColumn("total_compensation", col("salary") + col("bonus"))
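
The `when`/`otherwise` chain behaves like an if/elif/else ladder in which the first matching branch wins. A minimal plain-Python sketch of the same rule, using the ages as they stand after the fill step (the function name `age_group` is just for illustration):

```python
def age_group(age):
    """Mirror the when/otherwise chain: the first matching branch wins."""
    if age < 30:
        return "young"
    if 30 <= age <= 50:
        return "middle-aged"
    return "old"


# Ages after the fill step; Charlie's missing age was replaced by 0.
ages = {"Alice": 29, "Bob": 35, "Charlie": 0, "David": 40, "Eva": 50}
groups = {name: age_group(age) for name, age in ages.items()}
print(groups)

# total_compensation for David: salary + bonus
total_compensation = 15000.0 + 4500.0
print(total_compensation)
```

Because the upper bound is inclusive, Eva (50) lands in `middle-aged`, and no row in this sample reaches the `old` branch.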

4. **String Indexing and One-Hot Encoding**:
   - Use `StringIndexer` to convert categorical columns (`gender` and `age_group`) to numerical indices.
   - Use `OneHotEncoder` to convert indexed columns to one-hot encoded vectors.

In [None]:
# String Indexing and One-Hot Encoding for categorical features
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
age_group_indexer = StringIndexer(inputCol="age_group", outputCol="age_group_index")

gender_encoder = OneHotEncoder(inputCol="gender_index", outputCol="gender_vec")
age_group_encoder = OneHotEncoder(inputCol="age_group_index", outputCol="age_group_vec")
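
What the two stages compute can be sketched in plain Python. This assumes the default settings: `StringIndexer` orders labels by descending frequency (ties broken alphabetically), and `OneHotEncoder` drops the last category (`dropLast=True`), so a column with three labels yields a vector of length two. The helper names are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map labels to indices by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot vector of length num_categories - 1; the last category is all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


genders = ["M", "F", "M", "F", "Unknown"]  # gender column after the fill step
gender_index = fit_string_index(genders)
gender_vectors = {
    label: one_hot_drop_last(index, len(gender_index))
    for label, index in gender_index.items()
}
print(gender_index)
print(gender_vectors)
```

Under these assumptions `F` maps to index 0, `M` to 1, and `Unknown` to 2, and `Unknown` is encoded as the all-zero vector.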

5. **Vector Assembler**:
   - Combine all feature columns into a single feature vector using `VectorAssembler`.

In [None]:
# Vector Assembler to combine feature columns into a single vector
assembler = VectorAssembler(inputCols=["age", "salary", "bonus", "gender_vec", "age_group_vec"], outputCol="features")
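
Conceptually, the assembler concatenates scalar columns and vector columns, in the order given, into one flat vector. A plain-Python sketch for Alice's row, assuming the one-hot vectors `[0.0, 1.0]` for `gender_vec` and `[0.0]` for `age_group_vec` (the helper `assemble` is hypothetical):

```python
def assemble(*parts):
    """Concatenate scalars and vectors, in order, into one flat list of floats."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(x) for x in part)
        else:
            features.append(float(part))
    return features


# Alice: age, salary, bonus, gender_vec (M), age_group_vec (young)
alice = assemble(29, 12000.0, 0.0, [0.0, 1.0], [0.0])
print(alice)
```

The result has six entries: three numeric columns, two from `gender_vec`, and one from `age_group_vec`.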

6. **Scaling**:
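   - Apply `StandardScaler` to standardize the feature vectors. With the default settings (`withStd=True`, `withMean=False`) each feature is divided by its standard deviation but not centered.
   - Apply `MinMaxScaler` to rescale each feature to a specified range (`[0, 1]` by default).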

In [None]:
# Standard Scaler
standard_scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [None]:
# Min-Max Scaler
min_max_scaler = MinMaxScaler(inputCol="features", outputCol="minmax_features")
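
The arithmetic behind both scalers can be checked by hand on the `age` column. The sketch below uses plain Python and assumes the default settings: scaling by the sample standard deviation without centering, and min-max scaling to `[0, 1]`.

```python
import statistics

ages = [29.0, 35.0, 0.0, 40.0, 50.0]  # age column after the fill step

# Unit-variance scaling without centering: divide by the sample standard deviation.
age_std = statistics.stdev(ages)
std_scaled = [age / age_std for age in ages]

# Min-max scaling to [0, 1]: subtract the minimum, then divide by the range.
lowest, highest = min(ages), max(ages)
minmax_scaled = [(age - lowest) / (highest - lowest) for age in ages]

print(std_scaled)
print(minmax_scaled)
```

For Alice (age 29) this gives roughly 1.5376 and 0.58, which agrees with the first entries of `scaled_features` and `minmax_features` in the results shown at the end of the notebook.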

7. **Dimension Reduction**:
   - Apply `PCA` to project the standardized feature vector (`scaled_features`) onto its top `k=3` principal components.

In [None]:
# PCA for dimension reduction
pca = PCA(k=3, inputCol="scaled_features", outputCol="pca_features")

8. **Pipeline**:
   - Create a pipeline that chains together all the steps.
   - Fit the pipeline to the DataFrame and transform the data.

In [None]:
# Building the Pipeline
pipeline = Pipeline(stages=[
    gender_indexer,
    age_group_indexer,
    gender_encoder,
    age_group_encoder,
    assembler,
    standard_scaler,
    min_max_scaler,
    pca
])

In [None]:
# Fit the pipeline to the DataFrame
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)
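
The fit-then-transform chaining can be illustrated with a toy pipeline in plain Python. This is a sketch of the idea only, not Spark's implementation; `MaxScalerEstimator`, `RoundTransformer`, `fit_pipeline`, and `run_pipeline` are hypothetical names.

```python
class MaxScalerEstimator:
    """Hypothetical estimator: learns the maximum, returns a scaling function."""

    def fit(self, rows):
        largest = max(rows)
        return lambda data: [value / largest for value in data]


class RoundTransformer:
    """Hypothetical transformer: needs no fitting."""

    def transform(self, rows):
        return [round(value, 2) for value in rows]


def fit_pipeline(stages, rows):
    """Fit each stage on the output of the stages before it."""
    fitted = []
    for stage in stages:
        transform = stage.fit(rows) if hasattr(stage, "fit") else stage.transform
        rows = transform(rows)  # later stages see the transformed data
        fitted.append(transform)
    return fitted


def run_pipeline(fitted, rows):
    """Apply the fitted transforms in order."""
    for transform in fitted:
        rows = transform(rows)
    return rows


fitted = fit_pipeline([MaxScalerEstimator(), RoundTransformer()],
                      [29.0, 35.0, 0.0, 40.0, 50.0])
result = run_pipeline(fitted, [25.0, 12.345])
print(result)
```

The point of the design is that statistics such as the maximum are learned once during fitting and then reused unchanged on any data passed through the fitted pipeline.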

9. **Display Results**:
   - Show the resulting DataFrame with the assembled features alongside the standardized, min-max-scaled, and PCA features.

In [None]:
# Display the results
df_transformed.select("id", "name", "features", "scaled_features", "minmax_features", "pca_features").show(truncate=False)

+---+-------+---------------------------------+----------------------------------------------------------------------------------------------------+---------------------------------------------------------+-------------------------------------------------------------+
|id |name   |features                         |scaled_features                                                                                     |minmax_features                                          |pca_features                                                 |
+---+-------+---------------------------------+----------------------------------------------------------------------------------------------------+---------------------------------------------------------+-------------------------------------------------------------+
|1  |Alice  |[29.0,12000.0,0.0,0.0,1.0,0.0]   |[1.5376449466659428,2.037119512395132,0.0,0.0,1.8257418583505538,0.0]                               |[0.58,0.8,0.0,0.0,1.0,0.0]                   

In [None]:
# Convert to a pandas-on-Spark DataFrame and display
# (pandas_api() replaces the deprecated to_pandas_on_spark())
df_transformed_pandas = df_transformed.select("id", "name", "features", "scaled_features", "minmax_features", "pca_features").pandas_api()
df_transformed_pandas

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


|   | id | name | features | scaled_features | minmax_features | pca_features |
|---|---|---|---|---|---|---|
| 0 | 1 | Alice | [29.0, 12000.0, 0.0, 0.0, 1.0, 0.0] | [1.5376449466659428, 2.037119512395132, 0.0, 0... | [0.58, 0.8, 0.0, 0.0, 1.0, 0.0] | [0.3009419511580602, -2.525632023442241, -0.06... |
| 1 | 2 | Bob | [35.0, 0.0, 3000.0, 1.0, 0.0, 1.0] | [1.8557783839071722, 0.0, 1.666152501401719, 1... | [0.7000000000000001, 0.0, 0.6666666666666667, ... | [-3.087465033598685, -0.19596631264102435, -0.... |
| 2 | 3 | Charlie | [0.0, 5000.0, 1200.0, 0.0, 1.0, 0.0] | [0.0, 0.8487997968313049, 0.6664610005606876, ... | [0.0, 0.33333333333333337, 0.26666666666666666... | [0.5940135040088568, -0.9376253643247832, -0.6... |
| 3 | 4 | David | [40.0, 15000.0, 4500.0, 1.0, 0.0, 1.0] | [2.1208895816081967, 2.5463993904939146, 2.499... | [0.8, 1.0, 1.0, 1.0, 0.0, 1.0] | [-3.576173490503931, -2.645193070168465, -1.16... |
| 4 | 5 | Eva | [50.0, 7000.0, 3400.0, 0.0, 0.0, 1.0] | [2.6511119770102463, 1.1883197155638268, 1.888... | [1.0, 0.4666666666666667, 0.7555555555555556, ... | [-2.830663276043418, -1.8861237400205002, 0.99... |
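
The Arrow warning above is triggered by the ML vector columns (`VectorUDT`). One possible workaround, sketched here as an assumption rather than a tested fix for this notebook, is to convert those columns to plain arrays of doubles with `pyspark.ml.functions.vector_to_array` before the conversion. The helper name `vectors_to_arrays` is hypothetical.

```python
def vectors_to_arrays(df, vector_cols):
    """Return df with each ML vector column replaced by an array<double> column."""
    # Imported inside the function so the helper can be defined
    # before PySpark is installed or a session is started.
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col

    for name in vector_cols:
        df = df.withColumn(name, vector_to_array(col(name)))
    return df


# Usage with the notebook's DataFrame (requires an active Spark session):
# vector_cols = ["features", "scaled_features", "minmax_features", "pca_features"]
# selected = df_transformed.select("id", "name", *vector_cols)
# vectors_to_arrays(selected, vector_cols).pandas_api()
```

Arrays of doubles are a primitive type that Arrow can represent, so the conversion should no longer need the non-optimized fallback path.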


# Great Job!