In [1]:
from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F

In [2]:
# Session-level options: where the Mongo connector reads from / writes to,
# and the connector package Spark has to fetch.
spark_options = {
    "spark.mongodb.input.uri": "mongodb://127.0.0.1/hotel.raw",
    "spark.mongodb.output.uri": "mongodb://127.0.0.1/hotel.clean",
    "spark.jars.packages": "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1",
}

session_builder = SparkSession.builder.appName("clean")
for option_key, option_value in spark_options.items():
    session_builder = session_builder.config(option_key, option_value)

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()

In [3]:
# Read the raw reviews through the "mongo" data source; no explicit options
# are passed here, so the source is taken from the session configuration.
mongo_reader = spark.read.format("mongo")
raw_reviews = mongo_reader.load()

In [4]:
# Print the column names and types of the loaded raw reviews as a tree.
raw_reviews.printSchema()

root
 |-- Additional_Number_of_Scoring: integer (nullable = true)
 |-- Average_Score: double (nullable = true)
 |-- Hotel_Address: string (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Negative_Review: string (nullable = true)
 |-- Positive_Review: string (nullable = true)
 |-- Review_Date: timestamp (nullable = true)
 |-- Review_Total_Negative_Word_Counts: integer (nullable = true)
 |-- Review_Total_Positive_Word_Counts: integer (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Reviewer_Score: double (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Total_Number_of_Reviews: integer (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: decimal(9,7) (nullable = true)
 |-- lng: decimal(8,7) (nullable = true)



In [5]:
# Work on a fixed-size sample of the raw reviews.
# take() returns the rows as a local list, and createDataFrame() rebuilds a
# DataFrame from that list with a freshly inferred schema (which is why lat/lng
# show up as decimal(38,18) further down instead of their original precision).
SAMPLE_SIZE = 20000
raw_reviews = spark.createDataFrame(raw_reviews.take(SAMPLE_SIZE))

In [6]:
# Placeholder texts that carry no sentiment; a review whose text is exactly one
# of these strings is dropped.
useless_reviews = ["NOTHING", "Nothing", "nothing", "Nothing at all", "No Negative", "No Positive", "n a", "N a", "N A", " "]


def _labelled_reviews(reviews, review_column, sentiment):
    """Return the usable reviews from one text column, tagged with a label.

    Args:
        reviews: DataFrame holding the shared hotel columns and ``review_column``.
        review_column: name of the column containing the review text.
        sentiment: integer label stored in the new ``sentiment`` column.

    Returns:
        A DataFrame with the shared hotel columns, the review text renamed to
        ``review``, and a constant ``sentiment`` column.
    """
    return (
        reviews
        # Filter before projecting, while the text column still exists under
        # its own name. Rows with a null text are dropped too, because the
        # predicate evaluates to null for them.
        .filter(~F.col(review_column).isin(useless_reviews))
        .select("average_score", "hotel_name", "review_date", "tags", "lat", "lng", F.col(review_column).alias("review"))
        .withColumn("sentiment", F.lit(sentiment))
    )


# Same column layout for both frames so they can be merged afterwards:
# sentiment 1 = positive text, sentiment 0 = negative text.
positive_reviews = _labelled_reviews(raw_reviews, "positive_review", 1)
negative_reviews = _labelled_reviews(raw_reviews, "negative_review", 0)

In [7]:
# Check how balanced the two classes are. Each count() triggers a Spark job.
positive_count = positive_reviews.count()
negative_count = negative_reviews.count()

print("positive reviews count: ", positive_count)
print("negative reviews count: ", negative_count)

positive reviews count:  17649
negative reviews count:  14711


In [8]:
# Stack the two labelled frames into one dataset. Columns are matched by name
# rather than by position, so reordering either select list cannot silently
# misalign them.
clean_reviews = positive_reviews.unionByName(negative_reviews)

# Persist the merged reviews through the "mongo" data source.
# NOTE: mode("append") adds to whatever the target collection already holds,
# so running this cell again writes the same rows a second time.
clean_reviews.write.format("mongo").mode("append").save()

In [9]:
# Print the schema of the merged frame to check its columns and types.
clean_reviews.printSchema()

root
 |-- average_score: double (nullable = true)
 |-- hotel_name: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- lat: decimal(38,18) (nullable = true)
 |-- lng: decimal(38,18) (nullable = true)
 |-- review: string (nullable = true)
 |-- sentiment: integer (nullable = false)

