In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import seaborn as sns

In [0]:
#!wget https://files.consumerfinance.gov/ccdb/complaints.csv.zip

In [0]:
#!unzip  "/content/complaints.csv.zip" 
display(dbutils.fs.ls("dbfs:/FileStore/tables"))


path,name,size
dbfs:/FileStore/tables/complaints.csv,complaints.csv,1233411049
dbfs:/FileStore/tables/complaints_csv.zip,complaints_csv.zip,304454584
dbfs:/FileStore/tables/exercise_pyspark_dataframe.ipynb,exercise_pyspark_dataframe.ipynb,30542
dbfs:/FileStore/tables/flight_model/,flight_model/,0
dbfs:/FileStore/tables/flight_weather.csv,flight_weather.csv,431664555


In [0]:
# Read dataset 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
df = (sqlContext.read.format("csv").
  option("header", "true").
  option("nullValue", "NA").
  option("inferSchema", True).
  load("/FileStore/tables/complaints.csv"))

In [0]:
csv = spark.read.csv('/FileStore/tables/complaints.csv', inferSchema=True, header=True)
csv.show(10)

In [0]:
csv.count()

In [0]:
#show sample data
df.select(['Product','Complaint ID','Consumer complaint narrative']).show(20)
#data = csv.select("Product", "Complaint ID").where(col("Complaint ID").isNull())
#data.show(20)

#df2 = df.dropna(thresh=2,subset=('Product','Complaint ID'))
#df2.where(col("Complaint ID").isNull()).show(20)
#save to table1
sqlContext.registerDataFrameAsTable(df, "table1")

In [0]:
dfProduct = sqlContext.sql("SELECT `Complaint ID` as id,lower(Product) AS product, `Consumer complaint narrative` AS narrative FROM table1 ")
dfProduct = dfProduct.dropna()
#df2 = csv.na.drop()
#df.printSchema()


###import and download NLP related files

In [0]:
#!/bin/bash
!pip install nltk
!pip install --upgrade pip
!nltk.downloader all

In [0]:
import nltk
nltk.download('punkt')
nltk.download('twitter_samples')
nltk.download('averaged_perceptron_tagger')
nltk.download('maxent_ne_chunker')
nltk.download('words')
nltk.download('ieer')
nltk.download('stopwords')
#stopwords = set(STOPWORDS) 
stopwords = nltk.corpus.stopwords.words('english')


- Clearing text from punctuation (regexp_replace)
 - Tokenization (Tokenizer)
 - Delete stop words (StopWordsRemover)
 - Stematization (SnowballStemmer)
 - Filtering short words (udf)

### sample code

In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

In [0]:
#get 100 sample data and remove id is null. backwords execution
dfProduct100 = dfProduct.dropna().limit(100)
type(dfProduct100)

In [0]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = dfProduct100

regexTokenizer = RegexTokenizer(inputCol="product", outputCol="words", pattern="\\W")
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("product", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)


In [0]:
#regexTokenized = regexTokenized.withColumn('words', concat_ws(' ', 'words'))
#regexTokenized.show(11)

In [0]:
#convert list column 'words' to string
#from pyspark.sql.functions import col, concat_ws
#words_list = regexTokenized.limit(100).select('words').collect() 
#stringList = ' '.join([str(item[0]) for item in words_list ])
#stringList
#import pyspark.sql.functions.*
#dfTokenized.select(concat_ws(' ', split(dfTokenized.words)).alias('content')).collect()

In [0]:
#word cloud
!pip install wordcloud
from wordcloud import WordCloud 
from wordcloud import WordCloud, STOPWORDS 
import matplotlib.pyplot as plt 

In [0]:
#stopwords = set(STOPWORDS) 
comment_words = ''  
# iterate through the csv file 
#for val in regexTokenized.words: 
    # typecaste each val to string 
    # split the value 
    ##tokens = val.split() 
    # Converts each token into lowercase 
    ##for i in range(len(tokens)): 
    ##    tokens[i] = tokens[i].lower() 
      
    ##comment_words += " ".join(tokens)+" "
#    comment_words += " ".join(val)+" "
wordcloud = WordCloud(width = 800, height = 800, 
                background_color ='white', 
                min_font_size = 10).generate('credit reporting credit repair services or other personal consumer reports debt collection debt collection payday loan title loan or personal loan mortgage credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit card or prepaid card debt collection credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit card or prepaid card credit reporting credit repair services or other personal consumer reports vehicle loan or lease credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports checking or savings account debt collection credit reporting credit repair services or other personal consumer reports checking or savings account credit reporting credit repair services or other personal consumer reports mortgage credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports or not applying for credit recently credit card or prepaid card debt collection credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports mortgage credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports debt collection credit reporting credit repair services or other personal consumer reports credit reporting credit repair services or other personal consumer reports credit reporting credit repair services') 


In [0]:
  
# plot the WordCloud image                        
plt.figure(figsize = (8, 8), facecolor = None) 
plt.imshow(wordcloud) 
plt.axis("off") 
plt.tight_layout(pad = 0) 
plt.show() 

###NLP
-  DocumentAssembler(), one of the most essential transformers of the Spark NLP library. It’s the entry point to get your data in, and then process further with annotators. And, without linking its output to annotators in a pipeline, it has no meaning. In the following articles, we will talk about how you can apply certain NLP tasks on top of DocumentAssembler()

In [0]:
from sparknlp.base import *
documentAssembler = DocumentAssembler().setInputCol("product").setOutputCol("document").setCleanupMode("shrink")
doc_df = documentAssembler.transform(dfProduct100)
doc_df.show(10)

####flatten the document column

In [0]:
doc_df.select("document.result").take(1)
import pyspark.sql.functions as F
doc_df.withColumn("tmp",F.explode("document")).select("tmp.*").show(3)

In [0]:
wordcloud = WordCloud(width = 800, height = 800, 
                background_color ='white', 
                stopwords = stopwords, 
                min_font_size = 10).generate(stringList) 
  
# plot the WordCloud image                        
plt.figure(figsize = (8, 8), facecolor = None) 
plt.imshow(wordcloud) 
plt.axis("off") 
plt.tight_layout(pad = 0) 
plt.show() 

In [0]:
dfProduct.describe()