## Introduction to PySpark

Every pyspark program that you write will have these “core parts”, which are:

1. importing the pyspark package (or modules)

2. starting your Spark Session

3. defining a set of transformations and actions over Spark DataFrames


> PySpark is organized in a number of modules, such as sql (to access Spark SQL), pandas (to access the Pandas API of Spark), ml (to access Spark MLib).Going further, we can have sub-modules (or modules inside a module) too. As an example, the sql module of pyspark have the functions and window sub-modules.


### Starting your Spark Session

You control your Spark application through a driver process called the SparkSession.

The SparkSession instance is the way Spark executes user-defined manipulations across the clusters. There is a one-to-one correspondence between a SparkSession and Spark Applications.

Every Spark application starts with a Spark Session. Basically, the Spark Session is the entry point to your application. This means that, in every pyspark program that you write, you should always start by defining your Spark Session. We do this, by using the getOrCreate() method from pyspark.sql.SparkSession.builder module.

Just store the result of this method in any python object. Is very common to name this object as spark.


In [None]:
# Importar librería Spark e iniciar sesión
# Toda aplicación de spark debe iniciar con una sesión (spark session)
# guardarla como spark nos permitirá acceder a las funciones de la sesion

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


In [None]:
# Definiendo conjunto de transformaciones y acciones
# Todo programa spark está compuesto por un conjuntno de transformaciones y acciones sobre un conjunto de spark dataframes
# Los cuales son la estructura básica que alimenta todos los programas de pyspark
# Un programa de pyspark consiste en en transformar múltiples spark dataframes

# Primera aplicación
# Crear una tabla de una columna de 5 números, devolverá una lista simple

df5 = spark.range(5) #método spark (almacena la secuencia como filas en una tabla spark)
type(df5)


In [None]:
df5

DataFrame[id: bigint]

In [None]:
df5.show() # Para ver los resultados

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [None]:
lista5 = df5.collect() #mostramos los resultados en una lista de python
lista5

[Row(id=0), Row(id=1), Row(id=2), Row(id=3), Row(id=4)]

In [None]:
# Podemos ver los diferentes métodos de los spark detaframe con dir()
print(dir(df5))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collect_as_arrow', '_ipython_key_completions_', '_jcols', '_jdf', '_jmap', '_joinAsOf', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema', '_session', '_show_string', '_sort_cols', '_sql_ctx', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView', 'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'dropDuplicatesWithinWatermark', 'drop_duplicates', 'dropna', 'dtypes', 'exceptAll', 'explai

In [None]:
# Crando dataframe en spark

# Primera forma: como colección de filas

from datetime import date
from pyspark.sql import Row

data = [
    Row(id = 1, value = 28.3, date = date(2021,1,1)),
    Row(id = 2, value = 15.8, date = date(2021,1,2)),
    Row(id = 3, value = 20.1, date = date(2021,1,3)),
    Row(id = 4, value = 12.6, date = date(2021,1,4))
]

# Trasnformandolo a un df spark

df = spark.createDataFrame(data)

In [None]:
# Dado que Spark es un objeto clase pyspark, imprimirlo es ver una descripción de las columnas presentes
df


DataFrame[id: bigint, value: double, date: date]

In [None]:
# Otra forma de crear un df spark es a través de listas

# Creamos los registros
data = [
    [12114, 'Anne', 21, 1.56, 8, 9, 10, 9, 'Economics', 'SC'],
    [13007, 'Adrian', 23, 1.82, 6, 6, 8, 7, 'Economics', 'SC'],
    [10045, 'George', 29, 1.77, 10, 9, 10, 7, 'Law', 'SC'],
    [12459, 'Adeline', 26, 1.61, 8, 6, 7, 7, 'Law', 'SC'],
    [10190, 'Mayla', 22, 1.67, 7, 7, 7, 9, 'Design', 'AR'],
    [11552, 'Daniel', 24, 1.75, 9, 9, 10, 9, 'Design', 'AR']
]

# Creamos los nombres de las columnas

columns = [
  'StudentID', 'Name', 'Age', 'Height', 'Score1',
  'Score2', 'Score3', 'Score4', 'Course', 'Department'
]

students = spark.createDataFrame(data, columns)
students.show(5)

+---------+-------+---+------+------+------+------+------+---------+----------+
|StudentID|   Name|Age|Height|Score1|Score2|Score3|Score4|   Course|Department|
+---------+-------+---+------+------+------+------+------+---------+----------+
|    12114|   Anne| 21|  1.56|     8|     9|    10|     9|Economics|        SC|
|    13007| Adrian| 23|  1.82|     6|     6|     8|     7|Economics|        SC|
|    10045| George| 29|  1.77|    10|     9|    10|     7|      Law|        SC|
|    12459|Adeline| 26|  1.61|     8|     6|     7|     7|      Law|        SC|
|    10190|  Mayla| 22|  1.67|     7|     7|     7|     9|   Design|        AR|
+---------+-------+---+------+------+------+------+------+---------+----------+
only showing top 5 rows



In [None]:
# Otros métodos para crear df spark se explorarán mas adelante

# También podemos acceder al nombre de las comunas como una lista

print(students.columns)

['StudentID', 'Name', 'Age', 'Height', 'Score1', 'Score2', 'Score3', 'Score4', 'Course', 'Department']


In [None]:
# Para conocer el número de filas de mi df

students.count()

6

Spark admite múltiples tipos de datos, para mas información ir a: [enalce](https://spark.apache.org/docs/latest/sql-ref-datatypes.html)

In [None]:
# Podemos ver informes mas completos del esquema del df y sus tipos

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- value: double (nullable = true)
 |-- date: date (nullable = true)



In [None]:
df.schema # Nos da información con nombre, tipo y si puede contener nulos

StructType([StructField('id', LongType(), True), StructField('value', DoubleType(), True), StructField('date', DateType(), True)])

In [None]:
# Si lo queremos ver por separado
for i in df.schema:
  print(i)

StructField('id', LongType(), True)
StructField('value', DoubleType(), True)
StructField('date', DateType(), True)


In [None]:
# Si deseo acceder solamente a la información del tipado

for i in df.schema:
  print(i.dataType)

LongType()
DoubleType()
DateType()


In [None]:
# Lo mismo con el nombre y el valor booleano

for i in df.schema:
  print(i.name)

id
value
date


# Creación de un esquema DF

Cuando Spark crea un nuevo DataFrame, automáticamente adivinará qué esquema es apropiado para ese DataFrame. Pero a veces se equivoca mucho. En estos casos tienes que decirle a Spark exactamente cómo quieres que sea este esquema DataFrame.

In [None]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DateType, StringType, IntegerType
from datetime import date

data = [
    [1, date(2022,1,1), 'Anne'],
    [2, date(2022,1,2), 'Layla'],
    [3, date(2022,1,3), 'Wick'],
    [4, date(2022,1,5), 'Paul']
]

schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('Date', DateType(), True),
    StructField('Name', StringType(), True)
])

registers = spark.createDataFrame(data, schema = schema)


In [None]:
# Para hacer verificación del tipo podemos hacerlo así
# Preguntamos si es una instancia de una clase, en este caso clase integer

isinstance(df.schema[0].dataType, IntegerType)

False

In [None]:
# El resultado acá es falso porque en verdad se trata de un longtype
# enteros con signo de 8 bytes

from pyspark.sql.types import LongType
isinstance(df.schema[0].dataType, LongType)

True

In [None]:
# Revisemos ahora la clase column
# Notaremos que ya nos hemos familiarizado con ella

print(type(df.id))


<class 'pyspark.sql.column.Column'>


In [None]:
from pyspark.sql.functions import col
id_column = col('ID')
print(id_column)

Column<'ID'>


In [None]:
# Column es sólo una expresión, no nos interesa por ahora el contenido o el df al que pertenece
# Esto tienen que ver con el lazy aspect de Spark

expr1 = id_column*2
print(expr1)

# Spark sabe lo que queremos hacer, pero no ejecuta aun la operación
# Sólo lo ejecutara hasta que se solite la acción

expr2 = (col('Name') == 'Anne') & (col('Gande') > 6)
print(expr2)

Column<'(ID * 2)'>


### Valores literales vs expresiones