Skip to content
This repository
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

executable file 212 lines (180 sloc) 6.241 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
import sqlite3
from socket import *
import logging
import pylantorrent
from pylantorrent.db import LantorrentDB
from pylantorrent.server import LTServer
from pylantorrent.client import LTClient
import os
try:
    import json
except ImportError:
    import simplejson as json
import time
import random
import datetime
from pylantorrent import cbOpts
import sys

def setup_options(argv):
    """Build the command line parser for the request tool and parse argv.

    Starts from the defaults returned by pylantorrent.get_default_options
    and registers three additional options (nonblock, reattach, cancel)
    before handing everything to pylantorrent.parse_args.

    Returns a tuple (parsed options, remaining arguments, parser).
    """
    usage = """[options]
Submit a transfer request
"""
    parser, all_opts = pylantorrent.get_default_options(usage)

    # (positional cbOpts arguments, keyword cbOpts arguments) for each
    # option this tool adds on top of the defaults
    extra_options = (
        (("nonblock", "n", "Do not block waiting for completion", False),
         {"flag": True}),
        (("reattach", "a", "Reattach", None), {}),
        (("cancel", "c", "Cancel", False), {"flag": True}),
    )
    for positional, keywords in extra_options:
        all_opts.append(cbOpts(*positional, **keywords))

    parsed, remaining = pylantorrent.parse_args(parser, all_opts, argv)
    return (parsed, remaining, parser)


def wait_until_sent(con, rid):
    """Block until is_done reports that request `rid` is finished.

    The request is polled through is_done; between unsuccessful polls the
    function sleeps for 5 seconds.

    Returns the (rc, message) pair from the poll that reported done.
    """
    while True:
        finished, rc, message = is_done(con, rid)
        if finished:
            return (rc, message)
        time.sleep(5)

#
def is_done(con, rid):
    """Look up the current status of request `rid` in the requests table.

    con is an open sqlite3 connection and rid is the request id used as the
    lookup key.

    Returns a tuple (done, rc, message):
      done    -- True when the row's state column equals 1, or when its
                 attempt_count column is greater than 2
      rc      -- 1 when done was set because of the attempt count,
                 otherwise 0
      message -- the row's message column; when giving up because of the
                 attempt count and the column is NULL, a
                 "too many attempts" text is substituted

    A sqlite3.OperationalError is retried after a random sleep of up to two
    seconds.  Once pylantorrent.config.db_error_max such errors have been
    seen the last one is re-raised.  An Exception is raised when no row
    exists for rid.
    """
    query = "select state,message,attempt_count from requests where rid = ?"
    error_cnt = 0
    while True:
        try:
            pylantorrent.log(logging.INFO, "checking for done on %s" % (rid))
            c = con.cursor()
            c.execute(query, (rid,))
            row = c.fetchone()
            # a single commit is enough: nothing else is executed on the
            # connection before this function returns
            con.commit()
            if row is None:
                # fetchone() yields None when the rid is unknown; fail with
                # a readable error instead of a TypeError on row[0]
                raise Exception("no request found for rid %s" % (rid,))

            state = int(row[0])
            message = row[1]
            attempt_count = row[2]

            done = False
            rc = 0
            if state == 1:
                done = True
            elif attempt_count > 2:
                done = True
                rc = 1
                if message is None:
                    message = "too many attempts %d" % (attempt_count)
            return (done, rc, message)
        except sqlite3.OperationalError:
            error_cnt = error_cnt + 1
            if error_cnt >= pylantorrent.config.db_error_max:
                # bare raise re-raises the active error with its traceback
                raise
            # random back-off before trying the database again
            time.sleep(random.random() * 2.0)

def delete_rid(con, rid):
    """Delete the row for request `rid` from the requests table and commit.

    con is an open sqlite3 connection and rid is the request id to remove.
    Deleting a rid that has no row is not an error.

    A sqlite3.OperationalError is retried after a random sleep of up to two
    seconds.  Once pylantorrent.config.db_error_max such errors have been
    seen the last one is re-raised.

    Returns None.
    """
    statement = "delete from requests where rid = ?"
    error_cnt = 0
    while True:
        try:
            # cleanup
            c = con.cursor()
            c.execute(statement, (rid,))
            con.commit()
            return
        except sqlite3.OperationalError:
            error_cnt = error_cnt + 1
            if error_cnt >= pylantorrent.config.db_error_max:
                # bare raise re-raises the active error with its traceback
                raise
            # random back-off before trying the database again
            time.sleep(random.random() * 2.0)

def request(argv, con):
    """Insert a new transfer request into the requests table.

    argv must hold at least four items:
      argv[0] -- source file name (must exist; its size is read)
      argv[1] -- destination file name
      argv[2] -- the request id (rid), chosen by the caller
      argv[3] -- "host[:port]" of the receiving node's lt server
    con is an open sqlite3 connection.

    When the host part of argv[3] is empty, the first space-separated field
    of the SSH_CLIENT environment variable is used as the host.  When no
    port is given, 2893 is used.

    The row is inserted with state 0 and attempt_count 0.  Any exception
    raised by the insert is logged and retried after a random sleep of up
    to two seconds; after pylantorrent.config.db_error_max failures the
    last exception is re-raised.

    Returns a tuple (rid, size of the source file in bytes).

    Raises Exception when fewer than four arguments are supplied, KeyError
    when SSH_CLIENT is needed but not set, ValueError when the port is not
    an integer, and whatever os.path.getsize raises for a missing file.
    """
    if len(argv) < 4:
        raise Exception("You must provide 4 arguments: <src file> <dst file> <a uuid for this request> <the contanct string of the receiving nodes lt server>")
    src_filename = argv[0]
    dst_filename = argv[1]
    # the user provides the rid. that way we know they have it to look
    # things up later if needed
    rid = argv[2]

    # get the size of the file and verify that it exists
    sz = os.path.getsize(src_filename)

    hostport = argv[3]
    ha = hostport.split(":")
    host = ha[0]
    if host == "":
        # no host given: fall back to the first field of SSH_CLIENT
        host = os.environ['SSH_CLIENT'].split(" ")[0]
    if len(ha) > 1:
        port = int(ha[1])
    else:
        port = 2893

    now = datetime.datetime.now()
    i = "insert into requests(src_filename, dst_filename, hostname, port, rid, entry_time, state, attempt_count) values (?, ?, ?, ?, ?, ?, ?, ?)"
    data = (src_filename, dst_filename, host, port, rid, now, 0, 0, )

    error_ctr = 0
    while True:
        try:
            c = con.cursor()
            c.execute(i, data)
            con.commit()
            pylantorrent.log(logging.INFO, "new request %s %d" % (rid, sz))
            return (rid, sz)
        except Exception:
            # sys.exc_info() avoids version specific "except ..., name" syntax
            ex = sys.exc_info()[1]
            pylantorrent.log(logging.ERROR, "an error occured on request %s" % str(ex))
            error_ctr = error_ctr + 1
            if error_ctr >= pylantorrent.config.db_error_max:
                # bare raise re-raises the active error with its traceback
                raise
            # random back-off before trying the insert again
            time.sleep(random.random() * 2.0)


def main(argv=sys.argv[1:]):
    """Submit, reattach to, or cancel a lantorrent transfer request.

    This program allows a file to be requested from the lantorrent system.
    The file will be sent out of band.  When the file has been delivered
    the database entry for this request will be updated.  Unless the
    nonblock option is given, this program will block until that entry is
    updated.

    For a new request the positional arguments are the ones request()
    requires: the source file, the destination file, a uuid for this
    request and the contact string of the receiving node's lt server.
    When the reattach option is given, its value is used as the rid and no
    new request is created; if the cancel option is given as well, the row
    for that rid is deleted and the function returns without printing.

    The lantorrent config file must have the ip and port that the requester
    is using for lantorrent delivery.

    The outcome is printed to stdout as "rc,done,message".  Once a request
    is done its row is deleted from the database.

    Always returns 0; callers must read the printed rc for the real result.
    """

    pylantorrent.log(logging.INFO, "enter")
    random.seed()

    (o, args, _parser) = setup_options(argv)

    # open and close a LantorrentDB once to make sure the db is there
    # (presumably this creates the schema -- confirm in pylantorrent.db)
    x = LantorrentDB("sqlite:///%s" % pylantorrent.config.dbfile)
    x.close()

    con_str = pylantorrent.config.dbfile
    con = sqlite3.connect(con_str, isolation_level="EXCLUSIVE")
    try:
        rc = 0
        done = False
        message = ""
        if o.reattach is None:
            (rid, _sz) = request(args, con)
            try:
                (done, rc, message) = is_done(con, rid)
            except Exception:
                # best effort: the request is stored, only the first poll
                # failed.  KeyboardInterrupt/SystemExit are not swallowed.
                done = False
                rc = 0
                message = "Check on status later, db not ready for polling"
        else:
            rid = o.reattach
            if o.cancel:
                delete_rid(con, rid)
                return 0

            (done, rc, message) = is_done(con, rid)

        if not o.nonblock and not done:
            (rc, message) = wait_until_sent(con, rid)
            done = True

        if done:
            delete_rid(con, rid)
    finally:
        # release the sqlite connection on every exit path
        con.close()

    msg = "%d,%s,%s" % (rc, str(done), message)
    print(msg)

    # always return 0 if we echo the rc to stdout. this tells the
    # user to check the output for the real rc
    return 0


if __name__ == "__main__":
    # LANTORRENT_HOME is required; report the problem on stdout and abort
    # with an exception when it is absent
    if 'LANTORRENT_HOME' not in os.environ:
        msg = "The env LANTORRENT_HOME must be set"
        # parenthesised form prints the same text on Python 2 and Python 3
        print(msg)
        raise Exception(msg)

    # the value returned by main() is deliberately ignored
    main()
    # always exit with 0. a non 0 return code will mean an ssh error
    sys.exit(0)
Something went wrong with that request. Please try again.