In [0]:
val basePath = "/user/yc7093_nyu_edu/imdb-reviews-w-emotion/part"
val fileSuffixes = List("-01-all") //, "-02-all", "-03-all", "-04-all")

// Fail fast with a readable message (instead of a bare NoSuchElementException from
// head/reduce) if every suffix ends up commented out.
require(fileSuffixes.nonEmpty, "fileSuffixes must list at least one part to load")

// One full Parquet path per configured part suffix.
val partPaths = fileSuffixes.map(suffix => s"$basePath$suffix")
val initialPath = partPaths.head // retained so any existing reference to the first path still resolves

// Read every part and stack the results into a single DataFrame.
// unionByName matches columns by name; plain union matches by position and would
// silently misalign columns if one part had been written with a different column order.
val rawDF = partPaths
  .map(path => spark.read.parquet(path))
  .reduce((combined, part) => combined.unionByName(part))

// Preview the first five rows of the combined data
rawDF.show(5)


In [1]:
// Number of rows in rawDF for each distinct value of the "rating" column,
// listed in ascending rating order.
val ratingDistribution = {
  val countsPerRating = rawDF.groupBy("rating").count()
  countsPerRating.orderBy("rating")
}

// Print the tally (show() with no arguments prints at most 20 rows)
ratingDistribution.show()


In [2]:
val outputPath = "/user/yc7093_nyu_edu/imdb-emotion-analysis/rating-distribution"

// Persist the rating counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
ratingDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [3]:
// Number of rows in rawDF for each distinct value of the "emotion" column,
// listed in ascending order of the emotion label.
val emotionDistribution = {
  val countsPerEmotion = rawDF.groupBy("emotion").count()
  countsPerEmotion.orderBy("emotion")
}

// Print the tally (show() with no arguments prints at most 20 rows)
emotionDistribution.show()


In [4]:
val outputPath = "/user/yc7093_nyu_edu/imdb-emotion-analysis/emotion-distribution"

// Persist the emotion counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
emotionDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [5]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Window spanning all rows that share an emotion; used to total the counts per emotion
val emotionWindow = Window.partitionBy("emotion")

// For every (emotion, rating) pair: its row count, the total count for that emotion,
// and the pair's share of that total as a percentage. Sorted by emotion, then rating.
val ratingDistributionWithinEmotion = {
  val pairCounts = rawDF.groupBy("emotion", "rating").count()
  val withEmotionTotal =
    pairCounts.withColumn("total_count", sum("count").over(emotionWindow))
  withEmotionTotal
    .withColumn("percentage", col("count").divide(col("total_count")).multiply(100))
    .orderBy("emotion", "rating")
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = ratingDistributionWithinEmotion.count()
ratingDistributionWithinEmotion.show(rowCount.toInt, truncate = false)



In [6]:
val outputPath = "/user/yc7093_nyu_edu/imdb-emotion-analysis/rating-distribution-w-emotion"

// Persist the per-emotion rating breakdown as CSV with a header row, replacing any
// earlier output at this path. coalesce(1) funnels all rows into one partition so a
// single part file is produced.
ratingDistributionWithinEmotion.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)


In [7]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Window spanning all rows that share a rating; used to total the counts per rating
val ratingWindow = Window.partitionBy("rating")

// For every (rating, emotion) pair: its row count, the total count for that rating,
// and the pair's share of that total as a percentage. Sorted by rating, then emotion.
val emotionDistributionWithinRating = {
  val pairCounts = rawDF.groupBy("rating", "emotion").count()
  val withRatingTotal =
    pairCounts.withColumn("total_count", sum("count").over(ratingWindow))
  withRatingTotal
    .withColumn("percentage", col("count").divide(col("total_count")).multiply(100))
    .orderBy("rating", "emotion")
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = emotionDistributionWithinRating.count()
emotionDistributionWithinRating.show(rowCount.toInt, truncate = false)




In [8]:
val outputPath = "/user/yc7093_nyu_edu/imdb-emotion-analysis/emotion-distribution-w-rating"

// Persist the per-rating emotion breakdown as CSV with a header row, replacing any
// earlier output at this path. coalesce(1) funnels all rows into one partition so a
// single part file is produced.
emotionDistributionWithinRating.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)


In [9]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "sadness"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)




In [10]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [11]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "love"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)

In [12]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [13]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "joy"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)

In [14]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [15]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "surprise"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)

In [16]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [17]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "anger"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)

In [18]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [19]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data, and the emotion whose keywords are tallied here
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val specificEmotion = "fear"

// Read only the emotion=<specificEmotion> directory under the root
val emotionDf = spark.read.parquet(s"$basePath/emotion=$specificEmotion")

// Row count per distinct "word", most frequent first
val keywordDistribution = {
  val wordCounts = emotionDf.groupBy("word").count()
  wordCounts.orderBy(col("count").desc)
}

// Count the rows first so show() can print all of them, with no column truncation
val rowCount = keywordDistribution.count()
keywordDistribution.show(rowCount.toInt, truncate = false)

In [20]:
// The path embeds specificEmotion, so each emotion's keyword table gets its own directory
val outputPath = s"/user/yc7093_nyu_edu/imdb-emotion-analysis/$specificEmotion-keyword-distribution"

// Persist the keyword counts as CSV with a header row, replacing any earlier output
// at this path. coalesce(1) funnels all rows into one partition so a single part
// file is produced.
keywordDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

In [21]:
import org.apache.spark.sql.functions._

// Root of the partitioned Parquet data; reading the root loads every partition beneath it
val basePath = "/user/yc7093_nyu_edu/imdb_partitioned_by_emotion_rating_word"
val partitionedDf = spark.read.parquet(basePath)

// Row count for every (word, rating) pair, sorted by word and then rating
val ratingDistributionByKeyword = {
  val pairCounts = partitionedDf.groupBy("word", "rating").count()
  pairCounts.orderBy("word", "rating")
}

// Reshape to one row per word with one column per distinct rating value;
// each cell holds the summed count for that word/rating pair.
val pivotedDistribution = {
  val groupedByWord = ratingDistributionByKeyword.groupBy("word")
  groupedByWord.pivot("rating").sum("count")
}

// Print the pivoted table (default row limit) without truncating cell contents
pivotedDistribution.show(truncate = false)


In [22]:
val outputPath = "/user/yc7093_nyu_edu/imdb-emotion-analysis/rating-distribution-w-word"

// Persist the word-by-rating pivot table as CSV with a header row, replacing any
// earlier output at this path. coalesce(1) funnels all rows into one partition so a
// single part file is produced.
pivotedDistribution.coalesce(1).write
  .option("header", "true")
  .mode("overwrite")
  .csv(outputPath)

