In [None]:
# ejemplo tomado de: 
# https://community.hortonworks.com/articles/84781/spark-text-analytics-uncovering-data-driven-topics.html
# github: https://github.com/zaratsian/Spark/blob/master/text_analytics_datadriven_topics.json (con zeppelin)
# otros ejemplos muy buenos: https://github.com/zaratsian/Spark

In [None]:
# Google Colab setup: mount Google Drive at /content/gdrive so files stored there can be read.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
#configuración en google colab
#instalar java y spark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Option 1 for creating the Spark session and context (local mode, all cores):
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

# Option 2 for creating the Spark session and context:
#sc = SparkContext.getOrCreate()
#spark=SparkSession.builder.appName('nlp').getOrCreate()

In [None]:
#from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, expr, when, concat, lit, isnan
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pyspark

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.sql.types import StringType,DoubleType,IntegerType,ArrayType
#from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA, BisectingKMeans
from pyspark.sql.functions import monotonically_increasing_id
import re

In [None]:
import nltk
import pandas as pd
import numpy as np
import re
import codecs
import matplotlib.pyplot as plt

In [None]:
# Download the NLTK 'punkt' and 'stopwords' resources.
# NOTE(review): nothing visible in this notebook uses 'punkt' -- confirm it is still needed.
nltk.download('punkt')
nltk.download('stopwords')

In [None]:
# English stop words from NLTK, kept as a set
from nltk.corpus import stopwords
stop_words_nltk = set(stopwords.words('english'))

In [None]:
# Load the airline reviews CSV (header row, inferred column types) and add helper columns.
df = (
    spark.read.csv(
        "gdrive/MyDrive/st1800-241/datasets/airlines.csv",
        header=True,
        inferSchema=True,
    )
    .fillna({'review': ''})                              # missing reviews become empty strings
    .withColumn("uid", monotonically_increasing_id())    # unique row id
    # NOTE(review): this keeps the first 6 characters of `date`; a "YYYY-MM"
    # prefix would need 7 -- confirm against the actual date format.
    .withColumn("year_month", col("date").substr(1, 6))
)

# Preview the first 10 rows of the raw data
df.show(10)

In [None]:
# Register the DataFrame as the temporary SQL view "train_df" and show its Economy-cabin rows.
df.createOrReplaceTempView("train_df")
sqlDF = spark.sql("SELECT * FROM train_df where cabin='Economy'")
sqlDF.show()

In [None]:
# Keep only the id and review columns (this overwrites `df`) and preview 10 rows.
df = df.select('id','review')
df.show(10)

In [None]:
################################################################################################
#
#   Text Pre-processing (consider using one or all of the following):
#       - Remove common words (with stoplist)
#       - Handle punctuation
#       - lowcase/upcase
#       - Stemming
#       - Part-of-Speech Tagging (nouns, verbs, adj, etc.)
#
################################################################################################
from pyspark.sql.functions import udf,struct
#from pyspark.sql.types import StructType
def textprep(record):
    """Tokenize and clean the review text of one row.

    Args:
        record: Indexable row whose element 1 holds the review text
            (element 0 is not used here).

    Returns:
        list[str]: Lower-cased tokens with every character outside
        ``[a-zA-Z0-9]`` removed, keeping only tokens longer than two
        characters that are not in ``stop_words_nltk``. A missing
        (``None``) text yields an empty list.
    """
    text = record[1]
    if text is None:
        # A null review would otherwise fail on .split(); treat it as empty text.
        return []

    # Strip special characters and lower-case each whitespace-separated token once.
    cleaned = (re.sub('[^a-zA-Z0-9]', '', word).lower() for word in text.split())
    # The length check runs first, so the stop-word lookup only sees candidates.
    return [word for word in cleaned if len(word) > 2 and word not in stop_words_nltk]

# Wrap textprep as a Spark UDF that returns an array of strings.
udf_textprep = udf(textprep, returnType=ArrayType(StringType()))

# Pack every column of the row into one struct and pass it to the UDF;
# textprep reads the text from position 1 of that struct.
df = df.withColumn(
    "words",
    udf_textprep(struct(*[df[name] for name in df.columns])),
)

# Built-in alternative kept for reference:
#tokenizer = Tokenizer(inputCol="description", outputCol="words")
#wordsData = tokenizer.transform(text)
df.show(5)

In [None]:
# Term Frequency Vectorization  - Option 1 (Using hashingTF):
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(clean_text)

# This cell reassigns `df`, so on a second run `df` would already carry the
# output columns and Spark would reject them as duplicates. Dropping them first
# (a no-op when they are absent) keeps the cell re-runnable.
df = df.drop("rawFeatures", "features")

# Term Frequency Vectorization  - Option 2 (CountVectorizer):
#cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 1000)
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures")
cvmodel = cv.fit(df)
featurizedData = cvmodel.transform(df)

# Keep the fitted vocabulary and broadcast it for use inside UDFs.
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

# Re-weight the raw term counts with inverse document frequency.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
df = idfModel.transform(featurizedData)
df.show(5)

In [None]:
# Fit an LDA model with 25 data-driven topics on the "features" column.
lda = LDA(
    featuresCol="features",
    k=25,
    optimizer="em",
    seed=123,
)
ldamodel = lda.fit(df)

# Other inspection calls on the fitted model: ldamodel.isDistributed(), ldamodel.vocabSize()

# One row per topic, including its term indices; ldatopics.show(25) prints them.
ldatopics = ldamodel.describeTopics()

def map_termID_to_Word(termIndices):
    """Translate vocabulary indices into their words.

    Each index is looked up in the broadcast vocabulary (``vocab_broadcast``);
    the words are returned in the same order as ``termIndices``.
    """
    return [vocab_broadcast.value[index] for index in termIndices]

# Attach the words behind each topic's term indices and print up to 50 topics untruncated.
udf_map_termID_to_Word = udf(map_termID_to_Word, returnType=ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn(
    "topic_desc",
    udf_map_termID_to_Word(col("termIndices")),
)
ldatopics_mapped.select("topic", "topic_desc").show(50, truncate=False)

In [None]:
# Apply the fitted LDA model to the documents and preview the tokens, features and topic distribution.
ldaResults = ldamodel.transform(df)

ldaResults.select('words','features','topicDistribution').show(5)

In [None]:
# List the column names of the LDA-scored DataFrame
ldaResults.columns

In [None]:
def maintop(record):
    """Return the positions of the largest entries of a vector.

    ``record`` must provide ``tolist()``. Every position tied for the maximum
    value is returned, in ascending index order.
    """
    values = record.tolist()
    peak = max(values)
    winners = []
    for position, value in enumerate(values):
        if value == peak:
            winners.append(position)
    return winners

def sorttop(record):
    """Return the positions of the five largest entries, largest first.

    ``record`` must provide ``tolist()``. Equal values keep ascending index
    order because the sort is stable; a vector with fewer than five entries
    returns all of its positions.
    """
    weights = record.tolist()
    ranked = sorted(range(len(weights)), key=lambda position: -weights[position])
    return ranked[:5]

def maintop2(record):
    """Convert a vector to a plain Python list via its ``tolist()`` method."""
    as_list = record.tolist()
    return as_list

# maintop returns positions (ints), so its UDF must declare an integer array:
# a plain Python UDF yields nulls for values that do not match the declared type.
udf_maintop = udf(maintop, ArrayType(IntegerType()))
udf_maintop2 = udf(maintop2, ArrayType(DoubleType()))
udf_maintop3 = udf(sorttop, ArrayType(IntegerType()))

# Rank the topics of every document: MainTopics holds the five highest-weighted
# topic indices (best first) and MainTopic is the first of them.
enrichedData = ldaResults.withColumn("MainTopics", udf_maintop3(ldaResults.topicDistribution))
enrichedData = enrichedData.withColumn("MainTopic", enrichedData.MainTopics[0])

enrichedData.select('MainTopic','MainTopics').show(5,False)

# Number of documents per dominant topic, most frequent first
enrichedData.groupBy('MainTopic').count().sort('count', ascending=False).show()

#enrichedData.agg(max("Topic_12")).show()

In [None]:
# Print the first rows of the enriched DataFrame with all of its columns
enrichedData.show()