# <center> Análisis exploratorio de datos </center>

In [None]:
!apt-get update
!apt-get install openjdk-11-jdk -y

## 1. Librerías

In [None]:
import warnings
import numpy as np
import pandas as pd
import seaborn as sns
from google.colab import drive
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from IPython.display import display
from pyspark.sql import SparkSession
from mlxtend.frequent_patterns import apriori, association_rules
from pyspark.sql.functions import col, sum as spark_sum, countDistinct, avg

warnings.filterwarnings("ignore")

## 2. Análisis información cruda

### 2.1. Lectura de datos

In [None]:
drive.mount('/content/drive')
ruta_base = "/content/drive/MyDrive/Dataset PI/trusted/"

In [None]:
spark = SparkSession.builder \
    .appName("LecturaCSV") \
    .getOrCreate()

In [None]:
csv_file_path = ruta_base + "trusted_data.csv"
spark_df = spark.read.csv(csv_file_path, header = True, inferSchema = True)

spark_df.show(5)

### 2.2. Información general

#### 2.2.1. Esquema

In [None]:
spark_df.printSchema()

#### 2.2.2. Número de registros

In [None]:
print(f"Número total de registros: {spark_df.count()}")

#### 2.2.3. Valores nulos por columna

In [None]:
# Contar valores nulos por columna
spark_df.select([spark_sum(col(c).isNull().cast("integer")).alias(c) for c in spark_df.columns]).show()

#### 2.2.4. Registros por ramo

In [None]:
spark_df.groupBy('Ramo').count().show()

#### 2.2.5. Valores únicos

In [None]:
df_refined = spark_df.toPandas()

df_result = (
    df_refined
    .groupby("Ramo")["codPoliza"]
    .nunique()
    .reset_index()
)

df_result.columns = ["Ramo", "Conteo Polizas"]
df_result["porcentaje"] = df_result["Conteo Polizas"] / df_result["Conteo Polizas"].sum()


In [None]:
df_refined.columns
df_refined['codCliente'].nunique()

#### 2.2.6. Registros por grupo empresarial

In [None]:
spark_df.groupBy('nomGrupoEmpresarial').count().show()

### 2.3. Estadísticos descriptivos

In [None]:
# Estadísticas descriptivas para columnas numéricas
spark_df.select([col for col, dtype in spark_df.dtypes if dtype in ['int', 'double', 'float', 'long']]).describe().show()

### 2.4. Distribución de pólizas por ramo

In [None]:
# Ordenar de mayor a menor
df_plot = df_result.sort_values("Conteo Polizas", ascending=False)

# Valores absolutos y relativos
counts_abs = df_plot["Conteo Polizas"].values
counts_pct = df_plot["porcentaje"].values
labels = df_plot["Ramo"].values

plt.figure(figsize=(9, 5))

# Misma paleta y estilo del ejemplo
sns.barplot(
    x=counts_abs,
    y=labels,
    palette="Blues_r"
)

# Etiquetas: conteo + porcentaje
for i, (abs_val, pct_val) in enumerate(zip(counts_abs, counts_pct)):
    plt.text(
        abs_val + counts_abs.max() * 0.01,
        i,
        f"{abs_val:,}  ({pct_val*100:.1f}%)",
        va='center',
        fontsize=10,
        color="#333333"
    )

plt.title(
    "Distribución de pólizas por Ramo",
    fontsize=15,
    fontweight='bold'
)

# Eliminar eje X
plt.xlabel("")
plt.xticks([])

# Estética
sns.despine(left=True, bottom=True)
plt.ylabel("")
plt.tight_layout()

plt.show()
plt.show()

### 2.5. Gráficos de dispersión de columnas numéricas

In [None]:
numeric_cols = [col for col, dtype in spark_df.dtypes if dtype in ['int', 'double', 'float', 'long']]
sample_spark_df = spark_df.select(numeric_cols).limit(1000).toPandas()

sns.pairplot(sample_spark_df)
plt.suptitle('Pair Plot of Numerical Columns (Sampled Data)', y=1.02)
plt.show()

### 2.6. Promedio de registros por cliente, registros por póliza y pólizas por cliente

In [None]:
# Promedio de registros por cliente
avg_records_per_client_spark = spark_df.groupBy('codCliente').count().select(avg('count')).first()[0]
print(f"Promedio de registros por cliente (PySpark): {avg_records_per_client_spark:.2f}")

# Promedio de registros por póliza
avg_records_per_poliza_spark = spark_df.groupBy('codPoliza').count().select(avg('count')).first()[0]
print(f"Promedio de registros por póliza (PySpark): {avg_records_per_poliza_spark:.2f}")

# Promedio de pólizas por cliente
avg_policies_per_client_spark = spark_df.groupBy('codCliente').agg(countDistinct('codPoliza').alias('policy_count')).select(avg('policy_count')).first()[0]
print(f"Promedio de pólizas por cliente (PySpark): {avg_policies_per_client_spark:.2f}")

## 3. Análisis información refinada

### 3.1. Lectura de datos

In [None]:
refined = '/content/drive/MyDrive/Dataset PI/refined/refined_data.csv'
spark_df = spark.read.csv(refined, header=True, inferSchema=True)
spark_df = spark_df.drop("Edad")
spark_df.printSchema()
spark_df.show(5)

### 3.2. Ramos más frecuentes en la siguiente compra

In [None]:
spark_df.groupBy('Y').count().orderBy('count', ascending=False).show()

### 3.3. Ramos más comunes de tener antes de realizar la siguiente compra

In [None]:
for col in ['Vida','Autos','Salud','Cumplimiento','Patrimoniales','Otros']:
    spark_df.groupBy(col).count().show()

### 3.4. Flujo de transición entre ramos

In [None]:
df_pandas = spark_df.select('Ramo','Vida','Autos','Salud','Cumplimiento','Patrimoniales','Otros','Y').toPandas()

In [None]:
flows = df_pandas.groupby(['Ramo', 'Y']).size().reset_index(name='count')
labels = list(pd.concat([flows['Ramo'], flows['Y']]).unique())
flows['source'] = flows['Ramo'].apply(lambda x: labels.index(x))
flows['target'] = flows['Y'].apply(lambda x: labels.index(x))

#Sankey
fig = go.Figure(data=[go.Sankey(
    node = dict(
        pad = 20,
        thickness = 20,
        line = dict(color="black", width=0.5),
        label = labels,
        color = "skyblue"
    ),
    link = dict(
        source = flows['source'],
        target = flows['target'],
        value = flows['count'],
        color = "rgba(0,100,255,0.4)"
    )
)])

fig.update_layout(title_text="Flujos de transición entre ramos (Ramo previo → Y)", font_size=12)
fig.show()

Cada banda muestra la magnitud de clientes que pasan de un ramo previo a un ramo adquirido.
Cuanto más gruesa la banda, más frecuente es esa transición.

### 3.5. Distribución de la siguiente póliza adquirida

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Distribución absoluta y relativa
counts_abs = df_pandas['Y'].value_counts().sort_values(ascending=False)
counts_pct = (counts_abs / counts_abs.sum()).sort_values(ascending=False)

plt.figure(figsize=(9, 5))
sns.barplot(x=counts_abs.values, y=counts_abs.index, palette="Blues_r")

# Agregar etiquetas: cantidad + porcentaje
for i, (abs_val, pct_val) in enumerate(zip(counts_abs.values, counts_pct.values)):
    plt.text(abs_val + counts_abs.max()*0.01, i,
             f"{abs_val:,}  ({pct_val*100:.1f}%)",
             va='center', fontsize=10, color="#333333")

plt.title("Distribución de la siguiente póliza adquirida (Y)", fontsize=15, fontweight='bold')

# Eliminar eje X
plt.xlabel("")
plt.xticks([])

# Estética
sns.despine(left=True, bottom=True)
plt.ylabel("")
plt.tight_layout()
plt.show()

### 3.5. Probabilidad de adquirir siguiente póliza a partir de ramo previo

In [None]:
pivot = df_pandas.groupby(['Ramo', 'Y']).size().unstack(fill_value=0)
pivot_prop = pivot.div(pivot.sum(axis=1), axis=0)

ramos_previos = pivot_prop.index.tolist()
ramos_y = pivot_prop.columns.tolist()

palette = sns.color_palette("Blues", n_colors=len(ramos_y) + 3)
palette = [palette[i] for i in range(2, len(palette))]
color_map = dict(zip(ramos_y, palette))

n_ramos = len(ramos_previos)
fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(15, 8))
axes = axes.flatten()

for i, ramo in enumerate(ramos_previos):
    data = pivot_prop.loc[ramo].copy()

    if ramo in data.index:
        data = data.drop(ramo)

    data = data.sort_values(ascending=True)

    ax = axes[i]
    colors = [color_map[y] for y in data.index]

    data.plot(kind='barh', ax=ax, color=colors, edgecolor='black')

    ax.set_title(f"Clientes con póliza de {ramo}", fontsize=13, fontweight='bold', color='#1a1a1a')
    ax.set_ylabel("")

    ax.set_xlabel("")
    ax.set_xticks([])

    for idx, val in enumerate(data):
        if val > 0:
            ax.text(val + 0.01, idx, f"{val*100:.1f}%", va='center', fontsize=9, color='#333333')

    sns.despine(left=True, bottom=True)
    ax.grid(axis='x', linestyle='--', alpha=0.0)

plt.suptitle("Probabilidad de adquirir siguiente póliza (Y) según ramo previo", fontsize=15, fontweight='bold')
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

### 3.6. Reglas de asociación entre ramos

In [None]:
df_pandas_aux = spark_df.select('Vida','Autos','Salud','Cumplimiento','Patrimoniales','Otros','Y').toPandas()

In [None]:
freq_items = apriori(df_pandas_aux[['Vida','Autos','Salud','Cumplimiento','Patrimoniales','Otros']], min_support=0.0001, use_colnames=True)
rules = association_rules(freq_items, metric="lift", min_threshold=1.0)

display(rules[['antecedents','consequents','support','confidence','lift']].sort_values(by='lift', ascending=False).head(100))

La tabla presenta las principales reglas de asociación identificadas entre los diferentes ramos de pólizas.

Antecedents: muestran los ramos que el cliente posee antes de la compra.

Consequents: indican el ramo adicional que tiende a asociarse o adquirirse junto a los anteriores.

Support: representa la proporción de clientes en los que aparece simultáneamente la combinación completa de ramos (antecedente y consecuente). Valores bajos reflejan combinaciones poco frecuentes.

Confidence: indica la probabilidad condicional de que un cliente adquiera las pólizas del consecuente dado que ya posee las del antecedente. Valores altos señalan asociaciones más confiables.

Lift: mide la fuerza relativa de la relación. Un valor mayor a 1 indica que la coexistencia entre los ramos ocurre con mayor frecuencia de la esperada por azar. Cuanto mayor es el lift, más significativa es la asociación.

En conjunto, la tabla permite identificar combinaciones de productos que tienden a coexistir o a preceder nuevas compras, información útil para estrategias de cross-selling, segmentación de clientes o análisis de comportamiento comercial.

### 3.7. Matriz de correlación de Pearson

In [None]:
binarias = ["Vida", "Autos", "Salud", "Cumpl.", "Patrim.", "Otros"]

corr = df_pandas[binarias].corr()

mask_sup = np.triu(np.ones_like(corr, dtype=bool))

valid_rows = ~(mask_sup.all(axis=1))
valid_cols = ~(mask_sup.all(axis=0))

corr_trimmed = corr.loc[valid_rows, valid_cols]
mask_trimmed = mask_sup[valid_rows][:, valid_cols]

plt.figure(figsize=(8,6))
sns.heatmap(
    corr_trimmed,
    mask=mask_trimmed,
    annot=True,
    cmap="vlag",
    center=0,
    linewidths=0.5,
    square=True,
    vmin=-0.4, vmax=0.4,
    cbar=False
)

plt.title("Correlación de Pearson entre ramos", fontsize=14, fontweight="bold")
plt.tight_layout()
plt.show()

### 3.8. Distribución de la edad