forked from gns24/pydatomic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datomic.py
99 lines (90 loc) · 3.43 KB
/
datomic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
import httplib2
import urllib
from urlparse import urljoin
from edn import loads
import sys
if 'gevent' in sys.modules:
import gevent
import grequests as requests
else:
gevent = None
import requests
import inspect
import pprint
class Database(object):
def __init__(self, name, conn):
self.name = name
self.conn = conn
def __getattr__(self, name):
def f(*args, **kwargs):
return getattr(self.conn, name)(self.name, *args, **kwargs)
return f
class Datomic(object):
def __init__(self, location, storage):
self.location = location
self.storage = storage
def db_url(self, dbname):
return urljoin(self.location, 'data/') + self.storage + '/' + dbname
def create_database(self, dbname, stream=False, size=None):
r = requests.post(self.db_url(''), data={'db-name':dbname})
if gevent:
pool = Pool(size) if size else None
jobs = [requests.send(r, pool, stream=stream)]
gevent.joinall(jobs)
r = r.response
assert r.status_code in (200, 201), r.text
return Database(dbname, self)
def transact(self, dbname, data, stream=False, size=None):
data = '[%s\n]' % '\n'.join(data)
r = requests.post(self.db_url(dbname)+'/', data={'tx-data':data},
headers={'Accept':'application/edn'})
if gevent:
pool = Pool(size) if size else None
jobs = [requests.send(r, pool, stream=stream)]
gevent.joinall(jobs)
r = r.response
assert r.status_code in (200, 201), (r.status_code, r.text)
return loads(r.content)
def query(self, dbname, query, extra_args=[], history=False, stream=False, size=None):
args = '[{:db/alias ' + self.storage + '/' + dbname
if history:
args += ' :history true'
args += '} ' + ' '.join(str(a) for a in extra_args) + ']'
r = requests.get(urljoin(self.location, 'api/query'),
params={'args' : args, 'q':query},
headers={'Accept':'application/edn'})
if gevent:
pool = Pool(size) if size else None
jobs = [requests.send(r, pool, stream=stream)]
gevent.joinall(jobs)
r = r.response
assert r.status_code == 200, r.text
return loads(r.content)
def entity(self, dbname, eid, stream=False, size=None):
r = requests.get(self.db_url(dbname) + '/-/entity', params={'e':eid},
headers={'Accept':'application/edn'})
if gevent:
pool = Pool(size) if size else None
jobs = [requests.send(r, pool, stream=stream)]
gevent.joinall(jobs)
r = r.response
assert r.status_code == 200
return loads(r.content)
if __name__ == '__main__':
q = """[{
:db/id #db/id[:db.part/db]
:db/ident :person/name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one
:db/doc "A person's name"
:db.install/_attribute :db.part/db}]"""
conn = Datomic('http://localhost:3000/', 'tdb')
db = conn.create_database('cms')
db.transact(q)
db.transact('[{:db/id #db/id[:db.part/user] :person/name "Peter"}]')
r = db.query('[:find ?e ?n :where [?e :person/name ?n]]')
print r
eid = r[0][0]
print db.query('[:find ?n :in $ ?e :where [?e :person/name ?n]]', [eid], history=True)
print db.entity(eid)