In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os

# Tell Spark/findspark where the Java runtime and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [3]:
import findspark
findspark.init()  # run before the pyspark import below, as in the original cell order
from pyspark.sql import SparkSession

# Build (or reuse) a local session that uses all available cores.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# With eager evaluation on, a bare DataFrame expression at the end of a cell renders as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [4]:
# Read the CSV into a DataFrame: the first row holds the column names and
# column types are inferred from the data.
csv_reader = spark.read.option("header", True).option("inferSchema", True)
titanic_df = csv_reader.csv("tested.csv")
titanic_df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S
897,0,3,"Svensson, Mr. Joh...",male,14.0,0,0,7538,9.225,,S
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q
899,0,2,"Caldwell, Mr. Alb...",male,26.0,1,1,248738,29.0,,S
900,1,3,"Abrahim, Mrs. Jos...",female,18.0,0,0,2657,7.2292,,C
901,0,3,"Davies, Mr. John ...",male,21.0,2,0,A/4 48871,24.15,,S


In [5]:
# Print each column's name, data type and nullability.
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column.
titanic_df.describe()

summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
count,418.0,418.0,418.0,418,418,332.0,418.0,418.0,418,417.0,91,418
mean,1100.5,0.3636363636363636,2.2655502392344498,,,30.272590361445783,0.4473684210526316,0.3923444976076555,223850.98986486485,35.6271884892086,,
stddev,120.81045760473994,0.4816221409322309,0.8418375519640503,,,14.181209235624424,0.8967595611217135,0.9814288785371694,369523.7764694362,55.90757617997384,,
min,892.0,0.0,1.0,"""Assaf Khalil, Mr...",female,0.17,0.0,0.0,110469,0.0,A11,C
max,1309.0,1.0,3.0,"van Billiard, Mas...",male,76.0,8.0,9.0,W.E.P. 5734,512.3292,G6,S


In [7]:
from pyspark.sql.types import IntegerType
# NOTE: this `round` is Spark's column function; from here on it shadows Python's built-in round().
from pyspark.sql.functions import udf, round

# Round every age to a whole number (e.g. 34.5 becomes 35.0); the column stays a double.
rounded_age = round(titanic_df['Age'])
titanic_df = titanic_df.withColumn('Age', rounded_age)
titanic_df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
892,0,3,"Kelly, Mr. James",male,35.0,0,0,330911,7.8292,,Q
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S
897,0,3,"Svensson, Mr. Joh...",male,14.0,0,0,7538,9.225,,S
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q
899,0,2,"Caldwell, Mr. Alb...",male,26.0,1,1,248738,29.0,,S
900,1,3,"Abrahim, Mrs. Jos...",female,18.0,0,0,2657,7.2292,,C
901,0,3,"Davies, Mr. John ...",male,21.0,2,0,A/4 48871,24.15,,S


In [8]:
# Prepare the modelling columns: encode Sex as a numeric index and fill missing Age/Fare values.
from pyspark.ml.feature import Imputer  # imputation = replacing missing data with substituted values
from pyspark.ml.feature import StringIndexer  # maps the categorical Sex column to a numeric SexIndex column

# Missing Age/Fare values are overwritten in place with the column mean ('median' is also supported).
imptr = Imputer(inputCols=['Age', 'Fare'],
                outputCols=['Age', 'Fare']).setStrategy('mean')
indexer = StringIndexer(inputCol="Sex", outputCol="SexIndex")

# StringIndexer refuses to write to an output column that already exists, so re-running this
# cell would fail once SexIndex has been added. Dropping it first makes the cell re-runnable;
# drop() does nothing when the column is absent.
titanic_df = titanic_df.drop("SexIndex")
titanic_df = indexer.fit(titanic_df).transform(titanic_df)
titanic_df = imptr.fit(titanic_df).transform(titanic_df)


In [9]:
# Display the DataFrame, which now carries the extra SexIndex column.
titanic_df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SexIndex
892,0,3,"Kelly, Mr. James",male,35.0,0,0,330911,7.8292,,Q,0.0
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S,1.0
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q,0.0
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S,0.0
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S,1.0
897,0,3,"Svensson, Mr. Joh...",male,14.0,0,0,7538,9.225,,S,0.0
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q,1.0
899,0,2,"Caldwell, Mr. Alb...",male,26.0,1,1,248738,29.0,,S,0.0
900,1,3,"Abrahim, Mrs. Jos...",female,18.0,0,0,2657,7.2292,,C,1.0
901,0,3,"Davies, Mr. John ...",male,21.0,2,0,A/4 48871,24.15,,S,0.0


In [10]:
# Compute summary statistics for the two imputed columns only.
titanic_df.describe('Age', 'Fare').show()

+-------+------------------+------------------+
|summary|               Age|              Fare|
+-------+------------------+------------------+
|  count|               418|               418|
|   mean|30.295180722891548|  35.6271884892086|
| stddev|12.633087412660265|55.840500479541056|
|    min|               0.0|               0.0|
|    max|              76.0|          512.3292|
+-------+------------------+------------------+



In [11]:
# Preview five rows of the passenger id and the survival flag.
id_and_outcome = titanic_df.select(['PassengerId', 'Survived'])
id_and_outcome.limit(5)

PassengerId,Survived
892,0
893,1
894,0
895,0
896,1


In [12]:
# Passengers older than 25 who survived.
is_over_25 = titanic_df['Age'] > 25
did_survive = titanic_df['Survived'] == 1
titanic_df.filter(is_over_25 & did_survive)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SexIndex
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S,1.0
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q,1.0
906,1,1,"Chaffee, Mrs. Her...",female,47.0,1,0,W.E.P. 5734,61.175,E31,S,1.0
910,1,3,"Ilmakangas, Miss....",female,27.0,1,0,STON/O2. 3101270,7.925,,S,1.0
911,1,3,"""Assaf Khalil, Mr...",female,45.0,0,0,2696,7.225,,C,1.0
914,1,1,"Flegenheim, Mrs. ...",female,30.295180722891565,0,0,PC 17598,31.6833,,S,1.0
916,1,1,"Ryerson, Mrs. Art...",female,48.0,1,3,PC 17608,262.375,B57 B59 B63 B66,C,1.0
924,1,3,"Dean, Mrs. Bertra...",female,33.0,1,2,C.A. 2315,20.575,,S,1.0
925,1,3,"""Johnston, Mrs. A...",female,30.295180722891565,1,2,W./C. 6607,23.45,,S,1.0
928,1,3,"Roth, Miss. Sarah A",female,30.295180722891565,0,0,342712,8.05,,S,1.0


In [13]:
# Average fare over all passengers (an empty groupBy aggregates the whole DataFrame).
titanic_df.groupBy().avg('Fare')

avg(Fare)
35.6271884892086


In [14]:
# Average fare per passenger class, listed from class 3 down to class 1.
fare_by_class = titanic_df.groupBy('Pclass').avg('Fare')
fare_by_class.sort('Pclass', ascending=False)

Pclass,avg(Fare)
3,12.5659508646294
2,22.20210430107527
1,94.28029719626169


In [15]:
# Number of passengers for each (Survived, Sex) combination.
titanic_df.groupBy(['Survived', 'Sex']).count()

Survived,Sex,count
1,female,152
0,male,266


Selon ce jeu de données, seules les femmes ont survécu : les 152 femmes ont toutes survécu et aucun des 266 hommes n'a survécu.

In [16]:
# Passenger counts per (Survived, Pclass) combination, largest group first.
survival_by_class = titanic_df.groupBy(['Survived', 'Pclass']).count()
survival_by_class.sort('count', ascending=False)

Survived,Pclass,count
0,3,146
1,3,72
0,2,63
0,1,57
1,1,50
1,2,30


In [17]:
# Passengers older than 60 who survived.
is_over_60 = titanic_df['Age'] > 60
did_survive = titanic_df['Survived'] == 1
titanic_df.where(is_over_60 & did_survive)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SexIndex
988,1,1,"Cavendish, Mrs. T...",female,76.0,1,0,19877,78.85,C46,S,1.0
1006,1,1,"Straus, Mrs. Isid...",female,63.0,1,0,PC 17483,221.7792,C55 C57,S,1.0
1071,1,1,"Compton, Mrs. Ale...",female,64.0,0,2,PC 17756,83.1583,E45,C,1.0
1197,1,1,"Crosby, Mrs. Edwa...",female,64.0,1,1,112901,26.55,B26,S,1.0


In [18]:
# Average fare paid by passengers older than 25.
passengers_over_25 = titanic_df.where(titanic_df['Age'] > 25)
passengers_over_25.groupBy().avg('Fare')

avg(Fare)
41.85656010322175


In [19]:
from pyspark.sql import functions as F

# Youngest and oldest age within each survival group.
# Both aggregates use the column's actual name 'Age'; the lowercase 'age' used previously
# only resolved through case-insensitive matching and produced a mismatched 'max(age)' header.
titanic_df.groupby('Survived').agg(F.min(titanic_df['Age']), F.max(titanic_df['Age'])).show()

+--------+--------+--------+
|Survived|min(Age)|max(age)|
+--------+--------+--------+
|       1|     0.0|    76.0|
|       0|     0.0|    67.0|
+--------+--------+--------+



In [20]:
# For each age, count passengers per sex (one output column per Sex value).
age_groups = titanic_df.groupBy('Age')
age_groups.pivot('Sex').count()

Age,female,male
8.0,1.0,1
67.0,,1
0.0,1.0,1
7.0,,1
49.0,,3
29.0,5.0,6
64.0,2.0,1
47.0,2.0,3
42.0,,5
44.0,,1


In [21]:
# For each survival outcome, count passengers per sex (one output column per Sex value).
outcome_groups = titanic_df.groupBy('Survived')
outcome_groups.pivot('Sex').count()

Survived,female,male
1,152.0,
0,,266.0


In [22]:
# For each age, count passengers per survival outcome (one output column per Survived value).
age_groups = titanic_df.groupBy('Age')
age_groups.pivot('Survived').count()

Age,0,1
8.0,1,1.0
67.0,1,
0.0,1,1.0
7.0,1,
49.0,3,
29.0,6,5.0
64.0,1,2.0
47.0,3,2.0
42.0,5,
44.0,1,


In [23]:
def round_float_down(x):
  """Round a number down to the nearest integer (floor).

  Args:
    x: A numeric value, or None for a missing value.

  Returns:
    The largest int that is less than or equal to x, or None when x is None.
  """
  # Imported locally so the function does not depend on a module-level import.
  import math

  # A missing value arrives as None; pass it through instead of raising TypeError.
  if x is None:
    return None
  # floor() rounds toward negative infinity, whereas int() truncates toward zero
  # (int(-1.5) == -1), which is not "rounding down" for negative inputs.
  return math.floor(x)
# Wrap the Python function as a Spark UDF whose result column has integer type.
round_float_down_udf = udf(round_float_down, IntegerType())

# Show each fare next to its rounded-down value for five rows.
fare_rounded_down = round_float_down_udf('Fare').alias('Fare Rounded Down')
titanic_df.select('PassengerId', 'Fare', fare_rounded_down).limit(5)

PassengerId,Fare,Fare Rounded Down
892,7.8292,7
893,7.0,7
894,9.6875,9
895,8.6625,8
896,12.2875,12


In [24]:
# Register the DataFrame as a temporary view named "Titanic" so SQL queries can refer to it by that name.
titanic_df.createOrReplaceTempView("Titanic")

In [25]:
# Read every row and column back through the "Titanic" temporary view.
spark.table('Titanic')

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SexIndex
892,0,3,"Kelly, Mr. James",male,35.0,0,0,330911,7.8292,,Q,0.0
893,1,3,"Wilkes, Mrs. Jame...",female,47.0,1,0,363272,7.0,,S,1.0
894,0,2,"Myles, Mr. Thomas...",male,62.0,0,0,240276,9.6875,,Q,0.0
895,0,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,S,0.0
896,1,3,"Hirvonen, Mrs. Al...",female,22.0,1,1,3101298,12.2875,,S,1.0
897,0,3,"Svensson, Mr. Joh...",male,14.0,0,0,7538,9.225,,S,0.0
898,1,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,Q,1.0
899,0,2,"Caldwell, Mr. Alb...",male,26.0,1,1,248738,29.0,,S,0.0
900,1,3,"Abrahim, Mrs. Jos...",female,18.0,0,0,2657,7.2292,,C,1.0
901,0,3,"Davies, Mr. John ...",male,21.0,2,0,A/4 48871,24.15,,S,0.0


In [26]:
# Divide the dataset into training features and target.
X_column_names = ["Pclass", "SexIndex", "Age", "Fare"]
target_colum_name = ['Survived']

# Convert the feature columns into a single column whose values are feature vectors.
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Inputvec')

# VectorAssembler raises if its output column already exists, so re-running this cell would
# fail once Inputvec has been added. Dropping it first makes the cell re-runnable; drop()
# does nothing when the column is absent.
titanic_df = v_asmblr.transform(titanic_df.drop('Inputvec'))

# Keep only the assembled feature vector and the target column for modelling.
X = titanic_df.select(['Inputvec'] + target_colum_name)
X.show()

+--------------------+--------+
|            Inputvec|Survived|
+--------------------+--------+
|[3.0,0.0,35.0,7.8...|       0|
|  [3.0,1.0,47.0,7.0]|       1|
|[2.0,0.0,62.0,9.6...|       0|
|[3.0,0.0,27.0,8.6...|       0|
|[3.0,1.0,22.0,12....|       1|
|[3.0,0.0,14.0,9.225]|       0|
|[3.0,1.0,30.0,7.6...|       1|
| [2.0,0.0,26.0,29.0]|       0|
|[3.0,1.0,18.0,7.2...|       1|
|[3.0,0.0,21.0,24.15]|       0|
|[3.0,0.0,30.29518...|       0|
| [1.0,0.0,46.0,26.0]|       0|
|[1.0,1.0,23.0,82....|       1|
| [2.0,0.0,63.0,26.0]|       0|
|[1.0,1.0,47.0,61....|       1|
|[2.0,1.0,24.0,27....|       1|
|[2.0,0.0,35.0,12.35]|       0|
|[3.0,0.0,21.0,7.225]|       0|
|[3.0,1.0,27.0,7.925]|       1|
|[3.0,1.0,45.0,7.225]|       1|
+--------------------+--------+
only showing top 20 rows



In [27]:
# Divide the dataset into training (80%) and testing (20%) sets.
# A fixed seed makes the random split repeatable for the same input data, so the fitted
# model and its evaluation do not change from one run of the notebook to the next.
trainset, testset = X.randomSplit([0.8, 0.2], seed=42)

In [28]:
# Fit a linear regression that predicts 'Survived' from the assembled feature vector.
# NOTE(review): 'Survived' is a 0/1 label, so this is regression on a binary target —
# confirm that is intended rather than a classifier.
from pyspark.ml.regression import LinearRegression
lr_estimator = LinearRegression(featuresCol='Inputvec', labelCol='Survived')
model = lr_estimator.fit(trainset)

# Learned weight per input feature, followed by the intercept term.
print(model.coefficients)
print(model.intercept)

[4.389926824958759e-16,1.0,1.843872670187381e-17,3.34875637870885e-18]
-1.724131865508298e-15


In [29]:
# Evaluate the fitted model on the held-out test set and show its per-row predictions
# next to the feature vector and the true 'Survived' label.
model.evaluate(testset).predictions.show()

+--------------------+--------+--------------------+
|            Inputvec|Survived|          prediction|
+--------------------+--------+--------------------+
| [1.0,0.0,17.0,47.1]|       0|-8.13954403643380...|
|[1.0,0.0,24.0,82....|       0|-5.67118605787123...|
| [1.0,0.0,30.0,26.0]|       0|-6.44909716109777...|
|[1.0,0.0,30.0,151...|       0|-2.24473352762881...|
|[1.0,0.0,31.0,28....|       0|-6.17973520096930...|
| [1.0,0.0,41.0,30.5]|       0|-4.27014318684975...|
|[1.0,0.0,42.0,26.55]|       0|-4.21803179679001...|
|[1.0,0.0,43.0,27....|       0|-3.99443729008936...|
| [1.0,0.0,45.0,29.7]|       0|-3.55938416980447...|
|[1.0,0.0,46.0,75....|       0|-1.84991631906329...|
|  [1.0,0.0,49.0,0.0]|       0|-3.81641574620605...|
| [1.0,0.0,50.0,26.0]|       0|-2.76135182072301...|
|[1.0,0.0,61.0,262...|       0|7.182531006656154...|
|[1.0,0.0,64.0,75.25]|       0|1.469332434053427...|
|[1.0,0.0,67.0,221...|       0|6.929400166780692...|
|[1.0,1.0,23.0,82....|       1|  0.99999999999