Presentation:
    - Spark and pandas are different. pandas loads everything into RAM, whereas Spark only reads the data when an action is triggered and writes intermediate results (such as shuffle files) to disk.
    - Our machines are not large enough to really reach the limits of either tool.
    - If the data does not fit in RAM, pandas cannot be used at all.
    - A fair benchmark would require doing exactly the same thing in both and being equally skilled in each.
    - pandas is column-based, so the code can very often be optimized (for example, an apply over rows is much slower than an apply over columns).
    - Spark, on the other hand, is row-based, and one peculiarity is that with a Python UDF the JVM has to start a Python worker process and serialize the data back and forth, which adds overhead.
    - To optimize Spark code, you also need to pay attention to caching.
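The laziness described above can be illustrated with a toy pure-Python model. The `LazyFrame` class is hypothetical and is not how PySpark is implemented: transformations only record a step, and the source is read when an action such as `collect` runs. Since nothing is kept in memory between actions, a second `collect` reads the source again.

```python
class LazyFrame:
    """Toy model of lazy evaluation (not Spark's real implementation)."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that "reads" the data
        self._steps = tuple(steps)   # recorded transformations, not yet executed

    def map(self, fn):
        # Transformation: record the step and return a new LazyFrame
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: record the step and return a new LazyFrame
        return LazyFrame(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # Action: only now is the source read and every step executed
        rows = self._source()
        for kind, fn in self._steps:
            if kind == "map":
                rows = [fn(row) for row in rows]
            else:
                rows = [row for row in rows if fn(row)]
        return rows


reads = []

def read_source():
    reads.append("read")   # track how many times the data is loaded
    return list(range(10))

lf = LazyFrame(read_source).map(lambda x: x * 2).filter(lambda x: x > 10)
assert reads == []         # no action yet: nothing has been read
print(lf.collect())        # [12, 14, 16, 18]
lf.collect()
print(len(reads))          # 2: without a cache, each action reads the source again
```

pandas, by contrast, would execute each step immediately and keep every intermediate result in RAM.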
    

# Study


In [1]:
import os
import sys
import time

# Make the Spark workers use the same interpreter as this notebook
# (replace with an explicit virtualenv path if needed)
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

import matplotlib.pyplot as plt
%matplotlib inline

# getOrCreate() reuses a running session instead of failing when a SparkContext already exists
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
import time

In [None]:
DATA_PATH = './data/train.csv'

## 1. Data import

To measure read performance, we simply collect all the data, which forces Spark to read all of it (just counting the rows would not be representative, because Spark can compute a count without parsing the content of every row).


In [3]:
%%time
df = spark.read.csv(DATA_PATH, header=True, escape='"')
_ = df.collect()

CPU times: user 1.93 s, sys: 149 ms, total: 2.07 s
Wall time: 8.05 s


For the rest of the study, we cache the Spark DataFrames in memory so that the measurements do not include the extra cost of re-reading the data, since by default Spark keeps nothing in memory.
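In Spark, `cache()` is itself lazy: the data is only stored when the first action runs, which is why the cells below follow each `cache()` with `distinct().count()`. The toy class below is hypothetical pure Python, not Spark's implementation; it only shows how caching changes the number of recomputations.

```python
class CachedDataset:
    """Toy model of Spark's lazy cache(): data is only stored by the first action."""

    def __init__(self, compute):
        self._compute = compute          # recomputes the whole lineage (read + transforms)
        self._cache_requested = False
        self._cached_rows = None
        self.computations = 0            # how many times the lineage actually ran

    def cache(self):
        # Lazy, like DataFrame.cache(): only marks the dataset, computes nothing
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows     # served from memory
        self.computations += 1
        rows = self._compute()
        if self._cache_requested:
            self._cached_rows = rows     # the first action fills the cache
        return rows

    def count(self):
        # An action
        return len(self._rows())


uncached = CachedDataset(lambda: list(range(1000)))
uncached.count()
uncached.count()
print(uncached.computations)  # 2: the lineage ran for each action

cached = CachedDataset(lambda: list(range(1000))).cache()
print(cached.computations)    # 0: cache() alone did nothing
cached.count()
cached.count()
print(cached.computations)    # 1: only the first action computed the data
```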

In [4]:
df_full = spark.read.csv(DATA_PATH, header=True, escape='"')

In [5]:
df_full.count()

404301

In [6]:
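def replicate(df, n_copies):
    # Append a suffix to 'id' in each copy: the join-based features below group
    # and join on 'id', so identical ids would merge the copies and inflate the counts
    result = None
    for i in range(n_copies):
        copy = df.withColumn('id', F.concat(F.col('id'), F.lit(f'_{i}')))
        result = copy if result is None else result.union(copy)
    return result

df_dict = {
    1000: df_full.limit(1000),
    10000: df_full.limit(10000),
    100000: df_full.limit(100000),
    400000: df_full,
    800000: replicate(df_full, 2),
    1200000: replicate(df_full, 3)
}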

In [7]:
for _, df in df_dict.items():
    df.cache()
    df.distinct().count()

## 2. Splitting the data into a training set and a validation set


In [8]:
taux_sep = 0.7
def split_dataset(df, taux_sep):
    return df.randomSplit([taux_sep, 1-taux_sep])
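`DataFrame.randomSplit` samples each row independently, so the 70/30 split is only approximate, and without a `seed` argument `split_dataset` returns a different split on every run. A pure-Python sketch of the same idea; the `random_split` helper is hypothetical and does not reproduce Spark's exact sampling:

```python
import random

def random_split(rows, weights, seed=None):
    """Hypothetical sketch: assign each row to a split with one independent random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Normalized cumulative bounds, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds, acc = [], 0.0
    for weight in weights:
        acc += weight / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches values left over by float rounding
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_rows) + len(test_rows))  # 10000
print(len(train_rows) / 10_000)          # close to 0.7, not exactly 0.7
```

In the notebook itself, passing a seed (`df.randomSplit([taux_sep, 1 - taux_sep], seed=42)`) would make the split reproducible across runs.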

## 3. Data cleaning

For data cleaning, we will:
- drop empty (null) questions
- convert the text to lowercase
- tokenize the questions
- remove stopwords
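These cleaning steps can be approximated in pure Python on a single question. Spark's `Tokenizer` already lowercases its input and splits on whitespace, so punctuation stays attached to words (`spark?` below); `RegexTokenizer` is the PySpark option for splitting on punctuation as well. The `clean_question` helper and the tiny stopword set are hypothetical examples, not the NLTK list used in the notebook.

```python
def clean_question(text, stopwords):
    """Pure-Python approximation of clean_dataframe for a single question."""
    if text is None:
        # clean_dataframe drops these rows with isNotNull()
        return None
    # Lowercase + whitespace split, close to Tokenizer (str.split() also merges repeated spaces)
    tokens = text.lower().split()
    # StopWordsRemover drops every token found in the stopword list
    return [token for token in tokens if token not in stopwords]

# Hypothetical, tiny stopword set for the example
example_stopwords = {"what", "is", "the", "to"}
print(clean_question("What is the best way to learn Spark?", example_stopwords))
# ['best', 'way', 'learn', 'spark?']
```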

In [9]:
from nltk.corpus import stopwords
english_stopwords = list(set(stopwords.words("english")))

from pyspark.ml.feature import StopWordsRemover, Tokenizer


def clean_dataframe(df):
    for column in ['question1', 'question2']:
        df = df.filter(F.col(column).isNotNull())
        df = df.withColumn(column, F.lower(F.col(column)))
        tokenizer = Tokenizer(inputCol=column, outputCol=f'{column}_tokenized')
        stopwords_remover = StopWordsRemover(inputCol=f'{column}_tokenized', outputCol=f'{column}_tokenized_cleaned', stopWords=english_stopwords)
        for task in [tokenizer, stopwords_remover]:
            df = task.transform(df)
        
        df = df.drop(f'{column}_tokenized', column)
        df = df.withColumnRenamed(f'{column}_tokenized_cleaned', column)
        
    return df

In [10]:
df_cleaned = {}
cleaning_time = {}

for key, df in df_dict.items():
    st = time.time()
    df_cleaned[key] = clean_dataframe(df)
    df_cleaned[key].distinct().count()
    cleaning_time[key] = time.time() - st

In [11]:
cleaning_time

{1000: 0.6644432544708252,
 10000: 0.5577132701873779,
 100000: 1.7603912353515625,
 400000: 3.1319870948791504,
 800000: 4.577440023422241,
 1200000: 6.014577150344849}

In [12]:
for key, df in df_cleaned.items():
    df.cache()
    df.distinct().count()

## 4. Data representation

To represent our data (i.e. the text of the questions), several approaches will be used and compared:

 + Representing a text document as a `one_hot_encoding` bag of words with scikit-learn, as explained [here](scikit-learn).
 + Representing a text document as a bag of words with [tf-idf](https://fr.wikipedia.org/wiki/TF-IDF) weighting, as seen in the first lectures. Many modules are available in scikit-learn, in particular [here](https://scikit-learn.org/stable/modules/feature_extraction.html#the-bag-of-words-representation) for its implementation.
 + A hashing approach, one of the techniques used to process massive data. It strongly reduces the amount of computation and memory by mapping each word to one of a fixed number of features through a hash function, so no vocabulary has to be built or stored. Here too, you can use the existing scikit-learn modules described [here](https://scikit-learn.org/stable/modules/feature_extraction.html#vectorizing-a-large-text-corpus-with-the-hashing-trick).
 + A word2vec representation with the [gensim](https://radimrehurek.com/gensim/) library.
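The difference between a vocabulary-based bag of words and the hashing trick can be sketched in pure Python. The vocabulary approach needs a word-to-column index built over the whole corpus, while the hashing trick derives the column from a hash of the word, so the vector size is fixed in advance at the cost of possible collisions. The helper names are hypothetical, and `zlib.crc32` is used here only as a deterministic hash for illustration; scikit-learn's `HashingVectorizer` uses MurmurHash3, and PySpark offers the same idea as `HashingTF` in `pyspark.ml.feature`.

```python
import zlib

def bag_of_words(docs, binary=False):
    """Vocabulary-based bag of words; binary=True gives a one-hot style encoding."""
    vocab = sorted({word for doc in docs for word in doc})
    index = {word: i for i, word in enumerate(vocab)}   # must be built and kept in memory
    vectors = []
    for doc in docs:
        vec = [0] * len(vocab)
        for word in doc:
            vec[index[word]] = 1 if binary else vec[index[word]] + 1
        vectors.append(vec)
    return vocab, vectors

def hashed_bag_of_words(doc, n_features=16):
    """Hashing trick: no vocabulary, the hash of a word picks its column."""
    vec = [0] * n_features
    for word in doc:
        # crc32 is deterministic across runs, unlike Python's salted hash()
        vec[zlib.crc32(word.encode("utf-8")) % n_features] += 1
    return vec

docs = [["learn", "spark", "spark"], ["learn", "pandas"]]
vocab, counts = bag_of_words(docs)
print(vocab)                              # ['learn', 'pandas', 'spark']
print(counts)                             # [[1, 0, 2], [1, 1, 0]]
print(len(hashed_bag_of_words(docs[0])))  # 16, whatever the vocabulary size
```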
 

###### Computing the idf

In [13]:
min_count = 2
eps = 10000

In [14]:
def compute_idf(df):
    df_idf = df.select('question1')\
    .union(df.select('question2')).distinct()\
    .select(F.explode('question1').alias('word'))\
    .groupBy('word')\
    .agg(F.count('*').alias('idf'))\
    .withColumn(
        'idf',
        F.when(F.col('idf') < min_count, F.lit(0))\
        .otherwise(1/(F.col('idf')+eps))
    )
    return df_idf
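For reference, a pure-Python mirror of `compute_idf` (the `compute_weights` name is hypothetical). Note that this weight is not the textbook idf, log(N / df): it is an inverse word frequency, 1 / (count + eps), set to 0 for words seen fewer than `min_count` times, and `count` includes repeated occurrences of a word inside a question because of the `explode`.

```python
from collections import Counter

def compute_weights(question_pairs, min_count=2, eps=10000):
    """Pure-Python mirror of compute_idf on (question1, question2) token lists."""
    # union of both columns + distinct(): each distinct question is counted once
    questions = {tuple(q) for pair in question_pairs for q in pair}
    # explode + groupBy('word').count(): every occurrence of a word is counted
    counts = Counter(word for q in questions for word in q)
    # Words seen fewer than min_count times get 0, the others 1 / (count + eps)
    return {word: 0.0 if c < min_count else 1 / (c + eps) for word, c in counts.items()}

pairs = [(["learn", "spark"], ["learn", "pandas"]),
         (["learn", "spark"], ["why", "spark"])]
weights = compute_weights(pairs)
print(weights["pandas"])  # 0.0: seen only once
print(weights["learn"])   # 1 / 10002: seen in two distinct questions
```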

In [15]:
df_idf_dict = {}
idf_dict = {}

idf_time = {}

for key, df in df_cleaned.items():
    st = time.time()
    df_idf_dict[key] = compute_idf(df)
    idf_dict[key] = { word: idf for word,idf in df_idf_dict[key].collect()}
    idf_time[key] = time.time() - st

In [16]:
idf_time

{1000: 0.6151769161224365,
 10000: 0.4928293228149414,
 100000: 1.2541282176971436,
 400000: 2.2709591388702393,
 800000: 2.772006034851074,
 1200000: 3.2807397842407227}

In [17]:
for key, df in df_idf_dict.items():
    df.cache()
    df.distinct().count()

###### Common words

In [18]:
def compute_common_words(df):
    df = df.withColumn(
        'common_words',
        2 * F.size(F.array_intersect('question1', 'question2'))/ (F.size('question1') + F.size('question2'))
    )
    return df
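The `array_intersect` feature above can be mirrored in pure Python for a single pair of token lists (`common_words_ratio` is a hypothetical helper). `array_intersect` removes duplicates while `F.size` counts every token, and when both arrays are empty Spark returns null for the division by zero (outside ANSI mode), represented here by `None`.

```python
def common_words_ratio(q1, q2):
    """Mirror of compute_common_words (array_intersect version) for two token lists."""
    shared = set(q1) & set(q2)   # array_intersect keeps each shared word once
    total = len(q1) + len(q2)    # F.size counts every token, repeats included
    if total == 0:
        return None              # Spark yields null when dividing by zero (non-ANSI mode)
    return 2 * len(shared) / total

print(common_words_ratio(["learn", "python", "fast"], ["learn", "python"]))  # 0.8
```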

In [19]:
df_common_words = {}

common_words_time = {}

for key, df in df_cleaned.items():
    st = time.time()
    df_common_words[key] = compute_common_words(df)
    df_common_words[key].distinct().count()
    common_words_time[key] = time.time() - st

In [20]:
common_words_time

{1000: 0.15171313285827637,
 10000: 0.11482405662536621,
 100000: 0.5359230041503906,
 400000: 1.7709598541259766,
 800000: 1.7045090198516846,
 1200000: 2.117926836013794}

###### Common words without a UDF, using a join

In [27]:
def compute_common_words(df):
    df_words = df.select('id', F.explode('question1').alias('word1'), 'question2')\
    .select('id', 'word1', F.explode('question2').alias('word2'))

    shared_words = df_words\
    .filter('word1 = word2')\
    .groupby('id')\
    .agg(F.count('*').alias('common_words'))

    df = df.join(shared_words, 'id', 'left')\
    .withColumn(
        'common_words',
        F.when(F.col('common_words').isNull(), F.lit(0))\
        .otherwise(F.col('common_words'))
    )\
    .withColumn(
        'common_words',
       2*F.col('common_words') / (F.size('question1') + F.size('question2'))
    )
    return df
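This join-based version does not compute exactly the same value as the `array_intersect` one: `explode` on both columns followed by `filter('word1 = word2')` counts matching (word1, word2) pairs, so a word repeated in a question is counted several times and the ratio can exceed 1. Both `groupby('id')` and the join on `id` also assume that `id` is unique in the DataFrame. A pure-Python comparison of the two definitions (hypothetical helper names):

```python
def common_words_ratio_join(q1, q2):
    """Mirror of the join-based compute_common_words: counts matching (word1, word2) pairs."""
    # explode(question1) x explode(question2), then filter('word1 = word2')
    pairs = sum(1 for w1 in q1 for w2 in q2 if w1 == w2)
    total = len(q1) + len(q2)
    return None if total == 0 else 2 * pairs / total

def common_words_ratio_intersect(q1, q2):
    """Mirror of the array_intersect version, for comparison."""
    total = len(q1) + len(q2)
    return None if total == 0 else 2 * len(set(q1) & set(q2)) / total

q1, q2 = ["very", "very", "good"], ["very", "good"]
print(common_words_ratio_intersect(q1, q2))  # 0.8
print(common_words_ratio_join(q1, q2))       # 1.2: "very" matches twice
```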

In [28]:
df_common_words = {}

common_words_time = {}

for key, df in df_cleaned.items():
    st = time.time()
    df_common_words[key] = compute_common_words(df)
    df_common_words[key].distinct().count()
    common_words_time[key] = time.time() - st

In [29]:
common_words_time

{1000: 0.2672290802001953,
 10000: 0.32927918434143066,
 100000: 1.5108647346496582,
 400000: 2.2074599266052246,
 800000: 4.227099180221558,
 1200000: 5.615463018417358}

In [30]:
for key, df in df_common_words.items():
    df.cache()
    df.distinct().count()

###### Common words with a UDF

In [35]:
@F.udf(T.DoubleType())  # the ratio is a float: with IntegerType Spark would return null
def udf_common_words(question1, question2):
    q1words = set(question1)
    q2words = set(question2)
    if len(q1words) == 0 or len(q2words) == 0:
        # The computer-generated chaff includes a few questions that are nothing but stopwords
        return 0.0
    shared_words = q1words & q2words
    R = (2 * len(shared_words))/(len(q1words) + len(q2words))
    return R

In [36]:
def compute_common_words_with_udf(df):
    df = df.withColumn(
        'common_words',
        udf_common_words(F.col('question1'), F.col('question2')))
    return df

In [37]:
df_common_words_with_udf = {}

common_words_with_udf_time = {}

for key, df in df_cleaned.items():
    st = time.time()
    df_common_words_with_udf[key] = compute_common_words_with_udf(df)
    df_common_words_with_udf[key].distinct().count()
    common_words_with_udf_time[key] = time.time() - st

In [38]:
common_words_with_udf_time

{1000: 0.7200469970703125,
 10000: 0.27631497383117676,
 100000: 1.1112160682678223,
 400000: 2.1631557941436768,
 800000: 3.177809953689575,
 1200000: 4.694714069366455}

###### common words idf weighted
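Following the pure-Python `common_words_idf_weighted` function further down, the feature of this section can be written as below, where the sums run over token occurrences, idf(w) is the weight computed by `compute_idf`, and the value is 0 when the denominator is 0. The join-based Spark version sums over matching (word1, word2) pairs instead, so a word repeated in question 2 can contribute more than once to the numerator.

$$
\text{common\_words\_idf\_weighted}(q_1, q_2) = \frac{2 \sum_{w \in q_1,\ w \in q_2} \mathrm{idf}(w)}{\sum_{w \in q_1} \mathrm{idf}(w) + \sum_{w \in q_2} \mathrm{idf}(w)},
\qquad
\mathrm{idf}(w) =
\begin{cases}
0 & \text{if } \mathrm{count}(w) < \texttt{min\_count} \\
\dfrac{1}{\mathrm{count}(w) + \texttt{eps}} & \text{otherwise}
\end{cases}
$$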

In [39]:
def compute_common_words_idf_weighted(df, df_idf):
    # df_idf: one row per word with its 'idf' weight (output of compute_idf)
    df_words = df.select('id', F.explode('question1').alias('word1'), 'question2')\
        .select('id', 'word1', F.explode('question2').alias('word2'))

    # Sum of the idf weights of the words shared by both questions
    shared_idf_weighted = df_words\
    .filter('word1 = word2')\
    .join(df_idf.withColumnRenamed('word', 'word1') , 'word1', 'left')\
    .groupby('id')\
    .agg(F.sum('idf').alias('common_words_idf_weighted'))

    # Total idf weight of each distinct question, keyed by question id
    df_question_idf = df.select('question1','qid1').union(df.select('question2', 'qid2')).distinct()\
    .select(F.explode('question1').alias('word'), 'qid1')\
    .join(df_idf, 'word', 'left')\
    .groupby('qid1')\
    .agg(F.sum('idf').alias('question_idf'))
    df_question_idf.cache()

    df = df.join(shared_idf_weighted, 'id', 'left')\
    .join(
        df_question_idf.withColumnRenamed('question_idf', 'question1_idf'),
        'qid1',
        'left'
    )\
    .join(
        df_question_idf\
        .withColumnRenamed('question_idf', 'question2_idf')\
        .withColumnRenamed('qid1', 'qid2'),
        'qid2',
        'left'
    )\
    .withColumn(
        'common_words_idf_weighted',
        F.when(F.col('common_words_idf_weighted').isNull(), F.lit(0))\
        .otherwise(F.col('common_words_idf_weighted'))
    )\
    .withColumn(
       'common_words_idf_weighted',
       2*F.col('common_words_idf_weighted') / (F.col('question1_idf') + F.col('question2_idf'))
    )
    return df

In [44]:
df_common_words_idf = {}

common_words_idf_time = {}

for key, df in df_common_words.items():
    st = time.time()
    # Pass the idf table of the same dataset explicitly instead of through a global variable
    df_common_words_idf[key] = compute_common_words_idf_weighted(df, df_idf_dict[key])
    df_common_words_idf[key].distinct().count()
    common_words_idf_time[key] = time.time() - st

In [42]:
common_words_idf_time

{1000: 1.3751139640808105,
 10000: 1.370466947555542,
 100000: 3.8508951663970947,
 400000: 6.787045240402222,
 800000: 8.197191953659058,
 1200000: 32.96792387962341}

In [43]:
for key, df in df_common_words_idf.items():
    df.cache()
    df.distinct().count()

###### common words idf weighted with a UDF

In [32]:
def common_words_idf_weighted(question1, question2, idf):
    # idf maps each word to its weight; unknown words count as 0
    question2_words = set(question2)   # constant-time membership tests
    common_words_weighted = 0.0
    questions_weights = 0.0
    for word in question1:
        idf_weight = idf.get(word, 0.0)
        if word in question2_words:
            common_words_weighted += idf_weight
        questions_weights += idf_weight
    for word in question2:
        questions_weights += idf.get(word, 0.0)
    if questions_weights > 0:
        return 2 * common_words_weighted / questions_weights
    return 0.0

In [33]:
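def compute_common_words_idf_weighted_with_udf(df, idf):
    # idf: {word: weight} dictionary from idf_dict, broadcast once to the executors
    idf_broadcast = spark.sparkContext.broadcast(idf)

    # The weighted ratio is a float, so the UDF must declare DoubleType (IntegerType would yield null)
    @F.udf(T.DoubleType())
    def udf_common_words_idf_weighted(question1, question2):
        return float(common_words_idf_weighted(question1, question2, idf_broadcast.value))

    # withColumn returns a new DataFrame: the result has to be reassigned
    df = df.withColumn(
        'common_words_idf_weighted',
        udf_common_words_idf_weighted(F.col('question1'), F.col('question2'))
    )
    return df

In [34]:
df_common_words_idf_with_udf = {}

common_words_idf_time_with_udf = {}

for key, df in df_common_words.items():
    st = time.time()
    df_common_words_idf_with_udf[key] = compute_common_words_idf_weighted_with_udf(df, idf_dict[key])
    df_common_words_idf_with_udf[key].distinct().count()
    common_words_idf_time_with_udf[key] = time.time() - st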

In [35]:
common_words_idf_time_with_udf

{'full': 0.302523136138916,
 '1000': 0.048006296157836914,
 '10000': 0.06462216377258301,
 '100000': 0.22162413597106934}

###### Creating the train and test sets

In [46]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [47]:
train_dict = {}
test_dict = {}

In [48]:
for key, df in df_common_words_idf.items():
    df = df.filter('is_duplicate is not null')

    df = df.withColumnRenamed('is_duplicate', 'label')\
    .withColumn('label', F.col('label').cast('int'))

    selected_columns = ['common_words', 'common_words_idf_weighted']

    for column in selected_columns:
        df = df.filter(F.col(column).isNotNull())
        df = df.withColumn(column, F.col(column).cast('double'))

    train, test = split_dataset(df, taux_sep)
    assembler = VectorAssembler(inputCols=selected_columns, outputCol="features")

    pipeline = Pipeline(stages=[assembler])
    pipelineModel = pipeline.fit(df)
    train_dict[key] = pipelineModel.transform(train)
    test_dict[key] = pipelineModel.transform(test)
    train_dict[key].cache().distinct().count()
    test_dict[key].cache().distinct().count()

## 5. Training and performance

To evaluate model performance, we train a logistic regression, a decision tree and a random forest.
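To read the accuracies reported below, a sketch of the metric and of a majority-class baseline may help (hypothetical helpers, pure Python). `MulticlassClassificationEvaluator` with `metricName='accuracy'` reports the fraction of correct predictions, and if the classes are imbalanced, always predicting the most frequent label already reaches a non-trivial accuracy. In PySpark, the label distribution is available with `train.groupBy('label').count()`.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of correct predictions, the 'accuracy' metric used below."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)

def majority_baseline(train_labels, test_labels):
    """Accuracy of always predicting the most frequent training label."""
    majority = Counter(train_labels).most_common(1)[0][0]
    return accuracy(test_labels, [majority] * len(test_labels))

print(accuracy([0, 1, 1, 0], [0, 1, 0, 0]))        # 0.75
print(majority_baseline([0, 0, 1], [0, 1, 1, 0]))  # 0.5
```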

In [49]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


### Logistic regression

In [52]:
from pyspark.ml.classification import LogisticRegression

lr_dict = {}
lrModel_dict = {}

lr_time = {}

for key, train in train_dict.items():
    st = time.time()
    # Create initial LogisticRegression model
    lr_dict[key] = LogisticRegression(labelCol="label", featuresCol="features")

    # Train model with Training Data
    lrModel_dict[key] = lr_dict[key].fit(train)
    lr_time[key] = time.time() - st



In [53]:
lr_time

{1000: 0.5644731521606445,
 10000: 0.6493039131164551,
 100000: 0.9996500015258789,
 400000: 2.5949480533599854,
 800000: 3.173823118209839,
 1200000: 5.683064222335815}

In [54]:
for key, test in test_dict.items():
    predictions = lrModel_dict[key].transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
    print(key, evaluator.evaluate(predictions))
    

1000 0.6846846846846847
10000 0.6808873720136519
100000 0.6767363542396052
400000 0.6689562934464938
800000 0.6683974335200089
1200000 0.669596254959296


### Decision trees

In [55]:
from pyspark.ml.classification import DecisionTreeClassifier

dt_dict = {}
dtModel_dict = {}

dt_time = {}

for key, train in train_dict.items():
    st = time.time()
    # Create initial DecisionTreeClassifier model
    dt_dict[key] = DecisionTreeClassifier(labelCol="label", featuresCol="features")

    # Train model with Training Data
    dtModel_dict[key] = dt_dict[key].fit(train)
    dt_time[key] = time.time() - st



In [56]:
dt_time

{1000: 1.2671706676483154,
 10000: 1.3924741744995117,
 100000: 0.9892888069152832,
 400000: 2.2401349544525146,
 800000: 2.6141581535339355,
 1200000: 4.613914966583252}

In [57]:
for key, test in test_dict.items():
    predictions = dtModel_dict[key].transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
    print(key, evaluator.evaluate(predictions))
    

1000 0.7477477477477478
10000 0.7133105802047781
100000 0.7169484178586909
400000 0.7100870168254545
800000 0.7121116591872814
1200000 0.7086556017736776


### Random Forests


In [58]:
from pyspark.ml.classification import RandomForestClassifier

rf_dict = {}
rfModel_dict = {}

rf_time = {}

for key, train in train_dict.items():
    st = time.time()
    # Create initial RandomForestClassifier model
    rf_dict[key] = RandomForestClassifier(labelCol="label", featuresCol="features")

    # Train model with Training Data
    rfModel_dict[key] = rf_dict[key].fit(train)
    rf_time[key] = time.time() - st



In [59]:
rf_time

{1000: 0.7193012237548828,
 10000: 0.7812149524688721,
 100000: 2.272455930709839,
 400000: 5.494510889053345,
 800000: 7.683736801147461,
 1200000: 10.278573036193848}

In [48]:
for key, test in test_dict.items():
    predictions = rfModel_dict[key].transform(test)
    evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
    print(key, evaluator.evaluate(predictions))
    

full 0.7311377191640953
1000 0.735593220338983
10000 0.7229958599924727
100000 0.7327012159814708
