## Quora NLP DeepLearning Project 
1. Prepare the train and test data for the network in the local environment.
2. Implement the network above and tune it in the local environment on part of the train data. The parameters to tune are: the number of neurons in each layer, the learning and dropout rates (including *recurrent_dropout* of the LSTM layer), the standard deviation of the *GaussianNoise* layer, and *batch_size*.
3. Run the model on the cluster with the complete data and generate the submission file as follows:

*submission = pd.DataFrame({"test_id": test_id, "is_duplicate": prediction_prob})*  
*submission.to_csv("submission1.csv", index=False)*,

where *prediction_prob* is a 1D array of predicted probabilities and *test_id* is the *test_id* column of the *test.csv* file.

For the first part of this project I will create the following set of features using PySpark:

1. `lWCount1` - word count in the lemma of question 1
2. `lWCount2` - word count in the lemma of question 2
3. `qWCount1` - word count in question 1
4. `qWCount2` - word count in question 2
5. `lLen1` - length of lemma 1
6. `lLen2` - length of lemma 2
7. `qLen1` - length of question 1
8. `qLen2` - length of question 2
9. `lWCount_ratio` - ratio of the lemma word counts
10. `qWCount_ratio` - ratio of the question word counts
11. `lLen_ratio` - ratio of the lemma lengths
12. `qLen_ratio` - ratio of the question lengths
13. `lNgrams_1` - common unigrams of the lemmas
14. `lNgrams_2` - common bigrams of the lemmas
15. `lNgrams_3` - common trigrams of the lemmas
16. `qNgrams_1` - common unigrams of the questions
17. `qNgrams_2` - common bigrams of the questions
18. `qNgrams_3` - common trigrams of the questions
19. `qUnigram_ratio` - question unigram ratio
20. `lUnigram_ratio` - lemma unigram ratio
21. `tfidfDistance` - squared Euclidean distance between the TF-IDF vectors of the two lemmas
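
To make the count, length, and ratio features concrete, here is a minimal plain-Python sketch for a single question pair. It is only an illustration: the helper name `pair_features` and the example questions are hypothetical, and it tokenizes with `str.split()` instead of the NLTK tokenizer used in the notebook, so word counts can differ slightly on punctuation.

```python
def pair_features(q1, q2, eps=1e-15):
    """Count, length, and ratio features for one question pair.

    Assumption: whitespace tokenization stands in for nltk.word_tokenize.
    """
    def ratio(x, y):
        # Relative difference; eps keeps the denominator non-zero
        return abs(x - y) / (x + y + eps)

    w1, w2 = len(q1.split()), len(q2.split())
    return {
        "qWCount1": w1,
        "qWCount2": w2,
        "qLen1": len(q1),
        "qLen2": len(q2),
        "qWCount_ratio": ratio(w1, w2),
        "qLen_ratio": ratio(len(q1), len(q2)),
    }


features = pair_features("How can I learn Python quickly?", "How do I learn Python?")
empty = pair_features("", "")
```

The same function applied to the lemmatized strings would give the `l*` counterparts of these features.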

In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import isnan, when, count, length, lit, udf, col, struct
from pyspark.ml.feature import IDF, Tokenizer, CountVectorizer
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer
import numpy as np
import nltk
import re

In [2]:
nltk.download("popular")
nltk.download("tagsets")


True

In [5]:
spark = SparkSession.builder.getOrCreate()

In [3]:
trainFileName = "/user/wendygao16/fs_Quora/train.csv"
testFileName = "/user/wendygao16/fs_Quora/test.csv"
# Output paths used when saving the features at the end of the notebook
outTrainFileName = "./wendygao16/notebooks/Quora_NLP/train_features.csv"
outTestFileName = "./wendygao16/notebooks/Quora_NLP/test_features.csv"
#fitting_outPath = "./wendygao16/notebooks/Quora_NLP/predictions.csv"

In [4]:
sch = StructType([StructField('id',IntegerType()), \
                  StructField('qid1',IntegerType()),\
                  StructField('qid2',IntegerType()), \
                  StructField('question1',StringType()),\
                  StructField('question2',StringType()), \
                  StructField('is_duplicate',IntegerType())])

train = spark.read.csv(trainFileName, header=True, escape='"', 
                       quote='"',schema=sch, multiLine = True)
train = train.dropna()
train.cache()

test = spark.read.csv(testFileName, header=True, escape='"', \
                           encoding='utf8', multiLine = True)
test = test.dropna()
test.cache()

DataFrame[test_id: string, question1: string, question2: string]

In [5]:
train.show(10)

+---+----+----+--------------------+--------------------+------------+
| id|qid1|qid2|           question1|           question2|is_duplicate|
+---+----+----+--------------------+--------------------+------------+
|  0|   1|   2|What is the step ...|What is the step ...|           0|
|  1|   3|   4|What is the story...|What would happen...|           0|
|  2|   5|   6|How can I increas...|How can Internet ...|           0|
|  3|   7|   8|Why am I mentally...|Find the remainde...|           0|
|  4|   9|  10|Which one dissolv...|Which fish would ...|           0|
|  5|  11|  12|Astrology: I am a...|I'm a triple Capr...|           1|
|  6|  13|  14| Should I buy tiago?|What keeps childe...|           0|
|  7|  15|  16|How can I be a go...|What should I do ...|           1|
|  8|  17|  18|When do you use シ...|When do you use "...|           0|
|  9|  19|  20|Motorola (company...|How do I hack Mot...|           0|
+---+----+----+--------------------+--------------------+------------+
only showing top 10 rows

In [7]:
test.show(10)

+-------+--------------------+--------------------+
|test_id|           question1|           question2|
+-------+--------------------+--------------------+
|      0|How does the Surf...|Why did Microsoft...|
|      1|Should I have a h...|How much cost doe...|
|      2|What but is the b...|What you send mon...|
|      3|Which food not em...|   What foods fibre?|
|      4|How "aberystwyth"...|How their can I s...|
|      5|How are the two w...|I admire I am con...|
|      6|How can I reduce ...|How can I reduce ...|
|      7|By scrapping the ...|How will the rece...|
|      8|What are the how ...|What are some of ...|
|      9|After 12th years ...|Can a 14 old guy ...|
+-------+--------------------+--------------------+
only showing top 10 rows



#### Data cleaning

In [None]:
train = train.drop('qid1', 'qid2')
maxTrainID = train.groupBy().max('id').collect()[0][0]
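# Shift the test ids past the largest train id so both sets can share one id column
test = test.withColumn("id", (test.test_id + maxTrainID + 1).cast("integer")).drop('test_id')
# Test rows have no label, so mark them with the placeholder -1
test = test.withColumn('is_duplicate', lit(-1))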

In [None]:
data = train.union(test.select(train.columns))
print('Number of rows = %s' % data.count())
data.filter(data.id > -1).show(6)
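
The id handling in this section can be sketched in plain Python: test ids are shifted past the largest train id before the union, and the shift is undone when the test features are written out. The helper names and the value of `max_train_id` below are hypothetical and only illustrate the arithmetic.

```python
def to_combined_id(test_id, max_train_id):
    # test_id is read from the CSV as a string, hence the int() conversion
    return int(test_id) + max_train_id + 1


def to_test_id(combined_id, max_train_id):
    # Inverse shift, applied when the test features are saved
    return combined_id - max_train_id - 1


max_train_id = 9  # hypothetical: train ids run from 0 to 9
combined = [to_combined_id(t, max_train_id) for t in ["0", "1", "2"]]
restored = [to_test_id(c, max_train_id) for c in combined]
```

Because every combined test id is strictly greater than `max_train_id`, a single comparison against it is enough to split the combined frame back into train and test rows.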

#### Define UDFs

In [None]:
# Function lemmas_nltk(s)
wordnet_lemmatizer = WordNetLemmatizer()

# A set makes the stop-word membership test fast
stop_words = set(nltk.corpus.stopwords.words('english'))
def lemmas_nltk(s):
    # Remove stop words
    words = [w for w in s.lower().split() if w not in stop_words]

    # Keep alphabetic tokens only: w.isalpha() is True when w is non-empty and all its characters are alphabetic
    words = [w for w in words if w.isalpha()]

    # Lemmatize each word as an adjective, then as a verb, then as a noun
    words = [wordnet_lemmatizer.lemmatize(wordnet_lemmatizer.lemmatize(wordnet_lemmatizer.lemmatize(word, 'a'), 'v'), 'n') for word in words]
    return " ".join(words)

# Create lemmas_nltk_udf
lemmas_nltk_udf = pyspark.sql.functions.udf(f=lemmas_nltk, returnType=StringType())

In [None]:
#Test
ss=train.select('question2').take(2)[1][0] 
print(ss, lemmas_nltk(ss))

In [None]:
# wordsCount_udf
def wordsCount(s):
    return len(nltk.word_tokenize(s))
    #return len(s.split())
wordsCount_udf = pyspark.sql.functions.udf(f=wordsCount, returnType=IntegerType())

In [None]:
# ratio_udf
# The small constant in the denominator avoids division by zero when both values are 0
def ratio(x,y): return abs(x-y)/(x+y+1e-15)
ratio_udf = udf(ratio, DoubleType())

In [None]:
# commonNgrams(s1, s2, n), commonNgrams_udf
def commonNgrams(s1, s2, n):
    # Matches runs of characters that are not whitespace, word characters, or '|'
    regex = re.compile(r'([^\s\w|_])+', flags=0)

    n = int(n)  # n is passed in as a string literal column
    ss1 = s1.lower().split()
    ss2 = s2.lower().split()
    ss1 = [regex.sub('', w) for w in ss1]
    ss2 = [regex.sub('', w) for w in ss2]

    # Number of distinct n-grams that occur in both strings
    common = set(nltk.ngrams(ss1, n)) & set(nltk.ngrams(ss2, n))
    return len(common)

commonNgrams_udf = udf(commonNgrams, IntegerType())
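
For intuition, the common n-gram count can be reproduced without Spark or NLTK. The sketch below is a plain-Python stand-in: the names `ngram_set` and `common_ngrams` and the two example sentences are hypothetical, and `zip` over shifted token lists replaces `nltk.ngrams`.

```python
import re

# Same pattern as in the UDF: drop anything that is not whitespace, a word character, or '|'
PUNCT = re.compile(r"[^\s\w|]+")


def ngram_set(tokens, n):
    # zip over n shifted copies of the token list yields consecutive n-tuples
    return set(zip(*(tokens[i:] for i in range(n))))


def common_ngrams(s1, s2, n):
    t1 = [PUNCT.sub("", w) for w in s1.lower().split()]
    t2 = [PUNCT.sub("", w) for w in s2.lower().split()]
    return len(ngram_set(t1, n) & ngram_set(t2, n))


q1 = "How can I learn Python quickly?"
q2 = "How can I learn Java quickly?"
counts = [common_ngrams(q1, q2, n) for n in (1, 2, 3)]
print(counts)  # [5, 3, 2]
```

A single differing word removes one shared unigram but breaks every bigram and trigram that spans it, which is why the higher-order counts fall faster.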

In [None]:
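# unigram_ratio(): common n-gram count scaled by the larger of n1 and n2
def unigram_ratio(ngrams, n1, n2):
    # float() keeps the result compatible with DoubleType
    return float(ngrams) / (1 + max(n1, n2))

unigram_ratio_udf = udf(unigram_ratio, DoubleType())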

### Create Features

In [None]:
featureNames = ['lWCount1', 'lWCount2',
                'qWCount1', 'qWCount2',
                'lLen1', 'lLen2',
                'qLen1', 'qLen2',
                'lWCount_ratio', 'qWCount_ratio',
                'lLen_ratio', 'qLen_ratio',
                'qNgrams_1', 'qNgrams_2', 'qNgrams_3', 
                'lNgrams_1', 'lNgrams_2', 'lNgrams_3', 
                'qUnigram_ratio', 'lUnigram_ratio', #'qDouegram_ratio','lDouegram_ratio']
                'tfidfDistance']

#### Question and lemma lengths

In [None]:
#Lemmas
data = data.withColumn('lemma1', lemmas_nltk_udf("question1"))
data = data.withColumn('lemma2', lemmas_nltk_udf("question2"))

#data.select('id','lemma1','lemma2').show(6)

In [None]:
# Features: lWCount, qWCount, lLen, qLen
for i in ["1","2"]:
    data = data.withColumn('lWCount'+i, wordsCount_udf(data['lemma'+i]))
    data = data.withColumn('qWCount'+i, wordsCount_udf(data['question'+i]))
    data = data.withColumn('lLen'+i, length(data['lemma'+i]))
    data = data.withColumn('qLen'+i, length(data['question'+i]))

#data.select('lWCount1','lWCount2','qWCount1','qWCount2','lLen1','lLen2','qLen1','qLen2').show(6)

#### Length ratios

In [None]:
#Lengths ratios
data = data.withColumn('lWCount_ratio', ratio_udf("lWCount1", "lWCount2"))
data = data.withColumn('qWCount_ratio', ratio_udf("qWCount1","qWCount2"))
data = data.withColumn('lLen_ratio', ratio_udf("lLen1", "lLen2"))
data = data.withColumn('qLen_ratio', ratio_udf("qLen1", "qLen2"))

#data.select('lWCount_ratio','qWCount_ratio','lLen_ratio','qLen_ratio').show(6)

#### N-gram

In [None]:
#N-grams and n-gram ratios
#commonNgrams_udf
for i in ["1","2","3"]:    
    data = data.withColumn('lNgrams_'+i, commonNgrams_udf("lemma1","lemma2", lit(i)))
    data = data.withColumn('qNgrams_'+i, commonNgrams_udf("question1","question2", lit(i)))
    print("lNgrams_qNgrams_ done")

data = data.withColumn('lUnigram_ratio', unigram_ratio_udf( "lNgrams_1","lLen1", "lLen2" ))
print("lUnigram_ratio done")

data = data.withColumn('qUnigram_ratio', unigram_ratio_udf( "qNgrams_1","qLen1", "qLen2" ))
print("qUnigram_ratio done")

# Bigram ratios are currently disabled, together with their progress messages
#data = data.withColumn('lDouegram_ratio', unigram_ratio_udf( "lNgrams_2","lLen1", "lLen2" ))
#print("lDouegram_ratio done")

#data = data.withColumn('qDouegram_ratio', unigram_ratio_udf( "qNgrams_2","qLen1", "qLen2" ))
#print("qDouegram_ratio done")

#data.select('lNgrams_1','lNgrams_2','lNgrams_3','qNgrams_1','qNgrams_2','qNgrams_3','qUnigram_ratio','lUnigram_ratio').show(6)       

#### TF-IDF

In [None]:
#Tokenization of lemmas
tokenizer = Tokenizer(inputCol="lemma1", outputCol="words1") 
data = tokenizer.transform(data) 
tokenizer.setParams(inputCol="lemma2", outputCol="words2") 
data = tokenizer.transform(data) 
#data.select('id','lemma1','words1','lemma2','words2').show(5) 

In [None]:
# The corpus is every token list from both questions, stacked into one 'words' column
corpus = data.selectExpr('words1 as words').union(data.selectExpr('words2 as words'))
cv = CountVectorizer(inputCol="words", outputCol="tf", minDF=2.0)
cvModel = cv.fit(corpus)
corpus = cvModel.transform(corpus)
print('CountVectorizerModel has a vocabulary of length ',len(cvModel.vocabulary))

In [None]:
res1 = cvModel.transform(data.selectExpr('id', 'words1 as words'))
res2 = cvModel.transform(data.selectExpr('id', 'words2 as words'))

In [8]:
print("TF Done")

TF Done


In [None]:
# Create an IDF object, fit it to the whole corpus, and compute the
# "idf" column (the TF-IDF vector) for both questions, adding it to "res1" and "res2".
idf = IDF(inputCol="tf", outputCol="idf")
idfModel = idf.fit(corpus)
res1 = idfModel.transform(res1)
res2 = idfModel.transform(res2)
res = res1.selectExpr('id','idf as idf1').join(res2.selectExpr('id','idf as idf2'), on='id', how='inner')

In [None]:
def tfidfDist(a,b): 
    return float(a.squared_distance(b))

dist_udf = udf(tfidfDist, DoubleType())
res = res.withColumn('dist', dist_udf(res['idf1'], res['idf2']))
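
As a rough illustration of what this distance measures, the sketch below computes TF-IDF vectors and their squared Euclidean distance in plain Python on a hypothetical three-document corpus. It assumes the smoothed formula from the Spark MLlib documentation, idf = log((m + 1) / (df + 1)) with m documents, and it skips the `minDF=2.0` vocabulary filter applied by the `CountVectorizer` above. All helper names are hypothetical.

```python
import math
from collections import Counter


def idf_weights(docs):
    # Document frequency counts each term once per document
    m = len(docs)
    df = Counter(term for doc in docs for term in set(doc))
    return {t: math.log((m + 1) / (d + 1)) for t, d in df.items()}


def tfidf(doc, idf):
    # Sparse vector as a dict: raw term count times IDF weight
    tf = Counter(doc)
    return {t: c * idf[t] for t, c in tf.items() if t in idf}


def squared_distance(a, b):
    # Terms missing from a vector contribute a zero coordinate
    return sum((a.get(t, 0.0) - b.get(t, 0.0)) ** 2 for t in set(a) | set(b))


docs = [["learn", "python"], ["learn", "java"], ["buy", "car"]]
idf = idf_weights(docs)
d_same = squared_distance(tfidf(docs[0], idf), tfidf(docs[0], idf))
d_diff = squared_distance(tfidf(docs[0], idf), tfidf(docs[1], idf))
```

Shared terms cancel in the difference, so only the words that appear in one question but not the other contribute, each weighted by how rare it is in the corpus.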

In [9]:
print("IDF Done")

IDF Done


In [None]:
data = data.join(res.selectExpr('id','dist as tfidfDistance'),on='id',how='inner')

In [10]:
print("Join Done")

Join Done


#### Saving features

In [None]:
outData = data.select(['id']+featureNames+['is_duplicate'])
print("outData ready")

In [None]:
outData.filter(outData.id <= maxTrainID).coalesce(1).write.csv(outTrainFileName,header=True,mode='overwrite',quote="")
print("output done")

In [None]:
outData.filter(outData.id > maxTrainID).withColumn('id', outData.id-maxTrainID-1).coalesce(1).write.csv(outTestFileName,header=True,mode='overwrite',quote="")
print("output done")