# Project 3: Neo4j Code Portion

University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

## Group: Kara Christensen, Negin Djalali, Gray Selby, Yidai Yao

### Start with setup code (importing libraries and setting up connections)

In [1]:
import neo4j

import csv
import json
import openpyxl

import math
import numpy as np
import pandas as pd

import psycopg2


#### Postgres

In [2]:
# Postgres connection settings (hard-coded for this notebook's environment).
postgres_settings = {
    "user": "postgres",
    "password": "ucb",
    "host": "postgres",
    "port": "5432",
    "database": "postgres",
}

connection = psycopg2.connect(**postgres_settings)

In [3]:
# Shared cursor used by the cells below that call cursor.execute(...) directly.
cursor = connection.cursor()

#### Neo4j

In [4]:
# Neo4j driver for the server at neo4j:7687 (credentials are hard-coded here).
driver = neo4j.GraphDatabase.driver(uri="neo4j://neo4j:7687", auth=("neo4j","ucb_mids_w205"))

In [5]:
# Single session on the "neo4j" database; every my_neo4j_* helper below runs its queries through it.
session = driver.session(database="neo4j")

### Load in Ridership Data from BART (IN PROGRESS, NOT CURRENTLY USED)

In [6]:
def my_export_excel_work_book_sheet_to_csv(excel_file_name, sheet_name, csv_file_name):
    """Export one worksheet of an Excel workbook to a CSV file.

    Prints the workbook name, sheet name and sheet dimensions, then writes
    every cell value in the rectangle (1, 1) .. (max_row, max_column) to
    ``csv_file_name``, one CSV row per worksheet row.

    Args:
        excel_file_name: path of the .xlsx workbook to read.
        sheet_name: name of the worksheet inside the workbook.
        csv_file_name: path of the CSV file to create (overwritten if present).
    """

    work_book = openpyxl.load_workbook(excel_file_name)
    work_sheet = work_book[sheet_name]

    print("\nExcel Workbook:", excel_file_name)
    print("\nWork Sheet:", sheet_name)
    print("\nNumber of Columns:", work_sheet.max_column)
    print("\nNumber of Rows:", work_sheet.max_row)

    # openpyxl cell coordinates are 1-based, hence the range(1, max + 1) bounds.
    work_sheet_list = [
        [
            work_sheet.cell(row=row_number, column=column_number).value
            for column_number in range(1, work_sheet.max_column + 1)
        ]
        for row_number in range(1, work_sheet.max_row + 1)
    ]

    print("\nExtracting Work Sheet", sheet_name, "to", csv_file_name, "\n")

    # newline="" lets the csv module control line endings itself (otherwise
    # some platforms emit blank lines between rows); the with-block guarantees
    # the file is closed even if writing fails part-way.
    with open(csv_file_name, "w", newline="") as csv_file:
        csv.writer(csv_file).writerows(work_sheet_list)

In [7]:
# Export the 'DAS Station Codes' sheet (station code / station name lookup) to CSV.
my_export_excel_work_book_sheet_to_csv('station-names.xlsx', 'DAS Station Codes', 'station-names.csv')


Excel Workbook: station-names.xlsx

Work Sheet: DAS Station Codes

Number of Columns: 2

Number of Rows: 51

Extracting Work Sheet DAS Station Codes to station-names.csv 



In [9]:
# Export the 'Total Trips OD' sheet of each monthly ridership workbook to its own CSV.
for ridership_workbook, ridership_csv in (
    ('Ridership_202309.xlsx', 'Sept2023Riders.csv'),
    ('Ridership_202310.xlsx', 'Oct2023Riders.csv'),
    ('Ridership_202311.xlsx', 'Nov2023Riders.csv'),
):
    my_export_excel_work_book_sheet_to_csv(ridership_workbook, 'Total Trips OD', ridership_csv)


Excel Workbook: Ridership_202309.xlsx

Work Sheet: Total Trips OD

Number of Columns: 52

Number of Rows: 52

Extracting Work Sheet Total Trips OD to Sept2023Riders.csv 


Excel Workbook: Ridership_202310.xlsx

Work Sheet: Total Trips OD

Number of Columns: 52

Number of Rows: 52

Extracting Work Sheet Total Trips OD to Oct2023Riders.csv 


Excel Workbook: Ridership_202311.xlsx

Work Sheet: Total Trips OD

Number of Columns: 52

Number of Rows: 52

Extracting Work Sheet Total Trips OD to Nov2023Riders.csv 



In [10]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas puts all numeric values from postgres to float
# if it will fit in an integer, change it to integer
#

def _integral_float_columns_to_int64(df):
    """Convert float64 columns that hold only whole numbers (or NaN) to nullable Int64, in place."""

    for column in df:

        if df[column].dtype != "float64":
            continue

        present = df[column].dropna().to_numpy()

        # A column qualifies only if every non-missing value is finite and has
        # no fractional part; +/-inf cannot be stored in Int64, so such columns
        # are left as float.  An all-NaN column qualifies (nothing to violate).
        if np.isfinite(present).all() and (present == np.floor(present)).all():
            df[column] = df[column].astype('Int64')

    return df


def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query on the module-level Postgres connection and return a DataFrame.

    Args:
        query: SQL text of the select statement.
        rollback_before_flag: if true, roll back the connection before running
            the query (clears any aborted transaction left by an earlier cell).
        rollback_after_flag: if true, roll back the connection after the query.

    Returns:
        pandas.DataFrame of the result rows.  Columns that pandas loaded as
        float64 but that contain only whole numbers (and/or missing values)
        are converted to the nullable 'Int64' dtype.
    """

    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    if rollback_after_flag:
        connection.rollback()

    # fix the float columns that really should be integers
    return _integral_float_columns_to_int64(df)
    

In [11]:
#
# drop all the temp tables so the notebook can be re-run from scratch
# (the create statements below declare no foreign keys, so the drop order is not significant)
#

connection.rollback()

query = """

drop table if exists septRiders;
drop table if exists octRiders;
drop table if exists novRiders;
drop table if exists stationNames;

"""

cursor.execute(query)

connection.commit()

In [12]:
#
# create the three monthly ridership tables and the station-name lookup table
# (no foreign keys are declared, so the creation order is not significant)
#

connection.rollback()

# Per-station ridership columns shared by all three monthly tables, listed in
# the order the columns are created.  Each entry is the SQL identifier exactly
# as it appears in the DDL: double-quoted entries keep their case and spaces,
# unquoted entries are folded to lower case by Postgres.
# NOTE(review): "Lake Merrit" is spelled this way on purpose here (sic) -
# it is kept unchanged because renaming a column could break other queries.
ridership_station_columns = [
    'Richmond',
    '"El Cerrito Del Norte"',
    '"El Cerrito Plaza"',
    '"North Berkeley"',
    'Berkeley',
    'Ashby',
    'MacArthur',
    '"19th St Oakland"',
    '"12th Street/ Oakland City Center"',
    '"Lake Merrit"',
    '"Fruitvale"',
    'Coliseum',
    '"San Leandro"',
    'Bayfair',
    'Hayward',
    '"South Hayward"',
    '"Union City"',
    'Fremont',
    'Concord',
    '"Pleasant Hill"',
    '"Walnut Creek"',
    'Lafayette',
    'Orinda',
    'Rockridge',
    '"West Oakland"',
    'Embarcadero',
    '"Montgomery Street"',
    '"Powell Street"',
    '"Civic Center"',
    '"16th Street Mission"',
    '"24th Street Mission"',
    '"Glen Park"',
    '"Balboa Park"',
    '"Daly City"',
    'Colma',
    '"Castro Valley"',
    '"Dublin/Pleasanton"',
    '"North Concord"',
    '"Pittsburg/Bay Point"',
    '"South San Francisco"',
    '"San Bruno"',
    '"San Francisco International Airport"',
    'Millbrae',
    '"West Dublin/Pleasanton"',
    '"Oakland International Airport"',
    '"Warm Springs"',
    '"Milpitas"',
    '"Berryessa/North San Jose"',
    '"Pittsburg Center"',
    '"Antioch"',
]

# Column list of one ridership table: Station key, one numeric(6) column per
# station above, the Exits total, and the primary key on Station.
ridership_table_body = ",\n".join(
    ['    Station varchar(32)']
    + ['    ' + column + ' numeric(6)' for column in ridership_station_columns]
    + ['    Exits numeric(7)', '    primary key (Station)']
)

# The three monthly tables are structurally identical, so generate their DDL
# from the single column list instead of repeating it.
query = "".join(
    "\ncreate table " + table_name + " (\n" + ridership_table_body + "\n);\n"
    for table_name in ('septRiders', 'octRiders', 'NovRiders')
)

query += """
create table stationNames (
    "Two-Letter Station Code" char(2),
    "Station Name" varchar(100),
    primary key ("Station Name")
);

"""

cursor.execute(query)

connection.commit()

In [13]:
# Bulk-load the four exported CSV files into their tables.  Each file's first
# line is treated as a header and empty fields are loaded as NULL.
# NOTE(review): the paths are absolute and hard-coded - presumably they must be
# readable from where the Postgres server runs; confirm for your environment.
connection.rollback()

query = """

copy septRiders
from '/user/projects/ChristensenDjalaliSelbyYaoProject3/code/Sept2023Riders.csv' delimiter ',' NULL '' csv header;

copy octRiders
from '/user/projects/ChristensenDjalaliSelbyYaoProject3/code/Oct2023Riders.csv' delimiter ',' NULL '' csv header;

copy novRiders
from '/user/projects/ChristensenDjalaliSelbyYaoProject3/code/Nov2023Riders.csv' delimiter ',' NULL '' csv header;

copy stationNames
from '/user/projects/ChristensenDjalaliSelbyYaoProject3/code/station-names.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [26]:
# Preview the September exits per station.  septRiders.station is joined
# against the two-letter station code to show the full station name; the left
# join keeps septRiders rows that have no matching code (their name is NULL).
rollback_before_flag = True
rollback_after_flag = True

query = """

select 
    stationNames."Station Name",
    septRiders.exits
from septRiders
    left join stationNames
        on stationNames."Two-Letter Station Code" = septRiders.station
        
order by 1

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,Station Name,exits
0,12th Street,120430
1,16th Street Mission,146458
2,19th Street,115095
3,24th Street Mission,139386
4,Antioch,44958
5,Ashby,50881
6,Balboa Park,100335
7,Bay Fair,64106
8,Berryessa,38929
9,Castro Valley,29438


In [56]:
# Average of the September, October and November exits per station, rounded to
# an integer.  Inner joins mean only stations present in all three monthly
# tables (and in stationNames) appear.
# NOTE(review): the `stations` table is not created in this part of the
# notebook - presumably it is loaded elsewhere; confirm before running.
rollback_before_flag = True
rollback_after_flag = True

query = """
with allMonths as 
    (select 
        stations.station,
        septRiders.exits as september_Exits,
        octRiders.exits as october_Exits,
        novRiders.exits as november_Exits
    from stations
        join stationNames
            on stationNames."Station Name" = stations.station
        join septRiders 
            on stationNames."Two-Letter Station Code" = septRiders.station
        join octRiders 
            on stationNames."Two-Letter Station Code" = octRiders.station
        join novRiders 
            on stationNames."Two-Letter Station Code" = novRiders.station
    )

select
    station,
    ROUND((september_exits + october_exits+november_exits)/3)::INTEGER as avg_exits
from allMonths

order by station

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,station,avg_exits
0,12th Street,118680
1,16th Street Mission,149587
2,19th Street,115691
3,24th Street Mission,143224
4,Antioch,43370
5,Ashby,50593
6,Balboa Park,101316
7,Bay Fair,63190
8,Berryessa,38771
9,Castro Valley,29423


### Create the Neo4j Bart Map

#### First, the supplemental functions we will use

In [62]:
def my_neo4j_wipe_out_database():
    """Empty the graph by deleting every relationship and every node.

    Two statements are run in order on the shared session: the first deletes
    each relationship together with its start node, the second deletes
    whatever nodes are left.
    """

    wipe_statements = (
        "match (node)-[relationship]->() delete node, relationship",
        "match (node) delete node",
    )

    for cypher in wipe_statements:
        session.run(cypher)

In [36]:
def my_neo4j_run_query_pandas(query, **kwargs):
    """Run a Cypher query on the shared session and return its rows as a DataFrame.

    Keyword arguments are forwarded to ``session.run`` as query parameters.
    The DataFrame columns are the result keys, in the order the query returns them.
    """

    records = session.run(query, **kwargs)

    # Materialise the rows first, then read the column names from the result.
    row_values = [record.values() for record in records]
    column_names = records.keys()

    return pd.DataFrame(row_values, columns=column_names)

In [37]:
def my_neo4j_number_nodes_relationships():
    """Print how many nodes and how many relationships the graph contains.

    Both counts are taken as the number of rows returned by a listing query
    (one row per node, one row per directed relationship).
    """

    all_nodes_query = """
        match (n) 
        return n.name as node_name, labels(n) as labels
        order by n.name
    """

    all_relationships_query = """
        match (n1)-[r]->(n2) 
        return n1.name as node_name_1, labels(n1) as node_1_labels, 
            type(r) as relationship_type, n2.name as node_name_2, labels(n2) as node_2_labels
        order by node_name_1, node_name_2
    """

    number_nodes = len(my_neo4j_run_query_pandas(all_nodes_query))
    number_relationships = len(my_neo4j_run_query_pandas(all_relationships_query))

    rule = "-------------------------"

    print(rule)
    print("  Nodes:", number_nodes)
    print("  Relationships:", number_relationships)
    print(rule)


In [76]:
def my_neo4j_create_node(station_name, numRiders, trueStation):
    """Create one node labelled Station.

    The node always gets ``name`` and ``trueStation`` properties.  The
    ``numRiders`` property is only stored when ``numRiders`` is not equal to 0.
    """

    params = {"station_name": station_name, "trueStation": trueStation}

    if numRiders == 0:
        cypher = """

        CREATE (:Station {name: $station_name, trueStation: $trueStation})

        """
    else:
        params["numRiders"] = numRiders
        cypher = """

        CREATE (:Station {name: $station_name, numRiders: $numRiders, trueStation: $trueStation})

        """

    session.run(cypher, **params)

In [39]:
def my_neo4j_create_relationship_one_way(from_station, to_station, weight):
    """Create a single weighted LINK relationship from one Station node to another.

    Nodes are looked up by their ``name`` property; if either name matches no
    node, the MATCH yields nothing and no relationship is created.
    """

    params = {
        "from_station": from_station,
        "to_station": to_station,
        "weight": weight,
    }

    cypher = """
    
    MATCH (from:Station), 
          (to:Station)
    WHERE from.name = $from_station and to.name = $to_station
    CREATE (from)-[:LINK {weight: $weight}]->(to)
    
    """

    session.run(cypher, **params)
    

In [40]:
def my_neo4j_create_relationship_two_way(from_station, to_station, weight):
    """Create a pair of weighted LINK relationships, one in each direction.

    Nodes are looked up by their ``name`` property; both relationships carry
    the same ``weight`` and are created by a single statement.
    """

    params = {
        "from_station": from_station,
        "to_station": to_station,
        "weight": weight,
    }

    cypher = """
    
    MATCH (from:Station), 
          (to:Station)
    WHERE from.name = $from_station and to.name = $to_station
    CREATE (from)-[:LINK {weight: $weight}]->(to),
           (to)-[:LINK {weight: $weight}]->(from)
    
    """

    session.run(cypher, **params)

#### Next, creating the actual graph itself

In [77]:
# Start from an empty graph: the build cell below uses CREATE, so re-running it
# without wiping first would add duplicate nodes and relationships.
my_neo4j_wipe_out_database()

In [78]:
def _run_and_fetch(sql):
    "roll back, run the select, roll back again, then return all rows"

    connection.rollback()
    cursor.execute(sql)
    connection.rollback()

    return cursor.fetchall()


## Step 1: one 'depart' node and one 'arrive' node per station, each carrying
## the station's average monthly exits as numRiders

query = """

with allMonths as 
    (select 
        stations.station,
        septRiders.exits as september_Exits,
        octRiders.exits as october_Exits,
        novRiders.exits as november_Exits
    from stations
        join stationNames
            on stationNames."Station Name" = stations.station
        join septRiders 
            on stationNames."Two-Letter Station Code" = septRiders.station
        join octRiders 
            on stationNames."Two-Letter Station Code" = octRiders.station
        join novRiders 
            on stationNames."Two-Letter Station Code" = novRiders.station
    )

select
    station,
    ROUND((september_exits + october_exits+november_exits)/3)::INTEGER as avg_exits
from allMonths

order by station

"""

rows = _run_and_fetch(query)

for station, numRide in rows:

    for prefix in ('depart ', 'arrive '):
        my_neo4j_create_node(prefix + station, numRide, station)


## Step 2: one node per (line, station), linked with zero weight from the
## station's depart node and to the station's arrive node

query = """

select 
    station,
    line
from lines
order by station, line

"""

rows = _run_and_fetch(query)

for station, line in rows:

    line_node = line + ' ' + station

    my_neo4j_create_node(line_node, 0, station)

    my_neo4j_create_relationship_one_way('depart ' + station, line_node, 0)
    my_neo4j_create_relationship_one_way(line_node, 'arrive ' + station, 0)


## Step 3: transfers between different lines at the same station, weighted by
## the station's transfer time

query = """

select
    station,
    from1.line as from_line,
    to1.line as to_line,
    transfer_time
from lines as from1
    join lines as to1
        using(station)
    join stations
        using(station)
        
where from1.line != to1.line
    
order by 1, 2, 3

"""

rows = _run_and_fetch(query)

for station, from_line, to_line, transfer_time in rows:

    my_neo4j_create_relationship_one_way(
        from_line + ' ' + station,
        to_line + ' ' + station,
        int(transfer_time),
    )


## Step 4: travel links (both directions) between consecutive stations on the
## same line, weighted by the travel time between them

query = """

select
    from1.line,
    from1.station as from_station,
    to1.station as to_station,
    travel_time
from lines as from1
    join lines as to1
        on from1.line = to1.line
        and from1.sequence + 1 = to1.sequence
    join travel_times
        on (from1.station = station_1 and to1.station = station_2)
            or (from1.station = station_2 and to1.station = station_1)
    

order by 1, 2, 3


"""

rows = _run_and_fetch(query)

for line, from_station, to_station, travel_time in rows:

    my_neo4j_create_relationship_two_way(
        line + ' ' + from_station,
        line + ' ' + to_station,
        int(travel_time),
    )

In [79]:
# Sanity check: print the node and relationship counts of the graph just built.
my_neo4j_number_nodes_relationships()

-------------------------
  Nodes: 214
  Relationships: 652
-------------------------


### Algorithm 1

In [138]:
# Rebuild the GDS projection from scratch.  The second argument (false) asks
# gds.graph.drop not to fail when 'ds_graph' does not exist yet.
query = "CALL gds.graph.drop('ds_graph', false)"
session.run(query)

query = "CALL gds.graph.project('ds_graph', 'Station', 'LINK', {relationshipProperties: 'weight'})"
session.run(query)

# Weighted betweenness centrality for every node in the projection.
query = """

CALL gds.betweenness.stream('ds_graph', {relationshipWeightProperty: 'weight'})
YIELD nodeId, score
RETURN gds.util.asNode(nodeId).name AS name, score as betweenness
ORDER BY betweenness DESC

"""

results = my_neo4j_run_query_pandas(query)

# Drop the synthetic 'arrive <station>' / 'depart <station>' nodes.  The match
# is anchored at the start of the name so a station whose own name merely
# contains one of those words is not discarded.  .copy() makes the filtered
# frame independent so the column assignment below is unambiguous.
is_arrive_or_depart = results['name'].str.match(r'(?:arrive|depart) ')
results = results[~is_arrive_or_depart].copy()

# Remaining names look like '<line> <station>': strip the leading line word.
# Raw string so that \w and \s reach the regex engine unmodified; expand=False
# returns a Series rather than a one-column DataFrame.
results['name'] = results['name'].str.extract(r'^[\w]* ([\w\s]*)', expand=False)

# Average the per-line scores of each station and keep the 15 highest.
includingCloseness = (
    results
    .groupby(['name'], as_index=False)
    .mean()
    .sort_values(by='betweenness', ascending=False)
    .reset_index(drop=True)[:15]
)

includingCloseness

Unnamed: 0,name,betweenness
0,Rockridge,5509.0
1,MacArthur,5239.0
2,Orinda,4997.0
3,Lafayette,4469.0
4,Walnut Creek,3925.0
5,12th Street,3752.333333
6,Lake Merritt,3723.0
7,West Oakland,3647.75
8,Coliseum,3603.75
9,19th Street,3535.0


In [140]:
# Average monthly exits (Sept-Nov) for the stations selected in
# includingCloseness, highest ridership first.
rollback_before_flag = True
rollback_after_flag = True

query = """
with allMonths as
    (select
        stations.station,
        septRiders.exits as september_Exits,
        octRiders.exits as october_Exits,
        novRiders.exits as november_Exits
    from stations
        join stationNames
            on stationNames."Station Name" = stations.station
        join septRiders
            on stationNames."Two-Letter Station Code" = septRiders.station
        join octRiders
            on stationNames."Two-Letter Station Code" = octRiders.station
        join novRiders
            on stationNames."Two-Letter Station Code" = novRiders.station
    )

select
    station,
    ROUND((september_exits + october_exits+november_exits)/3)::INTEGER as avg_exits
from allMonths

where station = any(%s)

order by 2 desc

"""

# Let psycopg2 quote the station names instead of pasting the Python repr of
# a list into the SQL: the repr breaks on names containing an apostrophe and
# an empty list would produce the invalid "in ()".  A Python list is adapted
# to a Postgres array, and "= any(<empty array>)" simply matches nothing.
station_names = includingCloseness['name'].dropna().tolist()
query = cursor.mogrify(query, (station_names,)).decode()

df = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

df[:5]

Unnamed: 0,station,avg_exits
0,Embarcadero,400825
1,Montgomery Street,328108
2,12th Street,118680
3,19th Street,115691
4,Fruitvale,98468


### Algorithm 2

### Algorithm 3