In [1]:
# ejemplo tomado de: 
# https://community.hortonworks.com/articles/84781/spark-text-analytics-uncovering-data-driven-topics.html
# github: https://github.com/zaratsian/Spark/blob/master/text_analytics_datadriven_topics.json (con zeppelin)
# otros ejemplos muy buenos: https://github.com/zaratsian/Spark

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local', "app-topic-detection") 
spark = SparkSession(sc)

In [3]:
df=spark.read.csv("file:///opt/datasets/airlines.csv", inferSchema=True, header=True)
df.show(10)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
|10006|Delta Air Lines|17-Jun-14|     USA|     9|Business|    5|        YES|Narita - Bangkok ...|
|10007|Delta Air Lines|14-Jun-14|      UK|     0| Economy|    1|         NO|Flight from NY La...|
|10008|Delta Air Lin

In [4]:
df.createOrReplaceTempView("train_df")
sqlDF = spark.sql("SELECT * FROM train_df")
sqlDF.show()

+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date| location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+---------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14| Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|      USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|      USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|      USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun-14|  Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|
|10006|Delta Air Lines|17-Jun-14|      USA|     9|Business|    5|        YES|Narita - Bangkok ...|
|10007|Delta Air Lines|14-Jun-14|       UK|     0| Economy|    1|         NO|Flight from NY La...|
|10008|Del

In [5]:
import nltk
import pandas as pd
import numpy as np
import re
import codecs
import matplotlib.pyplot as plt

In [6]:
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /Users/emontoya/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/emontoya/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [7]:

# stopwords en nltk
from nltk.corpus import stopwords
 
stop_words_nltk = set(stopwords.words('english'))

In [8]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [9]:
rawdata = spark.read.load("file:///opt/datasets/airlines.csv", format="csv", header=True)
rawdata.printSchema()
rawdata[0]
rawdata = rawdata.fillna({'review': ''})                               # Replace nulls with blank string

# Add Unique ID
rawdata = rawdata.withColumn("uid", monotonically_increasing_id())     # Create Unique ID

# Generate YYYY-MM variable
rawdata = rawdata.withColumn("year_month", rawdata.date.substr(1,6))

# Show rawdata (as DataFrame)
rawdata.show(10)

# Print data types
for type in rawdata.dtypes:
    print(type)

target = rawdata.select(rawdata['rating'].cast(IntegerType()))
target.dtypes

root
 |-- id: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- value: string (nullable = true)
 |-- recommended: string (nullable = true)
 |-- review: string (nullable = true)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|uid|year_month|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|  0|    21-Jun|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|  1|    19-Jun|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Websi

[('rating', 'int')]

In [10]:
################################################################################################
#
#   Text Pre-processing (consider using one or all of the following):
#       - Remove common words (with stoplist)
#       - Handle punctuation
#       - lowcase/upcase
#       - Stemming
#       - Part-of-Speech Tagging (nouns, verbs, adj, etc.)
#
################################################################################################

def cleanup_text(record):
    text  = record[8]
    uid   = record[9]
    words = text.split()
    
    # Default list of Stopwords
    stopwords_core = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at', 
    u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by', 
    u'can', 'cant', 'come', u'could', 'couldnt', 
    u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during', 
    u'each', 
    u'few', 'finally', u'for', u'from', u'further', 
    u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how', 
    u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself', 
    u'just', 
    u'll', 
    u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself', 
    u'no', u'nor', u'not', u'now', 
    u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own', 
    u'r', u're', 
    u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such', 
    u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too', 
    u'under', u'until', u'up', 
    u'very', 
    u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would', 
    u'y', u'you', u'your', u'yours', u'yourself', u'yourselves']
    
    # Custom List of Stopwords - Add your own here
    stopwords_custom = ['']
    stopwords = stopwords_core + stopwords_custom
    stopwords = [word.lower() for word in stopwords]    
    
    text_out = [re.sub('[^a-zA-Z0-9]','',word) for word in words]                                       # Remove special characters
    text_out = [word.lower() for word in text_out if len(word)>2 and word.lower() not in stopwords]     # Remove stopwords and words under X length
    return text_out

udf_cleantext = udf(cleanup_text , ArrayType(StringType()))
clean_text = rawdata.withColumn("words", udf_cleantext(struct([rawdata[x] for x in rawdata.columns])))

#tokenizer = Tokenizer(inputCol="description", outputCol="words")
#wordsData = tokenizer.transform(text)

In [11]:
# Term Frequency Vectorization  - Option 1 (Using hashingTF): 
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)

# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [13]:
# Generate 25 Data-Driven Topics:
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(rescaledData)

#model.isDistributed()
#model.vocabSize()

ldatopics = ldamodel.describeTopics()
#ldatopics.show(25)

def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

+-----+---------------------------------------------------------------------------------------------+
|topic|topic_desc                                                                                   |
+-----+---------------------------------------------------------------------------------------------+
|0    |[extra, row, main, segments, legroom, exit, cabin, pay, seat, seats]                         |
|1    |[vegas, las, upgrade, economy, upgraded, seat, online, internal, dec, seats]                 |
|2    |[oakland, nearly, phoenix, less, united, york, couple, lax, time, flights]                   |
|3    |[san, phoenix, day, missed, connection, told, desk, connecting, diego, leaving]              |
|4    |[son, supervisor, boarding, different, told, seattle, agent, southwest, got, check]          |
|5    |[clt, wife, airways, class, miles, lga, club, coach, first, seats]                           |
|6    |[phl, louisville, philadelphia, washington, laguardia, york, set, airways, 

In [14]:
ldaResults = ldamodel.transform(rescaledData)

ldaResults.select('id','airline','date','cabin','rating','words','features','topicDistribution').show()

+-----+---------------+---------+--------+------+--------------------+--------------------+--------------------+
|   id|        airline|     date|   cabin|rating|               words|            features|   topicDistribution|
+-----+---------------+---------+--------+------+--------------------+--------------------+--------------------+
|10001|Delta Air Lines|21-Jun-14| Economy|     7|[flew, mar, nrt, ...|(1000,[0,3,11,25,...|[0.03326390899231...|
|10002|Delta Air Lines|19-Jun-14| Economy|     0|[flight, 2463, le...|(1000,[0,1,5,8,15...|[0.03232534061717...|
|10003|Delta Air Lines|18-Jun-14| Economy|     0|[delta, website, ...|(1000,[0,3,4,8,16...|[0.02085699457876...|
|10004|Delta Air Lines|17-Jun-14|Business|     9|[returned, roundt...|(1000,[0,1,2,3,8,...|[0.04526958235729...|
|10005|Delta Air Lines|17-Jun-14| Economy|     7|[roundtrip, fligh...|(1000,[0,4,8,10,1...|[0.02298085578274...|
|10006|Delta Air Lines|17-Jun-14|Business|     9|[narita, bangkok,...|(1000,[0,2,3,9,11...|[0.06

In [15]:
def breakout_array(index_number, record):
    vectorlist = record.tolist()
    return vectorlist[index_number]

udf_breakout_array = udf(breakout_array, FloatType())

# Extract document weights for Topics 12 and 20
enrichedData = ldaResults                                                                   \
        .withColumn("Topic_12", udf_breakout_array(lit(12), ldaResults.topicDistribution))  \
        .withColumn("topic_20", udf_breakout_array(lit(20), ldaResults.topicDistribution))            

enrichedData.select('id','airline','date','cabin','rating','words','features','topicDistribution','Topic_12','Topic_20').show()

#enrichedData.agg(max("Topic_12")).show()

+-----+---------------+---------+--------+------+--------------------+--------------------+--------------------+-----------+-----------+
|   id|        airline|     date|   cabin|rating|               words|            features|   topicDistribution|   Topic_12|   Topic_20|
+-----+---------------+---------+--------+------+--------------------+--------------------+--------------------+-----------+-----------+
|10001|Delta Air Lines|21-Jun-14| Economy|     7|[flew, mar, nrt, ...|(1000,[0,3,11,25,...|[0.03326390899231...| 0.02525354| 0.06193166|
|10002|Delta Air Lines|19-Jun-14| Economy|     0|[flight, 2463, le...|(1000,[0,1,5,8,15...|[0.03232534061717...| 0.08261078|0.020200083|
|10003|Delta Air Lines|18-Jun-14| Economy|     0|[delta, website, ...|(1000,[0,3,4,8,16...|[0.02085699457876...|0.040910717|0.017439311|
|10004|Delta Air Lines|17-Jun-14|Business|     9|[returned, roundt...|(1000,[0,1,2,3,8,...|[0.04526958235729...|0.017391128| 0.12361219|
|10005|Delta Air Lines|17-Jun-14| Economy