# Laboratorio 9 Data Science

## Nelson  García Bravatti


# Fundamentos de Spark y Python — Laboratorio con DataFrames (Walmart Stock)

**Curso:** Data Science Sección 20

**Duración estimada:** 1.5–2.5 horas

**Modalidad:** Individual (colaboración para dudas conceptuales permitida, entrega individual)

## Objetivos de aprendizaje
Al finalizar, podrá:
1. Iniciar una **SparkSession** y trabajar con **PySpark DataFrames**.
2. **Cargar** un CSV con encabezados e inferencia de tipos.
3. Explorar estructura: **columnas**, **esquema** y **muestras**.
4. Ejecutar **descriptivos** y agregaciones.
5. Aplicar **filtros**, **transformaciones** y **creación de columnas**.
6. Calcular medidas estadísticas (p. ej., **correlación de Pearson**).
7. Realizar consultas **temporales** (día con máximo precio; máximos por año).
8. Comunicar hallazgos de forma ordenada y reproducible.


## Datos
- **Archivo:** `walmart_stock.csv`
- **Periodo:** 2012–2017
- **Columnas típicas:** `Date`, `Open`, `High`, `Low`, `Close`, `Volume`, `Adj Close`

> *Nota:* No modifique el CSV; todas las transformaciones se realizan en el notebook.


## Requisitos previos
- Python 3.9+ (o entorno equivalente en **Google Colab**)
- **Apache Spark 3.x** con PySpark (o `pyspark` preinstalado en Colab)
- Conocimientos básicos de: tipos de datos, funciones de agregación, y uso de notebooks.


## Entregables
1. **Notebook ejecutado** (`.ipynb`) con todas las celdas y salidas visibles.
2. **Conclusiones breves** (5–10 líneas) al final del notebook con interpretaciones clave.
3. Código **comentado** y ordenado.

**Formato de entrega:** Subir `.ipynb` .


## 1) Configuración del entorno
**Opción A — Local:**
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Lab Spark DF — Walmart") \
    .getOrCreate()
print("Spark version:", spark.version)
```

**Opción B — Colab (sugerida si no tiene Spark local):**
```python
!pip -q install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab Spark DF — Walmart").getOrCreate()
print("Spark version:", spark.version)
```


## 2) Tareas (complete en orden y deje **toda** la evidencia en el notebook)

### 2.1 Inicie una sesión de Spark (si no está iniciada)

In [1]:
!pip -q install pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Lab Spark DF — Walmart").getOrCreate()
print("Spark version:", spark.version)

Spark version: 3.5.1


### 2.2 Cargue el archivo CSV

In [3]:
#Cargar el archivo CSV
file_path = '/content/walmart_stock.csv'
df_stocks = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(file_path)
)

### 2.3 ¿Cuáles son los nombres de las columnas?

In [4]:
#Obtener los nombres de las columnas
columns = df_stocks.columns
print("Columnas:", columns)

Columnas: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


### 2.4 Muestre el **esquema** de los datos

In [5]:
#Mostrar el esquema de los datos
df_stocks.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



### 2.5 Muestre las **primeras 5 filas**

In [6]:
#Ver las primeras filas del dataframe
df_stocks.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### 2.6 Descriptivos con `describe()` + interpretación

In [7]:
from pyspark.sql import functions as F, types as T

#Seleccionar columnas numéricas para describir
numeric_types = (T.DoubleType, T.IntegerType, T.LongType, T.FloatType, T.ShortType, T.DecimalType)
num_cols = [f.name for f in df_stocks.schema.fields if isinstance(f.dataType, numeric_types)]

#Validar que existan columnas numéricas
if not num_cols:
    raise ValueError("No se detectaron columnas numéricas para describir. Revisa el esquema/lectura del CSV.")

#Descriptivos básicos con describe()
desc_basic = df_stocks.select(num_cols).describe()
desc_basic.show(truncate=False)


+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|Open              |High             |Low              |Close            |Volume           |Adj Close        |
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|count  |1258              |1258             |1258             |1258             |1258             |1258             |
|mean   |72.35785375357709 |72.83938807631165|71.9186009594594 |72.38844998012726|8222093.481717011|67.23883848728146|
|stddev |6.76809024470826  |6.768186808159218|6.744075756255496|6.756859163732991|4519780.8431556  |6.722609449996857|
|min    |56.389998999999996|57.060001        |56.299999        |56.419998        |2094900          |50.363689        |
|max    |90.800003         |90.970001        |89.25            |90.470001        |80898100         |84.91421600000001|
+-------+------------------+-----------------+--

*(Escriba aquí sus interpretaciones de al menos dos métricas de `describe()`)*

### Interpretación de métricas:

- Media y desviación estándar del precio de cierre (Close)

  - Media ≈ 72.39; desviación estándar ≈ 6.76.

En el periodo observado (n = 1,258 sesiones), el precio de cierre típico de la acción se sitúa alrededor de 72.39. La variabilidad relativa es moderada (CV ≈ 9.3%), lo que sugiere que, aunque existen fluctuaciones diarias, el precio se ha mantenido en un rango relativamente acotado respecto de su nivel medio.

- Media y desviación estándar del volumen (Volume)

  - Media ≈ 8.22 millones; desviación estándar ≈ 4.52 millones.

El volumen promedio negociado por sesión es cercano a 8.22 millones de acciones, pero presenta una dispersión alta (CV ≈ 54.9%). Esto indica que la actividad de negociación varía de forma marcada entre sesiones, con días de negociación significativamente más intensa alternando con días de menor actividad.

### 2.7 Máximo y mínimo de `Volume`

In [8]:
#Agregación para máx y mín
agg_volume = df_stocks.agg(
    F.max("Volume").alias("max_volume"),
    F.min("Volume").alias("min_volume")
)

agg_volume.show()

vol_metrics = agg_volume.first()
max_volume = vol_metrics["max_volume"]
min_volume = vol_metrics["min_volume"]

print(f"Max Volume: {max_volume:,d}")
print(f"Min Volume: {min_volume:,d}")

+----------+----------+
|max_volume|min_volume|
+----------+----------+
|  80898100|   2094900|
+----------+----------+

Max Volume: 80,898,100
Min Volume: 2,094,900


### 2.8 ¿Cuántos días tuvieron `Close < 60`?

In [10]:
#Filtrar días con Close definido y menor a 60
days_close_lt_60 = (
    df_stocks
    .filter(F.col("Close").isNotNull() & (F.col("Close") < 60))
    .count()
)

print(f"Días con Close < 60: {days_close_lt_60}")


(
    df_stocks
    .filter(F.col("Close") < 60)
    .select("Date", "Close")
    .orderBy(F.col("Close").asc())
    .show(10, truncate=False)
)

Días con Close < 60: 81
+----------+------------------+
|Date      |Close             |
+----------+------------------+
|2015-11-13|56.419998         |
|2015-11-12|56.950001         |
|2015-10-30|57.240002000000004|
|2012-04-25|57.360001000000004|
|2015-10-27|57.48             |
|2015-11-11|57.580002         |
|2015-11-02|57.610001000000004|
|2015-10-28|57.639998999999996|
|2012-04-24|57.77             |
|2015-11-16|57.869999         |
+----------+------------------+
only showing top 10 rows



### 2.9 Crée la columna `Tasa_HV = High/Volume`

In [11]:
df_stocks_feat = (
    df_stocks
    .withColumn(
        "Tasa_HV",
        F.when(F.col("High").isNotNull() & F.col("Volume").isNotNull() & (F.col("Volume") > 0),
               (F.col("High") / F.col("Volume")).cast(T.DoubleType()))
         .otherwise(F.lit(None).cast(T.DoubleType()))
    )
).cache()

#Verificar tipo de la nueva columna y primera muestra
df_stocks_feat.select("High", "Volume", "Tasa_HV").printSchema()
df_stocks_feat.select("Date", "High", "Volume", "Tasa_HV").show(5, truncate=False)

root
 |-- High: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Tasa_HV: double (nullable = true)

+----------+---------+--------+--------------------+
|Date      |High     |Volume  |Tasa_HV             |
+----------+---------+--------+--------------------+
|2012-01-03|61.060001|12668800|4.819714653321546E-6|
|2012-01-04|60.349998|9593300 |6.290848613094555E-6|
|2012-01-05|59.619999|12768200|4.669412994783916E-6|
|2012-01-06|59.450001|8069400 |7.367338463826307E-6|
|2012-01-09|59.549999|6679300 |8.915604778943901E-6|
+----------+---------+--------+--------------------+
only showing top 5 rows



La métrica Tasa_HV relaciona el nivel de precio intradía máximo (High) con la intensidad de negociación (Volume). Una Tasa_HV más alta sugiere que, para un mismo volumen, el precio logró máximos relativamente mayores (o, equivalente, que con poco volumen se alcanzaron máximos altos), lo que puede señalar movimientos “caros” en términos de liquidez y posibles episodios de menor profundidad del libro de órdenes. A la inversa, valores bajos indican que máximos similares requirieron más volumen, coherente con mayor presión/participación del mercado.

### 2.10 ¿Qué porcentaje del tiempo `High > 80`?

In [12]:
#Denominador: días con High no nulo
total_dias = df_stocks_feat.filter(F.col("High").isNotNull()).count()

#Numerador: días con High > 80
dias_high_gt_80 = (
    df_stocks_feat
    .filter(F.col("High").isNotNull() & (F.col("High") > 80))
    .count()
)

porcentaje_high_gt_80 = (dias_high_gt_80 / total_dias * 100.0) if total_dias > 0 else 0.0

print(f"Días con High > 80: {dias_high_gt_80} de {total_dias}")
print(f"Porcentaje del tiempo con High > 80: {porcentaje_high_gt_80:.2f}%")

(
    df_stocks_feat
    .filter(F.col("High") > 80)
    .select("Date", "High")
    .orderBy(F.col("Date").asc())
    .show(10, truncate=False)
)

Días con High > 80: 115 de 1258
Porcentaje del tiempo con High > 80: 9.14%
+----------+-----------------+
|Date      |High             |
+----------+-----------------+
|2013-11-25|80.57            |
|2013-11-26|80.68            |
|2013-11-27|81.0             |
|2013-11-29|81.349998        |
|2013-12-02|81.279999        |
|2013-12-03|81.33000200000001|
|2013-12-04|81.370003        |
|2013-12-06|80.230003        |
|2013-12-09|80.43            |
|2014-11-10|80.129997        |
+----------+-----------------+
only showing top 10 rows



### 2.11 Correlación de Pearson entre `High` y `Volume` + interpretación

In [13]:
df_base = globals().get("df_stocks_feat", globals().get("df_stocks"))

#Subconjunto con columnas requeridas y sin nulos/volumen no positivo
df_corr = (
    df_base
    .select("High", "Volume")
    .filter(F.col("High").isNotNull() & F.col("Volume").isNotNull() & (F.col("Volume") > 0))
    .cache()
)

#Correlación de Pearson
r_high_volume = df_corr.stat.corr("High", "Volume", method="pearson")
n_obs = df_corr.count()

print(f"n = {n_obs}")
print(f"Correlación de Pearson (High vs. Volume): r = {r_high_volume:.4f}")

r2 = (r_high_volume ** 2) if r_high_volume is not None else None
if r2 is not None:
    print(f"Coeficiente de determinación aproximado: r^2 = {r2:.4f}")


n = 1258
Correlación de Pearson (High vs. Volume): r = -0.3384
Coeficiente de determinación aproximado: r^2 = 0.1145


*(Escriba aquí una interpretación breve del signo y magnitud de la correlación)*

La correlación r=-0.3384 indica una relación lineal negativa débil-moderada: en días con mayores máximos intradía (High), tiende a negociarse menos volumen, y viceversa. Su magnitud implica que la asociación es limitada: r^2 ≈ 0.1145 sugiere que ≈11.5% de la variabilidad de High se relaciona linealmente con Volume; el resto responde a otros factores o relaciones no lineales.

### 2.12 ¿Qué día tuvo el **precio más alto** (`High`)? Devuelva la fila completa

In [14]:
df_base = df_stocks.cache()

#Obtener el valor máximo de High
max_high = df_base.agg(F.max("High").alias("max_high")).first()["max_high"]

#Devolver las filas
df_max_high_rows = (
    df_base
    .filter(F.col("High") == max_high)
    .orderBy(F.col("Date").asc())
)

print(f"Valor máximo de High: {max_high}")
df_max_high_rows.show(truncate=False)

Valor máximo de High: 90.970001
+----------+---------+---------+-----+---------+-------+---------+
|Date      |Open     |High     |Low  |Close    |Volume |Adj Close|
+----------+---------+---------+-----+---------+-------+---------+
|2015-01-13|90.800003|90.970001|88.93|89.309998|8215400|83.825448|
+----------+---------+---------+-----+---------+-------+---------+



### 2.13 **Media** de la columna `Close`

In [15]:
mean_close_row = df_base.agg(F.mean("Close").alias("mean_close")).first()
mean_close = mean_close_row["mean_close"]

print(f"Media de Close: {mean_close:.6f}")

Media de Close: 72.388450


### 2.14 **Máximo `High` por año**

In [16]:
#Máximo por año cronológicamente
df_high_by_year = (
    df_base
    .withColumn("Year", F.year("Date"))
    .groupBy("Year")
    .agg(F.max("High").alias("max_high_year"))
    .orderBy(F.col("Year").asc())
)

df_high_by_year.show(truncate=False)

+----+-------------+
|Year|max_high_year|
+----+-------------+
|2012|77.599998    |
|2013|81.370003    |
|2014|88.089996    |
|2015|90.970001    |
|2016|75.190002    |
+----+-------------+



## 3) Conclusiones (5–10 líneas)

*Escriba aquí sus principales hallazgos e interpretaciones.*

## Buenas prácticas
- Comente bloques no triviales.
- Nombres de variables **claros** (`df_prices`, `max_high_year`, etc.).
- Reutilice resultados intermedios para evitar recalcular.
- Si usa Colab, fije versiones cuando sea necesario.
- Cierre la sesión de Spark al final si corre local: `spark.stop()`.


## Errores comunes
- Olvidar `inferSchema=True` → todo se carga como `string`.
- Mezclar API RDD con DataFrames sin necesidad.
- Usar funciones de Python puras en `withColumn` (use `pyspark.sql.functions`).
- Intentar graficar DataFrames de Spark directamente: primero **convierta** a Pandas con `.toPandas()` en subconjuntos pequeños.


## Rúbrica de evaluación (100 puntos)
**A. Preparación del ambiente** (10 pts)
- (10) SparkSession creada sin errores; versiones y entorno claros.

**B. Carga y documentación de datos** (15 pts)
- (8) CSV cargado con `header` y `inferSchema` correctos.
- (7) Comentarios breves sobre las columnas y supuestos.

**C. Exploración básica** (10 pts)
- (4) Lista de columnas.
- (3) `printSchema()` bien interpretado.
- (3) `show(5)` con observaciones puntuales.

**D. Descriptivos** (10 pts)
- (6) `describe()` ejecutado y leído correctamente.
- (4) Al menos 2 interpretaciones numéricas.

**E. Agregaciones y filtros** (10 pts)
- (5) Máx./mín. de `Volume` correctos.
- (5) Conteo de días con `Close < 60` correcto.

**F. Ingeniería de características** (10 pts)
- (8) Columna `Tasa_HV = High/Volume` correcta y con tipo numérico.
- (2) Justificación breve del indicador.

**G. Métricas estadísticas** (10 pts)
- (7) Correlación `High`–`Volume` calculada.
- (3) Interpretación del valor (signo y magnitud).

**H. Consultas temporales** (10 pts)
- (5) Día con `High` máximo identificado.
- (5) Máximo `High` por año con agrupación y orden correctos.

**I. Comunicación de resultados** (10 pts)
- (6) Conclusiones finales claras y concisas (5–10 líneas).
- (4) Orden, legibilidad y limpieza del notebook.

**J. Estilo y calidad de código** (5 pts)
- (5) Convenciones PEP8 razonables, nombres significativos y ausencia de código muerto.

> **Total: 100 puntos**
