In [None]:
!pip install mlflow
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Create the Spark session, or reuse the one that already exists (getOrCreate).
spark = (
    SparkSession.builder
    .appName('Twitter Sentiment Analysis')
    .getOrCreate()
)
print('Session created')

sc = spark.sparkContext


## Load the training dataset.
# NOTE(review): '/mnt/...' looks like a mounted path (presumably backed by
# AWS S3) — confirm the mount exists before running.
tweets = spark.read.option('header', True).csv('/mnt/twitter_sentiment_dataset')

# Keep only the label and the raw text (renamed to `tweet`).
tweets = tweets.select('sentiment', F.col('SentimentText').alias('tweet'))

## Data cleaning
# Steps: strip URLs -> replace every non-letter with a space -> collapse runs of
# whitespace -> lowercase -> trim.
# The letter class must be `[^a-zA-Z]`: a range written `A-z` also spans the
# ASCII characters [ \ ] ^ _ and the backtick, letting them through as "letters".
# Rows with a null label or null text are dropped at the end: Tokenizer cannot
# split a null string, and StringIndexer (default handleInvalid='error')
# rejects null labels, so either would abort the training cell.
tweets_clean = (
    tweets
    .withColumn('tweet', F.regexp_replace('tweet', r"http\S+", ""))
    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-Z]", " "))
    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " "))
    .withColumn('tweet', F.lower('tweet'))
    .withColumn('tweet', F.trim('tweet'))
    .na.drop(subset=['sentiment', 'tweet'])
)

In [None]:
## Sentiment analysis using LogisticRegression

# Hold out 10% of the rows as a final test set; the fixed seed makes the split
# reproducible across runs.
train, test = tweets_clean.randomSplit([0.9, 0.1], seed=41)
# `train` is scanned many times (TrainValidationSplit re-splits it and every
# pipeline stage is fitted on it), so cache it rather than re-reading and
# re-cleaning the CSV on each pass.
train = train.cache()


# Feature pipeline: tokens -> stop words removed -> term counts -> IDF weights.
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
# minDocFreq=5: terms appearing in fewer than 5 documents get an IDF weight of
# 0, which effectively removes rare terms from the features.
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
# StringIndexer orders labels by frequency by default, so label 0.0 is the most
# frequent `sentiment` value — not necessarily the string "0".
label_encoder = StringIndexer(inputCol="sentiment", outputCol="label")

# The trainer; it reads the default `features` / `label` columns produced above.
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])


## Model tuning
# The grid currently holds a single candidate, so TrainValidationSplit simply
# fits on 80% of `train` and validates on the remaining 20%; add values here to
# actually search.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [50])
    .build()
)

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
## Training and validation
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           parallelism=10,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

tvs_model = tvs.fit(train)
predictions = tvs_model.transform(test)

# Score the held-out set with the same accuracy evaluator used for tuning,
# instead of hand-counting matches with two extra Spark jobs.
accuracy = evaluator.evaluate(predictions)


print("Accuracy Score: {0:.4f}".format(accuracy))
## Last recorded run: Accuracy Score: 0.7761 (rerun to refresh)
## Command took 53.44 minutes