In [1]:
# File location and type
file_location = "/FileStore/tables/Try_data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "False"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2
Product,Consumer complaint narrative,Clean Text
Debt collection,transworld systems inc.,
is trying to collect a debt that is not mine,"not owed and is inaccurate.""",transworld system inc. trying collect debt mine owed inaccurate
"Credit reporting, credit repair services, or other personal consumer reports","I would like to request the suppression of the following items from my credit report, which are the result of my falling victim to identity theft. This information does not relate to [ transactions that I have made/accounts that I have opened ], as the attached supporting documentation can attest. As such, it should be blocked from appearing on my credit report pursuant to section 605B of the Fair Credit Reporting Act.",would like request suppression following item credit report result falling victim identity theft information relate transaction made/accounts opened attached supporting documentation attest blocked appearing credit report pursuant section 605b fair credit reporting act
Debt collection,"Over the past 2 weeks, I have been receiving excessive amounts of telephone calls from the company listed in this complaint. The calls occur between XXXX XXXX and XXXX XXXX to my cell and at my job. The company does not have the right to harass me at work and I want this to stop. It is extremely distracting to be told 5 times a day that I have a call from this collection agency while at work.",past 2 week receiving excessive amount telephone call company listed complaint call occur unknown unknown cell job company right harass work want stop extremely distracting told 5 time day call collection agency work
"Credit reporting, credit repair services, or other personal consumer reports",someone used my personal information to get medical treatment that i did not authorize.i have filed a report i have tried to dispute these 2 accounts and nothing gets resolved.,someone used personal information get medical treatment authorize.i filed report tried dispute 2 account nothing get resolved
"Money transfer, virtual currency, or money service","I was sold access to an event digitally, of which I have all the screenshots to detail the transactions, transferred the money and was provided with only a fake of a ticket. I have reported this to paypal and it was for the amount of {$21.00} including a {$1.00} fee from paypal.",
This occured on XX/XX/2019,"by paypal user who gave two accounts : 1 ) XXXX 2 ) XXXX XXXX""",sold access event digitally screenshots detail transaction transferred money provided fake ticket reported paypal amount 21.00 including 1.00 fee paypal occured 2019 paypal user gave two account 1 unknown 2 unknown
Debt collection,"While checking my credit report I noticed three collections by a company called ARS that i was unfamiliar with. I disputed these collections with XXXX, and XXXX and they both replied that they contacted the creditor and the creditor verified the debt so I asked for proof which both bureaus replied that they are not required to prove anything. I then mailed a certified letter to ARS requesting proof of the debts n the form of an original aggrement, or a proof of a right to the debt, or even so much as the process as to how the bill was calculated, to which I was simply replied a letter for each collection claim that listed my name an account number and an amount with no other information to verify the debts after I sent a clear notice to provide me evidence. Afterwards I recontacted both XXXX, and XXXX, to redispute on the premise that it is not my debt if evidence can not be drawn up, I feel as if I am being personally victimized by ARS on my credit report for debts that are not owed to them or any party for that matter, and I feel discouraged that the credit bureaus who control many aspects of my personal finances are so negligent about my information.",checking credit report noticed three collection company called ar unfamiliar disputed collection unknown unknown replied contacted creditor creditor verified debt asked proof bureau replied required prove anything mailed certified letter ar requesting proof debt n form original aggrement proof right debt even much process bill calculated simply replied letter collection claim listed name account number amount information verify debt sent clear notice provide evidence afterwards recontacted unknown unknown redispute premise debt evidence drawn feel personally victimized ar credit report debt owed party matter feel discouraged credit bureau control many aspect personal finance negligent information
Credit card or prepaid card,On XX/XX/2018 I made a {$87.00} purchase with a Suntrust rewards credit card that was eligible for an activated Suntrust deal for 20 % cash back on a purchase from XXXX XXXX.,


In [2]:
df.describe().show()

In [3]:
from pyspark.sql.functions import col, lower, regexp_replace, split
import pyspark.sql.functions as f

def clean_text(c):
  #print(c)
  c = lower(c)
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
  c = regexp_replace(c, "xxxx", "")
  
  c = split(c, "\\s+")
  #print(c)
  #c = [a for a in c if len(a)>2]
  #c = ' '.join(c)
  return c

clean_text_df = df.select(clean_text(col("_c1")).alias("text"))

clean_text_df.printSchema()
clean_data = clean_text_df.where(col("text").isNotNull())
clean_data.show(10)

In [4]:
clean_data = clean_data.withColumn("text", f.expr("filter(text, x -> not(length(x) < 3))"))

In [5]:
clean_data.show(10)

In [6]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display default list
stopwords = stopwords+['xxxx','']
#stopwords

In [7]:
remover = StopWordsRemover(inputCol="text", outputCol="filtered")
words_df = remover.transform(clean_data)

In [8]:
t = []
for i in range(5):
  t.append(' '.join(words_df.select('filtered').collect()[i][0]))

In [9]:
from pyspark.sql.types import StringType, IntegerType
s = spark.createDataFrame(t, StringType())

In [10]:
from pyspark.sql.functions import split, explode

shakeWordsSplitDF = (s
                    .select(split(s.value, '\s+').alias('split')))
shakeWordsSingleDF = (shakeWordsSplitDF
                    .select(explode(shakeWordsSplitDF.split).alias('word')))
shakeWordsDF = shakeWordsSingleDF
shakeWordsDF.show()
shakeWordsDFCount = shakeWordsDF.count()
print (shakeWordsDFCount)

In [11]:
def wordCount(wordListDF):
    return (wordListDF.groupBy('word').count())

In [12]:
from pyspark.sql.functions import desc

WordsAndCountsDF = wordCount(shakeWordsDF)
topWordsAndCountsDF = WordsAndCountsDF.orderBy("count", ascending=0)

topWordsAndCountsDF.show()

In [13]:
clean_words = words_df.select('filtered').rdd.map(lambda row : row[0]).collect()

In [14]:
from pyspark.ml.feature import NGram

ngram = NGram(inputCol="text", outputCol="ngrams")
ngramDataFrame = ngram.transform(clean_data)
ngramDataFrame.show()

In [15]:
from pyspark.sql.functions import explode

bigram_count = ngramDataFrame.select(explode("ngrams").alias("ngrams")).groupBy("ngrams").count()

bigram_count.show()

In [16]:
#tokens = [['', 'not', 'owed', 'and', 'is', 'inaccurate','hello','yes','no'],['hello','yes','no','take']]

In [17]:
#tokens = clean_words

In [18]:
new_trigrams = []
#c = 0
for token in clean_words:
  c = 0
  while c < len(token) - 2:
    new_trigrams.append((token[c] + ' '+ token[c+1]+' '+ token[c+2]))
    c += 1

In [19]:
from pyspark.sql.types import ArrayType,StringType
tri_grams = spark.createDataFrame(new_trigrams, StringType())

tri_grams.show()

In [20]:
tri_grams_count = tri_grams.groupBy("value").count()

tri_grams_count.show()

In [21]:
#data = [['', 'not', 'owed','yes', 'and', 'is', 'hello','inaccurate'],['always','in','the'],['hello','yes','no','take']]

In [22]:
#data = 1

In [23]:
def computeReviewTFDict(review):
    #Counts the number of times the word appears in review
    reviewTFDict = {}
    for word in review:
        if word in reviewTFDict:
            reviewTFDict[word] += 1
        else:
            reviewTFDict[word] = 1
    #Computes tf for each word           
    for word in reviewTFDict:
        reviewTFDict[word] = reviewTFDict[word] / len(review)
    return reviewTFDict

In [24]:
tfDict = []
for i in range(len(clean_words)):
    tfDict.append(computeReviewTFDict(clean_words[i]))

In [25]:
def computeCountDict():
    
    countDict = {}
    # Run through each review's tf dictionary and increment countDict's (word, doc) pair
    for review in tfDict:
        for word in review:
            if word in countDict:
                countDict[word] += 1
            else:
                countDict[word] = 1
    return countDict

  #Stores the review count dictionary
countDict = computeCountDict()

In [26]:
import math

def computeIDFDict():
    
    idfDict = {}
    for word in countDict:
        idfDict[word] = math.log(len(clean_words) / countDict[word])
    return idfDict
  
  #Stores the idf dictionary
idfDict = computeIDFDict()

In [27]:
def computeReviewTFIDFDict(reviewTFDict):
    reviewTFIDFDict = {}
    #For each word in the review, we multiply its tf and its idf.
    for word in reviewTFDict:
        reviewTFIDFDict[word] = reviewTFDict[word] * idfDict[word]
    return reviewTFIDFDict

  #Stores the TF-IDF dictionaries
tfidfDict = [computeReviewTFIDFDict(review) for review in tfDict]

In [28]:
final_tfidf = {}


for i in range(len(tfidfDict)):
    keys = list(tfidfDict[i].keys())
    values = list(tfidfDict[i].values())
    for k in keys:
        if k not in final_tfidf:
            final_tfidf[k] = tfidfDict[i].get(k)
        elif k in final_tfidf:
            final_tfidf[k] = final_tfidf[k]+tfidfDict[i].get(k)

In [29]:
 # Create a list of unique words
wordDict = sorted(countDict.keys())

def computeTFIDFVector(review):
  tfidfVector = [0.0] * len(wordDict)
     
      # For each unique word, if it is in the review, store its TF-IDF value.
  for i, word in enumerate(wordDict):
    if word in review:
      tfidfVector[i] = review[word]
  return tfidfVector

tfidfVector = [computeTFIDFVector(review) for review in tfidfDict]

In [30]:
from pyspark.ml.linalg import Vectors,DenseVector

y = []
for i in range(len(tfidfVector)):
  y.append(DenseVector(ar =tfidfVector[i]))

In [31]:
y = y[:100]

In [32]:
rdd = spark.sparkContext.parallelize(y)

In [33]:
new_data = rdd.map(lambda x: (x, )).toDF(["features"])

In [34]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_data.select('features'))

In [35]:
transformed = model.transform(new_data)
transformed.show()