In [43]:
!pip install pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Disneyland Reviews") \
    .getOrCreate()

file_path = '/Users/nous/Desktop/DisneylandReviews.csv'


df = spark.read.csv(file_path, header=True, inferSchema=True)

df.show(5)

+---------+------+----------+--------------------+--------------------+-------------------+
|Review_ID|Rating|Year_Month|   Reviewer_Location|         Review_Text|             Branch|
+---------+------+----------+--------------------+--------------------+-------------------+
|670772142|     4|    2019-4|           Australia|If you've ever be...|Disneyland_HongKong|
|670682799|     4|    2019-5|         Philippines|Its been a while ...|Disneyland_HongKong|
|670623270|     4|    2019-4|United Arab Emirates|Thanks God it was...|Disneyland_HongKong|
|670607911|     4|    2019-4|           Australia|HK Disneyland is ...|Disneyland_HongKong|
|670607296|     4|    2019-4|      United Kingdom|the location is n...|Disneyland_HongKong|
+---------+------+----------+--------------------+--------------------+-------------------+
only showing top 5 rows



24/01/21 01:14:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [44]:
from pyspark.sql.functions import split

# Break "2019-4" style values into their parts. Both parts stay strings.
# NOTE(review): values without a "-" give a Year equal to the whole value and a
# null Month (the null-count cell below reports nulls only in Month).
year_month_parts = split(df['Year_Month'], '-')

df = (
    df
    .withColumn('Year', year_month_parts.getItem(0))
    .withColumn('Month', year_month_parts.getItem(1))
)

df.show()

+---------+------+----------+--------------------+--------------------+-------------------+----+-----+
|Review_ID|Rating|Year_Month|   Reviewer_Location|         Review_Text|             Branch|Year|Month|
+---------+------+----------+--------------------+--------------------+-------------------+----+-----+
|670772142|     4|    2019-4|           Australia|If you've ever be...|Disneyland_HongKong|2019|    4|
|670682799|     4|    2019-5|         Philippines|Its been a while ...|Disneyland_HongKong|2019|    5|
|670623270|     4|    2019-4|United Arab Emirates|Thanks God it was...|Disneyland_HongKong|2019|    4|
|670607911|     4|    2019-4|           Australia|HK Disneyland is ...|Disneyland_HongKong|2019|    4|
|670607296|     4|    2019-4|      United Kingdom|the location is n...|Disneyland_HongKong|2019|    4|
|670591897|     3|    2019-4|           Singapore|Have been to Disn...|Disneyland_HongKong|2019|    4|
|670585330|     5|    2019-4|               India|Great place! Your...|Di

In [45]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnull, when

# Count the null values in every column: when() yields a non-null value only for
# null cells, and count() tallies just the non-null values it receives.
null_count_exprs = [count(when(isnull(name), name)).alias(name) for name in df.columns]
null_counts = df.select(*null_count_exprs)

# Display the null counts: a single row with one column per input column.
null_counts.show()

total_rows = df.count()
print(total_rows)

+---------+------+----------+-----------------+-----------+------+----+-----+
|Review_ID|Rating|Year_Month|Reviewer_Location|Review_Text|Branch|Year|Month|
+---------+------+----------+-----------------+-----------+------+----+-----+
|        0|     0|         0|                0|          0|     0|   0| 2613|
+---------+------+----------+-----------------+-----------+------+----+-----+

42656


In [46]:
from pyspark.sql.functions import col, count, lit, when

# Marker this cell looks for in Year_Month/Year. Presumably the source CSV uses it
# for reviews without a date; TODO confirm against the raw data.
MISSING_MARKER = "missing"

# Sentinel written in place of an unknown date. It is an explicit *string* literal
# because Year_Month, Year and Month are string columns here. The original int 0
# mixed with a string column inside when/otherwise, so the result type depended on
# Spark's implicit type-coercion rules (legacy vs. ANSI mode).
UNKNOWN_DATE = lit("0")

# col() resolves against the frame being built, so the replacements can be chained.
df = (
    df
    .withColumn(
        "Year_Month",
        when(col("Year_Month") == MISSING_MARKER, UNKNOWN_DATE).otherwise(col("Year_Month")),
    )
    .withColumn(
        "Year",
        when(col("Year") == MISSING_MARKER, UNKNOWN_DATE).otherwise(col("Year")),
    )
    # Month is null (not "missing") when Year_Month had no "-" to split on.
    .withColumn(
        "Month",
        when(col("Month").isNull(), UNKNOWN_DATE).otherwise(col("Month")),
    )
)

# Re-count nulls per column to confirm the replacements left none behind.
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

null_counts.show()

total_rows = df.count()

print(total_rows)

+---------+------+----------+-----------------+-----------+------+----+-----+
|Review_ID|Rating|Year_Month|Reviewer_Location|Review_Text|Branch|Year|Month|
+---------+------+----------+-----------------+-----------+------+----+-----+
|        0|     0|         0|                0|          0|     0|   0|    0|
+---------+------+----------+-----------------+-----------+------+----+-----+

42656


In [47]:
# Drop the free-text review column before exporting. Use the exact column name:
# DataFrame.drop is a silent no-op for unknown names, so 'review_text' would keep
# the column whenever spark.sql.caseSensitive is enabled.
df_without_review_text = df.drop('Review_Text')

# Show the DataFrame without the review text column
df_without_review_text.show(5)

# Spark writes a *directory* at this path (one part-*.csv file per partition),
# despite the ".csv" suffix. mode="overwrite" lets the cell be re-run instead of
# failing because the path already exists.
df_without_review_text.write.csv(
    '/Users/nous/Desktop/Disneylandhive.csv', header=True, mode='overwrite'
)

+---------+------+----------+--------------------+-------------------+----+-----+
|Review_ID|Rating|Year_Month|   Reviewer_Location|             Branch|Year|Month|
+---------+------+----------+--------------------+-------------------+----+-----+
|670772142|     4|    2019-4|           Australia|Disneyland_HongKong|2019|    4|
|670682799|     4|    2019-5|         Philippines|Disneyland_HongKong|2019|    5|
|670623270|     4|    2019-4|United Arab Emirates|Disneyland_HongKong|2019|    4|
|670607911|     4|    2019-4|           Australia|Disneyland_HongKong|2019|    4|
|670607296|     4|    2019-4|      United Kingdom|Disneyland_HongKong|2019|    4|
+---------+------+----------+--------------------+-------------------+----+-----+
only showing top 5 rows



In [48]:
# coalesce(1) collapses the data into one partition, so Spark writes a single
# part-*.csv file inside this output directory. mode="overwrite" keeps the cell
# re-runnable instead of failing because the directory already exists.
df_without_review_text.coalesce(1).write.csv(
    '/Users/nous/Desktop/Disneylandhive1', header=True, mode='overwrite'
)

In [49]:
from pyspark.sql.functions import (
    array_remove,
    col,
    concat_ws,
    regexp_replace,
    split,
    transform,
    trim,
)
from pyspark.ml.feature import StopWordsRemover

# The Spark session from the first cell is reused. Rebuilding it here only
# returned the existing session and logged a warning.

# Remove everything except word characters, whitespace and apostrophes.
# Apostrophes are kept for now so that contractions ("don't", "you've", "it's")
# still match the English stop-word list; they are stripped after filtering.
# NOTE: Java regex \w is ASCII-only by default, so non-ASCII letters are removed too.
punctuation_pattern = "[^\\w\\s']"

# Read the exact-case source column. The result is stored as "review_text", which
# replaces Review_Text under Spark's default case-insensitive name resolution.
review_text_df = df.withColumn(
    "review_text", regexp_replace(col("Review_Text"), punctuation_pattern, "")
)

# Split on runs of whitespace after trimming. A single-space split turned the
# gaps left by punctuation removal into empty "" tokens.
review_text_df = review_text_df.withColumn(
    "words", split(trim(col("review_text")), "\\s+")
)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

review_text_df = remover.transform(review_text_df)

# Now strip the remaining apostrophes from each surviving token, and drop tokens
# left empty (e.g. a lone quote mark, or the single "" token of an empty review).
review_text_df = review_text_df.withColumn(
    "filtered_words",
    array_remove(
        transform(col("filtered_words"), lambda word: regexp_replace(word, "'", "")),
        "",
    ),
)

review_text_df = review_text_df.withColumn("filtered_text", concat_ws(" ", "filtered_words"))

# Brings every cleaned review to the driver, in partition order.
collected_reviews = review_text_df.select("filtered_text").rdd.flatMap(lambda x: x).collect()

output_path = '/Users/nous/Desktop/output.txt'
# Explicit UTF-8 so the output does not depend on the platform's default encoding.
with open(output_path, 'w', encoding='utf-8') as file:
    file.writelines(review + "\n" for review in collected_reviews)

24/01/21 01:15:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                