<a href="https://colab.research.google.com/github/mahuthu/SQL-Launching-Airbnb-Sydney/blob/main/Data_Transformation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=ede319ea76db2d987c73a6d7361b3ef9c7c735c05e9435bb2fa2423841a54454
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession

# Build the session that the following cells refer to as `spark`.
spark = (
    SparkSession.builder
    .appName("Data Transformation in PySpark")
    .getOrCreate()
)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Sample DataFrame
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Calculate Min and Max for Age.
# The Spark aggregates are reached through the `F` namespace so Python's
# built-in min()/max() are not shadowed for the rest of the notebook, and both
# bounds come back from a single aggregation instead of two separate jobs.
age_min, age_max = df.agg(F.min("Age"), F.max("Age")).first()

# Perform Normalization: (Age - min) / (max - min) maps Age onto [0, 1]
df_normalized = df.withColumn("Age_Normalized", (col("Age") - age_min) / (age_max - age_min))

df_normalized.show()


+---------+---+--------------+
|     Name|Age|Age_Normalized|
+---------+---+--------------+
|    Alice| 34|        0.3125|
|      Bob| 45|           1.0|
|Catherine| 29|           0.0|
+---------+---+--------------+



In [None]:
from pyspark.sql.functions import col, mean, stddev

# Calculate Mean and Standard Deviation for Age in one aggregation pass.
# `col` is imported here as well so the cell does not depend on an import made
# in a different cell. `stddev` is the sample standard deviation, which is what
# the values in the displayed output correspond to.
age_mean, age_stddev = df.agg(mean(col("Age")), stddev(col("Age"))).first()

# Perform Standardization: (Age - mean) / stddev
df_standardized = df.withColumn("Age_Standardized", (col("Age") - age_mean) / age_stddev)

df_standardized.show()


+---------+---+--------------------+
|     Name|Age|    Age_Standardized|
+---------+---+--------------------+
|    Alice| 34|-0.24433888871261045|
|      Bob| 45|   1.099524999206747|
|Catherine| 29| -0.8551861104941365|
+---------+---+--------------------+



In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Three distinct colours, one per row
columns = ["Color"]
data = [("red",), ("green",), ("blue",)]

df = spark.createDataFrame(data, schema=columns)

# Step 1 - label encoding: map each colour string to a numeric index
indexer = StringIndexer(inputCol="Color", outputCol="Color_Index")
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)

# Step 2 - one-hot encoding: turn the numeric index into a sparse vector
encoder = OneHotEncoder(inputCol="Color_Index", outputCol="Color_OneHot")
encoder_model = encoder.fit(df_indexed)
df_encoded = encoder_model.transform(df_indexed)

df_encoded.show()


+-----+-----------+-------------+
|Color|Color_Index| Color_OneHot|
+-----+-----------+-------------+
|  red|        2.0|    (2,[],[])|
|green|        1.0|(2,[1],[1.0])|
| blue|        0.0|(2,[0],[1.0])|
+-----+-----------+-------------+



In [None]:
# Fruit names with repeats, wrapped as one-field rows for a single "Fruit" column
columns = ["Fruit"]
data = [(fruit,) for fruit in ["Apple", "Banana", "Cherry", "Apple", "Banana", "Apple"]]

df = spark.createDataFrame(data, schema=columns)
df.show()

+------+
| Fruit|
+------+
| Apple|
|Banana|
|Cherry|
| Apple|
|Banana|
| Apple|
+------+



In [None]:
from pyspark.ml.feature import StringIndexer

# Configure the label encoder: Fruit (string) -> Fruit_Index (numeric)
indexer = StringIndexer(inputCol="Fruit", outputCol="Fruit_Index")

# Learn the string-to-index mapping from df, then apply it to the same frame
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)
df_indexed.show()


+------+-----------+
| Fruit|Fruit_Index|
+------+-----------+
| Apple|        0.0|
|Banana|        1.0|
|Cherry|        2.0|
| Apple|        0.0|
|Banana|        1.0|
| Apple|        0.0|
+------+-----------+



In [None]:
from pyspark.ml.feature import OneHotEncoder

# Configure the encoder: Fruit_Index (numeric) -> Fruit_OneHot (sparse vector)
encoder = OneHotEncoder(inputCol="Fruit_Index", outputCol="Fruit_OneHot")

# Fit on the indexed frame first, then transform that same frame
encoder_model = encoder.fit(df_indexed)
df_encoded = encoder_model.transform(df_indexed)
df_encoded.show()


+------+-----------+-------------+
| Fruit|Fruit_Index| Fruit_OneHot|
+------+-----------+-------------+
| Apple|        0.0|(2,[0],[1.0])|
|Banana|        1.0|(2,[1],[1.0])|
|Cherry|        2.0|    (2,[],[])|
| Apple|        0.0|(2,[0],[1.0])|
|Banana|        1.0|(2,[1],[1.0])|
| Apple|        0.0|(2,[0],[1.0])|
+------+-----------+-------------+



In [None]:
#full code
# End-to-end version of the fruit example: build the frame, label-encode it,
# then one-hot encode the index. The encoder classes are imported here so the
# listing does not rely on imports made in other cells; it still expects the
# `spark` session to exist.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Sample DataFrame
data = [("Apple",), ("Banana",), ("Cherry",), ("Apple",), ("Banana",), ("Apple",)]
columns = ["Fruit"]
df = spark.createDataFrame(data, columns)

# Label Encoding
indexer = StringIndexer(inputCol="Fruit", outputCol="Fruit_Index")
df_indexed = indexer.fit(df).transform(df)

# One-Hot Encoding
encoder = OneHotEncoder(inputCol="Fruit_Index", outputCol="Fruit_OneHot")
df_encoded = encoder.fit(df_indexed).transform(df_indexed)

# Show the final DataFrame
df_encoded.show()


+------+-----------+-------------+
| Fruit|Fruit_Index| Fruit_OneHot|
+------+-----------+-------------+
| Apple|        0.0|(2,[0],[1.0])|
|Banana|        1.0|(2,[1],[1.0])|
|Cherry|        2.0|    (2,[],[])|
| Apple|        0.0|(2,[0],[1.0])|
|Banana|        1.0|(2,[1],[1.0])|
| Apple|        0.0|(2,[0],[1.0])|
+------+-----------+-------------+



In [None]:
# People records: (Name, Age, Gender)
columns = ["Name", "Age", "Gender"]
data = [
    ("Alice", 34, "Female"),
    ("Bob", 45, "Male"),
    ("Catherine", 29, "Female"),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+---------+---+------+
|     Name|Age|Gender|
+---------+---+------+
|    Alice| 34|Female|
|      Bob| 45|  Male|
|Catherine| 29|Female|
+---------+---+------+



In [None]:
from pyspark.sql.functions import col

# Polynomial feature: Age raised to the second power.
# The displayed result is fractional (1156.0, not 1156).
age_squared = col("Age") ** 2
df_poly = df.withColumn("Age_Squared", age_squared)

df_poly.show()


+---------+---+------+-----------+
|     Name|Age|Gender|Age_Squared|
+---------+---+------+-----------+
|    Alice| 34|Female|     1156.0|
|      Bob| 45|  Male|     2025.0|
|Catherine| 29|Female|      841.0|
+---------+---+------+-----------+



In [None]:
# Sales records: (Units_Sold, Price_Per_Unit)
columns = ["Units_Sold", "Price_Per_Unit"]
data = [(1, 10), (2, 20), (3, 30), (4, 40)]

df = spark.createDataFrame(data, schema=columns)

# Interaction feature: the product of the two input columns
total_sales_value = col("Units_Sold") * col("Price_Per_Unit")
df_interaction = df.withColumn("Total_Sales_Value", total_sales_value)

df_interaction.show()

+----------+--------------+-----------------+
|Units_Sold|Price_Per_Unit|Total_Sales_Value|
+----------+--------------+-----------------+
|         1|            10|               10|
|         2|            20|               40|
|         3|            30|               90|
|         4|            40|              160|
+----------+--------------+-----------------+



Understanding Interaction and Polynomial Features in PySpark


In [None]:
from pyspark.sql import SparkSession

# Obtain the session for this section and bind it to `spark`.
spark = (
    SparkSession.builder
    .appName("Understanding Interaction and Polynomial Features")
    .getOrCreate()
)


In [None]:
from pyspark.sql.functions import col

# Single-column frame of ages
columns = ["Age"]
data = [(20,), (30,), (40,), (50,)]

df = spark.createDataFrame(data, schema=columns)

# Polynomial feature: Age raised to the second power.
# The displayed result is fractional (400.0, not 400).
age_squared = col("Age") ** 2
df_poly = df.withColumn("Age_Squared", age_squared)

df_poly.show()


+---+-----------+
|Age|Age_Squared|
+---+-----------+
| 20|      400.0|
| 30|      900.0|
| 40|     1600.0|
| 50|     2500.0|
+---+-----------+



In [None]:
# Sales records: (Units_Sold, Price_Per_Unit)
columns = ["Units_Sold", "Price_Per_Unit"]
data = [(1, 10), (2, 20), (3, 30)]

df = spark.createDataFrame(data, schema=columns)

# Interaction feature: the product of the two input columns
total_sales = col("Units_Sold") * col("Price_Per_Unit")
df_interaction = df.withColumn("Total_Sales", total_sales)

df_interaction.show()


+----------+--------------+-----------+
|Units_Sold|Price_Per_Unit|Total_Sales|
+----------+--------------+-----------+
|         1|            10|         10|
|         2|            20|         40|
|         3|            30|         90|
+----------+--------------+-----------+



In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample DataFrame with historical stock data
data = [(100, 1000, 0.1), (110, 1100, 0.2), (105, 900, -0.1)]
columns = ["Closing_Price", "Volume", "Market_Sentiment"]

df = spark.createDataFrame(data, columns)

# Prepare the data.
# The label must not also be an input: assembling every column would put
# Closing_Price into the feature vector and let the model predict the label
# from itself (target leakage). Only the remaining columns are used as features.
label_col = "Closing_Price"
feature_cols = [c for c in columns if c != label_col]

vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
df_vector = vec_assembler.transform(df)

# Initialize Linear Regression Model
lr = LinearRegression(featuresCol="Features", labelCol=label_col)

# Fit the model
lr_model = lr.fit(df_vector)

# Make predictions (on the training rows; three rows illustrate the API,
# they are not enough to evaluate a model)
predictions = lr_model.transform(df_vector)
predictions.show()


+-------------+------+----------------+------------------+------------------+
|Closing_Price|Volume|Market_Sentiment|          Features|        prediction|
+-------------+------+----------------+------------------+------------------+
|          100|  1000|             0.1|[100.0,1000.0,0.1]| 99.99999999999952|
|          110|  1100|             0.2|[110.0,1100.0,0.2]|110.00000000000028|
|          105|   900|            -0.1|[105.0,900.0,-0.1]|105.00000000000016|
+-------------+------+----------------+------------------+------------------+

