# Spark JDBC

En el corazón de la integración de Spark con bases de datos relacionales encontramos JDBC (Java Database Connectivity). JDBC actúa como un puente esencial, proporcionando una interfaz estandarizada que permite a las aplicaciones Spark comunicarse con bases de datos relacionales. Esta interfaz no es simplemente un canal de comunicación; es un conjunto completo de protocolos y estándares que facilitan operaciones de lectura y escritura de manera eficiente y segura.

## Spark SQL y su Relación con JDBC

Spark SQL emerge como uno de los módulos más poderosos dentro del ecosistema Spark. Este módulo introduce el concepto de DataFrames, una abstracción que permite trabajar con datos estructurados de manera intuitiva y eficiente. Cuando combinamos Spark SQL con JDBC, obtenemos una herramienta extremadamente versátil para el procesamiento de datos.  

La integración de Spark SQL con JDBC va más allá de simples operaciones de lectura y escritura. El sistema permite ejecutar consultas complejas que se benefician del procesamiento distribuido de Spark, mientras mantiene la integridad y las características ACID de las bases de datos relacionales. Esta simbiosis permite aprovechar lo mejor de ambos mundos: la escalabilidad de Spark y la confiabilidad de las bases de datos relacionales.

## Arquitectura

La arquitectura de la integración Spark-JDBC se construye sobre varios componentes fundamentales que trabajan en conjunto. El Driver JDBC actúa como el intérprete principal, traduciendo las instrucciones de Spark en comandos que la base de datos puede entender. Este componente maneja no solo la traducción de comandos, sino también la gestión de tipos de datos y la optimización de consultas.  

El Connection Pool representa otro componente crucial en esta arquitectura. En lugar de crear nuevas conexiones para cada operación, mantiene un conjunto de conexiones activas que pueden ser reutilizadas. Este enfoque reduce significativamente la sobrecarga asociada con el establecimiento de conexiones y mejora el rendimiento general del sistema.  

El sistema de particionamiento en esta arquitectura merece especial atención. Permite dividir grandes conjuntos de datos en fragmentos manejables que pueden procesarse en paralelo. Este particionamiento no es arbitrario; se basa en estrategias sofisticadas que consideran la distribución de datos y los recursos disponibles.

## Operaciones y Optimización

Las operaciones en el contexto de Spark-JDBC pueden clasificarse en tres categorías principales: lectura, escritura y transformación. Las operaciones de lectura pueden variar desde la simple recuperación de tablas completas hasta consultas complejas con múltiples joins y agregaciones. La escritura, por otro lado, puede implicar inserciones masivas, actualizaciones o operaciones de upsert.  

La optimización en este contexto es un arte complejo. El push-down de predicados representa una de las técnicas más importantes, permitiendo que los filtros se ejecuten en la base de datos antes de que los datos se transfieran a Spark. Esto puede reducir significativamente la cantidad de datos transferidos y mejorar el rendimiento general.  

La gestión de recursos y la configuración de parámetros juegan un papel crucial en el rendimiento. El tamaño del fetch, el número de particiones y el tamaño del batch deben ajustarse cuidadosamente según las características específicas de cada caso de uso. Estos ajustes pueden tener un impacto significativo en el rendimiento y la utilización de recursos.

## Ejemplo de conexión de JDBC desde Databricks Community con SQL Server de Azure. 

### Creación de SQL Server en Azure  

- Crear un grupo de recursos. Luego crear un recurso > Bases de datos > **SQL Database**  
- Configura los detalles:  
  -. Nombre del sesrvidor SQL: databricks-sql-server  
  -. Región: Italy North u otra disponible  
  -. Autenticación: Habilita SQL Authentication y configura:  
    1. Usuario: `adminuser`  
    2. Contraseña: `ContraseñaFuerte123`  
- Marca la casilla de "Habilitar acceso a Azure Services"  
  

### Crear la base de datos: AdventureWorksLT  

- En la misma sección, crea una base de datos con nombre cualquiera (en este ejemplo le hemos llamado db-notebook-4). Selecciona el servidor creado (databricks-sql-server).Nivel de precio: Elige el plan más económico (Básico - DTU: 5).   
- Ve al servidor que has creado y en networking configurar el firewall. Añade la IP pública de tu conexión local y habilita "Permitir acceso a todos los servicios de Azure". Habilita la dirección IP publica de databricks, suele ser: 54.200.13.2. Guarda los cambios.  
- En Additional settings, en Data Source escoger Sample para que se habilite la carga de AdventureWorksLT. Lo demas se deja por defecto.  


### Configurar Databricks Community Edition  
Ve a la pestaña Computey selecciona Create Compute con estos requisitos:  
  - Cluster Name: AdventureWorksCluster.  
  - Databricks Runtime Version: 11.3 LTS (Scala 2.12, Spark 3.3.1)  
  - Crear clúster.  
  - Mientras el clúster se esta creando, descarga el controlador JDBC para SQL Server, en este caso usaremos [este](https://tajamar365.sharepoint.com/:u:/s/3405-MasterIA2024-2025/EeR4l4udCBFClfiDcBhI7PMBG-VfNpuLHlAQ7a1FVBC5OA?e=Igqclx).  
  - En Databricks subir el controlador a tu workspace o a tu DBFS.
  - Una vez que el cluster esté activo ve al Cluster y en el boton Libraries cargar el controlador haciendo click en `Install New` y le pasas el path donde has guardado el controlador.  

### Conectar Databricks con SQL Server  
Crea un notebook en Databricks y añade el siguiente código ( con tus datos de configuración) :

In [0]:
# Configuración de conexión JDBC
jdbcHostname = "databricks-sql-server.database.windows.net"  # Servidor SQL
jdbcPort = 1433
jdbcDatabase = "db-notebook-4"  # Nombre exacto de tu base de datos
jdbcUsername = "user"  # Cambiar por tu usuario configurado
jdbcPassword = "Admin12345"  # Cambiar por la contraseña configurada

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase}"

# Propiedades de conexión
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|        SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|   1059.3100|1431.5000|  58|1016.04|               18|             6|2002-06-01 00:00:00|               null|            null|[47 49 46 38 39 6...|  no_i

### Consulta de Prueba

In [0]:
# Consulta de prueba
query = "(SELECT TOP 10 * FROM SalesLT.Product) AS temp"  # Cambia por una tabla válida si es necesario

# Leer datos desde SQL Server
try:
    df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
    df.show()  # Mostrar los datos
except Exception as e:
    print(f"Error al conectar: {e}")

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|        SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|   1059.3100|1431.5000|  58|1016.04|               18|             6|2002-06-01 00:00:00|               null|            null|[47 49 46 38 39 6...|  no_i

### Prueba con consultas simples usando Pyspark:  
 

#### 1. Listar Listar todas las tablas disponibles

In [0]:
query = "(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') AS temp"
df_tables = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_tables.show()


+--------------------+
|          TABLE_NAME|
+--------------------+
|            Customer|
|        ProductModel|
|  ProductDescription|
|             Product|
|ProductModelProdu...|
|     ProductCategory|
|        BuildVersion|
|            ErrorLog|
|             Address|
|     CustomerAddress|
|    SalesOrderDetail|
|    SalesOrderHeader|
+--------------------+



#### 2. Productos con precios mayores a $50

In [0]:
query = "(SELECT ProductID, Name, ListPrice FROM SalesLT.Product WHERE ListPrice > 50) AS temp"
df_filtered = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_filtered.show()


+---------+--------------------+---------+
|ProductID|                Name|ListPrice|
+---------+--------------------+---------+
|      680|HL Road Frame - B...|1431.5000|
|      706|HL Road Frame - R...|1431.5000|
|      717|HL Road Frame - R...|1431.5000|
|      718|HL Road Frame - R...|1431.5000|
|      719|HL Road Frame - R...|1431.5000|
|      720|HL Road Frame - R...|1431.5000|
|      721|HL Road Frame - R...|1431.5000|
|      722|LL Road Frame - B...| 337.2200|
|      723|LL Road Frame - B...| 337.2200|
|      724|LL Road Frame - B...| 337.2200|
|      725|LL Road Frame - R...| 337.2200|
|      726|LL Road Frame - R...| 337.2200|
|      727|LL Road Frame - R...| 337.2200|
|      728|LL Road Frame - R...| 337.2200|
|      729|LL Road Frame - R...| 337.2200|
|      730|LL Road Frame - R...| 337.2200|
|      731|ML Road Frame - R...| 594.8300|
|      732|ML Road Frame - R...| 594.8300|
|      733|ML Road Frame - R...| 594.8300|
|      734|ML Road Frame - R...| 594.8300|
+---------+

#### 4. Contar productos por categoría

In [0]:
query = """
(SELECT ProductCategoryID, COUNT(*) AS TotalProducts
 FROM SalesLT.Product
 GROUP BY ProductCategoryID) AS temp
"""
df_count = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_count.show()


+-----------------+-------------+
|ProductCategoryID|TotalProducts|
+-----------------+-------------+
|                5|           32|
|                6|           43|
|                7|           22|
|                8|            8|
|                9|            3|
|               10|            2|
|               11|            1|
|               12|            3|
|               13|            2|
|               14|            3|
|               15|            3|
|               16|           28|
|               17|            7|
|               18|           33|
|               19|            9|
|               20|           18|
|               21|           14|
|               22|            3|
|               23|            1|
|               24|            6|
+-----------------+-------------+
only showing top 20 rows



> Actividad: Mejorar la query anterior

#### 5. Contar el total de productos por tamaño

In [0]:
df.groupBy("Size").count().orderBy("count", ascending=False).show()


+----+-----+
|Size|count|
+----+-----+
|null|    4|
|   M|    2|
|  58|    2|
|   L|    1|
|   S|    1|
+----+-----+



> Mejorar/corregir la query anterior

#### 6. Calcular el precio promedio de los productos

In [0]:
df.selectExpr("AVG(ListPrice) AS AveragePrice").show()


+------------+
|AveragePrice|
+------------+
|309.59400000|
+------------+



> ¿y si obtenemos el precio promedio por cada producto?. Mejorar la query

#### 7. Encontrar productos sin categoría asignada

In [0]:
df.filter(df.ProductCategoryID.isNull()).select("ProductID", "Name", "ProductCategoryID").show()


+---------+----+-----------------+
|ProductID|Name|ProductCategoryID|
+---------+----+-----------------+
+---------+----+-----------------+



> ¿Es la query anterior correcta? Demostrar con otra query que si

#### 8. Contar productos por color

In [0]:
df.groupBy("Color").count().orderBy("count", ascending=False).show()


+-----+-----+
|Color|count|
+-----+-----+
|Multi|    3|
|White|    2|
|Black|    2|
|  Red|    2|
| Blue|    1|
+-----+-----+



#### 9. Calcular el costo total de todos los productos

In [0]:
df.selectExpr("SUM(StandardCost) AS TotalCost").show()


+---------+
|TotalCost|
+---------+
|2248.5784|
+---------+



> Mejorar query anterior

#### 10. Productos que contienen una palabra específica en su nombre 

In [0]:
# Filtrar productos que contienen la palabra 'Helmet' en el nombre
df.filter(df.Name.contains("Helmet")).select("ProductID", "Name", "ListPrice").show()


+---------+--------------------+---------+
|ProductID|                Name|ListPrice|
+---------+--------------------+---------+
|      707|Sport-100 Helmet,...|  34.9900|
|      708|Sport-100 Helmet,...|  34.9900|
|      711|Sport-100 Helmet,...|  34.9900|
+---------+--------------------+---------+



> Lista los nombres de todos los productos

#### 11. Listar productos creados después de 2005

In [0]:
df.filter(df.SellStartDate >= "2005-01-01").select("ProductID", "Name", "SellStartDate").show()


+---------+--------------------+-------------------+
|ProductID|                Name|      SellStartDate|
+---------+--------------------+-------------------+
|      707|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      708|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      709|Mountain Bike Soc...|2005-07-01 00:00:00|
|      710|Mountain Bike Soc...|2005-07-01 00:00:00|
|      711|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      712|        AWC Logo Cap|2005-07-01 00:00:00|
|      713|Long-Sleeve Logo ...|2005-07-01 00:00:00|
|      714|Long-Sleeve Logo ...|2005-07-01 00:00:00|
+---------+--------------------+-------------------+



#### 12. Producto más caro por categoría 

In [0]:
from pyspark.sql.functions import col, max as spark_max

df.groupBy("ProductCategoryID").agg(spark_max("ListPrice").alias("MaxPrice")).orderBy("MaxPrice", ascending=False).show()


+-----------------+---------+
|ProductCategoryID| MaxPrice|
+-----------------+---------+
|               18|1431.5000|
|               25|  49.9900|
|               35|  34.9900|
|               27|   9.5000|
|               23|   8.9900|
+-----------------+---------+



#### 13. Calcular el precio promedio por categoría

In [0]:
from pyspark.sql.functions import avg

df.groupBy("ProductCategoryID").agg(avg("ListPrice").alias("AveragePrice")).orderBy("AveragePrice", ascending=False).show()


+-----------------+-------------+
|ProductCategoryID| AveragePrice|
+-----------------+-------------+
|               18|1431.50000000|
|               25|  49.99000000|
|               35|  34.99000000|
|               27|   9.50000000|
|               23|   8.99000000|
+-----------------+-------------+



> Mejorar query anterior

#### 14. Encontrar productos descontinuados (Discontinued no es NULL)

In [0]:
df.filter(df.DiscontinuedDate.isNotNull()).select("ProductID", "Name", "DiscontinuedDate").show()


+---------+----+----------------+
|ProductID|Name|DiscontinuedDate|
+---------+----+----------------+
+---------+----+----------------+



> Comprueba con otra query que la salida anterior es correcta

#### 15. Productos con precios mayores que su costo estándar 

In [0]:
df.filter(df.ListPrice > df.StandardCost).select("ProductID", "Name", "ListPrice", "StandardCost").show()

+---------+--------------------+---------+------------+
|ProductID|                Name|ListPrice|StandardCost|
+---------+--------------------+---------+------------+
|      680|HL Road Frame - B...|1431.5000|   1059.3100|
|      706|HL Road Frame - R...|1431.5000|   1059.3100|
|      707|Sport-100 Helmet,...|  34.9900|     13.0863|
|      708|Sport-100 Helmet,...|  34.9900|     13.0863|
|      709|Mountain Bike Soc...|   9.5000|      3.3963|
|      710|Mountain Bike Soc...|   9.5000|      3.3963|
|      711|Sport-100 Helmet,...|  34.9900|     13.0863|
|      712|        AWC Logo Cap|   8.9900|      6.9223|
|      713|Long-Sleeve Logo ...|  49.9900|     38.4923|
|      714|Long-Sleeve Logo ...|  49.9900|     38.4923|
+---------+--------------------+---------+------------+



### Actividad 1: Repetir las consultas anteriores pero usando SQL (no pyspark) 

#### Registrar las tablas que utilizarás como tabla temporal, por ejemplo:

In [0]:
query = "(SELECT * FROM SalesLT.Product) AS temp"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df.createOrReplaceTempView("Product")


In [0]:
%sql
SELECT ProductID, Name, ListPrice
FROM Product
ORDER BY ListPrice DESC
LIMIT 5;


ProductID,Name,ListPrice
749,"Road-150 Red, 62",3578.27
753,"Road-150 Red, 56",3578.27
750,"Road-150 Red, 44",3578.27
751,"Road-150 Red, 48",3578.27
752,"Road-150 Red, 52",3578.27


### Actividad 2. Utilizando PySpark responda a las siguientes preguntas:

#### 1. Escribe un código para calcular cuántos productos tienen un ListPrice mayor que el precio promedio de todos los productos.

#### 2. Filtra todos los productos cuyo nombre comience con la letra "A" y muestra su ProductID, Name y ListPrice.

#### 3. Calcula la desviación estándar de la columna StandardCost.

#### 4. Ordena los productos por ListPrice en orden ascendente y muestra los 10 productos más baratos. 

#### 5. Filtra los productos cuyo tamaño sea "M" y cuyo precio sea mayor a $50. 

#### 6. Escribe un código para contar los productos donde la columna Color es nula

#### 7. Escribe un código para listar todas las combinaciones únicas de Color y Size en la tabla.

#### 8. Calcula la diferencia promedio entre ListPrice y StandardCost para todos los productos. 

#### 9. ¿Cuáles son los productos cuya fecha de modificación se encuentra entre el 11 de marzo de 2008 a las 10:01:00 y el 11 de marzo de 2008 a las 10:03:00, mostrando el ProductID, Name y ModifiedDate?

#### 10. ¿Cuáles son los productos de cada categoría (ProductCategoryID) que tienen un precio (ListPrice) mayor al costo estándar (StandardCost), y cuántos productos cumplen esta condición por categoría?

#### 11. Filtra los productos cuyo ListPrice esté entre $20 y $100.

#### 12. Para cada ProductCategoryID, muestra los 5 productos con el costo (StandardCost) más alto.

#### 13. Filtra los productos cuya columna ThumbNailPhoto no es nula y muestra su ProductID, Name, y ThumbNailPhotoFileName.

#### 14. Agrupa los productos por ProductCategoryID y calcula el precio total (SUM(ListPrice)) por categoría. 

#### 15. Filtra los productos cuya fecha de inicio de venta (SellStartDate) esté entre el 1 de enero de 2005 y el 31 de diciembre de 2006. 

### Actividad 3.  
 La Empresa decide "migrar" de sql server a postgreSQL. Efectuar la conexion Databricks Community con PostgreSQL en Azure. Efectuar algunas consultas sobre PostgreSQL usando PySpark y Scala. Utiliza una base de datos cualquiera.  
Si la version de community da muchos problemas utilizar Azure Databricks.