# **Spark SQL**

## Introducción

### `Ventajas y desventajas de trabajar con Spark en Google Colab`

Ventajas:
- Fácil acceso
- Ejecutar Spark en prácticamente cualquier dispositivo, los recursos están en la nube.
- Como los recursos están la nube, no hay que preocuparse por los recursos de hardware
- Trabajo en equipo, más sencillo el trabajo colaborativo. Varias personas pueden trabajar sobre un mismo notebook.


Desventajas:
- No se guardan las configuraciones de Spark luego de un tiempo
> No obstante el notebook permanece intacto. Se puede volver a ejecutar las líneas de código para tener la configuración nuevamente.
- Escalabilidad, como el servicio es gratuito, los recursos son limitados.
> Para llevarlo a ambientes productivos, necesitamos una infraestructura capaz de brindarnos estas especificaciones.

## Instalaciones para ejecutar Spark en Colab

### `Descarga e instalación de Apache Spark en Colab`

In [None]:
# Instalar SDK Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Descargar Spark 3.2.4
!wget -q https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

In [None]:
# Descomprimir el archivo descargado de Spark
!tar xf spark-3.2.4-bin-hadoop3.2.tgz

In [None]:
# Establecer las variables de entorno
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

In [None]:
# Instalar la librería findspark
!pip install -q findspark

In [None]:
# Instalar pyspark
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
### verificar la instalación ###
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Probando la sesión de Spark
df = spark.createDataFrame([{"Hola": "Mundo"} for x in range(10)])
# df.show(10, False)
df.show()

+-----+
| Hola|
+-----+
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
+-----+



### `Descarga e instalación de Apache Spark en una sola celda`

In [None]:
# Instalar SDK Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark 3.2.4
!wget -q https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

# Descomprimir el archivo descargado de Spark
!tar xf spark-3.2.4-bin-hadoop3.2.tgz

# Establecer las variables de entorno
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

# Instalar la librería findspark
!pip install -q findspark

# Instalar pyspark
!pip install -q pyspark

### verificar la instalación ###
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Probando la sesión de Spark
df = spark.createDataFrame([{"Hola": "Mundo"} for x in range(10)])
# df.show(10, False)
df.show()

+-----+
| Hola|
+-----+
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
|Mundo|
+-----+



## Spark UI en Colab

In [None]:
# Instalar SDK java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark

!wget -q https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz

# Descomprimir la version de Spark

!tar xf spark-3.3.4-bin-hadoop3.tgz

# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"

# Descargar findspark

!pip install -q findspark

# Crear la sesión de Spark

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

from google.colab import output

output.serve_kernel_port_as_window(4050, path='/jobs/index.html')

from pyspark.sql.functions import col

spark.range(10000).toDF("id").filter(col('id') / 2 == 0).write.mode('overwrite').parquet('/output')

## Spark SQL Básico

### Dataframes: fuentes de datos I

In [None]:
# Creando DataFrames

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([item for item in range(10)]).map(lambda x: (x, x ** 2))

rdd.collect()

df = rdd.toDF(['numero', 'cudrado'])

df.printSchema()

df.show()

# Crear un DataFrame a partir de un RDD con schema

rdd1 = sc.parallelize([(1, 'Jose', 35.5), (2, 'Teresa', 54.3), (3, 'Katia', 12.7)])

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Primera vía

esquema1 = StructType(
    [
     StructField('id', IntegerType(), True),
     StructField('nombre', StringType(), True),
     StructField('saldo', DoubleType(), True)
    ]
)

# Segunda vía

esquema2 = "`id` INT, `nombre` STRING, `saldo` DOUBLE"

df1 = spark.createDataFrame(rdd1, schema=esquema1)

df1.printSchema()

df1.show()

df2 = spark.createDataFrame(rdd1, schema=esquema2)

df2.printSchema()

df2.show()

# Crear un DataFrame a partir de un rango de números

spark.range(5).toDF('id').show()

spark.range(3, 15).toDF('id').show()

spark.range(0, 20, 2).toDF('id').show()



### Dataframes: fuentes de datos II

In [None]:
# Creando DataFrames

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([item for item in range(10)]).map(lambda x: (x, x ** 2))

rdd.collect()

df = rdd.toDF(['numero', 'cudrado'])

df.printSchema()

df.show()

# Crear un DataFrame a partir de un RDD con schema

rdd1 = sc.parallelize([(1, 'Jose', 35.5), (2, 'Teresa', 54.3), (3, 'Katia', 12.7)])

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Primera vía

esquema1 = StructType(
    [
     StructField('id', IntegerType(), True),
     StructField('nombre', StringType(), True),
     StructField('saldo', DoubleType(), True)
    ]
)

# Segunda vía

esquema2 = "`id` INT, `nombre` STRING, `saldo` DOUBLE"

df1 = spark.createDataFrame(rdd1, schema=esquema1)

df1.printSchema()

df1.show()

df2 = spark.createDataFrame(rdd1, schema=esquema2)

df2.printSchema()

df2.show()

# Crear un DataFrame a partir de un rango de números

spark.range(5).toDF('id').show()

spark.range(3, 15).toDF('id').show()

spark.range(0, 20, 2).toDF('id').show()



### Trabajo con columnas

In [None]:
# Trabajo con columnas

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/dataPARQUET.parquet')

df.printSchema()

# Primera alternativa para referirnos a las columnas

df.select('title').show()

# Segunda alternativa

from pyspark.sql.functions import col

df.select(col('title')).show()



### Transformaciones, funciones, select y selectExpr

In [None]:
# Transformaciones - funciones select y selectExpr

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# select

df = spark.read.parquet('./data/datos.parquet')

df.printSchema()

from pyspark.sql.functions import col

df.select(col('video_id')).show()

df.select('video_id', 'trending_date').show()

# Esta vía nos dará error

df.select(
    'likes',
    'dislikes',
    ('likes' - 'dislikes')
).show()

# Forma correcta

df.select(
    col('likes'),
    col('dislikes'),
    (col('likes') - col('dislikes')).alias('aceptacion')
).show()

# selectExpr

df.selectExpr('likes', 'dislikes', '(likes - dislikes) as aceptacion').show()

df.selectExpr("count(distinct(video_id)) as videos").show()



### Transformaciones, funciones, filter y where

In [None]:
# Transformaciones - funciones filter y where

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/datos.parquet')

# filter

from pyspark.sql.functions import col

df.show()

df.filter(col('video_id') == '2kyS6SvSYSE').show()

df1 = spark.read.parquet('./data/datos.parquet').where(col('trending_date') != '17.14.11')

df1.show()

df2 = spark.read.parquet('./data/datos.parquet').where(col('likes') > 5000)

df2.filter((col('trending_date') != '17.14.11') & (col('likes') > 7000)).show()

df2.filter(col('trending_date') != '17.14.11').filter(col('likes') > 7000).show()


### Transformaciones, funciones, distinct y dropDuplicates

In [None]:
# Transformaciones - funciones distinct y dropDuplicates

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data')

# distinct

df_sin_duplicados = df.distinct()

print('El conteo del dataframe original es {}'.format(df.count()))
print('El conteo del dataframe sin duplicados es {}'.format(df_sin_duplicados.count()))

# función dropDuplicates

dataframe = spark.createDataFrame([(1, 'azul', 567), (2, 'rojo', 487), (1, 'azul', 345), (2, 'verde', 783)]).toDF('id', 'color', 'importe')

dataframe.show()

dataframe.dropDuplicates(['id', 'color']).show()

### Transformaciones, funciones, sort y orderBy

In [None]:
# Transformaciones - funciones sort y orderBy

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

from pyspark.sql.functions import col

df = (spark.read.parquet('./data')
    .select(col('likes'), col('views'), col('video_id'), col('dislikes'))
    .dropDuplicates(['video_id'])
)

df.show()

# sort

df.sort('likes').show()

from pyspark.sql.functions import desc

df.sort(desc('likes')).show()

# función orderBy

df.orderBy(col('views')).show()

df.orderBy(col('views').desc()).show()

dataframe = spark.createDataFrame([(1, 'azul', 568), (2, 'rojo', 235), (1, 'azul', 456), (2, 'azul', 783)]).toDF('id', 'color', 'importe')

dataframe.show()

dataframe.orderBy(col('color').desc(), col('importe')).show()

# funcion limit

top_10 = df.orderBy(col('views').desc()).limit(10)

top_10.show()


### Transformaciones, funciones, withColumn y withColumnRenamed

In [None]:
# Transformaciones - funciones withColumn y withColumnRenamed

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data')

# withColumn

from pyspark.sql.functions import col

df_valoracion = df.withColumn('valoracion', col('likes') - col('dislikes'))

df_valoracion.printSchema()

df_valoracion1 = (df.withColumn('valoracion', col('likes') - col('dislikes'))
                    .withColumn('res_div', col('valoracion') % 10)
)

df_valoracion1.printSchema()

df_valoracion1.select(col('likes'), col('dislikes'), col('valoracion'), col('res_div')).show()

# withColumnRenamed

df_renombrado = df.withColumnRenamed('video_id', 'id')

df_renombrado.printSchema()

df_error = df.withColumnRenamed('nombre_que_no_existe', 'otro_nombre')

df_error.printSchema()


### Transformaciones, funciones, drop, sample, randomSplit

In [None]:
# Transformaciones - funciones drop, sample y randomSplit

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data')

# drop

df.printSchema()

df_util = df.drop('comments_disabled')

df_util.printSchema()

df_util = df.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link')

df_util.printSchema()

df_util = df.drop('comments_disabled', 'ratings_disabled', 'thumbnail_link', 'cafe')

df_util.printSchema()

# sample

df_muestra = df.sample(0.8)

num_filas = df.count()
num_filas_muestra = df_muestra.count()

print('El 80% de filas del dataframe original es {}'.format(num_filas - (num_filas*0.2)))
print('El numero de filas del dataframe muestra es {}'.format(num_filas_muestra))

df_muestra = df.sample(fraction=0.8, seed=1234)

df_muestra = df.sample(withReplacement=True, fraction=0.8, seed=1234)

# randomSplit

train, test = df.randomSplit([0.8, 0.2], seed=1234)

train, validation, test = df.randomSplit([0.6, 0.2, 0.2], seed=1234)

train.count()

validation.count()

test.count()


Trabajo con datos incorrectos faltantes

In [None]:
# Trabajo con datos incorrectos o faltantes

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/')

df.count()

df.na.drop().count()

df.na.drop('any').count()

df.dropna().count()

df.na.drop(subset=['views']).count()

df.na.drop(subset=['views', 'dislikes']).count()

from pyspark.sql.functions import col

df.orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()

df.fillna(0).orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()

df.fillna(0, subset=['likes', 'dislikes']).orderBy(col('views')).select(col('views'), col('likes'), col('dislikes')).show()


### Acciones sobre un dataframe

In [None]:
# Acciones sobre un dataframe en Spark SQL

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/')

# show

df.show()

df.show(5)

df.show(5, truncate=False)

# take

df.take(1)

# head

df.head(1)

# collect

df.select('likes').collect()

### Escritura de dataframes

In [None]:
# Escritura de DataFrames

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/')

df1 = df.repartition(2)

df1.write.format('csv').option('sep', '|').save()

df1.coalesce(1).write.format('csv').option('sep', '|').save('./output/csv1')

df.printSchema()

df.select('comments_disabled').distinct().show()

from pyspark.sql.functions import col

df_limpio = df.filter(col('comments_disabled').isin('True', 'False'))

df_limpio.write.partitionBy('comments_disabled').parquet('./output/parquet')


### Lectura 1

In [None]:
# Instalar SDK java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark

!wget -q https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz

# Descomprimir la version de Spark

!tar xf spark-3.3.4-bin-hadoop3.tgz

# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"

# Descargar findspark

!pip install -q findspark

# Instalar dotenv para manejar las credenciales

!pip install python-dotenv

# Extraer las credenciales del archivo .env a un diccionario de Python

from dotenv import dotenv_values

config = dotenv_values(".env")

# Crear la sesión de Spark con las configuraciones necesarias para conectarse a AWS S3

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.11.469")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
         .getOrCreate()
         )

# Extraer las credenciales del diccionario

accessKeyId=config.get('ACCESS_KEY')
secretAccessKey=config.get('SECRET_ACCESS_KEY')

# Establecer las configuraciones de Hodoop necesarias

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
sc._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
sc._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
sc._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.amazonaws.com')

df = spark.read.parquet('s3a://josemtech/parquet')

df.show()

df1 = spark.read.option('header', 'true').option('inferSchema', 'true').csv('s3a://josemtech/csv/')

df1.show()

df.write.mode('overwrite').parquet('s3a://josemtech/salida')

### Lectura 2

In [None]:
# Instalar SDK java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark

!wget -q https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz

# Descomprimir la version de Spark

!tar xf spark-3.3.4-bin-hadoop3.tgz

# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"

# Descargar findspark

!pip install -q findspark

# Extraer las credenciales desde los Secrets

from google.colab import userdata

account_key = userdata.get('ACCOUNT_KEY')

# Crear la sesión de Spark

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.6,com.microsoft.azure:azure-storage:8.6.6")
         .config("spark.hadoop.fs.azure.account.key.josemtech.blob.core.windows.net", account_key)
         .config("spark.hadoop.fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
         .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
         .getOrCreate())

# Luctura

df = spark.read.parquet("wasbs://spark-data@josemtech.blob.core.windows.net/parquet")

df.show()

# Escritura

df.write.mode("overwrite").parquet("wasbs://spark-data@josemtech.blob.core.windows.net/test/")

### Lectura 3

In [None]:
# Instalar SDK java 8

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Descargar Spark

!wget -q https://archive.apache.org/dist/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz

# Descomprimir la version de Spark

!tar xf spark-3.3.4-bin-hadoop3.tgz

# Establecer las variables de entorno

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"

# Descargar findspark

!pip install -q findspark

# Descargar el jar necesario para conectarse al bucket de GCP

!wget https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.9/gcs-connector-hadoop3-2.2.9-shaded.jar

# Mover el jar descargado a la carpeta de jars de Spark

!mv gcs-connector-hadoop3-2.2.9-shaded.jar /content/spark-3.4.2-bin-hadoop3/jars

# Crear la sesión de Spark

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.hadoop.fs.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
         .config("google.cloud.auth.service.account.json.keyfile","/content/pyspark.json")
         .getOrCreate())

df = spark.read.parquet('gs://josemtech/parquet')

df.show()

df.write.mode('overwrite').parquet('gs://josemtech/salida_parquet')

df1 = spark.read.option('header', 'true').csv('gs://josemtech/csv')

df1.show()

df1.write.mode('overwrite').csv('gs://josemtech/salida_csv')

### Persistencia de dataframes

In [None]:
# Persistencia de DataFrames

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'valor'])

df.show()

df.persist()

df.unpersist()

df.cache()

from pyspark.storagelevel import StorageLevel

df.persist(StorageLevel.DISK_ONLY)

df.persist(StorageLevel.MEMORY_AND_DISK)


### Ejercicios

## Spark SQL Avanzado

### Agregaciones

In [None]:
# Explorando los datos

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/')

df.printSchema()

df.show(20, truncate=False)


### Funciones

In [None]:
# Funciones count, countDistinct y approx_count_distinct

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/dataframe')

df.printSchema()

df.show()

# count

from pyspark.sql.functions import count

df.select(
    count('nombre').alias('conteo_nombre'),
    count('color').alias('conteo_color')
).show()

df.select(
    count('nombre').alias('conteo_nombre'),
    count('color').alias('conteo_color'),
    count('*').alias('conteo_general')
).show()

# countDistinct

from pyspark.sql.functions import countDistinct

df.select(
    countDistinct('color').alias('colores_dif')
).show()

# approx_count_distinct

from pyspark.sql.functions import approx_count_distinct

dataframe = spark.read.parquet('./data/vuelos')

dataframe.printSchema()

dataframe.select(
    countDistinct('AIRLINE'),
    approx_count_distinct('AIRLINE')
).show()


### Funciones min y max

In [None]:
# Funciones min y max

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

vuelos = spark.read.parquet('./data')

vuelos.printSchema()

from pyspark.sql.functions import min, max, col

vuelos.select(
    min('AIR_TIME').alias('menor_timepo'),
    max('AIR_TIME').alias('mayor_tiempo')
).show()

vuelos.select(
    min('AIRLINE_DELAY'),
    max('AIRLINE_DELAY')
).show()


### Funciones sum y sumDistinct

In [None]:
# Funciones sum, sumDistinct y avg

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

vuelos = spark.read.parquet('./data/')

from pyspark.sql.functions import sum, sumDistinct, avg, count

# sum

vuelos.printSchema()

vuelos.select(
    sum('DISTANCE').alias('sum_dis')
).show()

# sumDistinct

vuelos.select(
    sumDistinct('DISTANCE').alias('sum_dis_dif')
).show()

# avg

vuelos.select(
    avg('AIR_TIME').alias('promedio_aire'),
    (sum('AIR_TIME') / count('AIR_TIME')).alias('prom_manual')
).show()


### Agregación con agrupación

In [None]:
# Agregación con agrupación

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

vuelos = spark.read.parquet('./data/')

vuelos.printSchema()

from pyspark.sql.functions import desc

(vuelos.groupBy('ORIGIN_AIRPORT')
    .count()
    .orderBy(desc('count'))
).show()

(vuelos.groupBy('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT')
    .count()
    .orderBy(desc('count'))
).show()


### Varias agregaciones por grupo

In [None]:
# Varias agregaciones por grupo

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

vuelos = spark.read.parquet('./data/')

from pyspark.sql.functions import count, min, max, desc, avg

vuelos.groupBy('ORIGIN_AIRPORT').agg(
    count('AIR_TIME').alias('tiempo_aire'),
    min('AIR_TIME').alias('min'),
    max('AIR_TIME').alias('max')
).orderBy(desc('tiempo_aire')).show()

vuelos.groupBy('MONTH').agg(
    count('ARRIVAL_DELAY').alias('conteo_de_retrasos'),
    avg('DISTANCE').alias('prom_dist')
).orderBy(desc('conteo_de_retrasos')).show()


### Agregación con pivote

In [None]:
# Agregación con pivote

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

estudiantes = spark.read.parquet('./data/')

estudiantes.show()

from pyspark.sql.functions import min, max, avg, col

estudiantes.groupBy('graduacion').pivot('sexo').agg(avg('peso')).show()

estudiantes.groupBy('graduacion').pivot('sexo').agg(avg('peso'), min('peso'), max('peso')).show()

estudiantes.groupBy('graduacion').pivot('sexo', ['M']).agg(avg('peso'), min('peso'), max('peso')).show()

estudiantes.groupBy('graduacion').pivot('sexo', ['F']).agg(avg('peso'), min('peso'), max('peso')).show()


### Inner Join

In [None]:
# Inner Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados')

departamentos = spark.read.parquet('./data/departamentos')

empleados.show()

departamentos.show()

# Inner join

from pyspark.sql.functions import col

join_df = empleados.join(departamentos, col('num_dpto') == col('id'))

join_df.show()

join_df = empleados.join(departamentos, col('num_dpto') == col('id'), 'inner')

join_df.show()

join_df = empleados.join(departamentos).where(col('num_dpto') == col('id'))

join_df.show()


### Left Outer Join

In [None]:
# Left Outer Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

empleados.join(departamentos, col('num_dpto') == col('id'), 'leftouter').show()

empleados.join(departamentos, col('num_dpto') == col('id'), 'left_outer').show()

empleados.join(departamentos, col('num_dpto') == col('id'), 'left').show()


### Right Outer Join

In [None]:
# Right Outer Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

empleados.join(departamentos, col('num_dpto') == col('id'), 'rightouter').show()

empleados.join(departamentos, col('num_dpto') == col('id'), 'right_outer').show()

empleados.join(departamentos, col('num_dpto') == col('id'), 'right').show()


### Full Outer Join

In [None]:
# Full Outer Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

empleados.join(departamentos, col('num_dpto') == col('id'), 'outer').show()


### Left Anti Join

In [None]:
# Left Anti Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

empleados.join(departamentos, col('num_dpto') == col('id'), 'left_anti').show()

departamentos.join(empleados, col('num_dpto') == col('id'), 'left_anti').show()


### Left Semi Join

In [None]:
# Left Semi Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

empleados.join(departamentos, col('num_dpto') == col('id'), 'left_semi').show()


### Cross Join

In [None]:
# Cross Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

df = empleados.crossJoin(departamentos)

df.show()

df.count()

### Manejo de nombres de columnas duplicados

In [None]:
# Manejo de nombres de columnas duplicados

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col

depa = departamentos.withColumn('num_dpto', col('id'))

depa.printSchema()

empleados.printSchema()

# Devuelve un error

empleados.join(depa, col('num_dpto') == col('num_dpto'))

# Forma correcta

df_con_duplicados = empleados.join(depa, empleados['num_dpto'] == depa['num_dpto'])

df_con_duplicados.printSchema()

df_con_duplicados.select(empleados['num_dpto']).show()

df2 = empleados.join(depa, 'num_dpto')

df2.printSchema()

empleados.join(depa, ['num_dpto']).printSchema()


### Shuffle, hash join y broadcast hash join

In [None]:
# Shuffle Hash Join y Broadcast Hash Join

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

empleados = spark.read.parquet('./data/empleados/')

departamentos = spark.read.parquet('./data/departamentos/')

from pyspark.sql.functions import col, broadcast

empleados.join(broadcast(departamentos), col('num_dpto') == col('id')).show()

empleados.join(broadcast(departamentos), col('num_dpto') == col('id')).explain()

### Ejercicios

## Funciones en Spark SQL

### Funciones de fecha y hora

In [None]:
# Funciones de fecha y hora

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet('./data/convertir')

data.printSchema()

data.show(truncate=False)

from pyspark.sql.functions import col, to_date, to_timestamp

data1 = data.select(
    to_date(col('date')).alias('date1'),
    to_timestamp(col('timestamp')).alias('ts1'),
    to_date(col('date_str'), 'dd-MM-yyyy').alias('date2'),
    to_timestamp(col('ts_str'), 'dd-MM-yyyy mm:ss').alias('ts2')

)

data1.show(truncate=False)

data1.printSchema()

from pyspark.sql.functions import date_format

data1.select(
    date_format(col('date1'), 'dd-MM-yyyy')
).show()

df = spark.read.parquet('./data/calculo')

df.show()

from pyspark.sql.functions import datediff, months_between, last_day

df.select(
    col('nombre'),
    datediff(col('fecha_salida'), col('fecha_ingreso')).alias('dias'),
    months_between(col('fecha_salida'), col('fecha_ingreso')).alias('meses'),
    last_day(col('fecha_salida')).alias('ultimo_dia_mes')
).show()

from pyspark.sql.functions import date_add, date_sub

df.select(
    col('nombre'),
    col('fecha_ingreso'),
    date_add(col('fecha_ingreso'), 14).alias('mas_14_dias'),
    date_sub(col('fecha_ingreso'), 1).alias('menos_1_dia')
).show()

from pyspark.sql.functions import year, month, dayofmonth, dayofyear, hour, minute, second

df.select(
    col('baja_sistema'),
    year(col('baja_sistema')),
    month(col('baja_sistema')),
    dayofmonth(col('baja_sistema')),
    dayofyear(col('baja_sistema')),
    hour(col('baja_sistema')),
    minute(col('baja_sistema')),
    second(col('baja_sistema'))
).show()

### Funciones de trabajo con Strings

In [None]:
# Funciones para trabajo con strings

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet('./data/')

data.show()

from pyspark.sql.functions import ltrim, rtrim, trim

data.select(
    ltrim('nombre').alias('ltrim'),
    rtrim('nombre').alias('rtrim'),
    trim('nombre').alias('trim')
).show()

from pyspark.sql.functions import col, lpad, rpad

data.select(
    trim(col('nombre')).alias('trim')
).select(
    lpad(col('trim'), 8, '-').alias('lpad'),
    rpad(col('trim'), 8, '=').alias('rpad')
).show()

df1 = spark.createDataFrame([('Spark', 'es', 'maravilloso')], ['sujeto', 'verbo', 'adjetivo'])

df1.show()

from pyspark.sql.functions import concat_ws, lower, upper, initcap, reverse

df1.select(
    concat_ws(' ', col('sujeto'), col('verbo'), col('adjetivo')).alias('frase')
).select(
    col('frase'),
    lower(col('frase')).alias('minuscula'),
    upper(col('frase')).alias('mayuscula'),
    initcap(col('frase')).alias('initcap'),
    reverse(col('frase')).alias('reversa')
).show()

from pyspark.sql.functions import regexp_replace

df2 = spark.createDataFrame([(' voy a casa por mis llaves',)], ['frase'])

df2.show(truncate=False)

df2.select(
    regexp_replace(col('frase'), 'voy|por', 'ir').alias('nueva_frase')
).show(truncate=False)



### Funciones de trabajo con colecciones

In [None]:
# Funciones para trabajo con colecciones

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet('./data/parquet/')

data.show(truncate=False)

data.printSchema()

from pyspark.sql.functions import col, size, sort_array, array_contains

data.select(
    size(col('tareas')).alias('tamaño'),
    sort_array(col('tareas')).alias('arreglo_ordenado'),
    array_contains(col('tareas'), 'buscar agua').alias('buscar_agua')
).show(truncate=False)

from pyspark.sql.functions import explode

data.select(
    col('dia'),
    explode(col('tareas')).alias('tareas')
).show()

# Formato JSON

json_df_str = spark.read.parquet('./data/JSON')

json_df_str.show(truncate=False)

json_df_str.printSchema()

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema_json = StructType(
    [
     StructField('dia', StringType(), True),
     StructField('tareas', ArrayType(StringType()), True)
    ]
)

from pyspark.sql.functions import from_json, to_json

json_df = json_df_str.select(
    from_json(col('tareas_str'), schema_json).alias('por_hacer')
)

json_df.printSchema()

json_df.select(
    col('por_hacer').getItem('dia'),
    col('por_hacer').getItem('tareas'),
    col('por_hacer').getItem('tareas').getItem(0).alias('primer_tarea')
).show(truncate=False)

json_df.select(
    to_json(col('por_hacer'))
).show(truncate=False)

### Funciones when, coalesce y lit

In [None]:
# Funciones when, coalesce y lit

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet('./data/')

data.show()

from pyspark.sql.functions import col, when, lit, coalesce

data.select(
    col('nombre'),
    when(col('pago') == 1, 'pagado').when(col('pago') == 2, 'sin pagar').otherwise('sin iniciar').alias('pago')
).show()

data.select(
    coalesce(col('nombre'), lit('sin nombre')).alias('nombre')
).show()

### Funciones definidas por el usuario UDF

In [None]:
# Funciones definidas por el usuario UDF

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def cubo(n):
    return n * n * n

from pyspark.sql.types import LongType

spark.udf.register('cubo', f_cubo, LongType())

spark.range(1,10).createOrReplaceTempView('df_temp')

spark.sql("SELECT id, cubo(id) AS cubo FROM df_temp").show()

def bienvenida(nombre):
    return ('Hola {}'.format(nombre))

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

bienvenida_udf = udf(lambda x: bienvenida(x), StringType())

df_nombre = spark.createDataFrame([('Jose',), ('Julia',)], ['nombre'])

df_nombre.show()

from pyspark.sql.functions import col

df_nombre.select(
    col('nombre'),
    bienvenida_udf(col('nombre')).alias('bie_nombre')
).show()

@udf(returnType=StringType())
def mayuscula(s):
    return s.upper()

df_nombre.select(
    col('nombre'),
    mayuscula(col('nombre')).alias('may_nombre')
).show()

import pandas as pd

from pyspark.sql.functions import pandas_udf

def cubo_pandas(a: pd.Series) -> pd.Series:
    return a * a * a

cubo_udf = pandas_udf(cubo_pandas, returnType=LongType())

x = pd.Series([1, 2, 3])

print(cubo_pandas(x))

df = spark.range(5)

df.select(
    col('id'),
    cubo_udf(col('id')).alias('cubo_pandas')
).show()

### Funciones de ventana

In [None]:
# Funciones de ventana

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('./data/')

df.show()

from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number, rank, dense_rank, col

windowSpec = Window.partitionBy('departamento').orderBy(desc('evaluacion'))

# row_number

df.withColumn('row_number', row_number().over(windowSpec)).filter(col('row_number').isin(1,2)).show()

# rank

df.withColumn('rank', rank().over(windowSpec)).show()

# dense_rank

df.withColumn('dense_rank', dense_rank().over(windowSpec)).show()

# Agregaciones con especificaciones de ventana

windowSpecAgg = Window.partitionBy('departamento')

from pyspark.sql.functions import min, max, avg

(df.withColumn('min', min(col('evaluacion')).over(windowSpecAgg))
.withColumn('max', max(col('evaluacion')).over(windowSpecAgg))
.withColumn('avg', avg(col('evaluacion')).over(windowSpecAgg))
.withColumn('row_number', row_number().over(windowSpec))
 ).show()

### Catalyst Optimizer

In [None]:
# Catalyst Optimizer

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = spark.read.parquet('./data/')

data.printSchema()

data.show()

from pyspark.sql.functions import col

nuevo_df = (data.filter(col('MONTH').isin(6,7,8))
            .withColumn('dis_tiempo_aire', col('DISTANCE') / col('AIR_TIME'))
).select(
    col('AIRLINE'),
    col('dis_tiempo_aire')
).where(col('AIRLINE').isin('AA', 'DL', 'AS'))

nuevo_df.explain(True)

### Ejercicios

-----------
## Fin Notebook
-----------