# EMQX File Transfer

## Basic setup

Here we prepare the notebook environment; nothing worth looking at.

In [1]:
import json
import requests
import pprint
import logging
import mqttft

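# DEBUG level, so the MQTT exchange logged by mqttft is visible below
logging.basicConfig(level=logging.DEBUG)
# basicConfig() does nothing if the root logger already has handlers,
# so set the root level explicitly as well
logging.getLogger().setLevel(logging.DEBUG)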

In [2]:
! docker-compose up -d

[1A[1B[0G[?25l[+] Running 1/0
[34m ⠿ Network emqx-bridge    Created                                          0.0s
[0m[37m ⠿ Container demo-emqx-1  Starting                                         0.1s
[0m[?25h[1A[1A[1A[0G[?25l[+] Running 1/2
[34m ⠿ Network emqx-bridge    Created                                          0.0s
[0m[37m ⠿ Container demo-emqx-1  Starting                                         0.2s
[0m[?25h[1A[1A[1A[0G[?25l[34m[+] Running 2/2[0m
[34m ⠿ Network emqx-bridge    Created                                          0.0s
[0m[34m ⠿ Container demo-emqx-1  Started                                          0.2s
[0m[?25h

In [3]:
! docker-compose ps

NAME                IMAGE                                                 COMMAND                  SERVICE             CREATED             STATUS              PORTS
demo-emqx-1         docker.io/emqx/emqx-enterprise:5.0.2-rc.2-g886b5147   "/usr/bin/docker-ent…"   emqx                9 seconds ago       Up 8 seconds        4370/tcp, 5369/tcp, 0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 8083-8084/tcp, 0.0.0.0:8883->8883/tcp, :::8883->8883/tcp, 0.0.0.0:18083->18083/tcp, :::18083->18083/tcp, 11883/tcp


Let's obtain an authorization token for the various HTTP APIs we will use later in this demo.

In [4]:
r = requests.post('http://localhost:18083/api/v5/login', json={'username': 'admin', 'password': 'passw0rd'})
auth = r.json()
authHeaders = {'authorization': f'Bearer {auth["token"]}'}
authHeaders

{'authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2ODA3NzczMDcxMDAsImlzcyI6IkVNUVgifQ.sa0CuqDONQ-D4UId2ji83WgtPjSSkbadCjPUw_LqpNk'}
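
The token appears to be a standard JWT: three base64url-encoded parts separated by dots. A minimal sketch for peeking at its claims, for example to see when it expires during a long demo session, without verifying the signature. The helper name `jwt_claims` is hypothetical, and note that `exp` in this token looks like milliseconds rather than the seconds the JWT spec uses, so the unit is an assumption.

```python
import base64
import json
from datetime import datetime, timezone


def jwt_claims(token: str) -> dict:
    """Decode the payload (claims) part of a JWT. Does NOT verify the signature."""
    payload = token.split('.')[1]
    # base64url parts are unpadded; restore the padding before decoding
    payload += '=' * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


token = ('eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9'
         '.eyJleHAiOjE2ODA3NzczMDcxMDAsImlzcyI6IkVNUVgifQ'
         '.sa0CuqDONQ-D4UId2ji83WgtPjSSkbadCjPUw_LqpNk')
claims = jwt_claims(token)
print(claims)  # {'exp': 1680777307100, 'iss': 'EMQX'}

# Assumption: 'exp' is in milliseconds here, hence the division by 1000
expires_at = datetime.fromtimestamp(claims['exp'] / 1000, tz=timezone.utc)
print(expires_at.isoformat())
```

In the notebook itself, `jwt_claims(auth['token'])` does the same for the live token.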

## Local filesystem backend

Let's start with the local filesystem backend. We are setting up two things at once here, segments storage and exports storage, both backed by the local filesystem. Segments storage holds intermediate files for in-progress transfers, while exports storage holds the resulting files of all successfully completed transfers.

In [5]:
ft_config = {
    'storage': {
        'type': "local",
        'segments': {
            'root': "/opt/emqx/data/file-transfer/segments"
        },
        'exporter': {
            'type': "local",
            'root': "/opt/emqx/data/file-transfer/exports"
        }
    }
}

r = requests.put('http://localhost:18083/api/v5/configs/file_transfer',
                   headers=authHeaders,
                   json=ft_config)
! echo '{r.text}' | jq

{
  "storage": {
    "exporter": {
      "root": "/opt/emqx/data/file-transfer/exports",
      "type": "local"
    },
    "segments": {
      "gc": {
        "interval": "1h",
        "maximum_segments_ttl": "24h",
        "minimum_segments_ttl": "5m"
      },
      "root": "/opt/emqx/data/file-transfer/segments"
    },
    "type": "local"
  }
}


OK, time to upload our first file: a small text file.

In [6]:
mqttft.transfer('demo-client', 'file-4242', 'assets/test.txt', host='localhost', segment_size=1024)

DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'demo-client'
DEBUG:root:Sending PUBLISH (d0, q1, r0, m1), 'b'$file/file-4242/init'', ... (114 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m2), 'b'$file/file-4242/0'', ... (1024 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m3), 'b'$file/file-4242/1024'', ... (238 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m4), 'b'$file/file-4242/fin/1262'' (NULL payload)
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Received PUBACK (Mid: 1)
DEBUG:root:Received PUBACK (Mid: 2)
DEBUG:root:Received PUBACK (Mid: 3)
DEBUG:root:Received PUBACK (Mid: 4)
DEBUG:root:Sending DISCONNECT
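
The debug log makes the transfer protocol visible: an `init` message (presumably carrying the file metadata), one PUBLISH per segment addressed by its byte offset, and a final `fin/<size>` message with an empty payload, all at QoS 1. The sketch below reproduces that topic sequence for a given file size and segment size. It is not `mqttft`'s code; `plan_transfer` is a hypothetical helper that only computes topics and segment lengths and never talks to a broker.

```python
def plan_transfer(file_id: str, file_size: int, segment_size: int):
    """Return (topic, segment_length) pairs mirroring the log above.

    segment_length is None for messages that do not carry a file segment.
    """
    messages = [(f'$file/{file_id}/init', None)]  # metadata message
    for offset in range(0, file_size, segment_size):
        # The last segment holds whatever remains of the file
        length = min(segment_size, file_size - offset)
        messages.append((f'$file/{file_id}/{offset}', length))
    messages.append((f'$file/{file_id}/fin/{file_size}', None))  # empty payload
    return messages


for topic, length in plan_transfer('file-4242', 1262, 1024):
    print(topic, length)
# $file/file-4242/init None
# $file/file-4242/0 1024
# $file/file-4242/1024 238
# $file/file-4242/fin/1262 None
```

The offsets in the topics are byte positions rather than sequence numbers, which is what lets the broker place each segment independently of arrival order.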


Let's see what we now have in the local filesystem.

In [7]:
! tree run/data/file-transfer

[01;34mrun/data/file-transfer[0m
├── [01;34mexports[0m
│   ├── [01;34m9A[0m
│   │   └── [01;34m65[0m
│   │       └── [01;34m607A7007317C32B281F7A5D32991585A23EA[0m
│   │           └── [01;34mdemo-client[0m
│   │               └── [01;34mfile-4242[0m
│   │                   ├── test.txt
│   │                   └── test.txt.MANIFEST.json
│   └── [01;34mtmp[0m
└── [01;34msegments[0m
    └── [01;34mdemo-client[0m
        └── [01;34mfile-4242[0m

11 directories, 2 files


Notice also that the exports storage is bucketed by transfer hash. This should make it easier to manage a large number of exported files.
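
The exports path above splits a 40-hex-digit value (160 bits, the size of a SHA-1 digest) into `9A/65/<remaining 36 digits>`, followed by the client ID, file ID and file name. A minimal sketch of that layout follows. Which hash function and input EMQX actually uses is not visible here, so `transfer_digest` (SHA-1 over `clientid/fileid`) is a hypothetical stand-in that will not reproduce the digest above; `export_path` only demonstrates the splitting.

```python
import hashlib
from pathlib import PurePosixPath


def transfer_digest(clientid: str, fileid: str) -> str:
    # Hypothetical: the real hash input and algorithm are assumptions
    return hashlib.sha1(f'{clientid}/{fileid}'.encode()).hexdigest().upper()


def export_path(digest: str, clientid: str, fileid: str, name: str) -> PurePosixPath:
    """Build <2 hex>/<2 hex>/<rest of digest>/<clientid>/<fileid>/<name>."""
    return PurePosixPath(digest[:2], digest[2:4], digest[4:], clientid, fileid, name)


# Using the digest observed in the tree output above
print(export_path('9A65607A7007317C32B281F7A5D32991585A23EA',
                  'demo-client', 'file-4242', 'test.txt'))
# 9A/65/607A7007317C32B281F7A5D32991585A23EA/demo-client/file-4242/test.txt
```

Two levels of two hex digits give 256 × 256 = 65,536 buckets, so no single directory has to hold every export.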

In [8]:
! cat run/data/file-transfer/exports/**/demo-client/file-4242/test.txt

In publishing and graphic design, Lorem ipsum is a placeholder text commonly
used to demonstrate the visual form of a document or a typeface without relying
on meaningful content. Lorem ipsum may be used as a placeholder before final
copy is available. It is also used to temporarily replace text in a process
called greeking, which allows designers to consider the form of a webpage or
publication, without the meaning of the text influencing the design.

Lorem ipsum is typically a corrupted version of De finibus bonorum et malorum,
a 1st-century BC text by the Roman statesman and philosopher Cicero, with words
altered, added, and removed to make it nonsensical and improper Latin.

Versions of the Lorem ipsum text have been used in typesetting at least since
the 1960s, when it was popularized by advertisements for Letraset transfer
sheets.[1] Lorem ipsum was introduced to the digital world in the mid-1980s,
when Aldus employed it in graphic and word-processing templates for its desktop
pu

In [9]:
! jq . run/data/file-transfer/exports/**/demo-client/file-4242/test.txt.MANIFEST.json

[
  "filemeta",
  1,
  {
    "size": 1262,
    "name": "test.txt",
    "checksum": "08BA54A7562D1D5678ED12E77CA1088CCD866F6A0DCD2B62EACB76E4C6BBB0D7"
  }
]


Nice!

Now it's time to transfer something bigger. A book.

In [10]:
mqttft.transfer('demo-client', 'file-pdf', 'assets/riaklil-en.pdf', host='localhost', port=1883, segment_size=102400)

DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'demo-client'
DEBUG:root:Sending PUBLISH (d0, q1, r0, m1), 'b'$file/file-pdf/init'', ... (123 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m2), 'b'$file/file-pdf/0'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m3), 'b'$file/file-pdf/102400'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m4), 'b'$file/file-pdf/204800'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m5), 'b'$file/file-pdf/307200'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m6), 'b'$file/file-pdf/409600'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m7), 'b'$file/file-pdf/512000'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m8), 'b'$file/file-pdf/614400'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m9), 'b'$file/file-pdf/716800'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m10), 'b'$file/file-pdf/819200'', ... (102400 bytes)
DEBU

This time, let's query the File Transfer API to see which files we now have.

In [11]:
r = requests.get('http://localhost:18083/api/v5/file_transfer/files', headers=authHeaders)
files = r.json()
! echo '{r.text}' | jq

{
  "files": [
    {
      "clientid": "demo-client",
      "fileid": "file-4242",
      "metadata": {
        "checksum": "08BA54A7562D1D5678ED12E77CA1088CCD866F6A0DCD2B62EACB76E4C6BBB0D7",
        "name": "test.txt",
        "size": 1262
      },
      "name": "test.txt",
      "size": 1262,
      "timestamp": "2023-04-06T09:35:12+00:00",
      "uri": "/api/v5/file_transfer/file?node=emqx%40emqx.dev&fileref=9A%2F65%2F607A7007317C32B281F7A5D32991585A23EA%2Fdemo-client%2Ffile-4242%2Ftest.txt"

Nice. Let's try to download the PDF through the provided (relative) URI.

In [12]:
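uri = files['files'][1]['uri']  # second entry in the listing, here the PDF
r = requests.get('http://localhost:18083' + uri, headers=authHeaders, stream=True)
r.raise_for_status()
with open('download.pdf', 'wb') as fd:
    # Stream in 64 KiB chunks; iter_content() defaults to 1-byte chunks
    for chunk in r.iter_content(chunk_size=65536):
        fd.write(chunk)
! ls -la download.pdf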

-rw-r--r-- 1 keynslug keynslug 2156579 Apr  6 12:35 download.pdf
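
Picking `files['files'][1]` relies on the order of the listing, and nothing in the output above promises a stable order. A slightly more robust sketch selects the entry by client ID and file ID instead. `find_file` is a hypothetical helper, and `sample_listing` only mimics the shape of the API response shown earlier; in the notebook you would pass the real `files`.

```python
def find_file(listing: dict, clientid: str, fileid: str) -> dict:
    """Return the first entry in a file listing matching clientid and fileid."""
    for entry in listing['files']:
        if entry['clientid'] == clientid and entry['fileid'] == fileid:
            return entry
    raise LookupError(f'no exported file {fileid!r} for client {clientid!r}')


# Shape mirrors the API response above (fields trimmed)
sample_listing = {'files': [
    {'clientid': 'demo-client', 'fileid': 'file-4242', 'name': 'test.txt'},
    {'clientid': 'demo-client', 'fileid': 'file-pdf', 'name': 'riaklil-en.pdf'},
]}
print(find_file(sample_listing, 'demo-client', 'file-pdf')['name'])  # riaklil-en.pdf
```

If the same file ID is transferred more than once, the listing may contain several matches; this sketch simply returns the first one.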


Perfect! Let's move on to S3.

## S3 Exporter

Recall that we previously configured the `local` storage exporter. Let's switch to the S3 exporter instead, which uploads fully transferred files to any S3-compatible storage. This time it will be AWS S3 itself.

In [13]:
s3_secret = ! cat secrets/aws
s3_exporter_config = {
    'type': 's3',
    'host': "s3.us-east-1.amazonaws.com",
    'port': "443",

    'access_key_id': "AKIAXYPMVIWAHZSHOTES",
    'secret_access_key': s3_secret[0],

    'bucket': "keynslug-emqx-s3-demo",
    'acl': 'public_read',

    'transport_options': {
        'request_timeout': '30s',
        'ipv6_probe': False,
        'ssl': {
            'enable': True
        }
    }
}

ft_config['storage']['exporter'] = s3_exporter_config

r = requests.put('http://localhost:18083/api/v5/configs/file_transfer',
                   headers=authHeaders,
                   json=ft_config)
! echo '{r.text}' | jq 'del(.storage.exporter.secret_access_key)'

{
  "storage": {
    "exporter": {
      "access_key_id": "AKIAXYPMVIWAHZSHOTES",
      "acl": "public_read",
      "bucket": "keynslug-emqx-s3-demo",
      "host": "s3.us-east-1.amazonaws.com",
      "max_part_size": "5gb",
      "min_part_size": "5mb",
      "port": "443",
      "transport_options": {
        "connect_timeout": "15s",
        "enable_pipelining": 100,
        "pool_size": 8,
        "pool_type": "ra

Perfect! Let's repeat the transfers we already did, to see that they now end up in AWS S3.

In [15]:
mqttft.transfer('demo-client', 'file-4242', 'assets/test.txt', host='localhost', segment_size=1024)

DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'demo-client'
DEBUG:root:Sending PUBLISH (d0, q1, r0, m1), 'b'$file/file-4242/init'', ... (114 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m2), 'b'$file/file-4242/0'', ... (1024 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m3), 'b'$file/file-4242/1024'', ... (238 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m4), 'b'$file/file-4242/fin/1262'' (NULL payload)
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Received PUBACK (Mid: 1)
DEBUG:root:Received PUBACK (Mid: 2)
DEBUG:root:Received PUBACK (Mid: 3)
DEBUG:root:Received PUBACK (Mid: 4)
DEBUG:root:Sending DISCONNECT


In [16]:
mqttft.transfer('demo-client', 'file-pdf', 'assets/riaklil-en.pdf', host='localhost', segment_size=102400)

DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'demo-client'
DEBUG:root:Sending PUBLISH (d0, q1, r0, m1), 'b'$file/file-pdf/init'', ... (123 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m2), 'b'$file/file-pdf/0'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m3), 'b'$file/file-pdf/102400'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m4), 'b'$file/file-pdf/204800'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m5), 'b'$file/file-pdf/307200'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m6), 'b'$file/file-pdf/409600'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m7), 'b'$file/file-pdf/512000'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m8), 'b'$file/file-pdf/614400'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m9), 'b'$file/file-pdf/716800'', ... (102400 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m10), 'b'$file/file-pdf/819200'', ... (102400 bytes)
DEBU

Finally, let's look at what has changed in the File Transfer API response.

In [17]:
r = requests.get('http://localhost:18083/api/v5/file_transfer/files', headers=authHeaders)
! echo '{r.text}' | jq

{
  "files": [
    {
      "clientid": "demo-client",
      "fileid": "file-4242",
      "name": "test.txt",
      "size": 1262,
      "timestamp": "2023-04-06T09:36:15+00:00",
      "uri": "https://s3.us-east-1.amazonaws.com:443/keynslug-emqx-s3-demo/demo-client/file-4242/test.txt?AWSAccessKeyId=AKIAXYPMVIWAHZSHOTES&Signature=gpXMvwpyCpFQhq8oTnu45W%2Fmh0k%3D&Expires=1680777384"
    },
    {
      "clientid": "demo-client",
      "fileid": "file-pdf",
      "name": "riaklil-en.pdf",
      "size":

Time to transfer something MASSIVE (well, not really: 35 MiB, since anything bigger would take too long for a demo session).

In [17]:
mqttft.transfer('demo-client', 'file-movie', 'assets/tears_of_steel_1080p.40.webm', host='localhost', segment_size=409600)

DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b'demo-client'
DEBUG:root:Sending PUBLISH (d0, q1, r0, m1), 'b'$file/file-movie/init'', ... (138 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m2), 'b'$file/file-movie/0'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m3), 'b'$file/file-movie/409600'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m4), 'b'$file/file-movie/819200'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m5), 'b'$file/file-movie/1228800'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m6), 'b'$file/file-movie/1638400'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m7), 'b'$file/file-movie/2048000'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m8), 'b'$file/file-movie/2457600'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m9), 'b'$file/file-movie/2867200'', ... (409600 bytes)
DEBUG:root:Sending PUBLISH (d0, q1, r0, m10), 'b'$file/file-movie/3276800'

Cool. Transferring that to `us-east-1` (which is, admittedly, quite far away from here) took a while. But we still have a couple of ideas to try that should improve throughput considerably.