In [1]:
!pip install pyspark

In [2]:
!pip install findspark

In [3]:
import findspark

# init() has to be called: a bare `findspark.init` only references the function
# object and does nothing. The call is what makes pyspark importable.
findspark.init()

In [4]:
from pyspark.sql import SparkSession  # entry point required to create a DataFrame
import pyspark.sql.functions as F

# Reuse the active session if one exists, otherwise start a new one named "Basics".
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)

In [36]:
# Load the csv: the first line is the header, and column types are inferred from the data.
csv_path = "../input/heart-failure-prediction/heart.csv"
df = spark.read.csv(csv_path, header=True, inferSchema=True)

# Alternative ways to read / write the data, kept for reference:
#
# read csv without a schema - all columns will be of type string
# df = spark.read.option('header','true').csv('heart.csv')
# tell pyspark the type of the columns - saves time on a large dataset. there are other
# ways to do this, but this one is my favorite
# schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
# df = spark.read.csv('../input/heart-failure-prediction/heart.csv', schema=schema, inferSchema=True ,header=True)
# let PySpark infer the schema
# df = spark.read.csv('../input/heart-failure-prediction/heart.csv', inferSchema=True, header=True)
# treat a given string (here 'NA') as null at reading time
# df = spark.read.csv('../input/heart-failure-prediction/heart.csv', nullValue='NA')
# save data
# df.write.format("csv").save("./heart_save.csv")
# if you want to overwrite the file
# df.write.format("csv").mode("overwrite").save("./heart_save.csv")

df.show()

In [37]:
# print the column names and column types of the DataFrame
df.printSchema()

In [38]:
# number of rows in the DataFrame
df.count()

# Pandas DataFrame VS PySpark DataFrame

**Both represent a table of data with rows and columns. However, under the hood they are different, because a PySpark DataFrame needs to support distributed computation. As we move forward, we will see more and more of its features that are not present in a Pandas DataFrame. That being said, if you know how to use Pandas, then moving to PySpark will feel like a natural transition.**

# DAG

**A directed acyclic graph (DAG) is the way Spark runs computations. When you give it a series of transformations to apply to the dataset, it builds a graph out of those transformations, so it knows what to do - but it does not execute those commands immediately if it does not have to. Rather, it is lazy: it will go through the DAG and apply the transformations only when it must, to provide a needed result. This allows better performance, since Spark knows what lies ahead of a certain computation and can optimize the process accordingly.**

# transformations VS actions

**In PySpark, there are two types of commands: transformations and actions. Transformation commands are added to the DAG, but do not cause it to be executed. They transform one DataFrame into another, without changing the input DataFrame. Actions, on the other hand, make PySpark execute the DAG but do not create a new DataFrame - instead, they output the result of the DAG.**

# Caching

**Every time you run a DAG, it is re-computed from the beginning. That is, the results are not saved in memory. So, if we want to keep a result so it won't have to be recomputed, we can use the cache command. Note that this will occupy space in the worker nodes' memory - so be careful with the sizes of the datasets you are caching! By default, the cached DataFrame is stored in RAM and is not serialized (not converted into a stream of bytes). You can change both of these - store the data on the hard disk, serialize it, or both!**

# Collecting

**Even after caching a DataFrame, it still sits in the worker nodes' memory. If you want to collect its pieces, assemble them, and keep them on the master node so you won't have to pull them every time, use the collect command. Again, be very careful with this, since the collected data will have to fit in the master node's memory!**

In [39]:
# mark df for caching, then pull all of its rows to the driver as a list of Row objects;
# cache() hands back the same DataFrame, so the two calls can be chained
df.cache().collect()

In [40]:
# convert PySpark DataFrame to Pandas DataFrame
pd_df = df.toPandas()
# convert it back
spark_df = spark.createDataFrame(pd_df)

In [41]:
# show first three rows as three row objects, which is how spark represents single rows from a table.
# we will learn more about it later
df.head(3)

In [42]:
# (column name, type name) pairs for every column
df.dtypes

In [43]:
# cast a column from one type to other
from pyspark.sql.types import FloatType
df = df.withColumn("Age",df.Age.cast(FloatType()))
df = df.withColumn("RestingBP",df.Age.cast(FloatType()))

In [44]:
# compute summery statistics
df.select(['Age','RestingBP']).describe().show()

In [45]:
# add a new column or replace existing one
AgeFixed = df['Age'] + 1  # select alwayes returns a DataFrame object, and we need a column object
df = df.withColumn('AgeFixed', AgeFixed)

In [46]:
# summary statistics of the new column next to the one it was derived from
df.select('AgeFixed', 'Age').describe().show()

In [47]:
# remove columns
df.drop('AgeFixed').show(1) # add df = to get the new DataFrame into a variable

In [50]:
# rename a column
# df.withColumnRenamed('Age','age').select('age').show(1)
# to rename more than a single column, i would suggest a loop.
name_pairs = [('Age','age'),('Sex','sex')]
for old_name, new_name in name_pairs:
    df = df.withColumnRenamed(old_name,new_name)

In [51]:
# show the first row of the DataFrame
df.show(1)

In [54]:
# drop all rows that contain any NA
df = df.na.drop()
df.count()
# drop all rows where all values are NA
df = df.na.drop(how='all')
# drop all rows where more at least 2 values are NOT NA
df = df.na.drop(thresh=2)
# drop all rows where any value at specific columns are NAs.
df = df.na.drop(how='any', subset=['age','sex']) # 'any' is the defult

In [55]:
# fill missing values in a specific column with a '?'
df = df.na.fill(value='?',subset=['sex'])
# replace NAs with mean of column
from pyspark.ml.feature import Imputer # In statistics, imputation is the process of
                                       # replacing missing data with substituted values
imptr = Imputer(inputCols=['age','RestingBP'],
                outputCols=['age','RestingBP']).setStrategy('mean') # can also be 'median' and so on

df = imptr.fit(df).transform(df)

In [63]:
# filter to adults only and calculate mean
df.filter('age > 18').select('age').describe().show()
df.where('age > 18')# 'where' is an alias to 'filter'
df.where(df['age'] > 18) # third option
# add another condition ('&' means and, '|' means or)
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA'))
# take every record where the 'ChestPainType' is NOT 'ATA'
df.filter(~(df['ChestPainType'] == 'ATA'))

In [64]:
# evaluate a string expression into command
from pyspark.sql.functions import expr
exp = 'age + 0.2 * AgeFixed'
df.withColumn('new_col', expr(exp)).select('new_col').show(3)

In [69]:
# group by age
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
# sort values in desnding order
from pyspark.sql.functions import desc
disease_by_age.orderBy(desc("age")).show(5)

In [70]:
# aggregate to get several statistics for several columns
# the available aggregate functions are avg, max, min, sum, count
from pyspark.sql import functions as F
df.agg(F.min(df['age']),F.max(df['age']),F.avg(df['sex'])).show()

In [71]:
# the same kind of aggregation, computed separately for each value of 'HeartDisease'
# NOTE(review): 'sex' appears to hold strings ('M' / 'F'), so avg over it is presumably
# not meaningful - confirm which column was intended
df.groupby('HeartDisease').agg(F.min(df['age']),F.avg(df['sex'])).show()

In [72]:
# run an SQL query on the data
df.createOrReplaceTempView("df") # tell PySpark how the table will be called in the SQL query
spark.sql("""SELECT sex from df""").show(2)

# we also choose columns using SQL sytnx, with a command that combins '.select()' and '.sql()'
df.selectExpr("age >= 40 as older", "age").show(2)

In [77]:
# count the rows per age, with one output column for each listed value of 'sex'
df.groupby('age').pivot('sex', ["M", "F"]).count().show(3)

In [88]:
# pivot - expensive operation
df.selectExpr("age >= 40 as older", "age",'sex').groupBy("sex")\
                    .pivot("older", ("true", "false")).count().show()

In [89]:
# peek at the first four rows of the three columns
df.select('age', 'MaxHR', 'Cholesterol').show(4)

In [94]:
# print the current column names and column types of the DataFrame
df.printSchema()

In [103]:
# devide dataset to training features and target
X_column_names = ['age','Cholesterol']
target_colum_name = ['MaxHR']

# convert feature columns into a columns where the vlues are feature vectors
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols = X_column_names, outputCol = 'Fvec')
df = v_asmblr.transform(df)
X = df.select(['age','Cholesterol','Fvec','MaxHR'])
X.show(3)

In [104]:
# devide dataset into training and testing sets
trainset, testset = X.randomSplit([0.8,0.2])

In [105]:
# predict 'RestingBP' using linear regression
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR')
model = model.fit(trainset)
print(model.coefficients)
print(model.intercept)

In [106]:
# evaluate model
model.evaluate(testset).predictions.show(3)

In [111]:
# handel categorical features with ordinal indexing
from pyspark.ml.feature import StringIndexer
indxr = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainTypeInxed')
indxr.fit(df).transform(df).select('ChestPainTypeInxed').show(3)