In [1]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

Intitializing Scala interpreter ...

Spark Web UI available at http://137.194.76.42:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1573821239904)
SparkSession available as 'spark'


import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}


## Configuration de SparkSession

In [2]:
val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12"
    ))

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2c3e6a3f


In [3]:
 val spark = SparkSession
      .builder
      .config(conf)
      .appName("TP Spark : Preprocessor")
      .getOrCreate()

import spark.implicits._  // to use the symbol $

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7ea0865e
import spark.implicits._


## Création du DataFrame

In [4]:
 val df: DataFrame = spark
        .read
        .option("header", true)
        .option("inferSchema", "true")
        .option("quote", "\"")
        .option("escape", "\"")
        .csv("../data/train_clean.csv")

println(s"Nombre de lignes: ${df.count}")
println(s"Nombre de colonnes: ${df.columns.length}")

Nombre de lignes: 108129
Nombre de colonnes: 14


df: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


## Cleaning

In [5]:
 val dfCasted: DataFrame = df
      .withColumn("goal", $"goal".cast("Int"))
      .withColumn("deadline" , $"deadline".cast("Int"))
      .withColumn("state_changed_at", $"state_changed_at".cast("Int"))
      .withColumn("created_at", $"created_at".cast("Int"))
      .withColumn("launched_at", $"launched_at".cast("Int"))

dfCasted: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


In [6]:
dfCasted
      .select("goal", "deadline", "state_changed_at", "created_at", "launched_at", "backers_count", "final_status")
      .describe()
      .show

+-------+-----------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+
|summary|             goal|            deadline|    state_changed_at|          created_at|         launched_at|     backers_count|      final_status|
+-------+-----------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+
|  count|           108129|              108129|              108129|              108129|              108129|            108129|            108129|
|   mean|36726.22826438791|1.3802484980048554E9|1.3801529957698119E9|1.3740368577694051E9|1.3772990047093103E9|123.51666065532835|0.3196274819891056|
| stddev|971902.7051687709|4.2702221220911644E7| 4.266401844467795E7|4.2723097677902974E7| 4.294421262600033E7| 1176.745162158387|0.4663343928283478|
|    min|                0|          1241333999|          1241334017|          1240335335|          

In [7]:
dfCasted.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- disable_communication: boolean (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- state_changed_at: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- backers_count: integer (nullable = true)
 |-- final_status: integer (nullable = true)



## Suppression de colonnes

In [8]:
// On enlève la colonne disable_communication, qui contient peu de données et les colonnes backers_count et state_changed_at qui sont des fuites du futur
val df2: DataFrame = df.drop("disable_communication", "backers_count", "state_changed_at")

df2: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]


In [9]:
df2.show()

+--------------+--------------------+--------------------+-------+--------------------+-------+--------+----------+----------+-----------+------------+
|    project_id|                name|                desc|   goal|            keywords|country|currency|  deadline|created_at|launched_at|final_status|
+--------------+--------------------+--------------------+-------+--------------------+-------+--------+----------+----------+-----------+------------+
|kkst1451568084| drawing for dollars|I like drawing pi...|   20.0| drawing-for-dollars|     US|     USD|1241333999|1240600507| 1240602723|           1|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...|  300.0|sponsor-dereck-bl...|     US|     USD|1242429000|1240960224| 1240975592|           0|
| kkst183622197|       Mr. Squiggles|So I saw darkpony...|   30.0|        mr-squiggles|     US|     USD|1243027560|1242163613| 1242164398|           0|
| kkst597742710|Help me write my ...|Do your part to h...|  500.0|help-me-write-my-...| 

## Vérification des données

In [10]:
// Ce n'est plus la peine de cleaner les colonnes currency et country, qui sont bien remplies 
// (elles étaient mal remplies au départ car les virgules n'étaient pas échappées à la lecture du fichier csv)
df2.select("goal", "country", "currency", "deadline","created_at", "launched_at","final_status")
      .describe()
      .show

+-------+-----------------+-------+--------+--------------------+--------------------+--------------------+------------------+
|summary|             goal|country|currency|            deadline|          created_at|         launched_at|      final_status|
+-------+-----------------+-------+--------+--------------------+--------------------+--------------------+------------------+
|  count|           108129| 108129|  108129|              108129|              108129|              108129|            108129|
|   mean| 36726.2288325981|   null|    null|1.3802484980048554E9|1.3740368577694051E9|1.3772990047093103E9|0.3196274819891056|
| stddev|971902.7051560311|   null|    null|4.2702221220911644E7|4.2723097677902974E7| 4.294421262600033E7|0.4663343928283478|
|    min|             0.01|     AU|     AUD|          1241333999|          1240335335|          1240602723|                 0|
|    max|            1.0E8|     US|     USD|          1433096938|          1432325200|          1432658473|    

In [11]:
df2.groupBy("final_status").count.show() 
// 0: fail, 1: success 

+------------+-----+
|final_status|count|
+------------+-----+
|           0|73568|
|           1|34561|
+------------+-----+



In [12]:
df2.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: double (nullable = true)
 |-- keywords: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- launched_at: integer (nullable = true)
 |-- final_status: integer (nullable = true)



## Retraitement de colonnes

### Traitement des dates

In [13]:
// create column "days_campaign" with the (truncated) number of days between launch time and deadline
val df3: DataFrame = df2.withColumn("days_campaign", datediff(from_unixtime($"deadline"), from_unixtime($"launched_at")))

df3: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 10 more fields]


In [14]:
// create column "hours_prepa" with the number of hours between creation time and launch time
val df4: DataFrame = df3.withColumn("hours_prepa", round(($"launched_at" - $"created_at")/3600,3))

df4: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 11 more fields]


In [15]:
df4.select("project_id", "days_campaign", "hours_prepa", "deadline", "launched_at", "created_at").show()

+--------------+-------------+-----------+----------+-----------+----------+
|    project_id|days_campaign|hours_prepa|  deadline|launched_at|created_at|
+--------------+-------------+-----------+----------+-----------+----------+
|kkst1451568084|            9|      0.616|1241333999| 1240602723|1240600507|
|kkst1474482071|           17|      4.269|1242429000| 1240975592|1240960224|
| kkst183622197|           10|      0.218|1243027560| 1242164398|1242163613|
| kkst597742710|           30|      0.815|1243555740| 1240966730|1240963795|
|kkst1913131122|           30|       0.73|1243769880| 1241180541|1241177914|
|kkst1085176748|           28|    114.908|1243815600| 1241464468|1241050799|
|kkst1468954715|           24|      3.093|1243872000| 1241736308|1241725172|
| kkst194050612|           31|      2.708|1244088000| 1241470291|1241460541|
| kkst708883590|           32|      18.26|1244264400| 1241480901|1241415164|
| kkst890976740|           31|      1.473|1244946540| 1242273460|1242268157|

In [16]:
val df5: DataFrame = df4.drop("launched_at", "created_at", "deadline")

df5: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 8 more fields]


In [17]:
df5.show()

+--------------+--------------------+--------------------+-------+--------------------+-------+--------+------------+-------------+-----------+
|    project_id|                name|                desc|   goal|            keywords|country|currency|final_status|days_campaign|hours_prepa|
+--------------+--------------------+--------------------+-------+--------------------+-------+--------+------------+-------------+-----------+
|kkst1451568084| drawing for dollars|I like drawing pi...|   20.0| drawing-for-dollars|     US|     USD|           1|            9|      0.616|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...|  300.0|sponsor-dereck-bl...|     US|     USD|           0|           17|      4.269|
| kkst183622197|       Mr. Squiggles|So I saw darkpony...|   30.0|        mr-squiggles|     US|     USD|           0|           10|      0.218|
| kkst597742710|Help me write my ...|Do your part to h...|  500.0|help-me-write-my-...|     US|     USD|           1|           30|     

### Traitement des colonnes contenant du texte

In [18]:
val df6: DataFrame = df5.withColumn("name", lower($"name"))
                        .withColumn("desc", lower($"desc"))
                        .withColumn("keywords", lower($"keywords"))

df6: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 8 more fields]


In [19]:
df6.show()

In [20]:
val df7: DataFrame = df6.withColumn("text", concat_ws(" ",$"name", $"desc",$"keywords"))
val df8: DataFrame = df7.drop("name", "desc", "keywords")

+--------------+--------------------+--------------------+-------+--------------------+-------+--------+------------+-------------+-----------+
|    project_id|                name|                desc|   goal|            keywords|country|currency|final_status|days_campaign|hours_prepa|
+--------------+--------------------+--------------------+-------+--------------------+-------+--------+------------+-------------+-----------+
|kkst1451568084| drawing for dollars|i like drawing pi...|   20.0| drawing-for-dollars|     US|     USD|           1|            9|      0.616|
|kkst1474482071|sponsor dereck bl...|i  dereck blackbu...|  300.0|sponsor-dereck-bl...|     US|     USD|           0|           17|      4.269|
| kkst183622197|       mr. squiggles|so i saw darkpony...|   30.0|        mr-squiggles|     US|     USD|           0|           10|      0.218|
| kkst597742710|help me write my ...|do your part to h...|  500.0|help-me-write-my-...|     US|     USD|           1|           30|     

df7: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 9 more fields]
df8: org.apache.spark.sql.DataFrame = [project_id: string, goal: double ... 6 more fields]


In [21]:
df8.show()

+--------------+-------+-------+--------+------------+-------------+-----------+--------------------+
|    project_id|   goal|country|currency|final_status|days_campaign|hours_prepa|                text|
+--------------+-------+-------+--------+------------+-------------+-----------+--------------------+
|kkst1451568084|   20.0|     US|     USD|           1|            9|      0.616|drawing for dolla...|
|kkst1474482071|  300.0|     US|     USD|           0|           17|      4.269|sponsor dereck bl...|
| kkst183622197|   30.0|     US|     USD|           0|           10|      0.218|mr. squiggles so ...|
| kkst597742710|  500.0|     US|     USD|           1|           30|      0.815|help me write my ...|
|kkst1913131122| 2000.0|     US|     USD|           0|           30|       0.73|support casting m...|
|kkst1085176748|  700.0|     US|     USD|           0|           28|    114.908|daily digest i m ...|
|kkst1468954715|  250.0|     US|     USD|           0|           24|      3.093|ig

## Valeurs nulles

In [22]:
// There is no null values. To verify :
println(df8.filter("project_id is null").count())
println(df8.filter("goal is null").count())
println(df8.filter("country is null").count())
println(df8.filter("currency is null").count())
println(df8.filter("final_status is null").count())
println(df8.filter("days_campaign is null").count())
println(df8.filter("hours_prepa is null").count())
println(df8.filter("text is null").count())

0
0
0
0
0
0
0
0


## Sauvegarde du dataframe

In [23]:
// The dataframe contains 108129 rows
df8.count()

res12: Long = 108129


In [24]:
// Save the dataframe in the folder ../data/dataframe
df8.write.parquet("../data/dataframe")

org.apache.spark.sql.AnalysisException:  path file:/media/xavier/Data/Documents/DEV/Telecom/INF729-spark/spark_project_kickstarter_2019_2020/data/dataframe already exists.;