In [None]:
# ejemplo tomado de: 
# https://community.hortonworks.com/articles/84781/spark-text-analytics-uncovering-data-driven-topics.html
# github: https://github.com/zaratsian/Spark/blob/master/text_analytics_datadriven_topics.json (con zeppelin)
# otros ejemplos muy buenos: https://github.com/zaratsian/Spark

In [1]:
#configuración en google colab de spark y pyspark
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
!pip install pyspark



In [4]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local', "app-topic-detection") 
spark = SparkSession(sc)

In [3]:
#from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import StringType,DoubleType,IntegerType,ArrayType
#from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [None]:
import nltk
import pandas as pd
import numpy as np
import re
import codecs
import matplotlib.pyplot as plt

In [None]:
nltk.download('punkt')
nltk.download('stopwords')

In [None]:
# stopwords en nltk
from nltk.corpus import stopwords
stop_words_nltk = set(stopwords.words('english'))

In [22]:
df=spark.read.csv("gdrive/My Drive/datasets/airlines.csv", inferSchema=True, header=True)
df = df.fillna({'review': ''})                               # Replace nulls with blank string

# Add Unique ID
df = df.withColumn("uid", monotonically_increasing_id())     # Create Unique ID

# Generate YYYY-MM variable
df = df.withColumn("year_month", df.date.substr(1,6))

# Show rawdata (as DataFrame)
df.show(10)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|uid|year_month|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+---+----------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|  0|    21-Jun|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|  1|    19-Jun|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|  2|    18-Jun|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|  3|    17-Jun|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|  4|    17-Jun|
|10006|Delta Air Lines|17-Jun-14|     USA|     9|Business|    5|        YES|Narita - Bangkok ...

In [7]:
df.createOrReplaceTempView("train_df")
sqlDF = spark.sql("SELECT * FROM train_df where cabin='Economy'")
sqlDF.show()

+-----+---------------+---------+---------+------+-------+-----+-----------+--------------------+
|   id|        airline|     date| location|rating|  cabin|value|recommended|              review|
+-----+---------------+---------+---------+------+-------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14| Thailand|     7|Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|      USA|     0|Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|      USA|     0|Economy|    1|         NO|Delta Website fro...|
|10005|Delta Air Lines|17-Jun-14|  Ecuador|     7|Economy|    3|        YES|"Round-trip fligh...|
|10007|Delta Air Lines|14-Jun-14|       UK|     0|Economy|    1|         NO|Flight from NY La...|
|10008|Delta Air Lines|14-Jun-14|      USA|     0|Economy|    1|         NO|Originally I had ...|
|10010|Delta Air Lines|13-Jun-14|       UK|     9|Economy|    3|        YES|"I flew from Heat...|
|10011|Delta Air Lin

In [24]:
df = df.select('id','review')
df.show(10)

+-----+--------------------+
|   id|              review|
+-----+--------------------+
|10001|Flew Mar 30 NRT t...|
|10002|Flight 2463 leavi...|
|10003|Delta Website fro...|
|10004|"I just returned ...|
|10005|"Round-trip fligh...|
|10006|Narita - Bangkok ...|
|10007|Flight from NY La...|
|10008|Originally I had ...|
|10009|We flew paid busi...|
|10010|"I flew from Heat...|
+-----+--------------------+
only showing top 10 rows



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [25]:
################################################################################################
#
#   Text Pre-processing (consider using one or all of the following):
#       - Remove common words (with stoplist)
#       - Handle punctuation
#       - lowcase/upcase
#       - Stemming
#       - Part-of-Speech Tagging (nouns, verbs, adj, etc.)
#
################################################################################################
from pyspark.sql.functions import udf,struct
#from pyspark.sql.types import StructType
def textprep(record):
    text  = record[1]
    uid   = record[0]
    tokens = text.split()
       
    # Custom List of Stopwords - Add your own here
    tokens = [re.sub('[^a-zA-Z0-9]','',word) for word in tokens]                                       # Remove special characters
    tokens = [word.lower() for word in tokens if len(word)>2 and word.lower() not in stop_words_nltk]     # Remove stopwords and words under X length
    return tokens

udf_textprep = udf(textprep , ArrayType(StringType()))
df = df.withColumn("words", udf_textprep(struct([df[x] for x in df.columns])))

#tokenizer = Tokenizer(inputCol="description", outputCol="words")
#wordsData = tokenizer.transform(text)
df.show(5)

+-----+--------------------+--------------------+
|   id|              review|               words|
+-----+--------------------+--------------------+
|10001|Flew Mar 30 NRT t...|[flew, mar, nrt, ...|
|10002|Flight 2463 leavi...|[flight, 2463, le...|
|10003|Delta Website fro...|[delta, website, ...|
|10004|"I just returned ...|[returned, roundt...|
|10005|"Round-trip fligh...|[roundtrip, fligh...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [26]:
# Term Frequency Vectorization  - Option 1 (Using hashingTF): 
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)

# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
#cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures")
cvmodel = cv.fit(df)
featurizedData = cvmodel.transform(df)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
df = idfModel.transform(featurizedData)
df.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+
|   id|              review|               words|         rawFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|10001|Flew Mar 30 NRT t...|[flew, mar, nrt, ...|(7306,[0,3,13,26,...|(7306,[0,3,13,26,...|
|10002|Flight 2463 leavi...|[flight, 2463, le...|(7306,[0,1,6,9,16...|(7306,[0,1,6,9,16...|
|10003|Delta Website fro...|[delta, website, ...|(7306,[0,3,4,9,17...|(7306,[0,3,4,9,17...|
|10004|"I just returned ...|[returned, roundt...|(7306,[0,1,2,3,5,...|(7306,[0,1,2,3,5,...|
|10005|"Round-trip fligh...|[roundtrip, fligh...|(7306,[0,4,9,11,1...|(7306,[0,4,9,11,1...|
+-----+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [27]:
# Generate 25 Data-Driven Topics:
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(df)

#model.isDistributed()
#model.vocabSize()

ldatopics = ldamodel.describeTopics()
#ldatopics.show(25)

def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

+-----+-------------------------------------------------------------------------------------------+
|topic|topic_desc                                                                                 |
+-----+-------------------------------------------------------------------------------------------+
|0    |[today, vegas, las, iad, delta, first, text, received, united, telling]                    |
|1    |[nov, 2013, los, angeles, customers, tokyo, far, storage, service, dont]                   |
|2    |[sandwich, service, served, united, salad, april, lack, broken, wheelchair, class]         |
|3    |[hrs, philadelphia, chicago, year, philly, toronto, status, connection, check, luggage]    |
|4    |[economy, international, extra, much, food, manchester, plenty, meals, bkk, crew]          |
|5    |[crew, paris, one, asked, water, dallas, passengers, newark, first, milk]                  |
|6    |[terrible, march, seats, thing, uncomfortable, lost, entertainment, charge, flights, cana] |


In [29]:
ldaResults = ldamodel.transform(df)

ldaResults.select('words','features','topicDistribution').show(5)

+--------------------+--------------------+--------------------+
|               words|            features|   topicDistribution|
+--------------------+--------------------+--------------------+
|[flew, mar, nrt, ...|(7306,[0,3,13,26,...|[0.02137071958196...|
|[flight, 2463, le...|(7306,[0,1,6,9,16...|[0.01450591091449...|
|[delta, website, ...|(7306,[0,3,4,9,17...|[0.01751801012174...|
|[returned, roundt...|(7306,[0,1,2,3,5,...|[0.01077578417625...|
|[roundtrip, fligh...|(7306,[0,4,9,11,1...|[0.00889724273733...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [30]:
ldaResults.columns

['id', 'review', 'words', 'rawFeatures', 'features', 'topicDistribution']

In [31]:
def maintop(record):
    vectorlist = record.tolist()
    m = max(vectorlist)
    maintops = [i for i, j in enumerate(vectorlist) if j == m] 
    return maintops

def sorttop(record):
    vectorlist = record.tolist()
    unsorted = [(i,j) for i,j in enumerate(vectorlist)]
    maintops = [i for i,j in sorted(unsorted, key=lambda tup: -tup[1])]
    return maintops[:5]

def maintop2(record):
    return record.tolist()

udf_maintop = udf(maintop, ArrayType(DoubleType()))
udf_maintop2 = udf(maintop2, ArrayType(DoubleType()))
udf_maintop3 = udf(sorttop, ArrayType(IntegerType()))

# Extract document weights for Topics 0 and 20
enrichedData = ldaResults.withColumn("MainTopics", udf_maintop3(ldaResults.topicDistribution))
enrichedData = enrichedData.withColumn("MainTopic", enrichedData.MainTopics[0])

enrichedData.select('MainTopic','MainTopics').show(5,False)

enrichedData.groupBy('MainTopic').count().sort('count', ascending=False).show()

#enrichedData.agg(max("Topic_12")).show()

+---------+--------------------+
|MainTopic|MainTopics          |
+---------+--------------------+
|4        |[4, 15, 24, 14, 17] |
|13       |[13, 20, 22, 18, 23]|
|17       |[17, 10, 19, 9, 14] |
|19       |[19, 14, 4, 8, 2]   |
|7        |[7, 2, 12, 13, 9]   |
+---------+--------------------+
only showing top 5 rows

+---------+-----+
|MainTopic|count|
+---------+-----+
|       22|   61|
|       19|   47|
|       11|   45|
|        4|   44|
|        8|   44|
|        3|   43|
|       13|   43|
|       15|   42|
|        2|   42|
|       12|   41|
|       16|   39|
|       17|   39|
|        1|   39|
|       14|   38|
|       23|   38|
|       10|   38|
|       24|   38|
|        6|   37|
|        7|   37|
|        9|   35|
+---------+-----+
only showing top 20 rows



In [32]:
enrichedData.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|   id|              review|               words|         rawFeatures|            features|   topicDistribution|          MainTopics|MainTopic|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|10001|Flew Mar 30 NRT t...|[flew, mar, nrt, ...|(7306,[0,3,13,26,...|(7306,[0,3,13,26,...|[0.02137071958196...| [4, 15, 24, 14, 17]|        4|
|10002|Flight 2463 leavi...|[flight, 2463, le...|(7306,[0,1,6,9,16...|(7306,[0,1,6,9,16...|[0.01450591091449...|[13, 20, 22, 18, 23]|       13|
|10003|Delta Website fro...|[delta, website, ...|(7306,[0,3,4,9,17...|(7306,[0,3,4,9,17...|[0.01751801012174...| [17, 10, 19, 9, 14]|       17|
|10004|"I just returned ...|[returned, roundt...|(7306,[0,1,2,3,5,...|(7306,[0,1,2,3,5,...|[0.01077578417625...|   [19, 14, 4, 8, 2]|   