In [1]:
import ast
import datetime
import json
import os
import time

import matplotlib
import pandas as pd
import pytz
import requests
from pandas.io.json import json_normalize
from sqlalchemy import create_engine, types


In [2]:
# Connection to the Postgres database that receives the parsed predictions.
# Set the MBTA_DATABASE_URL environment variable to point at another database
# (or to supply real credentials) without editing the notebook; when it is
# unset, the original local development URL is used.
engine = create_engine(
    os.environ.get(
        'MBTA_DATABASE_URL',
        'postgresql://postgres:postgres@localhost:5432/mbta_data',
    )
)

# Name of the table the ingest cell appends rows to.
table_name = 'red_predictions'

In [3]:
# TODO: Drop table if it already exists

# Layout of the 19-character timestamp that prefixes every line of the capture file.
TIMESTAMP_FORMAT = '%m/%d/%Y %H:%M:%S'

# The queued messages are written to the database every BATCH_LINES input lines.
BATCH_LINES = 500

# Force the correct types for certain columns that to_sql would get wrong otherwise.
# Keys are spelled with '_' because json_normalize is called with sep='_'; a key
# containing '.' can never match a column produced that way.
RESET_DTYPES = {
    'attributes_status': types.TEXT(),
    'attributes_arrival_time': types.TIMESTAMP(),
    'attributes_departure_time': types.TIMESTAMP(),
    'attributes_updated_at': types.TIMESTAMP(),
    'relationships_vehicle_data': types.TEXT(),
    'attributes_schedule_relationship': types.TEXT(),
    'relationships_stop_data': types.TEXT(),
}


def parse_payload(text):
    """Turn the JSON text sliced out of a capture-file line into Python objects.

    json.loads is used instead of eval so that nothing in the file can be
    executed as code, string values containing the word "null" are left
    intact, and the JSON literals true/false are understood.

    Lines in the file look like the text form of a bytes object (for example
    b'event: add'), so a payload may carry bytes-literal escapes that are not
    valid JSON. When the direct parse fails, the escapes are undone with
    ast.literal_eval and the parse is retried once.

    Raises ValueError (or SyntaxError from the fallback) when the text cannot
    be parsed either way.
    """
    try:
        return json.loads(text)
    except ValueError:
        pass
    # Rebuild the bytes literal around the payload to undo its escaping.
    unescaped = ast.literal_eval("b'" + text + "'")
    return json.loads(unescaped.decode('utf-8'))


def write_batch(batch):
    """Append a list of message dicts to the table named by table_name.

    The whole batch is normalized and written in one call. If that fails,
    each message is retried on its own so that one badly shaped message does
    not discard the rest; messages that still fail are printed and skipped.
    An empty batch is a no-op.
    """
    if not batch:
        return
    try:
        json_normalize(batch, sep='_').to_sql(table_name, engine, if_exists='append', index=False) #TODO: Use to_hdf instead?
        return
    except Exception:  # not a bare except: a kernel interrupt must still stop the cell
        pass
    for queued in batch:
        try:
            json_normalize(queued, sep='_').to_sql(table_name, engine, if_exists='append', index=False)
        except Exception as error:
            # Report the message that actually failed, plus the reason.
            print('Error processing line')
            print(queued)
            print(error)


with open('data/day2/day2.txt', 'r') as infile:

    msg_type = None        # type announced by the most recent event-header line
    msg_type_next = True   # True when the next line is expected to be an event header
    msg_dict_list = []     # messages queued for the next batch write
    count = 0              # number of input lines read so far (including skipped ones)
    start_time = time.time()
    for line in infile:

        count = count + 1

        # If we receive a keep-alive message, as in during the night when there is no activity
        if 'keep-alive' in line:
            continue

        # If the incoming line is empty, which happens occasionally (but is rare)
        if line == '\n':
            continue

        if msg_type == 'RESET': # A special case where roughly once a day, a whole new batch of information comes in at once

            t = datetime.datetime.strptime(line[:19], TIMESTAMP_FORMAT)

            # Slice from the first '[' to the LAST ']' so that a list nested
            # inside a message cannot cut the payload short.
            start_idx = line.find('[')
            end_idx = line.rfind(']')
            init_data = line[start_idx:end_idx+1]
            init_list_msgs = parse_payload(init_data)
            init_list_msgs = [dict(reset_msg, message_time=t, message_type='RESET') for reset_msg in init_list_msgs]

            # An empty reset carries no rows; writing a column-less frame would be pointless.
            if init_list_msgs:
                testData = json_normalize(init_list_msgs, sep='_')
                testData.to_sql(table_name, engine, index=False, if_exists='append', dtype=RESET_DTYPES)

            msg_type_next = True
            msg_type = None
            continue

        if count % BATCH_LINES == 0: # Process batches of data, BATCH_LINES lines at a time
            end_time = time.time()
            write_batch(msg_dict_list)
            msg_dict_list = []
            print('Processed message number {} after {} seconds'.format(count, end_time-start_time))
            start_time = end_time

        if msg_type_next: # If the line shows whether the following line will be an add, update, remove, or reset
            msg_info = line
            if 'reset' in msg_info: # A special case where roughly once a day, a whole new batch of information comes in at once
                msg_type = 'RESET'
            elif 'update' in msg_info:
                msg_type = 'UPDATE'
            elif 'add' in msg_info:
                msg_type = 'ADD'
            elif 'remove' in msg_info:
                msg_type = 'REMOVE'
            msg_type_next = False
        else: # Add this message's dictionary to the list for processing as a batch
            msg = line

            t = datetime.datetime.strptime(msg[:19], TIMESTAMP_FORMAT)

            # Slice from the first '{' to the last '}' rather than dropping a
            # fixed two trailing characters, so a final line without a
            # newline is still parsed correctly.
            w = msg.find('{')
            msg = msg[w:msg.rfind('}') + 1]

            msg_dict = parse_payload(msg)
            msg_dict = dict(msg_dict, message_time=t, message_type=msg_type)
            msg_dict_list.append(msg_dict)
            msg_type_next = True

    # Write whatever is still queued once the file is exhausted; without this
    # the messages after the last full batch would never reach the database.
    # msg_dict_list is deliberately left populated so it can be inspected afterwards.
    write_batch(msg_dict_list)

Processed message number 500 after 3.4800102710723877 seconds
Processed message number 1000 after 0.9954307079315186 seconds
Processed message number 1500 after 0.7265827655792236 seconds
Processed message number 2000 after 0.5327000617980957 seconds
Processed message number 2500 after 0.5486841201782227 seconds
Processed message number 3000 after 0.976445198059082 seconds
Processed message number 3500 after 1.4102163314819336 seconds
Processed message number 4000 after 1.118337631225586 seconds
Processed message number 4500 after 1.1213557720184326 seconds
Processed message number 5000 after 0.8455185890197754 seconds
Processed message number 5500 after 0.787543773651123 seconds
Processed message number 6000 after 0.7965493202209473 seconds
Processed message number 6500 after 0.7125883102416992 seconds
Processed message number 7000 after 0.683631181716919 seconds
Processed message number 7500 after 0.6406159400939941 seconds
Processed message number 8000 after 0.6476254463195801 secon

Processed message number 500 after 2.1795363426208496 seconds
Processed message number 1000 after 0.7615923881530762 seconds
Processed message number 1500 after 0.9264698028564453 seconds
Processed message number 2000 after 0.7905476093292236 seconds
Processed message number 2500 after 0.8336727619171143 seconds
Processed message number 3000 after 0.8755099773406982 seconds
Processed message number 3500 after 1.0214433670043945 seconds
Processed message number 4000 after 0.9694457054138184 seconds
Processed message number 4500 after 0.9414412975311279 seconds
Processed message number 5000 after 1.0464015007019043 seconds
Processed message number 5500 after 0.9714367389678955 seconds
Processed message number 6000 after 0.9934313297271729 seconds
Processed message number 6500 after 0.9294726848602295 seconds
Processed message number 7000 after 0.6796071529388428 seconds
Processed message number 7500 after 2.0389816761016846 seconds
Processed message number 8000 after 1.1833231449127197 s

In [4]:
# Debug: show the last line the ingest loop read from the capture file.
line

"10/02/2018 09:30:52: b'event: add'\n"

In [26]:
# Debug: show the last line the ingest loop treated as an event header.
msg_info

'\n'

In [17]:
# Debug: show the most recently queued message dict
# (raises IndexError when the queue is empty, e.g. right after a batch write).
msg_dict_list[-1]

{'id': 'prediction-38066185-L-70094-50',
 'message_time': datetime.datetime(2018, 10, 2, 9, 30, 42),
 'message_type': 'REMOVE',
 'type': 'prediction'}

In [8]:
# Debug: True if the loop expected an event-header line next, False if it expected a data line.
msg_type_next

False

In [9]:
# Debug: show the payload text sliced out of the last data line the loop handled.
msg

''

In [19]:
# Debug: check whether the last line treated as an event header was just a newline.
msg_info=='\n'

True