<a href="https://colab.research.google.com/github/sergiocarp10/unlp-tramo-final/blob/main/P4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Instalamos Spark para Python
!pip install pyspark

import os

# Instalamos Java SDK 8
!apt-get install -y openjdk-8-jdk -qq > /dev/null      
!echo $(/usr/libexec/java_home -v 1.8)

#set environment variable
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"     
!echo 2 | update-alternatives --config java

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 59.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=12faac5fe2d2e85d6e0584ab2785ccaeff7284203418e01d0909c3a2dd13ae9f
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
/bin/bash: /usr/libexec/java_home: No such file or directory

There are 2 choices for the alternative java (providing /

In [2]:
from pyspark import SparkContext
sc = SparkContext("local", "MyProgram")

In [None]:
root_path = 'drive/My Drive/Colab Notebooks/'

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import datetime, math
import sys
sys.path.append('/content/' + root_path)

inputDir = root_path + "Banco/"
outputDir = root_path + "output_TP4/"

# ------------------- transformaciones ----------------------

# Cliente: <ID_Cliente, nombre, apellido, DNI, fecha de nacimiento, nacionalidad>
# CajaDeAhorro: <ID_Caja, ID_Cliente, saldo>
# Prestamos: <ID_Caja, cuotas, monto>
# Movimientos: <ID_Caja, monto, timestamp>

def lineToTuple(linea):
  return linea.split('\t')

# ------------------------------------------------------------

# fuentes
clientes = sc.textFile(inputDir + "clientes")
cajas = sc.textFile(inputDir + "cajas")
prestamos = sc.textFile(inputDir + "prestamos")
movimientos = sc.textFile(inputDir + "mov")

# de lineas a tuplas
clientes = clientes.map(lineToTuple)
cajas = cajas.map(lineToTuple)
prestamos = prestamos.map(lineToTuple)
movimientos = movimientos.map(lineToTuple)

# convertimos la fecha de nacimiento en año, mes y día (enteros)
clientes = clientes.map(lambda t: (t[1] + " " + t[2], t[4].split("-"), t[5]))
clientes = clientes.map(lambda t: (t[0], int(t[1][0]), int(t[1][1]), int(t[1][2]), t[2]))

# en cajas pasamos los saldos a real
cajas = cajas.map(lambda c: (c[0], c[1], float(c[2])) )

# en prestamos pasamos las cuotas a numero
prestamos = prestamos.map(lambda t: (t[0], int(t[1]), t[2]) )

# en movimientos pasamos los montos a real // timestamp lo dejamos asi nomas
movimientos = movimientos.map(lambda m: (m[0], float(m[1]), m[2]) )

# ayudita de formato
print(cajas.first())
print(prestamos.first())
print(movimientos.first())
print('\n')

## Inciso A: nombre y apellido de capricornianos, es decir, 22/12 al 19/1
# la cuarta columna (contando desde 0) es la fecha de nacimiento
incisoA = clientes.filter(lambda t: (t[2] == 1 and t[3] < 20) or (t[2] == 12 and t[3] > 21) )
incisoA = incisoA.map(lambda t: t[0] )
print(incisoA.collect())

## Inciso B: nombre y apellido de argentinos
# la nacionalidad es la quinta columna
argentinos = clientes.filter(lambda t: t[4] == 'ARG')
incisoB = argentinos.map(lambda t: t[0])
print(incisoB.collect())

## Inciso C: de (a), cuántos nacieron en verano
print("De capricornio, en verano nacieron " + str(incisoA.count()))

## Inciso D: de (b) el más joven y el más viejo
# para ordenar por más de un parámetro se utiliza una tupla
print(argentinos.sortBy(lambda t: (t[1], t[2], t[3]), ascending=False).first())
print(argentinos.sortBy(lambda t: (t[1], t[2], t[3]), ascending=True).first())

## Inciso E: el préstamo con mayor cantidad de cuotas (columna 1)
maximoCuotas = prestamos.reduce(lambda p1, p2: p1 if p1[1] > p2[1] else p2)

# ... las que tienen misma cantidad de cuotas, el mayor monto
masCuotas = prestamos.filter(lambda p: p[1] == maximoCuotas[1])
mayorMonto = masCuotas.reduce(lambda p1, p2: p1 if p1[2] > p2[2] else p2 )
print(mayorMonto)

## Inciso F: id de clientes con al menos una caja con saldo > 300
saldosMayor = cajas.filter(lambda c: c[2] > 300)
idSaldosMayor = cajas.map(lambda c: c[1] )
print(idSaldosMayor.collect())

## Inciso G: monto de mayor movimiento e ID de caja del ultimo mov
mayorMov = movimientos.reduce(lambda m1, m2: m1 if m1[1] > m2[1] else m2)
print("Monto del mayor movimiento: " + str(mayorMov[1]))

ultimoMov = movimientos.reduce(lambda m1, m2: m1 if m1[2] > m2[2] else m2)
print("ID de caja del ultimo mov: " + ultimoMov[0])

Mounted at /content/drive
('336442', '91893', -87702.5531547622)
('399580', 54, '1432.83477152364')
('505753', 49837.5266323972, '2004-05-12 12:19:07')


['Usrqz Vxfyrk', 'Yhfeci Zvixkb', 'Gmviqsc Hgmdf', 'Moitln Pjcwje', 'Nbvyg Fsvnzp', 'Trahy Fdiph', 'Okqsd Bqecx', 'Yoohis Rlfrnjvd', 'Oyqvost Gfdsx', 'Woanu Dekdye', 'Jnert Troflo', 'Awqhees Tzidotgg', 'Ehsgnd Tzdydv', 'Bacwt Vsxtl', 'Ehzdb Orbppc', 'Qieaijyy Whbofvw', 'Ijeqspr Mssrm', 'Elydgs Zefebcgv', 'Nletwyvm Miwhck', 'Bsjer Jijkf', 'Qtdpbni Kxjmbsy', 'Iwgfdrt Brixxfqj', 'Mchpwp Mzrbthn', 'Jzzzsko Eoccql', 'Gzyscak Gpereqb', 'Orhfqqck Afqidrqj', 'Dzxns Nizxk', 'Pobfhxio Uxpyyic', 'Dppnsx Lqqatpw', 'Xnqzhka Njunmrtf', 'Dxlzxz Epvjfk', 'Jqdqilz Owziy', 'Jiafdnvr Uvdkmrv', 'Jdrpdex Jlfupfr', 'Dyxprln Skbvsvs', 'Gvtxh Opgwkqk', 'Vvcfbzea Mfqxjx', 'Nbzzcp Ijpik', 'Tldil Smqpihbm', 'Xgjwwcw Ohhfwi', 'Upcmog Arsiajxy', 'Ifrxlouz Mwtjtior', 'Hszptnrb Rereyjr', 'Idmvnqz Srudh', 'Mfngw Isowo', 'Pnkiwrja Vybqv', 'Ozobsy Htzujxdd', 'Xdwshpi 

**Ejercicio 5 - Estaciones**

P: desde Spark, con textFile no es posible seleccionar un conjunto de archivos por prefijo, siempre agarra la carpeta entera. No queda otra que separ los archivos de distinto tipo en subcarpetas.

In [3]:
## EJERCICIO 5 - ESTACIONES

root_path = 'drive/My Drive/Colab Notebooks/'

from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import datetime, math
import sys
sys.path.append('/content/' + root_path)

inputDir = root_path + "EstacionesMeteorologicas/"

# ------------------- transformaciones ----------------------

# <ID_Estación, fecha_registro, temperatura, humedad, precipitación>
# norte: en grados centigrados y mm
# sur: en grados fahrenheit tc = (tf - 32) * 1.8, y cm

def splitter(linea):
  return linea.split('\t')

def celsius(fahrenheit):
  return 1.8 * (int(fahrenheit) - 32)

def cm(mm):
  return 10 * int(mm)

# ------------------- carga de archivos ----------------------

norte = sc.textFile(inputDir + "Norte").map(splitter)
sur = sc.textFile(inputDir + "Sur").map(splitter)

# conversion a enteros y a sistema internacional
norte = norte.map(lambda e: (e[0], e[1], int(e[2]), int(e[3]), int(e[4])) )
sur = sur.map(lambda e: (e[0], e[1], celsius(e[2]), int(e[3]), cm(e[4]) ) )

# juntar datasets
estaciones = norte.union(sur)

## Parte 1: promedio de temperatura, humedad y precipitacion
part1 = estaciones.map(lambda e: (e[2], e[3], e[4], 1) )
sumas = part1.reduce(lambda e1, e2: (e1[0] + e2[0], e1[1] + e2[1], e1[2] + e2[2], e1[3] + e2[3]) )
promedios = (sumas[0] / sumas[3], sumas[1] / sumas[3], sumas[2] / sumas[3])
print(promedios)

## Parte 2:
# minimos y maximos: de esta forma es 4 veces más rápido
part2 = estaciones.map(lambda e: (e[2], e[2], e[3], e[3], e[4], e[4]) )
part2 = part2.reduce(lambda e1, e2: 
                          (e1[0] if e1[0] < e2[0] else e2[0], 
                           e1[1] if e1[1] > e2[1] else e2[1],
                           e1[2] if e1[2] > e2[2] else e2[2],
                           e1[3] if e1[3] < e2[3] else e2[3],
                           e1[4] if e1[4] > e2[4] else e2[4],
                           e1[5] if e1[5] < e2[5] else e2[5]) 
                          )

print(part2)

# la temperatura mas fria
#coldest = estaciones.reduce(lambda e1, e2: e1 if e1[2] < e2[2] else e2)

# la mas calurosa
#hotter = estaciones.reduce(lambda e1, e2: e1 if e1[2] > e2[2] else e2)

# mayor humedad
#mayorHum = estaciones.reduce(lambda e1, e2: e1 if e1[3] > e2[3] else e2)

# menor humedad
#menorHum = estaciones.reduce(lambda e1, e2: e1 if e1[3] < e2[3] else e2)

# mayor precipitacion
#mayorPrec = estaciones.reduce(lambda e1, e2: e1 if e1[4] > e2[4] else e2)

# menor precipitación
#menorPrec = estaciones.reduce(lambda e1, e2: e1 if e1[4] < e2[4] else e2)

# salida
#print(coldest)
#print(hotter)
#print(mayorHum)
#print(menorHum)
#print(mayorPrec)
#print(menorPrec)

Mounted at /content/drive
(8.37301267501466, 49.94118997220168, 5038.977990869909)
(-84.60000000000001, 100, 100, 0, 20010, 0)
