# DataScience - Modulo4 - BigData

# Proyecto: Machine Learning con datos en PostgreSQL

# Integrantes: Kevin Fallas Cascante y Paulino Jose Moreno Solano

## Etapa 2 - Análisis de ML en Big Data, uso de Spark y otros.

## Enunciado:

Se cuanta con un conjunto de datos de la Empresa BIMBO de al rededor de 6 millones de registros de ventas, de los cuales se van a tomar alrededor de 1 millón de estos datos para crear un modelo que nos permita agrupar estas ventas en cuanto a la ruta, producto y cliente, de manera que se pueda tener este conjunto de datos divido en varios segmentos para identificar algunos segmentos de mercado, ya que se quieren obtener los mejores clientes en cuanto a las ventas.

La idea es obtener los datos desde PostGreSQL y procesarlos mediante Spark a través de Python.

### Parte 1 - Cargar el Set de Datos.

#### 1.1 - Cargar de las librerias de PySpark.

In [1]:
# Importar librerias.
import findspark
findspark.init('C:\Spark\spark-2.4.4-bin-hadoop2.7')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

#### 1.2 - Configurar la sesión de Spark.

In [2]:
# Crear sesión de Spark
spark = SparkSession \
    .builder \
    .appName("pyspark_bimbo_ML") \
    .config("spark.driver.extraClassPath", "postgresql-42.1.4.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.1.4.jar") \
    .master("local[*]") \
    .getOrCreate()

#### 1.3- Guargar los datos en una estructura básica para ser usados en la conexión con PostgreSQL.

In [3]:
# Registro de datos de credenciales
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://localhost/postgres',
    'user': 'postgres',
    'password': 'paumorso',
    'dbtable': 'bimbo',
}

#### 1.4 - Conexión y lectura de la tabla desde PostgreSQL.

In [4]:
# Configurar conexión para lectura de tabla en PostgreSQL.
df = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

# Prueba de carga de datos.
df.show()

+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+
|    id|Semana|Agencia_ID|Canal_ID|Ruta_SAK|Cliente_ID|Producto_ID|     Fecha|      NombreProducto|              Ciudad|              Estado|    NombreCliente|         Ruta_Ciudad|     Ruta_Estado|
+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+
|499765|    10|      1637|       1|    1247|    333336|       4280|2019-03-10|Doraditas 110g TR...|  2367 SAN MARTIN T.|              PUEBLA|          MARIBEL|                 NaN|             NaN|
|499766|    11|     23719|       1|    1277|    938151|       2233|2019-03-17|Pan Blanco 640g B...|  2371 TIERRA BLANCA|            VERACRUZ|    TRES HERMANOS|2089 AG. AZCAPOTZ...|    MÉXICO, D.F.|
|499767|  

In [5]:
#read the dataset
#df=spark.read.csv('BimboDataSet.csv',inferSchema=True,header=True)#.limit(10000)

### Parte 2 - Llevar a cabo un analisis exploratorio de los datos.

#### 2.1 - Validar el tamaño en cuanto a filas y columnas del conjunto de datos.

In [7]:
#check the shape of the data 
print((df.count(),len(df.columns)))

(1008127, 14)


#### 2.2 - Explorar la estrutura del conjunto de datos.

In [8]:
#printSchema
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Semana: integer (nullable = true)
 |-- Agencia_ID: integer (nullable = true)
 |-- Canal_ID: integer (nullable = true)
 |-- Ruta_SAK: integer (nullable = true)
 |-- Cliente_ID: integer (nullable = true)
 |-- Producto_ID: integer (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- NombreProducto: string (nullable = true)
 |-- Ciudad: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- NombreCliente: string (nullable = true)
 |-- Ruta_Ciudad: string (nullable = true)
 |-- Ruta_Estado: string (nullable = true)



#### 2.3 - Obtener el nombre de las columnas del conjunto de datos.

In [9]:
#number of columns in dataset
df.columns

['id',
 'Semana',
 'Agencia_ID',
 'Canal_ID',
 'Ruta_SAK',
 'Cliente_ID',
 'Producto_ID',
 'Fecha',
 'NombreProducto',
 'Ciudad',
 'Estado',
 'NombreCliente',
 'Ruta_Ciudad',
 'Ruta_Estado']

#### 2.4 - Ver el conjunto de datos en forma de Spark DataFrame.

In [10]:
#view the data
df.show()

+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+
|    id|Semana|Agencia_ID|Canal_ID|Ruta_SAK|Cliente_ID|Producto_ID|     Fecha|      NombreProducto|              Ciudad|              Estado|    NombreCliente|         Ruta_Ciudad|     Ruta_Estado|
+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+
|499765|    10|      1637|       1|    1247|    333336|       4280|2019-03-10|Doraditas 110g TR...|  2367 SAN MARTIN T.|              PUEBLA|          MARIBEL|                 NaN|             NaN|
|499766|    11|     23719|       1|    1277|    938151|       2233|2019-03-17|Pan Blanco 640g B...|  2371 TIERRA BLANCA|            VERACRUZ|    TRES HERMANOS|2089 AG. AZCAPOTZ...|    MÉXICO, D.F.|
|499767|  

#### 2.5 - Hacer la misma visualización anterior, pero esta vez en forma de Pandas DataFrame:

In [11]:
df.limit(5).toPandas().head()

Unnamed: 0,id,Semana,Agencia_ID,Canal_ID,Ruta_SAK,Cliente_ID,Producto_ID,Fecha,NombreProducto,Ciudad,Estado,NombreCliente,Ruta_Ciudad,Ruta_Estado
0,499765,10,1637,1,1247,333336,4280,2019-03-10,Doraditas 110g TR 4280,2367 SAN MARTIN T.,PUEBLA,MARIBEL,,
1,499766,11,23719,1,1277,938151,2233,2019-03-17,Pan Blanco 640g BIM 2233,2371 TIERRA BLANCA,VERACRUZ,TRES HERMANOS,2089 AG. AZCAPOTZALCO INSTITUCIONALES,"MÉXICO, D.F."
2,499767,10,22090,1,4402,883099,37569,2019-03-10,Deliciosas Chochochispas 204gPromMTA LAR 37569,2090 AG. TEPEJI DEL RIO,QUERETARO,ROSY,,
3,499768,11,1220,1,1418,1223241,1064,2019-03-17,Panque Marmol 255g BIM 1064,2048 AG. IXTAPALUCA 1,ESTADO DE MÉXICO,SAN JUDITAS,,
4,499769,10,1555,1,2036,2489901,43069,2019-03-10,Pinguinos 2p 80g MTA MLA 43069,2562 MEXICALI PLAZA,BAJA CALIFORNIA NORTE,PUESTO CARMEN,2179 URUAPAN,MICHOACÁN


#### 2.6 - Ejecutar un análisis estadístico de los datos.

In [12]:
#Exploratory Data Analysis
df.describe().show()

+-------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+----------+--------------------+-----------------+--------------+-----------------+-----------------+--------------------+
|summary|               id|             Semana|       Agencia_ID|          Canal_ID|          Ruta_SAK|        Cliente_ID|       Producto_ID|     Fecha|      NombreProducto|           Ciudad|        Estado|    NombreCliente|      Ruta_Ciudad|         Ruta_Estado|
+-------+-----------------+-------------------+-----------------+------------------+------------------+------------------+------------------+----------+--------------------+-----------------+--------------+-----------------+-----------------+--------------------+
|  count|          1008127|            1008127|          1008127|           1008127|           1008127|           1008127|           1008127|   1008127|             1008127|          1008127|       1008127|  

### Parte 3 - Llevar a cabo la transforamción de los datos.

#### 3.1 - Importar "VectorAssembler" para crear un único vector de features y clase.

In [None]:
from pyspark.ml.feature import VectorAssembler

#### 3.2 - Vaidar de nuevo el conjunto de datos.

In [14]:
df.columns

['id',
 'Semana',
 'Agencia_ID',
 'Canal_ID',
 'Ruta_SAK',
 'Cliente_ID',
 'Producto_ID',
 'Fecha',
 'NombreProducto',
 'Ciudad',
 'Estado',
 'NombreCliente',
 'Ruta_Ciudad',
 'Ruta_Estado']

#### 3.3 - Declarar las columnas que vamos a usar en el vector de features y clase.

In [15]:
features = ( 'id',
             'Semana',
             'Agencia_ID',
             'Canal_ID',
             'Ruta_SAK',
             'Cliente_ID',
             'Producto_ID'
           ) 

#### 3.4 - Construir el vector de features y clase, para ser usado por nuestro modelo de entrenamiento.

In [16]:
assembler = VectorAssembler(inputCols=features,outputCol="features")

#### 3.5 - Revisar el conjunto de datos en el vector.

In [18]:
dataset=assembler.transform(df)
dataset.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Semana: integer (nullable = true)
 |-- Agencia_ID: integer (nullable = true)
 |-- Canal_ID: integer (nullable = true)
 |-- Ruta_SAK: integer (nullable = true)
 |-- Cliente_ID: integer (nullable = true)
 |-- Producto_ID: integer (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- NombreProducto: string (nullable = true)
 |-- Ciudad: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- NombreCliente: string (nullable = true)
 |-- Ruta_Ciudad: string (nullable = true)
 |-- Ruta_Estado: string (nullable = true)
 |-- features: vector (nullable = true)



### Parte 4 - Contruir el modelo de aprendizaje automático.

#### 4.1 - Importar las librerías para el Modelo de KMeans.

In [19]:
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeans

#### 4.2 - Entrenar el modelo con los Hyper-Parámetros de SetK y SetSeed.

In [20]:
# Trains a k-means model.
kmeans = KMeans().setK(15).setSeed(250)
model = kmeans.fit(dataset)

#### 4.3 - Hacer las predicciones.

In [21]:
# Make predictions
predictions = model.transform(dataset)

#### 4.4 - Evaluar el modelo ya ejecutado.

In [22]:
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

#### 4.5 - Validar la distancia euclideana.

In [23]:
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.5737130612965955


#### 4.6 - Validar el costo de computo, basado en los errores.

In [24]:
# Evaluate clustering.
cost = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(cost))

Within Set Sum of Squared Errors = 5.868214260504297e+16


#### 4.7 - Obtener los centroides.

In [25]:
# Shows the result.
print("Cluster Centers: ")
ctr=[]
centers = model.clusterCenters()
for center in centers:
    ctr.append(center)
    print(center)

Cluster Centers: 
[2.39903381e+05 1.04946633e+01 2.45139936e+03 1.34064621e+00
 2.08034585e+03 8.94117003e+05 2.19312758e+04]
[4.50829000e+05 1.00000000e+01 2.23620000e+04 1.00000000e+00
 9.00000000e+02 2.01515202e+09 1.19000000e+03]
[2.50064521e+05 1.04947894e+01 2.59262516e+03 1.55519184e+00
 2.31548042e+03 4.43955665e+06 2.22736343e+04]
[2.51038268e+05 1.04975850e+01 2.73197865e+03 1.46319552e+00
 2.27206936e+03 7.94577430e+06 2.36616835e+04]
[7.66979182e+05 1.04950189e+01 2.56320062e+03 1.58594805e+00
 2.30839166e+03 2.15906936e+06 2.21924374e+04]
[4.95318606e+05 1.04711929e+01 2.44563159e+03 1.74033000e+00
 2.62444874e+03 9.60288860e+06 2.26312810e+04]
[7.49968288e+05 1.04921040e+01 2.57294927e+03 1.56069028e+00
 2.30998907e+03 4.43677745e+06 2.21840413e+04]
[7.53672911e+05 1.04952510e+01 2.47987500e+03 1.42879095e+00
 2.15943867e+03 1.29063257e+06 2.21036751e+04]
[7.18024170e+05 1.04916916e+01 2.47010054e+03 1.21509564e+00
 1.94217316e+03 6.18927733e+05 2.17964960e+04]
[2.4134267

### Parte 5 - Visualizar los valores obtenidos.

#### 5.1 - Importamos la librería de Pandas.

In [26]:
import pandas as pd

#### 5.2 - Guardar los datos del modelo.

In [27]:
centers = pd.DataFrame(ctr,columns=features)

#### 5.3 - Ver los centroides.

In [28]:
centers

Unnamed: 0,id,Semana,Agencia_ID,Canal_ID,Ruta_SAK,Cliente_ID,Producto_ID
0,239903.381143,10.494663,2451.399362,1.340646,2080.34585,894117.0,21931.27576
1,450829.0,10.0,22362.0,1.0,900.0,2015152000.0,1190.0
2,250064.520504,10.494789,2592.625157,1.555192,2315.480419,4439557.0,22273.63428
3,251038.267871,10.497585,2731.978651,1.463196,2272.069359,7945774.0,23661.683539
4,766979.181975,10.495019,2563.200619,1.585948,2308.39166,2159069.0,22192.437433
5,495318.606167,10.471193,2445.631593,1.74033,2624.448742,9602889.0,22631.281039
6,749968.287519,10.492104,2572.949274,1.56069,2309.989066,4436777.0,22184.041333
7,753672.910954,10.495251,2479.875002,1.428791,2159.438673,1290633.0,22103.675052
8,718024.169729,10.491692,2470.100537,1.215096,1942.173162,618927.7,21796.496004
9,241342.670043,10.495453,2449.141875,1.24323,1970.43532,235613.1,22151.697382


#### 5.4 - Ver la cantidad de registros por cada unas de las predicciones.

In [29]:
predictions.groupBy('prediction').count().show()

+----------+------+
|prediction| count|
+----------+------+
|        12|  3499|
|         1|     2|
|        13| 10557|
|         6| 92835|
|         3| 10352|
|         5|  3697|
|         9|165389|
|         4| 85845|
|         8| 92798|
|         7| 75596|
|        10| 77114|
|        11|141665|
|        14| 72609|
|         2| 93172|
|         0| 82997|
+----------+------+



#### 5.5 - Ver las predicciones como un conjunto de datos.

In [30]:
predictions.show()

+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+--------------------+----------+
|    id|Semana|Agencia_ID|Canal_ID|Ruta_SAK|Cliente_ID|Producto_ID|     Fecha|      NombreProducto|              Ciudad|              Estado|    NombreCliente|         Ruta_Ciudad|     Ruta_Estado|            features|prediction|
+------+------+----------+--------+--------+----------+-----------+----------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+--------------------+----------+
|499765|    10|      1637|       1|    1247|    333336|       4280|2019-03-10|Doraditas 110g TR...|  2367 SAN MARTIN T.|              PUEBLA|          MARIBEL|                 NaN|             NaN|[499765.0,10.0,16...|         9|
|499766|    11|     23719|       1|    1277|    938151|       2233|2019-03-17|Pa

### Conclusiones

Para hacer este modelo, tomamos como base directamente de la documentación de Spark ML ([https://www.bmc.com/blogs/python-spark-k-means-example/](https://www.bmc.com/blogs/python-spark-k-means-example/)) con algunas modificaciones, para poder obtener los datos de PostGreSQL y los Hyper Parámetros necesarios.

De la documentación que anteriormente se hizo mensión, se pueden tomar varios puntos:

1. KMeans (). SetK (2) .setSeed (1) ⁠: el número 2 es el número de grupos en los que se dividen los datos. Vemos que cualquier número mayor que 2 hace que este valor ClusteringEvaluator () caiga por debajo de 0.5, lo que significa que no es una división clara. Otra forma de verificar el número óptimo de grupos sería trazar una curva de codo (elbow curve).

2. predictions = model.transform (conjunto de datos): esto agregará la columna de predicción al marco de datos, para que podamos mostrar qué pacientes califican para cada categoría.

El uso de los centros de clúster para cada división, así como la suma de los errores al cuadrado: una manera de explicar cómo funciona es que se calcula la distancia de cada punto de datos desde su suposición hasta el centro del grupo, ajusta las suposiciones y luego repite hasta que el número alcanza su mínimo. La distancia de cada punto desde este punto central se eleva al cuadrado para que la distancia sea siempre positiva.

El objetivo es tener el menor número posible, la distancia más corta entre todos los puntos de datos, por lo que un resualtado de 0.5737 está dentro de lo aceptado usando como una base de 15 posible agrupaciones.