<a href="https://colab.research.google.com/github/julihdez36/Analytics/blob/main/spark_overview.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ¿Qué es PySpark?

PySpark es una API de Python para trabajar con Apache Spark. Primero explicaré qué quiero decir con una "API de Python" para algo y luego explicaré qué es, específicamente, "Apache Spark".

Lo que quiero decir con "API de Python" es que puedes usar la sintaxis y la agilidad de Python para interactuar y enviar comandos a un sistema que no está basado, en esencia, en Python.

Con PySpark, interactúas con Apache Spark, un sistema diseñado para trabajar, analizar y modelar con inmensas cantidades de datos en muchas computadoras al mismo tiempo. Dicho de otra manera, Apache Spark te permite ejecutar cálculos en paralelo, en lugar de secuencialmente. Te permite dividir una tarea increíblemente grande en muchas tareas más pequeñas y ejecutar cada una de esas tareas en una máquina diferente. Esto te permite lograr tus objetivos de análisis en un tiempo razonable que no sería posible en una sola máquina.

Generalmente, definiríamos la cantidad de datos que se adapta a PySpark como lo que no cabe en el almacenamiento de una sola máquina (y mucho menos en la RAM).

## Conceptos relacionados importantes:

1. Computación distribuida: cuando se distribuye una tarea en varias tareas más pequeñas que se ejecutan al mismo tiempo. Esto es lo que PySpark le permite hacer con muchas máquinas, pero también se puede hacer en una sola máquina con varios subprocesos, por ejemplo.
2. Clúster: una red de máquinas que pueden asumir tareas de un usuario, interactuar entre sí y devolver resultados. Estos proporcionan los recursos informáticos que PySpark utilizará para realizar los cálculos.
3. Conjunto de datos distribuidos resilientes (RDD): una colección de datos distribuida e inmutable. No es tabular, como DataFrames con los que trabajaremos más adelante, y no tiene un esquema de datos. Por lo tanto, para la manipulación de datos tabulares, DataFrames permite más opciones de API y optimizaciones poco conocidas. Aun así, es posible que se encuentre con RDD a medida que aprenda más sobre Spark, y debe estar al tanto de su existencia.

## Parte de PySpark que cubriremos:

1. PySpark SQL: contiene comandos para el procesamiento y la manipulación de datos.
2. PySpark MLlib: incluye una variedad de modelos, entrenamiento de modelos y comandos relacionados.

**Arquitectura Spark:** para enviar comandos y recibir resultados de un clúster, deberá iniciar una sesión Spark. Este objeto es su herramienta para interactuar con Spark. Cada usuario del clúster tendrá su propia sesión Spark, que le permitirá usar el clúster de forma aislada de otros usuarios. Todas las sesiones se comunican con un contexto Spark, que es el nodo maestro del clúster; es decir, asigna tareas a cada una de las computadoras del clúster y las coordina. Cada una de las computadoras del clúster que realiza tareas para un nodo maestro se denomina nodo de trabajo. Para conectarse a un nodo de trabajo, el nodo maestro debe obtener la potencia de cómputo de ese nodo asignada por un administrador de clúster, que es responsable de distribuir los recursos del clúster. Dentro de cada nodo de trabajo, hay programas de ejecución que ejecutan las tareas; pueden ejecutar múltiples tareas simultáneamente y tienen su propio caché para almacenar resultados. Por lo tanto, cada nodo maestro puede tener múltiples nodos de trabajo, que pueden tener múltiples tareas en ejecución.

In [None]:
# a SparkSession object can perform the most common data processing tasks
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('test').getOrCreate() # will return existing session if one was
                                                           # created before and was not closed

In [None]:
spark

dataset: https://www.kaggle.com/fedesoriano/heart-failure-prediction

In [None]:
# read csv, all columns will be of type string
df = spark.read.option('header','true').csv('heart.csv')
# Indíquele a pyspark el tipo de columnas: ahorra tiempo en conjuntos de datos grandes. Hay otras formas de hacer esto, pero esa es mi favorita

schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
df = spark.read.csv('/Users/mreznik/heart.csv', schema=schema, header=True)
# let PySpark infer the schema
df = spark.read.csv('/Users/mreznik/heart.csv', inferSchema=True, header=True)
# replace nulls with other value at reading time
df = spark.read.csv('/Users/mreznik/heart.csv', nullValue='NA')
# save data
df.write.format("csv").save("heart_save.csv")
# if you want to overwrite the file
df.write.format("csv").mode("overwrite").save("heart_save.csv")

In [None]:
# show head of table
df.show(3)

In [None]:
# count number of rows
df.count()

In [None]:
# show parts of the table
df.select('Age').show(3)
df.select(['Age','Sex']).show(3)

## Pandas DataFrame VS PySpark DataFrame

Ambos representan una tabla de datos con filas y columnas. Sin embargo, en esencia son diferentes, ya que el marco de datos de PySpark debe admitir cálculos distribuidos. A medida que avancemos, veremos cada vez más características que no están presentes en Pandas DataFrame. Dicho esto, si sabe cómo usar Pandas, entonces pasarse a PySpark se sentirá como una transición natural.

## DAG

El gráfico acíclico dirigido es la forma en que Spark ejecuta los cálculos. Cuando le da una serie de transformaciones para aplicar al conjunto de datos, crea un gráfico a partir de esas transformaciones, por lo que sabe qué hacer, pero no ejecuta esos comandos inmediatamente, si no es necesario. En cambio, es perezoso: pasará por el DAG y aplicará las transformaciones solo cuando sea necesario, para proporcionar un resultado necesario. Esto permite un mejor rendimiento, ya que Spark sabe qué hay por delante de un determinado cálculo y optimiza el proceso en consecuencia.

## Transformaciones VS acciones

En PySpark, hay dos tipos de comandos: transformaciones y acciones. Los comandos de transformación se agregan al DAG, pero no hacen que se ejecute. Transforman un DataFrame en otro, sin cambiar el DataFrame de entrada. Por otro lado, las acciones hacen que PySpark ejecute el DAG, pero no crean un nuevo DataFrame; en cambio, generan el resultado del DAG.

## Almacenamiento en caché

Cada vez que ejecuta un DAG, se volverá a calcular desde el principio. Es decir, los resultados no se guardan en la memoria. Por lo tanto, si queremos guardar un resultado para que no tenga que volver a calcularse, podemos usar el comando de caché. Tenga en cuenta que esto ocupará espacio en la memoria del nodo de trabajo, así que tenga cuidado con los tamaños de los conjuntos de datos que está almacenando en caché. De manera predeterminada, el DF en caché se almacena en la RAM y no se serializa (no se convierte en un flujo de bytes). Puede cambiar ambos: almacenar los datos en el disco duro, serializarlos o ambos.

## Recopilación

Incluso después de almacenar en caché un DataFrame, aún permanece en la memoria de los nodos de trabajo. Si desea recopilar sus piezas, ensamblarlas y guardarlas en el nodo maestro para no tener que extraerlas cada vez, use el comando para recopilar. Nuevamente, tenga mucho cuidado con esto, ya que el archivo recopilado tendrá que caber en la memoria del nodo maestro.

In [None]:
df.cache()
df.collect()

In [None]:
# convert PySpark DataFrame to Pandas DataFrame
pd_df = df.toPandas()
# convert it back
spark_df = spark.createDataFrame(pd_df)

In [None]:
# show first three rows as three row objects, which is how spark represents single rows from a table.
# we will learn more about it later
df.head(3)

In [None]:
# type os columns
df.printSchema()

In [None]:
# column dtypes as list of tuples
df.dtypes

In [None]:
# cast a column from one type to other
from pyspark.sql.types import FloatType
df = df.withColumn("Age",df.Age.cast(FloatType()))
df = df.withColumn("RestingBP",df.Age.cast(FloatType()))

In [None]:
# compute summery statistics
df.select(['Age','RestingBP']).describe().show()

In [None]:
# add a new column or replace existing one
AgeFixed = df['Age'] + 1  # select alwayes returns a DataFrame object, and we need a column object
df = df.withColumn('AgeFixed', AgeFixed)

In [None]:
df.select(['AgeFixed','Age']).describe().show()


In [None]:
# remove columns
df.drop('AgeFixed').show(1) # add df = to get the new DataFrame into a variable

In [None]:
# rename a column
df.withColumnRenamed('Age','age').select('age').show(1)
# to rename more than a single column, i would suggest a loop.
name_pairs = [('Age','age'),('Sex','sex')]
for old_name, new_name in name_pairs:
    df = df.withColumnRenamed(old_name,new_name)

In [None]:
df.select(['age','sex']).show(1)


In [None]:
# drop all rows that contain any NA
df = df.na.drop()
df.count()
# drop all rows where all values are NA
df = df.na.drop(how='all')
# drop all rows where more at least 2 values are NOT NA
df = df.na.drop(thresh=2)
# drop all rows where any value at specific columns are NAs.
df = df.na.drop(how='any', subset=['age','sex']) # 'any' is the defult

In [None]:
# fill missing values in a specific column with a '?'
df = df.na.fill(value='?',subset=['sex'])
# replace NAs with mean of column
from pyspark.ml.feature import Imputer # In statistics, imputation is the process of
                                       # replacing missing data with substituted values
imptr = Imputer(inputCols=['age','RestingBP'],
                outputCols=['age','RestingBP']).setStrategy('mean') # can also be 'median' and so on

df = imptr.fit(df).transform(df)

In [None]:
# filter to adults only and calculate mean
df.filter('age > 18')
df.where('age > 18')# 'where' is an alias to 'filter'
df.where(df['age'] > 18) # third option
# add another condition ('&' means and, '|' means or)
df.where((df['age'] > 18) | (df['ChestPainType'] == 'ATA'))
# take every record where the 'ChestPainType' is NOT 'ATA'
df.filter(~(df['ChestPainType'] == 'ATA'))

In [None]:
df.filter('age > 18').show()

In [None]:
# evaluate a string expression into command
from pyspark.sql.functions import expr
exp = 'age + 0.2 * AgeFixed'
df.withColumn('new_col', expr(exp)).select('new_col').show(3)

In [None]:
# group by age
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
# sort values in desnding order
from pyspark.sql.functions import desc
disease_by_age.orderBy(desc("age")).show(5)

In [None]:
from pyspark.sql.functions import asc
disease_by_age = df.groupby('age').mean().select(['age','avg(HeartDisease)'])
disease_by_age.orderBy(desc("age")).show(3)

In [None]:
# aggregate to get several statistics for several columns
# the available aggregate functions are avg, max, min, sum, count
from pyspark.sql import functions as F
df.agg(F.min(df['age']),F.max(df['age']),F.avg(df['sex'])).show()

In [None]:
df.groupby('HeartDisease').agg(F.min(df['age']),F.avg(df['sex'])).show()


In [None]:
# run an SQL query on the data
df.createOrReplaceTempView("df") # tell PySpark how the table will be called in the SQL query
spark.sql("""SELECT sex from df""").show(2)

# we also choose columns using SQL sytnx, with a command that combins '.select()' and '.sql()'
df.selectExpr("age >= 40 as older", "age").show(2)

In [None]:
df.groupby('age').pivot('sex', ("M", "F")).count().show(3)

In [None]:
# pivot - expensive operation
df.selectExpr("age >= 40 as older", "age",'sex').groupBy("sex")\
                    .pivot("older", ("true", "false")).count().show()

In [None]:
df.select(['age','MaxHR','Cholesterol']).show(4)

In [None]:
# devide dataset to training features and target
X_column_names = ['Age','Cholesterol']
target_colum_name = ['MaxHR']

# convert feature columns into a columns where the vlues are feature vectors
from pyspark.ml.feature import VectorAssembler
v_asmblr = VectorAssembler(inputCols=X_column_names, outputCol='Fvec')
df = v_asmblr.transform(df)
X = df.select(['Age','Cholesterol','Fvec','MaxHR'])
X.show(3)

In [None]:
# devide dataset into training and testing sets
trainset, testset = X.randomSplit([0.8,0.2])

In [None]:
# predict 'RestingBP' using linear regression
from pyspark.ml.regression import LinearRegression
model = LinearRegression(featuresCol='Fvec', labelCol='MaxHR')
model = model.fit(trainset)
print(model.coefficients)
print(model.intercept)

In [None]:
# evaluate model
model.evaluate(testset).predictions.show(3)

In [None]:
# handel categorical features with ordinal indexing
from pyspark.ml.feature import StringIndexer
indxr = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainTypeInxed')
indxr.fit(df).transform(df).select('ChestPainTypeInxed').show(3)