In [11]:
%use coroutines


In [12]:
%use ktor-client

In [13]:
import java.nio.file.Paths

// /Users/mac/Downloads
val prop = Paths.get(System.getProperty("user.home"), "Downloads")

In [14]:
println(prop)

/Users/mac/Downloads


In [15]:
import java.io.File

val file = File("$prop/client_secret_551260504895-km035mf33md4abv92nlo4oq18a80jhbj.apps.googleusercontent.com.json")

println(file.isFile)

true


In [16]:
println(file.readLines())

[{"installed":{"client_id":"551260504895-km035mf33md4abv92nlo4oq18a80jhbj.apps.googleusercontent.com","project_id":"snappy-thought-423221-t8","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs"}}]


In [17]:
// JetDrive props
val uploadUrl = "http://localhost:8080/upload"
val initiateUrl = "$uploadUrl/initiate"
val uploadChunkUrl: (String) -> String = { "$uploadUrl/$it" }
val completeUrl: (String) -> String = { "$uploadUrl/$it/complete" }
val statusUrl: (String) -> String = { "$uploadUrl/status/$it" }

In [18]:
// JetDrive auth token
val access = "eyJhbGciOiJIUzI1NiJ9.eyJ0eXBlIjoiYWNjZXNzIiwic3ViIjoiaW5mb0BhYmMuY29tIiwiaWF0IjoxNzUwNTA1MjkxLCJleHAiOjE3NTA1OTE2OTF9.CIE72XtdfVIFubzVRv7Ui8MSQ6-lnhYAwa2_uDAM2v0"

val refresh = "eyJhbGciOiJIUzI1NiJ9.eyJ0eXBlIjoicmVmcmVzaCIsInN1YiI6ImluZm9AYWJjLmNvbSIsImlhdCI6MTc1MDQxNzg2MCwiZXhwIjoxNzUzMDA5ODYwfQ.ytX-5ofPoWut1cs6Pv2waCQ5_UOjfkH6k4pavBffRKI"

In [19]:
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.auth.*
import io.ktor.client.plugins.auth.providers.*
import io.ktor.client.plugins.contentnegotiation.*
import io.ktor.serialization.kotlinx.json.*

val client = HttpClient(CIO) {
    install(Auth) {
        bearer {
            loadTokens {
                BearerTokens(
                    accessToken = access, refreshToken = refresh)
            }
        }
    }
    install(ContentNegotiation) {
        json(Json {
            prettyPrint = true
            isLenient = true
        })
    }
}

In [20]:
@Serializable
data class UploadInitiateRequest(
    val fileName: String, val fileSize: Long,
    val parentId: String? = null, val hasThumbnail: Boolean = false
)
@Serializable
data class UploadInitiateResponse(val uploadId: String, val chunkSize: Int)
@Serializable
data class UploadProgressResponse(val uploadedChunks: Set<Long>, val totalBytes: Long, val uploadedBytes: Long, val chunkSize: Int)

@Serializable
data class S3UploadProgressResponse(
    val uploadedChunks: List<Int>, val totalBytes: Long,
    val uploadedBytes: Long, val chunkSize: Int, val uploadStatus: String
) {
    val missingChunks: List<Int>
        get() = uploadedChunks
}

@Serializable
data class FileNodeDTO (
    val id: String? = null, val name: String? = null, val type: String? = null, val size: Long? = null,
    val parentId: String? = null, val hasThumbnail: Boolean = false, val mimeType: String? = null,
    val createdAt: String? = null, val updatedAt: String? = null,
)


In [21]:
//val file = File("$prop/exc.png")
val file = File("$prop/GET THE GIRL!!! - The Office - 8x19 - Group Reaction.mp4")
//val file = File("$prop/Single-Threaded Coroutines in Kotlin.mp4")
println(file.isFile)
println(file.length())

true
56405497


In [22]:
fun calculateProgress(response: UploadProgressResponse): Int {
    val percent = (response.uploadedBytes.toDouble() / response.totalBytes .toDouble()) * 100
    return percent.coerceAtMost(100.0).toInt()
}

fun calculateProgress(response: S3UploadProgressResponse): Int {
    val percent = (response.uploadedBytes.toDouble() / response.totalBytes .toDouble()) * 100
    return percent.coerceAtMost(100.0).toInt()
}


In [23]:
import io.ktor.client.call.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*

suspend fun startS3Upload(file: File, chunkSize: Int, uploadId: String, onChunkUpload: (S3UploadProgressResponse) -> Unit, onComplete: (FileNodeDTO) -> Unit) {
    val inputStream = file.inputStream().buffered()
    var start = 0L
    val total = file.length()
    var chunkIndex = 0

    while (start < total) {
        val buffer = ByteArray(chunkSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        val end = start + read - 1
        val actualChunk = buffer.copyOf(read)

        val rangeHeader = "bytes $start-$end/$total"
        val response = client.put(uploadChunkUrl(uploadId)) {
            header(HttpHeaders.ContentRange, rangeHeader)
            header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
            setBody(actualChunk)
        }

        if (!response.status.isSuccess()) {
            println("Failed on chunk $chunkIndex: ${response.status}")
            return
        }

        if (response.status.isSuccess()) {
            val progress: S3UploadProgressResponse = response.body()
            onChunkUpload(progress)
        }

        start = end + 1
        chunkIndex++
    }

    println("Finalizing upload...")
    val completeResponse: FileNodeDTO = client.post(completeUrl(uploadId)).body()
    onComplete(completeResponse)

    client.close()
}

In [24]:
// Resumable upload
suspend fun startS3PartialUpload(file: File, chunkSize: Int, uploadId: String, onChunkUpload: (S3UploadProgressResponse) -> Unit, onComplete: (FileNodeDTO) -> Unit) {
    val inputStream = file.inputStream().buffered()
    var start = 0L
    val total = file.length()
    var chunkIndex = 0

    while (start < total) {
        val buffer = ByteArray(chunkSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        val end = start + read - 1
        val actualChunk = buffer.copyOf(read)

        if (chunkIndex < 2 || chunkIndex % 2 == 0) {

            val rangeHeader = "bytes $start-$end/$total"
            val response: HttpResponse = client.put(uploadChunkUrl(uploadId)) {
                header(HttpHeaders.ContentRange, rangeHeader)
                header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
                setBody(actualChunk)
            }

            if (!response.status.isSuccess()) {
                println("Failed on chunk $chunkIndex: ${response.status}")
                return
            }

            if (response.status.isSuccess()) {
                val progress: S3UploadProgressResponse = response.body()
                onChunkUpload(progress)
            }

        }
        start = end + 1
        chunkIndex++
    }

    println("Finalizing upload...")
    val completeResponse = client.post(completeUrl(uploadId))

    if (!completeResponse.status.isSuccess()) {
        println("File uplaod failed")
        return
    }

    if (completeResponse.status.isSuccess()) {
        val response: FileNodeDTO = completeResponse.body()
        onComplete(response)
    }

    client.close()
}

In [25]:
// Resumable upload
suspend fun startPartialUpload(file: File, chunkSize: Int, uploadId: String, block: (String, HttpResponse) -> Unit) {
    val inputStream = file.inputStream().buffered()
    var start = 0L
    val total = file.length()
    var chunkIndex = 0

    while (start < total) {
        val buffer = ByteArray(chunkSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        val end = start + read - 1
        val actualChunk = buffer.copyOf(read)

        if (chunkIndex < 2 || chunkIndex % 2 == 0) {

            val rangeHeader = "bytes $start-$end/$total"
            val response: HttpResponse = client.put(uploadChunkUrl(uploadId)) {
                header(HttpHeaders.ContentRange, rangeHeader)
                header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
                setBody(actualChunk)
            }

            block(chunkIndex.toString(),response)

            if (!response.status.isSuccess()) {
                println("Failed on chunk $chunkIndex: ${response.status}")
                return
            }

        }
        start = end + 1
        chunkIndex++
    }

    println("Finalizing upload...")
    val completeResponse = client.post(completeUrl(uploadId))

    println("Upload complete: ${completeResponse.status}")
    client.close()
}

In [26]:
suspend fun resumeS3Upload(
    file: File,
    chunkSize: Int,
    uploadId: String,
    missingChunks: Set<Int>, // direct from backend
    onChunkUpload: (S3UploadProgressResponse) -> Unit,
    onComplete: (FileNodeDTO) -> Unit
) {
    val total = file.length()
    val totalChunks = ((total + chunkSize - 1) / chunkSize).toInt()

    val inputStream = file.inputStream().buffered()

    for (chunkIndex in 0 until totalChunks) {
        val start = chunkIndex * chunkSize.toLong()
        val end = minOf(start + chunkSize, total) - 1
        val bufferSize = (end - start + 1).toInt()

        if (!missingChunks.contains(chunkIndex)) {
            println("Skipping already uploaded chunk $chunkIndex at offset $start")
            inputStream.skip(bufferSize.toLong())
            continue
        }

        val buffer = ByteArray(bufferSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        val actualChunk = buffer.copyOf(read)
        val rangeHeader = "bytes $start-$end/$total"
        println("Uploading chunk $chunkIndex at offset $start")
        val response: HttpResponse = client.put(uploadChunkUrl(uploadId)) {
            header(HttpHeaders.ContentRange, rangeHeader)
            header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
            setBody(actualChunk)
        }

        if (!response.status.isSuccess()) {
            println("Failed on chunk $chunkIndex: ${response.status}")
            return
        }

        if (response.status.isSuccess()) {
            val progress: S3UploadProgressResponse = response.body()
            onChunkUpload(progress)
        }
    }

    println("Finalizing upload...")
    val completeResponse = client.post(completeUrl(uploadId))

    if (completeResponse.status == HttpStatusCode.PartialContent) {
        println("message: ${completeResponse.bodyAsText()}")
        return
    }

    if (!completeResponse.status.isSuccess()) {
        println("File uplaod failed")
        return
    }

    if (completeResponse.status.isSuccess()) {
        println("completeResponse body: ${completeResponse.bodyAsText()}")
        val response: FileNodeDTO = completeResponse.body()
        onComplete(response)
    }
    client.close()
}


In [27]:
// resumeS3Upload V2
suspend fun resumeS3Upload2(
    file: File,
    chunkSize: Int,
    uploadId: String,
    uploadedChunkIndexes: Set<Int>,
    onChunkUpload: (S3UploadProgressResponse) -> Unit,
    onComplete: (FileNodeDTO) -> Unit
) {
    val inputStream = file.inputStream().buffered()
    val total = file.length()
    var start = 0L
    var chunkIndex = 0

    while (start < total) {
        val buffer = ByteArray(chunkSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        // Check if this chunk has already been uploaded
        if (uploadedChunkIndexes.contains(chunkIndex)) {
            println("Skipping chunk $chunkIndex at offset $start")
            start += read
            chunkIndex++
            continue
        }

        val end = start + read - 1
        val actualChunk = buffer.copyOf(read)

        val rangeHeader = "bytes $start-$end/$total"
        println("Uploading chunk $chunkIndex at offset $start")
        val response: HttpResponse = client.put(uploadChunkUrl(uploadId)) {
            header(HttpHeaders.ContentRange, rangeHeader)
            header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
            setBody(actualChunk)
        }

        if (!response.status.isSuccess()) {
            println("Failed on chunk $chunkIndex: ${response.status}")
            return
        }

        if (response.status.isSuccess()) {
            val progress: S3UploadProgressResponse = response.body()
            onChunkUpload(progress)
        }

        start = end + 1
        chunkIndex++
    }

    println("Finalizing upload...")
    val completeResponse = client.post(completeUrl(uploadId))
    println("Complete status: $completeResponse")

    if (completeResponse.status == HttpStatusCode.PartialContent) {
        println("message: ${completeResponse.bodyAsText()}")
        return
    }

    if (!completeResponse.status.isSuccess()) {
        println("File uplaod failed")
        return
    }

    if (completeResponse.status.isSuccess()) {
        val response: FileNodeDTO = completeResponse.body()
        onComplete(response)
    }

    client.close()
}


In [28]:
suspend fun resumeUpload(
    file: File,
    chunkSize: Int,
    uploadId: String,
    uploadedChunkOffsets: Set<Long>, // get this from the backend
    block: (String, HttpResponse) -> Unit
) {
    val inputStream = file.inputStream().buffered()
    val total = file.length()
    var start = 0L
    var chunkIndex = 0

    while (start < total) {
        val buffer = ByteArray(chunkSize)
        val read = inputStream.read(buffer)
        if (read == -1) break

        // Check if this chunk has already been uploaded
        if (uploadedChunkOffsets.contains(start)) {
            println("Skipping chunk $chunkIndex at offset $start")
            start += read
            chunkIndex++
            continue
        }

        val end = start + read - 1
        val actualChunk = buffer.copyOf(read)

        val rangeHeader = "bytes $start-$end/$total"
        val response: HttpResponse = client.put(uploadChunkUrl(uploadId)) {
            header(HttpHeaders.ContentRange, rangeHeader)
            header(HttpHeaders.ContentType, ContentType.Application.OctetStream)
            setBody(actualChunk)
        }

        block(chunkIndex.toString(), response)

        if (!response.status.isSuccess()) {
            println("Failed on chunk $chunkIndex (range: $start-$end): ${response.status}")
            return
        }

        start = end + 1
        chunkIndex++
    }

    println("Finalizing upload...")
    val completeResponse = client.post(completeUrl(uploadId))
    println("Upload complete: ${completeResponse.status}")
    client.close()
}


In [None]:
import io.ktor.client.call.*
import io.ktor.client.request.*
import io.ktor.http.*

/*
// ------------------------------- S3Complete ---------------------
runBlocking {
    try {
        val initiateResponse: UploadInitiateResponse = client.post(initiateUrl) {
            contentType(ContentType.Application.Json)
            setBody(UploadInitiateRequest(file.name, file.length(), null))
        }.body()
        println(uploadChunkUrl(initiateResponse.uploadId))
        println(completeUrl("xb"))
        //println("Response: ${initiateResponse}")

        startS3Upload(file = file, chunkSize = initiateResponse.chunkSize, uploadId = initiateResponse.uploadId, onChunkUpload = { progress ->
            val calculateProgress = calculateProgress(progress)
            println("Progress: $calculateProgress%")
        }) { fileNode ->
            println("Upload sucessful: $fileNode")
        }

    } catch (ex: Exception) {
        println("Error: $ex")
    } finally {
        System.exit(1)
    }
}
*/

/*
// ------------------------------------ S3Partial ------------------------------------
runBlocking {
    try {
        val initiateResponse: UploadInitiateResponse = client.post(initiateUrl) {
            contentType(ContentType.Application.Json)
            setBody(UploadInitiateRequest(file.name, file.length()))
        }.body()
        println(uploadChunkUrl(initiateResponse.uploadId))

        startS3PartialUpload(file = file, chunkSize = initiateResponse.chunkSize, uploadId = initiateResponse.uploadId, onChunkUpload = { progress ->
            val calculateProgress = calculateProgress(progress)
            println("Progress: $calculateProgress%")
        }) { fileNode ->
            println("Upload sucessful: $fileNode")
        }

    } catch (ex: Exception) {
        println("Error: $ex")
    } finally {
        System.exit(1)
    }
}
*/

///*
// ------------------------------------ S3Retry ------------------------------------
runBlocking {
    try {
        val uploadId = "2528b45a-e016-49bd-9012-e44f969f8294"
        val statusResponse: S3UploadProgressResponse = client.get(statusUrl(uploadId)).body()
        println("statusResponse uploadedChunks: ${statusResponse.missingChunks}")
        println("statusResponse totalBytes: ${statusResponse.totalBytes}")

        resumeS3Upload(file = file, chunkSize = statusResponse.chunkSize,
            uploadId = uploadId,
            missingChunks = statusResponse.missingChunks.toSet(),
            onChunkUpload = { progress ->
                val calculateProgress = calculateProgress(progress)
                println("Progress: $calculateProgress%")
            }
        ) { fileNode ->
            println("Upload sucessful: $fileNode")
        }

    } catch (ex: Exception) {
        println("Error: $ex")
        throw ex
    } finally {
        System.exit(1)
    }
}
//*/


/*
// ------------------------------------ Partial ------------------------------------
runBlocking {
    try {
        val initiateResponse: UploadInitiateResponse = client.post(initiateUrl) {
            contentType(ContentType.Application.Json)
            setBody(UploadInitiateRequest(file.name, file.length()))
        }.body()
        println(uploadChunkUrl(initiateResponse.uploadId))
        println(completeUrl("xb"))
        //println("Response: ${initiateResponse}")

        startPartialUpload(file = file, chunkSize = initiateResponse.chunkSize, uploadId = initiateResponse.uploadId) { chunk, response ->
            if (!response.status.isSuccess()) {
                println("Failed on chunk $chunk: ${response.status}")
            } else println("Uploaded chunck $chunk")
        }

    } catch (ex: Exception) {
        println("Error: $ex")
    } finally {
        System.exit(1)
    }
}
*/

/*
// ------------------------------------ Retry ------------------------------------
runBlocking {
    try {
        val uploadId = "996cbb9b-0acb-4eac-8aef-9f0c0b541a7f"
        val statusResponse: UploadProgressResponse = client.get(statusUrl(uploadId)).body()
        println("statusResponse uploadedChunks: ${statusResponse.uploadedChunks}")
        println("statusResponse totalBytes: ${statusResponse.getTotalBytes}")

        resumeUpload(file = file, chunkSize = 1048576, uploadId = uploadId, uploadedChunkOffsets = statusResponse.uploadedChunks) { chunk, response ->
            if (!response.status.isSuccess()) {
                println("Failed on chunk $chunk: ${response.status}")
            } else println("Uploaded chunck $chunk")
        }

    } catch (ex: Exception) {
        println("Error: $ex")
    } finally {
        System.exit(1)
    }
}
*/


statusResponse uploadedChunks: [1]
statusResponse totalBytes: 56405497
Skipping already uploaded chunk 0 at offset 0
Uploading chunk 1 at offset 1048576
Progress: 3%
Skipping already uploaded chunk 2 at offset 2097152
Skipping already uploaded chunk 3 at offset 3145728
Skipping already uploaded chunk 4 at offset 4194304
Skipping already uploaded chunk 5 at offset 5242880
Skipping already uploaded chunk 6 at offset 6291456
Skipping already uploaded chunk 7 at offset 7340032
Skipping already uploaded chunk 8 at offset 8388608
Skipping already uploaded chunk 9 at offset 9437184
Skipping already uploaded chunk 10 at offset 10485760
Skipping already uploaded chunk 11 at offset 11534336
Skipping already uploaded chunk 12 at offset 12582912
Skipping already uploaded chunk 13 at offset 13631488
Skipping already uploaded chunk 14 at offset 14680064
Skipping already uploaded chunk 15 at offset 15728640
Skipping already uploaded chunk 16 at offset 16777216
Skipping already uploaded chunk 17 at of