Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 130 lines (108 sloc) 3.87 KB
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import hashlib
import md5
import optparse
import os
import socket
import time

from client import *
# Command-line interface.  The script takes exactly two positional
# arguments -- the target address and the directory whose files are sent --
# so the usage line names both.
parser = optparse.OptionParser(
    usage="usage: %prog [options] <address> <dir>",
    description="send the files in a directory as messages")
parser.add_option("-H", "--host",
                  help="host to connect to "
                       "(default $AMQP_BROKER, else 0.0.0.0)")
parser.add_option("-a", "--auth", action="store_true",
                  help="enable sasl authentication layer")
parser.add_option("-u", "--username",
                  help="username to use for authentication")
parser.add_option("-w", "--password",
                  help="password to use for authentication")
parser.add_option("-m", "--mechanism", default="ANONYMOUS",
                  help="sasl mechanism to choose")
parser.add_option("-p", "--port", type=int, default=5672,
                  help="port to connect to (default %default)")
parser.add_option("-f", "--frame-size", type=int, default=4294967295,
                  help="specify the maximum frame size")
parser.add_option("-l", "--link", help="link name")
parser.add_option("-t", "--trace", default="err",
                  help="enable tracing for specified categories")
# The default container id is derived from the local host name.
parser.add_option("-C", "--container", type=str,
                  default="%s.filesend" % socket.gethostname(),
                  help="specify the container-id")
opts, args = parser.parse_args()

# Exactly two positional arguments are accepted: args[0] is the target
# address and args[1] is the directory to send from.
if len(args) != 2:
    parser.error("address and send dir is required")

addr = Target(address=args[0], durable=2)

# Host precedence: --host option, then $AMQP_BROKER, then 0.0.0.0.
host = opts.host
if not host:
    host = os.getenv('AMQP_BROKER') or "0.0.0.0"

conn = Connection(auth=opts.auth)
# --trace holds whitespace-separated category names.
conn.tracing(*opts.trace.split())
conn.connect(host, opts.port)

open_args = {
    "mechanism": opts.mechanism.upper(),
    "username": opts.username,
    "password": opts.password,
    "max_frame_size": opts.frame_size,
    "container_id": opts.container,
}
conn.open(**open_args)

ssn = conn.session()
# Suffix a file is renamed with just before it is sent.
SFX = ".sending"


def name(f):
    """Return ``f`` with a trailing ``SFX`` removed, or ``f`` unchanged."""
    if not f.endswith(SFX):
        return f
    return f[:len(f) - len(SFX)]
def tag(f):
    """Return the delivery tag for file name ``f``.

    The tag is the raw MD5 digest of the base file name (``name(f)``, i.e.
    with any ``SFX`` suffix stripped), so a file maps to the same tag with
    or without the suffix.
    """
    # hashlib replaces the standalone md5 module, which has been deprecated
    # since Python 2.5; the digest produced is the same.
    return hashlib.md5(name(f)).digest()
def path(f):
    """Return entry ``f`` joined onto the module-level send directory ``dir``."""
    location = os.path.join(dir, f)
    return location
def content(f):
    """Return the entire contents of file ``f`` in the send directory.

    The file is opened in binary mode so the payload is read byte-for-byte
    with no newline translation, and it is closed as soon as the read
    completes instead of whenever the file object gets collected.
    """
    with open(path(f), "rb") as fh:
        return fh.read()
def message(f):
    """Build the Message used to send file ``f``.

    The message content is the file's contents, the delivery tag comes from
    ``tag(f)``, and the base name is carried in the "filename" property.
    """
    body = content(f)
    delivery_tag = tag(f)
    props = {"filename": name(f)}
    return Message(content=body, delivery_tag=delivery_tag, properties=props)
# Directory to send from, and the regular files it holds right now.
dir = args[1]
files = [entry for entry in os.listdir(dir) if os.path.isfile(path(entry))]

# Delivery tag -> file name, for every file found.
bytag = dict((tag(entry), entry) for entry in files)

# Tags of files that already carry the SFX suffix are passed to the sender
# link as its unsettled map (each value is None).
unsettled = dict.fromkeys(tag(entry) for entry in files if entry.endswith(SFX))

# The link name defaults to the directory when --link is not given.
link = opts.link or dir
lnk = ssn.sender(addr, name=link, unsettled=unsettled)
def settle():
for t, l, r in lnk.pending():
if t in bytag:
f = bytag[t]
os.unlink(path(f))
print "SETTLED", f, r.state
lnk.settle(t, r.state)
for f in files:
if f.endswith(SFX) and tag(f) not in lnk.unsettled:
lnk.send(settled=False, message=message(f))
print "RESUME", path(f)
settle()
for f in files:
if not f.endswith(SFX):
os.rename(path(f), path(f + SFX))
bytag[tag(f)] = f + SFX
lnk.send(settled=False, message=message(f + SFX))
print "SENT", path(f)
settle()
while lnk.pending(True):
settle()
lnk.close()
ssn.close()
conn.close()
for tag, local, remote in lnk.get_unsettled():
print "UNSETTLED:", tag, local, remote