In [5]:
# 1 - Read the CSV file

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName('DataTransformations').getOrCreate()

# Read the CSV file into a DataFrame
df = spark.read.csv("C:/Users/karab/Desktop/Self-Learning/people_data.csv", header=True, inferSchema=True)

# Show the first few rows
df.show()




+----------+---------+------+--------------------+-------------+------------------+
|First Name|Last Name|   Sex|               Email|Date of birth|         Job Title|
+----------+---------+------+--------------------+-------------+------------------+
|    Shelby|  Terrell|  Male|elijah57@example.net|   26/10/1945|   Games developer|
|   Phillip|  Summers|Female|bethany14@example...|   24/03/1910|    Phytotherapist|
|  Kristine|   Travis|  Male|bthompson@example...|   02/07/1992|         Homeopath|
|   Yesenia| Martinez|  Male|kaitlinkaiser@exa...|   03/08/2017| Market researcher|
|      Lori|     Todd|  Male|buchananmanuel@ex...|   01/12/1938|Veterinary surgeon|
|      ABCd|     NULL|Female|                  **|         NULL|              NULL|
+----------+---------+------+--------------------+-------------+------------------+



In [7]:
# 2. Filter Data Based on Condition (e.g., Sex = Male)
filtered_df = df.filter(df['Sex'] == 'Male')
filtered_df.show()


+----------+---------+----+--------------------+-------------+------------------+
|First Name|Last Name| Sex|               Email|Date of birth|         Job Title|
+----------+---------+----+--------------------+-------------+------------------+
|    Shelby|  Terrell|Male|elijah57@example.net|   26/10/1945|   Games developer|
|  Kristine|   Travis|Male|bthompson@example...|   02/07/1992|         Homeopath|
|   Yesenia| Martinez|Male|kaitlinkaiser@exa...|   03/08/2017| Market researcher|
|      Lori|     Todd|Male|buchananmanuel@ex...|   01/12/1938|Veterinary surgeon|
+----------+---------+----+--------------------+-------------+------------------+



In [8]:
# 3. Add a New Column (e.g., Age)

from pyspark.sql.functions import to_date, current_date, datediff

# Convert 'Date of birth' to date type (if not already)
df_with_date = df.withColumn('Date_of_birth', to_date(df['Date of birth'], 'dd/MM/yyyy'))

# Calculate age by subtracting 'Date of birth' from the current date
df_with_age = df_with_date.withColumn('Age', datediff(current_date(), df_with_date['Date_of_birth']) / 365)
df_with_age.show()


+----------+---------+------+--------------------+-------------+------------------+-------------+------------------+
|First Name|Last Name|   Sex|               Email|Date of birth|         Job Title|Date_of_birth|               Age|
+----------+---------+------+--------------------+-------------+------------------+-------------+------------------+
|    Shelby|  Terrell|  Male|elijah57@example.net|   26/10/1945|   Games developer|   1945-10-26|              79.2|
|   Phillip|  Summers|Female|bethany14@example...|   24/03/1910|    Phytotherapist|   1910-03-24|114.81643835616438|
|  Kristine|   Travis|  Male|bthompson@example...|   02/07/1992|         Homeopath|   1992-07-02|32.484931506849314|
|   Yesenia| Martinez|  Male|kaitlinkaiser@exa...|   03/08/2017| Market researcher|   2017-08-03| 7.380821917808219|
|      Lori|     Todd|  Male|buchananmanuel@ex...|   01/12/1938|Veterinary surgeon|   1938-12-01| 86.10684931506849|
|      ABCd|     NULL|Female|                  **|         NULL|

In [9]:
# 4. Drop Column

df_dropped = df_with_age.drop('Email')
df_dropped.show()


+----------+---------+------+-------------+------------------+-------------+------------------+
|First Name|Last Name|   Sex|Date of birth|         Job Title|Date_of_birth|               Age|
+----------+---------+------+-------------+------------------+-------------+------------------+
|    Shelby|  Terrell|  Male|   26/10/1945|   Games developer|   1945-10-26|              79.2|
|   Phillip|  Summers|Female|   24/03/1910|    Phytotherapist|   1910-03-24|114.81643835616438|
|  Kristine|   Travis|  Male|   02/07/1992|         Homeopath|   1992-07-02|32.484931506849314|
|   Yesenia| Martinez|  Male|   03/08/2017| Market researcher|   2017-08-03| 7.380821917808219|
|      Lori|     Todd|  Male|   01/12/1938|Veterinary surgeon|   1938-12-01| 86.10684931506849|
|      ABCd|     NULL|Female|         NULL|              NULL|         NULL|              NULL|
+----------+---------+------+-------------+------------------+-------------+------------------+



In [10]:
#5. Change Column Name
df_renamed = df_dropped.withColumnRenamed('First Name', 'First_Name') \
                       .withColumnRenamed('Last Name', 'Last_Name')
df_renamed.show()


+----------+---------+------+-------------+------------------+-------------+------------------+
|First_Name|Last_Name|   Sex|Date of birth|         Job Title|Date_of_birth|               Age|
+----------+---------+------+-------------+------------------+-------------+------------------+
|    Shelby|  Terrell|  Male|   26/10/1945|   Games developer|   1945-10-26|              79.2|
|   Phillip|  Summers|Female|   24/03/1910|    Phytotherapist|   1910-03-24|114.81643835616438|
|  Kristine|   Travis|  Male|   02/07/1992|         Homeopath|   1992-07-02|32.484931506849314|
|   Yesenia| Martinez|  Male|   03/08/2017| Market researcher|   2017-08-03| 7.380821917808219|
|      Lori|     Todd|  Male|   01/12/1938|Veterinary surgeon|   1938-12-01| 86.10684931506849|
|      ABCd|     NULL|Female|         NULL|              NULL|         NULL|              NULL|
+----------+---------+------+-------------+------------------+-------------+------------------+



In [11]:
# 6. Group By and Aggregate (e.g., Count by Job Title)
from pyspark.sql.functions import count

df_grouped = df_renamed.groupBy('Job Title').agg(count('First_Name').alias('Number_of_People'))
df_grouped.show()


+------------------+----------------+
|         Job Title|Number_of_People|
+------------------+----------------+
|   Games developer|               1|
|              NULL|               1|
|         Homeopath|               1|
|Veterinary surgeon|               1|
| Market researcher|               1|
|    Phytotherapist|               1|
+------------------+----------------+



In [12]:
# 7. Sort Data by a Column (e.g., Age)

df_sorted = df_with_age.orderBy(df_with_age['Age'], ascending=False)
df_sorted.show()


+----------+---------+------+--------------------+-------------+------------------+-------------+------------------+
|First Name|Last Name|   Sex|               Email|Date of birth|         Job Title|Date_of_birth|               Age|
+----------+---------+------+--------------------+-------------+------------------+-------------+------------------+
|   Phillip|  Summers|Female|bethany14@example...|   24/03/1910|    Phytotherapist|   1910-03-24|114.81643835616438|
|      Lori|     Todd|  Male|buchananmanuel@ex...|   01/12/1938|Veterinary surgeon|   1938-12-01| 86.10684931506849|
|    Shelby|  Terrell|  Male|elijah57@example.net|   26/10/1945|   Games developer|   1945-10-26|              79.2|
|  Kristine|   Travis|  Male|bthompson@example...|   02/07/1992|         Homeopath|   1992-07-02|32.484931506849314|
|   Yesenia| Martinez|  Male|kaitlinkaiser@exa...|   03/08/2017| Market researcher|   2017-08-03| 7.380821917808219|
|      ABCd|     NULL|Female|                  **|         NULL|

In [13]:
# 8. Replace Values in a Column (e.g., Replace NULL with 'Unknown')

from pyspark.sql.functions import when

df_replaced = df_renamed.withColumn('Job Title', 
                                   when(df_renamed['Job Title'].isNull(), 'Unknown')
                                   .otherwise(df_renamed['Job Title']))
df_replaced.show()


+----------+---------+------+-------------+------------------+-------------+------------------+
|First_Name|Last_Name|   Sex|Date of birth|         Job Title|Date_of_birth|               Age|
+----------+---------+------+-------------+------------------+-------------+------------------+
|    Shelby|  Terrell|  Male|   26/10/1945|   Games developer|   1945-10-26|              79.2|
|   Phillip|  Summers|Female|   24/03/1910|    Phytotherapist|   1910-03-24|114.81643835616438|
|  Kristine|   Travis|  Male|   02/07/1992|         Homeopath|   1992-07-02|32.484931506849314|
|   Yesenia| Martinez|  Male|   03/08/2017| Market researcher|   2017-08-03| 7.380821917808219|
|      Lori|     Todd|  Male|   01/12/1938|Veterinary surgeon|   1938-12-01| 86.10684931506849|
|      ABCd|     NULL|Female|         NULL|           Unknown|         NULL|              NULL|
+----------+---------+------+-------------+------------------+-------------+------------------+



In [14]:
# 9. Change Data Type of a Column (e.g., String to Date for 'Date of Birth')

df_with_date = df.withColumn('Date_of_birth', to_date(df['Date of birth'], 'dd/MM/yyyy'))
df_with_date.show()


+----------+---------+------+--------------------+-------------+------------------+-------------+
|First Name|Last Name|   Sex|               Email|Date of birth|         Job Title|Date_of_birth|
+----------+---------+------+--------------------+-------------+------------------+-------------+
|    Shelby|  Terrell|  Male|elijah57@example.net|   26/10/1945|   Games developer|   1945-10-26|
|   Phillip|  Summers|Female|bethany14@example...|   24/03/1910|    Phytotherapist|   1910-03-24|
|  Kristine|   Travis|  Male|bthompson@example...|   02/07/1992|         Homeopath|   1992-07-02|
|   Yesenia| Martinez|  Male|kaitlinkaiser@exa...|   03/08/2017| Market researcher|   2017-08-03|
|      Lori|     Todd|  Male|buchananmanuel@ex...|   01/12/1938|Veterinary surgeon|   1938-12-01|
|      ABCd|     NULL|Female|                  **|         NULL|              NULL|         NULL|
+----------+---------+------+--------------------+-------------+------------------+-------------+



In [15]:
# 10. Standardize the Column (e.g., Normalize the Age)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

# Create a DataFrame for MinMaxScaler input
df_for_scaling = df_with_age.select('Age').rdd.map(lambda row: (Vectors.dense([row['Age']]),)).toDF(["features"])

# Initialize MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Fit and transform data
scaler_model = scaler.fit(df_for_scaling)
scaled_df = scaler_model.transform(df_for_scaling)

# Show normalized data
scaled_df.show()


+--------------------+--------------------+
|            features|     scaled_features|
+--------------------+--------------------+
|              [79.2]|[0.6684857448870302]|
|[114.81643835616438]|               [1.0]|
|[32.484931506849314]|[0.23366654766154...|
| [7.380821917808219]|               [0.0]|
| [86.10684931506849]|[0.7327740092823991]|
|               [NaN]|               [NaN]|
+--------------------+--------------------+

