In [0]:
%file
ls /user/majesteye/DS05_INSURANCE_DATASET/input

In [1]:
%spark
sc.hadoopConfiguration.set("fs.defaultFS", "hdfs://namenode:9000")

<h1 align="center">General Schema</h1>

In [3]:
// Show and Print "clients_polices" Schema
val dfcp = spark.read
    .format("csv")               
    .option("header", "true")   
    .option("inferSchema", "true") 
    .option("delimiter", "\t") 
    .load("/user/majesteye/DS05_INSURANCE_DATASET/input/clients_polices.tsv")

dfcp.printSchema() 
dfcp.show(10)      


In [4]:
// Show and Print "client_features" Schema
val dfcf = spark.read
    .format("csv")               
    .option("header", "true")   
    .option("inferSchema", "true") 
    .option("delimiter", "\t") 
    .load("/user/majesteye/DS05_INSURANCE_DATASET/input/client_features.tsv")

dfcf.printSchema() 
dfcf.show(10)      

In [5]:
// Show and Print "clients_sinistre" Schema

val dfcs = spark.read
    .format("csv")               
    .option("header", "true")    
    .option("inferSchema", "true") 
    .option("delimiter", "\t")   
    .load("/user/majesteye/DS05_INSURANCE_DATASET/input/clients_sinistre.tsv")

dfcs.printSchema() 
dfcs.show(10)      

In [6]:
// Show and Print "object_features" Schema

val dfof = spark.read
    .format("csv")               
    .option("header", "true")    
    .option("inferSchema", "true") 
    .option("delimiter", "\t")   
    .load("/user/majesteye/DS05_INSURANCE_DATASET/input/object_features.tsv")

dfof.printSchema() 
dfof.show(10)      

In [7]:
// For SQL Utilisation
dfcp.createOrReplaceTempView("dfcp_TBL")
dfcf.createOrReplaceTempView("dfcf_TBL")
dfcs.createOrReplaceTempView("dfcs_TBL")
dfof.createOrReplaceTempView("dfof_TBL")


<h1 align="center">General Exploration of Each File</h1>


<h1 align="center">"clients_polices" File</h1>


## --- Distribution of IsToutRisque and fractionnement ---

In [11]:
%sql
-- Distribution of `IsToutRisque`
-- Count occurrences and calculate percentages for each value in the column
SELECT IsToutRisque, COUNT(*) AS count,
       COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () AS percentage
FROM dfcp_TBL
GROUP BY IsToutRisque;

-- Distribution of `fractionnement`
SELECT fractionnement, COUNT(*) AS count,
       COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () AS percentage
FROM dfcp_TBL
GROUP BY fractionnement;


## --- Analysis of the distribution of Prime ---

In [13]:
// Calculate descriptive statistics for the `Prime` column
dfcp.selectExpr(
  "min(Prime) as Min_Prime",   // Minimum value
  "max(Prime) as Max_Prime",   // Maximum value
  "avg(Prime) as Avg_Prime",   // Average (mean)
  "stddev(Prime) as StdDev_Prime" // Standard deviation (élévé que la moyenne, grande hétérogénéité dans les montants)
).show()

In [14]:
// Total number of rows in the dataset
val totalRows = dfcp.count()

// Count rows where `Prime` is negative
val negativePrimeCount = dfcp.filter("Prime < 0").count()

// Calculate the percentage of negative `Prime` values
val negativePercentage = (negativePrimeCount.toDouble / totalRows) * 100

// Display the result
println(f"Number of negative Prime values: $negativePrimeCount")
println(f"Percentage of negative Prime values: $negativePercentage%.2f%%")

dfcp.select("Prime").summary("count", "min", "25%", "50%", "75%", "max").show()

## ---Finding duplicates ---

In [16]:
// Group by all columns and count the occurrences
val duplicateRows = dfcp.groupBy(dfcp.columns.map(col): _*) // Group by all columns
                        .count()
                        .filter("count > 1") // Keep only duplicates

// Show duplicate rows with their counts
duplicateRows.show()


## ---Missing values of dfcp ---

<h1 align="center">"client_features" File</h1>

In [19]:
import org.apache.spark.sql.functions._

// Total number of rows in the dataset
val totalRows = dfcf.count()

// Calculate the number and percentage of missing (null or NaN) values for each column
val missingValues = dfcf.columns.map { colName =>
  val missingCount = dfcf.filter(col(colName).isNull || isnan(col(colName))).count() // Count missing values
  val missingPercentage = (missingCount.toDouble / totalRows) * 100 // Calculate percentage
  (colName, missingCount, f"${missingPercentage}%.2f") // Tuple: column name, missing count, percentage
}

// Convert the result to a DataFrame for better visualization
val missingValuesDF = spark.createDataFrame(missingValues)
                           .toDF("Column", "Missing Count", "Missing Percentage")

// Show the result
missingValuesDF.show()


In [20]:
val dfof = spark.read
    .format("csv")               
    .option("header", "true")    
    .option("inferSchema", "true") 
    .option("delimiter", "\t")   
    .load("/user/majesteye/DS05_INSURANCE_DATASET/input/object_features.tsv")

dfof.printSchema() 
dfof.show()      

In [21]:
import org.apache.spark.sql.functions._

def missingValuesStats(df: org.apache.spark.sql.DataFrame, dfName: String): Unit = {
  val totalRows = df.count()

  // Calcul des valeurs manquantes par colonne
  val missingCount = df.columns.map { colName =>
    val nullCount = df.filter(col(colName).isNull || col(colName) === "").count()
    (colName, nullCount)
  }.toMap

  // Calcul du pourcentage
  val missingPercentage = missingCount.map {
    case (colName, count) => (colName, (count.toDouble / totalRows) * 100)
  }

  // Affichage des résultats
  println(s"\n=== Résultats pour $dfName ===")
  missingCount.foreach { case (colName, count) =>
    val percentage = missingPercentage(colName)
    println(f"Colonne: $colName, Valeurs manquantes: $count, Pourcentage: $percentage%.2f%%")
  }
}

// Appliquer la fonction à chaque dataframe
val dfList = Map( "dfcf" -> dfcf, "dfcp" -> dfcp, "dfcs" -> dfcs, "dfof" -> dfof)
dfList.foreach { case (name, dataframe) =>
  missingValuesStats(dataframe, name)
}

<h1 align="center">Join Tables</h1>

<div style="border: 1px solid #d0e7f5; padding: 10px; background-color: #eef7fc; border-radius: 5px;">
  <p><strong>Important Note:</strong></p>
  <p>
    To perform the table joins required for creating the final dataset, we encountered an issue of 
    <strong>non-unique primary key values</strong> for the combination of <code>(N_souscrip, year)</code>. 
    This caused a conflict during the join process. To resolve this, we decided to remove duplicates 
    by retaining <strong>only one record per client for each year</strong>.
  </p>
</div>

<h1 align="center">Solve the non-unique key issue</h1>

## For clients-polices 

In [26]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Concept:
// In this code, we will remove duplicates from a DataFrame by keeping only the rows where "IsToutRisque" equals "Yes"
// for each unique combination of "N_SOUSCRIP" and "year". If multiple rows meet this condition, one of them will be kept randomly.

def removeDuplicatesBasedOnIsToutRisque(df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  // Repartition the DataFrame by "N_SOUSCRIP" to optimize memory usage and shuffle
  val repartitionedDF = df.repartition(200, col("N_SOUSCRIP"))

  // Remove duplicates by keeping rows where "IsToutRisque" equals "Yes"
  val deduplicatedDF = repartitionedDF
    .withColumn("IsToutRisqueNumeric", when(col("IsToutRisque") === "Yes", 1).otherwise(0)) // Convert "IsToutRisque" to a numeric column for sorting
    .withColumn("Rank", row_number()
      .over(Window.partitionBy("N_SOUSCRIP", "year") // Partition by "N_SOUSCRIP" and "year"
        .orderBy(rand())) // Randomize the order to select a random row when "IsToutRisque" is "Yes"
    )
    .filter(col("Rank") === 1) // Keep only the row with rank 1, i.e., the row with "IsToutRisque" equal to "Yes"
    .drop("IsToutRisqueNumeric", "Rank") // Drop the temporary columns used for sorting and ranking

  deduplicatedDF // Return the deduplicated DataFrame
}

// Apply the function to the "dfcp2" DataFrame and show the results
val dfcp2 = removeDuplicatesBasedOnIsToutRisque(dfcp) 

// Show the first 100 rows of the deduplicated DataFrame
dfcp2.show(100, truncate = false)


In [27]:
import org.apache.spark.sql.functions._

// Group by year and N_SOUSCRIP and count the occurrences
val duplicateCounts = dfcp2
  .groupBy("year", "N_SOUSCRIP")
  .count()
  .filter($"count" > 1) // Filter only combinations that appear more than once

// Count the number of duplicate combinations
val numberOfDuplicates = duplicateCounts.count()

// Show the duplicates (if any)
println(s"Number of duplicate combinations of 'year' and 'N_SOUSCRIP': $numberOfDuplicates")
println("Details of duplicate combinations (if any):")
duplicateCounts.show(100) // Display the first 100 duplicate combinations (if any)

## For clients-features 

In [29]:
// Finding duplicates
// Group by all columns and count the occurrences
val duplicateRows = dfof.groupBy(dfof.columns.map(col): _*) // Group by all columns
                        .count()
                        .filter("count > 1") // Keep only duplicates

// Show duplicate rows with their counts
duplicateRows.show(10)