# PROGRAMA DE CIENCIAS DE LOS DATOS 
## **CURSO: BIG DATA**
### **TAREA #3**

Profesor: MSc. Felipe Meza


Alumno: 
  
  Randal Salazar Viales

![logo](logo.png "Logo")

# Módulo 4 - Big Data

Autor: Juan Esquivel

# Objetivos

## General
Entender y aplicar técnicas de análisis de grandes cantidades de datos para la resolución de problemas concretos a través de tecnologías de manipulación, extracción y sintetización estadística.

## Específicos
- Aplicar bibliotecas para la transformación de datos a gran escala para poder sintetizar el conocimiento para futuro análisis.
- Aplicar técnicas de análisis de datos para extraer patrones que mejoren el entendimiento de un problema concreto.
- Aplicar técnicas para aprendizaje automatizado de patrones, basado en datos existentes, para mejorar la certeza de la solución aplicada a problemas concretos.


# Contenidos

- Introducción a procesamiento a gran escala
- Fuentes y repositorios de datos
- Procesamiento de fuentes (data frames)
- Procesamiento de atributos
- Organización de datos procesados
- Análisis de datos a gran escala
- Uso de modelos de aprendizaje automático a gran escala

# Evaluación

- Tareas Cortas 70%
- Proyecto 30%

# Procesamiento de datos a gran escala
Hoy día es relativamente sencillo tener cantidades muy considerables de datos en una organización, sin tan siquiera darse cuenta. El almacenamiento, local o en la nube, ha llegado a un punto que nos permite simplemente almacenar más y más datos sin tener que preocuparse por costos de almacenamiento. Cualquier sistema de ventas, por ejemplo, almacena información diaria sobre clientes, transacciones, tiendas, productos, etc. Después de un par de años de acumular datos, un programa que *funcionaba bien* de pronto empieza a sufrir efectos de degradación de servicio, debido al crecimiento orgánico de los datos.

Aún más, es más común que no sea factible tener todos los datos relacionados a un problema en una sola máquina física. Por ello, es necesario particionar los datos y distribuirlos en granjas de servidores de almacenamiento. Algunas definiciones colocan la línea limítrofe de Big Data utilizando esa barrera de almacenamiento distribuído.

## Lectura y transformación de datos a gran escala
El mayor tamaño de las fuentes de datos requiere que, para obtener información estratégica, se deba implementar elementos de software que permitan procesar y resumir los datos. Normalmente se refiere a ésto como *ETL: Extract, Transform and Load*.

Primero, los datos deben *Extraerse* de las fuentes. Después, deben ser *Transformados*, ya sea para crear datos más complejos o ajustar para que sea compatible con el destino. Por último, los datos deben ser *Cargados* a un medio donde serán consumidos por los usuarios.

A lo largo de la última década, la necesidad de administrar procesos *ETL* a gran escala se ha convertido en un imperativo para un científico de datos. Esto se realiza, comúnmente, utilizando *frameworks* de alta escalabilidad basados en un modelo de programación llamado *MapReduce*. En esencia, la idea es no pensar sobre los datos como conjuntos enormes de información, sino reducir las operaciones a qué debe realizarse "fila por fila". Queremos aplicar operaciones individuales a cada elemento, independiente de cualquier otro en la colección (al menos en teoría). Las operaciones que modifican cada elemento se llaman funciones *map*. Adicionalmente, podemos generar operaciones que sinteticen múltiples elementos que corresponden a la misma entidad reducidendólos a un solo elemento (*reduce*). Plataformas modernas ya no se apegan al espíritu exacto de *MapReduce*, sin embargo, las ideas básicas se mantienen igual.

En las siguiente sección introducimos los conceptos clásicos de *MapReduce*. Posteriormente, realizaremos una introducción práctica a frameworks contemporáneos. En este módulo utilizaremos Apache Spark para mostrar los conceptos básicos de este tipo de procesamiento. 

# Motivación MapReduce
El problema resuelto por *MapReduce* se ilustra con el ejemplo clásico de la frecuencia de palabras en una colección abundante de texto. Si tenemos un corpus compuesto por millones de libros de texto, podríamos pensar en un algoritmo de fuerza bruta que representa cada libro como un arreglo de tipo `string`:
```
func count(words []string) {
  var counts map[string]int
  
  for _, word:=range words {
    counts[word] += 1
  }
  printSorted(counts)
}
```

En Go el anterior ejemplo puede alcanzar órdenes de magnitudes de centenas de millones, convirtiéndose la memoria en el cuello de botella primario. Es posible que el procesamiento no requiera todas las palabras en memoria, utilizando canales de transmisión (`channel` en Go). Sin embargo, cuando todas las palabras son únicas aún tenemos problemas.

Si dejamos los linderos de una máquina individual y asumimos que tenemos, por ejemplo, 10 núcleos de procesamiento. Podríamos dividir el total de los datos en 10 partes, cada núcleo puede generar conteos para cada segmento de datos y, posteriormente, lo envía a un controlador. El controlador procedería a integrar todos los conteos.

Si queremos repetir este proceso con 1000 máquinas, un solo controlador colapsaría. Podemos agregar una jerarquía de controladores, sin embargo, donde cada 10 máquina envía a un controlador intermedio y esos, a su vez, a un controlador mayor.

Cuando se trabaja con un número tan elevado de núcleos de procesamiento, la probabilidad de que alguna máquina presente errores se incrementa. Si definimos el error que una sola máquina falle como $\epsilon=0.001$ podemos definir la probabilidad que 10 máquinas ejecuten el proceso sin tener errores como $(1-\epsilon)^{10}=0.999^{10}=0.9900448802$. Esto quiere decir que en más del 99% de las ejecuciones no debería existir errores en las máquinas y, por lo tanto, la tarea debería concluirse exitosamente. El mismo razonamiento para 1000 máquinas, sin embargo, nos da una probabilidad que todos los núcleos sean exitosos de 37%, únicamente ($(1-\epsilon)^{1000}=0.999^{1000}=0.3676954248$.

Esto quiere deir que el modelado debe incorporar tolerancia a fallos integralemente, replicando las entradas (3 copias), distribuyendo el mismo trabajo entre máquinas no relacionadas entre sí, utilización de checksums, etc.

## Detalles de MapReduce
De acuerdo a su publicación original *MapReduce* es un modelo de programación para procesar y generar grandes conjuntos de datos. Todo el procesamiento se basa en expresar los datos en pares *llave/valor*. Se debe definir una función de mapeo de los pares originales a una representación intermedia que, posteriormente, es tomada por una función de reducción que une todas las llaves en una sola entrada. Existe una fase oculta a los usuarios que ordena los datos (*shuffle*) previo a la reducción. Si se tienen $R$ máquinas a cargo de la reducción, la fase de shuffle asigna los datos intermedios al nodo con índice `hash(key) % R`.

En gran medida, la popularidad de este modelo es que una gran cantidad de problemas reales se pueden modelar de esta forma. Además, los autores crearon un ambiente de ejecución donde los programas escritos en esta forma eran paralelizados y ejecutados de forma distribuida de manera automática. 

Para el ejemplo del conteo de frecuencias de palabras, el algoritmo para resolverlo sería:

```
map(String key, String[] value):
  // key: document name
  // value: document contents
  for each word in value:
    EmitIntermediate(w, "1")

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
```

El siguiente diagrama muestra el proceso completo que efectúa el ambiente de ejecución de *MapReduce*.

![mapreduce](mr.png "Map Reduce")

## Limitaciones de MapReduce
Como se mencionó anteriormente, los problemas resueltos por MapReduce *deben* expresarse como una colección de elementos *llave/valor*. De no ser posible, no se puede utilizar el framework para bordarlos.

Además, tiene problemas con algoritmos que son iterativos, debido a que en la fase del mapeo se asume que cada elemento es independiente de otros.



# Datos de ejemplo
A manera de ejemplo, asumiremos que una tienda en línea almacena las compras de sus clientes en una base de datos PostgreSQL. Aplicaremos ciertas operaciones básicas sobre los datos para mostrar lo que este tipo de plataformas ofrecen. A continuación, un ejemplo básico de modelo de datos, que se usará posteriormente.

```sql
CREATE TABLE transactions (
  id SERIAL PRIMARY KEY,
  customer_id integer NOT NULL,
  amount integer NOT NULL,
  purchased_at timestamp without time zone NOT NULL
);


INSERT INTO "transactions" (customer_id, amount, purchased_at) VALUES
(1, 55, '2017-03-01 09:00:00'),
(1, 125, '2017-03-01 10:00:00'),
(1, 32, '2017-03-02 13:00:00'),
(1, 64, '2017-03-02 15:00:00'),
(1, 128, '2017-03-03 10:00:00'),
(2, 333, '2017-03-01 09:00:00'),
(2, 334, '2017-03-01 09:01:00'),
(2, 333, '2017-03-01 09:02:00'),
(2, 11, '2017-03-03 20:00:00'),
(2, 44, '2017-03-03 20:15:00');
```

# Apache Spark

Existen multiples *frameworks* para procesar datos a gran escala. Para el propósito de esta clase, utilizaremos Apache Spark ya que es un proyecto abierto que ha sido adoptado por una gran candidad de desarrolladores y también es un motor primario en servicios en la nube, como Amazon Web Services. 

Spark está basado en una abstracción llamada *Resilient Distributed Dataset* (RDD) que es una colección de elementos que pueden ser almacenados, temporal o permanentemente, a lo largo de múltuples nodos físicos de un cluster. Spark permite el uso de múltiples fuentes de datos, desde SQL y NoSQL hasta archivos de texto plano. La tarea primaria de un programador es diseñar las operaciones sucesivas par transformar el RDD de su forma original a la salida deseada.

El siguiente ejemplo de código utiliza la base de datos descrita anteriormente y muestra el contenido almacenado en ella. Nótese que si la tabla fuera muy grande llamar al método `show()` no es recomendable.

# **** INICIO DE GENERADOR DE TRANSACCIONES ****

# TAREA_3: GENERADOR DE TRANSACCIONES

## Conexión a POSTGRESQL desde Jupyter Notebook para:
## Crear Tabla Transactions y descargar 1000 transaccions creadas aleatoriamente

Comenzamos con la importación de librerías y demás parámetros necesarios...

### Importación de Librerías

In [1]:
# Librería de adaptador ente Python y PostgreSQL: psycopg2
import psycopg2

### Definición de variables de conexión entre Psycopg2 y PostgreSQL

In [2]:
# Variables de conexión
var_conexion = "host='localhost' dbname='test' user='postgres' password='bd2019%' "

# Impresión de las Variables de Conexión
print("Variables de conexión a la BD:\n -> ",  var_conexion)

Variables de conexión a la BD:
 ->  host='localhost' dbname='test' user='postgres' password='bd2019%' 


Para fines de la Tarea3, la conexión, carga de datos y consultas, será a la BD PostgreSQL = **test**

### Crear objeto de conexión entre Psycopg2 y la BD PostgreSQL

In [3]:
# Objeto que contenga las Variables de Conexión
obj = psycopg2.connect(var_conexion)

# Objeto Cursor: para enviar Queries (Consultas) a la BD
# Activar conexión del cursor
objCursor = obj.cursor()

### Consultas SQL a la BD en PostgreSQL

Verificación de que se está conectado en la BD: consulta a tabla **tbltest** existente.

In [4]:
# Consulta a la BD 
objCursor.execute("SELECT * FROM tbltest;")
filas = objCursor.fetchall()
filas

[(10, 'Josue Beltran'),
 (18, 'Ana Navarro'),
 (50, 'Guillermo Soto'),
 (15, 'Juan Céspedes'),
 (26, 'Enrique Cortez')]

### Verificación de si existe la tabla Transactions en la BD Test en PostgreSQL

In [5]:
# Consulta a la BD 
objCursor.execute("SELECT exists (Select 1 FROM transactions);")

UndefinedTable: no existe la relación «transactions»
LINE 1: SELECT exists (Select 1 FROM transactions);
                                     ^


Como se puede apreciar la tabla transactions **NO EXISTE** en la BD que se está empleando. Se procederá a cerra la conexión debido a que el error anterior, provoca que NO se pueda efectuar consultas a la BD, luego se abrirá nuevamente la conexión para poder crear la tabla y efetuar las transacciones.

### Cerrar la conexión entre Psycopg2 y la BD PostgreSQL

In [6]:
# Cierre de objeto cursor
objCursor.close()

# Cierre del objeto de las Variables de Conexión
obj.close()

### Abrir nuevamente conexión con BD PostgreSQL
### Crear objeto de conexión entre Psycopg2 y la BD PostgreSQL

In [7]:
# Objeto que contenga las Variables de Conexión
obj = psycopg2.connect(var_conexion)

# Objeto Cursor: para enviar Queries (Consultas) a la BD
# Activar conexión del cursor
objCursor = obj.cursor()

### Creación de la tabla "transactions" en la BD

In [8]:
# Creación de la tabla transactions en la BD
objCursor.execute("CREATE TABLE transactions\
                  (id SERIAL PRIMARY KEY,\
                  customer_id integer NOT NULL,\
                  amount integer NOT NULL,\
                  purchased_at timestamp without time zone NOT NULL);")
#filas = objCursor.fetchall()
#filas

### Confirmación permanente (finalización de la transacción) de creación de tabla "transactions" en BD

In [9]:
# Guardar el Cambio permanente 
obj.commit()

### Consulta de datos existentes en la tabla "transactions" creada

In [10]:
# Consulta a la BD 
objCursor.execute("SELECT * FROM transactions;")
filas = objCursor.fetchall()
filas

[]

### Inserción de datos dados en la Notebook inicial dentro de la tabla "transactions" creada

In [11]:
# Inserción de 10 datos dados en la Notebook, en  de la tabla 'transactions' de la BD "test"
objCursor.execute("INSERT INTO transactions (customer_id, amount, purchased_at) VALUES\
                  (1, 55, '2017-03-01 09:00:00'),\
                  (1, 125, '2017-03-01 10:00:00'),\
                  (1, 32, '2017-03-02 13:00:00'),\
                  (1, 64, '2017-03-02 15:00:00'),\
                  (1, 128, '2017-03-03 10:00:00'),\
                  (2, 333, '2017-03-01 09:00:00'),\
                  (2, 334, '2017-03-01 09:01:00'),\
                  (2, 333, '2017-03-01 09:02:00'),\
                  (2, 11, '2017-03-03 20:00:00'),\
                  (2, 44, '2017-03-03 20:15:00');")

### Verificación de que se ingresó los datos anteriores en la tabla "transactions"

In [12]:
# Consulta a la BD 
objCursor.execute("SELECT * FROM transactions;")
filas = objCursor.fetchall()
filas

[(1, 1, 55, datetime.datetime(2017, 3, 1, 9, 0)),
 (2, 1, 125, datetime.datetime(2017, 3, 1, 10, 0)),
 (3, 1, 32, datetime.datetime(2017, 3, 2, 13, 0)),
 (4, 1, 64, datetime.datetime(2017, 3, 2, 15, 0)),
 (5, 1, 128, datetime.datetime(2017, 3, 3, 10, 0)),
 (6, 2, 333, datetime.datetime(2017, 3, 1, 9, 0)),
 (7, 2, 334, datetime.datetime(2017, 3, 1, 9, 1)),
 (8, 2, 333, datetime.datetime(2017, 3, 1, 9, 2)),
 (9, 2, 11, datetime.datetime(2017, 3, 3, 20, 0)),
 (10, 2, 44, datetime.datetime(2017, 3, 3, 20, 15))]

### Cantidad de datos dentro de la tabla "transactions"

In [13]:
# Consulta a la BD 
objCursor.execute("SELECT count(*) FROM transactions;")
filas = objCursor.fetchall()
filas

[(10,)]

### Generador de transacciones aleatorias, para: customer_id = 5, Cantidad transacciones = 1000

In [14]:
# Generador de Transacciones

# Importación de Librerías
import random
import numpy as np
import time

# c) Generador de fechas:

from datetime import datetime
#from time import gmtime, strftime
from time import localtime, strftime

# Cantidad de transacciones a generar:
Total_Transactions = 1000

# Constantes
# Máxima cantidad customer_id
max_customer = 5
# Máxima cantidad amount
max_amount = 1000
# Hora Máxima para Generar Transacciones
val_purchased = strftime("%Y-%m-%d %H:%M:00", localtime())
# Hora Máxima
hora_max = 18
# Hora Mínima
hora_min = 8

#print(hora_min, hora_max)
print('Fecha de Finalización Transacciones: ')
print(val_purchased)
print(' ')
#print('Hora de Finalización Transacciones: ')

# current date and time (mseconds)
now = datetime.now()
print("date and time:",now)

# current time
time = now.strftime("%H:%M:%S")
print("time:", time)

# current Year
yR = now.strftime("%Y")
print("Year:", yR)
#print("Tipo:", type(yR))
print(' ')

# current Month
mR = now.strftime("%m")
print("Month:", mR)
#print("Tipo:", type(mR))
print(' ')

# current Day
dR = now.strftime("%d")
print("Day:", dR)
#print("Tipo:", type(dR))
print(' ')

# current hour
HR = now.strftime("%H")
print("hour:", HR)
#print("Tipo:", type(HR))
print(' ')

# current minute
MR = now.strftime("%M")
print("minute:", MR)
#print("Tipo:", type(HR))
print(' ')

# Variables a generar
val_customer_id = 0
val_amount = 0
val_purchased_at = ' '
hora_lim = 0

# Matriz Temporal: contador de customer_id generados
# Fila = customer_id de referencia
# Columna 0 = Cantidad de veces que ha salido customer_id
# Columna 1 = Cantidad de días hacia atrás generados
# Columna 2 = Hora a generar la transacción
# Columna 3 = Minuto a generar la transacción
customer_count = np.zeros((max_customer, 4), dtype=int)

customer_date = np.array((max_customer, 4), dtype=str)
#print(customer_count)

# Initialize internal state of the random number generator.
random.seed(13)

# Definición de hora límite: Hora_Registro u Hora_Max
if ((int(HR) < hora_lim) or (int(HR) >= hora_max)):
    hora_lim = hora_max
else:
    hora_lim = int(HR)

print(' ')
#print('Hora Límite: ', hora_lim)
#print(hora_lim)
print (' ')

# Cálculo de cuántas iteraciones debo efectuar para cada customer_id
(cociente, resto) = divmod(Total_Transactions,max_customer)

# Generador de transacciones
if ((resto == 0) or (resto != 0)):
    # Basta con repartir transacciones entre customer_id
    for x in list(range(1, Total_Transactions + 1)):
        # a) Generador de customer_id:
        # Integer from 1 to max_customer, endpoints included
        val_customer_id = random.randint(1, max_customer)
        #print(val_customer_id )
        # Valor de val_customer_id en la matriz
        val_customer_id = val_customer_id - 1
        #print('Modificado: ', val_customer_id)
        # Actualización de de veces que ha salidos valor de customer_id en matriz de datos
        customer_count [val_customer_id, 0] = customer_count [val_customer_id, 0] + 1
        
        # b) Generador de amount:
        # Integer from 1 to 1000, endpoints included
        val_amount = random.randint(1, max_amount)
        
        # c) Generador de fechas:
        # Actualización de Fecha de Transacción a Generar
        # Dependiendo de la Cantidad de Valores Generados por c/customer
        # Cantidad = 1
        if (customer_count[val_customer_id, 0] == 1):
            # Actualización de Hora
            customer_count[val_customer_id, 2] = hora_min
            # Caso Hora < Hora_min
            if (int(HR) < hora_min):
                # Actualización de Día:
                customer_count[val_customer_id, 1] = customer_count[val_customer_id, 1] + 1
        else:
            # Cantidad >= 2
            # Si Día = 0
            if (customer_count[val_customer_id, 1] == 0):
                # Actualización de Minutos
                customer_count[val_customer_id, 3] = customer_count[val_customer_id, 3] + 1
                # Actualización de Horas
                customer_count[val_customer_id, 2] = hora_min
                # Caso Hora < Hora_min
                if (int(HR) < hora_min):
                    # Actualización de Día:
                    customer_count[val_customer_id, 1] = customer_count[val_customer_id, 1] + 1
            else:
                 # Si Día >= 1
                    # Si Minutos +1 > MR
                    if ((customer_count[val_customer_id, 3] + 1) > int(MR)):
                        # Caso Hora + 1 > HR
                        if ((customer_count[val_customer_id, 2] + 1) > int(HR)):
                            # Actualización de Minutos
                            customer_count[val_customer_id, 3] = 0
                            # Actualización de Horas
                            customer_count[val_customer_id, 2] = 0
                            # Actualización de Día:
                            customer_count[val_customer_id, 1] = customer_count[val_customer_id, 1] + 1
                        else:
                            # Actualización de Minutos
                            customer_count[val_customer_id, 3] = customer_count[val_customer_id, 3] + 1
                    else:
                        # Actualización de Minutos
                        customer_count[val_customer_id, 3] = customer_count[val_customer_id, 3] + 1
                        
        #print(val_customer_id + 1, val_amount)
        #print(customer_count)    

        import time
        # Creación de datetime (Fecha y Hora) de Compra de la transacción:
        # t = (Year, Month, Day, Hour, Minute, Seconds, wday, yday, isdst )
        
        # Valores de: día - hora - min, de la transacción
        dia_trans = int(dR) - customer_count[val_customer_id, 1]
        hora_trans = customer_count[val_customer_id, 2]
        min_trans = customer_count[val_customer_id, 3]
        
        t = (int(yR), int(mR), dia_trans, hora_trans, min_trans, 0, 1, 48, 0)
        t = time.mktime(t)
        val_purchased_at = time.strftime("%Y-%m-%d %H:%M:00", time.localtime(t))
        
        # Valores generados a Insertar a la BD PostgreSQL
        #print(val_customer_id + 1, val_amount, val_purchased_at)
        #print (' ')
       
        # Creación de datos a ingresar a BD:
        datos = (val_customer_id + 1, val_amount, val_purchased_at)
        
        #Inserción de datos de 1000 transacciones generadas aleatoriamente a la BD
        objCursor.execute("INSERT INTO transactions (customer_id, amount, purchased_at) VALUES (%s, %s, %s)",\
                  (datos[0], datos[1], datos[2]))        
        
        # Caso en que se solicite una única transacción:
        if ((x == 1) and (x == (Total_Transactions))):
            datosTMP = '{}{}'.format(datos, ';')
            #datosTMP = (datos)
            
        # Primer valor de los datos, cuando se solicita más de una transacción:
        elif (x == 1):
            datosTMP = '{}{}'.format(datos, ',')
            #datosTMP = (datos)
            
        # Valores anteriores al último dato:
        # Para transacciones mayores a 2:
        elif (x < (Total_Transactions)):
            datos = '{}{}'.format(datos, ',')
            datosTMP = '{}'.format(datosTMP) + '\n' + '{}'.format(datos)
            
        # Último dato: x == transactions            
        else:
            datos = '{}{}'.format(datos, ';')
            datosTMP = '{}'.format(datosTMP) + '\n' + '{}'.format(datos)
            #datosTMP = (datosTMP, datos)
            
        # Valores generados a Insertar a la BD PostgreSQL
        #print(datosTMP)
        #print(' ')
        #print ('***')
        
        
print(' ')
print(datosTMP)
print(' ')

Fecha de Finalización Transacciones: 
2020-01-13 12:21:00
 
date and time: 2020-01-13 12:21:24.633017
time: 12:21:24
Year: 2020
 
Month: 01
 
Day: 13
 
hour: 12
 
minute: 21
 
 
 
 
(3, 298, '2020-01-13 08:00:00'),
(2, 668, '2020-01-13 08:00:00'),
(2, 683, '2020-01-13 08:01:00'),
(2, 891, '2020-01-13 08:02:00'),
(2, 657, '2020-01-13 08:03:00'),
(2, 134, '2020-01-13 08:04:00'),
(1, 545, '2020-01-13 08:00:00'),
(2, 763, '2020-01-13 08:05:00'),
(3, 31, '2020-01-13 08:01:00'),
(4, 130, '2020-01-13 08:00:00'),
(5, 15, '2020-01-13 08:00:00'),
(3, 855, '2020-01-13 08:02:00'),
(2, 87, '2020-01-13 08:06:00'),
(3, 847, '2020-01-13 08:03:00'),
(4, 763, '2020-01-13 08:01:00'),
(4, 973, '2020-01-13 08:02:00'),
(2, 813, '2020-01-13 08:07:00'),
(3, 365, '2020-01-13 08:04:00'),
(2, 499, '2020-01-13 08:08:00'),
(5, 578, '2020-01-13 08:01:00'),
(4, 687, '2020-01-13 08:03:00'),
(3, 910, '2020-01-13 08:05:00'),
(4, 654, '2020-01-13 08:04:00'),
(3, 671, '2020-01-13 08:06:00'),
(1, 355, '2020-01-13 08:01:00

### Verificación de que se ingresó los datos anteriores (1000 transacciones aleatorias) en la tabla "transactions"

In [15]:
# Consulta a la BD 
objCursor.execute("SELECT * FROM transactions;")
filas = objCursor.fetchall()
filas

[(1, 1, 55, datetime.datetime(2017, 3, 1, 9, 0)),
 (2, 1, 125, datetime.datetime(2017, 3, 1, 10, 0)),
 (3, 1, 32, datetime.datetime(2017, 3, 2, 13, 0)),
 (4, 1, 64, datetime.datetime(2017, 3, 2, 15, 0)),
 (5, 1, 128, datetime.datetime(2017, 3, 3, 10, 0)),
 (6, 2, 333, datetime.datetime(2017, 3, 1, 9, 0)),
 (7, 2, 334, datetime.datetime(2017, 3, 1, 9, 1)),
 (8, 2, 333, datetime.datetime(2017, 3, 1, 9, 2)),
 (9, 2, 11, datetime.datetime(2017, 3, 3, 20, 0)),
 (10, 2, 44, datetime.datetime(2017, 3, 3, 20, 15)),
 (11, 3, 298, datetime.datetime(2020, 1, 13, 8, 0)),
 (12, 2, 668, datetime.datetime(2020, 1, 13, 8, 0)),
 (13, 2, 683, datetime.datetime(2020, 1, 13, 8, 1)),
 (14, 2, 891, datetime.datetime(2020, 1, 13, 8, 2)),
 (15, 2, 657, datetime.datetime(2020, 1, 13, 8, 3)),
 (16, 2, 134, datetime.datetime(2020, 1, 13, 8, 4)),
 (17, 1, 545, datetime.datetime(2020, 1, 13, 8, 0)),
 (18, 2, 763, datetime.datetime(2020, 1, 13, 8, 5)),
 (19, 3, 31, datetime.datetime(2020, 1, 13, 8, 1)),
 (20, 4, 13

### Cantidad de datos dentro de la tabla "transactions"

In [16]:
# Consulta a la BD 
objCursor.execute("SELECT count(*) FROM transactions;")
filas = objCursor.fetchall()
filas

[(1010,)]

### Confirmación permanente (finalización de la transacción) de ingreso de datos en tabla "transactions"

In [17]:
# Guardar el Cambio permanente 
obj.commit()

### Cerrar la conexión entre Psycopg2 y la BD PostgreSQL

In [18]:
# Cierre de objeto cursor
objCursor.close()

# Cierre del objeto de las Variables de Conexión
obj.close()

# **** FIN DE GENERADOR DE TRANSACCIONES ****

In [19]:
import findspark
findspark.init('C:\spark')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "C:\Spark\jdbcdriver\postgresql-42.2.9.jar") \
    .config("spark.executor.extraClassPath", "C:\Spark\jdbcdriver\postgresql-42.2.9.jar") \
    .getOrCreate()

In [20]:
# Reading single DataFrame in Spark by retrieving all rows from a DB table.
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/test") \
    .option("user", "postgres") \
    .option("password", "bd2019%") \
    .option("dbtable", "transactions") \
    .load()

df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  1|          1|    55|2017-03-01 09:00:00|
|  2|          1|   125|2017-03-01 10:00:00|
|  3|          1|    32|2017-03-02 13:00:00|
|  4|          1|    64|2017-03-02 15:00:00|
|  5|          1|   128|2017-03-03 10:00:00|
|  6|          2|   333|2017-03-01 09:00:00|
|  7|          2|   334|2017-03-01 09:01:00|
|  8|          2|   333|2017-03-01 09:02:00|
|  9|          2|    11|2017-03-03 20:00:00|
| 10|          2|    44|2017-03-03 20:15:00|
| 11|          3|   298|2020-01-13 08:00:00|
| 12|          2|   668|2020-01-13 08:00:00|
| 13|          2|   683|2020-01-13 08:01:00|
| 14|          2|   891|2020-01-13 08:02:00|
| 15|          2|   657|2020-01-13 08:03:00|
| 16|          2|   134|2020-01-13 08:04:00|
| 17|          1|   545|2020-01-13 08:00:00|
| 18|          2|   763|2020-01-13 08:05:00|
| 19|          3|    31|2020-01-13 08:01:00|
| 20|     

Leer datos directamente rara vez es el objetivo primario. Las dos tareas típicas son: transformar algunas de las columnas en representaciones modificadas de las mismas, o bien, agregar grupos de filas en una. A manera de ejemplo, podemos asumir que se quiere la información del total de compras hechas por clientes.

Spark provee ciertas transformaciones básicas. Por ejemplo, transformar fechas a una representación con formato específico, para poder truncar la una estampilla de tiempo a nivel de día. En este caso, se utiliza `date_format` en el módulo `pyspark.sql.functions`. El siguiente código crea una columna nueva después de aplicar la transformación.

In [21]:
formatted_df = df.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_df.show()

+---+-----------+------+-------------------+-----------+
| id|customer_id|amount|       purchased_at|date_string|
+---+-----------+------+-------------------+-----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|
| 11|          3|   298|2020-01-13 08:00:00| 01/13/2020|
| 12|          2|   668|2020-01-13 08:00:00| 01/13/2020|
| 13|          2|   683|2020-01-13 08:01:00| 01/13/2020|
| 14|          2|   891|2020-01-13 08:02:00| 01/13/2020|
| 15|          2|   657|2020-01

En el caso que necesitemos crear una función que no es parte de la biblioteca estándar en Spark, es posible definir funciones creadas por el usuario (User Defined Functions o *udf*). La noción básica de una `udf` en Spark un lambda acompañado por el tipo de dato retornado. Lo anterior es estrictamente necesario en lenguajes con un sistema de tipos débil.

Además, es una manera cómoda de encapsular funciones de Python que deben ser aplicadas a celdas del *Dataframe*, pero aún no tienen su implementación en las bibliotecas.

El siguiente ejemplo muestra la columna creada previamente (tipo `string`) y la transforma en un tipo `DateType` propio de Spark.

In [22]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()


+---+-----------+------+-------------------+-----------+----------+
| id|customer_id|amount|       purchased_at|date_string|      date|
+---+-----------+------+-------------------+-----------+----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|2017-03-01|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|2017-03-02|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|2017-03-02|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|2017-03-03|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|2017-03-01|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|2017-03-01|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|2017-03-03|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|2017-03-03|
| 11|          3|   298|2020-01-13 08:00:00| 01/13/2020|2020-01-13|
| 12|          2|   668|2020-01-13 08:00:00| 01/

Para sumar los datos podemos utilizar el concepto común de agrupamiento en SQL. Spark posee una agrupación de `groupBy`, directamente. En este caso, queremos sumar todas las compras por cliente y día:

In [23]:
sum_df = typed_df.groupBy("customer_id", "date").sum()
sum_df.show()

+-----------+----------+-------+----------------+-----------+
|customer_id|      date|sum(id)|sum(customer_id)|sum(amount)|
+-----------+----------+-------+----------------+-----------+
|          5|2020-01-13| 110676|            1040|     100754|
|          2|2017-03-01|     21|               6|       1000|
|          1|2017-03-02|      7|               2|         96|
|          1|2017-03-03|      5|               1|        128|
|          2|2020-01-13|  92175|             396|      96516|
|          3|2020-01-13| 100015|             573|      94898|
|          1|2020-01-13|  97818|             187|      86628|
|          1|2017-03-01|      3|               2|        180|
|          4|2020-01-13| 109816|             864|     107944|
|          2|2017-03-03|     19|               4|         55|
+-----------+----------+-------+----------------+-----------+



Spark sumará todas las columnas **que no se encuentren** especificadas en la operación `groupBy`. Algunos resultados no tienen interpretación últil. Por ejemplo, sumar la columna `customer_id` no da ningún valor agregado. Finalmente, es posible dar a las columnas un nombre más amigable con los consumidores de los datos.

In [24]:
stats_df = \
    sum_df.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_df.printSchema()
stats_df.show()


root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          5|2020-01-13|100754|
|          2|2017-03-01|  1000|
|          1|2017-03-02|    96|
|          1|2017-03-03|   128|
|          2|2020-01-13| 96516|
|          3|2020-01-13| 94898|
|          1|2020-01-13| 86628|
|          1|2017-03-01|   180|
|          4|2020-01-13|107944|
|          2|2017-03-03|    55|
+-----------+----------+------+



Spark permite cargar información de múltiples fuentes. A continuación se muestra como cargar datos de un archivo CSV que contiene los nombres de los clientes, así como la moneda en que realizan transacciones. Nótese que el CSV no tiene información de tipos de datos, por lo que es buena práctica agregarlos explícitamente.

In [25]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+



Una vez que la información fue cargada, la fuente particular no es relevante. A continuación se muestra como podemos enriquerecer la información utilizando la funcion **JOIN** entre *data frames*.

In [26]:
joint_df = stats_df.join(names_df, stats_df.customer_id == names_df.id)
joint_df.printSchema()
joint_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+-----------+----------+------+---+----+--------+
|customer_id|      date|amount| id|name|currency|
+-----------+----------+------+---+----+--------+
|          2|2017-03-01|  1000|  2|Jane|     EUR|
|          1|2017-03-02|    96|  1|John|     CRC|
|          1|2017-03-03|   128|  1|John|     CRC|
|          2|2020-01-13| 96516|  2|Jane|     EUR|
|          1|2020-01-13| 86628|  1|John|     CRC|
|          1|2017-03-01|   180|  1|John|     CRC|
|          2|2017-03-03|    55|  2|Jane|     EUR|
+-----------+----------+------+---+----+--------+



# Ejercicio
- Cree un pequeño generador de transacciones. Para ello, puede utilizar funciones de numpy o pandas, para crear las transacciones nuevas en memoria y, posteriormente, puede cargarlas a un *Spark Dataframe* que después debe insertar en la base de datos.
- Con los datos nuevos, ejecute el código de éste notebook.

# Configuración
Para ejecutar el código de ejemplo en este notebook es necesario instalar Jupyter localmente. con soporte de Python. También es necesario instalar una serie de módulos utilizando el comando `pip`:

```bash
pip3 install pyspark
pip3 install findspark
```

Referencias adicionales: [Spark](https://spark.apache.org/) y [PostgreSQL](https://www.postgresql.org/).

# Referencias
- Schutt, R; O'Neill C. Doing Data Science - Straight Talk from the Frontline. O'Reilly Media. 2013 (Capítulo 14)
- Dean, J; Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf