Permalink
Browse files

wip

* More corruption handling during loading
* Multiprocessing usage
* Better status messages
  • Loading branch information...
1 parent 0edbe61 commit 1bba7a1d4a4253152e47ece630e70c5048bdb583 Zack Maril committed Jun 30, 2014
Showing with 37 additions and 16 deletions.
  1. +2 −2 dedupe/being.py
  2. +35 −14 dedupe/load.py
View
@@ -44,9 +44,9 @@ def countTypes(universe):
def groupMerge(universe, pred, extract,description=None):
if description != None:
+ print(description)
start = countTypes(universe)
- print(description)
-
+
nodes = filter(lambda t: pred(t[1]),universe.nodes(data=True))
d = defaultdict(list)
for k,v in nodes:
View
@@ -2,6 +2,7 @@
import copy
import json
from glob import glob
+import multiprocessing
import networkx as nx
import os
import pickle
@@ -25,21 +26,31 @@ def carefulDict(jOb,d,props):
if found:
d[k] = preProcess(v)
# XML-namespace prefix that shows up glued onto JSON keys in some of the
# converted form files; it is stripped wherever it is found.
corruption = "{http://www.PureEdge.com/XFDL/Custom}"


def _cleanValue(v):
    """Return ``v`` with every dict nested inside it passed through ``clean``.

    Dicts are cleaned directly, lists are rebuilt element by element (so
    dicts inside arrays, or arrays of arrays, are reached too), and every
    other value is returned unchanged.
    """
    if isinstance(v, dict):
        return clean(v)
    if isinstance(v, list):
        return [_cleanValue(item) for item in v]
    return v


def clean(d):
    """Return a copy of ``d`` with the ``corruption`` prefix removed from keys.

    Each key is split on the ``corruption`` marker and only the text after
    the last occurrence is kept, so unaffected keys pass through unchanged.
    Nested dicts, including dicts held inside lists, are cleaned
    recursively. The input dict is not modified.

    :param d: dict (typically decoded JSON) whose keys may carry the prefix
    :returns: a new dict with cleaned keys at every nesting level
    """
    nd = {}
    # ``items()`` rather than ``iteritems()`` so this runs on Python 2 and 3.
    for k, v in d.items():
        nk = k.split(corruption)[-1]
        nd[nk] = _cleanValue(v)
    return nd
+
+
def loadForm(f,t):
t = str(t)
jOb = {}
- corruption = "{http://www.PureEdge.com/XFDL/Custom}"
+ fo = codecs.open(f,"r",encoding="utf-8")
+ jOb = json.loads(fo.read())
+ if corruption+u'LOBBYINGDISCLOSURE{}'.format(t) in jOb:
+ jOb = clean(jOb)
+
try:
- fo = codecs.open(f,"r",encoding="utf-8")
- jOb = json.loads(fo.read())[u'LOBBYINGDISCLOSURE{}'.format(t)]
+ jOb = jOb[u'LOBBYINGDISCLOSURE{}'.format(t)]
except KeyError:
- try:
- corruptedjOb = json.loads(open(f).read())[corruption+u'LOBBYINGDISCLOSURE{}'.format(t)]
- jOb = {}
- for (k,v) in corruptedjOb.iteritems():
- jOb[k.split(corruption)[-1]]=v
- except KeyError:
- return None
+ print("Weird, flipping form number just in case")
+ jOb = jOb[u'LOBBYINGDISCLOSURE{}'.format(str(3-int(t)))]
client = {
#Type
@@ -63,7 +74,7 @@ def loadForm(f,t):
("zip", ["clientZip"]),
("description", ["clientGeneralDescription"]),
("specific_issuse", ["specific_issuse"])])
-
+
firm = {
#Type
@@ -98,6 +109,12 @@ def loadForm(f,t):
}
return (client, firm, employs)
def loadForm1(x):
    """Load ``x`` through ``loadForm`` as form type 1.

    Single-argument, module-level wrapper so the function can be handed
    directly to a mapping call that supplies only the item.
    """
    form_type = 1
    return loadForm(x, form_type)
+
def loadForm2(x):
    """Load ``x`` through ``loadForm`` as form type 2.

    Single-argument, module-level wrapper so the function can be handed
    directly to a mapping call that supplies only the item.
    """
    form_type = 2
    return loadForm(x, form_type)
+
def loadData():
universe = nx.Graph()
data = None
@@ -107,13 +124,16 @@ def loadData():
data = pickle.load(f)
else:
print("Loading and processing files now")
- data = map(lambda x: loadForm(x,1),glob(os.environ["HOUSEXML"]+"/LD1/*/*/*.json")[0:10])
- data += map(lambda x: loadForm(x,2),glob(os.environ["HOUSEXML"]+"/LD2/*/*/*.json")[0:10])
+ p = multiprocessing.Pool(8)
+ data = p.map(loadForm1,glob(os.environ["HOUSEXML"]+"/LD1/*/*/*.json"),10)
+ data = p.map(loadForm2,glob(os.environ["HOUSEXML"]+"/LD2/*/*/*.json"),10)
print "Saving processed files"
with open(processed_files,"w") as f:
pickle.dump(data,f,2)
-
+
+ print("Starting from {} records".format(len(data)))
+ print("Building universe")
for col in data:
if col == None:
continue
@@ -132,4 +152,5 @@ def loadData():
universe.add_edge(fnode,fbeing,copy.copy(represents))
universe.add_edge(fnode,cnode,employs)
+ print("Universe loaded and built")
return universe

0 comments on commit 1bba7a1

Please sign in to comment.