In [2]:
# a SparkSession object can perform the most common data processing tasks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate() # will return existing session if one was
                                                           # created before and was not closed

22/08/18 11:09:00 WARN Utils: Your hostname, FFT-ThinkPad-L490 resolves to a loopback address: 127.0.1.1; using 192.168.29.4 instead (on interface wlp5s0)
22/08/18 11:09:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/18 11:09:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

**dataset:**
https://www.kaggle.com/fedesoriano/heart-failure-prediction

In [4]:
# read csv, all columns will be of type string
df = spark.read.option('header','true').csv('heart.csv')
# tell pyspark the type of the columns - saves time on large dataset. there are other ways to do this, but that's
# my favorite
schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
# let PySpark infer the schema
df = spark.read.csv('heart.csv', inferSchema=True, header=True, nullValue='NA')
# save data
df.write.format("csv").save("heart_save.csv")
# if you want to overwrite the file
df.write.format("csv").mode("overwrite").save("heart_save.csv")

In [5]:
# show head of table
df.show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [6]:
# count number of rows
df.count()

918

In [7]:
# show parts of the table
df.select('Age').show(3)
df.select(['Age','Sex']).show(3)

+---+
|Age|
+---+
| 40|
| 49|
| 37|
+---+
only showing top 3 rows

+---+---+
|Age|Sex|
+---+---+
| 40|  M|
| 49|  F|
| 37|  M|
+---+---+
only showing top 3 rows



In [8]:
# convert PySpark DataFrame to Pandas DataFrame
pd_df = df.toPandas()
# convert it back
spark_df = spark.createDataFrame(pd_df)

In [9]:
# show first three rows as three row objects, which is how spark represents single rows from a table.
# we will learn more about it later
df.head(3)

[Row(Age=40, Sex='M', ChestPainType='ATA', RestingBP=140, Cholesterol=289, FastingBS=0, RestingECG='Normal', MaxHR=172, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0),
 Row(Age=49, Sex='F', ChestPainType='NAP', RestingBP=160, Cholesterol=180, FastingBS=0, RestingECG='Normal', MaxHR=156, ExerciseAngina='N', Oldpeak=1.0, ST_Slope='Flat', HeartDisease=1),
 Row(Age=37, Sex='M', ChestPainType='ATA', RestingBP=130, Cholesterol=283, FastingBS=0, RestingECG='ST', MaxHR=98, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0)]

In [10]:
# type os columns
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [11]:
# column dtypes as list of tuples
df.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int')]

In [12]:
# cast a column from one type to other
from pyspark.sql.types import FloatType
df = df.withColumn("Age",df.Age.cast(FloatType()))
df = df.withColumn("RestingBP",df.Age.cast(FloatType()))

In [13]:
# compute summery statistics
df.select(['Age','RestingBP']).describe().show()

+-------+------------------+------------------+
|summary|               Age|         RestingBP|
+-------+------------------+------------------+
|  count|               918|               918|
|   mean|53.510893246187365|53.510893246187365|
| stddev|  9.43261650673202|  9.43261650673202|
|    min|              28.0|              28.0|
|    max|              77.0|              77.0|
+-------+------------------+------------------+



In [14]:
# add a new column or replace existing one
AgeFixed = df['Age'] + 1  # select alwayes returns a DataFrame object, and we need a column object
df = df.withColumn('AgeFixed', AgeFixed)

In [15]:
df.select(['AgeFixed','Age']).describe().show()

+-------+------------------+------------------+
|summary|          AgeFixed|               Age|
+-------+------------------+------------------+
|  count|               918|               918|
|   mean|54.510893246187365|53.510893246187365|
| stddev|  9.43261650673202|  9.43261650673202|
|    min|              29.0|              28.0|
|    max|              78.0|              77.0|
+-------+------------------+------------------+



In [16]:
# remove columns
df.drop('AgeFixed').show(1) # add df = to get the new DataFrame into a variable

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|     40.0|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 1 row



In [17]:
# rename a column
df.withColumnRenamed('Age','age').select('age').show(1)
# to rename more than a single column, i would suggest a loop.
name_pairs = [('Age','age'),('Sex','sex')]
for old_name, new_name in name_pairs:
    df = df.withColumnRenamed(old_name,new_name)

+----+
| age|
+----+
|40.0|
+----+
only showing top 1 row



In [18]:
df.select(['age','sex']).show(1)

+----+---+
| age|sex|
+----+---+
|40.0|  M|
+----+---+
only showing top 1 row



In [19]:
# drop all rows that contain any NA
df = df.na.drop()
df.count()
# drop all rows where all values are NA
df = df.na.drop(how='all')
# drop all rows where more at least 2 values are NOT NA
df = df.na.drop(thresh=2)
# drop all rows where any value at specific columns are NAs.
df = df.na.drop(how='any', subset=['age','sex']) # 'any' is the defult

In [20]:
# fill missing values in a specific column with a '?'
df = df.na.fill(value='?',subset=['sex'])
# replace NAs with mean of column
from pyspark.ml.feature import Imputer # In statistics, imputation is the process of
                                       # replacing missing data with substituted values
imptr = Imputer(inputCols=['age','RestingBP'],
                outputCols=['age','RestingBP']).setStrategy('mean') # can also be 'median' and so on

df = imptr.fit(df).transform(df)

In [21]:
# filter to adults only and calculate mean
df.filter('age > 18')
df.where('age > 18')# 'where' is an alias to 'filter'
df.where(df['age'] > 18) # third option
# add another condition ('&' means and, '|' means or)
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA'))
# take every record where the 'ChestPainType' is NOT 'ATA'
df.filter(~(df['ChestPainType'] == 'ATA'))

DataFrame[age: float, sex: string, ChestPainType: string, RestingBP: float, Cholesterol: int, FastingBS: int, RestingECG: string, MaxHR: int, ExerciseAngina: string, Oldpeak: double, ST_Slope: string, HeartDisease: int, AgeFixed: float]

In [22]:
df.filter('age > 18').show()

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
| age|sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|AgeFixed|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
|40.0|  M|          ATA|     40.0|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|    41.0|
|49.0|  F|          NAP|     49.0|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|    50.0|
|37.0|  M|          ATA|     37.0|        283|        0|        ST|   98|             N|    0.0|      Up|           0|    38.0|
|48.0|  F|          ASY|     48.0|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|    49.0|
|54.0|  M|          NAP|     54.0|        195|        0|    Normal|  122|             N|    0.0|      Up

In [23]:
# evaluate a string expression into command
from pyspark.sql.functions import expr
exp = 'age + 0.2 * AgeFixed'
df.withColumn('new_col', expr(exp)).select('new_col').show(3)

+-------+
|new_col|
+-------+
|   48.2|
|   59.0|
|   44.6|
+-------+
only showing top 3 rows



In [24]:
# group by age
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
# sort values in desnding order
from pyspark.sql.functions import desc
disease_by_age.orderBy(desc("age")).show(5)

+----+------------------+
| age| avg(HeartDisease)|
+----+------------------+
|77.0|               1.0|
|76.0|               0.5|
|75.0|0.6666666666666666|
|74.0|0.7142857142857143|
|73.0|               1.0|
+----+------------------+
only showing top 5 rows



In [25]:
from pyspark.sql.functions import asc
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
disease_by_age.orderBy(desc("age")).show(3)

+----+------------------+
| age| avg(HeartDisease)|
+----+------------------+
|77.0|               1.0|
|76.0|               0.5|
|75.0|0.6666666666666666|
+----+------------------+
only showing top 3 rows



In [26]:
# aggregate to get several statistics for several columns
# the available aggregate functions are avg, max, min, sum, count
from pyspark.sql import functions as F
df.agg(F.min(df['age']),F.max(df['age']),F.avg(df['age'])).show()

+--------+--------+------------------+
|min(age)|max(age)|          avg(age)|
+--------+--------+------------------+
|    28.0|    77.0|53.510893246187365|
+--------+--------+------------------+



In [27]:
df.groupby('HeartDisease').agg(F.min(df['age']),F.avg(df['age'])).show()

+------------+--------+------------------+
|HeartDisease|min(age)|          avg(age)|
+------------+--------+------------------+
|           1|    31.0|  55.8996062992126|
|           0|    28.0|50.551219512195125|
+------------+--------+------------------+



In [28]:
# run an SQL query on the data
df.createOrReplaceTempView("df") # tell PySpark how the table will be called in the SQL query
spark.sql("""SELECT age, sex from df""").show(2)

# we also choose columns using SQL sytnx, with a command that combins '.select()' and '.sql()'
df.selectExpr("age > 40 as older", "age").show(2)

+----+---+
| age|sex|
+----+---+
|40.0|  M|
|49.0|  F|
+----+---+
only showing top 2 rows

+-----+----+
|older| age|
+-----+----+
|false|40.0|
| true|49.0|
+-----+----+
only showing top 2 rows



In [29]:
df.groupby('age').pivot('sex', ("M", "F")).count().show(3)

+----+---+---+
| age|  M|  F|
+----+---+---+
|64.0| 16|  6|
|47.0| 15|  4|
|58.0| 35|  7|
+----+---+---+
only showing top 3 rows



In [30]:
# pivot - expensive operation
df.selectExpr("age >= 40 as older", "age",'sex').groupBy("sex")\
                    .pivot("older", ("true", "false")).count().show()

+---+----+-----+
|sex|true|false|
+---+----+-----+
|  F| 174|   19|
|  M| 664|   61|
+---+----+-----+



In [31]:
df.select(['age','MaxHR','Cholesterol']).show(4)

+----+-----+-----------+
| age|MaxHR|Cholesterol|
+----+-----+-----------+
|40.0|  172|        289|
|49.0|  156|        180|
|37.0|   98|        283|
|48.0|  108|        214|
+----+-----+-----------+
only showing top 4 rows



In [32]:
# devide dataset to training features and target
X_column_names = ['age','Cholesterol']
target_colum_name = ['MaxHR']

# convert feature columns into a columns where the vlues are feature vectors
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Fvec')
df = v_asmblr.transform(df)
X = df.select(['age','Cholesterol','Fvec','MaxHR'])
X.show(3)

+----+-----------+------------+-----+
| age|Cholesterol|        Fvec|MaxHR|
+----+-----------+------------+-----+
|40.0|        289|[40.0,289.0]|  172|
|49.0|        180|[49.0,180.0]|  156|
|37.0|        283|[37.0,283.0]|   98|
+----+-----------+------------+-----+
only showing top 3 rows



In [33]:
# devide dataset into training and testing sets
trainset, testset = X.randomSplit([0.8,0.2])

In [34]:
# predict 'RestingBP' using linear regression
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR')
model = model.fit(trainset)
print(model.coefficients)
print(model.intercept)

22/08/18 11:09:19 WARN Instrumentation: [85ec5e28] regParam is zero, which might cause numerical instability and overfitting.
[-1.0137974732829853,0.03994738325453204]
183.47488201613427


In [35]:
# evaluate model
model.evaluate(testset).predictions.show()

+----+-----------+------------+-----+------------------+
| age|Cholesterol|        Fvec|MaxHR|        prediction|
+----+-----------+------------+-----+------------------+
|29.0|        204|[29.0,204.0]|  202|162.22402147485224|
|29.0|        243|[29.0,243.0]|  160|  163.781969421779|
|30.0|        237|[30.0,237.0]|  170| 162.5284876489688|
|31.0|        219|[31.0,219.0]|  150|160.79563727710425|
|32.0|        254|[32.0,254.0]|  155| 161.1799982177299|
|35.0|        198|[35.0,198.0]|  130|155.90155233562714|
|37.0|        215|[37.0,215.0]|  170| 154.5530629043882|
|37.0|        240|[37.0,240.0]|  165|155.55174748575152|
|38.0|          0|  [38.0,0.0]|  120|144.95057803138081|
|38.0|        289|[38.0,289.0]|  105|156.49537179194058|
|39.0|        219|[39.0,219.0]|  140|152.68525749084037|
|39.0|        241|[39.0,241.0]|  146|153.56409992244005|
|40.0|          0|  [40.0,0.0]|  144|142.92298308481486|
|40.0|        167|[40.0,167.0]|  114|149.59419608832172|
|40.0|        235|[40.0,235.0]|

In [36]:
# handel categorical features with ordinal indexing
from pyspark.ml.feature import StringIndexer
indxr = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainTypeInxed')
indxr.fit(df).transform(df).select('ChestPainTypeInxed').show(3)

+------------------+
|ChestPainTypeInxed|
+------------------+
|               2.0|
|               1.0|
|               2.0|
+------------------+
only showing top 3 rows

