In [0]:
#Read CSV dataset with PERMISSIVE mode to capture malformed records
spark
df_raw = spark.read\
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .csv("/Volumes/workspace/sinchana/sinchana1/dataset.csv")
df_raw.show(5)


+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|_c0|            track_id|             artists|          album_name|          track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|
+---+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+
|  0|5SuOikwiRyPMVoIQD...|         Gen Hoshino|              Comedy|              Comedy|        73|     230666|   False|       0.676| 0.461|  1|  -6.746|   0|      0.143|      0.0322|         1.01E-6|   0.358|  0.715| 87.917|           4

In [0]:
# a. Print all column names
print("Columns:", df_raw.columns)
# b. Count total rows
print("Row count:", df_raw.count())
# c. Display schema structure
df_raw.printSchema()
# d. Print the number of columns
print("Number of columns:", len(df_raw.columns))

Columns: ['_c0', 'track_id', 'artists', 'album_name', 'track_name', 'popularity', 'duration_ms', 'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature', 'track_genre']
Row count: 114000
root
 |-- _c0: integer (nullable = true)
 |-- track_id: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: string (nullable = true)
 |-- va

In [0]:
from pyspark.sql.functions import col

if "_corrupt_record" in df_raw.columns:
    corrupt = df_raw.filter(col("_corrupt_record").isNotNull())
    print("Corrupted records found:")
    corrupt.show(truncate=False)
else:
    print("No '_corrupt_record' column detected — no malformed records or clean dataset.")

No '_corrupt_record' column detected — no malformed records or clean dataset.


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Define explicit schema for your dataset
custom_schema = StructType([
    StructField("Id", IntegerType(), True),
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("species", StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .schema(custom_schema) \
    .csv("/Volumes/workspace/sinchana/sinchana1/dataset.csv")
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, round, lit
from pyspark.sql.types import DoubleType

# Check the actual columns in df
print("Columns:", df.columns)

# Adjust column transformations based on available columns in df
df_transformed = (
    df
    .withColumnRenamed("species", "song_title")  # Example: renaming 'species' to 'song_title'
    .withColumn("duration_min", round(col("sepal_length") / 60, 2))  # Example: converting 'sepal_length' to minutes
    .withColumn("dataset_name", lit("sinchana1/dataset.csv"))
    .withColumn("popularity_score", col("sepal_width").cast(DoubleType()))  # Example: casting 'sepal_width'
    .drop("petal_length")  # Example: dropping 'petal_length'
)

display(df_transformed.limit(5))

Columns: ['Id', 'sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']


Id,sepal_length,sepal_width,petal_width,song_title,duration_min,dataset_name,popularity_score
0,,,,73,,sinchana1/dataset.csv,
1,,,,55,,sinchana1/dataset.csv,
2,,,,57,,sinchana1/dataset.csv,
3,,,,71,,sinchana1/dataset.csv,
4,,,,82,,sinchana1/dataset.csv,


In [0]:
# Identify nulls in each column
from pyspark.sql.functions import col
print("Null counts by column:")
for c in df_transformed.columns:
    n_null = df_transformed.filter(col(c).isNull()).count()
    print(f"{c}: {n_null}")
# Fill or drop nulls (if any)
df_clean = df_transformed.na.fill({
    "sepal_length": 0.0,
    "sepal_width": 0.0,
    "petal_width": 0.0,
    "duration_min": 0.0,
    "popularity_score": 0.0,
    "song_title": "",
    "dataset_name": ""
})

Null counts by column:
Id: 0
sepal_length: 114000
sepal_width: 113955
petal_width: 113828
song_title: 0
duration_min: 114000
dataset_name: 0
popularity_score: 113955


In [0]:
print("Row count before removing duplicates:", df_clean.count())
df_duplicates = df_clean.subtract(df_clean.dropDuplicates())
print("Duplicate records count:", df_duplicates.count())
display(df_duplicates)
df_nodup = df_clean.dropDuplicates()
print("After removing duplicates:", df_nodup.count())

Row count before removing duplicates: 114000
Duplicate records count: 0


Id,sepal_length,sepal_width,petal_width,song_title,duration_min,dataset_name,popularity_score


After removing duplicates: 114000


In [0]:
output_path = "/Volumes/workspace/sinchana/sinchana1/iris_cleaned_parquet"

df_nodup.write.mode("overwrite").parquet(output_path)

print(f"Processed data saved to: {output_path}")

# Display sample data from the saved parquet file
df_sample = spark.read.parquet(output_path).limit(5)
display(df_sample)

Processed data saved to: /Volumes/workspace/sinchana/sinchana1/iris_cleaned_parquet


Id,sepal_length,sepal_width,petal_width,song_title,duration_min,dataset_name,popularity_score
3261,0.0,0.0,0.0,67,0.0,sinchana1/dataset.csv,0.0
3801,0.0,0.0,0.0,60,0.0,sinchana1/dataset.csv,0.0
4108,0.0,0.0,0.0,57,0.0,sinchana1/dataset.csv,0.0
4225,0.0,0.0,0.0,63,0.0,sinchana1/dataset.csv,0.0
4416,0.0,0.0,0.0,51,0.0,sinchana1/dataset.csv,0.0
