This accompanies the YouTube video entitled "The ONLY PySpark Tutorial You Will Ever Need" by Moran Reznik.

Please note that you need to have a running Spark cluster to execute these commands!

Create a SparkSession.

In [None]:
!pip install pyspark

In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

# Build a session registered under the app name 'test', or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('test')
    .getOrCreate()
)

In [None]:
# evaluate the session object so the notebook displays it
spark

Create a DataFrame from a CSV file.

In [None]:
# Load the CSV, treating its first line as the column names.
df = spark.read.csv('heart.csv', header=True)

In [None]:
# Declaring the column types up front saves time on large datasets.
# A DDL-style string is only one of several ways to express a schema.
schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('C:/Users/lynst/Documents/GitHub/machine-learning-projects/data-engineering/heart.csv')
)

In [None]:
# Ask PySpark to work out each column's type from the data itself.
df = spark.read.options(header=True, inferSchema=True).csv(
    'C:/Users/lynst/Documents/GitHub/machine-learning-projects/data-engineering/heart.csv'
)

In [None]:
# Treat the literal string 'NA' as null while reading.
# header/inferSchema are kept here because this is the DataFrame the rest of the
# notebook works on: without them the columns would be named _c0, _c1, ... and
# all be strings, so selecting 'Age' or averaging a column would not work.
df = spark.read.csv(
    'C:/Users/lynst/Documents/GitHub/machine-learning-projects/data-engineering/heart.csv',
    header=True,
    inferSchema=True,
    nullValue='NA',
)

In [None]:
# Write the DataFrame out in CSV format.
df.write.csv("heart_save.csv")

Saving won't let you write over an existing file. To do that, you need to set the 'mode' to overwrite:

In [None]:
# Replace whatever already exists at the target path.
df.write.csv("heart_save.csv", mode="overwrite")

In [None]:
# print the first 3 rows of the table
df.show(3)

In [None]:
# count the number of rows in the DataFrame
df.count()

In [None]:
# Project one column, then two columns, printing the first 3 rows each time.
df.select('Age').show(3)
df.select('Age', 'Sex').show(3)

## Caching
Every time you run a DAG, it is recomputed from the beginning; that is, the results are not kept in memory. If we want to keep a result so that it does not have to be recomputed, we can use the cache command. Note that this occupies space in the worker nodes' memory, so be careful with the sizes of the datasets you cache! By default, the cached DF is stored in RAM and is deserialized (not converted into a stream of bytes). You can change both of these: store the data on the hard disk, serialize it, or both!

## Collecting
Even after caching a DataFrame, it still sits in the worker nodes' memory. If you want to collect its pieces, assemble them and keep them on the master node so you won't have to pull them every time, use the collect command. Again, be very careful with this, since the collected data has to fit in the master node's memory!

In [None]:
# Mark the DataFrame for caching, then bring all of its rows back to the driver.
# cache() hands back the same DataFrame, so the two steps can be chained.
df.cache().collect()

In [None]:
# convert the PySpark DataFrame to a pandas DataFrame
pd_df = df.toPandas()
# convert it back to a PySpark DataFrame
spark_df = spark.createDataFrame(pd_df)

In [None]:
# return the first three rows as Row objects, which is how Spark represents single rows of a table.
# we will learn more about Row objects later
df.head(3)

Print the DataFrame's schema.

In [None]:
# print each column's name and type
df.printSchema()

In [None]:
# column dtypes as a list of (name, type) tuples
df.dtypes

In [None]:
# cast columns from one type to another
from pyspark.sql.types import FloatType

df = df.withColumn("Age", df.Age.cast(FloatType()))
# The source column must be RestingBP itself; casting df.Age here would
# silently overwrite every resting blood pressure value with the patient's age.
df = df.withColumn("RestingBP", df.RestingBP.cast(FloatType()))

In [None]:
# Summary statistics for the two columns.
df.select('Age', 'RestingBP').describe().show()

In [None]:
# Add a new column, or replace an existing one that has the same name.
# withColumn expects a Column object; select() always returns a DataFrame,
# so the column is taken straight from the DataFrame instead.
AgeFixed = df.Age + 1
df = df.withColumn('AgeFixed', AgeFixed)

In [None]:
# Compare the statistics of the shifted column with the original.
df.select('AgeFixed', 'Age').describe().show()

In [None]:
# remove a column (the result is only shown here, not stored)
df.drop('AgeFixed').show(1)  # prefix with `df =` (and omit .show) to keep the new DataFrame in a variable

In [None]:
# Rename a single column and preview it (df itself is not reassigned on this line).
df.withColumnRenamed('Age', 'age').select('age').show(1)
# To rename several columns, apply the (old, new) pairs one after another.
name_pairs = [('Age', 'age'), ('Sex', 'sex')]
for pair in name_pairs:
    df = df.withColumnRenamed(*pair)

In [None]:
# Confirm the renamed columns are available.
df.select('age', 'sex').show(1)

In [None]:
# drop all rows that contain any NA
df = df.na.drop()
df.count()
# drop all rows where all values are NA
df = df.na.drop(how='all')
# keep only rows that have at least 2 non-NA values (rows with fewer are dropped)
df = df.na.drop(thresh=2)
# drop all rows where any value in the specified columns is NA
df = df.na.drop(how='any', subset=['age', 'sex'])  # 'any' is the default

In [None]:
# Put a '?' wherever the 'sex' column has a missing value.
df = df.na.fill(value='?', subset=['sex'])

# In statistics, imputation means replacing missing data with substituted
# values. Here each listed column's mean is used; 'median' is another option.
from pyspark.ml.feature import Imputer

imptr = Imputer(
    strategy='mean',
    inputCols=['age', 'RestingBP'],
    outputCols=['age', 'RestingBP'],
)
df = imptr.fit(df).transform(df)

In [None]:
# filter to adults only (each call returns a new DataFrame; none of them is stored in a variable here)
df.filter('age > 18')
df.where('age > 18')  # 'where' is an alias of 'filter'
df.where(df['age'] > 18)  # third option: a Column expression instead of a string
# combine conditions ('&' means and, '|' means or); this example uses '|'
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA'))
# take every record where the 'ChestPainType' is NOT 'ATA'
df.filter(~(df['ChestPainType'] == 'ATA'))

In [None]:
# Print the rows whose age is above 18.
df.where(df['age'] > 18).show()

In [None]:
# Turn a string expression into a Column and show it under the name 'new_col'.
from pyspark.sql.functions import expr

exp = 'age + 0.2 * AgeFixed'
df.select(expr(exp).alias('new_col')).show(3)

In [None]:
# Average HeartDisease within each age group.
disease_by_age = df.groupby('age').avg('HeartDisease')
# Sort in descending order of age.
from pyspark.sql.functions import desc

disease_by_age.sort(desc("age")).show(5)

In [None]:
from pyspark.sql.functions import asc

# Average HeartDisease per age, sorted in ascending order of age.
# asc() is the function this cell imports; sorting with desc() here would
# ignore the import and only repeat a descending sort.
disease_by_age = df.groupby('age').mean().select(['age', 'avg(HeartDisease)'])
disease_by_age.orderBy(asc("age")).show(3)

In [None]:
# aggregate to get several statistics for several columns
# available aggregate functions include avg, max, min, sum, count
from pyspark.sql import functions as F

# NOTE(review): 'sex' holds string values ('M'/'F') in this notebook, so avg over it is presumably null - confirm which column was intended
df.agg(F.min(df['age']), F.max(df['age']), F.avg(df['sex'])).show()

In [None]:
# the same kind of aggregation, computed separately for each HeartDisease value
# NOTE(review): 'sex' holds string values in this notebook, so avg over it is presumably null - confirm which column was intended
df.groupby('HeartDisease').agg(F.min(df['age']), F.avg(df['sex'])).show()

In [None]:
# Register the DataFrame under the name "df" so SQL queries can refer to it.
df.createOrReplaceTempView("df")
spark.sql("SELECT sex from df").show(2)

# Columns can also be chosen with SQL syntax via selectExpr, which combines
# what '.select()' and '.sql()' do.
df.selectExpr("age >= 40 as older", "age").show(2)

In [None]:
# Count rows per age, with one output column for each listed 'sex' value.
# (Stray non-code characters at the end of the line were removed; they made
# the cell a syntax error. The pivot values are given as a list, the documented type.)
df.groupby('age').pivot('sex', ["M", "F"]).count().show(3)

In [None]:
# pivot is an expensive operation
(
    df.selectExpr("age >= 40 as older", "age", 'sex')
    .groupBy("sex")
    .pivot("older", ("true", "false"))
    .count()
    .show()
)

In [None]:
# Preview the columns used for modelling.
df.select('age', 'MaxHR', 'Cholesterol').show(4)

In [None]:
# divide the dataset into training features and target
# 'age' is lower-case because the column was renamed from 'Age' earlier in this
# notebook; the assembler looks the name up in the schema, so it must match exactly.
X_column_names = ['age', 'Cholesterol']
target_colum_name = ['MaxHR']

# convert the feature columns into a single column whose values are feature vectors
from pyspark.ml.feature import VectorAssembler

v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Fvec')
df = v_asmblr.transform(df)
# keep the raw features, the assembled vector and the target, in that order
X = df.select(X_column_names + ['Fvec'] + target_colum_name)
X.show(3)

In [None]:
# divide the dataset into training and testing sets using weights 0.8 and 0.2 (no seed is passed)
trainset, testset = X.randomSplit([0.8, 0.2])

In [None]:
# Fit a linear regression that predicts 'MaxHR' (the label column) from the
# assembled feature vector, then print the fitted coefficients and intercept.
from pyspark.ml.regression import LinearRegression

model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR').fit(trainset)
print(model.coefficients, model.intercept, sep='\n')

In [None]:
# evaluate the model on the test set and show the first 3 rows of its predictions
model.evaluate(testset).predictions.show(3)

In [None]:
# Handle a categorical feature by mapping each distinct string to a numeric index.
from pyspark.ml.feature import StringIndexer

indxr = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainTypeInxed')
(
    indxr.fit(df)
    .transform(df)
    .select('ChestPainTypeInxed')
    .show(3)
)

Stop the SparkSession.

In [None]:
# stop the SparkSession created at the top of the notebook
spark.stop()