In [1]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer,HashingTF, IDF,
                                    StopWordsRemover,CountVectorizer, CountVectorizerModel,
                                    StringIndexer,OneHotEncoderEstimator,VectorAssembler}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.tuning.{ParamGridBuilder,CrossValidator}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.functions._
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}

In [2]:
val df: DataFrame = spark
  .read
  .option("inferSchema", "true") // pour inférer le type de chaque colonne (Int, String, etc.)
  .parquet("/Users/jeremieperes/MS_Big_data_Telecom/P1/INF729_Hadoop_Spark/Spark/spark_project_kickstarter_2019_2020/data/prepared_trainingset")


df = [project_id: string, name: string ... 12 more fields]


[project_id: string, name: string ... 12 more fields]

In [67]:
df.show(5)

+--------------+--------------------+--------------------+------+--------------------+------------+--------+---------+-------------------+-------------------+-------------------+-------------+-----------+--------------------+
|    project_id|                name|                desc|  goal|            keywords|final_status|country2|currency2|          deadline2|        created_at2|       launched_at2|days_campaign|hours_prepa|                text|
+--------------+--------------------+--------------------+------+--------------------+------------+--------+---------+-------------------+-------------------+-------------------+-------------+-----------+--------------------+
| kkst471421639|american options ...|looking to create...|100000|american-options-...|           0|      US|      USD|2014-11-15 17:31:27|2014-10-10 21:23:58|2014-10-16 17:31:27|           30|    140.125|american options ...|
|kkst1098019088|iheadbones bone c...|wireless bluetoot...| 20000|iheadbones-bone-c...|          

In [68]:
df.printSchema

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)



In [3]:
// Stage 1 : Get words 
val tokenizer = new RegexTokenizer()
      .setPattern("\\W+")
      .setGaps(true)
      .setInputCol("text")
      .setOutputCol("tokens")

tokenizer = regexTok_08656503340b


regexTok_08656503340b

In [4]:
// Stage 2 : Remove stop words 
val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("words")


remover = stopWords_aee284b15dac


stopWords_aee284b15dac

In [5]:
// Stage 3 : TF
val hashingTF = new HashingTF()
  .setInputCol(remover.getOutputCol)
  .setOutputCol("rawFeatures")
  .setNumFeatures(10000)

hashingTF = hashingTF_42aae767d765


hashingTF_42aae767d765

In [6]:
// Stage 4 : IDF
val idf = new IDF()
    .setInputCol(hashingTF.getOutputCol)
    .setOutputCol("tfidf")

idf = idf_70e50de3eecd


idf_70e50de3eecd

In [22]:
// Stage 5 : convert country2
val countryIndexer = new StringIndexer()
  .setInputCol("country2")
  .setOutputCol("country_indexed")
  .setHandleInvalid("keep")

countryIndexer = strIdx_153b4e8035c8


lastException: Throwable = null


strIdx_153b4e8035c8

In [27]:
// Stage 6 : convert currency2
val currencyIndexer = new StringIndexer()
  .setInputCol("currency2")
  .setOutputCol("currency_indexed")

currencyIndexer = strIdx_baffb1649ff1


strIdx_baffb1649ff1

In [28]:
// Stage 7-8 : One-Hot encoding for country and currency
val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array(countryIndexer.getOutputCol, currencyIndexer.getOutputCol))
    .setOutputCols(Array("country_onehot", "currency_onehot"))

encoder = oneHotEncoder_abb9a97f6d72


oneHotEncoder_abb9a97f6d72

In [29]:
// Stage 9 : Vector features assembler
val assembler = new VectorAssembler()
    .setInputCols(Array("tfidf", "days_campaign", "hours_prepa", "goal", "country_onehot", "currency_onehot"))
    .setOutputCol("features")

assembler = vecAssembler_69319796d049


vecAssembler_69319796d049

In [30]:
// Stage 10 : Classification model - Logistic Regression
val lr = new LogisticRegression()
  .setElasticNetParam(0.0)
  .setFitIntercept(true)
  .setFeaturesCol("features")
  .setLabelCol("final_status")
  .setStandardization(true)
  .setPredictionCol("predictions")
  .setRawPredictionCol("raw_predictions")
  .setThresholds(Array(0.7, 0.3))
  .setTol(1.0e-6)
  .setMaxIter(20)

lr = logreg_193178e95256


logreg_193178e95256

In [31]:
// Pipeline creation
val pipeline = new Pipeline()
    .setStages(Array(tokenizer,remover,hashingTF,idf,countryIndexer,currencyIndexer,encoder,assembler,lr))

pipeline = pipeline_800a06ff0348


pipeline_800a06ff0348

In [32]:
// Data splitting
val Array(trainingData, testData) = df.randomSplit(Array(0.9, 0.1),42)

trainingData = [project_id: string, name: string ... 12 more fields]
testData = [project_id: string, name: string ... 12 more fields]


[project_id: string, name: string ... 12 more fields]

In [33]:
// Training model
val model = pipeline.fit(trainingData)

model = pipeline_800a06ff0348


pipeline_800a06ff0348

In [131]:
// Make predictions.
val dfWithSimplePredictions = model.transform(testData)

dfWithSimplePredictions = [project_id: string, name: string ... 24 more fields]


[project_id: string, name: string ... 24 more fields]

In [132]:
dfWithSimplePredictions.groupBy("final_status", "predictions").count.show()

+------------+-----------+-----+
|final_status|predictions|count|
+------------+-----------+-----+
|           1|        0.0| 1298|
|           0|        0.0| 4498|
|           1|        1.0| 2146|
|           0|        1.0| 2807|
+------------+-----------+-----+



In [17]:
val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("final_status")
      .setPredictionCol("predictions")
      .setMetricName("f1")

evaluator = mcEval_8ce4054abf19


mcEval_8ce4054abf19

In [18]:
val f1 = evaluator.evaluate(dfWithSimplePredictions)

Name: Compile Error
Message: <console>:38: error: not found: value dfWithSimplePredictions
       val f1 = evaluator.evaluate(dfWithSimplePredictions)
                                   ^

StackTrace: 

In [45]:
val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(10e-8,10e-6,10e-4,10e-2))
    .addGrid(lr.maxIter, Array(20,35,50))
    .build()

paramGrid = 


Array({
	logreg_193178e95256-maxIter: 20,
	logreg_193178e95256-regParam: 1.0E-7
}, {
	logreg_193178e95256-maxIter: 20,
	logreg_193178e95256-regParam: 1.0E-5
}, {
	logreg_193178e95256-maxIter: 20,
	logreg_193178e95256-regParam: 0.001
}, {
	logreg_193178e95256-maxIter: 20,
	logreg_193178e95256-regParam: 0.1
}, {
	logreg_193178e95256-maxIter: 35,
	logreg_193178e95256-regParam: 1.0E-7
}, {
	logreg_193178e95256-maxIter: 35,
	logreg_193178e95256-regParam: 1.0E-5
}, {
	logreg_193178e95256-maxIter: 35,
	logreg_193178e95256-regParam: 0.001
}, {
	logreg_193178e95256-maxIter: 35,
	logreg_193178e95256-regParam: 0.1
}, {
	logreg_193178e95256-maxIter: 50,
	logreg_193178e95256-regParam: 1.0E-7
}, {
	logreg_193178e95256-maxIter: 50,
	logreg_193178e...


In [46]:
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2) 

cv = cv_febb5ecf9001


cv_febb5ecf9001

In [47]:
val cvmodel = cv.fit(trainingData)

cvmodel = cv_febb5ecf9001


cv_febb5ecf9001

In [48]:
val dfWithSimplePredictions = cvmodel.transform(testData)

dfWithSimplePredictions = [project_id: string, name: string ... 24 more fields]


[project_id: string, name: string ... 24 more fields]

In [49]:
val cvEvaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("final_status")
      .setPredictionCol("predictions")
      .setMetricName("f1")

cvEvaluator = mcEval_f4720cc7e711


mcEval_f4720cc7e711

In [50]:
val cvF1 = cvEvaluator.evaluate(dfWithSimplePredictions)

cvF1 = 0.6340065068060643


0.6340065068060643

In [39]:
model.save("/Users/jeremieperes/MS_Big_data_Telecom/P1/INF729_Hadoop_Spark/Spark/spark_project_kickstarter_2019_2020/model")