In [0]:
#Read CSV dataset with PERMISSIVE mode to capture malformed records

df_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .csv("/Volumes/workspace/day1/data1/Iris.csv")
df_raw.show(5)


+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows


In [0]:
# a. Print all column names
print("Columns:", df_raw.columns)
# b. Count total rows
print("Row count:", df_raw.count())
# c. Display schema structure
df_raw.printSchema()
# d. Print the number of columns
print("Number of columns:", len(df_raw.columns))

Columns: ['Id', 'SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm', 'Species']
Row count: 150
root
 |-- Id: integer (nullable = true)
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)

Number of columns: 6


In [0]:
from pyspark.sql.functions import col

if "_corrupt_record" in df_raw.columns:
    corrupt = df_raw.filter(col("_corrupt_record").isNotNull())
    print("Corrupted records found:")
    corrupt.show(truncate=False)
else:
    print("No '_corrupt_record' column detected — no malformed records or clean dataset.")

No '_corrupt_record' column detected — no malformed records or clean dataset.


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Define explicit schema for Iris dataset
custom_schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("species", StringType(), True)
])
df = spark.read \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .schema(custom_schema) \
    .csv("/Volumes/workspace/day1/data1/Iris.csv")
df.printSchema()


root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, lit, round
from pyspark.sql.types import DoubleType
df_transformed = (
    df
    # Rename columns
    .withColumnRenamed("sepal_length", "sepal_len")
    .withColumnRenamed("sepal_width", "sepal_wid")
    # Derived column
    .withColumn("petal_area", round(col("petal_length") * col("petal_width"), 2))
    # Filter rows
    .filter(col("sepal_len") > 4.0)
    # Add constant column
    .withColumn("dataset_name", lit("Iris Dataset"))
    # Cast type
    .withColumn("sepal_len_cm", col("sepal_len").cast(DoubleType()))
    # Remove unnecessary column
    .drop("sepal_wid")
)
display(df_transformed.limit(5))

sepal_len,petal_length,petal_width,species,petal_area,dataset_name,sepal_len_cm
5.0,3.6,1.4,0.2,5.04,Iris Dataset,5.0
6.0,3.9,1.7,0.4,6.63,Iris Dataset,6.0
7.0,3.4,1.4,0.3,4.76,Iris Dataset,7.0
8.0,3.4,1.5,0.2,5.1,Iris Dataset,8.0
9.0,2.9,1.4,0.2,4.06,Iris Dataset,9.0


In [0]:
# Identify nulls in each column
from pyspark.sql.functions import col
print("Null counts by column:")
for c in df_transformed.columns:
    n_null = df_transformed.filter(col(c).isNull()).count()
    print(f"{c}: {n_null}")
# Fill or drop nulls (if any)
df_clean = df_transformed.na.fill({
    "petal_area": 0.0,
    "sepal_len_cm": 0.0
})

Null counts by column:
sepal_len: 0
petal_length: 0
petal_width: 0
species: 0
petal_area: 0
dataset_name: 0
sepal_len_cm: 0


In [0]:
print("Row count before removing duplicates:", df_clean.count())
df_duplicates = df_clean.exceptAll(df_clean.dropDuplicates())
print("Duplicate records count:", df_duplicates.count())
display(df_duplicates)
df_nodup = df_clean.dropDuplicates()
print("After removing duplicates:", df_nodup.count())

Row count before removing duplicates: 146
Duplicate records count: 0


sepal_len,petal_length,petal_width,species,petal_area,dataset_name,sepal_len_cm


After removing duplicates: 146


In [0]:
output_path = "/Volumes/workspace/day1/data1/iris_cleaned_parquet"

df_nodup.write.mode("overwrite").parquet(output_path)

print(f"Processed data saved to: {output_path}")

# Display sample data from the saved parquet file
df_sample = spark.read.parquet(output_path).limit(5)
display(df_sample)

Processed data saved to: /Volumes/workspace/day1/data1/iris_cleaned_parquet


sepal_len,petal_length,petal_width,species,petal_area,dataset_name,sepal_len_cm
118.0,3.8,6.7,2.2,25.46,Iris Dataset,118.0
24.0,3.3,1.7,0.5,5.61,Iris Dataset,24.0
23.0,3.6,1.0,0.2,3.6,Iris Dataset,23.0
17.0,3.9,1.3,0.4,5.07,Iris Dataset,17.0
65.0,2.9,3.6,1.3,10.44,Iris Dataset,65.0
