JSON manipulation: https://realpython.com/python-json/

In [570]:
import os, fnmatch
import getpass
import json
import pprint
from neo4j.v1 import GraphDatabase
from neo4j.v1 import exceptions

In [498]:
# Folder holding the split tweet files; the load logs live in a 'logs/' subfolder of it.
data_folder = '/Users/linkalis/GIS8990_DistributedSpatialDatabases/testdatasets/data_split_5000/'
logs_folder = data_folder + 'logs/'

## Extractor

In [499]:
class Extractor:
    ''' Takes a data folder and a logs directory path and initializes a log file
    ("files_to_load.txt") containing the name of every *.txt file in the data folder.
    The get_next_file() method reads the first file named in that log and returns it as
    a list of dictionary objects for further manipulation.

    Both paths may be given with or without a trailing path separator. '''

    def __init__(self, data_path, logs_path):
        ''' Stores the paths, creates the logs directory if needed, and (re)writes
        files_to_load.txt with one data file name per line. '''
        self.data_path = data_path
        self.logs_path = logs_path

        # Create a directory to store the log files, if necessary. logs_path itself is the
        # directory, so the log is written where get_next_file() later looks for it.
        os.makedirs(self.logs_path, exist_ok=True)

        # Create a 'files_to_load.txt' file, then write the name of every matching file in the
        # data directory to it. Sorting makes the load order reproducible (os.listdir is unordered).
        file_type = "*.txt"
        with open(self._files_to_load_path(), "w") as files_to_load_log:
            for file_name in sorted(os.listdir(self.data_path)):
                if fnmatch.fnmatch(file_name, file_type):
                    files_to_load_log.write(file_name)
                    files_to_load_log.write("\n")

    def _files_to_load_path(self):
        ''' Returns the path of the files_to_load.txt log inside the logs directory. '''
        return os.path.join(self.logs_path, "files_to_load.txt")

    def get_next_file(self):
        ''' Reads from the files_to_load.txt file and gets the name of the first file in the list.
        Calls read_file() to read the target file in as a list of dictionaries. Returns a tuple
        that includes the list of dictionaries representing the JSON data, along with the filename
        (without its trailing newline) so we can keep track of this file in subsequent tasks.

        Raises FileNotFoundError if the log lists no files. '''
        with open(self._files_to_load_path(), "r") as files_to_load_log:
            # Strip the line ending so the name is usable as a path component and as metadata.
            next_file_name = files_to_load_log.readline().rstrip("\r\n")
        if not next_file_name:
            raise FileNotFoundError("No files left to load in " + self._files_to_load_path())
        print("Next file is: " + next_file_name)
        next_file_path = os.path.join(self.data_path, next_file_name)
        return (self.read_file(next_file_path), next_file_name)

    def read_file(self, file_to_read):
        ''' Reads the JSON-formatted file line by line and returns the lines as a list of
        dictionaries (one per non-blank line). '''
        print("Reading file: " + file_to_read)
        with open(file_to_read, "r") as reading_file:  # open as read-only
            # Blank lines carry no JSON document, so skip them instead of failing in json.loads.
            list_of_jsondicts = [json.loads(line) for line in reading_file if line.strip()]
        print("Read " + str(len(list_of_jsondicts)) + " data rows.")
        return list_of_jsondicts

In [500]:
# Build the extractor; constructing it also writes files_to_load.txt into the logs folder.
extractor = Extractor(data_path=data_folder, logs_path=logs_folder)

In [501]:
# Pull one batch: the parsed rows plus the name of the file they were read from.
(next_file_data, next_file_metadata) = extractor.get_next_file()

Next file is: 500M_unicode_splitan.txt

Reading file: /Users/linkalis/GIS8990_DistributedSpatialDatabases/testdatasets/data_split_5000/500M_unicode_splitan.txt
Read 5000 data rows.


## Cleaner

In [492]:
class Cleaner:
    ''' Takes a batch of data that's been extracted as a list of dictionaries.
    The clean_data() method iterates over each data element in the list, running it
    through a series of cleaning steps. Returns the cleaned data back as a list.

    Each element is updated in place: its 'place' gains 'better_bounding_box' and
    'centroid' entries. The original 'bounding_box' is left untouched, so cleaning
    the same batch twice gives the same result. '''

    # Offset (in coordinate units) applied to each corner when a point-like place is
    # expanded into a small rectangle.
    POINT_BUFFER = 0.0001

    def __init__(self, data_list, metadata, logs_path):
        self.data_list = data_list
        self.logs_path = logs_path
        self.file_name = metadata

    def clean_data(self):
        ''' Runs every cleaning step on every element of the batch and returns the batch. '''
        # Ids of the data elements touched by each step, collected as we go.
        step1_log = []  # elements whose missing place was replaced by the dummy place
        step2_log = []  # elements whose point-like bounding box was buffered

        # Iterate over each data element, progressing through each cleaning step on each element.
        for data_element in self.data_list:
            data_element = self.fix_null_places(data_element, step1_log)
            data_element = self.fix_bounding_box(data_element, step2_log)
            self.get_centroid(data_element)

        # TODO: write self.file_name, step1_log and step2_log to a cleaning log file under
        # self.logs_path; for now the collected ids are discarded.
        return self.data_list

    def fix_null_places(self, data_element, log_array):
        ''' Since place values are critical to our data model, substitute dummy
        values if the place is missing or equals 'None'. This will keep it from
        blowing up the database when we try to insert. The id of every element that
        gets the dummy place is appended to log_array. Returns the data element.
        '''
        if data_element.get('place') is None:
            data_element['place'] = {
                'id': "9999999",
                'name': "No Place",
                'full_name': "No Place Available",
                'country': "No Country Available",
                'country_code': "ZZ",
                'place_type': "NA",
                'url': "NA",
                'bounding_box': {
                    'type': "Polygon",
                    # One ring holding four 'dummy' coordinate pairs.
                    'coordinates': [[[0.0, 0.0] for _ in range(4)]],
                },
            }
            log_array.append(data_element["id_str"])

        return data_element

    def fix_bounding_box(self, data_element, log_array):
        ''' Fix a few issues that are going on with bounding boxes:
        1. Twitter Place bounding boxes only have four points. Need to close them off
        so they're a complete polygon. Take the first coordinate of the bounding box
        array and repeat it at the end of the bounding box array.
        2. If the bounding box is actually a point (i.e. all of the four points are
        the same), then "fake out" a bounding box by transforming into a small rectangle
        with a small buffer around the point.  We recognize these by
        place.place_type == 'poi' (or 'NA', the dummy place from fix_null_places).

        The result is stored as place['better_bounding_box']; it is built from copies,
        so place['bounding_box'] is not modified. Ids of buffered elements are appended
        to log_array. Returns the data element.
        '''
        place = data_element['place']
        original_bounding_box = place['bounding_box']['coordinates'][0]

        if place['place_type'] in ('poi', 'NA'):
            buf = self.POINT_BUFFER
            # Push the four corners outwards: (-,-), (-,+), (+,+), (+,-).
            offsets = [(-buf, -buf), (-buf, buf), (buf, buf), (buf, -buf)]
            ring = [[point[0] + d_long, point[1] + d_lat]
                    for point, (d_long, d_lat) in zip(original_bounding_box, offsets)]
            log_array.append(data_element["id_str"])
        else:
            ring = [list(point) for point in original_bounding_box]

        # Close the polygon by repeating the first corner at the end.
        ring.append(list(ring[0]))
        place['better_bounding_box'] = {'type': "Polygon", 'coordinates': [ring]}
        return data_element

    def get_centroid(self, data_element):
        ''' Stores place['centroid'] as a Point halfway between the first and third
        corners of the better bounding box. Returns the data element, like the other
        cleaning steps. '''
        bounding_box = data_element['place']['better_bounding_box']['coordinates'][0]
        lower_left = bounding_box[0]
        upper_right = bounding_box[2]
        centroid_long = lower_left[0] + ((upper_right[0] - lower_left[0]) / 2)
        centroid_lat = lower_left[1] + ((upper_right[1] - lower_left[1]) / 2)
        data_element['place']['centroid'] = {
            'type': "Point",
            'coordinates': [centroid_long, centroid_lat],
        }
        return data_element
        

In [None]:
# Run the cleaning steps over the extracted batch.
cleaner = Cleaner(data_list=next_file_data, metadata=next_file_metadata, logs_path=logs_folder)
cleaned_data = cleaner.clean_data()

In [597]:
#next_file_data[122]['id_str'] # has coordinates and a place
# Spot-check the computed centroid of a few rows.
for row_index in (576, 4234, 575):
    pprint.pprint(cleaned_data[row_index]['place']['centroid']['coordinates'])

[0.0, 0.0]
[0.0, 0.0]
[-82.67290299999999, 27.7957195]


## Loader

https://neo4j.com/developer/python/

https://www.lynda.com/Neo4j-tutorials/Use-Neo4j-driver-Python/601789/659331-4.html

In [599]:
# Cypher statement run once per tweet by Loader.structure_data_for_load().
# It merges three nodes, keyed by their ids:
#   - (t:Tweet {tweet_id})  - properties are only set when the node is first created
#   - (u:User  {user_id})   - properties are overwritten on every load
#   - (p:Place {place_id})  - properties are overwritten on every load
# and merges the relationships (t)-[:TWEETED_BY]->(u), (u)-[:TWEETED]->(t)
# and (t)-[:LOCATED_AT]->(p).
# Every $name placeholder must be supplied as a query parameter.
neo4j_query_string = """MERGE (t:Tweet {tweet_id: toInteger($tweet_id)})
                ON CREATE SET t.text = $text,
                    t.lang = $lang,
                    t.timestamp_ms = toInteger($timestamp_ms),
                    t.favorited = $favorited,
                    t.retweeted = $retweeted,
                    t.retweet_count = toInteger($retweet_count),
                    t.favorite_count = toInteger($favorite_count),
                    t.quote_count = toInteger($quote_count),
                    t.reply_count = toInteger($reply_count),
                    t.coordinates = point({ 
                        longitude: toFloat($tweet_coordinates_long), 
                        latitude: toFloat($tweet_coordinates_lat) 
                    })


                MERGE (u:User {user_id: toInteger($user_id)})
                SET	u.name = $user_name,
                    u.screen_name = $user_screen_name,
                    u.description = $user_description,
                    u.location = $user_location

                MERGE (t)-[:TWEETED_BY]->(u)
                MERGE (u)-[:TWEETED]->(t)
                
                MERGE (p:Place {place_id: toString($place_id)})
                SET	p.name = $place_name,
                    p.full_name = $place_full_name,
                    p.country = $place_country,
                    p.country_code = $place_country_code,
                    p.place_type = $place_type,
                    p.bounding_box_LL = point({ 
                        longitude: toFloat($place_bounding_box_LL_long), 
                        latitude: toFloat($place_bounding_box_LL_lat) 
                    }),
                    p.bounding_box_UR = point({ 
                        longitude: toFloat($place_bounding_box_UR_long), 
                        latitude: toFloat($place_bounding_box_UR_lat) 
                    }),
                    p.centroid = point({ 
                        longitude: toFloat($place_centroid_long), 
                        latitude: toFloat($place_centroid_lat) 
                    })
                
                MERGE (t)-[:LOCATED_AT]->(p)"""


class Loader:
    ''' Takes a batch of cleaned data (a list of dictionaries) and writes each element
    to Neo4j with the module-level neo4j_query_string. Prompts for the database
    credentials when constructed; call close() when finished to release the driver. '''

    def __init__(self, data_list, metadata, logs_path):
        self.data_list = data_list
        self.logs_path = logs_path
        self.file_name = metadata

        # Initialize Neo4j driver
        uri = "bolt://localhost:7687"
        user = input("Username: ")
        # getpass keeps the password from being echoed into the notebook output.
        pwd = getpass.getpass("Password: ")
        self.driver = GraphDatabase.driver(uri, auth=(user, pwd))

        # TODO: create log files, if they don't already exist

    def load_data(self):
        ''' Writes every element of the batch to the database, one transaction each. '''
        for data_element in self.data_list:
            print("Loading tweet with id: " + data_element['id_str'] + "; User id: " + str(data_element['user']['id']) + "; Place id: " + str(data_element['place']['id']))
            self.write_data(data_element)
            print("Loaded!")

    @staticmethod
    def _build_query_parameters(data_element):
        ''' Flattens one cleaned tweet dictionary into the parameters of neo4j_query_string. '''
        user = data_element['user']
        place = data_element['place']
        bounding_ring = place['better_bounding_box']['coordinates'][0]
        centroid = place['centroid']['coordinates']

        # The tweet-level 'coordinates' entry may be None; pass nulls in that case
        # rather than failing on the subscript.
        tweet_point = data_element.get('coordinates') or {}
        tweet_long, tweet_lat = tweet_point.get('coordinates') or (None, None)

        return {
            'tweet_id': data_element['id_str'],
            'text': data_element['text'],
            'lang': data_element['lang'],
            'timestamp_ms': data_element['timestamp_ms'],
            # NOTE(review): a stray "ADD" marker stood here and broke the dict literal;
            # presumably more tweet fields were meant to be added - confirm.
            'favorited': data_element['favorited'],
            'retweeted': data_element['retweeted'],
            'retweet_count': data_element['retweet_count'],
            'favorite_count': data_element['favorite_count'],
            'quote_count': data_element['quote_count'],
            'reply_count': data_element['reply_count'],
            'tweet_coordinates_long': tweet_long,
            'tweet_coordinates_lat': tweet_lat,
            'user_id': user['id'],
            'user_name': user['name'],  # was filled from user['id'] by mistake
            'user_screen_name': user['screen_name'],
            'user_description': user['description'],
            'user_location': user['location'],
            'place_id': place['id'],
            'place_name': place['name'],
            'place_full_name': place['full_name'],
            'place_country': place['country'],
            'place_country_code': place['country_code'],
            'place_type': place['place_type'],
            # Corners 0 and 2 of the ring are used as lower-left and upper-right.
            'place_bounding_box_LL_long': bounding_ring[0][0],
            'place_bounding_box_LL_lat': bounding_ring[0][1],
            'place_bounding_box_UR_long': bounding_ring[2][0],
            'place_bounding_box_UR_lat': bounding_ring[2][1],
            'place_centroid_long': centroid[0],
            'place_centroid_lat': centroid[1]
        }

    @staticmethod
    def structure_data_for_load(tx, data_element):
        ''' Runs neo4j_query_string in transaction tx with the parameters built from
        data_element and returns the statement result. '''
        results = tx.run(neo4j_query_string,
                         parameters=Loader._build_query_parameters(data_element))
        return results

    def write_data(self, data_element):
        ''' Writes one data element inside its own write transaction. The driver commits
        when the unit of work succeeds and rolls back if it raises. '''
        with self.driver.session() as session:
            db_result = session.write_transaction(self.structure_data_for_load, data_element)
            print(db_result)

    def log_successful_load(self):
        ''' If load succeeds, record the time it took to run and write filename to a log file
        indicating that it's been successfully loaded so we don't load it again. Also remove it from
        the files_to_load log file so we don't try to re-load it. (Not implemented yet.) '''
        pass

    def log_failed_load(self):
        ''' If load failed, record the time it ran and record a failed load to the log file.
        (Not implemented yet.) '''
        pass

    def close(self):
        ''' Closes the Neo4j driver. '''
        self.driver.close()

In [600]:
# Load the cleaned batch; always release the Neo4j driver afterwards, even if the
# load fails or is interrupted part-way through.
loader = Loader(cleaned_data, next_file_metadata, logs_folder)
try:
    loader.load_data()
finally:
    loader.close()

Username: neo4j
Password: [REDACTED]
Loading tweet with id: 947699650961821698; User id: 1340410436; Place id: f0592a72d560435a
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0ef0>
Loaded!
Loading tweet with id: 947699650932375552; User id: 396171102; Place id: 1f5306f35e51eee4
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0c18>
Loaded!
Loading tweet with id: 947699650781310977; User id: 3265758302; Place id: 004ec16c62325149
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0dd8>
Loaded!
Loading tweet with id: 947699650949029888; User id: 125836761; Place id: 41f575b7eebcd4b7
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0c18>
Loaded!
Loading tweet with id: 947699650701639680; User id: 200848289; Place id: 95484b83e4708f58
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0588>
Loaded!
Loading tweet with id: 947699650177495040; User id: 985433762; Place id: 67d92742f1ebf307
<neo4j.v1.result.BoltStatementResult object at 0x11a9d0550>
Loaded!
Loading tweet w

KeyboardInterrupt: 

Exception ignored in: 'neo4j.bolt._io.ChunkedInputBuffer.receive'
Traceback (most recent call last):
  File "/Users/linkalis/anaconda/lib/python3.6/ssl.py", line 1002, in recv_into
    return self.read(nbytes, buffer)
  File "/Users/linkalis/anaconda/lib/python3.6/ssl.py", line 865, in read
    return self._sslobj.read(len, buffer)
  File "/Users/linkalis/anaconda/lib/python3.6/ssl.py", line 625, in read
    v = self._sslobj.read(len, buffer)
KeyboardInterrupt: 


ServiceUnavailable: Failed to write to closed connection Address(host='::1', port=7687, flow_info=0, scope_id=0)

## JUNK

In [505]:
# https://developer.twitter.com/en/docs/geo/places-near-location/api-reference/get-geo-reverse_geocode.html

twurl '/1.1/geo/reverse_geocode.json?lat=2.204446&long=102.189931&granularity=country'
twurl '/1.1/geo/reverse_geocode.json?lat=2.255562&long=102.250785&granularity=country'

SyntaxError: invalid syntax (<ipython-input-505-7cca7c542ae1>, line 1)

In [587]:
# Testing Neo4j import

# Initialize Neo4j driver
uri = "bolt://localhost:7687"
user = input("Username: ")
# getpass keeps the password from being echoed into the notebook output.
pwd = getpass.getpass("Password: ")
driver = GraphDatabase.driver(uri, auth=(user, pwd))

# Print a sample of the loaded Tweet nodes, then release the driver's connections.
try:
    with driver.session() as session:
        result = session.run("MATCH (n:Tweet) RETURN n LIMIT 25")

        for record in result:
            print(record)
finally:
    driver.close()

Username: neo4j
Password: [REDACTED]
<Record n=<Node id=39 labels={'Tweet'} properties={'timestamp_ms': 1514783008666, 'text': 'https://t.co/omlcNUB5kV', 'lang': 'und', 'tweet_id': 947694742837628928, 'retweeted': False, 'favorited': False}>>
<Record n=<Node id=41 labels={'Tweet'} properties={'timestamp_ms': 1514783008854, 'text': 'Happy New Years Hoe We Made It ? https://t.co/JVArFMvTBD', 'lang': 'en', 'tweet_id': 947694743626178560, 'favorited': False, 'retweeted': False}>>
<Record n=<Node id=42 labels={'Tweet'} properties={'timestamp_ms': 1514783008958, 'text': '@Co2_eSports fuck yeah lol', 'lang': 'en', 'tweet_id': 947694744062144512, 'retweeted': False, 'favorited': False}>>
<Record n=<Node id=43 labels={'Tweet'} properties={'timestamp_ms': 1514783008735, 'text': 'Feliz ano novoooo ? https://t.co/ga7QPyWjG9', 'lang': 'pt', 'tweet_id': 947694743126933505, 'favorited': False, 'retweeted': False}>>
<Record n=<Node id=45 labels={'Tweet'} properties={'timestamp_ms': 1514783009010, 'text'