In [1]:
import numpy as np
import pandas as pd
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Data Inspection

## Data in customer_feedbacks/

In [2]:
sc = SparkContext('local')
spark = SparkSession(sc)

In [3]:
parquetFile = spark.read.parquet("../data/customer_feedbacks/part-00000-985ad763-a6d6-4ead-a6dd-c02279e9eipeba-c000.snappy.parquet")

AnalysisException: 'Path does not exist: file:/Users/YuhouZhou/Desktop/2019 Spring/Advanced Project 1/data/customer_feedbacks/part-00000-985ad763-a6d6-4ead-a6dd-c02279e9eeba-c000.snappy.parquet;'

In [4]:
parquetFile.printSchema()

NameError: name 'parquetFile' is not defined

In [5]:
parquetFile.createOrReplaceTempView("parquetFile")

In [6]:
len(parquetFile.columns)

101

In [7]:
spark.sql("SELECT * FROM parquetFile LIMIT 1").show()

+------------------+-------------------+-------+--------+-----+------+--------------+--------------------+-------------+--------------------+------------+--------+---------+-----------------+-------------+-------------+-------------+-------------------+----------------+----------------+-------------+----------+----------------+----------------+------------+-------+---------------+------------+--------+---+------------+--------+------------+----------+---------------+---------+----------------+-----------------+-----------------+----------------+----------------------------+----------------+---------------+------------------+--------------------+----------------------+----------------------+------------------+------------------------+------+--------+-------------+-----------+--------+----------------+--------------+----------------+-----------------+--------------------+--------------------+---------------------+-------------------------+-----------------+----------------------+--------

## Data in customer_feedbacks_cat/

In [8]:
parquetFile2 = spark.read.parquet("../data/customer_feedbacks_cat/part-00000-4820af87-4b19-4958-a7a6-7ed03b76f1b1-c000.snappy.parquet")

In [9]:
parquetFile2.createOrReplaceTempView("parquetFile2")
spark.sql("SELECT * FROM parquetFile2 LIMIT 5").show()

+--------------------+-----------+----------+--------------+--------------+-----------+--------------+
|         KATEGORIE_2|KATEGORIE_1|  STIMMUNG|KATEGORIE_2_ID|KATEGORIE_1_ID|STIMMUNG_ID|KATEGORIE_2_CC|
+--------------------+-----------+----------+--------------+--------------+-----------+--------------+
| Versand - Vorschlag|    Versand|Suggestion|            38|            10|          4|          1038|
|       Versand - Lob|    Versand|    Praise|            39|            10|          2|          1039|
|     Versand - Keine|    Versand|      None|            40|            10|          1|          1040|
|Versand - Beschwerde|    Versand| Complaint|            41|            10|          3|          1041|
|  Editor - Vorschlag|     Editor|Suggestion|            42|            11|          4|          1142|
+--------------------+-----------+----------+--------------+--------------+-----------+--------------+



# Generate dataframe

In [10]:
df=spark.sql("""
SELECT
    T0.KATEGORIE_2     AS CATEGORY_2,
    T0.KATEGORIE_1     AS CATEGORY_1,
    T0.STIMMUNG          AS SENTIMENT,
    T1.ERGEBNISSATZ_ID AS RESPONSE_ID,
    T1.DATUM_ID        AS DATE,
    T1.ANTWORT_WERT    AS TEXT
FROM
    parquetFile2 T0,
    parquetFile T1
WHERE
    T0.KATEGORIE_1_ID = T1.KATEGORIE_1_ID
    AND T0.KATEGORIE_2_ID = T1.KATEGORIE_2_ID
    AND T0.STIMMUNG_ID = T1.STIMMUNG_ID            
    AND (NOT T1.ANTWORT_WERT IS NULL
        AND (T1.UMFRAGE_KATEGORIE_ID = 1
            AND (T1.GRUPPE_ID = 170
                OR T1.GRUPPE_ID = 171)))
""")

In [11]:
df.show()

+--------------------+--------------------+---------+------------------+-------------------+--------------------+
|          CATEGORY_2|          CATEGORY_1|SENTIMENT|       RESPONSE_ID|               DATE|                TEXT|
+--------------------+--------------------+---------+------------------+-------------------+--------------------+
|Delivery & POS Fe...|Delivery & POS Fe...|     None|7492033.0000000000|2012-04-14 00:00:00|Habe bis heute me...|
|  Other - Beschwerde|               Other|Complaint|7492033.0000000000|2012-04-14 00:00:00|Habe bis heute me...|
|                 N/A|                 N/A|      N/A|7492033.0000000000|2012-04-14 00:00:00|  Bin stinke sauer! |
|HPS, IPS & App - ...|      HPS, IPS & App|     None|7509285.0000000000|2012-04-16 00:00:00|Teuer, aber gut. ...|
|Customer Service ...|Customer Service ...|   Praise|7509285.0000000000|2012-04-16 00:00:00|Teuer, aber gut. ...|
|Product Quality -...|     Product Quality|   Praise|7510696.0000000000|2012-04-16 00:00

In [130]:
df.head(8)

[Row(CATEGORY_2='Delivery & POS Feedback - Keine', CATEGORY_1='Delivery & POS Feedback', SENTIMENT='None', RESPONSE_ID=Decimal('7492033.0000000000'), DATE=datetime.datetime(2012, 4, 14, 0, 0), TEXT='Habe bis heute mein Fotobuch noch nicht obwohl ich es im März bestellt habe. War ein Geburtstagsgeschenk doch dieser ist mittlerweile auch vorbei. Keiner weiß wo mein Buch geblieben ist. Schade oder- Vertrauen dahin. Werde ich nie wieder über Müller machen! '),
 Row(CATEGORY_2='Other - Beschwerde', CATEGORY_1='Other', SENTIMENT='Complaint', RESPONSE_ID=Decimal('7492033.0000000000'), DATE=datetime.datetime(2012, 4, 14, 0, 0), TEXT='Habe bis heute mein Fotobuch noch nicht obwohl ich es im März bestellt habe. War ein Geburtstagsgeschenk doch dieser ist mittlerweile auch vorbei. Keiner weiß wo mein Buch geblieben ist. Schade oder- Vertrauen dahin. Werde ich nie wieder über Müller machen! '),
 Row(CATEGORY_2='N/A', CATEGORY_1='N/A', SENTIMENT='N/A', RESPONSE_ID=Decimal('7492033.0000000000'), DAT

In [54]:
df.printSchema()

root
 |-- CATEGORY_2: string (nullable = true)
 |-- CATEGORY_1: string (nullable = true)
 |-- SENTIMENT: string (nullable = true)
 |-- RESPONSE_ID: decimal(38,10) (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- TEXT: string (nullable = true)



In [57]:
df.createOrReplaceTempView("df")

In [153]:
# df_text = spark.sql("SELECT TEXT FROM df DESC LIMIT 40000")
df_text = spark.sql("SELECT TEXT FROM df")

In [154]:
type(df_text)

pyspark.sql.dataframe.DataFrame

In [155]:
spark.catalog.dropTempView("df_text")

In [148]:
# spark.sql("SELECT COUNT(*) FROM df").show()

+--------+
|count(1)|
+--------+
| 1917490|
+--------+



# Data Preparation

**test examples**

In [156]:
%%time
df_pd = df_text.dropDuplicates().toPandas()

Wall time: 28.3 s


In [157]:
# df_pd.iloc[-1]['TEXT']

In [158]:
# %%time
# import spacy
# nlp = spacy.load('de')
# TEXT_tokenized=df_pd['TEXT'].iloc[:].apply(lambda x: nlp(x))

In [159]:
# TEXT_tokenized

In [160]:
# type(TEXT_tokenized[0][0])

In [161]:
# spacy.displacy.render(TEXT_tokenized[0], style='ent',jupyter=True)

In [162]:
# # Example of lemmatization
# review = str(" ".join([i.lemma_ for i in TEXT_tokenized[0]]))

In [163]:
# # All the words are switched to its basic form
# spacy.displacy.render(nlp(review), style='ent',jupyter=True)

In [164]:
import string
from spacy.lang.de.stop_words import STOP_WORDS
stopwords = list(STOP_WORDS)

def text_preproc(sentence):
    mytokens = nlp.tokenizer(sentence)
    mytokens = [ word.lemma_.lower().strip() for word in mytokens ]
    mytokens = [ word for word in mytokens if word not in stopwords and word not in string.punctuation ]
    mytokens = " ".join([i for i in mytokens])
    return mytokens

In [165]:
from tqdm import tqdm
tqdm.pandas()
df_pd['TEXT_processed'] = df_pd['TEXT'].progress_apply(text_preproc)

100%|████████████████████████████████████████████████████████████████████████| 896683/896683 [05:42<00:00, 2617.24it/s]


# Data Cleaning

# Data Modelling

In [166]:
%%time
# Creating a vectorizer
from sklearn.feature_extraction.text import CountVectorizer

punctuations = string.punctuation
vectorizer = CountVectorizer(min_df=5, max_df=0.9, stop_words=list(STOP_WORDS), lowercase=True)
data_vectorized = vectorizer.fit_transform(df_pd['TEXT_processed'])

Wall time: 8.84 s


In [167]:
NUM_TOPICS = 10

In [168]:
%%time
# Latent Dirichlet Allocation Model
from sklearn.decomposition import LatentDirichletAllocation
lda = LatentDirichletAllocation(n_components=NUM_TOPICS, max_iter=10, learning_method='online',verbose=True)
data_lda = lda.fit_transform(data_vectorized)

iteration: 1 of max_iter: 10
iteration: 2 of max_iter: 10
iteration: 3 of max_iter: 10
iteration: 4 of max_iter: 10
iteration: 5 of max_iter: 10
iteration: 6 of max_iter: 10
iteration: 7 of max_iter: 10
iteration: 8 of max_iter: 10
iteration: 9 of max_iter: 10
iteration: 10 of max_iter: 10
Wall time: 30min 16s


In [169]:
# Functions for printing keywords for each topic
def selected_topics(model, vectorizer, top_n=10):
    for idx, topic in enumerate(model.components_):
        print("Topic %d:" % (idx))
        print([(vectorizer.get_feature_names()[i], topic[i])
                        for i in topic.argsort()[:-top_n - 1:-1]])

In [170]:
# Keywords for topics clustered by Latent Dirichlet Allocation
print("LDA Model:")
selected_topics(lda, vectorizer)

LDA Model:
Topic 0:
[('bestellen', 70787.76438177995), ('zufrieden', 58603.27698749063), ('cewe', 35171.54138934868), ('fotobücher', 31791.87395929618), ('kalender', 31586.684809625793), ('fotobuch', 15753.712718934608), ('einig', 11983.268374016454), ('obwohl', 11976.881422844624), ('mal', 11853.821506745677), ('mehrer', 11008.105785155522)]
Topic 1:
[('einfach', 66685.97801213167), ('software', 33564.919930763244), ('schön', 31196.83920141773), ('gestalten', 24288.460765391264), ('ergebnis', 23526.54655627589), ('erstellen', 16230.44746471368), ('super', 15849.687351570607), ('klappen', 15722.347595441746), ('leistung', 10686.913573084868), ('zuverlässig', 9336.950548106519)]
Topic 2:
[('geben', 23649.451996156586), ('format', 22421.793754262344), ('finden', 21161.44003401107), ('foto', 16566.42597966091), ('möglichkeit', 15934.33975659593), ('top', 15762.815259784938), ('gerne', 14510.257808029002), ('lassen', 14142.081787870127), ('abzug', 10774.39638280361), ('teuer', 10612.570531

In [171]:
# Transforming an individual sentence
text = spacy_tokenizer("Habe bis heute mein Fotobuch noch nicht obwohl ich es im März bestellt habe. War ein Geburtstagsgeschenk doch dieser ist mittlerweile auch vorbei. Keiner weiß wo mein Buch geblieben ist. Schade oder- Vertrauen dahin. Werde ich nie wieder über Müller machen!")
x = lda.transform(vectorizer.transform([text]))[0]
print(x)

[0.46791514 0.0978956  0.00714552 0.00714286 0.00714286 0.00714286
 0.07377012 0.10327583 0.22142595 0.00714327]


In [172]:
# Transforming an individual sentence
text = spacy_tokenizer("Teuer, aber gut. Ich schätze die vielen Möglichkeiten der Software, nutze regelmäßig das Forum und bei Problemen hilft der Kundendienst gut weiter.")
x = lda.transform(vectorizer.transform([text]))[0]
print(x)

[0.09999998 0.0999979  0.27171696 0.00909091 0.1101003  0.00909397
 0.28181815 0.00909311 0.09999781 0.00909091]


In [173]:
# Transforming an individual sentence
t = df_pd.iloc[-1]['TEXT']
print(t)
text = spacy_tokenizer(t)
x = lda.transform(vectorizer.transform([text]))[0]
print(x)

Hätte dennoch gewusst wie es zu der Reklamation im ersten cewe fotobuch kam. Oder worauf war der Fehler zurück zu führen und ist es möglich, das dieser Fehler noch einmal auftritt. Bedanke mich für die schnelle Antwort. 
[0.00714423 0.00714286 0.00714286 0.22142837 0.07857146 0.00714286
 0.49815314 0.00714286 0.00714304 0.15898834]


In [174]:
# Transforming an individual sentence
t = df_pd.iloc[-12]['TEXT']
print(t)
text = spacy_tokenizer(t)
x = lda.transform(vectorizer.transform([text]))[0]
print(x)

Bisher besteht kein Bedarf das jemand zu erpfehlen. Das ist ein individuelles Geschehen und an die Geiz ist Geil Mentalität gebunden
[0.0125     0.0125     0.26249959 0.13749962 0.0125     0.0125
 0.13750013 0.13750015 0.26249857 0.01250194]


# Visualization

In [175]:
import pyLDAvis.sklearn
pyLDAvis.enable_notebook()
dash = pyLDAvis.sklearn.prepare(lda, data_vectorized, vectorizer, mds='tsne')
dash

of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.


  return pd.concat([default_term_info] + list(topic_dfs))


# Gensim LDA

In [200]:
import string
from spacy.lang.de.stop_words import STOP_WORDS
stopwords = list(STOP_WORDS)

def text_preproc2(sentence):
    mytokens = nlp.tokenizer(sentence)
    mytokens = [ word.lemma_.lower().strip() for word in mytokens ]
    mytokens = [ word for word in mytokens if word not in stopwords and word not in string.punctuation ]
    return mytokens

In [201]:
# text_preproc2('Habe ich kein Zeit! Warum. Stein')

['stein']

In [283]:
from tqdm import tqdm
tqdm.pandas()
example = df_pd['TEXT'].progress_apply(text_preproc2)

100%|████████████████████████████████████████████████████████████████████████| 896683/896683 [05:22<00:00, 2779.76it/s]


In [284]:
# from gensim.test.utils import common_texts
from gensim.corpora.dictionary import Dictionary
# Create a corpus from a list of texts
dictionary = Dictionary(example)
corpus = [dictionary.doc2bow(text) for text in example]

In [285]:
# print(dictionary)

In [286]:
from  gensim.models.ldamodel import LdaModel
lda2 = LdaModel(corpus, num_topics=10)

In [287]:
# print(type(lda2))
# print(corpus)
# print(type(dictionary))

In [288]:
import pyLDAvis.gensim
pyLDAvis.enable_notebook()
vis = pyLDAvis.gensim.prepare(lda2, corpus, dictionary=dictionary)
vis