# Project 3: NEO4J customers and population served by target BART lines

University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

In [1]:
import neo4j

import csv

import math
import numpy as np
import pandas as pd

import psycopg2

### Neo4J setup

In [2]:
driver = neo4j.GraphDatabase.driver(uri="neo4j://neo4j:7687", auth=("neo4j","w205"))

In [3]:
session = driver.session(database="neo4j")

In [4]:
def my_neo4j_wipe_out_database():
    "wipe out database by deleting all nodes and relationships"
    
    query = "match (node)-[relationship]->() delete node, relationship"
    session.run(query)
    
    query = "match (node) delete node"
    session.run(query)

In [5]:
def my_neo4j_run_query_pandas(query, **kwargs):
    "run a query and return the results in a pandas dataframe"
    
    result = session.run(query, **kwargs)
    
    df = pd.DataFrame([r.values() for r in result], columns=result.keys())
    
    return df

In [6]:
def my_neo4j_number_nodes_relationships():
    "print the number of nodes and relationships"
   
    
    query = """
        match (n) 
        return n.name as node_name, labels(n) as labels
        order by n.name
    """
    
    df = my_neo4j_run_query_pandas(query)
    
    number_nodes = df.shape[0]
    
    
    query = """
        match (n1)-[r]->(n2) 
        return n1.name as node_name_1, labels(n1) as node_1_labels, 
            type(r) as relationship_type, n2.name as node_name_2, labels(n2) as node_2_labels
        order by node_name_1, node_name_2
    """
    
    df = my_neo4j_run_query_pandas(query)
    
    number_relationships = df.shape[0]
    
    print("-------------------------")
    print("  Nodes:", number_nodes)
    print("  Relationships:", number_relationships)
    print("-------------------------")


In [7]:
def my_neo4j_create_node(station_name):
    "create a node with label Station"
    
    query = """
    
    CREATE (:Station {name: $station_name})
    
    """
    
    session.run(query, station_name=station_name)
    

In [8]:
def my_neo4j_create_customer_node(customer_id):
    "create a node with label Customer"
    
    query = """
    
    CREATE (:Customer {id: $customer_id})
    
    """
    
    session.run(query, customer_id=customer_id)
    

In [9]:
def my_neo4j_create_relationship_one_way(from_station, to_station, weight):
    "create a relationship one way between two stations with a weight"
    
    query = """
    
    MATCH (from:Station), 
          (to:Station)
    WHERE from.name = $from_station and to.name = $to_station
    CREATE (from)-[:LINK {weight: $weight}]->(to)
    
    """
    
    session.run(query, from_station=from_station, to_station=to_station, weight=weight)
    

In [10]:
def my_neo4j_create_relationship_station_customer_one_way(from_station, to_customer_id, weight):
    "create a relationship one way between a station and a customer with a weight"
    
    query = """
    
    MATCH (from:Station), 
          (to:Customer)
    WHERE from.name = $from_station and to.id = $to_customer_id
    CREATE (from)-[:REACH {weight: $weight}]->(to)
    
    """
    
    session.run(query, from_station=from_station, to_customer_id=to_customer_id, weight=weight)

In [11]:
def my_neo4j_create_relationship_two_way(from_station, to_station, weight):
    "create relationships two way between two stations with a weight"
    
    query = """
    
    MATCH (from:Station), 
          (to:Station)
    WHERE from.name = $from_station and to.name = $to_station
    CREATE (from)-[:LINK {weight: $weight}]->(to),
           (to)-[:LINK {weight: $weight}]->(from)
    
    """
    
    session.run(query, from_station=from_station, to_station=to_station, weight=weight)
    

In [12]:
def my_read_csv_file(file_name, limit):
    "read the csv file and print only the first limit rows"
    
    csv_file = open(file_name, "r")
    
    csv_data = csv.reader(csv_file)
    
    i = 0
    
    for row in csv_data:
        i += 1
        if i <= limit:
            print(row)
            
    print("\nPrinted ", min(limit, i), "lines of ", i, "total lines.")

In [13]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)

In [14]:
cursor = connection.cursor()

### Distance calculation helper functions

In [15]:
def my_calculate_box(point, miles):
    "Given a point and miles, calculate the box in form left, right, top, bottom"
    
    geod = Geodesic.WGS84

    kilometers = miles * 1.60934
    meters = kilometers * 1000

    g = geod.Direct(point[0], point[1], 270, meters)
    left = (g['lat2'], g['lon2'])

    g = geod.Direct(point[0], point[1], 90, meters)
    right = (g['lat2'], g['lon2'])

    g = geod.Direct(point[0], point[1], 0, meters)
    top = (g['lat2'], g['lon2'])

    g = geod.Direct(point[0], point[1], 180, meters)
    bottom = (g['lat2'], g['lon2'])
    
    return(left, right, top, bottom)

In [16]:
def my_station_get_zips(station, miles):
    "given a station, pull all zip codes with miles distance, print them, sum the population"
    
    connection.rollback()
    
    query = "select latitude, longitude from stations "
    query += "where station = '" + station + "'"
    
    cursor.execute(query)
    
    connection.rollback()
    
    rows = cursor.fetchall()
    
    for row in rows:
        latitude = row[0]
        longitude = row[1]
        
    point = (latitude, longitude)
        
    (left, right, top, bottom) = my_calculate_box(point, miles)
    
    query = "select zip, population from zip_codes "
    query += " where latitude >= " + str(bottom[0])
    query += " and latitude <= " + str(top [0])
    query += " and longitude >= " + str(left[1])
    query += " and longitude <= " + str(right[1])
    query += " order by 1 "

    cursor.execute(query)
    
    connection.rollback()
    
    rows = cursor.fetchall()
    
    print("\n-------------------------------------------------------------------------------")
    print("  Zip Codes within " + str(miles) + " mile(s) of " + station + " BART Station")
    print("-------------------------------------------------------------------------------\n")
    
    total_population = 0
    
    for row in rows:
        zip = row[0]
        population = row[1]
        print("     zip:", zip, "  population: ", f'{population:10,}')
        total_population += population
        
    
    print("\n-------------------------------------------------------------------------------")
    print("  Total Population: ", f'{total_population:10,}')
    print("-------------------------------------------------------------------------------")

### Wipe and initialize Neo4j db

In [17]:
my_neo4j_wipe_out_database()
my_neo4j_number_nodes_relationships()

-------------------------
  Nodes: 0
  Relationships: 0
-------------------------


### Create dbs from CSV

In [18]:
connection.rollback()

query = """

drop table if exists station_zips_customers;
drop table if exists lines;
drop table if exists travel_times;
drop table if exists stations;

"""

cursor.execute(query)

connection.commit()


In [19]:
connection.rollback()

query = """

create table station_zips_customers (
  station varchar(32),
  zip varchar(32),
  customer_id varchar(10),
  street varchar(32),
  primary key (station, customer_id)
)

"""

cursor.execute(query)

connection.commit()

In [20]:
connection.rollback()

query = """

create table stations (
    station varchar(32),
    latitude numeric(9,6),
    longitude numeric(9,6),
    transfer_time numeric(3),
    primary key (station))

"""

cursor.execute(query)

connection.commit()

In [21]:
connection.rollback()

query = """

create table lines (
  line varchar(6),
  sequence numeric(2),
  station varchar(32),
  primary key (line, sequence)
)

"""

cursor.execute(query)

connection.commit()

In [22]:
connection.rollback()

query = """

create table travel_times (
  station_1 varchar(32),
  station_2 varchar(32),
  travel_time numeric(3),
  primary key (station_1, station_2)
)

"""

cursor.execute(query)

connection.commit()

In [23]:
connection.rollback()

query = """

copy station_zips_customers
from '/user/projects/project-3-mmartin131/code/data/station_zips_customers.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [24]:
connection.rollback()

query = """

copy lines
from '/user/projects/project-3-mmartin131/exercise/lines.csv' delimiter ',' NULL '' csv header;


"""

cursor.execute(query)

connection.commit()


In [25]:
connection.rollback()

query = """

copy travel_times
from '/user/projects/project-3-mmartin131/exercise/travel_times.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

### Create neo4j relationships from DB

In [26]:
# Build station nodes
connection.rollback()

query = """

select station, line
from lines
order by station, line

"""

cursor.execute(query)

connection.rollback()

rows = cursor.fetchall()

for row in rows:
    
    station = row[0]
    line = row[1]
    depart = 'depart ' + station
    arrive = 'arrive ' + station

    my_neo4j_create_node(station)
    my_neo4j_create_relationship_one_way(depart, station, 0)
    my_neo4j_create_relationship_one_way(station, arrive, 0)
    

In [27]:
# Build customer nodes
connection.rollback()

query = """

select station, customer_id
from station_zips_customers
order by station

"""

cursor.execute(query)

connection.rollback()

rows = cursor.fetchall()

for row in rows:
    
    station = row[0]
    customer_id = row[1]

    my_neo4j_create_customer_node(customer_id)
    my_neo4j_create_relationship_station_customer_one_way(station, customer_id, 0)
    

### Verify number of relationships

In [28]:
my_neo4j_number_nodes_relationships()

-------------------------
  Nodes: 4529
  Relationships: 21580
-------------------------


### Given a station name, inspect how many customers it can reach

In [29]:
def number_of_customers_from_station(station_name):
    "How many customers can a station reach"
    
    query = """
       MATCH (:Station {name: $station_name})-->(customer)
        RETURN customer.id
    """
    
    df = my_neo4j_run_query_pandas(query, station_name=station_name)
    
    return df.drop_duplicates()

# Test code, change the name of the station to see how many customers it can reach.
df = number_of_customers_from_station("Downtown Berkeley")

df.shape[0]

634

In [30]:
#Create dataframe of customers and customer connections
query = """

select station
from lines
where line in ('red','orange')


"""

cursor.execute(query)

connection.rollback()

rows = cursor.fetchall()

station = []
customer_connections = []

for row in rows:
    df = number_of_customers_from_station(row[0])
    station.append(row[0])
    customer_connections.append(df.shape[0])

#Convert lists to dataframe and merge on index
df_station = pd.DataFrame(station, columns = ['station'])
df_customer_connections = pd.DataFrame(customer_connections, columns = ['customer_connections'])
df_stations_customer_connections = df_station.join(df_customer_connections)
df_stations_customer_connections.sort_values(by = 'customer_connections', ascending = False).drop_duplicates()

Unnamed: 0,station,customer_connections
4,Downtown Berkeley,634
5,Ashby,487
3,North Berkeley,443
33,Powell Street,333
32,Montgomery Street,274
34,Civic Center,210
30,West Oakland,203
31,Embarcadero,196
10,Fruitvale,180
2,El Cerrito Plaza,175


In [31]:
# Travel distance from Ashby BART to pilot stations: 
def my_neo4j_shortest_path(from_station, to_station):
    "given a from station and to station, run and print the shortest path"
    
    query = "CALL gds.graph.drop('ds_graph', false)"
    session.run(query)

    query = "CALL gds.graph.create('ds_graph', 'Station', 'LINK', {relationshipProperties: 'weight'})"
    session.run(query)

    query = """

    MATCH (source:Station {name: $source}), (target:Station {name: $target})
    CALL gds.shortestPath.dijkstra.stream(
        'ds_graph', 
        { sourceNode: source, 
          targetNode: target, 
          relationshipWeightProperty: 'weight'
        }
    )
    YIELD index, sourceNode, targetNode, totalCost, nodeIds, costs, path
    RETURN
        gds.util.asNode(sourceNode).name AS from,
        gds.util.asNode(targetNode).name AS to,
        totalCost,
        [nodeId IN nodeIds | gds.util.asNode(nodeId).name] AS nodes,
        costs
    ORDER BY index

    """

    result = session.run(query, source=from_station, target=to_station)
    
    for r in result:
        
        total_cost = int(r['totalCost'])
        
        print("\n--------------------------------")
        print("   Total Cost: ", total_cost)
        print("   Minutes: ", round(total_cost / 60.0,1))
        print("--------------------------------")
        
        nodes = r['nodes']
        costs = r['costs']
        
        i = 0
        previous = 0
        
        for n in nodes:
            
            print(n + ", " + str(int(costs[i]) - previous)  + ", " + str(int(costs[i])))
            
            previous = int(costs[i])
            i += 1
    

In [32]:
#Test
my_neo4j_shortest_path('depart Dublin', 'arrive Antioch')

In [33]:
#Ashby to Downtown Berkeley
my_neo4j_shortest_path('depart Ashby', 'arrive Downtown Berkeley')

In [34]:
#Ashby to North Berkeley
my_neo4j_shortest_path('depart Ashby', 'arrive North Berkeley')

In [35]:
#Ashby to Powell St
my_neo4j_shortest_path('depart Ashby', 'Powell Street')

In [36]:
#Asby to West Oakland
my_neo4j_shortest_path('depart Ashby', 'West Oakland')