In [33]:
import findspark #el encuentra spark
findspark.init() #buscamos spark

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, FloatType #construir schema
import pandas as pd
from pyspark.sql import functions as F #funciones para castear datos

### ¿Para que es Spark?

Manipulacion de datos a escala de big data, procesamiento de grandes volumenes de datos.
Surge de la programacion distribuida.
MapReduce (Google) evoluciona a Hadoop (capturar varias maquinas para que funcionen como una) y luego ApacheSpark.

### ApacheSpark (forma local)

ApacheSpark (forma local):
Particiona la CPU para ejecutar de forma distribuida el procesamiento.
Es ideal solo para la manipulacion de dataframes.
Paradigma orientada a eventos (tiene que pasar algo para ejecutar un codigo).
El driver (Linea de codigo de Spark) se encarga de controlar todo
El cluster manager se encarga de controlar que los ejecutores cumplan con su tarea.
Cada ejecutor tiene diferentes cosas que ejecutar (tareas).
Los ejecutores se alojan en un nodo, que es cuanta CPU y RAM va a tener el ejecutor.

In [2]:
spark = SparkSession.builder.getOrCreate() #obtener de la cesion el contexto y crear
"""
Master
local[*] Esta usando todos los nucleos del procesador
"""

'\nMaster\nlocal[*] Esta usando todos los nucleos del procesador\n'

<code>
spark = SparkSession.builder.appName("PySparkSession") #cambiar el nombre de la app
.confg("spark.master", "local[5]") #colocar numero de nucleos
.confg("spark.shuffle.sql.partitions", "3") #numero de particiones
.getOrCreate() 
</code>

In [4]:
spark = SparkSession.builder.appName("PySparkSession")\
    .config("spark.master", "local[5]")\
    .config("spark.shuffle.sql.partitions", "3")\
    .getOrCreate() 

In [5]:
spark.conf.set('spark.master', 'local[6]') #cambiar numero de nucleos

Creacion de dataframes

In [7]:
#primero definir el schema: nombre de las columnas y los datos
columns = ["id", "name", "subject", "grade"]
values = [
    (1, "alumno_1", "calculo",  4.2),
    (2, "alumno_2", "fisica",   4.2),
    (3, "alumno_3", "etica",    4.2),
    (4, "alumno_4", "geometria",4.2),
]

#esto no saca de por si un dataframe, saca un dataframe particionado
df = spark.createDataFrame(data = values, schema = columns)

In [9]:
data = [
    {"id":1, "nombre":"alumno_1", "asignatura":"calculo",  "nota":4.2},
    {"id":2, "nombre":"alumno_2", "asignatura":"fisica",   "nota":4.2},
    {"id":3, "nombre":"alumno_3", "asignatura":"etica",    "nota":4.2},
    {"id":4, "nombre":"alumno_4", "asignatura":"geometria","nota":4.2},
]
df_dict = spark.createDataFrame(data)

In [13]:
df_pandas = spark.createDataFrame(
    pd.DataFrame(columns = columns, data = values)
    )

In [8]:
df.show() #mostrar dataframe

+---+--------+---------+-----+
| id|    name|  subject|grade|
+---+--------+---------+-----+
|  1|alumno_1|  calculo|  4.2|
|  2|alumno_2|   fisica|  4.2|
|  3|alumno_3|    etica|  4.2|
|  4|alumno_4|geometria|  4.2|
+---+--------+---------+-----+



In [10]:
df.columns

['id', 'name', 'subject', 'grade']

In [11]:
df.count() #cuantas filas tiene
len(df.columns) #cuantas columnas tiene

4

Transformaciones y acciones:

Transformaciones : Transforma la data (veloces)

Accion: Aquel que obtiene o deja tangible la transformacion (no es veloz)

Hacer la mayor cantidad de transformaciones y la menor cantidad de acciones.

In [17]:
#acciones
#para sacar una fila en especifico sacar todas las del df y filtrar por la fila en especifico
df.take(1) #cuantas filas quiero obtener
df.take(1)[0].id #acceder al id de la fila

1

In [19]:
#transformaciones
df_expr = df.selectExpr("grade + 0.5 as new_grade") #aplicar expresion a una columna

Dataframes siempre tienen esquema: Nombre de la columna, tipo de dato y si permite nulos.

Una buena practica es que siempre al leer un archivo con PySpark crear el esquema del mismo.

In [20]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- grade: double (nullable = true)



In [23]:
data_schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("nombre", StringType(), False),
        StructField("asignatura", StringType(), False),
        StructField("nota", FloatType(), False)
    ]
) 

data = [
    {"id":1, "nombre":"alumno_1", "asignatura":"calculo",  "nota":4.2},
    {"id":2, "nombre":"alumno_2", "asignatura":"fisica",   "nota":4.2},
    {"id":3, "nombre":"alumno_3", "asignatura":"etica",    "nota":4.2},
    {"id":4, "nombre":"alumno_4", "asignatura":"geometria","nota":4.2},
]

df_notas = spark.createDataFrame(data = data, schema = data_schema)

In [27]:
#acceder a columna de Spark
df_notas[["id"]].show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+



In [29]:
#acceder a multiples columnas con Spark
df_notas.select(["id", "nota"]).show()

+---+----+
| id|nota|
+---+----+
|  1| 4.2|
|  2| 4.2|
|  3| 4.2|
|  4| 4.2|
+---+----+



In [30]:
#renombrando columnas
df_notas = df_notas.withColumnRenamed("nota", "grade")\
                   .withColumnRenamed("nombre", "name")
df_notas.show()

+---+--------+----------+-----+
| id|  nombre|asignatura|grade|
+---+--------+----------+-----+
|  1|alumno_1|   calculo|  4.2|
|  2|alumno_2|    fisica|  4.2|
|  3|alumno_3|     etica|  4.2|
|  4|alumno_4| geometria|  4.2|
+---+--------+----------+-----+



In [35]:
#castear variables Spark
df_notas = df_notas.withColumn("grade", F.col("grade").cast(IntegerType())) #F.col para acceder directamente a la columna

In [38]:
#ordenar datos descendente
df_notas.sort(
    F.desc("id")
).show()

+---+--------+----------+-----+
| id|  nombre|asignatura|grade|
+---+--------+----------+-----+
|  4|alumno_4| geometria|    4|
|  3|alumno_3|     etica|    4|
|  2|alumno_2|    fisica|    4|
|  1|alumno_1|   calculo|    4|
+---+--------+----------+-----+



In [40]:
#ordenamiento compuesto
df_notas.sort(
    ["id", "grade"], 
    ascending = False).show()

+---+--------+----------+-----+
| id|  nombre|asignatura|grade|
+---+--------+----------+-----+
|  4|alumno_4| geometria|    4|
|  3|alumno_3|     etica|    4|
|  2|alumno_2|    fisica|    4|
|  1|alumno_1|   calculo|    4|
+---+--------+----------+-----+



In [44]:
#filtrado de datos
df_notas[df_notas["grade"] == 4]

#usando filter (una transformacion, forma mas optimizada)
df_notas.filter((F.col("grade") == 4) & (F.col("id") == 1)).show()

+---+--------+----------+-----+
| id|  nombre|asignatura|grade|
+---+--------+----------+-----+
|  1|alumno_1|   calculo|    4|
+---+--------+----------+-----+



Parquet:
- Minimiza memoria.
- En la mayorioa de casos el esquema de la data se mantiene.
- Lectura mas rapida.
- Es el estandar para archivos nube.

In [None]:
#leer csv
df_csv = spark.read.csv(
    "C:\Users\Admin\Desktop\projects\stores_sales_bi\recursos\otros\uncleaned_datasets\carsWorldWide.csv", 
    header = True,
    nullValue= "?", #definir string como nulo
    sep = ",", #separador
    encoding = "unicode_escape"
    )