# Project 3 - Setup the Neo4j Graph DB of BART system


University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

This borrows heavily from the Project 3 exercise code and solutions.



# BART Map

![Bart Map](bart_map.png)

# Included Modules and Packages



In [1]:
import csv

import math
import numpy as np
import pandas as pd

import psycopg2

# Supporting code


Python code supporting the processing of data into and out of the CSV, SQL and Neo4J

In [2]:
#
# function to run a select query and return rows in a pandas dataframe
# pandas converts all numeric values from postgres to float
# if every value in a column fits in an integer, change the column to integer
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return its rows in a pandas dataframe.

    The query runs on the module-level ``connection``.  Every float64
    column whose non-null values are all finite whole numbers is converted
    to the nullable 'Int64' dtype, so integer data that arrived as floats
    is shown as integers while missing values are preserved.

    Args:
        query: SQL text handed to ``pd.read_sql_query``.
        rollback_before_flag: when truthy, roll back the connection before
            the query runs.
        rollback_after_flag: when truthy, roll back the connection after
            the query runs.

    Returns:
        The dataframe produced by the query, with whole-number float
        columns converted to 'Int64'.
    """

    if rollback_before_flag:
        connection.rollback()

    df = pd.read_sql_query(query, connection)

    if rollback_after_flag:
        connection.rollback()

    # fix the float columns that really should be integers

    for column in df:

        if df[column].dtype != "float64":
            continue

        # NaN marks a missing value; only the values actually present decide
        # whether the column holds whole numbers.
        present = df[column].dropna().to_numpy()

        # Infinities cannot be stored as integers (and math.floor raises
        # OverflowError on them), so such a column is left as float.
        if not np.isfinite(present).all():
            continue

        # Vectorized check: a single fractional value keeps the column float.
        # An empty or all-NaN column passes and becomes an all-<NA> Int64.
        if (present == np.floor(present)).all():
            df[column] = df[column].astype('Int64')

    return df
    

In [3]:
# Open the PostgreSQL session shared by the helper functions and the
# cells below; all settings are passed as keyword arguments.
connection = psycopg2.connect(
    user="postgres",
    password="ucb",
    host="postgres",
    port="5432",
    database="postgres",
)

In [4]:
# Cursor on the shared connection; the cells below use it to execute SQL.
cursor = connection.cursor()

In [5]:
def my_read_csv_file(file_name, limit):
    """Read the csv file and print only the first limit rows.

    Every row is parsed so the total can be reported, but only the first
    ``limit`` rows are printed.  A summary line with the number of rows
    printed and the total number of rows is printed at the end.

    Args:
        file_name: path of the CSV file to read.
        limit: maximum number of rows to print.

    Returns:
        None; the output goes to stdout.
    """

    # Stays 0 when the file has no rows and the loop body never runs.
    row_count = 0

    # The with-block closes the file even if parsing fails; newline="" is
    # what the csv module requires so that newlines embedded in quoted
    # fields are parsed correctly.
    with open(file_name, "r", newline="") as csv_file:
        for row_count, row in enumerate(csv.reader(csv_file), start=1):
            if row_count <= limit:
                print(row)

    print("\nPrinted ", min(limit, row_count), "lines of ", row_count, "total lines.")

# Drop all tables if they exist

We're going to create the tables stations, lines, and travel_times from CSV files.  Drop them if they exist before we create them.


In [6]:
# Roll back whatever transaction is pending on the shared connection so the
# drops start in a fresh transaction.
connection.rollback()

# "if exists" lets this cell run whether or not the tables were created
# by an earlier run.
query = """

drop table if exists stations;
drop table if exists lines;
drop table if exists travel_times;

"""

# All three drop statements are sent in one execute call.
cursor.execute(query)

# Commit so the drops take effect on the connection's transaction.
connection.commit()


# Create the stations, lines and travel_times tables

The <b>stations table</b> should be named stations with the following columns:
* station varchar(32)
* latitude numeric(9,6)
* longitude numeric(9,6)
* transfer_time numeric(3) 

station should be the primary key

This is a normal table, not a staging table

transfer_time is in seconds.

The <b>lines table</b> should be named lines with the following columns:

* line varchar(6)
* sequence numeric(2)
* station varchar(32)

line, sequence should be the composite primary key

This is a normal table, not a staging table

The stations in each line are listed in order going one direction. The lines move in both directions.  We will only store the data one way, as this is one of the rules of Third Normal Form (3NF).

The <b>travel_times table</b> should be named travel_times with the following columns:

* station_1 varchar(32)
* station_2 varchar(32)
* travel_time numeric(3)

station_1, station_2 should be the composite primary key

This is a normal table, not a staging table

Each entry shows the travel time between two stations, with station_1 alphabetically less than station_2.

Travel time is the same from station_1 to station_2 as it is from station_2 to station_1.

Each line has the same travel time between the same two stations.

travel_time is in seconds.

In [7]:
# Roll back whatever transaction is pending on the shared connection so the
# table creation starts in a fresh transaction.
connection.rollback()

# The primary keys follow the table specifications above:
#   stations     -> station
#   lines        -> (line, sequence)
#   travel_times -> (station_1, station_2)
# They make the database reject duplicate rows when the CSV files are loaded.
query = """

create table stations (
  station varchar(32),
  latitude numeric(9,6),
  longitude numeric(9,6),
  transfer_time numeric(3),
  primary key (station)
);

create table lines (
  line varchar(6),
  sequence numeric(2),
  station varchar(32),
  primary key (line, sequence)
);

create table travel_times (
  station_1 varchar(32),
  station_2 varchar(32),
  travel_time numeric(3),
  primary key (station_1, station_2)
);


"""

# All three create statements are sent in one execute call.
cursor.execute(query)

connection.commit()

# Load the CSV data into the database tables

Load the stations, lines and travel_times CSV files into the database tables

In [8]:
# Roll back whatever transaction is pending on the shared connection so the
# load starts in a fresh transaction.
connection.rollback()

# Each copy statement loads one CSV file (with a header row, comma
# delimited, empty fields read as NULL) into the matching table.
# NOTE(review): the output recorded below this cell shows the run failing
# with UndefinedFile for stations.csv.  COPY ... FROM '<path>' is read by the
# PostgreSQL server process, so the path must exist on the server side --
# confirm the directory, or switch to a client-side load.
# NOTE(review): "Lines" is written with a capital L here but "lines"
# everywhere else; presumably harmless for an unquoted identifier -- confirm.
query = """

copy stations (station, latitude, longitude, transfer_time)
from '/user/projects/project-3-theOG-DS/exercise/stations.csv' delimiter ',' NULL '' csv header;

copy Lines (line, sequence, station)
from '/user/projects/project-3-theOG-DS/exercise/lines.csv' delimiter ',' NULL '' csv header;

copy travel_times (station_1, station_2, travel_time)
from '/user/projects/project-3-theOG-DS/exercise/travel_times.csv' delimiter ',' NULL '' csv header;


"""

# All three copy statements are sent in one execute call and committed
# together.
cursor.execute(query)

connection.commit()

UndefinedFile: could not open file "/user/projects/project-3-theOG-DS/exercise/stations.csv" for reading: No such file or directory
HINT:  COPY FROM instructs the PostgreSQL server process to read a file. You may want a client-side facility such as psql's \copy.


# 3.1.13 Verify the stations loaded correctly

Write a query to verify the stations loaded correctly

Select all columns and all rows from the table stations

Sort by station

Display the results in a Pandas data frame

The query should return 50 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||station|latitude|longitude|transfer_time|
|---|---|---|---|---|
|0|12th Street|37.803608|-122.272006|282|
|...|...|...|...|...|
|49|West Oakland|37.8049|-122.2951|283|

In [None]:
# Roll back before and after the select so this read-only check neither
# inherits nor leaves behind an open transaction.
rollback_before_flag = True
rollback_after_flag = True

# The exercise asks for the rows sorted by station; without an order by
# clause the database may return them in any order.
query = """

select * from stations
order by station

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

# 3.1.14 Verify the lines loaded correctly

Write a query to verify the lines loaded correctly

Select all columns and all rows from the table lines

Sort by line, sequence

Display the results in a Pandas data frame

The query should return 114 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||line|sequence|station|
|---|---|---|---|
|0|blue|1|Dublin|
|...|...|...|...|
|113|yellow|27|SFO|

In [None]:
# Roll back before and after the select so this read-only check neither
# inherits nor leaves behind an open transaction.
rollback_before_flag = True
rollback_after_flag = True

# The exercise asks for the rows sorted by line and then by sequence;
# without an order by clause the database may return them in any order.
query = """

select * from lines
order by line, sequence

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

# 3.1.15 Verify the travel times loaded correctly

Write a query to verify the travel times loaded correctly 

Select all columns and all rows from the travel_times

Sort by station_1, station_2

Display the results in a Pandas data frame

The query should return 51 rows into a Pandas dataframe. The first and last rows should look similar to this: 

||station_1|station_2|travel_time|
|---|---|---|---|
|0|12th Street|19th Street|120|
|...|...|...|...|
|50|South Hayward|Union City|300|

In [None]:
# Roll back before and after the select so this read-only check neither
# inherits nor leaves behind an open transaction.
rollback_before_flag = True
rollback_after_flag = True

# The exercise asks for the rows sorted by station_1 and then station_2;
# without an order by clause the database may return them in any order.
query = """

select * from travel_times
order by station_1, station_2

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)