Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance of paramiko is pretty slow (with possible solution) #175

Closed
sturmf opened this issue Jun 13, 2013 · 28 comments
Closed

Performance of paramiko is pretty slow (with possible solution) #175

sturmf opened this issue Jun 13, 2013 · 28 comments

Comments

@sturmf
Copy link

@sturmf sturmf commented Jun 13, 2013

Hi,
actually I wanted to post the text below on a mailing list but it seems paramiko has none any more?

I am using paramiko and doing a sftp file transfer from a twisted sftp server.
Unfortunately it turned out that paramiko is about 20 times slower then e.g. the putty sftp client for large files (100MB in my testcase).
So I started investigating and figured out that in the putty case every requested packet is immediately written to the outgoing buffer and in the paramiko case almost always queued waiting for a window advertisement (twisted/conch/ssh/channel.py line 180 method write).
The reason is that the remote window size in paramiko's case was never big enough!

So I checked which remote window size each of the tools set
(twisted/conch/ssh/connection.py line 116 method ssh_CHANNEL_OPEN).

Putty uses: 2147483647
Paramiko uses: 65536

So for a first test I patched the paramiko transport window size to be the same as putty's

self.trans = paramiko.Transport((hostname, port))
self.trans.window_size = 2147483647

and then I reduced the window size back until I saw the first performance degradations.

Here are some (non scientific) performance measurements with client and server on the same localhost
OS: Win7 64Bit, Python 2.7.3, Twisted-13.0.0, pycrypto-2.6, paramiko-1.10.1, Putty 0.62

Putty psftp: 10.00 MB/sec
Paramiko ws=default: 0.54 MB/sec
Paramiko ws=2147483647: 9.09 MB/sec
Paramiko ws=134217727: 9.09 MB/sec
Paramiko ws=67108863 5.55 MB/sec
Paramiko ws=33554431 2.44 MB/sec

So I think the default winow size of paramiko should be raised to be at least 134217727
but maybe even higher. Since this window size seems to be a different one then the TCP window size it seems to be harmless. As a side note I was now for the first time able the transmit a 1GB File in a reasonable time and ran into the twisted bug http://twistedmatrix.com/trac/ticket/4395 that rekeying is broken.
Unfortunately it is still not fixed in twisted 13.0.0! To workaround the bug set the following:

self.trans = paramiko.Transport((hostname, port))
self.trans.packetizer.REKEY_BYTES = pow(2, 40) # 1TB max, this is a security degradation
self.trans.packetizer.REKEY_PACKETS = pow(2, 40) # 1TB max, this is a security degradation

Finally I enabled compression in paramiko by calling self.trans.use_compression() which increased the download performance to 52.63 MB/sec for a file containing only zeros and decreased the performance to 6.88 MB/sec for a file containing complete random data.

For reference this was the unmodified paramiko profile, somehow even in the optimized case a lot of time is spent in the aquire method. I don't understand this since this measurement of acquire and release on Win7 returns a much higher performance:

python -m timeit -s "from threading import Lock; l=Lock(); a=l.acquire; r=l.release" "a(); r()"
10000000 loops, best of 3: 0.159 usec per loop

before:

746002 function calls in 170.389 seconds

Ordered by: call count

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
145033    0.015    0.000    0.015    0.000 {len}
48405  168.886    0.003  168.886    0.003 {method 'acquire' of 'thread.lock' objects}
37137    0.007    0.000    0.007    0.000 {method 'release' of 'thread.lock' objects}
17103    0.011    0.000    0.011    0.000 {_struct.pack}
17091    0.014    0.000    0.014    0.000 {hasattr}
17076    0.010    0.000    0.010    0.000 {_hashlib.openssl_sha1}
17076    0.022    0.000    0.068    0.000 C:\Program Files (x86)\Python27\lib\site-packages\Crypto\Hash\SHA.py:73(__init__)
17076    0.022    0.000    0.046    0.000 C:\Program Files (x86)\Python27\lib\site-packages\Crypto\Hash\hashalgo.py:34(__init__)
17076    0.015    0.000    0.015    0.000 {method 'update' of '_hashlib.HASH' objects}
12841    0.006    0.000    0.006    0.000 {method 'write' of 'cStringIO.StringO' objects}
12814    0.004    0.000    0.004    0.000 {chr}
12807    0.011    0.000    0.024    0.000 C:\Program Files (x86)\Python27\lib\site-packages\Crypto\Hash\hashalgo.py:53(update)
12305    0.005    0.000    0.005    0.000 C:\Program Files (x86)\Python27\lib\threading.py:62(_note)
12298    0.009    0.000    0.009    0.000 {time.time}
10334    0.009    0.000    0.009    0.000 {isinstance}
10277    0.031    0.000    0.082    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\channel.py:1171(_check_add_window)
10277    0.019    0.000    0.019    0.000 {method 'tostring' of 'array.array' objects}
10277    0.118    0.000  167.202    0.016 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\buffered_pipe.py:101(read)
10277    0.055    0.000  169.884    0.017 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\channel.py:601(recv)
 9198    0.004    0.000    0.004    0.000 {_struct.unpack}
 9195    0.018    0.000    0.018    0.000 {method 'read' of 'cStringIO.StringI' objects}
[...]

and this with window_size = 134217727

298643 function calls in 11.527 seconds

Ordered by: call count

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
88806    0.008    0.000    0.008    0.000 {len}
24538   10.980    0.000   10.980    0.000 {method 'acquire' of 'thread.lock' objects}
15358    0.002    0.000    0.002    0.000 {method 'release' of 'thread.lock' objects}
 9198    0.003    0.000    0.003    0.000 {_struct.unpack}
 9195    0.012    0.000    0.012    0.000 {method 'read' of 'cStringIO.StringI' objects}
 9195    0.015    0.000    0.029    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\message.py:103(get_bytes)
 6175    0.004    0.000    0.004    0.000 {isinstance}
 6136    0.016    0.000    0.029    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\message.py:139(get_int)
 6118    0.021    0.000    0.072    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\channel.py:1171(_check_add_window)
 6118    0.064    0.000   11.044    0.002 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\buffered_pipe.py:101(read)
 6118    0.017    0.000   11.134    0.002 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\channel.py:601(recv)
 6118    0.022    0.000   11.162    0.002 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\sftp.py:144(_read_all)
 6118    0.013    0.000    0.013    0.000 {method 'tostring' of 'array.array' objects}
 6112    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
 6106    0.002    0.000    0.002    0.000 {min}
 6105    0.022    0.000    0.026    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\sftp_file.py:108(_data_in_prefetch_buffers)
 6105    0.002    0.000    0.002    0.000 {method 'keys' of 'dict' objects}
 3099    0.001    0.000    0.001    0.000 C:\Program Files (x86)\Python27\lib\threading.py:62(_note)
 3092    0.002    0.000    0.002    0.000 {time.time}
 3082    0.001    0.000    0.001    0.000 {cStringIO.StringIO}
 3080    0.005    0.000    0.006    0.000 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\message.py:40(__init__)
 3073    0.000    0.000    0.000    0.000 {ord}
 3062    0.008    0.000    0.008    0.000 {thread.allocate_lock}
 3060    0.002    0.000    0.002    0.000 C:\Program Files (x86)\Python27\lib\threading.py:219(_release_save)
 3060    0.016    0.000   10.935    0.004 C:\Program Files (x86)\Python27\lib\threading.py:234(wait)
 3060    0.002    0.000    0.009    0.000 C:\Program Files (x86)\Python27\lib\threading.py:225(_is_owned)
 3060    0.002    0.000    0.012    0.000 C:\Program Files (x86)\Python27\lib\threading.py:222(_acquire_restore)
 3059    0.029    0.000   11.303    0.004 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\sftp_client.py:716(_read_response)
 3059    0.025    0.000   11.189    0.004 C:\Program Files (x86)\Python27\lib\site-packages\paramiko\sftp.py:174(_read_packet)
[...]
@kilink
Copy link

@kilink kilink commented Aug 4, 2013

Great detective work! I've run into this issue recently as well, where downloading a 33MB file is taking almost 5 minutes (with scp it takes 15 seconds).

Any chance of fixing this?

@kilink
Copy link

@kilink kilink commented Aug 5, 2013

For what it's worth, the default window size in OpenSSH is 2097152.

(see CHAN_TCP_WINDOW_DEFAULT definition in http://www.openbsd.org/cgi-bin/cvsweb/src/usr.bin/ssh/channels.h?rev=1.113)

@antoncohen
Copy link

@antoncohen antoncohen commented Sep 10, 2013

The following worked for me, performance went from 0.55 MB/sec to 4.5 MB/sec:

class FastTransport(paramiko.Transport):
    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)

ssh_conn = FastTransport(('host.example.com', 22))
ssh_conn.connect(username='username', password='password')
sftp = paramiko.SFTPClient.from_transport(ssh_conn)
@hqhoang
Copy link

@hqhoang hqhoang commented Sep 23, 2013

This definitely should be reviewed and patched. I have a script that pull Apache log files, and it took 5 minutes to download a 300MB file. Using window_size of 2147483647 reduced the download time to 21 seconds.

@lndbrg
Copy link
Contributor

@lndbrg lndbrg commented Jan 20, 2014

Specification says that both sides are allowed to send a message to adjust the windows size (sizes up to 2^32-1 is ok or as an int: 4294967295):
http://www.ietf.org/rfc/rfc4254.txt

The message to send is SSH_MSG_CHANNEL_WINDOW_ADJUST, in paramiko it is called MSG_CHANNEL_WINDOW_ADJUST. If I understand the code correctly, we are only listiening on the server sending this message.
To fix the slow file transfer, but still keeping it speedy for smaller actions (e.g standard ssh) keeping the small window size, but let the sftp subsystem renegotiate the window size on file transfers.

We should probably also look into how we can modify the packet size, if needed.

@jzwinck
Copy link

@jzwinck jzwinck commented Feb 12, 2014

This is an important bug to fix. Adding this tiny line of code cut the time to transfer one file from 60 seconds to 9 seconds:

# do this right after ssh.connect()
ssh.get_transport().window_size = 3 * 1024 * 1024

2 MB (the OpenSSH default) worked very well too; 3 MB gave me a slight boost (5%). This is on a link with RTT around 250 ms.

@lndbrg
Copy link
Contributor

@lndbrg lndbrg commented Aug 14, 2014

I have a patchset at #372. Could @jzwinck, @hqhoang, @antoncohen, @sturmf and @kilink try it out and see it it works any better?

@lndbrg
Copy link
Contributor

@lndbrg lndbrg commented Aug 14, 2014

That patchset modifies paramiko to use the same default as openssh.

That patchset also allows you to modify the window and packet size per session/channel/sftp-transfer opened, how to do that is documented in the sphinx docs. :)

@bitprophet
Copy link
Member

@bitprophet bitprophet commented Sep 8, 2014

Closing this given the lack of feedback since Olle posted his changeset note. Please leave comments if you've tried the 1.15 release (out today/tomorrow) and you're still experiencing speed issues. Thanks!

@bitprophet bitprophet closed this Sep 8, 2014
@ataki
Copy link

@ataki ataki commented Jun 30, 2015

Not sure if this is exactly related, but I'm experiencing speed issues with put() not closing the connection when writing a 300MB file to an sftp server:

screen shot 2015-06-30 at 2 46 24 pm

The file is written correctly (correct contents and modified date), but the connection lags for 10 min before closing.

I haven't tried with larger file sizes and don't know if this lag time scales linearly.

@gavinwahl
Copy link

@gavinwahl gavinwahl commented Aug 24, 2015

I am still having this bug with paramiko 1.15.2 (and paramiko master). I was reading the strace output, and it looks like paramiko is reading 8 bytes at a time?!

read(3, "...", 8) = 8
poll([{fd=8, events=POLLOUT}], 1, 100)  = 1 ([{fd=8, revents=POLLOUT}])
sendto(8, "..."..., 68, 0, NULL, 0) = 68
futex(0x1f2fb20, FUTEX_WAIT_PRIVATE, 0, NULL) = 0
fstat(3, {st_mode=S_IFCHR|0666, st_rdev=makedev(1, 9), ...}) = 0
read(3, "...", 8)     = 8
poll([{fd=8, events=POLLOUT}], 1, 100)  = 1 ([{fd=8, revents=POLLOUT}])
sendto(8, "..."..., 68, 0, NULL, 0) = 68
futex(0x1f2fb20, FUTEX_WAIT_PRIVATE, 0, NULL) = 0
fstat(3, {st_mode=S_IFCHR|0666, st_rdev=makedev(1, 9), ...}) = 0
...

EDIT: It looks like fd 3, the one it keeps reading 8 bytes from, is /dev/urandom

EDIT: I was stracing the main thread, not the one actually doing the download. Here's the strace of the thread:

poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 16, 0, NULL, NULL) = 16
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 32804, 0, NULL, NULL) = 2904
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 29900, 0, NULL, NULL) = 2896
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 27004, 0, NULL, NULL) = 1448
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 25556, 0, NULL, NULL) = 1448
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 24108, 0, NULL, NULL) = 1448
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 22660, 0, NULL, NULL) = 1448
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 21212, 0, NULL, NULL) = 1448
poll([{fd=8, events=POLLIN}], 1, 100)   = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "...", 19764, 0, NULL, NULL) = 1448
@Jitsusama
Copy link

@Jitsusama Jitsusama commented Oct 13, 2017

As an FYI to anyone else who might run into speed issues when writing/uploading files with SFTPClient and paramiko, I have discovered that setting pipeline mode by calling set_pipelined() on the file drastically speeds up file writes for me.

Here's an example:

import paramiko

ssh_client = SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect('host', username='username', password='password')
sftp_client = ssh_client.open_sftp()

with sftp_client.file('/path/to/file', mode='w') as file:
    file.set_pipelined()
    file.writelines([line + '\n' for line in lines])
@sjd12
Copy link

@sjd12 sjd12 commented May 9, 2018

Hello, Does the window_size solution works with SSHClient? Im new to paramiko. But it looks like its very slow compared to executing a command from the shell client. Im using paramiko to execute hive queries in a hive host from another host . i use python paramiko as below.

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~", ".ssh", "known_hosts")))
ssh.connect(host, username=db_user, password=db_pwd)
sshin, sshout, ssherr= ssh.exec_command(command)
status=sshout.channel.recv_exit_status() #Checks this status to wait for completion of the command execution

the last line consumes much more time compared to the same command run from shell window

@antoncohen
Copy link

@antoncohen antoncohen commented May 9, 2018

If the command outputs a lot of data, window_size might be an issue. There is another issues that mentions window size with recv_exit_status():

#448 (comment)

@sjd12
Copy link

@sjd12 sjd12 commented May 11, 2018

@antoncohen I tried some combinations. And for large data output i did have the hanging error. But window_size does not seem to resolve the issue . But moving sshout.channel.recv_exit_status() after sshout.readlines did fix the issue for me.

My actual problem i posted above was for any command which returns even tiny data. Any command i execute it takes atleast 5 seconds to finish. So because i use several commands one after other, it amounts to a bigger delay overall.

@tinkerjs
Copy link

@tinkerjs tinkerjs commented May 18, 2018

@sdj12 thanks, that helped me as well.

@tkarakis
Copy link

@tkarakis tkarakis commented Oct 1, 2018

I'm running into similar issues with running a python Lambda function (code below). It runs for a few minutes and then Lambda times out. The logs show about 14% of the 15MB file transferred. I can increase the Lambda execution time and/or memory and it has no impact.

This issue happens when connecting to our internal dev SFTP; however, when I connect to the client prod SFTP then the file downloads within seconds.

Any ideas?

import paramiko
import time

def printTotals(transferred, toBeTransferred):
print "Transferred: {0}\tOut of: {1}".format(transferred, toBeTransferred)

class FastTransport(paramiko.Transport):
def init(self, sock):
super(FastTransport, self).init(sock)
self.window_size = 2147483647
print(self.window_size)
self.packetizer.REKEY_BYTES = pow(2, 40)
print(self.packetizer.REKEY_BYTES)
self.packetizer.REKEY_PACKETS = pow(2, 40)
print(self.packetizer.REKEY_PACKETS)

def lambda_handler(event, context):

try:
    ssh_conn = FastTransport((hostname, port))
    ssh_conn.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(ssh_conn)
    source="my-file-name.zip"
    dest='/tmp/tmp.zip'
    print('about to get file')
    sftp.get(source, dest,callback=printTotals)
finally:
    print('done')
    sftp.close() 
@lsh-0
Copy link

@lsh-0 lsh-0 commented Mar 13, 2019

another related ticket: #1141

@arocketman
Copy link

@arocketman arocketman commented Oct 18, 2019

I was having a similar issue and I could not afford to copy the file locally because of security reasons, I solved it by using a combination of prefetching and bytesIO:

def fetch_file_as_bytesIO(sftp, path):
    """
    Using the sftp client it retrieves the file on the given path by using pre fetching.
    :param sftp: the sftp client
    :param path: path of the file to retrieve
    :return: bytesIO with the file content
    """
    with sftp.file(path, mode='rb') as file:
        file_size = file.stat().st_size
        file.prefetch(file_size)
        file.set_pipelined()
        return io.BytesIO(file.read(file_size))
@dawsonlp
Copy link

@dawsonlp dawsonlp commented May 22, 2020

Still seeing problems on paramiko.version
'2.7.1'

I've tried all the changes listed above (using the FastTransport detailed by @antoncohen ,
tried to use prefetch and set_pipelined) but I'm still seeing really disappointing performance - about a 60x slowdown for a chunk by chunk version or a 15x slowdown for a simple get. Both seem unreasonably slow.

For my test, I'm downloading a 190MB file from an sftp server. The results look like:
sftp command: 109MB/s - approx 2 seconds - this is essentially 1Gb, which seems likely to be at the network bandwidth limit for our setup.
using paramiko.SFTPClient.get - takes approx 30 seconds
using the paramiko file to retrieve the file in a 64MB chunks takes about 3 minutes

Context - I really need the chunk version because the end goal is to download a bunch of 20 to 30GB files from a third party server once a month as fast as possible and push them to AWS S3 using a multipart upload( a total of maybe 400 GB and really it's ok if not ideal, if it takes 8 or 12 hours total). I won't be using local storage in the context of the executing script, so 64MB chunks streaming through memory to s3 is what the ideal physical implementation looks like.

@dawsonlp
Copy link

@dawsonlp dawsonlp commented May 23, 2020

After tracing into paramiko, the problem I'm seeing is that in paramiko/file.py, the read method is iterating in python script in 32K chunks, and it's just too slow. One of the cpus on the box is pegged at 99% while running.

@dawsonlp
Copy link

@dawsonlp dawsonlp commented May 25, 2020

To get decent performance, I ended up using the SFTPClient.getfo method - it doesn't have the same performance issues that the sftp file does, but I had to set up a write thread and a read thread - the write thread just uses SFTPClient.getfo to write to the FIFO, and then I pull 64 MB chunks from the FIFO and use multipart upload to S3. The full transfer for the 190MB test file from the sftp server to s3 takes about 8 seconds, which is just fine for our needs.

@MarkAWard
Copy link

@MarkAWard MarkAWard commented Jun 3, 2020

@dawsonlp do you mind sharing some sample code, im running in to the same issue trying to solve exactly the same problem and im sure many others can benefit from it as well

@dawsonlp
Copy link

@dawsonlp dawsonlp commented Jun 3, 2020

Happy to - the code below writes via multiload to S3, but it is easy to change the destination by using a different sink function. The entry point is the function at the bottom, sftp_to_s3

Copyright (C) 2020 Laurence Dawson larry.dawson@gmail.com

# This is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.

"""

import os
import threading
import paramiko
import boto3
from datetime import datetime
import uuid
from time import sleep





class FastTransport(paramiko.Transport):

    def __init__(self, sock):
        super(FastTransport, self).__init__(sock)
        self.window_size = 2147483647
        self.packetizer.REKEY_BYTES = pow(2, 40)
        self.packetizer.REKEY_PACKETS = pow(2, 40)


def sftp_connect(host, port, username, pw):
        try:
            transport = FastTransport((host, port))
            transport.connect(username=username, password=pw)
            conn = paramiko.SFTPClient.from_transport(transport)
            return conn
        except Exception as e:
            print("Failed on exception", repr(e))
            raise


def write_chunk_to_s3(s3conn, buf, bucket_name, upload_id, s3_destination_path, partnum):
    part = s3conn.upload_part(
        Bucket=bucket_name,
        Key=s3_destination_path,
        PartNumber=partnum,
        UploadId= upload_id,
        Body=buf)
    part_output = {
        'PartNumber': partnum,
        'ETag': part['ETag']
    }
    return part_output


def execute_source(FIFO, sftpclient, sourcefilename):
    with open(FIFO, 'wb') as fifo:
        sftpclient.getfo(sourcefilename, fifo)


def execute_sink(FIFO, bucket_name, s3_destination_path, chunksize, filesize):
    """Do not start s3 path with a forward slash"""
    chunk = 1
    chunkcount = ((filesize - 1) // chunksize) + 1
    total_transferred = 0

    with open(FIFO, 'rb') as fifo:
        s3conn = boto3.client('s3')
        uploader = s3conn.create_multipart_upload(Bucket=bucket_name, Key=s3_destination_path)
        parts = []

        try:
            for chunk in range(chunkcount):
                chunk_transferred = 0
                while chunk_transferred < chunksize and total_transferred < filesize:
                    remaining_chunksize = chunksize - chunk_transferred
                    buf = fifo.read(remaining_chunksize)
                    if not buf:
                        print("oops buf is ", buf)
                        sleep(0.01)
                    this_piece = len(buf)
                    print("len(buf)", this_piece)
                    chunk_transferred = chunk_transferred + this_piece
                    total_transferred = total_transferred + this_piece
                    part = write_chunk_to_s3(s3conn, buf, bucket_name, uploader['UploadId'], s3_destination_path, len(parts) + 1)
                    parts.append(part)

            s3conn.complete_multipart_upload(Bucket=bucket_name,
                                          Key=s3_destination_path,
                                          UploadId=uploader["UploadId"],
                                          MultipartUpload={"Parts": parts})
            print("Done uploading", bucket_name, s3_destination_path, uploader["UploadId"])
            print(parts)

        except:
                s3conn.abort_multipart_upload(Bucket=bucket_name,
                                          Key=s3_destination_path,
                                          UploadId=uploader["UploadId"],
                                          MultipartUpload=parts)
                print("aborting multipart upload", uploader["UploadId"])
                raise
    print ("done with ", chunkcount, "chunks, total filesize = ",filesize,"=", total_transferred)




def copy_s3_data(sftpclient, sourcefilename, bucket_name, destfilename, chunksize, filesize ):

    FIFO = f"temp_pipe_{uuid.uuid4().hex}"


    def source():
        execute_source(FIFO, sftpclient, sourcefilename)

    def sink():
        execute_sink(FIFO, bucket_name, destfilename, chunksize, filesize)

    os.mkfifo(FIFO)
    try:
        tsrc = threading.Thread(target=source)
        tsrc.start()

        tdest = threading.Thread(target=sink)
        tdest.start()


        tsrc.join()

        tdest.join()

    finally:
        os.remove(FIFO)


def sftp_to_s3(host, port, sourcefilename, s3_bucket, destfilename, username, pw):
    conn = sftp_connect(host, port, username, pw)
    with conn.open(sourcefilename, 'rb') as src:
        src_filesize = src._get_size()
    chunk_size = 64 * 1024 * 1024  # 64mb seems about right

    t1 = datetime.now()
    copy_s3_data(conn, sourcefilename, s3_bucket, destfilename, chunk_size, src_filesize)
    t2 = datetime.now()
    print(f"copy took {t2 - t1}")


@dawsonlp
Copy link

@dawsonlp dawsonlp commented Jun 3, 2020

By the way, I'm not sure the FastTransport makes any difference, you might just use the paramiko transport directly. Also, it seems that when it runs, each chunk is getting read in one go, so it is possible that the read/write in execute_sink could be simplified too. However it is working well as is.

@MarkAWard
Copy link

@MarkAWard MarkAWard commented Jun 4, 2020

The FastTransport may be needed (I'm still doing some testing to confirm) but without it I was seeing the occasional EOFError and OSError: Socket is closed

@MarkAWard
Copy link

@MarkAWard MarkAWard commented Jun 4, 2020

For downloading/transferring large files paramiko is causing more headaches than its worth, it's slow and unreliable 😢. My new solution looks something like:

dst = "/tmp"
creds = {"username": ..., "password": ...}
sftp_host = "server.files.com"
filepath = "the/file/i/want/is/very/large.zip"
cmd = [
    "wget",
    f"--directory-prefix={dst}",
    f"--user={creds['username']}",
    f"--password={creds['password']}",
    f"ftp://{sftp_host}/{filepath}"
]
logger.info("Download Status: STARTING")
try:
    out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
    logger.error("Download Status: FAIL %s: %s", exc.returncode, exc.output)
    raise
else:
    logger.info("Download Status: SUCCESS)
@fastlaner
Copy link

@fastlaner fastlaner commented Jul 7, 2020

snippet from my code:
for i in range(8732):
file_name = sftp.open(getfilename + str(df["fold"][i]) + '/' + df["slice_file_name"][i], bufsize=32768)
file_name.prefetch()
file_name.set_pipelined(pipelined=True)

This sped my file reading from remote server from around 20Mbs to 35-40Mbs using paramiko (with a few spikes of 60-75Mbs)

pure sftp is still around gig speed though lol

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked pull requests

Successfully merging a pull request may close this issue.