In [None]:
# Instalamos Spark para Python
!pip install pyspark

import os

# Instalamos Java SDK 8
!apt-get install -y openjdk-8-jdk -qq > /dev/null
!echo $(/usr/libexec/java_home -v 1.8)

#set environment variable
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!echo 2 | update-alternatives --config java

/bin/bash: line 1: /usr/libexec/java_home: No such file or directory

There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode


In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "Prueba")

root_path = '/content/'

# Practica 5, ejercicio 5)

El dataset EstacionesMeteorológicas posee información sobre registros de datos climáticos tomados por sus estaciones. Este dataset tiene tuplas con la siguiente información:

<ID_Estación, fecha_registro, temperatura, humedad, precipitación>
Y además está conformado por dos archivos:

a. estacionNorte.txt almacena la información en grados centígrados, porcentaje de humedad, y milímetros de lluvia.

b. estacionSur.txt almacena la información en grados fahrenheit, porcentaje de humedad y centímetros de lluvia.

Implemente una solución en Spark que permita obtener:
• el promedio de temperatura, de humedad y precipitación total entre todas las estaciones.

• el ID de la estación y la fecha que registró

o la temperatura más fría

o la temperatura más calurosa

o la de mayor humedad

o la de menor humedad

o la de más precipitación

o la de menor precipitación

NOTA: En caso de dos estaciones con igual máximo o mínimo, devolver cualquiera de las dos.

In [None]:
#Ejercicio 5a, practica 5
rdd_norte = sc.textFile("/content/Norte*.txt")
rdd_sur = sc.textFile("/content/Sur*.txt")

def formato_norte(line):
    parts = line.split("\t")
    #parts = [ID_estacion, fecha, Tº, humedad, precipitación]
    return (float(parts[2]), float(parts[3]), float(parts[4]))

def formato_sur(line):
    parts = line.split("\t")
    # convertir °F a °C y cm a mm
    temp_c = (float(parts[2]) - 32) * 5/9
    prec_mm = float(parts[4]) * 10
    return (temp_c, float(parts[3]), prec_mm)

datos_norte = rdd_norte.map(formato_norte).filter(lambda x: x is not None) # Filter out None values
datos_sur = rdd_sur.map(formato_sur).filter(lambda x: x is not None) # Filter out None values

todos_datos = datos_norte.union(datos_sur)

# 4. Calcular acumulados y promedios
acumulado = todos_datos.aggregate((0.0, 0.0, 0.0, 0), #Inicializa acum en 0 (T1, H, P, cant tuplas)
    lambda acc, x: (acc[0]+x[0], acc[1]+x[1], acc[2]+x[2], acc[3]+1),
    #x= tulpla, acc= acumulador. Suma para cada acumulador, lo que le corresponde y +1 al que registra cant

    lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1], acc1[2]+acc2[2], acc1[3]+acc2[3])
    #Suma todos los acumulados parciales de cada particion
)

prom_temp = acumulado[0] / acumulado[3]
prom_hum = acumulado[1] / acumulado[3]
total_prec = acumulado[2]

print(f"Promedio temperatura: {prom_temp:.2f} °C")
print(f"Promedio humedad: {prom_hum:.2f} %")
print(f"Precipitación total: {total_prec:.2f} mm")

ruta_salida = "/content/resultados_meteorologicos.txt"

with open(ruta_salida, "w") as f:
    f.write(f"Promedio temperatura: {prom_temp:.2f} °C\n")
    f.write(f"Promedio humedad: {prom_hum:.2f} %\n")
    f.write(f"Precipitación total: {total_prec:.2f} mm\n")

print(f"✅ Resultados guardados en {ruta_salida}")

Promedio temperatura: 19.83 °C
Promedio humedad: 49.94 %
Precipitación total: 197583366.00 mm
✅ Resultados guardados en /content/resultados_meteorologicos.txt


In [None]:
#Ejercicio 5b, practica 5

def parse_norte(line):
    parts = line.strip().split("\t")
    if len(parts) >= 5:
        try:
            return (parts[0], parts[1], float(parts[2]), float(parts[3]), float(parts[4]))
        except ValueError:
            return None
    else:
        return None

def parse_sur(line):
    parts = line.strip().split("\t")
    if len(parts) >= 5:
        try:
            temp_c = (float(parts[2]) - 32) * 5/9
            prec_mm = float(parts[4]) * 10
            return (parts[0], parts[1], temp_c, float(parts[3]), prec_mm)
        except ValueError:
            return None
    else:
        return None

unidos = rdd_norte.map(parse_norte).filter(lambda x: x is not None)\
         .union(rdd_sur.map(parse_sur).filter(lambda x: x is not None))

mas_fria = unidos.reduce(
    lambda x, y: x if x[2] < y[2] else y
)
mas_calor = unidos.reduce(
    lambda x, y: x if x[2] > y[2] else y
)
mayor_humedad = unidos.reduce(
    lambda x, y: x if x[3] > y[3] else y
)
menor_humedad = unidos.reduce(
    lambda x, y: x if x[3] < y[3] else y
)
mayor_precipitacion = unidos.reduce(
    lambda x, y: x if x[4] > y[4] else y
)
menor_precipitacion = unidos.reduce(
    lambda x, y: x if x[4] < y[4] else y
)
print("Temperatura más fría:", mas_fria[0], mas_fria[1])
print("Temperatura más calurosa:", mas_calor[0], mas_calor[1])

print("Mayor humedad:", mayor_humedad[0], mayor_humedad[1])
print("Menor humedad:", menor_humedad[0], menor_humedad[1])

print("Mayor precipitación:", mayor_precipitacion[0], mayor_precipitacion[1])
print("Menor precipitación:", menor_precipitacion[0], menor_precipitacion[1])

# Ruta de salida
ruta_salida = "/content/resultados_meteorologicos2.txt"

with open(ruta_salida, "w") as f:
    f.write(f"Temperatura más fría: {mas_fria[2]:.2f} °C - Estación {mas_fria[0]}, Fecha {mas_fria[1]}\n")
    f.write(f"Temperatura más calurosa: {mas_calor[2]:.2f} °C - Estación {mas_calor[0]}, Fecha {mas_calor[1]}\n\n")
    f.write(f"Mayor humedad: {mayor_humedad[3]:.2f} % - Estación {mayor_humedad[0]}, Fecha {mayor_humedad[1]}\n")
    f.write(f"Menor humedad: {menor_humedad[3]:.2f} % - Estación {menor_humedad[0]}, Fecha {menor_humedad[1]}\n\n")
    f.write(f"Mayor precipitación: {mayor_precipitacion[4]:.2f} mm - Estación {mayor_precipitacion[0]}, Fecha {mayor_precipitacion[1]}\n")
    f.write(f"Menor precipitación: {menor_precipitacion[4]:.2f} mm - Estación {menor_precipitacion[0]}, Fecha {menor_precipitacion[1]}\n")

print(f" Resultados guardados en {ruta_salida}")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

# Practica 6, Ejercicio 2)

Usando el dataset EstacionesMeteorologicas imprima el id de la estación que tiene el máximo registro de humedad, el id de la estación con máximo registro en temperatura y el id de la estación con el máximo registro de precipitación usando solo seis transformaciones, incluyendo la transformación textFile.

In [None]:
#Practica 6 Ejercicio 2

rdd_norte = sc.textFile("/content/Norte*.txt")
rdd_sur   = sc.textFile("/content/Sur*.txt")

def parse_norte(line):
    parts = line.strip().split("\t")
    if len(parts) >= 5:
        try:
            return (parts[0], parts[1], float(parts[2]), float(parts[3]), float(parts[4]))
        except ValueError:
            return None
    else:
        return None

def parse_sur(line):
    parts = line.strip().split("\t")
    if len(parts) >= 5:
        try:
            temp_c = (float(parts[2]) - 32) * 5/9
            prec_mm = float(parts[4]) * 10
            return (parts[0], parts[1], temp_c, float(parts[3]), prec_mm)
        except ValueError:
            return None
    else:
        return None

# 2. Parsear y filtrar líneas inválidas
datos_norte = rdd_norte.map(parse_norte).filter(lambda x: x is not None)
datos_sur   = rdd_sur.map(parse_sur).filter(lambda x: x is not None)

# 3. Unir Norte y Sur
datos = datos_norte.union(datos_sur)  # cada tupla: (id, fecha, temp, humedad, precip)

# 4. Máximo de humedad: devuelve toda la tupla con max humedad
max_humedad = datos.reduce(lambda a, b: a if a[3] > b[3] else b)

# 5. Máximo de temperatura
max_temp = datos.reduce(lambda a, b: a if a[2] > b[2] else b)

# 6. Máximo de precipitación
max_precip = datos.reduce(lambda a, b: a if a[4] > b[4] else b)

# Imprimir solo los IDs de las estaciones
print("ID estación máxima humedad:", max_humedad[0])
print("ID estación máxima temperatura:", max_temp[0])
print("ID estación máxima precipitación:", max_precip[0])


ID estación máxima humedad: 272
ID estación máxima temperatura: 296
ID estación máxima precipitación: 270
