In [1]:
# May 2016
# Debbie Hofman
# Pre-process node and edges for loading with neo4j batch loader
# Use UTC day partitions

In [2]:
%%writefile data_preprocess.py

import sys, getopt
import json
import datetime
import pytz
import math
import time
import pandas as pd
import os
import csv


from tweetparser_timepart import tweetparser

epoch = datetime.datetime.utcfromtimestamp(0)

nodetables = ['Tweet','User','Source','Place','Hashtag','Link','UserMention']
reltables = ['POSTS','USING','LOCATED_IN','TAGS','CONTAINS','MENTIONS','RETWEETS','REPLY_TO']


def readargs(argv):
    
    inputfile=''
    remoteserver=''
    maxsize=2000
    batchsize=1000
    truncate=False
    
    try:
        opts, args = getopt.getopt(argv,"hi:p:r:m:b:t")
    except getopt.GetoptError:
        print 'data_preprocess.py -i <inputfile> -r <remote server url>'
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print 'data_preprocess.py -i <inputfile> -r <remote server url> \
            -m <maxnumberrows> -b <batchsize> -t<truncate>'
            sys.exit()
        elif opt == "-i":
            inputfile = arg
        elif opt == "-b":
            batchsize = int(arg)
        elif opt == "-m":
            maxsize = int(arg)
        elif opt == "-t":
            truncate = True
        
    return inputfile, batchsize, maxsize, truncate

   
def unix_time_millis(dt):
    return int( (dt - epoch).total_seconds() * 1000.0)


def addtweet(data,databatch):
    
    pretweet=''
    
    if (('retweeted' in data) and ('retweeted_status' in data)):
        pretweet = data['retweeted_status'];

    elif (('quoted_status' in data)):
        pretweet = data['quoted_status'];
        
    if (pretweet != ''):
        databatch.append(pretweet) 
    
    databatch.append(data)
    
    
def get_idcols(node_or_rel, columns):
    
    if (node_or_rel=='node'):
        return [col for col in columns if ':ID' in col]

    else:  
        startcol = [col for col in columns if ':START_ID' in col]
        endcol = [col for col in columns if ':END_ID' in col]
        typecol = [col for col in columns if ':TYPE' in col]

        return startcol+endcol+typecol
    

def get_filename(filenamepart, partition_key):
    
    filename='dataloader6/output'+'/'+partition_key+'/'+filenamepart+'.csv'
    
    return filename


def checkpath(partition_key):
    path = 'dataloader6/output'+'/'+ partition_key
    try: 
        os.makedirs(path)
    except OSError:
        if not os.path.isdir(path):
            raise


def getheader(filenamepart, partition_key):
    
    headernamepart=filenamepart+"-header"
    filename=get_filename(headernamepart, partition_key)

    columns=[]
    with open(filename, 'r+') as f:
        for line in f:
            columns = line.split(',')
    
    #print columns
    return columns


def writeheader(filenamepart, partition_key, columns):
    #print filenamepart, columns
    headernamepart=filenamepart+"-header"
    filename=get_filename(headernamepart, partition_key)
    
    if not os.path.isfile(filename):
        with open(filename, 'w') as f:
            f.write(','.join(columns))

            
def writefile(rowlist, filenamepart, node_or_rel, partition_key):

    if (len(rowlist) > 0):
        
        #create a dataframe and drop duplicates
        columns=rowlist[0].keys()
        
        df = pd.DataFrame(rowlist, columns=columns)
        
        df = df.drop_duplicates(subset=get_idcols(node_or_rel, rowlist[0].keys()))
        df = df[columns]
    
        #make sure path exists and write
        checkpath(partition_key)
        
        #make sure header file exists..
        writeheader(filenamepart, partition_key, columns)
        
        #get filename
        filename=get_filename(filenamepart, partition_key)
        
        df.to_csv(filename,\
            index=False,\
            header=False,\
            encoding="utf-8",\
            mode="a")
        
        
def init_file(filenamepart, partition_key) :
    
    filename=get_filename(filenamepart, partition_key)
    
    if os.path.isfile(filename):
        os.remove(filename)

        
def dedup_file(filenamepart, node_or_rel, partition_key) :

    filename=get_filename(filenamepart, partition_key)
    
    if os.path.isfile(filename):
        
        try:
            #Now, read back in the data and dedup one more time (previously dedup within each small batch)
            df=pd.read_csv(filename, encoding="utf-8", dtype={'retweet_id':'str'}, error_bad_lines=False)

            columns = getheader(filenamepart, partition_key)
            #print columns
            df.columns = columns

            df = df.drop_duplicates(subset=get_idcols(node_or_rel, df.columns))
            
            outfile = get_filename(filenamepart+'_dedup', partition_key)
            init_file(filenamepart+'_dedup', partition_key)

            df.to_csv(outfile, \
                      header=False,\
                      index=False,\
                      encoding="utf-8", \
                      mode="w+")
            
        except Exception as e:
            print "error while deduping files"
            print e
            

def initfiles(partition_key):
    
    try:
                
        for k in nodetables:
            init_file(k, partition_key)
                            
        for k in reltables:
            init_file(k, partition_key)
        
    except Exception as e:
        print "error while initializing files"
        print e

        
def writebatch(start, linescount, databatch, parser, partition_key):
    
    try:
        
        parser.parseData(databatch, partition_key)
        
        for k,v in parser.nodelists.iteritems():
            writefile(v, k, 'node', partition_key)
                            
        for k,v in parser.rellists.iteritems():
            writefile(v, k, 'rel', partition_key)
        
        parser.clear()
        
        del databatch[:]
            
        print "{0} lines processed after {1} seconds.".format(linescount,time.time()-start)
        
    except Exception as e:
        print "error at record {0}".format(linescount)
        print e


def dedup_files(start, partition_key):
    
    try:
        
        for k in nodetables:
            dedup_file(k, 'node', partition_key)
            print "dedup {0} complete after {1} seconds.".format(k,time.time()-start)
                            
        for k in reltables:
            dedup_file(k, 'rel', partition_key)
            print "dedup {0} complete after {1} seconds.".format(k,time.time()-start)
        
    except Exception as e:
        print "error while deduping files"
        print e
        raise
    
    
def fix_edges(partition_key, edge_file, node_file):
    
    try:

        #Get edge file, column list, id columns
        df1filename = get_filename(edge_file+'_dedup', partition_key)
        df2filename = get_filename(node_file+'_dedup', partition_key)
        
        if ((os.path.isfile(df1filename)) and (os.path.isfile(df2filename))):
            
            df1=pd.read_csv(df1filename)
            df1.columns = getheader(edge_file, partition_key)
            #df1columns = df1.columns
            edgeidcols = get_idcols('rel', df1.columns)

            #Get node file, id columns
            df2=pd.read_csv(df2filename)
            df2.columns = getheader(node_file, partition_key)
            nodeidcols = get_idcols('node', df2.columns)

            #merge tables (left join on edge file)
            key1 = edgeidcols[1]
            key2 = nodeidcols[0]

            df3=pd.merge(df1, df2, left_on=key1, right_on=key2, how = 'left')

            df3 = df3[(df3[key2] == df3[key1])]
            df3 = df3[df1.columns]

            #remove any lines that have null values. Edges should never have null values.
            df3.dropna(inplace=True)

            #write out modified edge file with bad references removed.
            df3.to_csv(df1filename,\
                header=False,\
                index=False,\
                encoding="utf-8",\
                mode="w+")
    
    except Exception as e:
            print "error fixes edges"
            print e
            raise

def finalizepartition(start, partitionkey):
    
    print 'dedup partition', partitionkey
    dedup_files(start, partitionkey)
                        
    print 'fix edges partition', partitionkey
    fix_edges(partitionkey, 'REPLY_TO', 'Tweet')
    fix_edges(partitionkey, 'MENTIONS', 'User')
    
    print 'completed partition ', partitionkey

    
def is_en(data):
    
    lang=''
    
    if (('lang' in data) and (data['lang']!=None)):
        lang=data['lang']
        
    return (lang=='en') #return boolean


def main(argv):
    
    fname, batchsize, readsize, truncate = readargs(argv)
    print "file=", fname, "key=", "batch=", batchsize, "max=", readsize, "truncate=", truncate

    start = time.time()

    parser = tweetparser()
# Connect to graph and add constraints.
        
    databatch = []
    partitions = []
    limitlines = 0

    with open(fname, 'rb') as f:
        i = 0;
        j = 0;
        lastpartitionkey=''
  
        for line in f:  
            
            if ((line is None) or (j >= readsize)): #test end of file, write remaining data
                break;
            
            j=j+1 
            i=i+1
        
            try:
            
                data = json.loads(line, encoding='utf8')
                
                if (('limit' not in data.keys()) and (is_en(data))):

                    #update partition key based on content of this row
                    partition_key = parser.getPartitionKey(data)
                    
                    newpartition = ((lastpartitionkey!='') and (partition_key != lastpartitionkey))
                    
                    if (newpartition):
                        
                        print 'new partition found',lastpartitionkey,partition_key,j
                        
                        writebatch(start, j, databatch, parser, lastpartitionkey)
                        i=0
                        
                        print 'reset partition key'
                        partitions.append(partition_key)
                        lastpartitionkey = partition_key
                        print lastpartitionkey
                          
                    addtweet(data,databatch)
                                     
                    if (lastpartitionkey==''):
                        lastpartitionkey = partition_key
                        partitions.append(partition_key)

                else:
                    limitlines = limitlines + 1
                    
                                        
                if ((i == batchsize) or (i==readsize)): 
                    writebatch (start, j, databatch, parser, partition_key)
                    i = 0 
                    
            except Exception as e:
                #print data, databatch
                print 'error encountered...', e, j, "lines processed"
    
        if (len(databatch)>0):
            writebatch (start, j, databatch, parser, lastpartitionkey)
            
        partitions = list(set(partitions))
        
        for partitionkey in partitions:
            finalizepartition(start,partitionkey)
        
        end = time.time()
        print end - start
        
        print 'limitlines', limitlines
        
        
if __name__ == "__main__":
    main(sys.argv[1:])
    

Writing data_preprocess.py


In [7]:
!python data_preprocess.py -i './archive/data/election172' -m 1000000 -b 10000


file= ./archive/data/election172 key= batch= 10000 max= 1000000 truncate= False
10000 lines processed after 5.71085500717 seconds.
20000 lines processed after 11.7520539761 seconds.
30000 lines processed after 17.2265031338 seconds.
^CTraceback (most recent call last):
  File "data_preprocess.py", line 395, in <module>
    main(sys.argv[1:])
  File "data_preprocess.py", line 373, in main
    writebatch (start, j, databatch, parser, partition_key)
  File "data_preprocess.py", line 215, in writebatch
    writefile(v, k, 'rel', partition_key)
  File "data_preprocess.py", line 149, in writefile
    mode="a")
  File "/Users/debbiehofman/anaconda/lib/python2.7/site-packages/pandas/core/frame.py", line 1189, in to_csv
    formatter.save()
  File "/Users/debbiehofman/anaconda/lib/python2.7/site-packages/pandas/core/format.py", line 1467, in save
    self._save()
  File "/Users/debbiehofman/anaconda/lib/python2.7/site-packages/pandas/core/format.py", line 1567, in _save
    self._save_chunk(sta

In [4]:
# # # for i in range(172,190):
# %cd ~/documents/DSE/dhofman/capstone
# import os
# files = os.listdir("./archive/data")
# print files
# for fn in files: 
#     filename = "./archive/data/"+fn
#     !python data_preprocess.py -i $filename -m 1200000 -b 10000



### Test code below for data cleanup

In [5]:

#!head -895565 'dataloader2/output/2016_3_2/UserMention.csv' |tail -1

In [6]:
import os
import pandas as pd

        
def init_file(filenamepart, partition_key) :
    
    filename=get_filename(filenamepart, partition_key)
    
    if os.path.isfile(filename):
        os.remove(filename)

        
def get_idcols(node_or_rel, columns):
    
    if (node_or_rel=='node'):
        return [col for col in columns if ':ID' in col]

    else:  
        startcol = [col for col in columns if ':START_ID' in col]
        endcol = [col for col in columns if ':END_ID' in col]
        typecol = [col for col in columns if ':TYPE' in col]

        return startcol+endcol+typecol
    

def getheader(filenamepart, partition_key):
    
    headernamepart=filenamepart+"-header"
    filename=get_filename(headernamepart, partition_key)

    columns=[]
    with open(filename, 'r+') as f:
        for line in f:
            columns = line.split(',')
    
    #print columns
    return columns

def get_filename(filenamepart, partition_key):
    
    filename='dataloader4/output'+'/'+partition_key+'/'+filenamepart+'.csv'
    
    return filename


def dedup_file_test(filenamepart, node_or_rel, partition_key) :

    filename=get_filename(filenamepart, partition_key)
    
    if os.path.isfile(filename):
        
        try:
            #Now, read back in the data and dedup one more time (previously dedup within each small batch)
            df=pd.read_csv(filename, encoding="utf-8", dtype={'retweet_id':'str'}, error_bad_lines=False)

            columns = getheader(filenamepart, partition_key)
            #print columns
            df.columns = columns

            df = df.drop_duplicates(subset=get_idcols(node_or_rel, df.columns))
            
            outfile = get_filename(filenamepart+'_dedup', partition_key)
            init_file(filenamepart+'_dedup', partition_key)
            
            print df.count()
            df.dropna(inplace=True)
            print 'After dropna..'
            print df.count()

            df.to_csv(outfile, \
                      header=False,\
                      index=False,\
                      encoding="utf-8", \
                      mode="w+")
            
        except Exception as e:
            print "error while deduping files"
            print e
            

filenamepart = "TAGS"
nodeorrel = "rel"
partition_key = "2016_2_7"

dedup_file_test(filenamepart, nodeorrel, partition_key)