# Pruebas

In [5]:
# !pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7


In [1]:
import pyspark
from pyspark.sql import SparkSession
import time

In [2]:
# Build the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Pruebas_Sabri1")
    .getOrCreate()
)

# Print the session object to confirm it was created.
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fe36d8d3c10>


Importo las funciones de SQL

In [3]:
import pyspark.sql.functions as F

In [4]:
# Relative path of the sample CSV file that this notebook reads.
archivo = "./data/Sample_Data_1500_Rows.csv"

In [5]:
import os
# Sanity check: print True if the CSV path exists on disk, False otherwise.
print(os.path.exists(archivo))

True


In [6]:
# Load the CSV into a Spark DataFrame: the first line provides the column
# names (header) and the column types are inferred from the data.
df_spark = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv(archivo)
)

In [7]:
# Show the Python type of the object returned by spark.read.csv.
print(type(df_spark))

<class 'pyspark.sql.dataframe.DataFrame'>


In [8]:
# Preview the DataFrame; with no arguments show() prints the first 20 rows.
df_spark.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 61|
|  2|  David| 36|
|  3|    Bob| 36|
|  4|  Alice| 40|
|  5|  David| 33|
|  6|  David| 33|
|  7|  David| 46|
|  8|  David| 25|
|  9|    Bob| 45|
| 10|  David| 42|
| 11|    Bob| 42|
| 12|Charlie| 30|
| 13|  Alice| 31|
| 14|  David| 36|
| 15|Charlie| 32|
| 16|  Alice| 32|
| 17|  Alice| 38|
| 18|  Alice| 25|
| 19|Charlie| 62|
| 20|    Bob| 61|
+---+-------+---+
only showing top 20 rows



In [9]:
# Count the rows of the DataFrame and report the total.
row_count = df_spark.count()
print(f"El número de filas en el DataFrame es: {row_count}")

El número de filas en el DataFrame es: 1500


In [13]:
# List the column names of the DataFrame.
df_spark.columns

['id', 'name', 'age']

Estructura del dataframe

In [12]:
# Print the schema: each column's name, data type and nullability.
df_spark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



# Estadísticas

In [10]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df_spark.describe().show()

+-------+------------------+-----+-----------------+
|summary|                id| name|              age|
+-------+------------------+-----+-----------------+
|  count|              1500| 1500|             1500|
|   mean|             750.5| NULL|           41.086|
| stddev|433.15701541127095| NULL|13.40322230829202|
|    min|                 1|Alice|               18|
|    max|              1500|David|               64|
+-------+------------------+-----+-----------------+



In [14]:
# Same summary, collected into pandas and transposed so that each original
# column becomes a row and each statistic becomes a column.
df_spark.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
id,1500,750.5,433.15701541127095,1,1500
name,1500,,,Alice,David
age,1500,41.086,13.40322230829202,18,64


Descripción estadística de una columna

In [15]:
# Summary statistics restricted to the age column.
df_spark.describe("age").show()

+-------+-----------------+
|summary|              age|
+-------+-----------------+
|  count|             1500|
|   mean|           41.086|
| stddev|13.40322230829202|
|    min|               18|
|    max|               64|
+-------+-----------------+



In [16]:
# Build a larger test DataFrame by stacking copies of df_spark.
# Total number of copies: the original frame plus 1000 more (1001 x 1500 rows).
N_COPIAS = 1001

# Union the copies pairwise, halving the list on every pass. This performs the
# same 1000 unions as appending one copy at a time, but the resulting plan is
# a balanced tree about 10 levels deep instead of a chain 1000 levels deep.
bloques = [df_spark] * N_COPIAS
while len(bloques) > 1:
    bloques = [
        # An odd trailing element has no partner and is carried over unchanged.
        bloques[i].union(bloques[i + 1]) if i + 1 < len(bloques) else bloques[i]
        for i in range(0, len(bloques), 2)
    ]
df_test = bloques[0]

# Display the row count of the enlarged frame.
df_test.count()

1501500

In [22]:
# Register df_spark as a temporary view so it can be queried through spark.sql.
# createOrReplaceTempView replaces any existing view with the same name.
tabla_temp = "tabla_temp"
df_spark.createOrReplaceTempView(tabla_temp)

In [23]:
# Run DESCRIBE TABLE against the temporary view named by tabla_temp.
consulta1 = spark.sql(f"DESCRIBE TABLE {tabla_temp}")

In [24]:
# Keep only the column-name / data-type pair of the DESCRIBE result and drop
# two kinds of rows: the one named "cod_mes" and any whose name contains "#".
# NOTE(review): presumably the "#" filter targets DESCRIBE section-header rows
# and "cod_mes" is a column of another dataset; neither occurs here, confirm.
consulta1 = (
    consulta1
    .select("col_name", "data_type")
    .where(F.col("col_name") != "cod_mes")
    .where(~F.col("col_name").like("%#%"))
)

# Display the result.
consulta1.show()

+--------+---------+
|col_name|data_type|
+--------+---------+
|      id|      int|
|    name|   string|
|     age|      int|
+--------+---------+



Verifico si hay valores nulos en cada columna, usando las funciones de `pyspark.sql.functions`

In [25]:
# Count the NULL values in each column of df_test.
# F.when(...) without otherwise() yields NULL for rows that do not match, and
# F.count ignores NULLs, so each aggregate is the number of NULL entries.
# The columns are taken from df_test itself so the check always matches the
# frame being inspected.
df_test.select(
    [
        F.count(F.when(F.col(c).isNull(), c)).alias(c)
        for c in df_test.columns
    ]
).show()

+---+----+---+
| id|name|age|
+---+----+---+
|  0|   0|  0|
+---+----+---+



En vez de usar vistas temporales para realizar las consultas, uso la API de DataFrame de PySpark

In [27]:
# Project only the id and age columns of the enlarged frame and preview them.
resultado = df_test.select(F.col("id"), F.col("age"))
resultado.show()

+---+---+
| id|age|
+---+---+
|  1| 61|
|  2| 36|
|  3| 36|
|  4| 40|
|  5| 33|
|  6| 33|
|  7| 46|
|  8| 25|
|  9| 45|
| 10| 42|
| 11| 42|
| 12| 30|
| 13| 31|
| 14| 36|
| 15| 32|
| 16| 32|
| 17| 38|
| 18| 25|
| 19| 62|
| 20| 61|
+---+---+
only showing top 20 rows



In [33]:
# Keep the rows whose name is different from "Alice" and does not contain
# the letter "o" (LIKE pattern "%o%").
resultado = (
    df_test
    .where(F.col("name") != "Alice")
    .where(~F.col("name").like("%o%"))
)
resultado.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|  David| 36|
|  5|  David| 33|
|  6|  David| 33|
|  7|  David| 46|
|  8|  David| 25|
| 10|  David| 42|
| 12|Charlie| 30|
| 14|  David| 36|
| 15|Charlie| 32|
| 19|Charlie| 62|
| 21|Charlie| 45|
| 22|  David| 62|
| 23|  David| 33|
| 24|Charlie| 41|
| 33|  David| 19|
| 35|  David| 34|
| 37|Charlie| 44|
| 38|  David| 20|
| 39|  David| 35|
| 41|Charlie| 56|
+---+-------+---+
only showing top 20 rows



In [36]:
# All rows of the temporary view, sorted by name.
# The view name is interpolated from tabla_temp, as in the DESCRIBE TABLE
# query, so it stays in sync with the cell that registers the view.
resultado = spark.sql(f"SELECT * FROM {tabla_temp} ORDER BY name")
resultado.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
| 56|Alice| 53|
|118|Alice| 38|
| 59|Alice| 45|
|  4|Alice| 40|
| 66|Alice| 41|
| 16|Alice| 32|
| 68|Alice| 56|
| 18|Alice| 25|
| 69|Alice| 48|
| 30|Alice| 36|
| 70|Alice| 29|
| 34|Alice| 41|
| 74|Alice| 27|
| 43|Alice| 24|
| 75|Alice| 41|
| 96|Alice| 54|
|100|Alice| 55|
|  1|Alice| 61|
|103|Alice| 40|
| 17|Alice| 38|
+---+-----+---+
only showing top 20 rows



In [37]:
# Number of rows per age value in the temporary view.
# The view name is interpolated from tabla_temp, as in the DESCRIBE TABLE
# query, so it stays in sync with the cell that registers the view.
resultado = spark.sql(f"SELECT age, COUNT(*) as conteo FROM {tabla_temp} GROUP BY age")
resultado.show()

+---+------+
|age|conteo|
+---+------+
| 31|    32|
| 53|    27|
| 34|    38|
| 28|    37|
| 27|    28|
| 26|    33|
| 44|    32|
| 22|    34|
| 47|    41|
| 52|    28|
| 40|    31|
| 20|    25|
| 57|    34|
| 54|    35|
| 48|    37|
| 19|    29|
| 64|    30|
| 41|    28|
| 43|    32|
| 37|    31|
+---+------+
only showing top 20 rows



In [38]:
# Average age over all rows of the temporary view.
# The view name is interpolated from tabla_temp, as in the DESCRIBE TABLE
# query, so it stays in sync with the cell that registers the view.
resultado = spark.sql(f"SELECT AVG(age) as edad_prom FROM {tabla_temp}")
resultado.show()

+---------+
|edad_prom|
+---------+
|   41.086|
+---------+

