In [1]:
# PySpark Imports
from pyspark.sql.session import SparkSession
from pyspark.sql.types import FloatType
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import expr, when, sum, avg, desc, asc
# Also can from pyspark.sql import functions as F

# ML Pyspark Imports
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Other Imports
import pandas as pd
import numpy as np
import os
import sys

In [2]:
# System paths
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Creating a Spark Session
spark = SparkSession.builder.appName('test').getOrCreate()

KeyboardInterrupt: 

### Loading Data
Get heart data from Kaggle: https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction

In [None]:
# Importing the data all as strings
df = spark.read.option('header', 'true').csv('heart.csv')

# Importing all the data with column types, pass it as an argument in the function
schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
df = spark.read.csv('heart.csv', schema= schema, header=True)

# Infer the schema based on the data in the column
df = spark.read.csv('heart.csv', inferSchema=True, header=True)

# Replace null values with any other value
df = spark.read.csv('heart.csv', inferSchema=True, nullValue='NA', header=True)

### Saving Data

In [None]:
# Non overwrittable
df.write.format("csv").save("/heart_save.csv")

# Overwrittable
df.write.format("csv").save("/heart_save.csv")

### General Functions

In [None]:
# Counting number of rows
df.count()

# Showing the first x rows
df.show(5)

# Showing a specific column
df.select('Age').show(5)
df.select(['Age', 'Sex']).show(5)

# PySpark to Pandas
pd_df = df.toPandas()

# Pandas to PySpark
spark_df = spark.createDataFrame(pd_df)

# To see column data type and see if they accept null values
df.printSchema()
# or
df.dtypes

# Change column data type to float for example
df = df.withcolumn("Age", df.Age.cast(FloatType()))

# To remove a column
df.drop("AgeFixed")

# To Rename a column
df.withColumnRenamed('Age','age')
# or
name_columns = {'Age':'age', 'Sex': 'sex'}
df.withColumnsRenamed(name_columns)

# Table of Statistics
df.select(['Age','RestingBP']).describe().show()

# Managing NA
df = df.na.drop() # Drop all rows that contain a single null value
df = df.na.drop(how='all') # Drop rows that have nothing but null values
df = df.na.drop(thresh=2) # Drop rows that have 2 or more null values
df = df.na.drop(how = 'any', subset = ['age','sex']) # Drop rows that have NA based on columns
df = df.na.fill(value='?', subset=['sex']) # Replace null values with a different value

imptr = Imputer(inputCols=['age','RestingBP'], outputCols=['age','RestingBP']).setStrategy('mean') # Create an imputer to set the missing values
df = imptr.fit(df).transform(df) # Fit the changes to the dataframe to fill in the missing values

# Filtering
df.filter('age > 18')
df.where('age > 18')
df.where(df['age'] > 18)
df.where((df['age'] > 18) & (df['ChestPainType'] == 'ATA')) # AND operator
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA')) # OR operator
df.filter(~(df['ChestPainType'] == 'ATA')) # Choose all rows that do not match the behavior

# Evaluating a string
exp = 'age + 0.2 * AgeFixed'
df.withColumn('new_col', expr(exp))

# Multiplying columns
df.withColumn('new_col', df.age * 0.2)
df.withColumn('new_col', when(df.ChestPainType == 'ATA', 0).otherwise(df.age * 0.2))

# Group by
df.groupby('age').agg(avg('HeartDisease').alias('avg_HeartDisease')).show()

# Ordering by asc or descending
disease_by_age = df.groupby('Age').agg(avg('HeartDisease').alias('avg_HeartDisease')) # Can continue adding new and different aggregations
disease_by_age.orderBy(asc('Age')).show()

# Pivoting
df.groupby('age').pivot('sex', ("M", "F")).count() # Is computational expensive but is lessened by providing the values you want to pivot on to the function

### SQL Syntax

In [None]:
# Query using SQL Syntax
df.createOrReplaceTempView("df")
spark.sql("""SELECT Sex from df""").show(2)

# Using a regular Select Expresion
df.selectExpr("age >= 40 as older", "age", "HeartDisease").show(2) # Can continue adding more columns

### Combining Comands

In [66]:
# Just an example
df.selectExpr("Age >= 40 as older", "Age", "Sex").groupBy("sex")\
    .pivot("older", ("true", "false")).count().show()

+---+----+-----+
|sex|true|false|
+---+----+-----+
|  F| 174|   19|
|  M| 664|   61|
+---+----+-----+



### Split a dataset

In [None]:
# PySpark ML does not accept a dataframe but instead you need to make it into one column that has a vector per row with each of the values
feature_col_names = ['Age', 'Cholesterol']
v_asemblr = VectorAssembler(inputCols=feature_col_names, outputCol='Fvec')
model_df = v_asemblr.transform(df)

# Using a model
trainset, testset = model_df.randomSplit([0.8, 0.2]) # Division proportion 
model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR')
model = model.fit(trainset)

print(model.coefficients)
print(model.intercept)

model.evaluate(testset).predictions

[-0.9938204489815767,0.04816126113055623]
180.57905208099078
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------------+------------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|        Fvec|        prediction|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------------+------------------+
| 31|  M|          ASY|      120|        270|        0|    Normal|  153|             Y|    1.5|    Flat|           1|[31.0,270.0]| 162.7741586678121|
| 34|  M|           TA|      140|        156|        0|    Normal|  180|             N|    0.0|    Flat|           1|[34.0,156.0]|154.30231355198393|
| 35|  F|          ASY|      140|        167|        0|    Normal|  150|             N|    0.0|      Up|           0|[35.0,167.0]| 153.8382669754385|
| 35|  F|           TA|      120|      