In [0]:
# Importing the pyspark and pyspark SQL modules and specifying the app name and master URL
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Application name passed to the SparkSession builder
appName = "Spark Cleansing"
# Master URL passed to the SparkSession builder
master = "local"
     

In [0]:
# Build the SparkSession (or reuse one that is already running) with Hive
# support enabled, so spark.sql() can work against Hive databases and tables.
spark = (
    SparkSession.builder
    .master(master)
    .appName(appName)
    .enableHiveSupport()
    .getOrCreate()
)

# Create the working database if it is missing, print its metadata, then make
# it the current database for the statements that follow.
# CREATE and USE return no rows, so their show() calls print an empty table.
spark.sql("CREATE DATABASE IF NOT EXISTS spark_data_cleansing").show()
spark.sql("DESCRIBE DATABASE spark_data_cleansing").show(truncate=False)
spark.sql("USE spark_data_cleansing").show()

++
||
++
++

+-------------------------+-------------------------------------------------+
|database_description_item|database_description_value                       |
+-------------------------+-------------------------------------------------+
|Catalog Name             |spark_catalog                                    |
|Namespace Name           |spark_data_cleansing                             |
|Comment                  |                                                 |
|Location                 |dbfs:/user/hive/warehouse/spark_data_cleansing.db|
|Owner                    |root                                             |
+-------------------------+-------------------------------------------------+

++
||
++
++



In [0]:
# List the databases visible to this Spark session to confirm that
# spark_data_cleansing is among them.
df = spark.sql("show databases")
df.show()
     

+--------------------+
|        databaseName|
+--------------------+
|             default|
|spark_data_cleansing|
|          spark_hive|
+--------------------+



In [0]:
# Location of the customer CSV file
csv_file_path = "dbfs:/FileStore/tables/CUSTOMERS.csv"

# Load the CSV, treating its first line as the header row. No schema is
# supplied and schema inference is not requested, so columns are read as strings.
datafile = spark.read.option("header", True).csv(csv_file_path)

# Preview the first 10 rows to check that the file was parsed correctly
datafile.show(10)

+----------+----------------+---+-------+
|CustomerID|        FullName|Age| Salary|
+----------+----------------+---+-------+
|       293|  Catherine Abel| 50| 500000|
|       295| Kim Abercrombie| 30| 600000|
|       297|Humberto Acevedo| 55|   None|
|       293|  Catherine Abel| 50| 500000|
|       299|  Pilar Ackerman| 35|5000000|
|       305|     Carla Adams| 42| 300000|
|       301|   Frances Adams| 34| 250000|
|       307|       Jay Adams| 30| 210000|
|       305|     Carla Adams| 42| 300000|
|       311| Samuel Agcaoili| 21|   None|
+----------+----------------+---+-------+
only showing top 10 rows



In [0]:
# Saving DataFrame as Hive table named CustomerTBL.
# mode("overwrite") replaces the table when it already exists. Without an
# explicit mode the writer raises an error if the table is present, which makes
# this cell fail every time the notebook is run again from the top.
datafile.write.mode("overwrite").saveAsTable("CustomerTBL")

In [0]:
# Read the saved table back through Spark SQL
customer_query = "SELECT * FROM CustomerTBL"
df = spark.sql(customer_query)

# Preview the first 10 rows of the query result
df.show(10)

+----------+----------------+---+-------+
|CustomerID|        FullName|Age| Salary|
+----------+----------------+---+-------+
|       293|  Catherine Abel| 50| 500000|
|       295| Kim Abercrombie| 30| 600000|
|       297|Humberto Acevedo| 55|   None|
|       293|  Catherine Abel| 50| 500000|
|       299|  Pilar Ackerman| 35|5000000|
|       305|     Carla Adams| 42| 300000|
|       301|   Frances Adams| 34| 250000|
|       307|       Jay Adams| 30| 210000|
|       305|     Carla Adams| 42| 300000|
|       311| Samuel Agcaoili| 21|   None|
+----------+----------------+---+-------+
only showing top 10 rows



In [0]:
# Print the column names and types of the DataFrame read back from CustomerTBL.
# Every column is still a string at this point (see the output below), so the
# numeric columns need an explicit cast before any arithmetic is done on them.
df.printSchema()


root
 |-- CustomerID: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)



In [0]:
# Handling Missing Values
# `col` is imported explicitly: it is used below, and the cell must not depend
# on the name happening to exist in the notebook environment.
from pyspark.sql.functions import col, mean

# Converting Age and Salary columns from string to numeric types.
# Non-numeric text such as the literal "None" in Salary ends up as null after
# the cast (the output below shows those rows filled with the mean).
df = df.withColumn("Age", df["Age"].cast("long"))
df = df.withColumn("Salary", df["Salary"].cast("double"))

# Calculating both means in a single aggregation pass.
# NOTE(review): duplicates are only removed in a later cell, so duplicated rows
# are counted more than once in these means - confirm that is acceptable.
column_means = df.agg(
    mean(col("Age")).alias("mean_age"),
    mean(col("Salary")).alias("mean_salary"),
).first()
mean_age = column_means["mean_age"]
mean_salary = column_means["mean_salary"]

# Replacing null values with the column means
df = df.fillna({"Age": mean_age, "Salary": mean_salary})

# Displaying the first 10 rows of the cleaned DataFrame
df.show(10)


+----------+----------------+---+---------+
|CustomerID|        FullName|Age|   Salary|
+----------+----------------+---+---------+
|       293|  Catherine Abel| 50| 500000.0|
|       295| Kim Abercrombie| 30| 600000.0|
|       297|Humberto Acevedo| 55| 715625.0|
|       293|  Catherine Abel| 50| 500000.0|
|       299|  Pilar Ackerman| 35|5000000.0|
|       305|     Carla Adams| 42| 300000.0|
|       301|   Frances Adams| 34| 250000.0|
|       307|       Jay Adams| 30| 210000.0|
|       305|     Carla Adams| 42| 300000.0|
|       311| Samuel Agcaoili| 21| 715625.0|
+----------+----------------+---+---------+
only showing top 10 rows



In [0]:
# Handling Duplicates
# Keep a single copy of each row; two rows count as duplicates only when
# every column matches.
df = df.distinct()

# Preview the first 10 rows of the de-duplicated data frame
print("Cleaned Data Frame:")
df.show(10)


Cleaned Data Frame:
+----------+----------------+---+--------+
|CustomerID|        FullName|Age|  Salary|
+----------+----------------+---+--------+
|       313|   James Aguilar| 23|450000.0|
|       319|       Kim Akers| 25|720000.0|
|       325|   Anna Albright| 51|440000.0|
|       301|   Frances Adams| 34|250000.0|
|       329|     Paul Alcorn| 38| 20000.0|
|       297|Humberto Acevedo| 55|715625.0|
|       327|   Milton Albury| 60|450000.0|
|       305|     Carla Adams| 42|300000.0|
|       293|  Catherine Abel| 50|500000.0|
|       315| Robert Ahlering| 37|610000.0|
+----------+----------------+---+--------+
only showing top 10 rows



In [0]:
# Handling Outliers
# `col` is added to the import because it is used below. `abs` is not used in
# this cell and shadows Python's builtin abs(); it is left in place in case
# later cells rely on the Spark version.
from pyspark.sql.functions import abs, col, mean, stddev

# Calculating mean and standard deviation of Salary in a single pass
agg_stats = df.agg(
    mean(col("Salary")).alias("mean_salary"),
    stddev(col("Salary")).alias("stddev_salary"),
).first()
mean_salary = agg_stats["mean_salary"]
stddev_salary = agg_stats["stddev_salary"]

# Keeping only rows whose Salary lies within two standard deviations of the mean
if mean_salary is None or stddev_salary is None:
    # The statistics are undefined (e.g. no rows, or too few to compute a
    # standard deviation), so there is nothing to compare against: keep all rows
    # instead of failing on arithmetic with None.
    df_filtered = df
else:
    salary_lower_bound = mean_salary - 2 * stddev_salary
    salary_upper_bound = mean_salary + 2 * stddev_salary
    df_filtered = df.filter(
        (col("Salary") >= salary_lower_bound) & (col("Salary") <= salary_upper_bound)
    )

# Displaying the first 10 rows of the cleaned DataFrame
print("Cleaned Data Frame:")
df_filtered.show(10)


Cleaned Data Frame:
+----------+----------------+---+--------+
|CustomerID|        FullName|Age|  Salary|
+----------+----------------+---+--------+
|       313|   James Aguilar| 23|450000.0|
|       319|       Kim Akers| 25|720000.0|
|       325|   Anna Albright| 51|440000.0|
|       301|   Frances Adams| 34|250000.0|
|       329|     Paul Alcorn| 38| 20000.0|
|       297|Humberto Acevedo| 55|715625.0|
|       327|   Milton Albury| 60|450000.0|
|       305|     Carla Adams| 42|300000.0|
|       293|  Catherine Abel| 50|500000.0|
|       315| Robert Ahlering| 37|610000.0|
+----------+----------------+---+--------+
only showing top 10 rows



In [0]:
# Converting Data Types
from pyspark.sql.types import IntegerType, FloatType

# Start from df_filtered so the outlier removal done in the previous cell is
# carried forward. Starting from df here would silently discard that filter.
# Converting the data type of the 'Age' column to integer
df = df_filtered.withColumn("Age", df_filtered["Age"].cast(IntegerType()))

# Converting the data type of the 'Salary' column to float
# NOTE(review): FloatType is single precision; confirm that is precise enough
# for salary values, otherwise keep the column as double.
df = df.withColumn("Salary", df["Salary"].cast(FloatType()))

# Displaying the first 10 rows of the converted data frame
print("Converted Data Frame:")
df.show(10)

Converted Data Frame:
+----------+----------------+---+--------+
|CustomerID|        FullName|Age|  Salary|
+----------+----------------+---+--------+
|       313|   James Aguilar| 23|450000.0|
|       319|       Kim Akers| 25|720000.0|
|       325|   Anna Albright| 51|440000.0|
|       301|   Frances Adams| 34|250000.0|
|       329|     Paul Alcorn| 38| 20000.0|
|       297|Humberto Acevedo| 55|715625.0|
|       327|   Milton Albury| 60|450000.0|
|       305|     Carla Adams| 42|300000.0|
|       293|  Catherine Abel| 50|500000.0|
|       315| Robert Ahlering| 37|610000.0|
+----------+----------------+---+--------+
only showing top 10 rows

