# SESSION 1

### STEP 2 - Persona

#### Occupation / type of user interested in this data
A couples therapist or marital psychologist is a highly relevant profile. These professionals seek to understand the factors that influence marital satisfaction and fidelity within a couple.

The available variables (age, years of marriage, presence of children, religiousness, marital satisfaction rating, etc.) can help them identify at-risk profiles and better target their support.

#### Persona: Dr. Clara Morel, marital psychologist
__Age:__ 42

__Occupation:__ Psychologist specializing in couples therapy

__Practice location:__ Private practice in Lyon

__Background:__ She mainly works with married couples aged 25 to 55. She is interested in how the length of the marriage, religiousness, education level, and marital satisfaction relate to fidelity.

### STEP 3 - Loading and preprocessing

In [8]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.hive.HiveContext


In [9]:
case class Affair(
    id: Int,
    affairs: Int,
    gender: String,
    age: Double,
    yearsMarried: Double,
    children: String,
    religiousness: Int,
    education: Int,
    occupation: Int,
    rating: Int
)

defined class Affair


In [10]:
// Read the raw file and drop the header line (index 0).
val data = sc.textFile("data/Affairs.csv")
    .zipWithIndex().filter { case (_, idx) => idx > 0 }.map(_._1)

// Split each line on commas and convert the fields to their typed columns.
val affairsRDD: RDD[Affair] = data
  .map(_.split(","))
  .map(fields => Affair(
    fields(0).toInt,
    fields(1).toInt,
    fields(2),
    fields(3).toDouble,
    fields(4).toDouble,
    fields(5),
    fields(6).toInt,
    fields(7).toInt,
    fields(8).toInt,
    fields(9).toInt
  ))

affairsRDD.take(10).foreach(println)

Affair(4,0,male,37.0,10.0,no,3,18,7,4)
Affair(5,0,female,27.0,4.0,no,4,14,6,4)
Affair(11,0,female,32.0,15.0,yes,1,12,1,4)
Affair(16,0,male,57.0,15.0,yes,5,18,6,5)
Affair(23,0,male,22.0,0.75,no,2,17,6,3)
Affair(29,0,female,32.0,1.5,no,2,17,5,5)
Affair(44,0,female,22.0,0.75,no,2,12,1,3)
Affair(45,0,male,57.0,15.0,yes,2,14,4,4)
Affair(47,0,female,32.0,15.0,yes,4,16,1,2)
Affair(49,0,male,22.0,1.5,no,4,14,4,5)


data = MapPartitionsRDD[40] at map at <console>:36
affairsRDD = MapPartitionsRDD[42] at map at <console>:40
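
The parsing step assumes that every line has exactly ten well-formed fields; a single malformed row would fail the job with an exception. A minimal sketch of a more defensive parser, assuming malformed rows should simply be skipped. The helper name `parseAffair` and the sample lines are hypothetical, and the case class is repeated only so that the block stands alone:

```scala
import scala.util.Try

// Same shape as the Affair case class defined earlier, repeated so this block is self-contained.
case class Affair(id: Int, affairs: Int, gender: String, age: Double, yearsMarried: Double,
                  children: String, religiousness: Int, education: Int, occupation: Int, rating: Int)

// Hypothetical helper: returns None instead of throwing when a line is malformed.
def parseAffair(line: String): Option[Affair] = {
  val f = line.split(",", -1).map(_.trim)
  if (f.length != 10) None
  else Try(Affair(f(0).toInt, f(1).toInt, f(2), f(3).toDouble, f(4).toDouble,
                  f(5), f(6).toInt, f(7).toInt, f(8).toInt, f(9).toInt)).toOption
}

// Illustrative input: one valid row and two broken ones.
val sampleLines = Seq(
  "4,0,male,37,10,no,3,18,7,4",   // valid
  "5,0,female,27,4,no,4,14",      // too few fields
  "x,0,male,37,10,no,3,18,7,4"    // non-numeric id
)

val parsed = sampleLines.flatMap(parseAffair)
parsed.foreach(println)
```

On the RDD, `data.flatMap(parseAffair)` could take the place of the two `map` calls.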



# SESSION 2

### STEP 1 - From RDD to DataFrame

In [11]:
val affairsDF = affairsRDD.toDF()

affairsDF.show(10)
affairsDF.printSchema()
println(s"Total rows: ${affairsDF.count()}")
affairsDF.summary().show()

+---+-------+------+----+------------+--------+-------------+---------+----------+------+
| id|affairs|gender| age|yearsMarried|children|religiousness|education|occupation|rating|
+---+-------+------+----+------------+--------+-------------+---------+----------+------+
|  4|      0|  male|37.0|        10.0|      no|            3|       18|         7|     4|
|  5|      0|female|27.0|         4.0|      no|            4|       14|         6|     4|
| 11|      0|female|32.0|        15.0|     yes|            1|       12|         1|     4|
| 16|      0|  male|57.0|        15.0|     yes|            5|       18|         6|     5|
| 23|      0|  male|22.0|        0.75|      no|            2|       17|         6|     3|
| 29|      0|female|32.0|         1.5|      no|            2|       17|         5|     5|
| 44|      0|female|22.0|        0.75|      no|            2|       12|         1|     3|
| 45|      0|  male|57.0|        15.0|     yes|            2|       14|         4|     4|
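| 47|      0|female|32.0|        15.0|     yes|            4|       16|         1|     2|
| 49|      0|  male|22.0|         1.5|      no|            4|       14|         4|     5|
+---+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 10 rows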

affairsDF = [id: int, affairs: int ... 8 more fields]
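
Spark's built-in CSV reader (available since Spark 2.0) can also produce a DataFrame directly, without going through an RDD and a case class. A minimal sketch, assuming a local session; the names `session`, `affairSchema`, `samplePath` and `sampleDF` are illustrative, and a two-row temporary file stands in for `data/Affairs.csv` so that the block runs on its own:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

// Assumption: a local session; getOrCreate() reuses the notebook's session if one exists.
val session = SparkSession.builder().master("local[*]").appName("affairs-csv").getOrCreate()

// Explicit schema mirroring the fields of the Affair case class.
val affairSchema = StructType(Seq(
  StructField("id", IntegerType), StructField("affairs", IntegerType),
  StructField("gender", StringType), StructField("age", DoubleType),
  StructField("yearsMarried", DoubleType), StructField("children", StringType),
  StructField("religiousness", IntegerType), StructField("education", IntegerType),
  StructField("occupation", IntegerType), StructField("rating", IntegerType)
))

// Stand-in file holding the header and the first two rows of the dataset.
val samplePath = Files.createTempFile("affairs-sample", ".csv")
val sampleContent =
  "id,affairs,gender,age,yearsMarried,children,religiousness,education,occupation,rating\n" +
  "4,0,male,37.0,10.0,no,3,18,7,4\n" +
  "5,0,female,27.0,4.0,no,4,14,6,4\n"
Files.write(samplePath, sampleContent.getBytes(StandardCharsets.UTF_8))

// header=true skips the first line; the schema supplies the column types.
val sampleDF = session.read
  .option("header", "true")
  .schema(affairSchema)
  .csv(samplePath.toString)

sampleDF.show()
```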



### STEP 2 - Extracting dimensions

In [12]:
// String-valued dimensions: children and gender

val dfChildren = affairsDF
    .select("children")
    .distinct()
    .withColumn("id_children", monotonically_increasing_id())

dfChildren.show(10)

val dfGender = affairsDF
    .select("gender")
    .distinct()
    .withColumn("id_gender", monotonically_increasing_id())

dfGender.show(10)

+--------+-----------+
|children|id_children|
+--------+-----------+
|      no|          0|
|     yes|          1|
+--------+-----------+

+------+---------+
|gender|id_gender|
+------+---------+
|female|        0|
|  male|        1|
+------+---------+



dfChildren = [children: string, id_children: bigint]
dfGender = [gender: string, id_gender: bigint]
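
`monotonically_increasing_id` guarantees unique, increasing ids, but not consecutive ones, and the values depend on how the data is partitioned. The ids 0 and 1 shown here are therefore not guaranteed on another run or cluster layout. A minimal sketch of a deterministic alternative using `row_number` over an ordered window, assuming a local session and a small hand-made sample (`session`, `sample` and `genderDim` are illustrative names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Assumption: a local session; getOrCreate() reuses the notebook's session if one exists.
val session = SparkSession.builder().master("local[*]").appName("dimension-ids").getOrCreate()
import session.implicits._

// Hand-made sample standing in for the gender column.
val sample = Seq("male", "female", "female", "male", "female").toDF("gender")

// Ordering by the label makes the numbering reproducible; subtracting 1 starts the ids at 0.
// A window without partitioning pulls all rows into one partition, which is fine for a tiny dimension.
val genderDim = sample
  .distinct()
  .withColumn("id_gender", row_number().over(Window.orderBy("gender")) - 1)

genderDim.show()
```

Because the ids come from a sort on the label, recomputing the dimension yields the same mapping each time, which matters when the dimension is both joined and saved as a separate table.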



In [13]:
// Replace the string columns with their surrogate keys.
val affairsNormalized = affairsDF
    .join(dfGender, Seq("gender"))
    .join(dfChildren, Seq("children"))
    .drop("gender", "children")

affairsNormalized.printSchema()

affairsNormalized = [id: int, affairs: int ... 8 more fields]


root
 |-- id: integer (nullable = false)
 |-- affairs: integer (nullable = false)
 |-- age: double (nullable = false)
 |-- yearsMarried: double (nullable = false)
 |-- religiousness: integer (nullable = false)
 |-- education: integer (nullable = false)
 |-- occupation: integer (nullable = false)
 |-- rating: integer (nullable = false)
 |-- id_gender: long (nullable = false)
 |-- id_children: long (nullable = false)




### STEP 3 - Hive tables, SQL

In [14]:
dfGender.write.mode("overwrite").saveAsTable("gender")
dfChildren.write.mode("overwrite").saveAsTable("children")
affairsNormalized.write.mode("overwrite").saveAsTable("affairs")

In [15]:
val hc = new HiveContext(sc)

hc.sql("SHOW TABLES").show()
hc.sql("SELECT * FROM affairs JOIN gender ON gender.id_gender = affairs.id_gender WHERE gender.gender = 'male' LIMIT 5").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|  affairs|      false|
|  default| children|      false|
|  default|   gender|      false|
+---------+---------+-----------+

+---+-------+----+------------+-------------+---------+----------+------+---------+-----------+------+---------+
| id|affairs| age|yearsMarried|religiousness|education|occupation|rating|id_gender|id_children|gender|id_gender|
+---+-------+----+------------+-------------+---------+----------+------+---------+-----------+------+---------+
|  4|      0|37.0|        10.0|            3|       18|         7|     4|        1|          0|  male|        1|
| 16|      0|57.0|        15.0|            5|       18|         6|     5|        1|          1|  male|        1|
| 23|      0|22.0|        0.75|            2|       17|         6|     3|        1|          0|  male|        1|
| 45|      0|57.0|        15.0|            2|       14|         4|     4|        1|

hc = org.apache.spark.sql.hive.HiveContext@43ed6e89
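
`HiveContext` has been deprecated since Spark 2.0 in favor of a `SparkSession` built with Hive support. A minimal sketch of the equivalent setup, assuming Hive classes are on the classpath and a local master; `session` is an illustrative name, and many notebook kernels already expose such a session as `spark`:

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() returns the session already running in the notebook, if there is one.
val session = SparkSession.builder()
  .appName("affairs-sql")
  .master("local[*]")        // assumption: local execution
  .enableHiveSupport()       // gives access to tables written with saveAsTable
  .getOrCreate()

// Same call as hc.sql(...), issued through the session.
session.sql("SHOW TABLES").show()
```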





# SESSION 3

### STEP 1 - Aggregations

#### Overall average number of affairs

In [16]:
hc.sql("SELECT ROUND(AVG(affairs),2) AS avg_affairs FROM affairs;").show()

+-----------+
|avg_affairs|
+-----------+
|       1.46|
+-----------+



#### Average age by gender

In [17]:
val dfAverageAgeByGender = hc.sql("""
    SELECT 
    g.gender AS genre,
    ROUND(AVG(a.age)) AS age_moyen
    FROM affairs a
    JOIN gender g ON a.id_gender = g.id_gender
    GROUP BY g.gender;
""")

dfAverageAgeByGender.show()

dfAverageAgeByGender = [genre: string, age_moyen: double]


+------+---------+
| genre|age_moyen|
+------+---------+
|female|     31.0|
|  male|     34.0|
+------+---------+




#### Average number of affairs by gender

In [18]:
val dfAverageAffairByGender = hc.sql("""
    SELECT g.gender, ROUND(AVG(a.affairs),2) AS avg_affairs
    FROM affairs a
    JOIN gender g ON a.id_gender = g.id_gender
    GROUP BY g.gender;
""")

dfAverageAffairByGender.show();

+------+-----------+
|gender|avg_affairs|
+------+-----------+
|female|       1.42|
|  male|        1.5|
+------+-----------+



dfAverageAffairByGender = [gender: string, avg_affairs: double]
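
The same join-then-aggregate pattern can be written with the DataFrame API instead of a SQL string, which lets the compiler check method names. A minimal sketch on a hand-made sample (the rows below are invented for illustration and are not taken from the dataset; `session`, `sampleAffairs`, `sampleGender` and `avgByGender` are illustrative names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, round}

// Assumption: a local session; getOrCreate() reuses the notebook's session if one exists.
val session = SparkSession.builder().master("local[*]").appName("avg-by-gender").getOrCreate()
import session.implicits._

// Invented fact rows (id_gender, affairs) and the matching dimension table.
val sampleAffairs = Seq((0L, 0), (0L, 3), (1L, 1), (1L, 2), (1L, 7)).toDF("id_gender", "affairs")
val sampleGender = Seq(("female", 0L), ("male", 1L)).toDF("gender", "id_gender")

// Equivalent of: SELECT gender, ROUND(AVG(affairs), 2) ... JOIN ... GROUP BY gender
val avgByGender = sampleAffairs
  .join(sampleGender, Seq("id_gender"))
  .groupBy("gender")
  .agg(round(avg("affairs"), 2).as("avg_affairs"))
  .orderBy("gender")

avgByGender.show()
```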



#### Average number of affairs by presence of children

In [19]:
hc.sql("""
    SELECT c.children, ROUND(AVG(a.affairs),2) AS avg_affairs
    FROM affairs a
    JOIN children c ON a.id_children = c.id_children
    GROUP BY c.children;
""").show()

+--------+-----------+
|children|avg_affairs|
+--------+-----------+
|      no|       0.91|
|     yes|       1.67|
+--------+-----------+



#### Average number of affairs by education level
9 = grade school, 12 = high school graduate, 14 = some college, 16 = college graduate, 17 = some graduate work, 18 = master's degree, 20 = Ph.D., M.D., or other advanced degree

In [20]:
hc.sql("""
    SELECT education, ROUND(AVG(affairs),2) AS avg_affairs
    FROM affairs
    GROUP BY education
    ORDER BY education;
""").show()

+---------+-----------+
|education|avg_affairs|
+---------+-----------+
|        9|       3.43|
|       12|        2.2|
|       14|       1.17|
|       16|        0.7|
|       17|       2.04|
|       18|        1.7|
|       20|       1.53|
+---------+-----------+



#### Average marital satisfaction by number of affairs
1 = very unhappy, 2 = somewhat unhappy, 3 = average, 4 = happier than average, 5 = very happy.

In [21]:
val dfAverageAffairByRating = hc.sql("""
    SELECT affairs, ROUND(AVG(rating),1) AS avg_rating
    FROM affairs
    GROUP BY affairs
    ORDER BY affairs;
""")

dfAverageAffairByRating.show()

dfAverageAffairByRating = [affairs: int, avg_rating: double]


+-------+----------+
|affairs|avg_rating|
+-------+----------+
|      0|       4.1|
|      1|       4.0|
|      2|       3.6|
|      3|       3.2|
|      7|       3.5|
|     12|       3.0|
+-------+----------+




#### Grouping by years of marriage
0.125 = 3 months or less, 0.417 = 4–6 months, 0.75 = 6 months–1 year, 1.5 = 1–2 years, 4 = 3–5 years, 7 = 6–8 years, 10 = 9–11 years, 15 = 12 or more years.

In [31]:
val dfCategoryYearsMarried = hc.sql("""
    SELECT 
    CASE 
        WHEN yearsMarried = 0.125 THEN '3 mois ou moins' 
        WHEN yearsMarried = 0.417 THEN '4 à 6 mois' 
        WHEN yearsMarried = 0.75 THEN '6 mois à 1 an' 
        WHEN yearsMarried = 1.5 THEN '1 à 2 ans' 
        WHEN yearsMarried = 4.0 THEN '3 à 5 ans' 
        WHEN yearsMarried = 7.0 THEN '6 à 8 ans' 
        WHEN yearsMarried = 10.0 THEN '9 à 11 ans' 
        WHEN yearsMarried = 15.0 THEN '12 ans ou plus'
        ELSE 'inconnu' 
    END AS yearsMarried, 
    COUNT(*) AS nb_personnes, 
    ROUND(AVG(affairs)) AS avg_affairs
    FROM affairs
    GROUP BY yearsMarried
    ORDER BY yearsMarried;
""")

dfCategoryYearsMarried.show()

dfCategoryYearsMarried = [yearsMarried: string, nb_personnes: bigint ... 1 more field]


+---------------+------------+-----------+
|   yearsMarried|nb_personnes|avg_affairs|
+---------------+------------+-----------+
|      1 à 2 ans|          88|        0.0|
| 12 ans ou plus|         204|        2.0|
|3 mois ou moins|          11|        0.0|
|      3 à 5 ans|         105|        1.0|
|     4 à 6 mois|          10|        0.0|
|  6 mois à 1 an|          31|        1.0|
|      6 à 8 ans|          82|        2.0|
|     9 à 11 ans|          70|        2.0|
+---------------+------------+-----------+
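
The rows of this table come out in alphabetical order of the labels ("1 à 2 ans" before "3 mois ou moins"), not in order of marriage duration. One way to keep the chronological order is to sort on the numeric code and attach the label afterwards. A minimal sketch in plain Scala collections, assuming the codes documented above; `yearsMarriedLabels`, `labelOf` and `codes` are illustrative names:

```scala
// Codes and labels as used in the query, listed in chronological order.
val yearsMarriedLabels: Seq[(Double, String)] = Seq(
  0.125 -> "3 mois ou moins",
  0.417 -> "4 à 6 mois",
  0.75  -> "6 mois à 1 an",
  1.5   -> "1 à 2 ans",
  4.0   -> "3 à 5 ans",
  7.0   -> "6 à 8 ans",
  10.0  -> "9 à 11 ans",
  15.0  -> "12 ans ou plus"
)
val labelOf: Map[Double, String] = yearsMarriedLabels.toMap

// Codes in arbitrary order, as they might come out of a GROUP BY.
val codes = Seq(15.0, 1.5, 0.125, 4.0)

// Sort numerically first, then translate each code to its label.
val ordered = codes.sorted.map(code => labelOf.getOrElse(code, "inconnu"))
ordered.foreach(println)
```

In the SQL query itself, a similar result should be obtainable by giving the label an alias other than `yearsMarried` and ordering on the numeric column.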




#### Categorizing infidelity among men and women

In [43]:
val dfCategoryAffairByGender = hc.sql("""
    SELECT g.gender AS genre, 
    CASE 
        WHEN a.age < 30 THEN '0-29' 
        WHEN a.age < 40 THEN '30-39' 
        WHEN a.age < 50 THEN '40-49' 
        WHEN a.age < 60 THEN '50-59' 
        ELSE '60+' 
    END AS tranche_age, 
    CASE 
        WHEN a.affairs > 0 THEN 'infidele' 
        ELSE 'fidele' 
    END AS statut_infidelite, 
    CAST(ROUND(AVG(a.affairs)) AS INT) AS indice_infidelite 
    FROM affairs a 
    JOIN gender g ON a.id_gender = g.id_gender 
    GROUP BY g.gender, tranche_age, statut_infidelite 
    ORDER BY g.gender, tranche_age, statut_infidelite;
""")

dfCategoryAffairByGender.show()

dfCategoryAffairByGender = [genre: string, tranche_age: string ... 2 more fields]


+------+-----------+-----------------+-----------------+
| genre|tranche_age|statut_infidelite|indice_infidelite|
+------+-----------+-----------------+-----------------+
|female|       0-29|           fidele|                0|
|female|       0-29|         infidele|                5|
|female|      30-39|           fidele|                0|
|female|      30-39|         infidele|                7|
|female|      40-49|           fidele|                0|
|female|      40-49|         infidele|                7|
|female|      50-59|           fidele|                0|
|female|      50-59|         infidele|                6|
|  male|       0-29|           fidele|                0|
|  male|       0-29|         infidele|                4|
|  male|      30-39|           fidele|                0|
|  male|      30-39|         infidele|                6|
|  male|      40-49|           fidele|                0|
|  male|      40-49|         infidele|                7|
|  male|      50-59|           

[genre: string, tranche_age: string ... 2 more fields]
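
Because `statut_infidelite` is one of the grouping keys, the average for the `fidele` rows is 0 by construction, and the value for the `infidele` rows is the average among respondents with at least one affair. The two `CASE` expressions can be read as the following plain Scala functions, a sketch for illustration only (`ageBracket` and `fidelityStatus` are hypothetical names):

```scala
// Mirrors the CASE on a.age: each bracket includes its lower bound and excludes its upper bound.
def ageBracket(age: Double): String =
  if (age < 30) "0-29"
  else if (age < 40) "30-39"
  else if (age < 50) "40-49"
  else if (age < 60) "50-59"
  else "60+"

// Mirrors the CASE on a.affairs: any positive count is classed as 'infidele'.
def fidelityStatus(affairs: Int): String =
  if (affairs > 0) "infidele" else "fidele"

// Illustrative (age, affairs) pairs.
val examples = Seq((22.0, 0), (37.0, 3), (57.0, 12))
examples.foreach { case (age, n) =>
  println(s"$age -> ${ageBracket(age)}, ${fidelityStatus(n)}")
}
```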

#### Average number of affairs by religiousness level and gender
religiousness: 1 = anti, 2 = not at all, 3 = slightly, 4 = somewhat, 5 = very.

In [40]:
val dfTotalAffairByReligiousness = hc.sql("""
    SELECT CAST(ROUND(AVG(a.affairs)) AS INT) AS total_affairs, g.gender, 
    CASE 
        WHEN a.religiousness = 1 THEN 'Anti-religieux' 
        WHEN a.religiousness = 2 THEN 'Athée' 
        WHEN a.religiousness = 3 THEN 'Presque' 
        WHEN a.religiousness = 4 THEN 'Un peu' 
        ELSE 'Très' 
    END AS religiousness
    FROM affairs a
    JOIN gender g ON g.id_gender = a.id_gender
    GROUP BY a.religiousness, g.gender
    ORDER BY a.religiousness
""")

dfTotalAffairByReligiousness.show()

+-------------+------+--------------+
|total_affairs|gender| religiousness|
+-------------+------+--------------+
|            2|  male|Anti-religieux|
|            4|female|Anti-religieux|
|            1|female|         Athée|
|            2|  male|         Athée|
|            2|  male|       Presque|
|            2|female|       Presque|
|            1|  male|        Un peu|
|            1|female|        Un peu|
|            1|  male|          Très|
|            1|female|          Très|
+-------------+------+--------------+



dfTotalAffairByReligiousness = [total_affairs: int, gender: string ... 1 more field]



### STEP 2 - Charts

In [25]:
dfAverageAffairByGender.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/avg_affairs_by_gender")

In [45]:
dfCategoryAffairByGender.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/category_affairs_by_gender")

In [27]:
dfAverageAffairByRating.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/avg_affairs_by_rating")

In [32]:
dfCategoryYearsMarried.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/category_years_married")

In [44]:
dfAverageAgeByGender.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/avg_age_by_gender")

In [41]:
dfTotalAffairByReligiousness.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("output/total_affairs_by_religiousness")
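
The six export cells repeat the same write pattern, so it could be factored into one function. A minimal sketch, assuming a local session and Spark's built-in `csv` source (available since Spark 2.0); `exportCsv`, `demo` and `outDir` are hypothetical names, and the output goes to a temporary directory rather than `output/`:

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper wrapping the repeated export pattern.
def exportCsv(df: DataFrame, path: String): Unit =
  df.coalesce(1)               // a single partition yields a single part file
    .write
    .mode("overwrite")
    .format("csv")             // built-in CSV source
    .option("header", "true")
    .save(path)

// Assumption: a local session; getOrCreate() reuses the notebook's session if one exists.
val session = SparkSession.builder().master("local[*]").appName("csv-export").getOrCreate()
import session.implicits._

// Small DataFrame with the two values from the average-by-gender result.
val demo = Seq(("female", 1.42), ("male", 1.5)).toDF("gender", "avg_affairs")

// Temporary target directory so the sketch does not touch the real output folder.
val outDir = Files.createTempDirectory("csv-export").resolve("avg_affairs_by_gender").toString
exportCsv(demo, outDir)
```

In the notebook, a call such as `exportCsv(dfAverageAffairByGender, "output/avg_affairs_by_gender")` would then stand in for one export cell.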