# Data Ingestion

Afin de lancer ce notebook avec Spark en local sur 4 threads, exécutez la commande suivante :
`SPARK_OPTS='--master=local[4]' jupyter notebook`

In [1]:
# Bootstrap PySpark inside the notebook kernel: let findspark locate the Spark
# installation, then execute Spark's own shell.py start-up script in this namespace.
import findspark
findspark.init()
import os
filename = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
# Read the script inside a context manager so the file handle is closed
# deterministically instead of being left open until garbage collection.
with open(filename, "rb") as shell_script:
    shell_source = shell_script.read()
exec(compile(shell_source, filename, 'exec'))
# `sc` is not assigned in this cell; it is expected to be created by shell.py.
print(sc)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Python version 3.6.3 (default, Oct  6 2017 12:04:38)
SparkSession available as 'spark'.
<SparkContext master=local[*] appName=pyspark-shell>


In [None]:
# Stemming


In [2]:
## Import the packages needed by the notebook
# Data handling
import string
import shutil
import pandas as pd
import numpy as np

# PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import split

# Wrap the already-running SparkContext `sc` (not defined in this cell) in a SQLContext.
sqlContext = SQLContext(sc)

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, CountVectorizerModel

# Pytorch
import torch
from torch.utils.data import DataLoader
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
# NOTE(review): DataLoader is already imported a few lines above; this line is redundant.
from torch.utils.data import DataLoader

In [7]:
# Load a saved CountVectorizerModel or fit a new one, then vectorise the input files.

train_cv = 1 # 1 for training / 0 to load the model
cvModelPath = './Data/count-vectorizer-model'

def loadData(data_ingestion):
    """Build a bag-of-words DataFrame from tab-separated text files.

    Every line of every file is split on tabs: the first field is cast to an
    integer `score`, the second is used as the `comment` text. Comments are
    tokenized and converted to count vectors by a CountVectorizerModel.

    The module-level flag `train_cv` selects where the model comes from:
      * truthy: a single CountVectorizer is fitted on the tokens of ALL files
        together, so every row shares one vocabulary, and the fitted model is
        written to `cvModelPath`, replacing any model already stored there;
      * falsy: the model stored at `cvModelPath` is loaded and left untouched.

    Relies on the notebook globals `sc` and `spark`.

    Args:
        data_ingestion: iterable of file paths readable by `sc.textFile`.

    Returns:
        A Spark DataFrame with the columns `score` and `X` (count vector).

    Raises:
        ValueError: if `train_cv` is truthy and `data_ingestion` is empty,
            because there is nothing to fit the vectorizer on.
    """
    fit_new_model = bool(train_cv)

    # Load up front so a missing model fails before any file is read.
    cv = None if fit_new_model else CountVectorizerModel.load(cvModelPath)

    tokenizer = Tokenizer(inputCol = "comment", outputCol = "words")

    # Ingest file by file, keeping only the columns the vectorizer needs.
    tokenized = None
    for filePath in data_ingestion:
        data = sc.textFile(filePath).map(lambda line: line.split("\t")).toDF()
        data = data.withColumnRenamed('_1', 'score')
        data = data.withColumn('score', data['score'].cast(IntegerType()))
        data = data.withColumnRenamed('_2', 'comment')
        data = tokenizer.transform(data).select('score', 'words')
        tokenized = data if tokenized is None else tokenized.union(data)

    if tokenized is None:
        if fit_new_model:
            raise ValueError("data_ingestion is empty: no data to fit the CountVectorizer on")
        # Nothing to transform: return an empty DataFrame with the output schema.
        schema = StructType([
            StructField('score', IntegerType(), True),
            StructField('X', VectorUDT(), True),
        ])
        return spark.createDataFrame(sc.emptyRDD(), schema)

    if fit_new_model:
        # Fit once on everything; fitting per file would give each file its own
        # vocabulary and make the unioned vectors incomparable.
        cv = CountVectorizer(inputCol = 'words', outputCol = 'X').fit(tokenized)
        # overwrite() replaces a previously saved model without a manual rmtree.
        cv.write().overwrite().save(cvModelPath)

    return cv.transform(tokenized).select('score', 'X')

In [8]:
# File(s) to ingest; loadData iterates over this list of paths.
data_ingestion = ['./Data/balanced_stemmed_amazon_350k.txt']

# Vectorise the file contents and preview the resulting Spark DataFrame.
X = loadData(data_ingestion)
X.show()

+-----+--------------------+
|score|                   X|
+-----+--------------------+
|    5|(262144,[0,1,5,7,...|
|    4|(262144,[4,14,15,...|
|    1|(262144,[46,240,4...|
|    4|(262144,[0,1,2,3,...|
|    4|(262144,[3,24,39,...|
|    5|(262144,[0,13,20,...|
|    5|(262144,[0,11,90,...|
|    4|(262144,[2,10,13,...|
|    4|(262144,[0,1,7,13...|
|    5|(262144,[0,1,6,19...|
|    5|(262144,[0,19,48,...|
|    5|(262144,[0,3,6,13...|
|    5|(262144,[11,19,51...|
|    4|(262144,[0,3,23,3...|
|    5|(262144,[0,3,13,6...|
|    1|(262144,[0,5,15,7...|
|    5|(262144,[0,11,18,...|
|    4|(262144,[0,1,5,7,...|
|    4|(262144,[16,17,31...|
|    5|(262144,[0,1,2,3,...|
+-----+--------------------+
only showing top 20 rows



In [8]:
# Collect the Spark DataFrame into a local pandas DataFrame and preview the first 10 rows.
X_pd = X.toPandas()
X_pd.head(10)

Unnamed: 0,score,X
0,5,"(5.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, ..."
1,4,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,1,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,4,"(4.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."
4,4,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."
5,5,"(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
6,5,"(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7,4,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
8,4,"(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ..."
9,5,"(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, ..."


In [13]:
# Write the pandas frame to a tab-separated file, leaving out the index column.
X_pd.to_csv("./Data/bagOfWords.csv", sep = "\t", index = False)

In [14]:
# Read the tab-separated file back into pandas and preview the first 6 rows.
X_from_pd = pd.read_csv("./Data/bagOfWords.csv", sep = "\t")
X_from_pd.head(6)

Unnamed: 0,score,X
0,5,"(113463,[0,1,7,8,9,12,13,24,33,43,65,70,73,81,..."
1,4,"(113463,[3,17,18,35,47,55,67,202,300,308,321,3..."
2,1,"(113463,[50,216,423,675,1467,3314,4467,5400,17..."
3,4,"(113463,[0,1,2,4,12,20,30,39,44,47,51,60,78,79..."
4,4,"(113463,[4,23,45,194,281,462,566,593,696,1711]..."
5,5,"(113463,[0,15,22,44,69,93,134,325,405,451,543,..."


In [11]:
# Inspect the column names of the re-loaded frame.
X_from_pd.columns

Index(['Unnamed: 0', 'score', 'X'], dtype='object')

## Machine Learning

Nous implémentons un classifieur Naive Bayes, puis une forêt aléatoire (random forest).

In [9]:
from pyspark.ml.classification import NaiveBayes

# Keep only the rows whose score is 4 or 5.
X1 = X.select('score', 'X').where(X.score.isin(4, 5))

# Multinomial Naive Bayes estimator: labels come from `score`, features from `X`.
NB = NaiveBayes(
    featuresCol = "X",
    labelCol = "score",
    modelType = "multinomial",
    smoothing = 1.0,
)
# Fit on X1 left commented out:
#model = NB.fit(X1)

In [19]:
# Preview the vectorised DataFrame.
X.show()

+-----+--------------------+
|score|                   X|
+-----+--------------------+
|    5|(113463,[0,1,7,8,...|
|    4|(113463,[3,17,18,...|
|    1|(113463,[50,216,4...|
|    4|(113463,[0,1,2,4,...|
|    4|(113463,[4,23,45,...|
|    5|(113463,[0,15,22,...|
|    5|(113463,[0,9,139,...|
|    4|(113463,[2,12,15,...|
|    4|(113463,[0,1,8,15...|
|    5|(113463,[0,1,6,21...|
|    5|(113463,[0,21,46,...|
|    5|(113463,[0,4,6,15...|
|    5|(113463,[9,21,58,...|
|    4|(113463,[0,4,31,3...|
|    5|(113463,[0,4,15,4...|
|    1|(113463,[0,7,18,6...|
|    5|(113463,[0,9,20,2...|
|    4|(113463,[0,1,7,8,...|
|    4|(113463,[13,19,27...|
|    5|(113463,[0,1,2,4,...|
+-----+--------------------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import lit

# Binarise the rating: scores 4-5 become label 1, scores 1-3 become label 0.
is_positive = X.score.isin(4, 5)
is_negative = X.score.isin(1, 2, 3)
X2_1 = X.filter(is_positive).withColumn('score', lit(1))
X2_0 = X.filter(is_negative).withColumn('score', lit(0))

# Class sizes, label 1 first.
n_positive = X2_1.count()
n_negative = X2_0.count()
print(n_positive, ' - ', n_negative)

# Stack the label-1 rows on top of the label-0 rows and preview the result.
X2 = X2_1.union(X2_0)
X2.show()


175000  -  175000
+-----+--------------------+
|score|                   X|
+-----+--------------------+
|    1|(262144,[0,1,5,7,...|
|    1|(262144,[4,14,15,...|
|    1|(262144,[0,1,2,3,...|
|    1|(262144,[3,24,39,...|
|    1|(262144,[0,13,20,...|
|    1|(262144,[0,11,90,...|
|    1|(262144,[2,10,13,...|
|    1|(262144,[0,1,7,13...|
|    1|(262144,[0,1,6,19...|
|    1|(262144,[0,19,48,...|
|    1|(262144,[0,3,6,13...|
|    1|(262144,[11,19,51...|
|    1|(262144,[0,3,23,3...|
|    1|(262144,[0,3,13,6...|
|    1|(262144,[0,11,18,...|
|    1|(262144,[0,1,5,7,...|
|    1|(262144,[16,17,31...|
|    1|(262144,[0,1,2,3,...|
|    1|(262144,[48,55,69...|
|    1|(262144,[3,29,34,...|
+-----+--------------------+
only showing top 20 rows



In [26]:
# Fit the Naive Bayes estimator on the whole binarised set X2, then score that
# same set; predictions here are made on the data the model was trained on.
model = NB.fit(X2)
Y = model.transform(X2)
Y.show()

+-----+--------------------+--------------------+--------------------+----------+
|score|                   X|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    1|(113463,[0,1,7,8,...|[-421.38956913796...|[0.00145588742341...|       1.0|
|    1|(113463,[3,17,18,...|[-265.59445234650...|[0.81795169456594...|       0.0|
|    1|(113463,[0,1,2,4,...|[-1093.5794889329...|[2.69468657757508...|       1.0|
|    1|(113463,[4,23,45,...|[-81.546492754511...|[0.70894833793054...|       0.0|
|    1|(113463,[0,15,22,...|[-183.16090056105...|[0.40769953415888...|       1.0|
|    1|(113463,[0,9,139,...|[-87.038373058332...|[0.00214170430029...|       1.0|
|    1|(113463,[2,12,15,...|[-111.98180698004...|[0.01561747942132...|       1.0|
|    1|(113463,[0,1,8,15...|[-274.99054237735...|[3.76934719208345...|       1.0|
|    1|(113463,[0,1,6,21...|[-164.37811017222...|[4.90553171319196...|       1.0|
|    1|(113463,[

In [33]:
Y.show()

# Build the confusion matrix with ONE aggregation instead of eight separate
# Spark jobs (four filtered counts plus four Y.count() calls): count every
# (label, prediction) pair once, then look the cells up locally.
confusion = {
    (row['score'], int(row['prediction'])): row['count']
    for row in Y.groupBy('score', 'prediction').count().collect()
}
n_rows = sum(confusion.values())

# Each cell as a fraction of all scored rows; keys are (label, prediction).
tp = confusion.get((1, 1), 0)/n_rows
tn = confusion.get((0, 0), 0)/n_rows
fp = confusion.get((0, 1), 0)/n_rows
fn = confusion.get((1, 0), 0)/n_rows


print('tp : ', tp, '- fp : ', fp, '\nfn : ', fn, '- tn : ', tn)
print('Accuracy : ', (tp+tn)/(tp+tn+fn+fp), '\nPrecision : ', tp/(tp+fp), '\nSpecificity : ', tn/(tn+fp), '\nRecall : ', tp/(tp+fn), '\n')


+-----+--------------------+--------------------+--------------------+----------+
|score|                   X|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    1|(113463,[0,1,7,8,...|[-421.38956913796...|[0.00145588742341...|       1.0|
|    1|(113463,[3,17,18,...|[-265.59445234650...|[0.81795169456594...|       0.0|
|    1|(113463,[0,1,2,4,...|[-1093.5794889329...|[2.69468657757508...|       1.0|
|    1|(113463,[4,23,45,...|[-81.546492754511...|[0.70894833793054...|       0.0|
|    1|(113463,[0,15,22,...|[-183.16090056105...|[0.40769953415888...|       1.0|
|    1|(113463,[0,9,139,...|[-87.038373058332...|[0.00214170430029...|       1.0|
|    1|(113463,[2,12,15,...|[-111.98180698004...|[0.01561747942132...|       1.0|
|    1|(113463,[0,1,8,15...|[-274.99054237735...|[3.76934719208345...|       1.0|
|    1|(113463,[0,1,6,21...|[-164.37811017222...|[4.90553171319196...|       1.0|
|    1|(113463,[

In [12]:
# Split train / test
# A fixed seed makes the split, and therefore the metrics printed below,
# reproducible from one run to the next.
training, test = X2.randomSplit([.8, .2], seed = 42)

# Fit on the training part only and score the held-out part.
model2 = NB.fit(training)
Y = model2.transform(test)

# Build the confusion matrix with ONE aggregation instead of eight separate
# Spark jobs: count every (label, prediction) pair once, then look the cells
# up locally.
confusion = {
    (row['score'], int(row['prediction'])): row['count']
    for row in Y.groupBy('score', 'prediction').count().collect()
}
n_rows = sum(confusion.values())

# Each cell as a fraction of all scored rows; keys are (label, prediction).
tp = confusion.get((1, 1), 0)/n_rows
tn = confusion.get((0, 0), 0)/n_rows
fp = confusion.get((0, 1), 0)/n_rows
fn = confusion.get((1, 0), 0)/n_rows


print('tp : ', tp, '- fp : ', fp, '\nfn : ', fn, '- tn : ', tn)
print('Accuracy : ', (tp+tn)/(tp+tn+fn+fp), '\nPrecision : ', tp/(tp+fp), '\nSpecificity : ', tn/(tn+fp), '\nRecall : ', tp/(tp+fn), '\n')

tp :  0.44268802822913406 - fp :  0.05729774337668251 
fn :  0.06000113827153467 - tn :  0.4400130901226488
Accuracy :  0.8827011183517829 
Precision :  0.8854012521343199 
Specificity :  0.8847848477912565 
Recall :  0.8806396829889612 



In [None]:
# Run loadData on the review file and keep the result under its own name.
real_data = loadData(['./Data/balanced_stemmed_amazon_350k.txt'])


In [10]:
# A few experiments: run the text pipeline step by step on a small test file.
file = sc.textFile('./Data/data_test.txt')

# Split each line on tabs; first field -> integer `score`, second -> `comment`.
data = file.map(lambda line: line.split("\t")).toDF()
data = data.withColumnRenamed('_1', 'score') 
data = data.withColumn('score', data['score'].cast(IntegerType()))
data = data.withColumnRenamed('_2', 'comment')

# show() prints the table itself and returns None, so it is called directly
# rather than wrapped in print(), which would add a stray "None" line.
data.show()

# Tokenize the text
tokenizer = Tokenizer(inputCol = "comment", outputCol = "words")
data = tokenizer.transform(data)

data.show()

# Countvectorizer
cv = CountVectorizer(inputCol = 'words', outputCol = 'X')
model = cv.fit(data)
data = model.transform(data)

data.show()

# TF-IDF
idf = IDF(inputCol="X", outputCol="features")
idfModel = idf.fit(data)
data = idfModel.transform(data)

data.show()

+-----+--------------------+
|score|             comment|
+-----+--------------------+
|    5|It totally change...|
|    2|Interesting Grish...|
+-----+--------------------+

None
+-----+--------------------+--------------------+
|score|             comment|               words|
+-----+--------------------+--------------------+
|    5|It totally change...|[it, totally, cha...|
|    2|Interesting Grish...|[interesting, gri...|
+-----+--------------------+--------------------+

None
+-----+--------------------+--------------------+--------------------+
|score|             comment|               words|                   X|
+-----+--------------------+--------------------+--------------------+
|    5|It totally change...|[it, totally, cha...|(102,[0,1,2,3,4,5...|
|    2|Interesting Grish...|[interesting, gri...|(102,[0,2,3,5,7,8...|
+-----+--------------------+--------------------+--------------------+

None
+-----+--------------------+--------------------+--------------------+------------