## Packages and data

In [1]:
# !kaggle competitions download -c quora-question-pairs
# !pip install diff_match_patch

In [2]:
import os
import pathlib
import sys

sys.path.append(str(pathlib.Path(os.getcwd()).parent.parent))

In [3]:
import fasttext.util
import pandas as pd
from sklearn.metrics import classification_report, log_loss
from sklearn.model_selection import train_test_split
from functools import partial
import numpy as np

from utils.common import compress_read
from utils.shared_code import to_kaggle_submission

In [16]:
train_df = compress_read('./data/train.csv')
pred_df = compress_read('./data/test.csv')
sample_submission_df = compress_read('./data/sample_submission.csv')

## Graph Analysis

In [5]:
from collections import Counter, defaultdict

question_counter = Counter()
question_answers = defaultdict(set)

for q1, q2 in zip(train_df['question1'].values, train_df['question2'].values):
    question_counter.update([q1, q2])
    question_answers[q1].add(q2)
    question_answers[q2].add(q1)

for q1, q2 in zip(pred_df['question1'].values, pred_df['question2'].values):
    question_counter.update([q1, q2])
    question_answers[q1].add(q2)
    question_answers[q2].add(q1)

## Data Cleaning

In [6]:
def cleaning_sentence_1(text):
    import re
    from string import punctuation

    if text is None:
        text = ' '

    text = text.lower()
    text = re.sub(r"what's", "what is", text)
    text = re.sub(r"who's", "who is", text)
    text = re.sub(r"where's", "where is", text)
    text = re.sub(r"when's", "when is", text)
    text = re.sub(r"how's", "how is", text)
    text = re.sub(r"it's", "it is", text)
    text = re.sub(r"he's", "he is", text)
    text = re.sub(r"she's", "she is", text)
    text = re.sub(r"that's", "that is", text)
    text = re.sub(r"there's", "there is", text)
    text = re.sub(r"[^A-Za-z0-9^,!./'+-=]", " ", text)
    # Except for the above special cases, "\'s" can only represent possessive case and should be replaced with " "
    text = re.sub(r"\'s", " ", text)
    text = re.sub(r"\'ve", " have ", text)
    text = re.sub(r"can't", "can not ", text)
    text = re.sub(r"n't", " not ", text)
    text = re.sub(r"i'm", "i am", text)
    text = re.sub(r" m ", " am ", text)
    text = re.sub(r"\'re", " are ", text)
    text = re.sub(r"\'d", " would ", text)
    text = re.sub(r"\'ll", " will ", text)
    text = re.sub(r"60k", " 60000 ", text)
    text = re.sub(r" e g ", " eg ", text)
    text = re.sub(r" b g ", " bg ", text)
    text = re.sub(r"\0s", "0", text)
    text = re.sub(r" 9 11 ", "911", text)
    text = re.sub(r"e-mail", "email", text)
    text = re.sub(r"\s{2,}", " ", text)
    text = re.sub(r"quikly", "quickly", text)
    text = re.sub(r" usa ", " america ", text)
    text = re.sub(r" u s ", " america ", text)
    text = re.sub(r" uk ", " england ", text)
    text = re.sub(r"imrovement", "improvement", text)
    text = re.sub(r"intially", "initially", text)
    text = re.sub(r" dms ", "direct messages ", text)
    text = re.sub(r"demonitization", "demonetization", text)
    text = re.sub(r"actived", "active", text)
    text = re.sub(r"kms", " kilometers ", text)
    text = re.sub(r" cs ", " computer science ", text)
    text = re.sub(r" ds ", " data science ", text)
    text = re.sub(r" ee ", " electronic engineering ", text)
    text = re.sub(r" upvotes ", " up votes ", text)
    text = re.sub(r" iphone ", " phone ", text)
    text = re.sub(r"\0rs ", " rs ", text)
    text = re.sub(r"calender", "calendar", text)
    text = re.sub(r"ios", "operating system", text)
    text = re.sub(r"programing", "programming", text)
    text = re.sub(r"bestfriend", "best friend", text)
    text = re.sub(r"III", "3", text)
    text = re.sub(r"the us", "america", text)
    text = re.sub(r",", " ", text)
    text = re.sub(r"\.", " ", text)
    text = re.sub(r"!", " ", text)
    text = re.sub(r"/", " ", text)
    text = re.sub(r"\^", " ", text)
    text = re.sub(r"\+", " ", text)
    text = re.sub(r"-", " ", text)
    text = re.sub(r"=", " ", text)
    text = re.sub(r"'", " ", text)
    text = re.sub(r"(\d+)(k)", r"\g<1>000", text)
    text = re.sub(r":", " ", text)
    text = re.sub(r"\0s", "0", text)
    text = "".join([c for c in text if c not in punctuation])
    return text

def cleaning_sentence_2(text):
    from nltk.corpus import stopwords
    from nltk import word_tokenize
    stops = set(stopwords.words("english"))
    text = word_tokenize(text)
    text = [w for w in text if not w in stops]
    text = " ".join(text)
    return text

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import Row

spark = SparkSession.builder.appName("pandas-on-spark")\
    .config('spark.driver.memory', '8G')\
    .config('spark.driver.maxResultSize', '0')\
    .getOrCreate()

spark_br_dict = dict()
def get_or_upd_br(name, value):
    if name not in spark_br_dict:
        spark_br_dict[name] = spark.sparkContext.broadcast(value)
    return spark_br_dict[name]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/19 08:07:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
def run_save_clean_df(df, name, force_update=False):
    target = f'./fasttext_data/{name}'
    if force_update or not pathlib.Path(target).exists():
        spark.createDataFrame(data=df)\
            .repartition(50)\
            .withColumn('question1_clean', F.udf(cleaning_sentence_1)(F.expr("question1")))\
            .withColumn('question2_clean', F.udf(cleaning_sentence_1)(F.expr("question2")))\
            .withColumn('question1_clean_stop', F.udf(cleaning_sentence_2)(F.expr("question1_clean")))\
            .withColumn('question2_clean_stop', F.udf(cleaning_sentence_2)(F.expr("question2_clean")))\
            .write.mode("overwrite").parquet(target)

run_save_clean_df(train_df, name='train_df', force_update=False)
run_save_clean_df(pred_df, name='pred_df', force_update=False)

22/01/19 08:23:08 WARN TaskSetManager: Stage 7 contains a task of very large size (3348 KiB). The maximum recommended task size is 1000 KiB.
22/01/19 08:23:53 WARN TaskSetManager: Stage 10 contains a task of very large size (18920 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

## FastText training

In [20]:
def generate_text(a, b):
    question1 = str(a).replace('\n', ' ')
    question2 = str(b).replace('\n', ' ')
    if not question1 or not question2:
        return None
    split1 = {w.lower() for w in question1.split(' ')}
    split2 = {w.lower() for w in question2.split(' ')}
    text = set()
    text |= {t for t in split1 & split2}
    text |= {'$' + t for t in split1 ^ split2}
    return ' '.join(text)

train_df = spark.read.parquet('./fasttext_data/train_df').toPandas()
train_dict, test_dict = train_test_split(train_df.sample(frac=1.0).to_dict(orient='records'), test_size=0.2)
ft_q_name_1, ft_q_name_2 = 'question1_clean', 'question2_clean'

                                                                                

In [21]:
# training
text_list = []
for i in train_dict:
    label = '__label__' + str(i['is_duplicate'])
    x = generate_text(i[ft_q_name_1], i[ft_q_name_2])
    if x: text_list.append({'label': label, 'text': x})
pd.DataFrame(text_list).to_csv('./fasttext_data/ft_train.txt', header=False, index=False, sep='\t')

model = fasttext.train_supervised(
    './fasttext_data/ft_train.txt',
    label_prefix="__label__",
    lr=0.1,
    epoch=5,
    verbose=2,
    minCount=3,
)
model.save_model('./fasttext_data/ft_model')

Read 5M words
Number of words:  45340
Number of labels: 2
Progress: 100.0% words/sec/thread: 3914866 lr:  0.000000 avg.loss:  0.423556 ETA:   0h 0m 0s


In [22]:
# testing
def fast_text_pred(a, b, model):
    words = generate_text(a, b)
    pred = model.predict(words, k=1)
    pred_label = 0 if pred[0][0] == '__label__0' else 1
    pred_score = pred[1][0] if pred_label == 1 else 1 - pred[1][0]
    return pred_label, pred_score

y_true = []
y_pred = []
y_pred_score = []
for i in test_dict:
    label = int(i['is_duplicate'])
    pred_label, pred_score = fast_text_pred(i[ft_q_name_1], i[ft_q_name_2], model)
    y_true.append(label)
    y_pred.append(pred_label)
    y_pred_score.append(pred_score)

print(classification_report(y_true, y_pred, digits=4))
print('log_loss:', log_loss(y_true, y_pred_score))

              precision    recall  f1-score   support

           0     0.8404    0.8625    0.8513     50972
           1     0.7545    0.7206    0.7372     29886

    accuracy                         0.8101     80858
   macro avg     0.7975    0.7916    0.7942     80858
weighted avg     0.8086    0.8101    0.8091     80858

log_loss: 0.413582217448883


In [23]:
# # submit directly using fasttext
# submission = []
# for i in pred_df.to_dict(orient='records'):
#     pred_label, pred_score = fast_text_pred(i[q_name_1], i[q_name_2], model)
#     submission.append({
#         'test_id': i['test_id'],
#         'is_duplicate': pred_score
#     })
# to_kaggle_submission(submission)

## Feature Engineering

In [24]:
def add_feature_ft(spark_df: DataFrame):
    def calc(rows):
        model = fasttext.load_model('./fasttext_data/ft_model')
        for row in rows:
            pred_label, pred_score = fast_text_pred(row[ft_q_name_1], row[ft_q_name_2], model)
            yield Row(**row.asDict(), feature_ft_pred_label=int(pred_label), feature_ft_pred_score=float(pred_score))

    rdd = spark_df.rdd.mapPartitions(calc)
    return spark.createDataFrame(rdd)

# add_feature_ft(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [25]:
def add_feature_char_diff(spark_df: DataFrame):
    def calc(q1, q2, way):
        q1, q2 = str(q1), str(q2)
        if way == 1:
            return abs(len(q1) - len(q2))
        elif way == 2:
            return abs(len(q1) - len(q2)) / (len(q1) + len(q2) + 1)

    func = partial(F.udf(calc), F.expr('question1'), F.expr('question2'))
    return spark_df\
        .withColumn('feature_char_diff_1', func(F.expr("1")).cast("float"))\
        .withColumn('feature_char_diff_2', func(F.expr("2")).cast("float"))

# add_feature_char_diff(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [26]:
def add_feature_word_count(spark_df: DataFrame):
    def calc(rows):
        from nltk import word_tokenize
        from collections import OrderedDict
        for row in rows:
            sources = {
                '1': ('question1', 'question2'),
                '2': ('question1_clean', 'question2_clean'),
                '3': ('question1_clean_stop', 'question2_clean_stop'),
            }
            cur = OrderedDict(row.asDict())
            for s_id, source in sources.items():
                q1, q2 = cur[source[0]], cur[source[1]]
                words1, words2 = set(word_tokenize(str(q1))), set(word_tokenize(str(q2)))
                len1, len2 = len(words1), len(words2)
                cur[f'feature_word_count_{s_id}_1'] = float(abs(len1 - len2))
                cur[f'feature_word_count_{s_id}_2'] = float(abs(len1 - len2) / (len1 + len2 + 1))
                cur[f'feature_word_count_{s_id}_3'] = len(words1 & words2) / (len(words1 | words2) + 1)
                cur[f'feature_word_count_{s_id}_4'] = len(words1 & words2) / (min(len1, len2) + 1)
            yield Row(**cur)

    return spark.createDataFrame(spark_df.rdd.mapPartitions(calc))

# add_feature_word_count(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [27]:
def add_feature_first_word(spark_df: DataFrame):
    def calc(q1, q2):
        from nltk import word_tokenize
        word_list1 = word_tokenize(str(q1))
        word_list2 = word_tokenize(str(q2))
        if word_list1 and word_list2:
            return int(word_list1[0] == word_list2[0])
        else:
            return 0

    return spark_df.withColumn('feature_char_first_word', F.udf(calc)(F.expr('question1'), F.expr('question2')).cast("float"))

# add_feature_first_word(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [28]:
def add_feature_graph(spark_df: DataFrame):
    qa_br = get_or_upd_br('question_answers', question_answers)

    def calc(rows):
        for row in rows:
            q1, q2 = str(row["question1"]), str(row["question2"])
            a1 = qa_br.value[q1]
            a2 = qa_br.value[q2]
            yield Row(
                **row.asDict(),
                feature_graph_1=len(a1),
                feature_graph_2=len(a2),
                feature_graph_3=(len(a1) + len(a2)) / 2,
                feature_graph_4=len(a1 & a2),
                feature_graph_5=len(a1 & a2) / (len(a1 | a2) + 1),
            )

    return spark.createDataFrame(spark_df.rdd.mapPartitions(calc))

# add_feature_graph(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [29]:
def add_feature_sentiment(spark_df: DataFrame):
    def calc(rows):
        from nltk.sentiment.vader import SentimentIntensityAnalyzer
        analyzer = SentimentIntensityAnalyzer()
        for row in rows:
            q1, q2 = str(row["question1"]), str(row["question2"])
            s1, s2 = analyzer.polarity_scores(q1), analyzer.polarity_scores(q2)
            yield Row(
                **row.asDict(),
                feature_sentiment_neg=float(abs(s1['neg'] - s2['neg'])),
                feature_sentiment_neu=float(abs(s1['neu'] - s2['neu'])),
                feature_sentiment_pos=float(abs(s1['pos'] - s2['pos'])),
                feature_sentiment_compound=float(abs(s1['compound'] - s2['compound'])),
            )

    return spark.createDataFrame(spark_df.rdd.mapPartitions(calc))

# add_feature_sentiment(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

In [30]:
def add_all_features(spark_df: DataFrame):
   spark_df = add_feature_ft(spark_df)
   spark_df = add_feature_char_diff(spark_df)
   spark_df = add_feature_word_count(spark_df)
   spark_df = add_feature_first_word(spark_df)
   spark_df = add_feature_graph(spark_df)
   spark_df = add_feature_sentiment(spark_df)
   return spark_df

# add_all_features(spark.read.parquet('./fasttext_data/train_df').limit(10)).toPandas()

## Final Model Training

In [31]:
from lightgbm import LGBMClassifier

In [34]:
train_df = add_all_features(spark.read.parquet('./fasttext_data/train_df')).cache()
train_X = train_df.selectExpr(*[c for c in train_df.columns if str(c).startswith("feature_")]).collect()
train_y = train_df.selectExpr('is_duplicate').collect()
train_df.unpersist()

train_X = np.array(train_X)
train_y = np.array(train_y).reshape(-1)
train_X, eval_X, train_y, eval_y = train_test_split(train_X, train_y, test_size=0.2)

22/01/19 08:31:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [35]:
lgbm = LGBMClassifier(
    silent=False,
    n_estimators=30
)
model = lgbm.fit(train_X, train_y)
model.score(train_X, train_y)



[LightGBM] [Info] Number of positive: 119384, number of negative: 204048
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 4322
[LightGBM] [Info] Number of data points in the train set: 323432, number of used features: 26
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.369116 -> initscore=-0.536010
[LightGBM] [Info] Start training from score -0.536010


0.8872900640629251

In [36]:
eval_pred = []
eval_pred_score = []
for s0, s1 in model.predict_proba(eval_X):
    eval_pred.append(1 if s1 >= s0 else 0)
    eval_pred_score.append(s1)
print(classification_report(eval_y, eval_pred, digits=4))
print('log_loss:', log_loss(eval_y, eval_pred_score))

              precision    recall  f1-score   support

           0     0.8902    0.9340    0.9116     50979
           1     0.8771    0.8035    0.8387     29879

    accuracy                         0.8858     80858
   macro avg     0.8836    0.8687    0.8751     80858
weighted avg     0.8854    0.8858    0.8846     80858

log_loss: 0.26335183939513285


## Final Submit

In [37]:
pred_df = add_all_features(spark.read.parquet('./fasttext_data/pred_df').repartition(50)).cache()
pred_X = pred_df.selectExpr(*[c for c in pred_df.columns if str(c).startswith("feature_")]).collect()
pred_id = pred_df.selectExpr('test_id').collect()
train_df.unpersist()

pred_X = np.array(pred_X)
pred_id = np.array(pred_id).reshape(-1)

Traceback (most recent call last):                                              
  File "/home/kun/miniconda3/envs/py37-sci-v1/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/kun/miniconda3/envs/py37-sci-v1/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/kun/miniconda3/envs/py37-sci-v1/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/kun/miniconda3/envs/py37-sci-v1/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError

                                                                                

In [38]:
submission = []
for test_id, (s0, s1) in zip(pred_id, model.predict_proba(pred_X)):
    submission.append({
        'test_id': test_id,
        'is_duplicate': s1
    })
to_kaggle_submission(submission)