
# What is PySpark?
PySpark is a Python API for working with Apache Spark. I will first explain what I mean by a "Python API" for something and then explain what, specifically, Apache Spark is.

What I mean by **'Python API'** is that you can use the syntax and agility of Python to interact with and send commands to a system that is not, at its core, based on Python.

With PySpark, you interact with Apache Spark - a system designed for processing, analyzing and modeling immense amounts of data on many computers at the same time. Put differently, Apache Spark allows you to run computations in parallel instead of sequentially: it divides one incredibly large task into many smaller tasks and runs each of them on a different machine. This lets you reach your analysis goals in a reasonable time, which would not be possible on a single machine.

As a rule of thumb, data is a good fit for PySpark when it would not fit into a single machine's storage (let alone its RAM).

**Important related concepts:**
1. Distributed computing - splitting a task into several smaller tasks that run at the same time. This is what PySpark lets you do across many machines; the same idea also works on a single machine with several threads, for example.
2. Cluster - a network of machines that can take on tasks from a user, interact with one another and return results. These provide the computing resources that PySpark uses for its computations.
3. Resilient Distributed Dataset (RDD) - an immutable distributed collection of data. Unlike the DataFrames we will work with later, it is not tabular and has no data schema. For tabular data wrangling, DataFrames therefore allow for more API options and under-the-hood optimizations. Still, you might encounter RDDs as you learn more about Spark, and should be aware of their existence.
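
The split-and-combine idea from item 1 can be tried on a single machine. The sketch below is plain Python, not Spark: it cuts one large sum into chunks, hands each chunk to a thread and adds up the partial results. The helper names (`chunked`, `partial_sum`) and the chunk size are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def chunked(values, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [values[i:i + size] for i in range(0, len(values), size)]

def partial_sum(chunk):
    """The small task that runs on each chunk independently."""
    return sum(x * x for x in chunk)

numbers = list(range(1_000))
chunks = chunked(numbers, 250)            # one big task becomes 4 smaller ones

# run the small tasks concurrently, then combine the partial results
with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(partial_sum, chunks))

total = sum(partials)
print(total == sum(x * x for x in numbers))   # True
```

Threads in CPython will not make this arithmetic faster; the point is only the pattern of splitting the work, running the pieces independently and combining the results, which Spark applies across machines.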

**Parts of PySpark we will cover:**
1. PySpark SQL - contains commands for data processing and manipulation.
2. PySpark MLlib - includes a variety of models, model training and related commands.

**Spark Architecture:**
To send commands to a cluster and receive results from it, you need to start a Spark session. This object is your tool for interacting with Spark. Each user of the cluster has their own Spark session, which lets them use the cluster in isolation from other users. Sessions communicate with a Spark context, which runs in the driver program. The driver plays the role of the master node here: it assigns tasks to the computers in the cluster and coordinates them. Each computer in the cluster that performs tasks for the master node is called a worker node. To use a worker node, the master node first needs that node's computing power allocated to it by a cluster manager, which is responsible for distributing the cluster's resources. Inside each worker node there are executors, the processes that run the tasks. An executor can run multiple tasks simultaneously and has its own cache for storing results. So, each master node can have multiple worker nodes, each of which can have multiple tasks running.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=8b76eb51ef8209432a5577c353f632b2f73c0b828e6aedd7662f4ba69bb65947
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# a SparkSession object can perform the most common data processing tasks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate() # will return existing session if one was
                                                           # created before and was not closed

In [None]:
spark

**dataset:**
https://www.kaggle.com/fedesoriano/heart-failure-prediction

In [None]:
# read csv; without a schema, all columns will be of type string
df = spark.read.option('header','true').csv('heart.csv')
# tell pyspark the type of the columns - saves time on large datasets. there are other ways to do this,
# but that's my favorite (only three of the columns are listed here)
schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
df = spark.read.csv('/Users/mreznik/heart.csv', schema=schema, header=True)
# let PySpark infer the schema
df = spark.read.csv('/Users/mreznik/heart.csv', inferSchema=True, header=True)
# treat a given string ('NA' here) as null at reading time; header and inferSchema are kept
# so that the columns keep their names and types in the cells below
df = spark.read.csv('/Users/mreznik/heart.csv', nullValue='NA', inferSchema=True, header=True)
# save data (Spark writes a directory with this name; fails if the path already exists)
df.write.format("csv").save("heart_save.csv")
# if you want to overwrite existing output
df.write.format("csv").mode("overwrite").save("heart_save.csv")

In [None]:
# show head of table
df.show(3)

In [None]:
# count number of rows
df.count()

In [None]:
# show parts of the table
df.select('Age').show(3)
df.select(['Age','Sex']).show(3)

## Pandas DataFrame VS PySpark DataFrame

Both represent a table of data with rows and columns. Under the hood, however, they are different, as a PySpark DataFrame needs to support distributed computations. As we move forward, we will see more and more of its features that are not present in a Pandas DataFrame. That being said, if you know how to use Pandas, then moving to PySpark will feel like a natural transition.

## DAG
A directed acyclic graph (DAG) is the way Spark plans computations. When you give it a series of transformations to apply to the dataset, it builds a graph out of those transformations so it knows what to do, but it does not execute those commands immediately if it does not have to. Rather, it is lazy: it goes through the DAG and applies the transformations only when it must, to provide a needed result. This allows better performance, since Spark knows what lies ahead of a given computation and can optimize the process accordingly.

## transformations VS actions
In PySpark, there are two types of commands: transformations and actions. Transformations (for example `select`, `filter` and `withColumn`) are added to the DAG but do not cause it to be executed. They turn one DataFrame into another without changing the input DataFrame. Actions (for example `show`, `count` and `collect`), on the other hand, make PySpark execute the DAG. They do not create a new DataFrame; instead, they output the result of the DAG.
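
To get a feel for laziness without a cluster, here is a toy model in plain Python. It is not how Spark is implemented: `LazyTable` is a made-up class that only records transformations in a plan (a simple chain, the most basic kind of DAG) and runs them when an action is called.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: rows plus a plan of pending steps."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = tuple(plan)      # recorded transformations, not yet run

    # transformations: return a NEW LazyTable with a longer plan
    def filter(self, predicate):
        return LazyTable(self._rows, self._plan + (("filter", predicate),))

    def map(self, func):
        return LazyTable(self._rows, self._plan + (("map", func),))

    # action: walks the plan and produces an actual result
    def collect(self):
        rows = list(self._rows)
        for kind, func in self._plan:
            if kind == "filter":
                rows = [r for r in rows if func(r)]
            else:
                rows = [func(r) for r in rows]
        return rows

calls = []                                # records when work really happens

def double(x):
    calls.append(x)
    return x * 2

ages = LazyTable([15, 40, 63])
adults = ages.filter(lambda a: a > 18)    # transformation: nothing runs
doubled = adults.map(double)              # transformation: nothing runs
print(calls)                              # [] - no work done so far

result = doubled.collect()                # action: the plan executes now
print(result)                             # [80, 126]
print(ages.collect())                     # [15, 40, 63] - the input is unchanged
```

On a real PySpark DataFrame, `df.explain()` prints the plan Spark has built so far without running it.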

## Caching
Every time you run a DAG, it is recomputed from the beginning; that is, the results are not kept in memory. So, if we want to keep a result so it won't have to be recomputed, we can use the cache command. Note that this occupies space on the worker nodes, so be careful with the sizes of the datasets you are caching! By default, a cached DataFrame is kept in RAM in deserialized form (not converted into a stream of bytes), and the parts that do not fit in memory are written to disk. You can change this with `persist` and a storage level: store the data on disk only, serialize it, or both!
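
The effect of caching can be shown with another toy model in plain Python. `LazyResult` is a hypothetical class, not a Spark API; it counts how often the full computation runs with and without `cache()`.

```python
class LazyResult:
    """Toy model of a lazily computed dataset with optional caching."""

    def __init__(self, compute):
        self._compute = compute    # function that rebuilds the data from scratch
        self._use_cache = False
        self._cached = None
        self.runs = 0              # how many times the full computation ran

    def cache(self):
        """Only marks the result to be kept; no work happens here."""
        self._use_cache = True
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached    # reuse the stored result
        self.runs += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

squares = LazyResult(lambda: [x * x for x in range(5)])
squares.collect()
squares.collect()
runs_without_cache = squares.runs   # 2: recomputed on every action

cached = LazyResult(lambda: [x * x for x in range(5)]).cache()
cached.collect()
cached.collect()
runs_with_cache = cached.runs       # 1: the second action reused the stored result

print(runs_without_cache, runs_with_cache)   # 2 1
```

As in Spark, calling `cache()` does no work by itself; the data is stored the first time an action computes it.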

## Collecting
Even after caching a DataFrame, it still sits in the worker nodes' memory. If you want to collect its pieces, assemble them and keep them on the master node (the driver) so you won't have to pull them every time, use the collect command. Again, be very careful with this, since the collected data has to fit in the master node's memory!

In [None]:
df.cache()
df.collect()

In [None]:
# convert PySpark DataFrame to Pandas DataFrame
pd_df = df.toPandas()
# convert it back
spark_df = spark.createDataFrame(pd_df)

In [None]:
# show first three rows as three row objects, which is how spark represents single rows from a table.
# we will learn more about it later
df.head(3)

In [None]:
# types of columns
df.printSchema()

In [None]:
# column dtypes as list of tuples
df.dtypes

In [None]:
# cast a column from one type to another
from pyspark.sql.types import FloatType
df = df.withColumn("Age",df.Age.cast(FloatType()))
df = df.withColumn("RestingBP",df.RestingBP.cast(FloatType()))

In [None]:
# compute summary statistics
df.select(['Age','RestingBP']).describe().show()

In [None]:
# add a new column or replace existing one
AgeFixed = df['Age'] + 1  # select always returns a DataFrame object, and we need a Column object
df = df.withColumn('AgeFixed', AgeFixed)

In [None]:
df.select(['AgeFixed','Age']).describe().show()

In [None]:
# remove columns
df.drop('AgeFixed').show(1) # add df = to get the new DataFrame into a variable

In [None]:
# rename a column
df.withColumnRenamed('Age','age').select('age').show(1)
# to rename more than a single column, I would suggest a loop.
name_pairs = [('Age','age'),('Sex','sex')]
for old_name, new_name in name_pairs:
    df = df.withColumnRenamed(old_name,new_name)

In [None]:
df.select(['age','sex']).show(1)

In [None]:
# drop all rows that contain any NA
df = df.na.drop()
df.count()
# drop all rows where all values are NA
df = df.na.drop(how='all')
# drop all rows that have fewer than 2 non-NA values (thresh=2 keeps rows with at least 2)
df = df.na.drop(thresh=2)
# drop all rows where any value in specific columns is NA
df = df.na.drop(how='any', subset=['age','sex']) # 'any' is the default
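
The `thresh` rule is easy to misread, so here is a small plain-Python sketch of it. The function `keep_rows` and the sample rows are made up for illustration; `None` stands in for NA.

```python
def keep_rows(rows, thresh):
    """Keep only rows that have at least `thresh` non-null values."""
    return [row for row in rows if sum(v is not None for v in row) >= thresh]

rows = [
    (63, "M", "ATA"),       # 3 non-null values -> kept
    (41, None, "NAP"),      # 2 non-null values -> kept
    (None, "F", None),      # 1 non-null value  -> dropped
    (None, None, None),     # 0 non-null values -> dropped
]

kept = keep_rows(rows, thresh=2)
print(kept)   # [(63, 'M', 'ATA'), (41, None, 'NAP')]
```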

In [None]:
# fill missing values in a specific column with a '?'
df = df.na.fill(value='?',subset=['sex'])
# replace NAs with mean of column
from pyspark.ml.feature import Imputer # In statistics, imputation is the process of
                                       # replacing missing data with substituted values
imptr = Imputer(inputCols=['age','RestingBP'],
                outputCols=['age','RestingBP']).setStrategy('mean') # can also be 'median' or 'mode'

df = imptr.fit(df).transform(df)

In [None]:
# filter to adults only (each line returns a new DataFrame; nothing is computed until an action is called)
df.filter('age > 18')
df.where('age > 18') # 'where' is an alias to 'filter'
df.where(df['age'] > 18) # third option
# add another condition ('&' means and, '|' means or)
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA'))
# take every record where the 'ChestPainType' is NOT 'ATA'
df.filter(~(df['ChestPainType'] == 'ATA'))

In [None]:
df.filter('age > 18').show()

In [None]:
# evaluate a string expression into a column expression
from pyspark.sql.functions import expr
exp = 'age + 0.2 * AgeFixed'
df.withColumn('new_col', expr(exp)).select('new_col').show(3)

In [None]:
# group by age
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
# sort values in descending order
from pyspark.sql.functions import desc
disease_by_age.orderBy(desc("age")).show(5)

In [None]:
from pyspark.sql.functions import asc
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
# sort values in ascending order
disease_by_age.orderBy(asc("age")).show(3)

In [None]:
# aggregate to get several statistics for several columns
# common aggregate functions include avg, max, min, sum and count
# (avg is only meaningful for numeric columns such as MaxHR, not for strings such as sex)
from pyspark.sql import functions as F
df.agg(F.min(df['age']),F.max(df['age']),F.avg(df['MaxHR'])).show()

In [None]:
# avg is only meaningful for numeric columns such as MaxHR, not for strings such as sex
df.groupby('HeartDisease').agg(F.min(df['age']),F.avg(df['MaxHR'])).show()

In [None]:
# run an SQL query on the data
df.createOrReplaceTempView("df") # tell PySpark how the table will be called in the SQL query
spark.sql("""SELECT sex from df""").show(2)

# we can also choose columns using SQL syntax, with a command that combines '.select()' and '.sql()'
df.selectExpr("age >= 40 as older", "age").show(2)

In [None]:
df.groupby('age').pivot('sex', ["M", "F"]).count().show(3)

In [None]:
# pivot - expensive operation
df.selectExpr("age >= 40 as older", "age",'sex').groupBy("sex")\
                    .pivot("older", ("true", "false")).count().show()

In [None]:
df.select(['age','MaxHR','Cholesterol']).show(4)

In [None]:
# divide dataset into training features and target
X_column_names = ['age','Cholesterol']  # 'Age' was renamed to 'age' in an earlier cell
target_column_name = ['MaxHR']

# convert the feature columns into a single column whose values are feature vectors
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Fvec')
df = v_asmblr.transform(df)
X = df.select(['age','Cholesterol','Fvec','MaxHR'])
X.show(3)

In [None]:
# divide dataset into training and testing sets
trainset, testset = X.randomSplit([0.8,0.2])

In [None]:
# predict 'MaxHR' using linear regression
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR')
model = model.fit(trainset)
print(model.coefficients)
print(model.intercept)

In [None]:
# evaluate model
model.evaluate(testset).predictions.show(3)

In [None]:
# handle categorical features by indexing them (by default the most frequent category gets index 0)
from pyspark.ml.feature import StringIndexer
indxr = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainTypeIndexed')
indxr.fit(df).transform(df).select('ChestPainTypeIndexed').show(3)
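
How labels end up as numbers can be sketched in plain Python. The function below mimics the default frequency-descending ordering of `StringIndexer` on a made-up list of labels; `frequency_index` is a hypothetical helper, not part of PySpark, and the counts are invented rather than taken from the dataset.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to an index, most frequent label first (index 0.0)."""
    counts = Counter(labels)
    # sort by descending count; ties are broken alphabetically to stay deterministic
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

pain_types = ["ASY", "ASY", "ASY", "NAP", "NAP", "ATA"]
mapping = frequency_index(pain_types)
print(mapping)                            # {'ASY': 0.0, 'NAP': 1.0, 'ATA': 2.0}
print([mapping[p] for p in pain_types])   # [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]
```

Note that these indices only reflect how common each category is; they do not imply any meaningful order between the categories.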