# Datos

En Spark hay tres formas de trabajar con los datos: RDD, DataFrames y DataSets. En Pyspark >>v 2.0 sobretodo se termina trabajando en DataFrames porque `py` no tiene una interfaz nativa de Datasets y es mucho más fácil e intuititvo que trabajar con RDD.



# PySpark: RDD


**Colección** de registros o filas (recordad, en **memoria**), que representan una entidad (por ejemplo, un vuelo, un curso, un alumno) sobre las que se puede aplicar operaciones - métodos - de *transformación* en otro RDD, o invocar *acciones* para recuperar resultados.



1. Particionado: se distribuyen los datos por todos los nodos del cluster. Nº x defecto -> numero de cores disponibles
2. Inmutables: una vez se crean, no pueden cambiar
3. Resilentes: si un nodo cae, los datos se pueden reconstruir

Otras características: RDD no tiene soporte para la ejecución optimizada, esto es, no optimiza la secuencia de acciones a realizar para obtener el resultado, y permite tipado fuerte.

El procesamiento ocurre de forma distribuida en los nodos, pero en el caso de Spark los datos se almacenan en la memoria de cada nodo del cluster

# Operaciones

Existen dos tipos de operaciones:
+ Transformaciones: transforman el RDD en otro RDD 
+ Acciones: obtienen el resultado (collect)

Lo más importante en este punto es tener claro que las transformaciones se pueden aplicar en cadena, pero no se ejecutan hasta que se hace la llamada a la acción. De este modo se genera un Grafo Acíclico Dirigido (DAG) de transformaciones.

# Dataframes

Se asimilan a una tabla de BBDD.
Se generan sobre los RDD y por tanto comparten sus características, pero te facilitan una mayor abstracción al realizar las operaciones.


## Nota: Datasets

En Py no no se suelen usar los datasets. Podéis pensar en ellos como una interfaz evolucionada de los dataframes, mezclando lo mejor de los dos mundos: tipado fuerte y optimización.


# Cargar pyspark en el sistema

SparkSession es el punto de entrada de un programa Spark. En versiones anteriores, si quisiéramos trabajar con Hive o SQL, habría que declarar varios contextos. Con Spark Session nos abstraemos de los diferentes contextos y utilizamos un único punto de entrada para leer datos, gestionar los recursos del cluster o trabajar con metadatos.

Para más info sobre `Spark Session` https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html

Y https://spark.apache.org/docs/2.3.0/configuration.html

No todas las opciones siempre van a ser iguales. En este caso, he especificado que quiero que se use spark en local con tantos threads como me permita mi máquina ( `master("local[*]")`) y dependerá, también, de la plataforma donde estés ejecutando spark.

Por ejemplo, en GCP puedo tener esta configuración:

`spark = SparkSession.builder \
    ``.appName('PySpark MyApp')`
    `.config('spark.jars', 'gs://spark-lib/
    `bigquery`spark-bigquery-latest_2.12.jar')`
    `.config('spark.driver.maxResultSize', '150g')\
    .getOrCreate()`
``spark.conf.set("spark.sql.repl.eagerEval.enabled", True)``


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
spark = SparkSession.builder.appName("Varios").master("local[*]").getOrCreate()
spark

23/09/25 11:34:26 WARN Utils: Your hostname, EliteX2 resolves to a loopback address: 127.0.1.1; using 192.168.5.94 instead (on interface wifi0)
23/09/25 11:34:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/25 11:34:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Web UI Spark
https://medium.com/iwannabedatadriven/entenWeb UI Sparkdiendo-la-interfaz-de-usuario-de-spark-web-ui-spark-ii-5f84f05545a

In [None]:
# http://localhost:4040

# Importación 

En versiones anteriores de Spark, la forma de crear un RDD a partir de textos era con las funciones `parallelize((tupla1),(tupla2),...,(tuplaN))` o leyendo los archivos de textos `textFile(ruta, particiones)` Ahora se puede utilizar la interfaz `read.` para leer los archivos y transformando los datos a un dataframe

Spark permite la lectura de diferentes formatos: `json, parquet, avro, csv o accediendo mediante jdbc a datos de la bbdd`. En entornos cloud, existen interfaces para conectar con sus repositorios (por ejemplo, en Google, con BigQuery)

In [2]:
archivo = 'california_housing_train.csv'
df_spark = spark.read.csv(archivo, inferSchema=True, header=True)

# imprimir tipo de archivo
print(type(df_spark))

# RDD tradicional: spark.textFile(ruta,...)


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

<class 'pyspark.sql.dataframe.DataFrame'>


# RDD

## Revisar el número de particiones que tenemos en el dataset

In [3]:
rddspark = df_spark.rdd
type(rddspark)

pyspark.rdd.RDD

In [4]:
df_spark2 = df_spark.repartition(3)

In [5]:
print(df_spark2.rdd.getNumPartitions())


[Stage 2:>                                                          (0 + 1) / 1]

3


In [6]:
rddspark.getNumPartitions()


1

In [7]:
rddspark2 = rddspark.repartition(3)


In [8]:
rddspark2.getNumPartitions()

3

# Definición de datos en un RDD

RDD es schemaless. Esto proporciona gran flexibilidad a la hora de trabajar con él. Permite trabajar con tuplas, listas o diccionarios a la hora de definir el RDD. Por ejemplo:




In [9]:
## Analizamos el dataframe 

df_spark.limit(10).toPandas().describe()

                                                                                

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
count,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0
mean,-114.542,34.0,26.0,2832.9,658.9,1042.4,389.7,2.21791,68300.0
std,0.089294,0.519145,11.205158,2377.174534,595.440537,858.024242,283.480766,0.647966,13363.299659
min,-114.6,33.57,14.0,720.0,168.0,333.0,117.0,1.4936,48100.0
25%,-114.5875,33.615,17.5,1403.75,254.25,542.25,229.25,1.736225,60175.0
50%,-114.575,33.665,22.5,1499.0,331.5,729.0,266.5,2.0516,70150.0
75%,-114.5625,34.3475,32.75,4318.5,1051.25,1100.5,469.75,2.5553,78575.0
max,-114.31,34.83,46.0,7650.0,1901.0,3134.0,1056.0,3.3438,85700.0


In [10]:

data3 = rddspark2.take(3)
for f in data3:
    print("data3 longitude: "+ str(f[0]) +", latitude: "+str(f[1]))

[Stage 4:>                                                          (0 + 1) / 1]

data3 longitude: -114.31, latitude: 34.19
data3 longitude: -114.47, latitude: 34.4
data3 longitude: -114.56, latitude: 33.69


                                                                                

En este ejemplo anterior, hemos accedido a los datos que previamente estaban formateados por un dataset. Hemos cogido 3 registros y hemos accedido a ellos por su valor.


## Creación de RDD mediante parallelize 

In [12]:
midata = spark.sparkContext.parallelize([('Huanito','25'),['Mercedes',25,'sport'],{'CASA':'Madrid'}])

Este **RDD** está formado por una **tupla**, una **lista** y un **diccionario**:

In [13]:
data4 = midata.take(2)
for f in data4:
    print("data4 Value:"+ str(f[0]) +", Value:"+str(f[1]))

data4 Value:Huanito, Value:25
data4 Value:Mercedes, Value:25


Si intentamos acceder ahora al diccionario del mismo modo, obtendremos un error. **Es flexible en cuanto la posibilidad de crear nuevos campos para cada registro, pero  tenemos que controlar las estructuras conforme accedamos al dato**:

In [14]:
data4 = midata.take(3)
for f in data4:
    print("data4 Key:"+ str(f[0]) +", Value:"+str(f[1]))

data4 Key:Huanito, Value:25
data4 Key:Mercedes, Value:25


KeyError: 0

**Creación de un RDD usando el objeto ROW**

Fijaos: estamos usando un objeto `Row`. `Row` define una fila en un dataframe. Sin embargo, también define un registro en un RDD.

In [15]:
from pyspark.sql.types import Row

data = spark.sparkContext.parallelize([Row(
                           id=1,
                           name="Cesar",
                           nota=50
                        ),
                        Row(
                            id=2,
                            name="Maria",
                            nota=80
                        ),
                        Row(
                            id=3,
                            name="Juan",
                            nota=75
                        )])

In [16]:
data.collect()

[Row(id=1, name='Cesar', nota=50),
 Row(id=2, name='Maria', nota=80),
 Row(id=3, name='Juan', nota=75)]

In [17]:
type(data)

pyspark.rdd.RDD

In [18]:
data.toDF()

DataFrame[id: bigint, name: string, nota: bigint]

# Map y flatMap

`Map` y `flatMap` permiten realizar **operaciones de tranformación sobre los campos**. 

Sin embargo, hasta que no se hace una operación, no podemos acceder al RDD como lo haríamos en un objeto Py normal. Las acciones collect, top, count etc, son algunas acciones con las que podemos trabajar.

`map` es el método de transformación básico, realiza una transformación sobre el RDD para obtener otro RDD

In [19]:
print ('Obtener housing_median_age de la RDD:\n')
print(rddspark2.map(lambda x: x.housing_median_age))

Obtener housing_median_age de la RDD:

PythonRDD[45] at RDD at PythonRDD.scala:53


In [20]:
rddspark2.map(lambda x: x.housing_median_age).collect()

[15.0,
 19.0,
 17.0,
 14.0,
 20.0,
 29.0,
 25.0,
 41.0,
 34.0,
 46.0,
 38.0,
 35.0,
 16.0,
 19.0,
 19.0,
 29.0,
 33.0,
 21.0,
 15.0,
 19.0,
 25.0,
 35.0,
 34.0,
 14.0,
 23.0,
 17.0,
 27.0,
 41.0,
 23.0,
 33.0,
 16.0,
 34.0,
 20.0,
 15.0,
 21.0,
 5.0,
 10.0,
 14.0,
 5.0,
 20.0,
 5.0,
 12.0,
 27.0,
 8.0,
 15.0,
 16.0,
 26.0,
 24.0,
 20.0,
 8.0,
 23.0,
 28.0,
 22.0,
 28.0,
 19.0,
 34.0,
 19.0,
 14.0,
 23.0,
 26.0,
 11.0,
 7.0,
 18.0,
 10.0,
 19.0,
 13.0,
 5.0,
 11.0,
 24.0,
 9.0,
 14.0,
 15.0,
 26.0,
 17.0,
 8.0,
 24.0,
 19.0,
 18.0,
 17.0,
 18.0,
 20.0,
 15.0,
 21.0,
 16.0,
 12.0,
 21.0,
 13.0,
 13.0,
 17.0,
 21.0,
 16.0,
 23.0,
 11.0,
 14.0,
 18.0,
 13.0,
 16.0,
 21.0,
 26.0,
 12.0,
 15.0,
 6.0,
 5.0,
 20.0,
 16.0,
 26.0,
 5.0,
 17.0,
 13.0,
 11.0,
 9.0,
 29.0,
 10.0,
 2.0,
 23.0,
 14.0,
 14.0,
 16.0,
 15.0,
 9.0,
 21.0,
 16.0,
 14.0,
 22.0,
 32.0,
 15.0,
 8.0,
 18.0,
 23.0,
 16.0,
 14.0,
 19.0,
 17.0,
 24.0,
 24.0,
 32.0,
 38.0,
 35.0,
 28.0,
 21.0,
 19.0,
 33.0,
 20.0,
 25.0,
 13.0,
 

In [21]:
print(rddspark2.map(lambda x: x.housing_median_age).collect()[0:10])

[15.0, 19.0, 17.0, 14.0, 20.0, 29.0, 25.0, 41.0, 34.0, 46.0]


OJO: top no te coge los 10 primeros datos. Take, sí. En este caso  `take` se comporta como `collect[0:x]` 

In [22]:
print(rddspark2.map(lambda x: x.housing_median_age).top(10)) 


[52.0, 52.0, 52.0, 52.0, 52.0, 52.0, 52.0, 52.0, 52.0, 52.0]


In [23]:
rddspark2.map(lambda x: x.housing_median_age)

PythonRDD[49] at RDD at PythonRDD.scala:53

In [24]:
print(rddspark2.map(lambda x: x.housing_median_age).take(10))

[15.0, 19.0, 17.0, 14.0, 20.0, 29.0, 25.0, 41.0, 34.0, 46.0]


In [25]:
print(rddspark2.map(lambda x: (x[0],x[1])))

PythonRDD[51] at RDD at PythonRDD.scala:53


In [26]:
rddspark2.map(lambda x: (x[0],x[1])).collect()

[(-114.31, 34.19),
 (-114.47, 34.4),
 (-114.56, 33.69),
 (-114.57, 33.64),
 (-114.57, 33.57),
 (-114.58, 33.63),
 (-114.58, 33.61),
 (-114.59, 34.83),
 (-114.59, 33.61),
 (-114.6, 34.83),
 (-115.38, 32.82),
 (-115.38, 32.81),
 (-115.39, 32.76),
 (-115.4, 32.86),
 (-115.4, 32.7),
 (-115.41, 32.99),
 (-115.46, 33.19),
 (-115.48, 32.8),
 (-115.48, 32.68),
 (-115.49, 32.87),
 (-115.53, 32.99),
 (-115.53, 32.97),
 (-115.53, 32.97),
 (-115.53, 32.73),
 (-115.54, 32.99),
 (-115.54, 32.99),
 (-115.54, 32.98),
 (-115.54, 32.97),
 (-115.54, 32.79),
 (-115.55, 32.98),
 (-115.57, 32.8),
 (-115.57, 32.79),
 (-115.57, 32.78),
 (-115.57, 32.78),
 (-115.58, 33.88),
 (-115.58, 32.81),
 (-115.58, 32.81),
 (-115.58, 32.79),
 (-115.58, 32.78),
 (-115.59, 32.85),
 (-115.94, 33.38),
 (-115.95, 33.28),
 (-115.96, 33.3),
 (-115.98, 33.32),
 (-115.99, 33.4),
 (-116.0, 33.19),
 (-116.0, 32.74),
 (-116.01, 33.51),
 (-116.01, 33.41),
 (-116.02, 34.18),
 (-116.2, 33.63),
 (-116.2, 32.64),
 (-116.21, 33.75),
 (-116

Si queremos **filtrar** elementos, usamos `filter`

In [27]:
rddspark2.filter(lambda x : x['housing_median_age']>46).count()

1617

In [28]:
rddspark2.filter(lambda x : x['housing_median_age']>46).distinct().collect()



[Row(longitude=-117.14, latitude=32.71, housing_median_age=52.0, total_rooms=800.0, total_bedrooms=313.0, population=1337.0, households=282.0, median_income=1.5594, median_house_value=87500.0),
 Row(longitude=-117.17, latitude=32.73, housing_median_age=52.0, total_rooms=408.0, total_bedrooms=143.0, population=313.0, households=143.0, median_income=1.815, median_house_value=116700.0),
 Row(longitude=-117.22, latitude=32.74, housing_median_age=52.0, total_rooms=1283.0, total_bedrooms=173.0, population=436.0, households=190.0, median_income=7.4029, median_house_value=345700.0),
 Row(longitude=-117.31, latitude=34.11, housing_median_age=52.0, total_rooms=851.0, total_bedrooms=190.0, population=731.0, households=190.0, median_income=1.9044, median_house_value=64900.0),
 Row(longitude=-117.31, latitude=34.1, housing_median_age=52.0, total_rooms=1457.0, total_bedrooms=415.0, population=1238.0, households=341.0, median_income=2.0089, median_house_value=68100.0),
 Row(longitude=-117.37, latitud

## Map vs flatmap

`map` Siempre devuelve el mismo número de registros, mientras  `flatMap` devuelve más de un registro para cada registro (~ lo aplana).  

In [29]:
#flatMap

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
print ('element as is')
print ('**********************************************')

for element in rdd.collect():
    print(element)

print ('**********************************************')
print ('flat map by whitespace')
print ('**********************************************')

rdd2 = rdd.flatMap(lambda x: x.split(' '))
for element in rdd2.collect():
    print(element)

    
print ('**********************************************')
print ('map by whitespace')
print ('**********************************************')

rdd2 = rdd.map(lambda x: x.split(' '))
for element in rdd2.collect():
    print(element)


element as is
**********************************************
Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
**********************************************
flat map by whitespace
**********************************************
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s
**********************************************
map by whitespace
**********************************************
['Project', 'Gutenberg’s']
['Alice’s', 'Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']
['Adventures', 'in', 'Wonderland']
['Project', 'Gutenberg’s']


## Reduce By Key

`reduceByKey`
Permite **mezclar valores de un rdd en forma clave:valor**. Escanea los valores que le llegan, si encuentra una nueva clave la añade, y si encuentra pares clave-valor que comparten una misma clave, aplica la función definida en su llamada.

In [30]:
datitos = spark.sparkContext.parallelize([[{'nombre':'Juanito'},{'localidad':'MAD'},{'edad':25}],[{'nombre':'Fulanito'},{'localidad':'MAD'},{'edad':14}],[{'nombre':'Menganito'},{'localidad':'TOL'},{'edad':35}]])

In [31]:
datitos

ParallelCollectionRDD[62] at readRDDFromFile at PythonRDD.scala:287

In [32]:
datitos.collect()

[[{'nombre': 'Juanito'}, {'localidad': 'MAD'}, {'edad': 25}],
 [{'nombre': 'Fulanito'}, {'localidad': 'MAD'}, {'edad': 14}],
 [{'nombre': 'Menganito'}, {'localidad': 'TOL'}, {'edad': 35}]]

Para aplicar las transformaciones, primero debemos usar `map`. 

Fijaos que, en este caso, estamos diciendo que un registro es una lista de diccionarios. Tenemos que tratar, por tanto, nuestros datos con este formato

In [33]:
minidatitos = datitos.map(lambda x : ( x[1].get('localidad'),x[2].get('edad')))

In [34]:
minidatitos.collect()

[('MAD', 25), ('MAD', 14), ('TOL', 35)]

In [35]:
minidatitos.reduceByKey(lambda x,y : (x + y) / 2).collect()

[('TOL', 35), ('MAD', 19.5)]


# Dataframe

Dataframe nos permite trabajar con RDD a más alto nivel, como si de una tabla de BBDD se tratase.

No impone un tipado fuerte, pero sí un esquema (implícito o explícito) sobre los datos.

## Descriptivos

¿Numero de registros en el dataframe?

In [36]:
df_spark.count()

17000




## Estructura del dataframe

In [37]:
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



¿Nombre de las Columnas de dataframe?

In [38]:
df_spark.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

Ver los primeros 20 registros del dataframe

In [39]:
df_spark.collect()[0]

Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0)

In [40]:
df_spark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [41]:
arrayData = [ ['el caso'],['es'],[' asi es nuestra casa']
]

df = spark.createDataFrame(data=arrayData)
df.printSchema()
df.show()
df2 = df.select(
        F.explode(
            F.split(df._1, " ")
        ).alias("word"))
df2.show()

root
 |-- _1: string (nullable = true)

+--------------------+
|                  _1|
+--------------------+
|             el caso|
|                  es|
| asi es nuestra casa|
+--------------------+

+-------+
|   word|
+-------+
|     el|
|   caso|
|     es|
|       |
|    asi|
|     es|
|nuestra|
|   casa|
+-------+



## Descricipcion Estadistica del dataframe

Anteriormente, ya hemos realizado un describe sobre el `pandas dataframe` obtenido a partir del `spark dataframe`. Este método  no es igual al describe sobre un `pandas`. Por ejemplo:

In [42]:
df_spark.describe

<bound method DataFrame.describe of DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]>

In [43]:
df_spark.limit(10).toPandas().describe()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
count,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0
mean,-114.542,34.0,26.0,2832.9,658.9,1042.4,389.7,2.21791,68300.0
std,0.089294,0.519145,11.205158,2377.174534,595.440537,858.024242,283.480766,0.647966,13363.299659
min,-114.6,33.57,14.0,720.0,168.0,333.0,117.0,1.4936,48100.0
25%,-114.5875,33.615,17.5,1403.75,254.25,542.25,229.25,1.736225,60175.0
50%,-114.575,33.665,22.5,1499.0,331.5,729.0,266.5,2.0516,70150.0
75%,-114.5625,34.3475,32.75,4318.5,1051.25,1100.5,469.75,2.5553,78575.0
max,-114.31,34.83,46.0,7650.0,1901.0,3134.0,1056.0,3.3438,85700.0


`describe` se asemeja más a un `printSchema

In [44]:
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



Si queremos un comportamiento similar a la de `pandas`, tenemos que llamar al método `show` o identificar la columna en concreto. Por ejemplo, descripcion estadistica de una sola columna ('median_house_value'). 

**Pregunta**: qué está haciendo

In [45]:
df_spark.describe()

DataFrame[summary: string, longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [46]:
df_spark.describe().show()

23/09/25 10:49:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [47]:
df_spark.describe(['median_house_value']).show()

+-------+------------------+
|summary|median_house_value|
+-------+------------------+
|  count|             17000|
|   mean|207300.91235294117|
| stddev|115983.76438720895|
|    min|           14999.0|
|    max|          500001.0|
+-------+------------------+



# Operaciones básicas

## Devolver los datos del dataframe

Para devolver todos los datos de un pyspark dataframe, se usa la función `collect`:

In [48]:
df_spark.collect()

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0),
 Row(longitude=-114.58, latitud

### Nota:

Fijaos que devuelve una lista de filas. No es la mejor forma de trabajar con los datos. Para gestionar esta situación, tenemos que usar funciones de **`selección`** y **`filtrado`** de columnas y registros:

# Filtrado de datos

## Selección -- columnas

In [49]:
df_spark.select(['total_rooms','total_bedrooms']).show()

+-----------+--------------+
|total_rooms|total_bedrooms|
+-----------+--------------+
|     5612.0|        1283.0|
|     7650.0|        1901.0|
|      720.0|         174.0|
|     1501.0|         337.0|
|     1454.0|         326.0|
|     1387.0|         236.0|
|     2907.0|         680.0|
|      812.0|         168.0|
|     4789.0|        1175.0|
|     1497.0|         309.0|
|     3741.0|         801.0|
|     1988.0|         483.0|
|     1291.0|         248.0|
|     2478.0|         464.0|
|     1448.0|         378.0|
|     2556.0|         587.0|
|     1678.0|         322.0|
|       44.0|          33.0|
|     1388.0|         386.0|
|       97.0|          24.0|
+-----------+--------------+
only showing top 20 rows



## Filtrado -- filas

In [50]:
filtrado = df_spark.filter(df_spark.total_rooms>1000)

In [51]:
filtrado

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [52]:
filtrado.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|              29.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0|
|  -114.58|   33.61|    

## Filtrado -- where

In [53]:
filtrado.where(filtrado.total_bedrooms==1283).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -117.78|   33.78|               6.0|     9792.0|        1283.0|    3744.0|    1179.0|      10.1714|          481500.0|
|   -118.3|   34.06|              33.0|     2437.0|        1283.0|    3906.0|    1084.0|       2.0332|          270000.0|
|   -118.4|   34.17|              24.0|     4443.0|        1283.0|    2421.0|    1180.0|       2.2652|          269200.0|
|   -121.4|   38.49|              12.0|     7290.0|        1283.0|    3960.0|    1248.0|       3.5968|          106300.0|
+---------+--------+----

In [54]:
filtrado_pd = filtrado.toPandas()

In [55]:
filtrado_pd.loc[filtrado_pd.total_bedrooms==1283]

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
2701,-117.78,33.78,6.0,9792.0,1283.0,3744.0,1179.0,10.1714,481500.0
6070,-118.3,34.06,33.0,2437.0,1283.0,3906.0,1084.0,2.0332,270000.0
7030,-118.4,34.17,24.0,4443.0,1283.0,2421.0,1180.0,2.2652,269200.0
10672,-121.4,38.49,12.0,7290.0,1283.0,3960.0,1248.0,3.5968,106300.0


## Acceso a filas / columnas

No existe algo parecido a `.iloc` o `.loc``

Tenemos que jugar con `collect`, `show`, `select` y `filter`

Por ejemplo: 

### Seleccionar habitaciones y población cuando las habitaciones sean mayor a ... 100!! Pero devuelve sólo los 10 primeros resultados



In [56]:
df_spark.select('total_bedrooms','population').where(filtrado.total_bedrooms>100)

DataFrame[total_bedrooms: double, population: double]

In [57]:
df_spark.select('total_bedrooms','population').where(filtrado.total_bedrooms>100).collect()[:10]

[Row(total_bedrooms=1283.0, population=1015.0),
 Row(total_bedrooms=1901.0, population=1129.0),
 Row(total_bedrooms=174.0, population=333.0),
 Row(total_bedrooms=337.0, population=515.0),
 Row(total_bedrooms=326.0, population=624.0),
 Row(total_bedrooms=236.0, population=671.0),
 Row(total_bedrooms=680.0, population=1841.0),
 Row(total_bedrooms=168.0, population=375.0),
 Row(total_bedrooms=1175.0, population=3134.0),
 Row(total_bedrooms=309.0, population=787.0)]

In [58]:
df_spark.select('total_bedrooms','population').where(filtrado.total_bedrooms>100).show(10)

+--------------+----------+
|total_bedrooms|population|
+--------------+----------+
|        1283.0|    1015.0|
|        1901.0|    1129.0|
|         174.0|     333.0|
|         337.0|     515.0|
|         326.0|     624.0|
|         236.0|     671.0|
|         680.0|    1841.0|
|         168.0|     375.0|
|        1175.0|    3134.0|
|         309.0|     787.0|
+--------------+----------+
only showing top 10 rows



### Si quisiéramos coger los datos de toda una columna

`select (F.collect_list(nombrecolumna)).first()[0]`



In [59]:
df_spark.select(F.collect_list('total_bedrooms')).first()[0]

[1283.0,
 1901.0,
 174.0,
 337.0,
 326.0,
 236.0,
 680.0,
 168.0,
 1175.0,
 309.0,
 801.0,
 483.0,
 248.0,
 464.0,
 378.0,
 587.0,
 322.0,
 33.0,
 386.0,
 24.0,
 360.0,
 243.0,
 95.0,
 129.0,
 397.0,
 139.0,
 322.0,
 270.0,
 191.0,
 294.0,
 394.0,
 262.0,
 196.0,
 171.0,
 113.0,
 220.0,
 373.0,
 246.0,
 666.0,
 104.0,
 389.0,
 440.0,
 573.0,
 72.0,
 913.0,
 492.0,
 523.0,
 218.0,
 287.0,
 610.0,
 136.0,
 283.0,
 262.0,
 382.0,
 366.0,
 387.0,
 337.0,
 275.0,
 581.0,
 199.0,
 634.0,
 340.0,
 545.0,
 325.0,
 373.0,
 268.0,
 395.0,
 454.0,
 403.0,
 365.0,
 530.0,
 316.0,
 142.0,
 221.0,
 162.0,
 606.0,
 480.0,
 416.0,
 375.0,
 328.0,
 835.0,
 438.0,
 490.0,
 202.0,
 283.0,
 217.0,
 269.0,
 256.0,
 301.0,
 289.0,
 594.0,
 208.0,
 235.0,
 279.0,
 282.0,
 143.0,
 203.0,
 507.0,
 414.0,
 274.0,
 307.0,
 177.0,
 187.0,
 317.0,
 244.0,
 231.0,
 235.0,
 340.0,
 99.0,
 238.0,
 448.0,
 103.0,
 81.0,
 18.0,
 379.0,
 1257.0,
 49.0,
 248.0,
 95.0,
 272.0,
 43.0,
 25.0,
 81.0,
 46.0,
 536.0,
 57.0,
 2

In [60]:
df_spark.toPandas()[['total_bedrooms']]

Unnamed: 0,total_bedrooms
0,1283.0
1,1901.0
2,174.0
3,337.0
4,326.0
...,...
16995,394.0
16996,528.0
16997,531.0
16998,552.0


# Sorting

En general, podemos ordenadar:

1. Llamando a la columna con método .asc() o .desc(), por ejemplo:

`df.sort(columna.asc())`

2. Llamando a la función de ordenación ascendente o descendente  sobre el nombre de la columna:

`df.sort (F.asc('columna'))`

## Sort

In [61]:
ordenado = filtrado.sort(filtrado.median_income.asc())

In [62]:
ordenado.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -118.25|   34.05|              52.0|     2806.0|        1944.0|    2232.0|    1605.0|       0.6775|          350000.0|
|  -118.23|   33.95|              37.0|     2667.0|         671.0|    2865.0|     683.0|       0.6831|           87500.0|
|  -119.73|   36.72|              26.0|     2645.0|        1005.0|    1660.0|     991.0|       0.6991|           89500.0|
|  -118.18|   33.85|              30.0|     2548.0|         717.0|    2086.0|     700.0|       0.7007|          134400.0|
|  -120.67|    35.3|              19.0|     1540.0|         715.0|    1799.0|     635.0|       0.7025|          500001.0|
|  -117.29|   34.11|    

## Order by

Lo vamos a complicar un poco: vamos a renombrar median_income y haremos el order by por esa columna renombrada

In [63]:
filtrado.select('longitude','latitude',F.col('median_income').alias('cosito')).orderBy(F.col('cosito').asc()).show()

+---------+--------+------+
|longitude|latitude|cosito|
+---------+--------+------+
|  -118.25|   34.05|0.6775|
|  -118.23|   33.95|0.6831|
|  -119.73|   36.72|0.6991|
|  -118.18|   33.85|0.7007|
|  -120.67|    35.3|0.7025|
|  -117.29|   34.11|0.7075|
|  -118.29|   34.03|0.7473|
|  -122.19|   37.76|0.7683|
|  -119.28|   36.33|  0.78|
|   -119.8|   36.74| 0.799|
|  -122.29|   37.81|0.8026|
|   -122.3|   37.81|0.8056|
|  -118.25|   34.05|0.8131|
|  -122.28|   37.82|0.8172|
|  -118.24|   34.05|0.8288|
|  -118.31|   34.03|0.8351|
|  -118.14|   34.15|0.8438|
|  -122.42|   37.78|0.8543|
|  -114.63|   32.76|0.8585|
|  -118.98|   35.38|0.8639|
+---------+--------+------+
only showing top 20 rows



In [64]:
filtrado.select('longitude','latitude',F.col('median_income').alias('cosito')).orderBy(F.asc('cosito')).show()

+---------+--------+------+
|longitude|latitude|cosito|
+---------+--------+------+
|  -118.25|   34.05|0.6775|
|  -118.23|   33.95|0.6831|
|  -119.73|   36.72|0.6991|
|  -118.18|   33.85|0.7007|
|  -120.67|    35.3|0.7025|
|  -117.29|   34.11|0.7075|
|  -118.29|   34.03|0.7473|
|  -122.19|   37.76|0.7683|
|  -119.28|   36.33|  0.78|
|   -119.8|   36.74| 0.799|
|  -122.29|   37.81|0.8026|
|   -122.3|   37.81|0.8056|
|  -118.25|   34.05|0.8131|
|  -122.28|   37.82|0.8172|
|  -118.24|   34.05|0.8288|
|  -118.31|   34.03|0.8351|
|  -118.14|   34.15|0.8438|
|  -122.42|   37.78|0.8543|
|  -114.63|   32.76|0.8585|
|  -118.98|   35.38|0.8639|
+---------+--------+------+
only showing top 20 rows



# Operaciones sobre columnas

Las operaciones sobre columnas se realizan con el método `withColumn (nombre, función))`

Para instanciar las columnas, dependiendo de la función, se puede hacer directamente con el nombre de la columna o, por ejemplo, habiendo importado el módulo `pyspark.sql.functions as F`, haciendo la llamada `F.col(nombre_columna)` 

## Operaciones sobre una columna

In [65]:
fp = filtrado.toPandas()
fp['housing_median_age'] = fp ['housing_median_age'] + 1

In [66]:
filtrado.select('housing_median_age').show(3)

+------------------+
|housing_median_age|
+------------------+
|              15.0|
|              19.0|
|              14.0|
+------------------+
only showing top 3 rows



In [67]:
 filtrado.withColumn('housing_median_age',F.col('housing_median_age')+1).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              16.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              20.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.57|   33.64|              15.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              21.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|              30.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0|
|  -114.58|   33.61|    

In [68]:
filtrado.withColumn('housing_median_age',filtrado.housing_median_age-1).show()



+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              14.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              18.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.57|   33.64|              13.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              19.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|              28.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0|
|  -114.58|   33.61|    

## Operaciones con CASE - WHEN

Podemos hacer un `case when` como en `SQL`. Para ello usamos la función `F.when().otherwise()`

`F.Lit` permite establecer valores literales en una columna. Ojo, cuando se pase como parámetro un valor a una función `udf` que no sea columna, es necesario utilizarlo. 

Más adelante tenéis un ejercicio que entregar para que defináis una `udf`


In [69]:
filtrado.withColumn('catgoria_edad',F.when(
    (filtrado.median_income.isNotNull() & filtrado.housing_median_age.isNotNull()), 'GUAY'
).otherwise(F.lit('N/A'))).show() 
                    
                   

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|catgoria_edad|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|         GUAY|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|         GUAY|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|         GUAY|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|         GUAY|
|  -114.58|   33.63|              29.0|     1387

In [114]:
añadido = filtrado.withColumn('catgoria_cliente',F.when(
    (filtrado.median_income>3), 'TARGET'
).when((filtrado.median_income>1) & (filtrado.median_income<3),'CUASITARGET').otherwise(F.lit('IGNORE')))

añadido.show()

      

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|fecha_actual|fecha_cierre|catgoria_cliente|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+----------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  2023-09-25|  2023-12-25|     CUASITARGET|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  2023-09-25|  2023-12-25|     CUASITARGET|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  2023-09-25|  2023-12-25|          TARGET|
|  -114.57

SELECT CASE WHEN MEDIAN_INCOME>3 THEN 'TARGET' 
            WHEN (MEDIAN_INCOME > 1 AND MEDIAN_INCOME<3) THEN 'CUASI' ELSE 'IGNORE' END AS 'COLUMNA'
            

### Renombrado de columnas

En el apartado de `sorting`,  ya hemos visto un par de ejemplos.

Vamos a refrescarlo!

1. `withColumnRenamed`


In [115]:
añadido = añadido.withColumnRenamed('catgoria_cliente','categoria_xelenial')
añadido.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|fecha_actual|fecha_cierre|categoria_xelenial|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  2023-09-25|  2023-12-25|       CUASITARGET|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  2023-09-25|  2023-12-25|       CUASITARGET|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  2023-09-25|  2023-12-25|            TARGET

2. Utilizando `alias` sobre una columna determinada, por ejemplo:

In [72]:
df_spark.select(F.col('housing_median_age').alias('edad casa')).limit(1).show()

+---------+
|edad casa|
+---------+
|     15.0|
+---------+



## Operaciones sobre una columna, utilizando más de una columna

In [105]:
## Vamos a añadir una nueva columna que sea el porcentaje de households por población

añadido.withColumn('household_per_pop', F.col('households')/F.col('population')).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|  household_per_pop|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|0.46502463054187193|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|0.41009743135518156|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0| 0.4388349514563107|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0| 0.4198717948717949|
|  -11

# Otras operaciones interesantes

## Eliminar duplicados 



In [82]:
eliminetor = añadido.drop_duplicates(['longitude'])

In [83]:
eliminetor.where(eliminetor.longitude == -114.31).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



## Funciones sobre fechas

In [84]:
filtrado = filtrado.withColumn('fecha_actual',F.current_date())
filtrado.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|fecha_actual|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  2023-09-25|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  2023-09-25|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  2023-09-25|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|  2023-09-25|
|  -114.58|   33.63|              29.0|     1387.0|    

In [85]:
filtrado = filtrado.withColumn('fecha_cierre',F.add_months(F.col('fecha_actual'),3))
filtrado.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|fecha_actual|fecha_cierre|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  2023-09-25|  2023-12-25|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  2023-09-25|  2023-12-25|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  2023-09-25|  2023-12-25|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|          

In [86]:
filtrado.withColumn('diferencia',F.datediff(filtrado.fecha_actual,filtrado.fecha_cierre)).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+----------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|fecha_actual|fecha_cierre|diferencia|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+------------+----------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|  2023-09-25|  2023-12-25|       -91|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  2023-09-25|  2023-12-25|       -91|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|  2023-09-25|  2023-12-25|       -91|
|  -114.57|   33.57|              20.0|     14

# Operaciones avanzadas

## Join

In [98]:
jugadores = spark.read.csv('player.csv',inferSchema=True, header=True, sep=';')
atributos = spark.read.csv('player_attributes.csv',inferSchema=True, header=True, sep=';')

                                                                                

In [99]:
jugadores.show()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [100]:
atributos.show()

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_posit

In [101]:
resultado = jugadores.select('player_api_id','height').join(atributos,on='player_api_id',how='left')

In [102]:
resultado.show()



+-------------+------+---+------------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
|player_api_id|height| id|player_fifa_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_ki

                                                                                

## Agregaciones

Las agregaciones se realizan con la secuencia de operaciones siguientes:

`groupBy` + `agg (función[.alias()])`

`groubBy` + `función de agregación` -- `sum / mean / max / min ...` `[.withColumnRenamed]`

`groupBy` + `agg ({'campo':'funcion_de_agregacion})`
..

In [112]:
añadido.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [116]:
añadido.groupby('categoria_xelenial').mean('median_house_value').withColumnRenamed('avg(median_house_value)','media_valor').show()

+------------------+------------------+
|categoria_xelenial|       media_valor|
+------------------+------------------+
|            IGNORE|150011.30985915492|
|            TARGET|250504.73766414644|
|       CUASITARGET|133579.02144288577|
+------------------+------------------+



In [117]:
añadido.groupby('categoria_xelenial').mean('median_house_value').show()

+------------------+-----------------------+
|categoria_xelenial|avg(median_house_value)|
+------------------+-----------------------+
|            IGNORE|     150011.30985915492|
|            TARGET|     250504.73766414644|
|       CUASITARGET|     133579.02144288577|
+------------------+-----------------------+



In [118]:


añadido.groupby('categoria_xelenial').agg(F.mean("median_house_value").alias("media")).show()

+------------------+------------------+
|categoria_xelenial|             media|
+------------------+------------------+
|            IGNORE|150011.30985915492|
|            TARGET|250504.73766414644|
|       CUASITARGET|133579.02144288577|
+------------------+------------------+



In [119]:
añadido.groupby('categoria_xelenial').agg(F.count_distinct('median_house_value').alias('conteo')).show()

[Stage 127:>                                                        (0 + 1) / 1]

+------------------+------+
|categoria_xelenial|conteo|
+------------------+------+
|            IGNORE|    63|
|            TARGET|  3269|
|       CUASITARGET|  1763|
+------------------+------+



                                                                                

In [120]:
atributos.groupBy('preferred_foot').agg(F.count_distinct('player_api_id').alias('tipo')).show()



+--------------+----+
|preferred_foot|tipo|
+--------------+----+
|          null| 756|
|          left|3202|
|         right|8979|
+--------------+----+



                                                                                

In [121]:
atributos.groupBy('preferred_foot').agg(F.first('player_api_id').alias('elprimero')).show()

+--------------+---------+
|preferred_foot|elprimero|
+--------------+---------+
|          null|    31684|
|          left|   155782|
|         right|   505942|
+--------------+---------+





In [122]:
atributos.groupBy('preferred_foot').mean('overall_rating').show()

+--------------+-------------------+
|preferred_foot|avg(overall_rating)|
+--------------+-------------------+
|          null|               null|
|          left|   68.6261820132788|
|         right|  68.59155835241928|
+--------------+-------------------+



In [123]:
resultado.groupBy('preferred_foot').mean('overall_rating').withColumnRenamed('avg(overall_rating)','media').show()

+--------------+-----------------+
|preferred_foot|            media|
+--------------+-----------------+
|          null|             null|
|          left| 68.6261820132788|
|         right|68.59155835241928|
+--------------+-----------------+



In [124]:
%%time
resultado.groupBy('preferred_foot').sum('overall_rating').withColumnRenamed('sum(overall_rating)','total').show()

+--------------+-------+
|preferred_foot|  total|
+--------------+-------+
|          null|   null|
|          left|3069855|
|         right|9493689|
+--------------+-------+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.18 s


In [126]:
from pyspark.sql.functions import udf
@udf
def funcioncita (cosa, cosa2):
  return xx

# SQL 

SPARL SQL es una librería que funciona sobre SPARK Core. Permite gestionar tus datos sobre SPARK como si se tratase de SQL, facilitando la interacción con el usuario.

Spark, por defecto, utiliza Apache Hive para guardar todos los metadatos sobre las tablas que se almacenan (hive metastore). Del mismo modo, spark permite crear tablas gestionadas o no gestionadas.

Para una tabla gestionada, Spark gestion datos y metadatos en el archivo, que puede ser un sistema de ficheros, HDFS o Storage en GCP. Para una tabla sin gestionar, Spark sólo guarda los metadatos mientras que tú gesitionas esa información.

De este modo, si se realiza una operación de creación o borrado en una tabla sin gestionar desde spark, sólo se borraran los metadatos, pero los datos se mantendrán.

Esta información se puede gestionar en la configuración de Spark.

Del mismo modo, Spark puede crear **vistas** para consultar la información con Spark SQL. Las vistas pueden ser globales o locales a la sesión y desaparecen cuando la sesión de Spark desaparece.

En los ejemplos subsiguientes nosotros sólo manejaremos vistas.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Información de tráfico aéreo") \
    .getOrCreate()

In [127]:
airlines = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load('airlines.csv')

In [128]:
airlines

DataFrame[Name: string,  IATA: string,  ICAO: string,  Country: string,  Active: string]

Las vistas temporales están ligadas a esta sesión y app. Si quisiéramos una vista que estuviera ligada a todas las sesiones, deberíamos seleccionar GlobalTempView

In [129]:
airlines.createOrReplaceTempView("airlines")

In [130]:
airlines

DataFrame[Name: string,  IATA: string,  ICAO: string,  Country: string,  Active: string]

In [131]:
airlines = spark.sql("SELECT * FROM airlines")
airlines.columns

['Name', ' IATA', ' ICAO', ' Country', ' Active']

In [132]:
type(airlines)

pyspark.sql.dataframe.DataFrame

In [133]:
airlines.show(1)

+----------+-----+-----+---------+-------+
|      Name| IATA| ICAO|  Country| Active|
+----------+-----+-----+---------+-------+
|Air Asia X|   ZM|  IDX|Indonesia|      Y|
+----------+-----+-----+---------+-------+
only showing top 1 row



In [134]:
airlines.show()

+--------------------+-----+-----+--------------+-------+
|                Name| IATA| ICAO|       Country| Active|
+--------------------+-----+-----+--------------+-------+
|          Air Asia X|   ZM|  IDX|     Indonesia|      Y|
|            Alghanim|   ZL|  KYA| United States|      Y|
|Norwegian Air Int...|   ZK|  IBK|        Norway|      Y|
|Boliviana de Avia...|   ZI|  BOV|       Bolivia|      Y|
| AtlasGlobal Ukraine|   ZH|  UJX|       Ukraine|      Y|
|           Pouya Air|   ZE|  PYA|          Iran|      Y|
|Mann Yadanarpon A...|   ZB|  MYP|         Burma|      Y|
|               BA101|   Z9|  710|United Kingdom|      Y|
| Fly Africa Zimbabwe|   Z6|  FZW|      Zimbabwe|      Y|
|           Jet Suite|   Z4|  RSP| United States|      Y|
|FTI Fluggesellschaft|   Z3|  FTI|       Germany|      N|
|              Go2Sky|   Z2|  RLX|      Slovakia|      Y|
|          Tomsk-Avia|   YT|  TKS|        Russia|      Y|
|     Simrik Airlines|   YQ|  RMK|         Nepal|      Y|
|      Russkie

In [138]:
vuelos = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load('flights.csv')

In [139]:
## Generación de una vista temporal en la sesión sobre una bbdd por defecto

vuelos.createOrReplaceTempView("vuelos")
vuelos = spark.sql("SELECT * FROM vuelos")

In [140]:
vuelos.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   2003|      1955|   2211|      2225|           WN|      335

In [141]:
vuelos.count()

100000

In [142]:
spark.sql("select count(*) as registros from vuelos").show()

+---------+
|registros|
+---------+
|   100000|
+---------+



In [143]:
spark.sql("select sum(distance) as distancia_total from vuelos").show()



+---------------+
|distancia_total|
+---------------+
|    6.3058632E7|
+---------------+



                                                                                

In [144]:


total_distance_df = spark.sql("SELECT distance FROM vuelos")\
                         .agg({"distance":"sum"})\
                         .withColumnRenamed("sum(distance)","distancia_total")

In [145]:
total_distance_df.show()

+---------------+
|distancia_total|
+---------------+
|    6.3058632E7|
+---------------+



In [149]:
spark.sql ("select dest, avg(DepDelay) as retraso from vuelos group by dest order by 2 desc ").show()

[Stage 172:>                                                        (0 + 3) / 3]

+----+------------------+
|dest|           retraso|
+----+------------------+
| SAV|              43.0|
| SFO| 36.82403433476395|
| ROC|              28.0|
| IAH|24.692307692307693|
| GSO|              22.5|
| BFL|21.333333333333332|
| MYR|              20.0|
| RIC|              19.0|
| SLC|  15.5960219478738|
| EWR|13.777777777777779|
| RNO| 13.55028157683025|
| OKC|13.497635933806146|
| PDX| 13.47018150388937|
| LAS|13.395097007068733|
| MRY|13.311926605504587|
| IAD|13.186274509803921|
| LAX|13.084322678843227|
| TUS| 12.71103896103896|
| DEN|12.643714466203411|
| BOI| 12.56043956043956|
+----+------------------+
only showing top 20 rows



