In [1]:
%sh
# Download the TASS-2015 general training tweets and the eswikinews word2vec model into the
# driver's working directory (/databricks/driver, where the next cell copies them from).
# `&&` (not `&`) runs the downloads sequentially, so both are complete before this cell returns;
# with `&` the first wget ran in the background and could still be writing when the copy started.
# NOTE(review): the model is fetched over plain HTTP — consider verifying a checksum.
wget "https://raw.githubusercontent.com/imendibo/SEPLN-TASS15/master/DATA/general-tweets-train-tagged.xml" && wget "http://dis.unal.edu.co/~fgonza/courses/eswikinews.bin"

In [2]:
# Copy both downloads from the driver's local disk into DBFS; the word2vec model is
# read back later through the /dbfs/FileStore/... path.
src, dst = "file:/databricks/driver/", "/FileStore/"
dbutils.fs.cp(src + "eswikinews.bin", dst + "eswikinews.bin")
dbutils.fs.cp(src + "general-tweets-train-tagged.xml", dst + "general-tweets-train-tagged.xml")

In [3]:
# Check that both files landed in DBFS.
display(dbutils.fs.ls("/FileStore"))

# 1. Cargar archivo

In [5]:
# Parse the tweet corpus with spark-xml; each <tweet> element becomes one row.
# Read the DBFS copy made above instead of the driver-local `file:` path: a local path
# exists only on the driver, so executors on a multi-node cluster could not open it.
df = sqlContext.read.format('com.databricks.spark.xml').options(rowTag='tweet').load('/FileStore/general-tweets-train-tagged.xml')
df.cache()
display(df)

# 2. Extraer contenido y etiqueta de sentimiento

In [7]:
# Pull the polarity labels out of the nested XML struct into their own column
# (presumably an array with one value per <polarity> entry of the tweet).
df = df.withColumn("sentiment_values", df["sentiments"]["polarity"]["value"])
display(df)

# 3. Preprocesar contenidos de los tweets

In [9]:
from pyspark.sql.functions import udf, lower
from pyspark.sql.types import *
import unicodedata, string

def remove_accents(input_str):
  """Strip diacritics from `input_str` (e.g. u"canción" -> u"cancion").

  The text is NFKD-decomposed so accented letters become base letter + combining
  mark, then every non-ASCII code point is dropped. The ASCII bytes are decoded back
  to text so the function returns a string (not bytes) on both Python 2 and 3.

  :param input_str: unicode text, or None for a null column value.
  :return: ASCII-only text, or None when `input_str` is None.
  """
  if input_str is None:
    return None
  nfkd_form = unicodedata.normalize('NFKD', input_str)
  return nfkd_form.encode('ASCII', 'ignore').decode('ASCII')

def remove_punctuation(input_str):
  """Remove every ASCII punctuation character (`string.punctuation`) from `input_str`.

  Character filtering works for unicode and byte strings alike; a two-argument
  `translate(table, deletechars)` call only works on Python 2 byte strings
  (unicode.translate / Python 3 str.translate take a single mapping, and
  Python 3 has no `string.maketrans`).

  :param input_str: text to clean, or None for a null column value.
  :return: the text without punctuation, or None when `input_str` is None.
  """
  if input_str is None:
    return None
  return ''.join(ch for ch in input_str if ch not in string.punctuation)
  

# Normalise the tweet text in a single projection: lowercase -> strip accents -> drop punctuation.
udfReAccents = udf(remove_accents, StringType())
udfRePunct = udf(remove_punctuation, StringType())
df = df.withColumn("content", udfRePunct(udfReAccents(lower(df["content"]))))
display(df)


# 4. Preprocesar etiquetas de los sentimientos

In [11]:
def nullInList(_list):
  """Return True if any entry of `_list` is a null-like entity.

  Used to keep only rows where sentiments.polarity.entity = "null", presumably the
  tweet-level polarity rather than an entity-specific one (confirm against the TASS
  schema). A missing entity may show up as a real null, the string 'null' or 'NONE'.

  :param _list: entity values of the tweet's polarity entries; may itself be None.
  :return: True if at least one entity is null-like; False otherwise, including for
           a None or empty list (such rows have no polarity to keep).
  """
  if not _list:
    return False
  return any(entity is None or entity in ('null', 'NONE') for entity in _list)

# Flag rows that have a polarity entry without an entity, then keep only the flagged rows.
udfNullInList = udf(nullInList, BooleanType())
df = df.withColumn("sentiments_entity", udfNullInList("sentiments.polarity.entity"))
df = df.where(df["sentiments_entity"] == True)

display(df)

In [12]:
def convertPolarity(_list):
  """Map the first polarity label of a tweet onto a coarser label set.

  The intensity grades "P+" and "N+" are folded into "P" and "N"; any other
  label (e.g. "P", "N", "NEU", "NONE") is returned unchanged.

  :param _list: polarity labels of the tweet (only the first one is used); may be None/empty.
  :return: the converted first label, or None when there is no label.
  """
  if not _list:
    # No label at all: a null result is dropped by the later `!= 'NONE'` filter
    # instead of raising IndexError/TypeError inside the UDF.
    return None
  first = _list[0]
  return {"P+": "P", "N+": "N"}.get(first, first)

# Reduce each tweet to its first polarity label (P+ -> P, N+ -> N) and drop tweets
# labelled NONE, leaving only the NEU, P and N classes.
udfConvertP = udf(convertPolarity, StringType())
df = df.withColumn("sentiment_values", udfConvertP(df["sentiment_values"]))
df = df.where(df["sentiment_values"] != 'NONE')
display(df)

# 5. Carga del modelo de Word2Vec

In [14]:
from gensim.models.word2vec import Word2Vec
from pyspark.ml.feature import Tokenizer
import numpy as np

# Load the pre-trained word2vec vectors (binary word2vec format) from DBFS.
# Since gensim 1.0, `load_word2vec_format` lives on KeyedVectors and the old
# Word2Vec.load_word2vec_format no longer works (it was removed entirely in 4.x);
# fall back to the old entry point only on gensim releases without KeyedVectors.
try:
  from gensim.models import KeyedVectors
except ImportError:
  model = Word2Vec.load_word2vec_format('/dbfs/FileStore/eswikinews.bin', binary=True)
else:
  model = KeyedVectors.load_word2vec_format('/dbfs/FileStore/eswikinews.bin', binary=True)

# Split each preprocessed tweet into tokens (Spark's Tokenizer lowercases and splits on whitespace).
tokenizer = Tokenizer(inputCol="content", outputCol="words")
df = tokenizer.transform(df)
display(df)

# 6. Construcción del vector de características

In [16]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col
import numpy as np

def w2vec(_list):
  """Build a tweet's feature vector as the mean word2vec embedding of its tokens.

  Tokens missing from the vocabulary (KeyError on lookup) are skipped. Relies on the
  notebook-level `model` loaded in the previous cell.

  :param _list: list of token strings (Tokenizer output); may be None for a null row.
  :return: a DenseVector holding the element-wise mean of the known tokens' embeddings,
           or None when the input is None/empty or no token is in the vocabulary.
  """
  if not _list:
    return None
  vec_list = []
  for word in _list:
    try:
      vec_list.append(model[word])
    except KeyError:
      # Out-of-vocabulary token: ignore it rather than failing the whole row.
      continue
  if not vec_list:
    return None
  return Vectors.dense(np.asarray(vec_list).mean(axis=0))

# Average word embeddings per tweet; a tweet with no in-vocabulary token gets a null vector.
udfW2vec = udf(w2vec, VectorUDT())
df = df.withColumn("features", udfW2vec(df["words"]))
# Report how many tweets ended up without features, then drop them.
print("amount of null Tweets %s" % df.filter(col("features").isNull()).count())
df = df.filter(col("features").isNotNull())

In [17]:
# Inspect each token list next to the feature vector built from it.
display(df.select(df["words"], df["features"]))

# 7. Clasificadores

## 7.1 Random Forest

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StringIndexer

# Hold out 30% of the tweets for the final evaluation. Fixed seeds make the split, the
# forest and the CV folds (and so every metric printed below) reproducible on re-run.
(train, test) = df.randomSplit([0.7, 0.3], seed=42)
train.cache()
test.cache()

# Evaluators shared by both classifiers; each compares the indexed "label" with "prediction".
accuracy_metric = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
recall_metric = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
# "f1" is the evaluator's default metric (weighted F1); spelled out for clarity.
f1_metric = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")

numFolds = 10

# Index the string classes into numeric labels. Fitted on the full data set so every
# class that appears in `test` also has an index.
labelIndexer = StringIndexer(inputCol="sentiment_values", outputCol="label").fit(df)

rf = RandomForestClassifier(numTrees=30, maxDepth=10, labelCol="label",
                            featuresCol="features", seed=42)
pipeline = Pipeline(stages=[labelIndexer, rf])

# Empty grid: nothing is tuned, so k-fold CV only estimates the F1 of this one
# configuration; `bestModel` is that pipeline refitted on the whole training split.
grid = ParamGridBuilder().build()
rf_crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=f1_metric,
    numFolds=numFolds,
    seed=42)

rf_model = rf_crossval.fit(train)
rf_prediction = rf_model.bestModel.transform(test)

# Single-argument print(...) produces the same output on Python 2 and 3.
print("Random Forest Classifier Metrics")
print("avg. F1-score {}".format(rf_model.avgMetrics))
print("Best Model Metrics")
print("Accuracy:  {}".format(accuracy_metric.evaluate(rf_prediction)))
print("Recall: {}".format(recall_metric.evaluate(rf_prediction)))
print("F1: {}".format(f1_metric.evaluate(rf_prediction)))


## 7.2 Multilayer Perceptron

In [22]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

numFolds = 10

labelIndexer = StringIndexer(inputCol="sentiment_values", outputCol="label").fit(df)

# Network shape: input width = length of the averaged word2vec feature vector and
# output width = number of indexed classes. Both are read from the data instead of being
# hard-coded (200 and 3), so a different embedding size or label set still works.
layers = [len(train.first()["features"]), 100, 50, 20, len(labelIndexer.labels)]

# Fixed seed for the weight initialisation and the CV folds, for reproducible metrics.
mlp = MultilayerPerceptronClassifier(labelCol="label", featuresCol="features", maxIter=300,
                                     layers=layers, seed=42)
pipeline = Pipeline(stages=[labelIndexer, mlp])

# Empty grid: CV only estimates the F1 of this single configuration.
grid = ParamGridBuilder().build()
mlp_crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=f1_metric,
    numFolds=numFolds,
    seed=42)

mlp_model = mlp_crossval.fit(train)
mlp_prediction = mlp_model.bestModel.transform(test)

# Single-argument print(...) produces the same output on Python 2 and 3.
print("Multi-Layer Perceptron")
print("avg. F1-score {}".format(mlp_model.avgMetrics))
print("Best Model Metrics")
print("Accuracy:  {}".format(accuracy_metric.evaluate(mlp_prediction)))
print("Recall: {}".format(recall_metric.evaluate(mlp_prediction)))
print("F1: {}".format(f1_metric.evaluate(mlp_prediction)))


# Fin 
___