# Étape 3 - Chargement, pré-traitement - Dataset Gapminder

In [None]:
import org.apache.spark.sql.functions._

case class GapminderData(
  rownames: Int,
  country: String,
  continent: String,
  year: Int,
  lifeExp: Double,
  pop: Long,
  gdpPercap: Double
)

val rdd = spark.sparkContext.textFile("/workspaces/atelier3-I1DEV2GrpB/spark/gapminder_unfiltered.csv")


val gapminderRDD = rdd.flatMap { line =>
  val cols = line.split(",")
  try {
    Some(GapminderData(
      cols(0).toInt,
      cols(1),
      cols(2),
      cols(3).toInt,
      cols(4).toDouble,
      cols(5).toLong,
      cols(6).toDouble
    ))
  } catch {
    case _: Throwable => None
  }
}
//on as utiliser flatmap pour que les données vide soit automatiquement enlever

defined class GapminderData
rdd = /workspaces/atelier3-I1DEV2GrpB/spark/gapminder_unfiltered.csv MapPartitionsRDD[1] at textFile at <console>:36
gapminderRDD = MapPartitionsRDD[2] at flatMap at <console>:39


MapPartitionsRDD[2] at flatMap at <console>:39

# Séance 2

## Étape 1 - RDD vers un dataframe

In [2]:
val gapminderDF = gapminderRDD.toDF()

gapminderDF.show(10)

+--------+-----------+---------+----+-------+--------+-----------+
|rownames|    country|continent|year|lifeExp|     pop|  gdpPercap|
+--------+-----------+---------+----+-------+--------+-----------+
|       1|Afghanistan|     Asia|1952| 28.801| 8425333|779.4453145|
|       2|Afghanistan|     Asia|1957| 30.332| 9240934|820.8530296|
|       3|Afghanistan|     Asia|1962| 31.997|10267083|  853.10071|
|       4|Afghanistan|     Asia|1967|  34.02|11537966|836.1971382|
|       5|Afghanistan|     Asia|1972| 36.088|13079460|739.9811058|
|       6|Afghanistan|     Asia|1977| 38.438|14880372|  786.11336|
|       7|Afghanistan|     Asia|1982| 39.854|12881816|978.0114388|
|       8|Afghanistan|     Asia|1987| 40.822|13867957|852.3959448|
|       9|Afghanistan|     Asia|1992| 41.674|16317921|649.3413952|
|      10|Afghanistan|     Asia|1997| 41.763|22227415| 635.341351|
+--------+-----------+---------+----+-------+--------+-----------+
only showing top 10 rows



gapminderDF = [rownames: int, country: string ... 5 more fields]


[rownames: int, country: string ... 5 more fields]

In [3]:
gapminderDF.printSchema()

root
 |-- rownames: integer (nullable = false)
 |-- country: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- year: integer (nullable = false)
 |-- lifeExp: double (nullable = false)
 |-- pop: long (nullable = false)
 |-- gdpPercap: double (nullable = false)



In [4]:
gapminderDF.summary().show()

+-------+------------------+-----------+---------+------------------+-----------------+--------------------+------------------+
|summary|          rownames|    country|continent|              year|          lifeExp|                 pop|         gdpPercap|
+-------+------------------+-----------+---------+------------------+-----------------+--------------------+------------------+
|  count|              3225|       3225|     3225|              3225|             3225|                3225|              3225|
|   mean|1658.6496124031007|       NULL|     NULL|1980.2651162790698|65.38284346046518|3.2255168763100777E7|11432.063840538056|
| stddev| 960.2651444585073|       NULL|     NULL|16.934393233528564|11.70994175957781|1.0584644004220994E8|11391.873700888762|
|    min|                 1|Afghanistan|   Africa|              1950|           23.599|               59412|       298.8462121|
|    25%|               831|       NULL|     NULL|              1967|            58.69|             2713

In [5]:
println(s"Nombre de lignes total : ${gapminderDF.count()}")

Nombre de lignes total : 3225


In [None]:
gapminderDF.select("continent").distinct().show()
gapminderDF.select("year").distinct().show()
gapminderDF.select(min("lifeExp"), max("lifeExp")).show()
gapminderDF.select(min("gdpPercap"), max("gdpPercap")).show()
gapminderDF.select(min("year"), max("year")).show()

+---------+
|continent|
+---------+
|   Europe|
|   Africa|
|      FSU|
| Americas|
|  Oceania|
|     Asia|
+---------+

+------------+------------+
|min(lifeExp)|max(lifeExp)|
+------------+------------+
|      23.599|       82.67|
+------------+------------+

+--------------+--------------+
|min(gdpPercap)|max(gdpPercap)|
+--------------+--------------+
|   298.8462121|   113523.1329|
+--------------+--------------+

+---------+---------+
|min(year)|max(year)|
+---------+---------+
|     1950|     2007|
+---------+---------+



## Étape 2 - Extraction de dimensions

In [7]:
import org.apache.spark.sql.functions.monotonically_increasing_id

// Dimension Continent
val dfContinent = gapminderDF
  .select("continent")
  .distinct()
  .withColumn("id_continent", monotonically_increasing_id())

dfContinent.show()

val gapminderDF_withContinentID = gapminderDF
  .join(dfContinent, Seq("continent"))
  .drop("continent")

gapminderDF_withContinentID.show(3)

// Dimension Year
val dfYear = gapminderDF
  .select("year")
  .distinct()
  .withColumn("id_year", monotonically_increasing_id())

dfYear.show(3)

val gapminderDF_withIDs = gapminderDF_withContinentID
  .join(dfYear, Seq("year"))
  .drop("year")

gapminderDF_withIDs.show(3)

// Dimension Country
val dfCountry = gapminderDF
  .select("country")
  .distinct()
  .withColumn("id_country", monotonically_increasing_id())

dfCountry.show(3)

val gapminderDF_enriched = gapminderDF_withIDs
  .join(dfCountry, Seq("country"))
  .drop("country")

gapminderDF_enriched.show(3)

+---------+------------+
|continent|id_continent|
+---------+------------+
|   Europe|           0|
|   Africa|           1|
|      FSU|           2|
| Americas|           3|
|  Oceania|           4|
|     Asia|           5|
+---------+------------+

+--------+-------+----+-------+-------+-----------+------------+
|rownames|country|year|lifeExp|    pop|  gdpPercap|id_continent|
+--------+-------+----+-------+-------+-----------+------------+
|      13|Albania|1952|  55.23|1282697|1601.056136|           0|
|      14|Albania|1957|  59.28|1476505|1942.284244|           0|
|      15|Albania|1962|  64.82|1728137|2312.888958|           0|
+--------+-------+----+-------+-------+-----------+------------+
only showing top 3 rows

+----+-------+
|year|id_year|
+----+-------+
|1959|      0|
|1990|      1|
|1975|      2|
+----+-------+
only showing top 3 rows

+--------+-------+-------+-------+-----------+------------+-------+
|rownames|country|lifeExp|    pop|  gdpPercap|id_continent|id_year|
+--

dfContinent = [continent: string, id_continent: bigint]
gapminderDF_withContinentID = [rownames: int, country: string ... 5 more fields]
dfYear = [year: int, id_year: bigint]
gapminderDF_withIDs = [rownames: int, country: string ... 5 more fields]
dfCountry = [country: string, id_country: bigint]
gapminderDF_enriched = [rownames: int, lifeExp: double ... 5 more fields]


[rownames: int, lifeExp: double ... 5 more fields]

## Étape 3 - Tables Hive, SQL

In [8]:
import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

// Sauvegarder les tables
gapminderDF_enriched.write.mode("overwrite").saveAsTable("gapminder_facts")
dfContinent.write.mode("overwrite").saveAsTable("dim_continent")
dfYear.write.mode("overwrite").saveAsTable("dim_year")
dfCountry.write.mode("overwrite").saveAsTable("dim_country")

// Requêtes SQL d'exemple
hc.sql("SELECT id_country, lifeExp FROM gapminder_facts ORDER BY lifeExp DESC LIMIT 10").show(3)
hc.sql("SELECT COUNT(*) FROM dim_continent").show()
hc.sql("SELECT COUNT(*) FROM dim_year").show()
hc.sql("SELECT COUNT(*) FROM dim_country").show()

// Requête plus complexe avec jointures
hc.sql("""
  SELECT c.continent, ROUND(AVG(f.lifeExp), 1) as avg_lifeExp
  FROM gapminder_facts f
  JOIN dim_continent c ON f.id_continent = c.id_continent
  GROUP BY c.continent
  ORDER BY avg_lifeExp ASC
""").show(10)

org.apache.spark.SparkRuntimeException: [LOCATION_ALREADY_EXISTS] Cannot name the managed table as `spark_catalog`.`default`.`gapminder_facts`, as its associated location 'file:/workspaces/atelier3-I1DEV2GrpB/spark/spark-warehouse/gapminder_facts' already exists. Please pick a different table name, or remove the existing location first.

# Séance 3 : Agrégations et Visualisations

## Agrégations pour Dr. Sarah Chen - Analyste ONU

### 1. moyenne de l'ésperance de vie par continents et par années


In [None]:
val agg1 = hc.sql("""
  SELECT 
  dim_continent.continent,
  dim_year.year,
  ROUND(AVG(f.lifeExp), 2) AS avg_life_expectancy
FROM gapminder_facts f
JOIN dim_continent ON f.id_continent = dim_continent.id_continent
JOIN dim_year ON f.id_year = dim_year.id_year
GROUP BY dim_continent.continent, dim_year.year
ORDER BY dim_year.year, dim_continent.continent;
""")
agg1.show()



+---------+----+-------------------+
|continent|year|avg_life_expectancy|
+---------+----+-------------------+
|   Africa|1950|              41.36|
| Americas|1950|              59.41|
|     Asia|1950|              53.68|
|   Europe|1950|              65.76|
|      FSU|1950|              59.95|
|  Oceania|1950|              69.16|
| Americas|1951|              68.38|
|     Asia|1951|              58.05|
|   Europe|1951|              66.16|
|  Oceania|1951|              68.94|
|   Africa|1952|              39.26|
| Americas|1952|              53.28|
|     Asia|1952|              46.12|
|   Europe|1952|              64.46|
|  Oceania|1952|              69.26|
| Americas|1953|              68.96|
|     Asia|1953|              56.08|
|   Europe|1953|              67.89|
|  Oceania|1953|              69.97|
| Americas|1954|              69.79|
+---------+----+-------------------+
only showing top 20 rows



agg1 = [continent: string, year: int ... 1 more field]


[continent: string, year: int ... 1 more field]

### 2. Espérance de vie moyenne par pays


In [None]:
val agg2 = hc.sql("""
  SELECT 
    dim_country.country,
    ROUND(AVG(f.lifeExp), 2) AS avg_life_expectancy
  FROM gapminder_facts f
  JOIN dim_country ON f.id_country = dim_country.id_country
  GROUP BY dim_country.country
  ORDER BY avg_life_expectancy DESC
""")
agg2.show()


agg2.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("out/ESPm")


+--------------------+-------------------+
|             country|avg_life_expectancy|
+--------------------+-------------------+
|          Martinique|              78.78|
|          Guadeloupe|              78.41|
|             Iceland|              76.23|
|              Cyprus|              76.13|
|              Sweden|              76.02|
|              Norway|              75.69|
|         Netherlands|              75.46|
|             Germany|              75.36|
|         Switzerland|              75.32|
|       French Guiana|              75.14|
|              Canada|              74.63|
|               Japan|              74.49|
|           Australia|              74.23|
|             Denmark|              74.21|
|               Malta|              74.15|
|              France|              73.98|
|               Spain|              73.88|
|Netherlands Antilles|              73.74|
|              Israel|              73.65|
|               Italy|              73.61|
+----------

agg2 = [country: string, avg_life_expectancy: double]


[country: string, avg_life_expectancy: double]

### 3. Population totale par continent et année


In [None]:
val agg3 = hc.sql("""
  SELECT 
    dim_continent.continent,
    dim_year.year,
    SUM(f.pop) AS total_population
  FROM gapminder_facts f
  JOIN dim_continent ON f.id_continent = dim_continent.id_continent
  JOIN dim_year ON f.id_year = dim_year.id_year
  GROUP BY dim_continent.continent, dim_year.year
  ORDER BY dim_year.year, dim_continent.continent
""")
agg3.show()

agg3.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("out/ESPm")


+---------+----+----------------+
|continent|year|total_population|
+---------+----+----------------+
|   Africa|1950|         6483063|
| Americas|1950|       203703178|
|     Asia|1950|       119361179|
|   Europe|1950|       332200038|
|      FSU|1950|       138711670|
|  Oceania|1950|        10175647|
| Americas|1951|       169208675|
|     Asia|1951|        93415174|
|   Europe|1951|       208026546|
|  Oceania|1951|        10457989|
|   Africa|1952|       222840648|
| Americas|1952|       345152446|
|     Asia|1952|      1358454563|
|   Europe|1952|       418420109|
|  Oceania|1952|        10686006|
| Americas|1953|       175367375|
|     Asia|1953|       677895125|
|   Europe|1953|       208126198|
|  Oceania|1953|        10905315|
| Americas|1954|       178662245|
+---------+----+----------------+
only showing top 20 rows



agg3 = [continent: string, year: int ... 1 more field]


[continent: string, year: int ... 1 more field]

### 4. PIB par habitant moyen par continent


In [None]:
val agg4 = hc.sql("""
  SELECT 
    dim_continent.continent,
    ROUND(AVG(f.gdpPercap), 2) AS avg_gdp_per_capita
  FROM gapminder_facts f
  JOIN dim_continent ON f.id_continent = dim_continent.id_continent
  GROUP BY dim_continent.continent
  ORDER BY avg_gdp_per_capita DESC
""")
agg4.show()


+---------+------------------+
|continent|avg_gdp_per_capita|
+---------+------------------+
|   Europe|          16551.18|
|  Oceania|          14458.72|
| Americas|          10802.57|
|     Asia|          10146.65|
|      FSU|           7326.69|
|   Africa|           2183.51|
+---------+------------------+



agg4 = [continent: string, avg_gdp_per_capita: double]


[continent: string, avg_gdp_per_capita: double]

### 5. PIB total (gdpPercap × population) par pays et année


In [None]:
val agg6 = hc.sql("""
  SELECT 
    dim_country.country,
    dim_year.year,
    ROUND(SUM(f.gdpPercap * f.pop), 2) AS total_gdp
  FROM gapminder_facts f
  JOIN dim_country ON f.id_country = dim_country.id_country
  JOIN dim_year ON f.id_year = dim_year.id_year
  GROUP BY dim_country.country, dim_year.year
  ORDER BY dim_year.year, total_gdp DESC
""")
agg6.show()

agg6.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("out/Pib")


+--------------+----+------------------+
|       country|year|         total_gdp|
+--------------+----+------------------+
| United States|1950|1.9676815390953E12|
|United Kingdom|1950|4.8956553362663E11|
|       Germany|1950|4.1640348236898E11|
|        Russia|1950|3.7461297627117E11|
|        France|1950|2.7589885818009E11|
|         Japan|1950|2.2162948338584E11|
|         Italy|1950|2.1013857012626E11|
|        Canada|1950|1.4825857649477E11|
|       Ukraine|1950|1.3244293168844E11|
|        Mexico|1950| 9.382383536794E10|
|         Spain|1950| 9.205535914686E10|
|   Netherlands|1950| 8.912281703673E10|
|     Australia|1950| 8.293066093637E10|
|       Belgium|1950| 6.903258287365E10|
|   Switzerland|1950|  6.50945479621E10|
|Czech Republic|1950| 5.971575687986E10|
|        Sweden|1950|  5.86709508029E10|
|       Hungary|1950| 4.413317417264E10|
|       Denmark|1950| 4.132396422312E10|
|       Austria|1950|  3.97596087304E10|
+--------------+----+------------------+
only showing top

agg6 = [country: string, year: int ... 1 more field]


[country: string, year: int ... 1 more field]

### 6. Écart d'espérance de vie entre pays par continent et année


In [None]:
val agg7 = hc.sql("""
  SELECT 
    dim_continent.continent,
    dim_year.year,
    ROUND(MAX(f.lifeExp) - MIN(f.lifeExp), 2) AS life_expectancy_gap
  FROM gapminder_facts f
  JOIN dim_continent ON f.id_continent = dim_continent.id_continent
  JOIN dim_year ON f.id_year = dim_year.id_year
  GROUP BY dim_continent.continent, dim_year.year
  ORDER BY dim_year.year, dim_continent.continent
""")
agg7.show()

agg7.coalesce(1)
  .write
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("out/ecart")


+---------+----+-------------------+
|continent|year|life_expectancy_gap|
+---------+----+-------------------+
|   Africa|1950|               2.72|
| Americas|1950|              18.68|
|     Asia|1950|               14.1|
|   Europe|1950|               13.2|
|      FSU|1950|                5.3|
|  Oceania|1950|               0.27|
| Americas|1951|               0.33|
|     Asia|1951|               5.89|
|   Europe|1951|              13.82|
|  Oceania|1951|               0.44|
|   Africa|1952|              22.72|
| Americas|1952|              31.17|
|     Asia|1952|              36.59|
|   Europe|1952|              29.09|
|  Oceania|1952|               0.27|
| Americas|1953|               0.34|
|     Asia|1953|              18.81|
|   Europe|1953|              12.03|
|  Oceania|1953|               0.54|
| Americas|1954|               0.41|
+---------+----+-------------------+
only showing top 20 rows



agg7 = [continent: string, year: int ... 1 more field]


[continent: string, year: int ... 1 more field]