In [1]:
import pandas as pd
import numpy as np
import math
import neo4j
from neo4j import GraphDatabase, basic_auth
from graphdatascience import GraphDataScience

## Integración de Datos
### Del Modelo Relacional al Modelo de Grafos

## 1. Introducción
Las bases de datos orientadas a grafos nos ayudan a aprovechar las relaciones que existen entre nuestros datos pero que están ocultas o atrapadas en las tablas del mundo relacional.

El objetivo de este ejercicio es tomar un modelo relacional, donde se almacena información de ventas, y transformarlo en un modelo de grafo. Una vez construido el grafo, vamos a aprovechar su estructura para hacer consultas que en T-SQL serían más complejas o más lentas.

## 2. Modelo Relacional
El modelo relacional consta de 5 tablas: Productos, Cuentas, Equipos de venta, Tráfico web (clicks) y Pipeline de ventas.

Este modelo nos ayuda a consultar las oportunidades que existen en el pipeline, la cuenta a la que pertenece la oportunidad, el representante de ventas que la atiende y los productos que incluye.
Además podemos consultar la jerarquía de los equipos de venta y el tráfico web que se ha generado relacionado a cada producto.

![image.png](attachment:image.png)

## 3. Modelo de Grafo
Para transformar nuestro modelo Entidad-Relación en un modelo de grafo vamos a generar nodos, relaciones y propiedades. Generalmente vamos a crear nodos a partir de los sustantivos y relaciones a partir de los verbos. Las propiedades las vamos a asignar a los nodos o a las relaciones, dependiendo de cómo queramos hacer nuestras consultas de Cypher.

![image.png](attachment:image.png)

## 4. Importar los datos
Para importar los datos, vamos a exportar cada una de las tablas a un archivo CSV y a cargar cada archivo en un dataframe de pandas.

In [3]:
import os

# Folder that holds the CSV exports of the five relational tables. Set the
# CRM_DATA_DIR environment variable to point elsewhere instead of editing an
# absolute path in the notebook; the default is the original author's folder.
DATA_DIR = os.environ.get('CRM_DATA_DIR', 'C:\\Users\\pa0lo\\Repos\\CRM')

df_accounts = pd.read_csv(os.path.join(DATA_DIR, 'accounts.csv'))
df_clicks = pd.read_csv(os.path.join(DATA_DIR, 'clicks.csv'))
df_products = pd.read_csv(os.path.join(DATA_DIR, 'products.csv'))
df_pipeline = pd.read_csv(os.path.join(DATA_DIR, 'sales_pipeline.csv'))
df_teams = pd.read_csv(os.path.join(DATA_DIR, 'sales_teams.csv'))

### Perfil de los datos

In [9]:
# Print the row count of each source table, one "<Label>: <n>" line per table.
for label, frame in (
    ('Accounts', df_accounts),
    ('Clicks', df_clicks),
    ('Products', df_products),
    ('Pipeline', df_pipeline),
    ('Teams', df_teams),
):
    print(f'{label}: {len(frame)}')

Accounts: 97
Clicks: 30244
Products: 8
Pipeline: 14277
Teams: 30


### 5. Conexión a Neo4j

In [2]:
import os
from getpass import getpass

# Neo4j connection settings. They are read from the environment so credentials
# are never committed with the notebook. URL and user fall back to local defaults.
url = os.environ.get('NEO4J_URL', 'bolt://localhost:7687')
username = os.environ.get('NEO4J_USER', 'neo4j')
# Do not hard-code the password: take it from NEO4J_PASSWORD or prompt for it.
pwd = os.environ.get('NEO4J_PASSWORD') or getpass('Neo4j password: ')
NEO4J_AUTH = (username, pwd)

In [4]:
# Bolt driver used by the Cypher loading cells below.
_driver = GraphDatabase.driver(
    url,
    auth=basic_auth(username, pwd),
)
# Graph Data Science client; the trailing expression displays its reported version.
gds = GraphDataScience(
    url,
    auth=NEO4J_AUTH,
)
gds.version()

'2.1.1'

### 6. Crear el Modelo
Antes de cargar datos es recomendable crear los índices y restricciones que vamos a usar. En este caso vamos a usar restricciones de unicidad (constraints) para asegurarnos de que no se creen nodos duplicados. Vamos a crear una restricción por cada tipo de nodo.

In [None]:
# One uniqueness constraint per node label that the loading cells MERGE on a key.
# IF NOT EXISTS turns each statement into a no-op when an equivalent constraint
# already exists, so this cell can be re-run safely.
# NOTE: the ON ... ASSERT form is Neo4j 4.x syntax (deprecated in 4.4). Neo4j 5
# requires `FOR (p:Product) REQUIRE p.name IS UNIQUE` instead.
constraint1 = """CREATE CONSTRAINT product_name IF NOT EXISTS ON (p:Product) ASSERT p.name IS UNIQUE"""
constraint2 = """CREATE CONSTRAINT agent_name IF NOT EXISTS ON (a:Agent) ASSERT a.name IS UNIQUE"""
constraint3 = """CREATE CONSTRAINT manager_name IF NOT EXISTS ON (m:Manager) ASSERT m.name IS UNIQUE"""
constraint4 = """CREATE CONSTRAINT account_name IF NOT EXISTS ON (a:Account) ASSERT a.name IS UNIQUE"""
constraint5 = """CREATE CONSTRAINT click_source IF NOT EXISTS ON (c:Click) ASSERT c.source IS UNIQUE"""
constraint6 = """CREATE CONSTRAINT deal_id IF NOT EXISTS ON (d:Deal) ASSERT d.opp_id IS UNIQUE"""
# Industry and Office nodes are MERGEd by name too, so they get constraints as well.
constraint7 = """CREATE CONSTRAINT industry_name IF NOT EXISTS ON (i:Industry) ASSERT i.name IS UNIQUE"""
constraint8 = """CREATE CONSTRAINT office_name IF NOT EXISTS ON (o:Office) ASSERT o.name IS UNIQUE"""

with _driver.session() as session:
    for constraint in (constraint1, constraint2, constraint3, constraint4,
                       constraint5, constraint6, constraint7, constraint8):
        # consume() waits for the schema change to finish and surfaces any error here.
        session.run(constraint).consume()

### 7. Crear Nodos
Es buena práctica crear primero los nodos y después las relaciones. A continuación vamos a cargar cada tipo de nodo con sus propiedades.

### 7.1 Accounts

In [15]:
df_accounts.head(n=5)  # preview the first five rows

Unnamed: 0,account,revenue,employees
0,Sunnamplex,4592.96,13938.0
1,Silis,5339.57,18053.0
2,Groovestreet,2728.86,6486.0
3,Donware,2009.52,3409.0
4,Wonka Industries,4962.27,4687.0


In [44]:
# Load one (:Account) node per row of df_accounts, sending rows in batches of BATCH_SIZE.
# MERGE on the account name keeps the load idempotent. Plain SET (rather than
# ON CREATE SET) refreshes revenue/employees on every run, so nodes created by an
# earlier run are repaired as well.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MERGE (a:Account {name: row.Account})
SET
a.revenue = toFloat(row.Revenue),
a.employees = toInteger(row.Employees)
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {
        'Account': row['account'],
        'Revenue': row['revenue'],
        'Employees': row['employees'],
    }
    for _, row in df_accounts.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once. consume() makes it finish (and raise any
    # server error) before the next batch is sent.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 7.2 Clicks
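Para los clicks vamos a generar una columna nueva, total_clicks, con el número de clicks de cada combinación de fuente (source) y producto; la usaremos como propiedad de la relación CLICKED_ON en nuestro modelo.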

In [72]:
# total_clicks = number of clicks (non-null Product values) in the row's (source, Product) group,
# broadcast back to every row of that group.
click_groups = df_clicks.groupby(['source', 'Product'])
df_clicks['total_clicks'] = click_groups['Product'].transform('count')

In [73]:
df_clicks.head(n=5)  # preview the first five rows, now including total_clicks

Unnamed: 0,created_on,source,industry,Product,total_clicks
0,11/14/2016,Referral,IT,GTXPlusPro,714
1,11/14/2016,Social,IT,MGRPFU,503
2,11/14/2016,Paid,SaaS,GTK500U,857
3,11/14/2016,Direct,SaaS,MGRPFU,574
4,11/15/2016,Organic,Health Care,GTXPlusPro,588


In [47]:
# Create the (:Click) source nodes and (:Industry) nodes referenced by df_clicks.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MERGE (c:Click {source: row.Source})
MERGE (i:Industry {name: row.Industry})
"""

# df_clicks has one row per click, so the same source/industry pair repeats many
# times. MERGE is idempotent, so sending each distinct pair once produces the same
# nodes with far fewer round trips.
click_pairs = df_clicks[['source', 'industry']].drop_duplicates()

params = [
    {'Source': row['source'], 'Industry': row['industry']}
    for _, row in click_pairs.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 7.3 Products

In [36]:
df_products.head(n=5)  # preview the first five rows

Unnamed: 0,product,sales_price
0,GTXAdvanced,649.0
1,GTXBasic,641.0
2,MGRPFU,3959.0
3,MGRPFS,64.0
4,GTXPlusBasic,1279.0


In [49]:
# Load one (:Product) node per row of df_products, with its sales price.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MERGE (p:Product {name: row.Name})
ON CREATE SET
p.price = toFloat(row.Price)
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {'Name': row['product'], 'Price': row['sales_price']}
    for _, row in df_products.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 7.4 Teams

In [37]:
df_teams.head(n=5)  # preview the first five rows

Unnamed: 0,sales_agent,manager,regional_office
0,Donn Cantrell,Rocco Neubert,Central
1,James Ascencio,Summer Sewald,West
2,Vicki Laflamme,Celia Rouche,West
3,Niesha Huffines,Melvin Marxen,East
4,Kami Bicknell,Summer Sewald,West


In [50]:
# Create the (:Agent), (:Manager) and (:Office) nodes listed in df_teams.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MERGE (a:Agent {name: row.Agent})
MERGE (m:Manager {name: row.Manager})
MERGE (o:Office {name: row.Office})
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {
        'Agent': row['sales_agent'],
        'Manager': row['manager'],
        'Office': row['regional_office'],
    }
    for _, row in df_teams.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 7.5 Pipeline

In [6]:
df_pipeline.head(n=5)  # preview the first five rows

Unnamed: 0,account,opportunity_id,sales_agent,deal_stage,product,close_date,close_value,created_on
0,Sunnamplex,67HY0MW7,Donn Cantrell,Won,GTXBasic,2017-05-06,500.0,2017-04-24
1,,MA82HVCI,James Ascencio,In_Progress,GTXPro,,,2017-06-15
2,,BRL1KVVH,Vicki Laflamme,Lost,GTXBasic,2017-08-03,0.0,2017-05-19
3,Silis,R22O68FF,Niesha Huffines,Won,GTXBasic,2017-06-27,524.0,2017-03-21
4,Silis,J78AK31N,Kami Bicknell,Won,MGRPFU,2017-08-04,4794.0,2017-05-15


In [7]:
# Replace NaN with '' in these columns; the Cypher loaders below compare against ''.
for column in ('close_date', 'close_value', 'opportunity_id'):
    df_pipeline[column] = df_pipeline[column].fillna('')

In [8]:
# Load one (:Deal) node per opportunity in df_pipeline, in batches of BATCH_SIZE rows.
# Assumes the previous cell replaced missing close_date / close_value / opportunity_id
# values with ''. Rows without an opportunity id are skipped by the WHERE clause.
# A missing value is stored as null (the property is left unset) instead of '', so
# close_value and the date properties never mix strings with floats/dates.
# SET (instead of ON CREATE SET) lets a re-run refresh deals created earlier.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row WHERE row.Opp_Id <> ''
MERGE (d:Deal {opp_id: row.Opp_Id})
SET
d.close_value = CASE WHEN row.Close_Value = '' THEN null ELSE toFloat(row.Close_Value) END,
d.close_date = CASE WHEN row.Close_Date = '' THEN null ELSE date(row.Close_Date) END,
d.created_on = CASE WHEN row.Created_On = '' THEN null ELSE date(row.Created_On) END
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {
        'Opp_Id': row['opportunity_id'],
        'Close_Value': row['close_value'],
        'Close_Date': row['close_date'],
        'Created_On': row['created_on'],
    }
    for _, row in df_pipeline.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 8. Relaciones

### 8.1 Clicks

In [None]:
df_clicks.head(n=5)  # preview the first five rows

In [74]:
# Link each (:Click) source to the (:Product)s it clicked on, storing total_clicks
# (computed earlier per (source, Product) pair) on the CLICKED_ON relationship.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row
MATCH (c:Click {source: row.Source})
MATCH (p:Product {name: row.Name})
MERGE (c)-[r:CLICKED_ON]->(p)
SET
r.total_clicks= row.Total
"""

# total_clicks is the same for every row of a (source, Product) group, so one row
# per distinct combination gives the same relationships and property values as
# sending every click row.
click_totals = df_clicks[['source', 'Product', 'total_clicks']].drop_duplicates()

params = [
    {
        'Source': row['source'],
        'Name': row['Product'],
        'Total': row['total_clicks'],
    }
    for _, row in click_totals.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

In [78]:
# Link each (:Click) source to every (:Industry) it appears with in df_clicks.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row
MATCH (c:Click {source: row.Source})
MATCH (i:Industry {name: row.Name})
MERGE (c)-[r:FROM]->(i)
"""

# MERGE is idempotent, so each distinct source/industry pair only needs sending once.
click_pairs = df_clicks[['source', 'industry']].drop_duplicates()

params = [
    {'Source': row['source'], 'Name': row['industry']}
    for _, row in click_pairs.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 8.2 Equipo de Ventas

In [79]:
# Create (:Agent)-[:REPORTS_TO]->(:Manager) for each row of df_teams.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MATCH (a:Agent {name: row.Agent})
MATCH (m:Manager {name: row.Manager})
MERGE (a)-[:REPORTS_TO]->(m)
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {'Agent': row['sales_agent'], 'Manager': row['manager']}
    for _, row in df_teams.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

In [80]:
# Create (:Manager)-[:BASED_IN]->(:Office) from the manager/office pairs in df_teams.
BATCH_SIZE = 20

statement = """
UNWIND $parameters as row
WITH row
MATCH (m:Manager {name: row.Manager})
MATCH (o:Office {name: row.Office})
MERGE (m)-[:BASED_IN]->(o)
"""

# A manager's row repeats once per agent; MERGE is idempotent, so one row per
# distinct manager/office pair is enough.
manager_offices = df_teams[['manager', 'regional_office']].drop_duplicates()

params = [
    {'Manager': row['manager'], 'Office': row['regional_office']}
    for _, row in manager_offices.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

### 8.3 Pipeline

In [None]:
df_pipeline.head(n=5)  # preview the first five rows after filling missing values

In [17]:
# Create (:Agent)-[:HAS_DEAL]->(:Deal) for each opportunity in df_pipeline.
# Rows whose opportunity id is '' match no Deal node and so create nothing.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row
MATCH (d:Deal {opp_id: row.Opp_Id})
MATCH (g:Agent {name: row.Agent})
MERGE (g)-[:HAS_DEAL]->(d)
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {'Opp_Id': row['opportunity_id'], 'Agent': row['sales_agent']}
    for _, row in df_pipeline.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

In [18]:
# Create (:Deal)-[:CONTAINS]->(:Product) for each opportunity in df_pipeline.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row
MATCH (d:Deal {opp_id: row.Opp_Id})
MATCH (p:Product {name: row.Product})
MERGE (d)-[:CONTAINS]->(p)
"""

# One parameter dict per row; the keys must match the row.<Key> names in the Cypher.
params = [
    {'Opp_Id': row['opportunity_id'], 'Product': row['product']}
    for _, row in df_pipeline.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()

In [None]:
# Create (:Account)-[:HAS_DEAL]->(:Deal) for each opportunity that has an account.
BATCH_SIZE = 200

statement = """
UNWIND $parameters as row
WITH row
MATCH (d:Deal {opp_id: row.Opp_Id})
MATCH (a:Account {name: row.Account})
MERGE (a)-[:HAS_DEAL]->(d)
"""

# Some pipeline rows have no account (NaN). They can never match an Account node,
# so skip them instead of sending them to the server.
deals_with_account = df_pipeline.dropna(subset=['account'])

params = [
    {'Opp_Id': row['opportunity_id'], 'Account': row['account']}
    for _, row in deals_with_account.iterrows()
]

with _driver.session() as session:
    # Each batch is sent exactly once; consume() waits for it to finish.
    for start in range(0, len(params), BATCH_SIZE):
        batch = params[start:start + BATCH_SIZE]
        session.run(statement, parameters={"parameters": batch}).consume()