In [56]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "5g") \
    .appName('app') \
    .getOrCreate()

### On installe nltk pour les stop words

In [2]:
!pip install nltk



### On importe les bibliotheques que l'on va utiliser

In [57]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *

from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml.tuning import *
from pyspark.ml.feature import  StringIndexer


In [58]:
from pyspark.ml.evaluation import *

In [59]:
import nltk.corpus 
from nltk.corpus import stopwords

In [61]:
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [62]:
import string

### On convertit le csv en spark df

In [63]:
path = "DBPEDIA_train.csv"
df_train = spark.read.format('csv').options(header=True,delimiter=',').load(path)


In [64]:
df_train.schema

StructType(List(StructField(text,StringType,true),StructField(l1,StringType,true),StructField(l2,StringType,true),StructField(l3,StringType,true)))

### On observe nos données

In [65]:
df_train.count()

                                                                                

240942

## On remarque que nos données contiennent des lignes qui n'ont pas de labels donc il faut les degager

In [66]:
df_train=df_train.dropna()
df_train=df_train.filter((df_train.l1 == "Agent") | (df_train.l1=='Event') | (df_train.l1=='Species') | (df_train.l1=='Place')| (df_train.l1=='UnitOfWork')| (df_train.l1=='SportsSeason')| (df_train.l1=='TopicalConcept'))

In [67]:
df_train.select('l1').distinct().show()



+--------------+
|            l1|
+--------------+
|       Species|
|  SportsSeason|
|         Place|
|    UnitOfWork|
|         Event|
|TopicalConcept|
|         Agent|
+--------------+



                                                                                

In [69]:
df_train.select('l1').count()

                                                                                

192235

 On a finalement 7 labels et on a perdu environ 60k lignes sur 260k

In [70]:
add_stopwords= stopwords.words('english')

##### On definit la liste des stop words que l'on veut enlever

In [71]:
l = ["http","https","amp","rt","t","c","the"]
for i in l:
    add_stopwords.append(i)

In [72]:
s=list(string.punctuation)

In [73]:
for i in s:
    add_stopwords.append(i)

In [74]:
print('let s go')

let s go


## Pre-processing

Pour cette etape on a voulu tester plusieurs types de pre-pro

In [75]:
def apply_pipeline(level,pipeline,df):
    label_stringIdx = StringIndexer(inputCol = "l"+str(level), outputCol = "label")
    pipelineFit=pipeline.fit(df)
    temp_df=pipelineFit.transform(df)
    d=label_stringIdx.fit(temp_df)
    new_df=d.transform(temp_df)
    return new_df

###### Count vectorizer

In [76]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
# stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

In [77]:
type(pipeline)

pyspark.ml.pipeline.Pipeline

In [78]:
dataset_l1_p1=apply_pipeline(1,pipeline,df_train)

                                                                                

In [80]:
dataset_l2_p1=apply_pipeline(2,pipeline,df_train)

                                                                                

In [55]:
dataset_l3_p1=apply_pipeline(3,pipeline,df_train)

                                                                                

In [81]:
print('good')

good


Nos données sont pretes à etre utilisés dans des modeles de Ml

###### TF/IDF

In [82]:
ngram=NGram(inputCol=stopwordsRemover.getOutputCol(),outputCol="Ngrams")

In [83]:
hashingTF=HashingTF(inputCol=ngram.getOutputCol(),outputCol="features")

In [84]:
idf=IDF(minDocFreq=3,inputCol=hashingTF.getOutputCol(),outputCol="final")

In [85]:
pipeline_prepro=Pipeline(stages=[regexTokenizer,stopwordsRemover,ngram,hashingTF,idf])

In [86]:
dataset_l1_p2=apply_pipeline(1,pipeline_prepro,df_train)

                                                                                

In [87]:
dataset_l2_p2=apply_pipeline(2,pipeline_prepro,df_train)

                                                                                

In [62]:
dataset_l3_p2=apply_pipeline(3,pipeline_prepro,df_train)

                                                                                

Word2Vec prend trop de temps du coup on ne va pas le faire 

## Classification

##### definition de modele

In [88]:
lr = LogisticRegression(maxIter=10, regParam=0.3,featuresCol='features',labelCol='label')

###### level 1

In [89]:
def fit_model(model,dataset):
    (trainingData, testData) = dataset.randomSplit([0.9, 0.1], seed = 100)
    lrModel = lr.fit(trainingData)
    return lrModel  

In [90]:
fitl1_1=fit_model(lr,dataset_l1_p1)

                                                                                

In [91]:
type(fitl1_1)

pyspark.ml.classification.LogisticRegressionModel

In [92]:
from pyspark.ml.classification import LogisticRegressionModel

In [96]:
fitl1_1.write().overwrite().save("model_l1_p4")

In [94]:
path=("model_l1_p3")

In [95]:
m=LogisticRegressionModel.load(path)

In [26]:
type(m)

pyspark.ml.classification.LogisticRegressionModel

In [51]:
path=("model_l1_p2")

In [52]:
m2=LogisticRegressionModel.load(path)

                                                                                

In [30]:
type(m2)

pyspark.ml.classification.LogisticRegressionModel

In [24]:
fitl1_2=fit_model(lr,dataset_l1_p2)

                                                                                

In [28]:
fitl1_2.write().overwrite().save("model_l1_p2")

                                                                                

##### level 2

In [69]:
fitl2_1=fit_model(lr,dataset_l2_p1)

                                                                                

In [29]:
fitl2_2=fit_model(lr,dataset_l2_p2)

22/04/01 11:17:25 ERROR Executor: Exception in task 6.0 in stage 80.0 (TID 354)]
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.linalg.DenseMatrix$.zeros(Matrices.scala:491)
	at org.apache.spark.ml.optim.aggregator.MultinomialLogisticBlockAggregator.add(MultinomialLogisticBlockAggregator.scala:160)
	at org.apache.spark.ml.optim.aggregator.MultinomialLogisticBlockAggregator.add(MultinomialLogisticBlockAggregator.scala:45)
	at org.apache.spark.ml.optim.loss.RDDLossFunction.$anonfun$calculate$1(RDDLossFunction.scala:59)
	at org.apache.spark.ml.optim.loss.RDDLossFunction$$Lambda$4469/0x00000008417c3840.apply(Unknown Source)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.sca

ConnectionRefusedError: [Errno 111] Connection refused

##### level 3

In [None]:
fitl3_1=fit_model(lr,dataset_l3_p1)

In [None]:
fitl3_2=fit_model(lr,dataset_l3_p2)

In [60]:
df=risque.select(col("label"),col("text_class")).distinct()

In [73]:
df.collect()[0][0]

2.0

In [76]:
df.collect()[0][1]

'Species'

In [62]:
n=df.collect()

                                                                                

In [71]:
type(n[0])
dico={}
for row in range(0,len(n)):
    
    
    
    
    
    

In [100]:
test.show()


[Stage 156:>                                                        (0 + 1) / 1]

+--------------------+--------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|                text|            l1|               words|            filtered|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|"""Echourouk TV\"...|         Agent|[echourouk, tv, a...|[echourouk, tv, a...|(10000,[1,2,3,4,5...|  0.0|[2.99682356653826...|[0.71343850416859...|       0.0|
|"3757 Anagolay, p...|         Place|[3757, anagolay, ...|[3757, anagolay, ...|(10000,[0,1,2,3,4...|  1.0|[0.45869983148295...|[0.00144256628818...|       1.0|
|"Abies hidalgensi...|       Species|[abies, hidalgens...|[abies, hidalgens...|(10000,[0,1,2,3,4...|  2.0|[-0.2417903089158...|[0.00138855432011...|       2.0|
|"American Communi...|    UnitOfWork|[am

                                                                                

In [44]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName='f1')

###### Score avec Count Vectorizer

In [45]:
evaluator.evaluate(test1)

                                                                                

0.9277851392097687

###### Score avec TF/IDF

In [46]:
evaluator.evaluate(test2)

                                                                                

0.916509190912641

###### Score avec Word2Vec

In [47]:
evaluator.evaluate(test3)

                                                                                

0.8505022768303262

### On passe à l'optimisation d'hyper parametres, tentative 1:

In [None]:
path = "DBPEDIA_val.csv/DBPEDIA_val.csv"
df_val = spark.read.format('csv').options(header=True,delimiter=',').load(path)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol="label",metricName='f1')
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
    parallelism=2)
cvModel = cv.fit()

### Tentative de streaming