In [3]:
//Transfer the file from GCS to HDFS

import org.apache.spark.sql.SparkSession

// Step 1: Get the active SparkSession (getOrCreate reuses an existing session if one is running).
val spark = SparkSession
  .builder()
  .appName("Transfer File from GCS to HDFS")
  .getOrCreate()

// Step 2: Source CSV in Google Cloud Storage and destination in HDFS.
// Spark writes hdfsPath as a directory of part files, not as a single .csv file.
val gcsPath = "gs://deva_vasadi/movie_lens_data/movies.csv"
val hdfsPath = "hdfs:///user/casestudies/casestudy4/movies.csv"

// Step 3: Load the CSV from GCS, treating the first line as column names.
// No schema is supplied, so every column comes back as a string.
val data = spark.read
  .format("csv")
  .option("header", "true")
  .load(gcsPath)

// Step 4: Copy the rows to HDFS, re-emitting the header and replacing any earlier copy.
data.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save(hdfsPath)

println(s"File transferred from $gcsPath to $hdfsPath successfully!")

File transferred from gs://deva_vasadi/movie_lens_data/movies.csv to hdfs:///user/casestudies/casestudy4/movies.csv successfully!


lastException = null
spark = org.apache.spark.sql.SparkSession@2635be6e
gcsPath = gs://deva_vasadi/movie_lens_data/movies.csv
hdfsPath = hdfs:///user/casestudies/casestudy4/movies.csv
data = [movieId: string, title: string ... 1 more field]


[movieId: string, title: string ... 1 more field]

In [4]:
//Adding duplicates
// Inject extra copies of existing rows so the later dedup step has something to remove.

// Step 1: Read the movies.csv copy previously written to HDFS.
val moviesPath = "hdfs:///user/casestudies/casestudy4/movies.csv"
val moviesDF = spark.read
  .format("csv")
  .option("header", "true")
  .load(moviesPath)

// Step 2: Take up to 1000 rows and append them ONCE more, so each picked row appears twice.
// limit() has no ordering, so which rows get picked is not guaranteed between runs.
val duplicateMoviesDF = moviesDF.limit(1000)
val moviesWithDuplicatesDF = Seq(moviesDF, duplicateMoviesDF).reduce(_ union _)

// Step 3: Write the result to a separate HDFS location (duplicated_movies.csv);
// movies.csv itself is not modified. Any previous output at that location is overwritten.
moviesWithDuplicatesDF.write
  .format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("hdfs:///user/casestudies/casestudy4/duplicated_movies.csv")

println("1000 duplicates inserted and file updated successfully!")


1000 duplicates inserted and file updated successfully!


moviesPath = hdfs:///user/casestudies/casestudy4/movies.csv
moviesDF = [movieId: string, title: string ... 1 more field]
duplicateMoviesDF = [movieId: string, title: string ... 1 more field]
moviesWithDuplicatesDF = [movieId: string, title: string ... 1 more field]


[movieId: string, title: string ... 1 more field]

In [12]:
// Add dependencies for the Avro format: the later write uses .format("avro"), which this package provides.
// NOTE(review): artifact suffix _2.12 and version 3.3.2 presumably must match the cluster's Scala/Spark versions — confirm.
%AddDeps org.apache.spark spark-avro_2.12 3.3.2 --transitive

Marking org.apache.spark:spark-avro_2.12:3.3.2 for download
Obtained 12 files


lastException = null


Marking org.apache.spark:spark-avro_2.12:3.3.2 for download
Obtained 12 files


In [13]:
//Remove duplicates and store the final CSV in avro format

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Step 1: Get the SparkSession. getOrCreate returns the already-running session if there is
// one, in which case this appName is not applied.
val spark = SparkSession.builder()
  .appName("Movies Duplicate Cleaner")
  .getOrCreate()

// Step 2: Load the movies data containing the injected duplicates from HDFS.
val moviesPath = "hdfs:///user/casestudies/casestudy4/duplicated_movies.csv"
val moviesDF = spark.read.option("header", "true").csv(moviesPath)

//moviesDF.show()

println("Running...")

// Step 3: Convert to an RDD keyed by the composite key (movieId, title).
val moviesRDD = moviesDF.rdd.map(row => {
  val movieId = row.getString(row.fieldIndex("movieId"))
  val title = row.getString(row.fieldIndex("title"))
  val genres = row.getString(row.fieldIndex("genres"))
  ((movieId, title), genres) // Key: (movieId, title), Value: genres
})

// Combine the genres of rows that share a key. Genres are '|'-separated tokens, so merge them
// as a set (keeping first-seen order). Plain concatenation would turn exact duplicate rows
// into repeated genre lists such as "Comedy|Comedy". Null values are skipped instead of
// becoming the text "null". The merge is written inline so the closure captures no
// notebook state.
val uniqueMoviesRDD = moviesRDD.reduceByKey { (genres1, genres2) =>
  val tokens = Seq(genres1, genres2)
    .flatMap(Option(_))
    .flatMap(_.split('|'))
    .filter(_.nonEmpty)
    .distinct
  // Nothing to merge (both null/empty): keep the left value as-is.
  if (tokens.isEmpty) genres1 else tokens.mkString("|")
}

// Transform back to (movieId, title, genres) format
val cleanedMoviesRDD = uniqueMoviesRDD.map {
  case ((movieId, title), combinedGenres) => (movieId, title, combinedGenres)
}

val cleanedMoviesDF = cleanedMoviesRDD.toDF("movieId", "title", "genres")

// Step 4: Validation — every (movieId, title) key now appears once, so the difference
// between the two counts is the number of rows collapsed away.
val originalCount = moviesDF.count()
val deduplicatedCount = cleanedMoviesDF.count()
val duplicatesRemoved = originalCount - deduplicatedCount
println(s"Original record count: $originalCount")
println(s"Deduplicated record count: $deduplicatedCount")
println(s"Duplicates removed: $duplicatesRemoved")


Running...
Original record count: 88586
Deduplicated record count: 87585
Duplicates removed: 1001


spark = org.apache.spark.sql.SparkSession@2635be6e
moviesPath = hdfs:///user/casestudies/casestudy4/duplicated_movies.csv
moviesDF = [movieId: string, title: string ... 1 more field]
moviesRDD = MapPartitionsRDD[163] at map at <console>:82
uniqueMoviesRDD = ShuffledRDD[164] at reduceByKey at <console>:90
cleanedMoviesRDD = MapPartitionsRDD[165] at map at <console>:93
cleanedMoviesDF = [movieId: string, title: string ... 1 more field]


originalCou...


[movieId: string, title: string ... 1 more field]

In [14]:
// Step 5: Persist the deduplicated movies to Cloud Storage as Avro. The "avro" format comes
// from the spark-avro dependency added via %AddDeps. Spark writes outputPath as a directory
// of part files, and "overwrite" replaces the output of any previous run.
val outputPath = "gs://deva_vasadi/case_studies/cleaned-movies.avro"

cleanedMoviesDF
  .write
  .mode("overwrite")
  .format("avro")
  .save(outputPath)

println("Cleaned movies data saved successfully in Avro format.")

Cleaned movies data saved successfully in Avro format.


outputPath = gs://deva_vasadi/case_studies/cleaned-movies.avro


gs://deva_vasadi/case_studies/cleaned-movies.avro

In [6]:
//Method to count duplicates

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/** Counts the surplus rows that share the same key columns.
  *
  * Rows are grouped by `keyColumns`. Every group with k > 1 rows contributes k - 1, i.e. every
  * row except the first occurrence. Prints "No duplicate groups found." when nothing is
  * duplicated.
  *
  * @param inputDF    the DataFrame to inspect
  * @param keyColumns columns that together identify a record; defaults to ("movieId", "title")
  * @return total number of duplicate rows, or 0L when every key is unique
  * @throws IllegalArgumentException if `keyColumns` is empty
  */
def countDuplicates(inputDF: DataFrame, keyColumns: Seq[String] = Seq("movieId", "title")): Long = {
  require(keyColumns.nonEmpty, "keyColumns must name at least one column")

  // Step 1: Count the occurrences of each key.
  val groupedDF = inputDF
    .groupBy(keyColumns.map(col): _*)
    .count()

  // Step 2: Keep only keys that occur more than once.
  val duplicateGroupsDF = groupedDF.filter(col("count") > 1)

  // Step 3: Sum (count - 1) over the duplicate groups in a single job. SUM over zero rows
  // yields null, so coalesce it to 0 instead of running a separate emptiness check.
  val duplicateCount = duplicateGroupsDF
    .agg(coalesce(sum(col("count") - 1), lit(0L)))
    .head()
    .getLong(0)

  // Each duplicate group contributes at least 1, so a zero total means there were no duplicate groups.
  if (duplicateCount == 0L) println("No duplicate groups found.")

  duplicateCount
}

countDuplicates: (inputDF: org.apache.spark.sql.DataFrame)Long


In [7]:
// Count the surplus (movieId, title) rows in moviesDF.
// NOTE(review): moviesDF is bound by two cells (movies.csv and duplicated_movies.csv), and the
// execution counts show the cells ran out of order — the result depends on which ran last.
countDuplicates(moviesDF)

1001

In [8]:
// Sanity check: cleanedMoviesDF was built with reduceByKey on (movieId, title), so this should be 0.
countDuplicates(cleanedMoviesDF)

No duplicate groups found.


0