In [1]:
# Display the SparkSession handle; `spark` is supplied by the notebook runtime (no cell defines it).
spark

In [2]:
# NOTE(review): no cell in this notebook imports rdflib — confirm it is needed, and pin a version if it is kept.
!pip install rdflib

In [3]:
%fs head 'FileStore/tables/ChatbotDataset.csv'

In [4]:
from pyspark.sql.types import StringType, StructField, StructType

# Directory containing the uploaded CSV file(s). Pointing the reader at a
# directory makes Spark read every file inside it.
inputPath = "/FileStore/tables/"

# The data format is known, so declare the schema up front: one nullable
# string column, "description". Spark then does not have to work it out from the data.
csvSchema = StructType([StructField("description", StringType(), True)])

# Static (batch) DataFrame over the CSV files in inputPath.
staticInputDF = (
  spark
    .read
    .schema(csvSchema)
    .csv(inputPath)
)

display(staticInputDF)

In [5]:
from pyspark.sql.functions import *      # NOTE(review): no name from this import is used in this cell; a star import from pyspark.sql.functions can shadow Python builtins (e.g. sum, max, abs, round) — consider `from pyspark.sql import functions as F`

# Static (batch) aggregation: number of rows for each distinct description.
staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.description)
           
    .count()
)
# Cache the counts, since they are queried again through the temp view below.
staticCountsDF.cache()

# Register the DataFrame as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")

In [6]:
%sql select description, sum(count) as total_count from static_counts group by description

In [7]:
from pyspark.sql.functions import *


# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(csvSchema)               # Set the schema of the CSV data (file-based streaming sources need an explicit schema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .csv(inputPath)
)

# Same aggregation as staticCountsDF (row count per description), but over the stream.
# Nothing executes until a streaming query is started with writeStream ... .start().
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.description 
      )
    .count()
)

# Is this DF actually a streaming DF? (isStreaming is True for DataFrames derived from readStream.)
streamingCountsDF.isStreaming

In [8]:
%fs ls FileStore/tables


In [9]:
# Load the chatbot CSV with the reader's default options: no header row, and
# every column read as a string.
chatbotData = sqlContext.read.csv("/FileStore/tables/ChatbotDataset.csv")
display(chatbotData)

In [10]:
# Re-read the dataset. This time the first line supplies the column names,
# and records Spark cannot parse are dropped. Keep a handle to the result in `thuss`.
thuss = []
a = spark.read.csv(
    "/FileStore/tables/ChatbotDataset.csv",
    header=True,
    mode="DROPMALFORMED",
)
thuss += [a]

type(a)

In [11]:
import csv

# Python's built-in open() works on the driver's local filesystem, where DBFS
# is mounted under /dbfs. A bare '/FileStore/...' path only resolves through
# Spark / %fs, not through open().
chatbot_csv_path = '/dbfs/FileStore/tables/ChatbotDataset.csv'

# Read all rows eagerly so the file is closed immediately. csv_f is a list of
# rows (each a list of strings) instead of a lazy reader over an open handle.
# newline='' is what the csv module expects for files it parses.
with open(chatbot_csv_path, newline='') as f:
    csv_f = list(csv.reader(f))

In [12]:
# Use only two shuffle partitions: the aggregation is tiny, so a small shuffle is enough.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming aggregation and write it to an in-memory table:
#   format("memory")       -> results kept in an in-memory table (intended for testing)
#   queryName("counts")    -> name of that table
#   outputMode("complete") -> the full set of counts is written on every trigger
count_writer = (
    streamingCountsDF.writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
)
query = count_writer.start()

In [13]:
!pip install nltk
!pip install --upgrade nltk
!pip install -U nltk[corenlp]

In [14]:
import datetime
import json
import os

import nltk
from nltk.stem.lancaster import LancasterStemmer

# A single Lancaster stemmer shared by the vocabulary, bag-of-words and
# classification cells.
stemmer = LancasterStemmer()

In [15]:
import nltk

# The notebook only needs NLTK data for word_tokenize, which uses the Punkt
# models. Recent NLTK releases load them from 'punkt_tab', older ones from
# 'punkt'. Fetching only these avoids downloading the whole NLTK data
# collection. It also runs in this notebook's own interpreter rather than
# whatever `python` a shell cell resolves to. By default, nltk.download returns
# False (instead of raising) for a package the installed NLTK's index does not know.
for nltk_package in ("punkt", "punkt_tab"):
    nltk.download(nltk_package, quiet=True)

In [16]:
import sys

# Rule-based sentiment labels. A sentence is "Positive" when it contains more
# words from POSITIVE_WORDS than from NEGATIVE_WORDS, otherwise "Negative".
# Ties, including sentences with neither kind of word, fall to "Negative".
# Tokens come from str.split(), so matching is case- and punctuation-sensitive
# ("Sorry," does not match "sorry").
POSITIVE_WORDS = {'right', 'better', 'awol', 'like'}
NEGATIVE_WORDS = {'sorry', 'worry'}

ssplitword = 0          # positive-keyword count for the current sentence
ssplitNegativeWord = 0  # negative-keyword count for the current sentence
training_data = []      # list of {"class": ..., "sentence": ...} dicts
b = []                  # every non-null cell value that was labelled

chatBotDataset = sqlContext.read.format("csv").load("/FileStore/tables/ChatbotDataset.csv")
for row in chatBotDataset.rdd.collect():
    for r in row:
        # Empty CSV fields are read as None and cannot be split.
        if r is None:
            continue
        b.append(r)
        # Counters restart for every sentence, so each label depends only on
        # that sentence's own words.
        ssplitword = 0
        ssplitNegativeWord = 0
        for w in r.split():
            if w in POSITIVE_WORDS:
                ssplitword += 1
            elif w in NEGATIVE_WORDS:
                ssplitNegativeWord += 1
        label = "Positive" if ssplitword > ssplitNegativeWord else "Negative"
        training_data.append({"class": label, "sentence": r})
print("%s sentences in training data" % len(training_data))
print(training_data)

In [17]:
words = []
classes = []
documents = []
ignore_words = ['?']
# loop through each sentence in our training data
for pattern in training_data:
    # tokenize each word in the sentence
    w = nltk.word_tokenize(pattern['sentence'])
    # add to our words list
    words.extend(w)
    # add to documents in our corpus: (raw tokens, class label)
    documents.append((w, pattern['class']))
    # add to our classes list
    if pattern['class'] not in classes:
        classes.append(pattern['class'])

# Stem and lower-case each word, then remove duplicates. Sorting gives the
# vocabulary a deterministic order, and with it the bag-of-words column order
# (and likewise the class/output order). The order of list(set(...)) over str
# can differ between Python processes because str hashing is randomized. That
# would make weights saved in one session misaligned with the vocabulary
# rebuilt in another.
words = sorted(set(stemmer.stem(w.lower()) for w in words if w not in ignore_words))

# remove duplicates (and fix the order, as above)
classes = sorted(set(classes))

print (len(documents), "documents")
print (len(classes), "classes", classes)
print (len(words), "unique stemmed words", words)

In [18]:
# Encode every document as a pair of vectors:
#   training[k] -> 0/1 bag-of-words vector over `words`
#   output[k]   -> one-hot vector over `classes`
training = []
output = []
# template for the one-hot class vector: one slot per class
output_empty = [0] * len(classes)

for doc in documents:
    token_list, doc_class = doc
    # stemmed, lower-cased tokens of this document
    pattern_words = [stemmer.stem(token.lower()) for token in token_list]
    # 1 where the vocabulary word occurs in the document, else 0
    training.append([int(vocab_word in pattern_words) for vocab_word in words])
    # copy the template and flag this document's class
    output_row = output_empty[:]
    output_row[classes.index(doc_class)] = 1
    output.append(output_row)

print ("# words", len(words))
print ("# classes", len(classes))

In [19]:
# Inspect one encoded sample: its stemmed tokens, bag-of-words vector and one-hot label.
i = 0
w = documents[i][0]
stemmed_sample = [stemmer.stem(token.lower()) for token in w]
for shown in (stemmed_sample, training[i], output[i]):
    print (shown)

In [20]:
import numpy as np
import time

# compute sigmoid nonlinearity
def sigmoid(x):
    """Logistic sigmoid 1 / (1 + e^-x); applied element-wise when ``x`` is a NumPy array."""
    return 1 / (1 + np.exp(-x))

# convert output of sigmoid function to its derivative
def sigmoid_output_to_derivative(output):
    """Sigmoid derivative expressed through the sigmoid's output.

    For s = sigmoid(x), ds/dx = s * (1 - s). ``output`` is s (a scalar or array).
    """
    one_minus = 1 - output
    return output * one_minus

def clean_up_sentence(sentence):
    """Tokenize ``sentence`` with NLTK and return the lower-cased stem of each token, in order.

    Uses the notebook-level ``stemmer``.
    """
    return [stemmer.stem(token.lower()) for token in nltk.word_tokenize(sentence)]

# return bag of words array: 0 or 1 for each word in the bag that exists in the sentence
def bow(sentence, words, show_details=False):
    """Encode ``sentence`` as a 0/1 NumPy vector over the vocabulary ``words``.

    Position i is 1 when ``words[i]`` equals one of the sentence's stemmed,
    lower-cased tokens (from clean_up_sentence). When ``show_details`` is true,
    each match is printed.
    """
    stemmed_tokens = clean_up_sentence(sentence)
    vector = [0] * len(words)
    for token in stemmed_tokens:
        matches = (idx for idx, vocab_word in enumerate(words) if vocab_word == token)
        for idx in matches:
            vector[idx] = 1
            if show_details:
                print ("found in bag: %s" % words[idx])
    return np.array(vector)

def think(sentence, show_details=False):
    """Forward pass of the two-layer network for one sentence.

    Encodes the lower-cased sentence with bow() over the global ``words``.
    Multiplies it by the global ``synapse_0`` then ``synapse_1``, with a sigmoid
    after each layer, and returns the output-layer activations.
    """
    bag = bow(sentence.lower(), words, show_details)
    if show_details:
        print ("sentence:", sentence, "\n bow:", bag)
    hidden = sigmoid(np.dot(bag, synapse_0))
    return sigmoid(np.dot(hidden, synapse_1))

In [21]:
# ANN and Gradient Descent code from https://iamtrask.github.io/2015/07/27/python-network-part2/
def train(X, y, hidden_neurons=10, alpha=1, epochs=50000, dropout=False, dropout_percent=0.5,
          synapse_file="synapses.json"):
    """Train a one-hidden-layer sigmoid network by full-batch gradient descent and save its weights.

    Args:
        X: 2-D array-like of inputs, one row per document (bag-of-words vectors).
        y: 2-D array-like of targets, one row per document (one-hot class vectors).
        hidden_neurons: width of the hidden layer.
        alpha: learning rate that scales every weight update.
        epochs: number of iterations to run (j = 0 .. epochs inclusive).
        dropout: if True, randomly zero hidden activations on each iteration.
        dropout_percent: fraction of hidden units dropped when ``dropout`` is True.
        synapse_file: path of the JSON file the trained weights are written to.

    Training stops early at a 10,000-iteration checkpoint if the mean absolute
    error did not decrease since the previous checkpoint.

    Side effects: seeds NumPy's global RNG, prints progress, and writes
    ``synapse_file`` with keys 'synapse0', 'synapse1', 'datetime', 'words' and
    'classes'. The last two come from the notebook globals ``words`` and ``classes``.
    """
    import datetime
    import json

    # Backpropagation below relies on .T/.dot, so work on arrays even if lists are passed.
    X = np.asarray(X)
    y = np.asarray(y)

    print ("Training with %s neurons, alpha:%s, dropout:%s %s" % (hidden_neurons, str(alpha), dropout, dropout_percent if dropout else '') )
    print ("Input matrix: %sx%s    Output matrix: %sx%s" % (len(X), len(X[0]), len(y), len(y[0])) )
    np.random.seed(1)

    last_mean_error = 1
    # randomly initialize our weights with mean 0
    synapse_0 = 2 * np.random.random((len(X[0]), hidden_neurons)) - 1
    synapse_1 = 2 * np.random.random((hidden_neurons, len(y[0]))) - 1

    for j in range(epochs + 1):

        # Feed forward through layers 0, 1, and 2
        layer_0 = X
        layer_1 = sigmoid(np.dot(layer_0, synapse_0))

        if dropout:
            # Inverted dropout: zero each hidden unit with probability dropout_percent
            # and rescale the survivors by 1 / (1 - dropout_percent).
            layer_1 *= np.random.binomial([np.ones((len(X), hidden_neurons))], 1 - dropout_percent)[0] * (1.0 / (1 - dropout_percent))

        layer_2 = sigmoid(np.dot(layer_1, synapse_1))

        # how much did we miss the target value?
        layer_2_error = y - layer_2

        if (j % 10000) == 0 and j > 5000:
            # Stop once the error at this 10k checkpoint is not lower than at the previous one.
            mean_error = np.mean(np.abs(layer_2_error))
            if mean_error < last_mean_error:
                print ("delta after " + str(j) + " iterations:" + str(mean_error))
                last_mean_error = mean_error
            else:
                print ("break:", mean_error, ">", last_mean_error)
                break

        # in what direction is the target value?
        # were we really sure? if so, don't change too much.
        layer_2_delta = layer_2_error * sigmoid_output_to_derivative(layer_2)

        # how much did each l1 value contribute to the l2 error (according to the weights)?
        layer_1_error = layer_2_delta.dot(synapse_1.T)

        # in what direction is the target l1?
        # were we really sure? if so, don't change too much.
        layer_1_delta = layer_1_error * sigmoid_output_to_derivative(layer_1)

        # Gradient step: move both weight matrices toward lower error.
        synapse_1 += alpha * layer_1.T.dot(layer_2_delta)
        synapse_0 += alpha * layer_0.T.dot(layer_1_delta)

    # Persist the weights together with the vocabulary/classes that define their layout.
    synapse = {
        'synapse0': synapse_0.tolist(),
        'synapse1': synapse_1.tolist(),
        'datetime': datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),
        'words': words,
        'classes': classes,
    }
    with open(synapse_file, 'w') as outfile:
        json.dump(synapse, outfile, indent=4, sort_keys=True)
    print ("saved synapses to:", synapse_file)

In [22]:
X = np.array(training)
y = np.array(output)

# perf_counter is the monotonic, high-resolution clock intended for measuring
# durations; time.time() follows the wall clock and can jump if it is adjusted.
start_time = time.perf_counter()

# NOTE: dropout_percent has no effect here because dropout=False.
train(X, y, hidden_neurons=20, alpha=0.1, epochs=100000, dropout=False, dropout_percent=0.2)

elapsed_time = time.perf_counter() - start_time
print ("processing time:", elapsed_time, "seconds")

In [23]:
# Minimum output activation for a class to be reported by classify().
ERROR_THRESHOLD = 0.2

# Load previously saved weight matrices ('synapse0' and 'synapse1') from JSON.
synapse_file = 'synapses.json'
with open(synapse_file) as data_file:
    synapse = json.load(data_file)
synapse_0 = np.asarray(synapse['synapse0'])
synapse_1 = np.asarray(synapse['synapse1'])

def classify(sentence, show_details=False):
    """Score ``sentence`` with think() and report the classes above ERROR_THRESHOLD.

    Prints the sentence and its classification. Returns a list of
    [class_name, score] pairs, highest score first.
    """
    scores = think(sentence, show_details)
    hits = sorted(
        ([idx, score] for idx, score in enumerate(scores) if score > ERROR_THRESHOLD),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return_results = [[classes[idx], score] for idx, score in hits]
    print ("%s \n classification: %s" % (sentence, return_results))
    return return_results

# Score a few single-word probes; classify() prints each result itself.
for probe_word in ("like", "sorry", "better", "good"):
    classify(probe_word)
print ()
# Repeat one probe with per-word matching details printed.
classify("right", show_details=True)