# Data Cleaning

## Reading author data into a DataFrame

In [2]:
// Relative path of the Goodreads author dump (JSON).
val datafilePath = "bdad-proj/goodreads_book_authors.json"
// No explicit schema is supplied, so the JSON reader infers one from the data.
val authorsDf = spark.read.format("json").load(datafilePath)

In [3]:
// Display the raw author DataFrame through the notebook's `z` display helper.
z.show(authorsDf)

In [4]:
// Print the schema that was inferred for the author data.
authorsDf.printSchema()

In [5]:
// Baseline size of the raw author data, captured before any cleaning:
// number of rows ...
val origEntryCount = authorsDf.count()
// ... and number of columns.
val numAttributes = authorsDf.columns.length

## Removing entries with a null `author_id`


In [7]:
// Keep only the rows that have an author_id; other columns are allowed to be null.
val nonNullEntriesDF = authorsDf.na.drop("any", Seq("author_id"))
// Row count after the drop, for comparison with origEntryCount.
val newEntryCount = nonNullEntriesDF.count()

## Reading books data for augmentation

In [9]:
// Relative path of the Goodreads books dump (JSON), used to augment the author data.
val booksDatafilePath = "bdad-proj/goodreads_books.json"
// No explicit schema is supplied, so the JSON reader infers one from the data.
val booksDF = spark.read.format("json").load(booksDatafilePath)

In [10]:
// Print the schema that was inferred for the books data.
booksDF.printSchema()

## Working with publication years

In [12]:
// Project the books data down to the three columns needed for the author augmentation.
val booksWithAuthors = booksDF.select(
    col("book_id"),
    col("authors"),
    col("publication_year"))
z.show(booksWithAuthors)

In [13]:
// Normalize: explode the `authors` collection so that each element becomes its own row
// (in a column named `author`), repeating book_id and publication_year for every element.
val normalizedBwA = booksWithAuthors.select(
    col("book_id"),
    col("publication_year"),
    explode(col("authors")).as("author"))
z.show(normalizedBwA)

### Counting books per author and converting years into int on the exploded (normalized) data

In [15]:
// Number of (book, author) rows per author, counting only rows whose role is the empty string.
// The grouping key is exposed as `auth_id` so it does not clash with `author_id` when joined later.
val authorBookCount = normalizedBwA
    .select("book_id", "publication_year", "author.author_id", "author.role")
    .where(col("role") === "")
    .groupBy(col("author_id").as("auth_id"))
    .count()

normalizedBwA.printSchema()

// Same columns, same order and same names as normalizedBwA; only publication_year is re-typed to int.
val castDF = normalizedBwA.withColumn("publication_year", col("publication_year").cast("int"))

castDF.printSchema()

## Selecting the first and last publication years

In [17]:
// Publication years per author: one row per (book, author) pair whose role is the empty string.
val primaryAuthorYears = castDF
    .select("publication_year", "author.author_id", "author.role")
    .filter($"role" === "")

// Raw per-author extremes over ALL years, implausible ones included.
// Kept under their original names in case another cell inspects them; DataFrames are lazy,
// so defining them costs nothing unless an action is run on them.
val _firstYear = primaryAuthorYears
    .groupBy(col("author_id").alias("auth_id")).min("publication_year")
val _lastYear = primaryAuthorYears
    .groupBy(col("author_id").alias("auth_id_2")).max("publication_year")

// Discard implausible years row by row BEFORE aggregating.
// Filtering the aggregated min/max instead (as was done previously) throws away an author's
// first or last year whenever a single book carries a bogus year (e.g. 5 or 2100), even though
// the author's other books have perfectly valid years. Null years fail the comparison and are
// dropped here as well.
val plausibleYears = primaryAuthorYears
    .filter($"publication_year" > 1000 && $"publication_year" < 2023)

// Earliest plausible publication year per author: columns `auth_id`, `min(publication_year)`.
val firstYear = plausibleYears
    .groupBy(col("author_id").alias("auth_id")).min("publication_year")

// Latest plausible publication year per author: columns `auth_id_2`, `max(publication_year)`.
val lastYear = plausibleYears
    .groupBy(col("author_id").alias("auth_id_2")).max("publication_year")

In [18]:
// Combine first and last publication year per author.
// The join is a full outer join, so an author present on only one side yields a row in which the
// other side's key is null. Downstream cells look authors up by `auth_id`, so backfill it from
// `auth_id_2`; otherwise authors that only have a last year would never match and their
// last year would be silently lost. The column set and order are unchanged.
val yearsActive = firstYear
    .join(lastYear, firstYear("auth_id") === lastYear("auth_id_2"), "fullouter")
    .withColumn("auth_id", coalesce(col("auth_id"), col("auth_id_2")))
yearsActive.printSchema()

In [19]:
// Attach the first/last publication years to every cleaned author row.
// Left outer join: authors without any year information are kept, with null year columns.
val authorWYearsDF = {
  val sameAuthor = nonNullEntriesDF("author_id") === yearsActive("auth_id")
  val joined = nonNullEntriesDF.join(yearsActive, sameAuthor, "leftouter")
  // The helper keys from yearsActive are redundant next to author_id.
  joined.drop("auth_id", "auth_id_2")
}

authorWYearsDF.printSchema()

In [20]:
// Reduce to (auth, first_published_in, last_published_in).
// `author_id` is renamed to `auth` so it does not clash with `author_id` in the later join.
val authorWYearsRenamedDF = {
  val renames = Seq(
    "author_id" -> "auth",
    "min(publication_year)" -> "first_published_in",
    "max(publication_year)" -> "last_published_in")
  val renamed = renames.foldLeft(authorWYearsDF) {
    case (df, (oldName, newName)) => df.withColumnRenamed(oldName, newName)
  }
  renamed.select("auth", "first_published_in", "last_published_in")
}
z.show(authorWYearsRenamedDF)

In [21]:
// Add a `books_count` column to every cleaned author row.
val augmentedAuthorDF = {
  val sameAuthor = nonNullEntriesDF("author_id") === authorBookCount("auth_id")
  nonNullEntriesDF
    .join(authorBookCount, sameAuthor, "leftouter")
    .drop("auth_id")
    // Authors with no matching row in authorBookCount get a count of 0 instead of null.
    .na.fill(0, Array("count"))
    .withColumnRenamed("count", "books_count")
}
z.show(augmentedAuthorDF)

In [22]:
// Final author table: the augmented author rows plus first/last publication year.
// Left outer join keeps every author; the helper key `auth` is dropped afterwards.
val finalDF = {
  val sameAuthor = augmentedAuthorDF("author_id") === authorWYearsRenamedDF("auth")
  augmentedAuthorDF
    .join(authorWYearsRenamedDF, sameAuthor, "leftouter")
    .drop("auth")
}

z.show(finalDF)

In [23]:
// Alias for the final author table; this is the DataFrame that gets written out and profiled below.
val authorsOutputDF = finalDF
// Print the schema of the output table.
authorsOutputDF.printSchema()

In [24]:
// Persist the cleaned and augmented author table in Parquet format.
// NOTE(review): no save mode is set, so re-running this cell presumably fails if the target
// path already exists (Spark's default save mode) - confirm whether that is intended.
authorsOutputDF.write
    .format("parquet")
    .save("bdad-proj/data-prof-clean/goodreads_authors_2.parquet")

# Some Data Profiling

## Performing some aggregation statistics

In [27]:
// Distribution of authors over average rating rounded to 0 decimal places:
// one row per rounded rating (`avg_rating`) with the number of authors (`count`).
val starDistributions = authorsOutputDF
    .groupBy(round(col("average_rating"), 0).as("avg_rating"))
    .count()
z.show(starDistributions)

In [28]:
// How many authors share each `books_count` value.
val bookCountDistributions = authorsOutputDF
    .groupBy(col("books_count"))
    .count()
z.show(bookCountDistributions)

In [29]:
// Total number of rows in the output author table.
val totalNumberOfAuthors = authorsOutputDF.count()

In [30]:
// Number of authors per first publication year; authors without a first year are left out.
val startYear = authorsOutputDF
    .na.drop("any", Seq("first_published_in"))
    .groupBy(col("first_published_in"))
    .count()
z.show(startYear)