## Initialisation d'un contexte Spark DataFrame

In [1]:
from pyspark import SparkContext, SQLContext
# Wrap the Spark context in a SQLContext; every DataFrame below is created through it.
# NOTE(review): `sc` is never defined in this notebook -- presumably it is injected by
# the pyspark shell/kernel. Confirm before running on a fresh kernel.
sqlCtx = SQLContext(sc)
sqlCtx

<pyspark.sql.context.SQLContext at 0x1085170d0>

## Lecture d'un CSV avec un reader dataframe
## Ici le dataset 'titanic'

In [2]:
# Load train.csv through the "com.databricks.spark.csv" data source, passing the
# header="true" option to the reader.
# NOTE(review): assumes the spark-csv package is available to the Spark session -- confirm.
df = sqlCtx.load(source="com.databricks.spark.csv", header="true", path = "train.csv")

## Découvrir le schéma de mon fichier

In [3]:
# Print the schema of the loaded file; in the recorded output every column is a string.
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



## Afficher la première ligne

In [4]:
# Return the first row of the raw DataFrame as a Row object.
df.head()

Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=u'', Embarked=u'S')

## Projection

In [5]:
# Projection: rename every raw column and cast the ones that get a proper type.
# Columns listed without a cast (id, name, sex, ticket, fare, cabin, embarked)
# keep the string type they were loaded with.
projection = [
    df['PassengerId'].alias('id'),
    df['Survived'].cast('boolean').alias('survived'),
    df['Pclass'].cast('int').alias('pClass'),
    df['Name'].alias('name'),
    df['Sex'].alias('sex'),
    df['Age'].cast('double').alias('age'),
    df['SibSp'].cast('int').alias('nb_sibling'),
    df['Parch'].cast('int').alias('nb_parent'),
    df['Ticket'].alias('ticket'),
    df['Fare'].alias('fare'),
    df['Cabin'].alias('cabin'),
    df['Embarked'].alias('embarked'),
]
clean_df = df.select(*projection)
clean_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- survived: boolean (nullable = true)
 |-- pClass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- nb_sibling: integer (nullable = true)
 |-- nb_parent: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)



## data transformation

### Le but est d'extraire le titre de la personne

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def _extract_title(name):
    """Return the title found in a passenger name, or None when it cannot be read.

    The title is the first space-separated token that follows the first comma,
    e.g. 'Braund, Mr. Owen Harris' -> 'Mr.'.

    A null name or a name without a comma yields None instead of raising, so a
    single malformed row no longer fails the whole Spark job.
    """
    if name is None:
        return None
    parts = name.split(',')
    if len(parts) < 2:
        # No "<last name>, <title> ..." structure to parse.
        return None
    return parts[1].strip().split(' ')[0]


# Spark UDF wrapper around _extract_title; produces a string column.
extract_title = F.udf(_extract_title, StringType())

In [7]:
# Add a `title` column computed from `name` with the extract_title UDF.
# NOTE: this rebinds clean_df to the extended frame, so re-running this cell on its
# own applies withColumn to a frame that already carries a `title` column.
clean_df = clean_df.withColumn('title', extract_title(clean_df.name))

In [9]:
# Check that the new `title` column is now part of the schema.
clean_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- survived: boolean (nullable = true)
 |-- pClass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- nb_sibling: integer (nullable = true)
 |-- nb_parent: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- title: string (nullable = true)



In [11]:
# Fetch one row (returned as a one-element list) to eyeball the extracted title.
clean_df.take(1)

[Row(id=u'1', survived=False, pClass=3, name=u'Braund, Mr. Owen Harris', sex=u'male', age=22.0, nb_sibling=1, nb_parent=0, ticket=u'A/5 21171', fare=u'7.25', cabin=u'', embarked=u'S', title=u'Mr.')]

## Description des données

### Nombre de lignes

In [12]:
# Total number of rows in the cleaned DataFrame.
clean_df.count()

891L

### Statistique d'une colonne

In [13]:
# Summary statistics for the `age` column. In the recorded output the count (714) is
# lower than the row count (891), which is what reveals the missing ages.
clean_df.select('age').describe().collect()

[Row(summary=u'count', age=714),
 Row(summary=u'mean', age=29.69911764705882),
 Row(summary=u'stddev', age=14.516321150817328),
 Row(summary=u'min', age=0.42),
 Row(summary=u'max', age=80.0)]

## On cherche à remplacer les données d'âge manquantes

### Le titre peut être une bonne indication pour remplacer l'âge s'il est absent

In [16]:
# Number of passengers whose age is null, broken down by title.
clean_df.filter(clean_df.age.isNull()).groupBy('title').count().collect()

[Row(title=u'Mrs.', count=17),
 Row(title=u'Miss.', count=36),
 Row(title=u'Dr.', count=1),
 Row(title=u'Master.', count=4),
 Row(title=u'Mr.', count=119)]

In [17]:
# Age statistics per title: number of known ages, minimum, maximum and average.
# The grouping column itself is passed as the first expression, alongside the aggregates.
title_groups = clean_df.groupBy('title')
age_stat_by_title = title_groups.agg(
    clean_df['title'],
    F.count(clean_df['age']).alias('count_by_age'),
    F.min(clean_df['age']).alias('min_age'),
    F.max(clean_df['age']).alias('max_age'),
    F.avg(clean_df['age']).alias('avg_age'),
)
age_stat_by_title.collect()

[Row(title=u'Mme.', count_by_age=1, min_age=24.0, max_age=24.0, avg_age=24.0),
 Row(title=u'Col.', count_by_age=2, min_age=56.0, max_age=60.0, avg_age=58.0),
 Row(title=u'Mrs.', count_by_age=108, min_age=14.0, max_age=63.0, avg_age=35.898148148148145),
 Row(title=u'Miss.', count_by_age=146, min_age=0.75, max_age=63.0, avg_age=21.773972602739725),
 Row(title=u'Capt.', count_by_age=1, min_age=70.0, max_age=70.0, avg_age=70.0),
 Row(title=u'Mlle.', count_by_age=2, min_age=24.0, max_age=24.0, avg_age=24.0),
 Row(title=u'Don.', count_by_age=1, min_age=40.0, max_age=40.0, avg_age=40.0),
 Row(title=u'Dr.', count_by_age=6, min_age=23.0, max_age=54.0, avg_age=42.0),
 Row(title=u'Sir.', count_by_age=1, min_age=49.0, max_age=49.0, avg_age=49.0),
 Row(title=u'Jonkheer.', count_by_age=1, min_age=38.0, max_age=38.0, avg_age=38.0),
 Row(title=u'Master.', count_by_age=36, min_age=0.42, max_age=12.0, avg_age=4.574166666666667),
 Row(title=u'Major.', count_by_age=2, min_age=45.0, max_age=52.0, avg_age=4

### On insère les données calculées dans le dataframe avec une jointure

In [19]:
# A direct DataFrame-to-DataFrame join is not used: the author reports a bug when
# joining two DataFrames that come from the same source. The attempted call is kept
# below for reference.
#clean_df_with_age = age_stat_by_title.join(clean_df, age_stat_by_title.title == clean_df.title, "inner")


# Workaround: register both frames as tables and express the join on `title` in SQL.
sqlCtx.registerDataFrameAsTable(clean_df, "clean_df")
sqlCtx.registerDataFrameAsTable(age_stat_by_title, "age_stat_by_title")
clean_df_with_age = sqlCtx.sql("select * from clean_df join age_stat_by_title on clean_df.title = age_stat_by_title.title")

# `select *` brings back `title` from both tables. Dropping the duplicate with
# DataFrame.drop is deferred by the author until Spark 1.4 (call kept for reference).
#clean_df_with_age = clean_df_with_age.drop(age_stat_by_title.title)

# Until then, explicitly keep every clean_df column plus every statistics column
# except the statistics table's own `title`.
keep = [clean_df[c] for c in clean_df.columns] + [age_stat_by_title[c] for c in age_stat_by_title.columns if c != 'title']
clean_df_with_age = clean_df_with_age.select(*keep)

In [20]:
# Schema after the join: the clean_df columns followed by the per-title age statistics.
clean_df_with_age.printSchema()

root
 |-- id: string (nullable = true)
 |-- survived: boolean (nullable = true)
 |-- pClass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- nb_sibling: integer (nullable = true)
 |-- nb_parent: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- title: string (nullable = true)
 |-- count_by_age: long (nullable = false)
 |-- min_age: double (nullable = true)
 |-- max_age: double (nullable = true)
 |-- avg_age: double (nullable = true)



In [21]:
# Row count after the join; compare it with clean_df.count() to check no row was lost.
clean_df_with_age.count()

891L

### On ajoute une nouvelle colonne contenant l'âge s'il existe, ou une estimation sinon

In [22]:
from pyspark.sql.types import DoubleType
def _infer_age(age, avg_age):
    """Return `age` when it is known, otherwise fall back to `avg_age`.

    `avg_age` is the average age of the passengers sharing the same title.
    The null check uses `is None` (identity) rather than `== None`.
    """
    return avg_age if age is None else age


# Spark UDF wrapper around _infer_age; produces a double column.
infere_age = F.udf(_infer_age, DoubleType())

# New column `age_infered`: the real age when present, else the title's average age.
final_df = clean_df_with_age.withColumn(
    'age_infered',
    infere_age(clean_df_with_age.age, clean_df_with_age.avg_age),
)

In [23]:
# Columns copied unchanged into the exported dataset.
passthrough_columns = [
    "id",
    "survived",
    "pClass",
    "name",
    "sex",
    "nb_sibling",
    "nb_parent",
    "ticket",
    "fare",
    "cabin",
    "embarked",
    "title",
]
# The original `age` column and the statistics columns are left out; the inferred
# age is exported under the name `age`.
export_df = final_df.select(*(passthrough_columns + [final_df.age_infered.alias("age")]))
# Bring the data down to a single partition before writing the Parquet output.
export_df.repartition(1).saveAsParquetFile("titanic.parquet")

## Passage de Spark dataframe vers Spark RDD

In [24]:
# Read the "titanic.parquet" dataset back into a DataFrame and count its rows.
parquet_df = sqlCtx.parquetFile("titanic.parquet")
parquet_df.count()

891L

In [25]:
# Schema of the DataFrame as read back from the Parquet file.
parquet_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- survived: boolean (nullable = true)
 |-- pClass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nb_sibling: integer (nullable = true)
 |-- nb_parent: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- title: string (nullable = true)
 |-- age: double (nullable = true)



Un DataFrame n'est pas une extension de RDD : il faut récupérer la référence du RDD sous-jacent via l'attribut `rdd`.

In [26]:
# Get the RDD behind the DataFrame through its `rdd` attribute; the recorded output
# shows that its elements are Row objects.
rdd = parquet_df.rdd
rdd.take(1)

[Row(id=u'370', survived=True, pClass=1, name=u'Aubart, Mme. Leontine Pauline', sex=u'female', nb_sibling=0, nb_parent=0, ticket=u'PC 17477', fare=u'69.3', cabin=u'B35', embarked=u'C', title=u'Mme.', age=24.0)]

In [27]:
# Draw roughly 10% of the rows, without replacement.
# A fixed seed makes the sample (and everything derived from it below) repeatable
# across runs on the same input; without it every execution drew a different sample.
SAMPLE_SEED = 42
sample = rdd.sample(False, 0.1, SAMPLE_SEED)
sample

PythonRDD[272] at RDD at PythonRDD.scala:43

## Passage de Spark RDD vers Spark dataframe

In [29]:
# Rebuild a DataFrame from the sampled RDD, supplying only the column names.
# No column types are passed here: in the recorded output the columns that were
# `integer` in parquet_df (pClass, nb_sibling, nb_parent) come back as `long`.
sampleDF = sqlCtx.createDataFrame(sample, parquet_df.columns)
sampleDF.printSchema()
sampleDF.count()

root
 |-- id: string (nullable = true)
 |-- survived: boolean (nullable = true)
 |-- pClass: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nb_sibling: long (nullable = true)
 |-- nb_parent: long (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- title: string (nullable = true)
 |-- age: double (nullable = true)



88L

In [32]:
# Small RDD of (id, name) tuples built from a local Python list.
prdd = sc.parallelize([(1,'a'), (2,'b')])

In [33]:
# Build a DataFrame from the tuple RDD, naming the two columns 'id' and 'name'.
df2 = sqlCtx.createDataFrame(prdd, ['id', 'name'])

In [34]:
# Inspect the resulting schema; only column names were given when building df2.
df2.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



## Passage de Spark dataframe vers pandas

In [35]:
# Convert the sampled Spark DataFrame into a pandas DataFrame.
pandasDF = sampleDF.toPandas()

In [36]:
# Display the pandas DataFrame.
pandasDF

Unnamed: 0,id,survived,pClass,name,sex,nb_sibling,nb_parent,ticket,fare,cabin,embarked,title,age
0,16,True,2,"Hewlett, Mrs. (Mary D Kingcome)",female,0,0,248706,16,,S,Mrs.,55.000000
1,252,False,3,"Strom, Mrs. Wilhelm (Elna Matilda Persson)",female,1,1,347054,10.4625,G6,S,Mrs.,29.000000
2,363,False,3,"Barbara, Mrs. (Catherine David)",female,0,1,2691,14.4542,,C,Mrs.,45.000000
3,514,True,1,"Rothschild, Mrs. Martin (Elizabeth L. Barrett)",female,1,0,PC 17603,59.4,,C,Mrs.,54.000000
4,519,True,2,"Angle, Mrs. William A (Florence ""Mary"" Agnes H...",female,1,0,226875,26,,S,Mrs.,36.000000
5,559,True,1,"Taussig, Mrs. Emil (Tillie Mandelbaum)",female,1,1,110413,79.65,E67,S,Mrs.,39.000000
6,560,True,3,"de Messemaeker, Mrs. Guillaume Joseph (Emma)",female,1,0,345572,17.4,,S,Mrs.,36.000000
7,582,True,1,"Thayer, Mrs. John Borland (Marian Longstreth M...",female,1,1,17421,110.8833,C68,C,Mrs.,39.000000
8,592,True,1,"Stephenson, Mrs. Walter Bertram (Martha Eustis)",female,1,0,36947,78.2667,D20,C,Mrs.,52.000000
9,701,True,1,"Astor, Mrs. John Jacob (Madeleine Talmadge Force)",female,1,0,PC 17757,227.525,C62 C64,C,Mrs.,18.000000
