# First names processing

In [1]:
import os
import sys

def is_spark_initialized():
    """Tell whether a global ``spark`` object is already available.

    Returns:
        bool: ``True`` when a global named ``spark`` exists and is truthy,
        ``False`` when the name is undefined or its value is falsy.
    """
    try:
        # Always hand back a real bool (never an implicit None).
        return bool(spark)
    except NameError:
        # The name ``spark`` was never bound: no session exists yet.
        return False
    
if not is_spark_initialized():
    # No ``spark`` object exists yet: make the PySpark sources found under
    # SPARK_HOME importable, then start a local session bound to 127.0.0.1.
    if "SPARK_HOME" not in os.environ:
        # A plain string cannot be raised (that is itself a TypeError), so use
        # a real exception to surface the message.
        raise RuntimeError("SPARK_HOME is not defined !")

    import glob

    spark_python_dir = os.path.join(os.environ["SPARK_HOME"], "python")
    spark_lib_dir = os.path.join(spark_python_dir, "lib")

    sys.path.insert(0, spark_python_dir)
    sys.path.insert(0, os.path.join(spark_lib_dir, "pyspark.zip"))

    # The py4j archive name embeds its version number, so look for whichever
    # archive is present instead of relying on one fixed version; the
    # historical 0.9 name is kept as a fallback when nothing matches.
    py4j_archives = sorted(glob.glob(os.path.join(spark_lib_dir, "py4j-*-src.zip")))
    if not py4j_archives:
        py4j_archives = [os.path.join(spark_lib_dir, "py4j-0.9-src.zip")]
    for py4j_archive in py4j_archives:
        sys.path.insert(0, py4j_archive)

    from pyspark.sql import SparkSession

    os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
    spark = SparkSession.builder.master("local[*]").config("spark.local.ip", "127.0.0.1").getOrCreate()

### Data source

Data source used: http://www.data.gouv.fr/fr/datasets/fichier-des-prenoms-edition-2016/

In [2]:
# Read the tab-separated source file (its first line holds the column names)
# and print the resulting schema.
firstnames = spark.read.csv(
    path="data/dpt2015.txt",
    sep="\t",
    header=True,
    encoding="utf-8",
)
firstnames.printSchema()

root
 |-- sexe: string (nullable = true)
 |-- preusuel: string (nullable = true)
 |-- annais: string (nullable = true)
 |-- dpt: string (nullable = true)
 |-- nombre: string (nullable = true)



### Clean dataset

- Only keep first names with at least 2 characters.
- Remove invalid first names.
- Cast columns.
- Rename columns.

In [3]:
from pyspark.sql.functions import when, length, lower

# Keep only usable first names (more than one character, not the
# "_PRENOMS_RARES" placeholder), then project the raw columns onto the final
# renamed / typed ones. "dpt" is not selected, so it is dropped.
firstnames = (
    firstnames
    .where(
        (length(firstnames.preusuel) > 1)
        & (firstnames.preusuel != "_PRENOMS_RARES")
    )
    .select(
        firstnames.preusuel.alias("firstname"),
        firstnames.sexe.cast("int").alias("gender"),
        # Years starting with "X" become null instead of being cast.
        when(firstnames.annais.startswith("X"), None)
        .otherwise(firstnames.annais.cast("int"))
        .alias("birth_year"),
        firstnames.nombre.cast("int").alias("count"),
    )
)

firstnames.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- count: integer (nullable = true)



### Group dataset

Group by first name, pivot by gender, and aggregate the most recent year in which the first name was used and its total number of occurrences.

In [6]:
from pyspark.sql.functions import max, sum

# One row per first name; for each gender code (1 and 2) keep the latest
# birth year and the summed count. ``max`` and ``sum`` are the
# pyspark.sql.functions aggregates imported in this cell, not the builtins.
firstnames_gpby = (
    firstnames
    .groupBy("firstname")
    .pivot("gender", values=[1, 2])
    .agg(
        max("birth_year").alias("year"),
        sum("count").alias("count"),
    )
)

firstnames_gpby.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- 1_year: integer (nullable = true)
 |-- 1_count: long (nullable = true)
 |-- 2_year: integer (nullable = true)
 |-- 2_count: long (nullable = true)



Convert first names:
    
- Remove accents.
- Lower case.

In [7]:
from pyspark.sql.functions import udf
from unidecode import unidecode

def decode_firstname(firstname):
    """Return *firstname* transliterated by ``unidecode`` and lower-cased.

    Args:
        firstname: First name as text, possibly containing accented
            characters. ``None`` is passed through unchanged.

    Returns:
        The transliterated, lower-case text, or ``None`` when *firstname*
        is ``None``.
    """
    if firstname is None:
        return None
    # No extra ``.encode("ascii")``: on Python 3 it would turn the result
    # into ``bytes`` instead of text, which is not what a string column needs.
    return unidecode(firstname).lower()

# Expose the Python helper as a Spark UDF (no explicit return type is given).
udf_decode_firstname = udf(decode_firstname)

# Take the column from the DataFrame being transformed (firstnames_gpby),
# not from the pre-grouping ``firstnames`` DataFrame.
# NOTE(review): grouping was done on the raw names, so names that only differ
# by accents or case would end up as separate rows sharing the same decoded
# value — confirm whether the source data contains such variants.
firstnames_gpby = firstnames_gpby \
    .withColumn("firstname", udf_decode_firstname(firstnames_gpby.firstname))

### Persist dataset

Order by first name (ascending).

In [8]:
%%!
# Remove the export left by a previous run before the dataset is written
# again to the same "staging/firstnames.csv" path.
rm -r "staging/firstnames.csv"

[]

In [9]:
from pyspark.sql.functions import desc

# Reduce to a single partition, order the rows by first name (ascending) and
# export them as a tab-separated CSV with a header line.
(
    firstnames_gpby
    .coalesce(1)
    .sort("firstname")
    .write
    .option("charset", "utf-8")
    .csv("staging/firstnames.csv", sep="\t", header=True)
)