In [0]:
%pyspark
# Descriptive names for the 15 positional columns (_c0 .. _c14) of the
# headerless review TSV, listed in file order.
review_columns = [
    "marketplace",
    "customer_id",
    "review_id",
    "product_id",
    "product_parent",
    "product_title",
    "product_category",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
    "review_headline",
    "review_body",
    "review_date",
]

# Read the tab-delimited files without a header row, letting Spark infer types.
reviews_reader = (
    spark.read
    .option("header", "false")
    .option("delimiter", "\t")
    .option("inferSchema", "true")
)
reviews_raw = reviews_reader.csv("s3://megadados-alunos/dados/all_reviews_clean_tsv/")

# Swap each default positional name (_c<i>) for its descriptive name.
for position, column_name in enumerate(review_columns):
    reviews_raw = reviews_raw.withColumnRenamed("_c{}".format(position), column_name)

# Cache the renamed frame; later cells reuse it.
df = reviews_raw.cache()

In [1]:
%pyspark
star = df["star_rating"]

# Drop every row whose rating is not one of '1'..'5' (this also removes nulls).
# The condition is built as (star == '1') | (star == '2') | ... | (star == '5').
is_valid_rating = (star == '1')
for rating_value in ('2', '3', '4', '5'):
    is_valid_rating = is_valid_rating | (star == rating_value)

df_star = df.where(is_valid_rating)


In [2]:
%pyspark
from pyspark.sql.functions import when, lit

# Derive the sentiment label "avaliacao" from the star rating:
#   '5' -> "positivo", '4' -> "neutro", anything else -> "negativo".
rating_col = df_star.star_rating
sentiment = (
    when(rating_col == '5', lit("positivo"))
    .when(rating_col == '4', lit("neutro"))
    .otherwise(lit("negativo"))
)

# Keep only the identifier, the two text fields and the new label.
df_review = df_star.withColumn("avaliacao", sentiment).select(
    "review_id", "review_body", "review_headline", "avaliacao"
)

In [3]:
%pyspark
# Keep only the reviews that actually contain a comment about the product.
has_review_body = df_review["review_body"].isNotNull()
df_review = df_review.where(has_review_body)

df_review.show()

In [4]:
%pyspark
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes

In [5]:
%pyspark
# Feature-engineering stages, listed in the order the Pipeline will apply them.
stages = []

# 1. Tokenize the review text with RegexTokenizer using the pattern \W+.
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="tokens", pattern="\\W+")
stages += [regexTokenizer]

# 2. Turn the tokens into term-count vectors; minDF=2.0 sets the minimum
#    document frequency a term needs to enter the vocabulary.
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)
stages += [cv]

# 3. Convert the string labels in "avaliacao" to numeric indices with StringIndexer.
indexer = StringIndexer(inputCol="avaliacao", outputCol="label")
stages += [indexer]

# 4. Assemble the model input column "features" with VectorAssembler.
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")
stages += [vecAssembler]

# Print each configured stage. A plain loop is used instead of a list
# comprehension so no throwaway list of None values is built or echoed.
for stage in stages:
    print('\n', stage)



In [6]:
%pyspark
from pyspark.ml import Pipeline

# Fit all feature stages on the filtered reviews, then apply them to the same
# frame to obtain the "label" and "features" columns.
# NOTE(review): the pipeline is fitted on the full dataset before the
# train/test split, so the fitted stages also see rows that end up in the
# test set — confirm this is acceptable.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df_review)
data = pipeline_model.transform(df_review)

In [7]:
%pyspark
# Randomly partition the transformed data with 0.7 / 0.3 weights for train
# and test; the seed is fixed so the split is reproducible.
split_weights = [0.7, 0.3]
train, test = data.randomSplit(split_weights, seed=2018)

In [8]:
%pyspark
from pyspark.ml.classification import NaiveBayes

# Multinomial Naive Bayes estimator with smoothing of 1.0.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)

# Train on the training split.
model = nb.fit(train)

# Score the held-out split and show the label next to the model's output.
predictions = model.transform(test)
shown_columns = ["label", "prediction", "probability"]
predictions.select(*shown_columns).show()


In [9]:
%pyspark
# BinaryClassificationEvaluator stays imported because a later cell still
# references it; MulticlassClassificationEvaluator is what this cell uses.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# "label" is indexed from the three-valued "avaliacao" column
# (positivo / neutro / negativo), so a binary evaluator does not apply and
# would not report accuracy. Compute multiclass accuracy on the predicted class.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

print("Model Accuracy: ", accuracy)

In [10]:
%pyspark
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Candidate smoothing values to try for the Naive Bayes estimator.
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0, 1.5, 2.0]).build()

# The label has three classes (positivo / neutro / negativo), so candidates
# are ranked by multiclass accuracy rather than by a binary metric.
cvEvaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Run cross-validation on the training split. The seed (same value as the
# train/test split) makes the fold assignment reproducible.
# NOTE: this rebinds `cv`, which an earlier cell used for the CountVectorizer stage.
cv = CrossValidator(
    estimator=nb,
    estimatorParamMaps=paramGrid,
    evaluator=cvEvaluator,
    seed=2018,
)
cvModel = cv.fit(train)

# Make predictions on the test split. cvModel uses the best model found.
cvPredictions = cvModel.transform(test)

# Score the best model with the same evaluator that selected it.
cvEvaluator.evaluate(cvPredictions)