diff --git a/examples/features/http_upload_download/CMakeLists.txt b/examples/features/http_upload_download/CMakeLists.txt index 4e9b793b..195bdd00 100644 --- a/examples/features/http_upload_download/CMakeLists.txt +++ b/examples/features/http_upload_download/CMakeLists.txt @@ -2,7 +2,7 @@ # # Tencent is pleased to support the open source community by making tRPC available. # -# Copyright (C) 2023 Tencent. +# Copyright (C) 2023 THL A29 Limited, a Tencent company. # All rights reserved. # # If you have downloaded a copy of the tRPC source code from Tencent, diff --git a/examples/features/http_upload_download/README.md b/examples/features/http_upload_download/README.md index 9f23b539..daa25e4f 100644 --- a/examples/features/http_upload_download/README.md +++ b/examples/features/http_upload_download/README.md @@ -103,4 +103,352 @@ upload a file with content-length finish uploading, write size: 10485760 name: upload a file to server, ok: 1 final result of http calling: 1 +``` + + + + +代码体验报告:基于 tRPC 的文件传输增强功能开发 +一、项目概述 +项目名称:tRPC 文件传输增强模块 + +开发时间:2025年7月 + +开发工具/语言:C++ + +项目目标:在学习和使用 tRPC 的基础上,扩展实现文件上传/下载功能,支持进度条显示、SHA256 校验、多文件流处理、限速下载等,提高文件传输的可靠性与用户体验。 + +二、开发环境 +操作系统:ubuntu22.04 + +编程语言及版本:C++17 + +使用的框架/库: + +tRPC + +picosha2.h(用于 C++ 端 SHA256 哈希计算) + +其他工具: + +VS Code + +三、功能实现 + +✅ 文件上传下载统计可视化功能 + +*在上传或下载过程中,实时显示进度条,提升用户体验 + +*在上传完成时自动记录总耗时与平均速率 + +*使用 C++ 构建控制台进度条,适用于服务端日志或 CLI 工具 + +``` text +[Download Request] 📏 Content-Length: 10485760 bytes +[Download Request] 📦 Progress: [█████ ] 10.00% +[Download Request] 📦 Progress: [██████████ ] 20.00% +[Download Request] 📦 Progress: [███████████████ ] 30.00% +[Download Request] 📦 Progress: [████████████████████ ] 40.00% +[Download Request] 📦 Progress: [█████████████████████████ ] 50.00% +[Download Request] 📦 Progress: [██████████████████████████████ ] 60.00% +[Download Request] 📦 Progress: [███████████████████████████████████ ] 70.00% +[Download Request] 📦 Progress: [████████████████████████████████████████ ] 80.00% +[Download Request] 📦 Progress: [█████████████████████████████████████████████ ] 90.00% +[Download Request] 📦 Progress: [██████████████████████████████████████████████████] 100.00% +[Download Request] 📦 Progress: [██████████████████████████████████████████████████] 100.00% +[Download Request] 📥 Download complete: 10485760 bytes in 40 ms, avg speed: 250.00 MB/s +``` + + +✅ SHA256 校验(使用 picosha2.h) + +*在 C++ 模块中集成 picosha2.h 实现高效哈希计算 + +*确保文件上传/下载过程中的完整性,防止传输数据被篡改或破损 + +📥 计算 SHA256(文件端) + +``` cpp +#include +#include +#include "picosha2.h" // 放入你的 include 路径 + +namespace http::demo { + std::string CalculateSHA256(const std::string& file_path) { + std::ifstream file(file_path, std::ios::binary); + if (!file.is_open()) return ""; + + std::vector data; + char buffer[8192]; + + while (!file.eof()) { + file.read(buffer, sizeof(buffer)); + std::streamsize bytes_read = file.gcount(); + if (bytes_read > 0) { + data.insert(data.end(), buffer, buffer + bytes_read); + } + } + + return picosha2::hash256_hex_string(data); + } +} + +``` + +📤 服务端添加 SHA256 校验头 + +``` cpp +std::string hash_tex = CalculateSHA256(download_src_path_); +rsp->SetHeader("X-File-SHA256", hash_tex); +TRPC_FMT_INFO("[Download Request] Calculated SHA256 of downloaded file: {}", hash_tex); + +``` + +📦 客户端下载后进行校验 + +用户可选择是否启用本地校验,通过 FLAGS_enable_download_hash 控制 + +``` cpp +if (FLAGS_enable_download_hash && http_header.Has("X-File-SHA256")) { + std::string expected_hash = http_header.Get("X-File-SHA256"); + std::string actual_hash = CalculateSHA256(dst_path); + TRPC_FMT_INFO("downloaded client: {}", actual_hash); + + if (actual_hash != expected_hash) { + TRPC_FMT_ERROR("❌ SHA256 mismatch! Expected={}, Actual={}", expected_hash, actual_hash); + // 可触发错误处理流程 + } else { + TRPC_FMT_INFO("✅ Verified {} with SHA256: {}", dst_path, actual_hash); + } +} + +``` + +``` text +[2025-07-28 16:11:35.358] [thread 241376] [info] [examples/features/http_upload_download/server/file_storage_handler.cc:392] ✅ Sent download_src.bin, size: 10485760, hash: 0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] 🔢 Received bytes: 10485760, Expected: 10485760 +downloaded client: 0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Download Request] ✅ SHA256 verified successfully + +``` +✅ 限速下载 + +*控制下载速率,避免服务端带宽过载,同时为用户提供平稳的传输体验。 + +通过记录发送数据的时间和字节数,动态计算实际传输速率,并在必要时引入延迟(sleep)控制发送速率不超过设定阈值。 + +``` cpp +if (enable_limit && duration_ms > 0) { + std::size_t actual_rate = sent_bytes * 1000 / duration_ms; // bytes/sec + + if (actual_rate > rate_limit_bytes_per_sec) { + std::size_t expected_duration = sent_bytes * 1000 / rate_limit_bytes_per_sec; + std::size_t extra_delay = expected_duration > duration_ms + ? expected_duration - duration_ms + : 0; + std::this_thread::sleep_for(std::chrono::milliseconds(extra_delay)); // 延迟传输 + } + + last_send_time = std::chrono::steady_clock::now(); // 更新发送时间 + sent_bytes = 0; // 清空计数器供下一轮使用 +} + + +``` +设置 constexpr std::size_t rate_limit_bytes_per_sec = 900 * 1024; // 512KB/s +``` text +[Download Request] 📦 Progress: [█████ ] 10.00% +[Download Request] 📦 Progress: [██████████ ] 20.00% +[Download Request] 📦 Progress: [███████████████ ] 30.00% +[Download Request] 📦 Progress: [████████████████████ ] 40.00% +[Download Request] 📦 Progress: [█████████████████████████ ] 50.00% +[Download Request] 📦 Progress: [██████████████████████████████ ] 60.00% +[Download Request] 📦 Progress: [███████████████████████████████████ ] 70.00% +[Download Request] 📦 Progress: [████████████████████████████████████████ ] 80.00% +[Download Request] 📦 Progress: [█████████████████████████████████████████████ ] 90.00% +[Download Request] 📦 Progress: [██████████████████████████████████████████████████] 100.00% +[2025-07-28 16:48:59.654] [thread 244291] [info] [examples/features/http_upload_download/server/file_storage_handler.cc:137] [Download Request] finish providing file, write size: 10485760 +[Download Request] 📦 Progress: [██████████████████████████████████████████████████] 100.00% +[Download Request] 📥 Download complete: 10485760 bytes in 11352 ms, avg speed: 0.88 MB/s +``` + +✅ 文件流式多文件传输(Streaming Multi-File Transfer) + +*流式读取:使用 std::ifstream 按块读取,适合大文件处理 + +*逐个发送文件:每个文件构造独立的 header(包含文件名、大小、hash),确保接收端可以识别和校验 + +*边读取边发送:不加载整个文件到内存,避免资源占用过高,适合多大文件连续传输 + +*封装结构体头部:通过 FileSegmentHeader 发送结构化元信息,利于客户端解析与验证 + +*非阻塞串流发送:利用 writer.Write(...) 实现异步非阻塞写入到流通道 + +🧩 多文件识别与调度 + +``` cpp +const std::string full_url = req->GetUrl(); +std::string path_only; +std::size_t pos = full_url.find('?'); + +if (pos != std::string::npos) { + path_only = full_url.substr(0, pos); +} else { + path_only = full_url; +} + +if (path_only == "/multi-download") { + TRPC_FMT_INFO("[Multi-File Download] Start processing"); + return DownloadMultipleFiles(ctx, req, rsp); // 🌟 调用多文件下载分发逻辑 +} + + +``` + +🚚 多文件发送核心代码 + +``` cpp +for (const auto& path : file_paths) { + std::string hash = CalculateSHA256(path); + TRPC_FMT_INFO("[Multi-File Download] 📦 SHA256: {}", hash); + + std::ifstream fin(path, std::ios::binary | std::ios::ate); + if (!fin.is_open()) { + TRPC_FMT_ERROR("❌ Failed to open: {}", path); + continue; + } + + std::size_t file_size = fin.tellg(); + fin.seekg(0); + + FileSegmentHeader header; + std::memset(&header, 0, sizeof(header)); + std::strncpy(header.filename, GetBaseName(path).c_str(), sizeof(header.filename)); + std::strncpy(header.hash_hex, hash.c_str(), sizeof(header.hash_hex)); + header.file_size = file_size; + + // ✉️ 发送 Header + ::trpc::NoncontiguousBufferBuilder builder; + builder.Append(static_cast(&header), sizeof(header)); + writer.Write(builder.DestructiveGet()); + + // 📤 分块发送文件内容 + while (fin) {[Multi-File Download] 🔍 Verifying download_src.bin, expected=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017, actual=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] ✅ Verified download_src.bin + ::trpc::BufferBuilder buffer_builder; + fin.read(buffer_builder.data(), buffer_builder.SizeAvailable()); + std::size_t n = fin.gcount(); + if (n > 0) { + ::trpc::NoncontiguousBuffer buffer; + buffer.Append(buffer_builder.Seal(n)); + if (!writer.Write(std::move(buffer)).OK()) { + TRPC_FMT_ERROR("Write failed on file: {}", path); + return ::trpc::kStreamRstStatus; + } + } + } + + fin.close(); + TRPC_FMT_INFO("✅ Finished sending file: {}", header.filename); +} + +writer.WriteDone(); // 标记传输结束 + + +``` + +📦 FileSegmentHeader 示例结构说明 + +``` cpp + +struct FileSegmentHeader { + char filename[128]; // 文件名 + char hash_hex[64]; // SHA256 校验值(HEX格式) + std::uint64_t file_size; // 文件大小(字节) +}; + +``` + +``` text +[2025-07-28 16:11:35.358] [thread 241376] [info] [examples/features/http_upload_download/server/file_storage_handler.cc:392] ✅ Sent download_src.bin, size: 10485760, hash: 0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] 🔢 Received bytes: 10485760, Expected: 10485760 +downloaded client: 0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] 🔢 File saved: download_src.bin, size: 10485760 +[Multi-File Download] 🔍 Verifying download_src.bin, expected=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017, actual=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] ✅ Verified download_src.bin +[2025-07-28 16:11:35.583] [thread 241376] [info] [examples/features/http_upload_download/server/file_storage_handler.cc:392] ✅ Sent download_dst.bin, size: 10485760, hash: 0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] 🔢 Received bytes: 10485760, Expected: 10485760 +[Multi-File Download] 🔢 File saved: download_dst.bin, size: 10485760 +[Multi-File Download] 🔍 Verifying download_dst.bin, expected=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017, actual=0959acfa65ad143b3b27a5a2f7d4f7e3fb2ac005df1d1ac7e27dae8177ab1017 +[Multi-File Download] ✅ Verified download_dst.bin +``` + + +四、开发过程体验 +🧩 遇到的问题及解决方案 + +1. SHA256 校验值不一致问题 + +🧨 问题表现 +服务端或客户端写入文件后立即调用 CalculateSHA256() 校验文件内容 + +获取到的哈希值与期望值不匹配(即使文件名和大小相同) + +🎯 原因分析 + +文件流(如 std::ofstream fout)在写入时使用缓冲机制。如果未主动刷新或关闭,部分数据可能仍停留在缓冲区,尚未写入磁盘 。 + +调用 CalculateSHA256() 时读取的是磁盘文件内容,如果未 flush,会导致读取数据不完整,从而哈希值错误。 + +✅ 解决办法 + +在文件写入完成后,务必执行如下操作: + +```cpp +fout.flush(); // 💡 强制刷新缓冲区 +fout.close(); // ✅ 自动关闭文件 & 刷新数据 + +``` +调用 CalculateSHA256() 校验之前,确保文件已完全写入磁盘。 + +2.添加限速下载 客户端出现超时 + +🧨 问题表现 + +客户端下载添加如下代码,定位问题。 + +```cpp + else if (status.ToString().find("timeout") != std::string::npos){ + TRPC_FMT_WARN("[Download Request] ⚠️ Read timed out, retrying..."); + continue; + } +``` + +🎯 原因分析 + +在调试过程中,发现客户端在 stream.Read() 时长时间阻塞,怀疑与服务端限速策略有关。为验证和规避这一问题,查阅了 HttpReadStream 的流式同步接口文档,发现其支持以下两种读取方式: +### HttpReadStream 流式同步接口列表 + +| 对象类型 | 接口签名 | 功能说明 | 参数说明 | 返回值 | +|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|------------------------------------------------------------------|------------| +| HttpReadStream | `Status Read(NoncontiguousBuffer& item, size_t max_bytes)` | 读取指定长度的数据,整体读取过程受服务端配置的 timeout 控制 | `max_bytes`:读取字节数,如果剩余内容不足则立即返回并标识 EOF | `Status` | +| HttpReadStream | `Status Read(NoncontiguousBuffer& item, size_t max_bytes, const std::chrono::time_point& expiry)` | 读取指定长度的数据,阻塞直到达到指定的时间点 | `expiry`:如 `trpc::ReadSteadyClock() + std::chrono::milliseconds(3)` | `Status` | + + +对比后采用了第二种接口形式,明确设置了读取操作的超时时间点,有效防止了长期阻塞。最终代码调整如下: + +```cpp +status = stream.Read(buffer, kBufferSize, std::chrono::seconds(3)); +``` +通过配合 status.ToString().find("timeout") 的判断,实现了对限速场景的优雅处理,客户端逻辑更加健壮稳定。 + +✅ 解决办法 + +添加:status = stream.Read(buffer, kBufferSize, std::chrono::seconds(3)); + +```cpp + if(FLAGS_enable_download_limit) + status = stream.Read(buffer, kBufferSize, std::chrono::seconds(3)); + else + status = stream.Read(buffer, kBufferSize); ``` \ No newline at end of file diff --git a/examples/features/http_upload_download/client/BUILD b/examples/features/http_upload_download/client/BUILD index ecf62ca6..962afb54 100644 --- a/examples/features/http_upload_download/client/BUILD +++ b/examples/features/http_upload_download/client/BUILD @@ -6,6 +6,7 @@ cc_binary( name = "download_client", srcs = ["download_client.cc"], linkopts = ["-ldl"], + deps = [ "@trpc_cpp//trpc/client:trpc_client", "@trpc_cpp//trpc/client:make_client_context", @@ -15,6 +16,7 @@ cc_binary( "@trpc_cpp//trpc/common/config:trpc_config", "@trpc_cpp//trpc/coroutine:fiber", "@trpc_cpp//trpc/util/log:logging", + "//examples/features/http_upload_download/common:file_hasher", ], ) @@ -31,5 +33,7 @@ cc_binary( "@trpc_cpp//trpc/common/config:trpc_config", "@trpc_cpp//trpc/coroutine:fiber", "@trpc_cpp//trpc/util/log:logging", + + "//examples/features/http_upload_download/common:file_hasher", ], ) diff --git a/examples/features/http_upload_download/client/download_client.cc b/examples/features/http_upload_download/client/download_client.cc index aaf08b71..08f5d6c2 100644 --- a/examples/features/http_upload_download/client/download_client.cc +++ b/examples/features/http_upload_download/client/download_client.cc @@ -2,7 +2,7 @@ // // Tencent is pleased to support the open source community by making tRPC available. // -// Copyright (C) 2023 Tencent. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -17,7 +17,7 @@ #include #include "gflags/gflags.h" - +#include "../common/picosha2.h" #include "trpc/client/http/http_service_proxy.h" #include "trpc/client/make_client_context.h" #include "trpc/client/trpc_client.h" @@ -27,115 +27,378 @@ #include "trpc/coroutine/fiber_latch.h" #include "trpc/util/log/logging.h" +#include "examples/features/http_upload_download/common/file_hasher.h" + DEFINE_string(service_name, "http_upload_download_client", "callee service name"); DEFINE_string(client_config, "trpc_cpp_fiber.yaml", ""); DEFINE_string(addr, "127.0.0.1:24858", "ip:port"); DEFINE_string(dst_path, "download_dst.bin", "file path to store the content which will be downloaded from the server"); +DEFINE_bool(enable_download_hash, true, "Enable SHA256 hash verification during download"); +DEFINE_bool(enable_download_limit, true, "Enable SHA256 hash verification during download"); +DEFINE_string(multi_download_dir, "./downloads", "Directory to save multiple downloaded files"); + + +struct FileSegmentHeader { + char filename[128]; // 文件名(null-terminated) + uint64_t file_size; // 文件内容字节数 + char hash_hex[64]; // 文件的 SHA256 +}; + +std::string ByteDump(const void* data, std::size_t len) { + const unsigned char* bytes = static_cast(data); + std::ostringstream oss; + oss << std::hex << std::setfill('0'); + + for (std::size_t i = 0; i < len; ++i) { + oss << std::setw(2) << static_cast(bytes[i]) << " "; + if ((i + 1) % 16 == 0) oss << "\n"; // 每16字节换行 + } + + return oss.str(); +} namespace http::demo { using HttpServiceProxyPtr = std::shared_ptr<::trpc::http::HttpServiceProxy>; + bool Download(const HttpServiceProxyPtr& proxy, const std::string& url, const std::string dst_path) { + // 打开输出文件(保存下载内容) auto fout = std::ofstream(dst_path, std::ios::binary); if (!fout.is_open()) { - TRPC_FMT_ERROR("failed to open file, file_path:{}", dst_path); + TRPC_FMT_ERROR("[Download Request] failed to open file, file_path:{}", dst_path); return false; } + // 创建客户端上下文并设置超时时间 auto ctx = ::trpc::MakeClientContext(proxy); - ctx->SetTimeout(5000); - // Creates HTTP stream. + ctx->SetTimeout(50000); + + // 通过 HTTP GET 创建流式连接 auto stream = proxy->Get(ctx, url); + + if (!stream.GetStatus().OK()) { - TRPC_FMT_ERROR("failed to create client stream"); + TRPC_FMT_ERROR("[Download Request] failed to create client stream"); return false; } - // Reads response header. + // 读取响应头 int http_status = 0; ::trpc::http::HttpHeader http_header; ::trpc::Status status = stream.ReadHeaders(http_status, http_header); if (!status.OK()) { - TRPC_FMT_ERROR("failed to read http header: {}", status.ToString()); + TRPC_FMT_ERROR("[Download Request] failed to read http header: {}", status.ToString()); return false; } else if (http_status != ::trpc::http::ResponseStatus::kOk) { - TRPC_FMT_ERROR("http response status:{}", http_status); + TRPC_FMT_ERROR("[Download Request] http response status:{}", http_status); return false; } + + std::size_t total_size = 0; + if (http_header.Has("X-File-Length")) { + total_size = std::stoull(http_header.Get("X-File-Length")); + TRPC_FMT_INFO("[Download Request] 📏 Content-Length: {} bytes", total_size); + } else { + TRPC_FMT_WARN("[Download Request] ⚠️ No Content-Length found, can't compute progress"); + } + + // 定义每次读取的 buffer 大小 constexpr std::size_t kBufferSize{1024 * 1024}; size_t nread{0}; - // Reads response content. + auto start_time = std::chrono::steady_clock::now(); + + // 读取响应内容块(直到 EOF) for (;;) { ::trpc::NoncontiguousBuffer buffer; - status = stream.Read(buffer, kBufferSize); + if(FLAGS_enable_download_limit) + status = stream.Read(buffer, kBufferSize, std::chrono::seconds(3)); + else + status = stream.Read(buffer, kBufferSize); + if (status.OK()) { nread += buffer.ByteSize(); + + if (total_size > 0) {//下载:速率 hasher + + +/* +1.支持一次上传多个文件(使用 multipart 或多路流) + +2.可以在每个文件流中记录独立的上传进度 + +3.建立文件元信息结构(如文件名、大小、用户ID) + + +4.按分片序号或 byte range 进行断点续传 + +5.支持临时存储和标记上传状态(已完成/未完成) + +6.与客户端配合实现分布式大文件传输 + + +提供 streaming response(边上传边返回进度) + +支持 WebSocket 或 SSE(服务端事件)用于浏览器反馈 + +可打通到前端 UI 组件显示进度条和速率曲线 + + +集成 Prometheus 导出指标: + +上传文件总数 + +总流量 + +当前并发上传数 + +单文件上传耗时分布 + +可用于 Grafana 进行可视化 + +限速上传(流控机制) + +超时断开连接 + +大文件分区限流,防止拖慢整个服务 + +上传后自动移动到分目录(按日期/用户) + +增加文件元信息存储(如 SQLite、Redis) + +可实现上传后的异步处理队列(如压缩、转码) +*/ + const int bar_width = 50; // 进度条宽度 + double progress = static_cast(nread) / total_size; + int pos = static_cast(bar_width * progress); + + std::stringstream ss; + ss << "[Download Request] 📦 Progress: ["; + for (int i = 0; i < bar_width; ++i) { + ss << (i < pos ? "█" : " "); + } + ss << "] " << std::fixed << std::setprecision(2) << (progress * 100.0) << "%"; + + TRPC_FMT_INFO("{}", ss.str()); + } + + // 将 buffer 中的每个块写入文件 for (const auto& block : buffer) { fout.write(block.data(), block.size()); } continue; } else if (status.StreamEof()) { + // 读取完毕,跳出循环 + auto end_time = std::chrono::steady_clock::now(); + auto duration_ms = std::chrono::duration_cast(end_time - start_time).count(); + double speed_mb = static_cast(nread) / (1024 * 1024) / (duration_ms / 1000.0); + + TRPC_FMT_INFO("[Download Request] 📥 Download complete: {} bytes in {} ms, avg speed: {:.2f} MB/s", nread, duration_ms, speed_mb); + break; } - TRPC_FMT_ERROR("failed to read response content: {}", status.ToString()); + else if (status.ToString().find("timeout") != std::string::npos){ + TRPC_FMT_WARN("[Download Request] ⚠️ Read timed out, retrying..."); + + continue; + } + TRPC_FMT_ERROR("[Download Request] failed to read response content from client : {}", status.ToString()); + return false; + } + + TRPC_FMT_INFO("[Download Request] finish downloading, read size: {}", nread); + + // 确保文件全部写入磁盘(防止哈希不一致) + fout.close(); + + // 如果开启哈希验证并且响应头中包含 hash 字段 + std::string expected_hash; + if (FLAGS_enable_download_hash && http_header.Has("X-File-SHA256")) { + expected_hash = http_header.Get("X-File-SHA256"); + + // 对本地下载后的文件进行 SHA256 计算 + std::string actual_hash = CalculateSHA256(dst_path); + TRPC_FMT_INFO("downloaded client: {}", actual_hash); + + // 比较本地哈希与服务端发来的 hash + if (expected_hash != actual_hash) { + TRPC_FMT_ERROR("[Download Request] ❌SHA256 mismatch! File may be corrupted."); + return false; + } else { + TRPC_FMT_INFO("[Download Request] ✅ SHA256 verified successfully "); + } + } else { + // 如果未收到 hash,则跳过校验 + TRPC_FMT_WARN("[Download Request] No SHA256 header received from server, skipping verification..."); + } + + return true; +} + + + +bool DownloadMultipleFiles(const HttpServiceProxyPtr& proxy, const std::string& url) { + + + auto ctx = ::trpc::MakeClientContext(proxy); + ctx->SetTimeout(10000); + auto stream = proxy->Get(ctx, url); + if (!stream.GetStatus().OK()) { + TRPC_FMT_ERROR("[Multi-File Download] failed to create client stream"); return false; } - TRPC_FMT_INFO("finish downloading, read size: {}", nread); + int http_status = 0; + ::trpc::http::HttpHeader http_header; + if (!stream.ReadHeaders(http_status, http_header).OK()) { + TRPC_FMT_ERROR("[Multi-File Download] failed to read http header"); + return false; + } + int file_index = 0; // 文件计数器 + + for (;;) { + + + // 1. 接收 header + ::trpc::NoncontiguousBuffer header_buf; + if (!stream.Read(header_buf, sizeof(FileSegmentHeader)).OK()) { + TRPC_FMT_INFO("[Multi-File Download] 📥 Finished reading {} file(s).", file_index); + break; + } + + FileSegmentHeader header; + if (header_buf.ByteSize() < sizeof(FileSegmentHeader)) { + TRPC_FMT_ERROR(" [Multi-File Download] Incomplete header received, size = {}", header_buf.ByteSize()); + return false; + } + + std::memcpy(&header, header_buf.begin()->data(), sizeof(FileSegmentHeader)); + /* + for (const auto& block : header_buf) { + TRPC_FMT_INFO("📏 Header block size: {}", block.size()); + } + */ + + file_index++; + + // 2. 准备写入文件 + std::string full_path = "download/" + std::string(header.filename); + std::ofstream fout(full_path, std::ios::binary); + if (!fout.is_open()) { + TRPC_FMT_ERROR("[Multi-File Download] cannot open file {}", header.filename); + return false; + } + //TRPC_FMT_INFO("🧬 Raw header bytes: {}",ByteDump(header_buf.begin()->data(), sizeof(FileSegmentHeader))); + + std::size_t received = 0; + while (received < header.file_size) { + ::trpc::NoncontiguousBuffer data_buf; + size_t chunk_size = std::min(1024 * 1024, header.file_size - received); + if (!stream.Read(data_buf, chunk_size).OK()) return false; + + + for (const auto& block : data_buf) { + fout.write(block.data(), block.size()); + received += block.size(); + } + } + TRPC_ASSERT(received == header.file_size); + TRPC_FMT_INFO("[Multi-File Download] 🔢 Received bytes: {}, Expected: {}", received, header.file_size); + + fout.flush(); // 强制刷新缓冲区 + fout.close(); + + TRPC_FMT_INFO("[Multi-File Download] 🔢 File saved: {}, size: {}", header.filename, header.file_size); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // 3. 校验哈希(可选) + if (FLAGS_enable_download_hash) { + std::string actual_hash = CalculateSHA256(full_path); + TRPC_FMT_INFO("[Multi-File Download] 🔍 Verifying {}, expected={}, actual={}", header.filename, header.hash_hex, actual_hash); + //TRPC_FMT_INFO("🔍 Calculating hash for saved file at path: {}", full_path); + + if (actual_hash != std::string(header.hash_hex)) { + TRPC_FMT_ERROR("[Multi-File Download] ❌ Hash mismatch for {}", header.filename); + return false; + } else { + TRPC_FMT_INFO("[Multi-File Download] ✅ Verified {}", header.filename); + } + } + } + return true; } + +// 主业务入口,执行所有 HTTP 调用任务 int Run() { - bool final_ok{true}; + bool final_ok{true}; // 用于汇总所有任务是否执行成功 + // 定义一个结构体,表示每个 HTTP 调用任务 struct http_calling_args_t { - std::string calling_name; - std::function calling_executor; - bool ok; + std::string calling_name; // 描述任务名称(用于日志) + std::function calling_executor; // 实际执行的函数(返回 true / false 表示成功与否) + bool ok; // 执行结果(由执行器函数更新) }; + // 初始化 HTTP 客户端配置项 ::trpc::ServiceProxyOption option; - option.name = FLAGS_service_name; - option.codec_name = "http"; - option.network = "tcp"; - option.conn_type = "long"; - option.timeout = 5000; - option.selector_name = "direct"; - option.target = FLAGS_addr; + option.name = FLAGS_service_name; // 服务名(来自命令行参数) + option.codec_name = "http"; // 使用 HTTP 协议编码 + option.network = "tcp"; // 底层传输使用 TCP + option.conn_type = "long"; // 长连接 + option.timeout = 5000; // 请求超时时间(ms) + option.selector_name = "direct"; // 选择策略:直接连接目标地址 + option.target = FLAGS_addr; // 服务地址(host:port) + // 创建 HTTP 客户端代理对象 auto http_client = ::trpc::GetTrpcClient()->GetProxy<::trpc::http::HttpServiceProxy>(FLAGS_service_name, option); + + // 任务列表: std::vector callings{ - {"download a file from the server", - [http_client, dst_path = FLAGS_dst_path]() { - return Download(http_client, "http://example.com/download", dst_path); - }, - false}, + + { + "download a file from the server", // 描述任务 + [http_client, dst_path = FLAGS_dst_path]() { + return Download(http_client, "http://example.com/download", dst_path); // 实际执行器 + }, + false // 初始状态未执行 + }, + {"download multiple files from the server", + [http_client]() { + return DownloadMultipleFiles(http_client, "http://example.com/multi-download?files=download_src.bin,download_dst.bin"); + }, + false}, }; + // 初始化 fiber latch 等待器,用于等待所有任务完成 auto latch_count = static_cast(callings.size()); ::trpc::FiberLatch callings_latch{latch_count}; + // 启动所有任务,每个任务运行在一个独立的 fiber 中 for (auto& c : callings) { ::trpc::StartFiberDetached([&callings_latch, &c]() { - c.ok = c.calling_executor(); - callings_latch.CountDown(); + c.ok = c.calling_executor(); // 执行任务,更新状态 + callings_latch.CountDown(); // 当前任务完成,通知 latch 减一 }); } + // 等待所有任务完成 callings_latch.Wait(); + // 输出每个任务的执行结果,并统计整体是否成功 for (const auto& c : callings) { - final_ok &= c.ok; + final_ok &= c.ok; // 只要有一个任务失败,最终结果为 false std::cout << "name: " << c.calling_name << ", ok: " << c.ok << std::endl; } + // 输出整体执行结果 std::cout << "final result of http calling: " << final_ok << std::endl; - return final_ok ? 0 : -1; -} -} // namespace http::demo + // 返回值决定是否退出异常:0 表示全部成功,-1 表示有失败 + return final_ok; +} // namespace http::demo +} bool ParseClientConfig(int argc, char* argv[]) { google::ParseCommandLineFlags(&argc, &argv, true); google::CommandLineFlagInfo info; @@ -165,3 +428,5 @@ int main(int argc, char* argv[]) { // `RunInTrpcRuntime` function return ::trpc::RunInTrpcRuntime([]() { return http::demo::Run(); }); } + + diff --git a/examples/features/http_upload_download/client/picosha2.h b/examples/features/http_upload_download/client/picosha2.h new file mode 100644 index 00000000..ef965e65 --- /dev/null +++ b/examples/features/http_upload_download/client/picosha2.h @@ -0,0 +1,388 @@ +/* +The MIT License (MIT) + +Copyright (C) 2017 okdshin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +#ifndef PICOSHA2_H +#define PICOSHA2_H +// picosha2:20140213 + +#ifndef PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR +#define PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR \ + 1048576 //=1024*1024: default is 1MB memory +#endif + +#include +#include +#include +#include +#include +#include +namespace picosha2 { +typedef unsigned long word_t; +typedef unsigned char byte_t; + +static const size_t k_digest_size = 32; + +namespace detail { +inline byte_t mask_8bit(byte_t x) { return x & 0xff; } + +inline word_t mask_32bit(word_t x) { return x & 0xffffffff; } + +const word_t add_constant[64] = { + 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, + 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, + 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, 0xe49b69c1, 0xefbe4786, + 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, + 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, + 0x06ca6351, 0x14292967, 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, + 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, 0xa2bfe8a1, 0xa81a664b, + 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, + 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, + 0x5b9cca4f, 0x682e6ff3, 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, + 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2}; + +const word_t initial_message_digest[8] = {0x6a09e667, 0xbb67ae85, 0x3c6ef372, + 0xa54ff53a, 0x510e527f, 0x9b05688c, + 0x1f83d9ab, 0x5be0cd19}; + +inline word_t ch(word_t x, word_t y, word_t z) { return (x & y) ^ ((~x) & z); } + +inline word_t maj(word_t x, word_t y, word_t z) { + return (x & y) ^ (x & z) ^ (y & z); +} + +inline word_t rotr(word_t x, std::size_t n) { + assert(n < 32); + return mask_32bit((x >> n) | (x << (32 - n))); +} + +inline word_t bsig0(word_t x) { return rotr(x, 2) ^ rotr(x, 13) ^ rotr(x, 22); } + +inline word_t bsig1(word_t x) { return rotr(x, 6) ^ rotr(x, 11) ^ rotr(x, 25); } + +inline word_t shr(word_t x, std::size_t n) { + assert(n < 32); + return x >> n; +} + +inline word_t ssig0(word_t x) { return rotr(x, 7) ^ rotr(x, 18) ^ shr(x, 3); } + +inline word_t ssig1(word_t x) { return rotr(x, 17) ^ rotr(x, 19) ^ shr(x, 10); } + +template +void hash256_block(RaIter1 message_digest, RaIter2 first, RaIter2 last) { + assert(first + 64 == last); + static_cast(last); // for avoiding unused-variable warning + word_t w[64]; + std::fill(w, w + 64, word_t(0)); + for (std::size_t i = 0; i < 16; ++i) { + w[i] = (static_cast(mask_8bit(*(first + i * 4))) << 24) | + (static_cast(mask_8bit(*(first + i * 4 + 1))) << 16) | + (static_cast(mask_8bit(*(first + i * 4 + 2))) << 8) | + (static_cast(mask_8bit(*(first + i * 4 + 3)))); + } + for (std::size_t i = 16; i < 64; ++i) { + w[i] = mask_32bit(ssig1(w[i - 2]) + w[i - 7] + ssig0(w[i - 15]) + + w[i - 16]); + } + + word_t a = *message_digest; + word_t b = *(message_digest + 1); + word_t c = *(message_digest + 2); + word_t d = *(message_digest + 3); + word_t e = *(message_digest + 4); + word_t f = *(message_digest + 5); + word_t g = *(message_digest + 6); + word_t h = *(message_digest + 7); + + for (std::size_t i = 0; i < 64; ++i) { + word_t temp1 = h + bsig1(e) + ch(e, f, g) + add_constant[i] + w[i]; + word_t temp2 = bsig0(a) + maj(a, b, c); + h = g; + g = f; + f = e; + e = mask_32bit(d + temp1); + d = c; + c = b; + b = a; + a = mask_32bit(temp1 + temp2); + } + *message_digest += a; + *(message_digest + 1) += b; + *(message_digest + 2) += c; + *(message_digest + 3) += d; + *(message_digest + 4) += e; + *(message_digest + 5) += f; + *(message_digest + 6) += g; + *(message_digest + 7) += h; + for (std::size_t i = 0; i < 8; ++i) { + *(message_digest + i) = mask_32bit(*(message_digest + i)); + } +} + +} // namespace detail + +template +void output_hex(InIter first, InIter last, std::ostream& os) { + os.setf(std::ios::hex, std::ios::basefield); + while (first != last) { + os.width(2); + os.fill('0'); + os << static_cast(*first); + ++first; + } + os.setf(std::ios::dec, std::ios::basefield); +} + +template +void bytes_to_hex_string(InIter first, InIter last, std::string& hex_str) { + std::ostringstream oss; + output_hex(first, last, oss); + hex_str.assign(oss.str()); +} + +template +void bytes_to_hex_string(const InContainer& bytes, std::string& hex_str) { + bytes_to_hex_string(bytes.begin(), bytes.end(), hex_str); +} + +template +std::string bytes_to_hex_string(InIter first, InIter last) { + std::string hex_str; + bytes_to_hex_string(first, last, hex_str); + return hex_str; +} + +template +std::string bytes_to_hex_string(const InContainer& bytes) { + std::string hex_str; + bytes_to_hex_string(bytes, hex_str); + return hex_str; +} + +class hash256_one_by_one { + public: + hash256_one_by_one() { init(); } + + void init() { + buffer_.clear(); + std::fill(data_length_digits_, data_length_digits_ + 4, word_t(0)); + std::copy(detail::initial_message_digest, + detail::initial_message_digest + 8, h_); + } + + template + void process(RaIter first, RaIter last) { + add_to_data_length(static_cast(std::distance(first, last))); + std::copy(first, last, std::back_inserter(buffer_)); + std::size_t i = 0; + for (; i + 64 <= buffer_.size(); i += 64) { + detail::hash256_block(h_, buffer_.begin() + i, + buffer_.begin() + i + 64); + } + buffer_.erase(buffer_.begin(), buffer_.begin() + i); + } + + void finish() { + byte_t temp[64]; + std::fill(temp, temp + 64, byte_t(0)); + std::size_t remains = buffer_.size(); + std::copy(buffer_.begin(), buffer_.end(), temp); + assert(remains < 64); + + // This branch is not executed actually (`remains` is always lower than 64), + // but needed to avoid g++ false-positive warning. + // See https://github.com/okdshin/PicoSHA2/issues/25 + // vvvvvvvvvvvvvvvv + if(remains >= 64) { + remains = 63; + } + // ^^^^^^^^^^^^^^^^ + + temp[remains] = 0x80; + + if (remains > 55) { + std::fill(temp + remains + 1, temp + 64, byte_t(0)); + detail::hash256_block(h_, temp, temp + 64); + std::fill(temp, temp + 64 - 4, byte_t(0)); + } else { + std::fill(temp + remains + 1, temp + 64 - 4, byte_t(0)); + } + + write_data_bit_length(&(temp[56])); + detail::hash256_block(h_, temp, temp + 64); + } + + template + void get_hash_bytes(OutIter first, OutIter last) const { + for (const word_t* iter = h_; iter != h_ + 8; ++iter) { + for (std::size_t i = 0; i < 4 && first != last; ++i) { + *(first++) = detail::mask_8bit( + static_cast((*iter >> (24 - 8 * i)))); + } + } + } + + private: + void add_to_data_length(word_t n) { + word_t carry = 0; + data_length_digits_[0] += n; + for (std::size_t i = 0; i < 4; ++i) { + data_length_digits_[i] += carry; + if (data_length_digits_[i] >= 65536u) { + carry = data_length_digits_[i] >> 16; + data_length_digits_[i] &= 65535u; + } else { + break; + } + } + } + void write_data_bit_length(byte_t* begin) { + word_t data_bit_length_digits[4]; + std::copy(data_length_digits_, data_length_digits_ + 4, + data_bit_length_digits); + + // convert byte length to bit length (multiply 8 or shift 3 times left) + word_t carry = 0; + for (std::size_t i = 0; i < 4; ++i) { + word_t before_val = data_bit_length_digits[i]; + data_bit_length_digits[i] <<= 3; + data_bit_length_digits[i] |= carry; + data_bit_length_digits[i] &= 65535u; + carry = (before_val >> (16 - 3)) & 65535u; + } + + // write data_bit_length + for (int i = 3; i >= 0; --i) { + (*begin++) = static_cast(data_bit_length_digits[i] >> 8); + (*begin++) = static_cast(data_bit_length_digits[i]); + } + } + std::vector buffer_; + word_t data_length_digits_[4]; // as 64bit integer (16bit x 4 integer) + word_t h_[8]; +}; + +inline void get_hash_hex_string(const hash256_one_by_one& hasher, + std::string& hex_str) { + byte_t hash[k_digest_size]; + hasher.get_hash_bytes(hash, hash + k_digest_size); + return bytes_to_hex_string(hash, hash + k_digest_size, hex_str); +} + +inline std::string get_hash_hex_string(const hash256_one_by_one& hasher) { + std::string hex_str; + get_hash_hex_string(hasher, hex_str); + return hex_str; +} + +namespace impl { +template +void hash256_impl(RaIter first, RaIter last, OutIter first2, OutIter last2, int, + std::random_access_iterator_tag) { + hash256_one_by_one hasher; + // hasher.init(); + hasher.process(first, last); + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} + +template +void hash256_impl(InputIter first, InputIter last, OutIter first2, + OutIter last2, int buffer_size, std::input_iterator_tag) { + std::vector buffer(buffer_size); + hash256_one_by_one hasher; + // hasher.init(); + while (first != last) { + int size = buffer_size; + for (int i = 0; i != buffer_size; ++i, ++first) { + if (first == last) { + size = i; + break; + } + buffer[i] = *first; + } + hasher.process(buffer.begin(), buffer.begin() + size); + } + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} +} + +template +void hash256(InIter first, InIter last, OutIter first2, OutIter last2, + int buffer_size = PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR) { + picosha2::impl::hash256_impl( + first, last, first2, last2, buffer_size, + typename std::iterator_traits::iterator_category()); +} + +template +void hash256(InIter first, InIter last, OutContainer& dst) { + hash256(first, last, dst.begin(), dst.end()); +} + +template +void hash256(const InContainer& src, OutIter first, OutIter last) { + hash256(src.begin(), src.end(), first, last); +} + +template +void hash256(const InContainer& src, OutContainer& dst) { + hash256(src.begin(), src.end(), dst.begin(), dst.end()); +} + +template +void hash256_hex_string(InIter first, InIter last, std::string& hex_str) { + byte_t hashed[k_digest_size]; + hash256(first, last, hashed, hashed + k_digest_size); + std::ostringstream oss; + output_hex(hashed, hashed + k_digest_size, oss); + hex_str.assign(oss.str()); +} + +template +std::string hash256_hex_string(InIter first, InIter last) { + std::string hex_str; + hash256_hex_string(first, last, hex_str); + return hex_str; +} + +inline void hash256_hex_string(const std::string& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +void hash256_hex_string(const InContainer& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +std::string hash256_hex_string(const InContainer& src) { + return hash256_hex_string(src.begin(), src.end()); +} +templatevoid hash256(std::ifstream& f, OutIter first, OutIter last){ + hash256(std::istreambuf_iterator(f), std::istreambuf_iterator(), first,last); + +} +}// namespace picosha2 +#endif // PICOSHA2_H diff --git a/examples/features/http_upload_download/client/upload_client.cc b/examples/features/http_upload_download/client/upload_client.cc index 09d39706..6b8444f7 100644 --- a/examples/features/http_upload_download/client/upload_client.cc +++ b/examples/features/http_upload_download/client/upload_client.cc @@ -2,7 +2,7 @@ // // Tencent is pleased to support the open source community by making tRPC available. // -// Copyright (C) 2023 Tencent. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -27,20 +27,27 @@ #include "trpc/coroutine/fiber_latch.h" #include "trpc/util/log/logging.h" +#include "examples/features/http_upload_download/common/file_hasher.h" + + DEFINE_string(service_name, "http_upload_download_client", "callee service name"); DEFINE_string(client_config, "trpc_cpp_fiber.yaml", ""); DEFINE_string(addr, "127.0.0.1:24858", "ip:port"); DEFINE_string(src_path, "upload_src.bin", "file path to store the content which will be upload to the server"); DEFINE_bool(use_chunked, true, "send request content in chunked"); +DEFINE_bool(enable_upload_hash, true, "Enable SHA256 hash verification during upload"); + + + namespace http::demo { using HttpServiceProxyPtr = std::shared_ptr<::trpc::http::HttpServiceProxy>; bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, const std::string src_path) { - TRPC_FMT_INFO("upload a file with chunked"); + TRPC_FMT_INFO("[Upload Request] upload a file with chunked"); auto fin = std::ifstream(src_path, std::ios::binary); if (!fin.is_open()) { - TRPC_FMT_ERROR("failed to open file, file_path: {}", src_path); + TRPC_FMT_ERROR("[Upload Request] failed to open file, file_path: {}", src_path); return false; } @@ -48,10 +55,17 @@ bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, ctx->SetTimeout(5000); // Send request content in chunked. ctx->SetHttpHeader(::trpc::http::kHeaderTransferEncoding, ::trpc::http::kTransferEncodingChunked); + + if (FLAGS_enable_upload_hash) { + std::string hash = CalculateSHA256(src_path); + + ctx->SetHttpHeader("X-File-Hash", hash); + } + // Creates HTTP stream. auto stream = proxy->Post(ctx, url); if (!stream.GetStatus().OK()) { - TRPC_FMT_ERROR("failed to create client stream"); + TRPC_FMT_ERROR("[Upload Request] failed to create client stream"); return false; } @@ -70,7 +84,7 @@ bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, nwrite += n; continue; } - TRPC_FMT_ERROR("failed to write request content: {}", status.ToString()); + TRPC_FMT_ERROR("[Upload Request] failed to write request content: {}", status.ToString()); return false; } else if (fin.eof()) { status = stream.WriteDone(); @@ -78,7 +92,7 @@ bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, TRPC_FMT_ERROR("failed to send write-done: {}", status.ToString()); return false; } - TRPC_FMT_ERROR("failed to read file"); + TRPC_FMT_ERROR("[Upload Request] failed to read file"); return false; } @@ -87,19 +101,19 @@ bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, // Reads response header. ::trpc::Status status = stream.ReadHeaders(http_status, http_header); if (!status.OK()) { - TRPC_FMT_ERROR("failed to read http header: {}", status.ToString()); + TRPC_FMT_ERROR("[Upload Request] failed to read http header: {}", status.ToString()); return false; } else if (http_status != ::trpc::http::ResponseStatus::kOk) { - TRPC_FMT_ERROR("http response status: {}", http_status); + TRPC_FMT_ERROR("[Upload Request] http response status: {}", http_status); return false; } - TRPC_FMT_INFO("finish uploading, write size: {}", nwrite); + TRPC_FMT_INFO("[Upload Request] finish uploading, write size: {}", nwrite); return true; } bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string& url, const std::string src_path) { - TRPC_FMT_INFO("upload a file with content-length"); + TRPC_FMT_INFO("[Upload Request] upload a file with content-length"); auto fin = std::ifstream(src_path, std::ios::binary); if (!fin.is_open()) { TRPC_FMT_ERROR("failed to open file, file_path: {}", src_path); @@ -114,7 +128,7 @@ bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string return fsize; }(fin); if (fsize <= 0) { - TRPC_FMT_ERROR("failed to read file size, file_path: {}", src_path); + TRPC_FMT_ERROR("[Upload Request] failed to read file size, file_path: {}", src_path); return false; } @@ -122,10 +136,16 @@ bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string ctx->SetTimeout(5000); // Send request content with content-length. ctx->SetHttpHeader(::trpc::http::kHeaderContentLength, std::to_string(fsize)); + + if (FLAGS_enable_upload_hash) { + std::string hash = CalculateSHA256(src_path); + ctx->SetHttpHeader("X-File-Hash", hash); + } + // Creates HTTP stream. auto stream = proxy->Post(ctx, url); if (!stream.GetStatus().OK()) { - TRPC_FMT_ERROR("failed to create client stream"); + TRPC_FMT_ERROR("[Upload Request] failed to create client stream"); return false; } @@ -144,7 +164,7 @@ bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string nwrite += n; continue; } - TRPC_FMT_ERROR("failed to write request content: {}", status.ToString()); + TRPC_FMT_ERROR("[Upload Request] failed to write request content: {}", status.ToString()); return false; } else if (fin.eof()) { status = stream.WriteDone(); @@ -152,7 +172,7 @@ bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string TRPC_FMT_ERROR("failed to send write-done: {}", status.ToString()); return false; } - TRPC_FMT_ERROR("failed to read file"); + TRPC_FMT_ERROR("[Upload Request] failed to read file"); return false; } @@ -161,14 +181,14 @@ bool UploadWithContentLength(const HttpServiceProxyPtr& proxy, const std::string // Reads response header. status = stream.ReadHeaders(http_status, http_header); if (!status.OK()) { - TRPC_FMT_ERROR("failed to read http header: {}", status.ToString()); + TRPC_FMT_ERROR("[Upload Request] failed to read http header: {}", status.ToString()); return false; } else if (http_status != ::trpc::http::ResponseStatus::kOk) { - TRPC_FMT_ERROR("http response status: {}", http_status); + TRPC_FMT_ERROR("[Upload Request] http response status: {}", http_status); return false; } - TRPC_FMT_INFO("finish uploading, write size: {}", nwrite); + TRPC_FMT_INFO("[Upload Request] finish uploading, write size: {}", nwrite); return true; } diff --git a/examples/features/http_upload_download/common/BUILD b/examples/features/http_upload_download/common/BUILD new file mode 100644 index 00000000..71f35960 --- /dev/null +++ b/examples/features/http_upload_download/common/BUILD @@ -0,0 +1,11 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "file_hasher", + srcs = ["file_hasher.cc"], + hdrs = ["file_hasher.h", "picosha2.h"], + visibility = ["//visibility:public"], +) + diff --git a/examples/features/http_upload_download/common/file_hasher.cc b/examples/features/http_upload_download/common/file_hasher.cc new file mode 100644 index 00000000..fb69f889 --- /dev/null +++ b/examples/features/http_upload_download/common/file_hasher.cc @@ -0,0 +1,28 @@ +#include +#include +#include +#include +#include +#include "picosha2.h" // 下载自 https://github.com/okdshin/PicoSHA2 + +namespace http::demo { +std::string CalculateSHA256(const std::string& file_path) { + std::ifstream file(file_path, std::ios::binary); + if (!file.is_open()) return ""; + + std::vector data; + char buffer[8192]; + + while (!file.eof()) { + file.read(buffer, sizeof(buffer)); + std::streamsize bytes_read = file.gcount(); + if (bytes_read > 0) { + data.insert(data.end(), buffer, buffer + bytes_read); + } + } + + return picosha2::hash256_hex_string(data); +} + + +} diff --git a/examples/features/http_upload_download/common/file_hasher.h b/examples/features/http_upload_download/common/file_hasher.h new file mode 100644 index 00000000..fd9ab6f2 --- /dev/null +++ b/examples/features/http_upload_download/common/file_hasher.h @@ -0,0 +1,27 @@ +#include +#include +#include +#include +#include +#include "picosha2.h" // 下载自 https://github.com/okdshin/PicoSHA2 + +namespace http::demo { + +std::string CalculateSHA256(const std::string& file_path) { + std::ifstream file(file_path, std::ios::binary); + if (!file.is_open()) return ""; + + std::vector data; + char buffer[8192]; + while (file.read(buffer, sizeof(buffer))) { + data.insert(data.end(), buffer, buffer + file.gcount()); + } + if (file.gcount() > 0) { + data.insert(data.end(), buffer, buffer + file.gcount()); + } + + return picosha2::hash256_hex_string(data); +} + +// 你也可以直接在 Post() 方法中调用该函数来校验上传结果 +} diff --git a/examples/features/http_upload_download/common/picosha2.h b/examples/features/http_upload_download/common/picosha2.h new file mode 100644 index 00000000..ef965e65 --- /dev/null +++ b/examples/features/http_upload_download/common/picosha2.h @@ -0,0 +1,388 @@ +/* +The MIT License (MIT) + +Copyright (C) 2017 okdshin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +#ifndef PICOSHA2_H +#define PICOSHA2_H +// picosha2:20140213 + +#ifndef PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR +#define PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR \ + 1048576 //=1024*1024: default is 1MB memory +#endif + +#include +#include +#include +#include +#include +#include +namespace picosha2 { +typedef unsigned long word_t; +typedef unsigned char byte_t; + +static const size_t k_digest_size = 32; + +namespace detail { +inline byte_t mask_8bit(byte_t x) { return x & 0xff; } + +inline word_t mask_32bit(word_t x) { return x & 0xffffffff; } + +const word_t add_constant[64] = { + 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, + 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, + 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, 0xe49b69c1, 0xefbe4786, + 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, + 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, + 0x06ca6351, 0x14292967, 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, + 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, 0xa2bfe8a1, 0xa81a664b, + 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, + 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, + 0x5b9cca4f, 0x682e6ff3, 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, + 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2}; + +const word_t initial_message_digest[8] = {0x6a09e667, 0xbb67ae85, 0x3c6ef372, + 0xa54ff53a, 0x510e527f, 0x9b05688c, + 0x1f83d9ab, 0x5be0cd19}; + +inline word_t ch(word_t x, word_t y, word_t z) { return (x & y) ^ ((~x) & z); } + +inline word_t maj(word_t x, word_t y, word_t z) { + return (x & y) ^ (x & z) ^ (y & z); +} + +inline word_t rotr(word_t x, std::size_t n) { + assert(n < 32); + return mask_32bit((x >> n) | (x << (32 - n))); +} + +inline word_t bsig0(word_t x) { return rotr(x, 2) ^ rotr(x, 13) ^ rotr(x, 22); } + +inline word_t bsig1(word_t x) { return rotr(x, 6) ^ rotr(x, 11) ^ rotr(x, 25); } + +inline word_t shr(word_t x, std::size_t n) { + assert(n < 32); + return x >> n; +} + +inline word_t ssig0(word_t x) { return rotr(x, 7) ^ rotr(x, 18) ^ shr(x, 3); } + +inline word_t ssig1(word_t x) { return rotr(x, 17) ^ rotr(x, 19) ^ shr(x, 10); } + +template +void hash256_block(RaIter1 message_digest, RaIter2 first, RaIter2 last) { + assert(first + 64 == last); + static_cast(last); // for avoiding unused-variable warning + word_t w[64]; + std::fill(w, w + 64, word_t(0)); + for (std::size_t i = 0; i < 16; ++i) { + w[i] = (static_cast(mask_8bit(*(first + i * 4))) << 24) | + (static_cast(mask_8bit(*(first + i * 4 + 1))) << 16) | + (static_cast(mask_8bit(*(first + i * 4 + 2))) << 8) | + (static_cast(mask_8bit(*(first + i * 4 + 3)))); + } + for (std::size_t i = 16; i < 64; ++i) { + w[i] = mask_32bit(ssig1(w[i - 2]) + w[i - 7] + ssig0(w[i - 15]) + + w[i - 16]); + } + + word_t a = *message_digest; + word_t b = *(message_digest + 1); + word_t c = *(message_digest + 2); + word_t d = *(message_digest + 3); + word_t e = *(message_digest + 4); + word_t f = *(message_digest + 5); + word_t g = *(message_digest + 6); + word_t h = *(message_digest + 7); + + for (std::size_t i = 0; i < 64; ++i) { + word_t temp1 = h + bsig1(e) + ch(e, f, g) + add_constant[i] + w[i]; + word_t temp2 = bsig0(a) + maj(a, b, c); + h = g; + g = f; + f = e; + e = mask_32bit(d + temp1); + d = c; + c = b; + b = a; + a = mask_32bit(temp1 + temp2); + } + *message_digest += a; + *(message_digest + 1) += b; + *(message_digest + 2) += c; + *(message_digest + 3) += d; + *(message_digest + 4) += e; + *(message_digest + 5) += f; + *(message_digest + 6) += g; + *(message_digest + 7) += h; + for (std::size_t i = 0; i < 8; ++i) { + *(message_digest + i) = mask_32bit(*(message_digest + i)); + } +} + +} // namespace detail + +template +void output_hex(InIter first, InIter last, std::ostream& os) { + os.setf(std::ios::hex, std::ios::basefield); + while (first != last) { + os.width(2); + os.fill('0'); + os << static_cast(*first); + ++first; + } + os.setf(std::ios::dec, std::ios::basefield); +} + +template +void bytes_to_hex_string(InIter first, InIter last, std::string& hex_str) { + std::ostringstream oss; + output_hex(first, last, oss); + hex_str.assign(oss.str()); +} + +template +void bytes_to_hex_string(const InContainer& bytes, std::string& hex_str) { + bytes_to_hex_string(bytes.begin(), bytes.end(), hex_str); +} + +template +std::string bytes_to_hex_string(InIter first, InIter last) { + std::string hex_str; + bytes_to_hex_string(first, last, hex_str); + return hex_str; +} + +template +std::string bytes_to_hex_string(const InContainer& bytes) { + std::string hex_str; + bytes_to_hex_string(bytes, hex_str); + return hex_str; +} + +class hash256_one_by_one { + public: + hash256_one_by_one() { init(); } + + void init() { + buffer_.clear(); + std::fill(data_length_digits_, data_length_digits_ + 4, word_t(0)); + std::copy(detail::initial_message_digest, + detail::initial_message_digest + 8, h_); + } + + template + void process(RaIter first, RaIter last) { + add_to_data_length(static_cast(std::distance(first, last))); + std::copy(first, last, std::back_inserter(buffer_)); + std::size_t i = 0; + for (; i + 64 <= buffer_.size(); i += 64) { + detail::hash256_block(h_, buffer_.begin() + i, + buffer_.begin() + i + 64); + } + buffer_.erase(buffer_.begin(), buffer_.begin() + i); + } + + void finish() { + byte_t temp[64]; + std::fill(temp, temp + 64, byte_t(0)); + std::size_t remains = buffer_.size(); + std::copy(buffer_.begin(), buffer_.end(), temp); + assert(remains < 64); + + // This branch is not executed actually (`remains` is always lower than 64), + // but needed to avoid g++ false-positive warning. + // See https://github.com/okdshin/PicoSHA2/issues/25 + // vvvvvvvvvvvvvvvv + if(remains >= 64) { + remains = 63; + } + // ^^^^^^^^^^^^^^^^ + + temp[remains] = 0x80; + + if (remains > 55) { + std::fill(temp + remains + 1, temp + 64, byte_t(0)); + detail::hash256_block(h_, temp, temp + 64); + std::fill(temp, temp + 64 - 4, byte_t(0)); + } else { + std::fill(temp + remains + 1, temp + 64 - 4, byte_t(0)); + } + + write_data_bit_length(&(temp[56])); + detail::hash256_block(h_, temp, temp + 64); + } + + template + void get_hash_bytes(OutIter first, OutIter last) const { + for (const word_t* iter = h_; iter != h_ + 8; ++iter) { + for (std::size_t i = 0; i < 4 && first != last; ++i) { + *(first++) = detail::mask_8bit( + static_cast((*iter >> (24 - 8 * i)))); + } + } + } + + private: + void add_to_data_length(word_t n) { + word_t carry = 0; + data_length_digits_[0] += n; + for (std::size_t i = 0; i < 4; ++i) { + data_length_digits_[i] += carry; + if (data_length_digits_[i] >= 65536u) { + carry = data_length_digits_[i] >> 16; + data_length_digits_[i] &= 65535u; + } else { + break; + } + } + } + void write_data_bit_length(byte_t* begin) { + word_t data_bit_length_digits[4]; + std::copy(data_length_digits_, data_length_digits_ + 4, + data_bit_length_digits); + + // convert byte length to bit length (multiply 8 or shift 3 times left) + word_t carry = 0; + for (std::size_t i = 0; i < 4; ++i) { + word_t before_val = data_bit_length_digits[i]; + data_bit_length_digits[i] <<= 3; + data_bit_length_digits[i] |= carry; + data_bit_length_digits[i] &= 65535u; + carry = (before_val >> (16 - 3)) & 65535u; + } + + // write data_bit_length + for (int i = 3; i >= 0; --i) { + (*begin++) = static_cast(data_bit_length_digits[i] >> 8); + (*begin++) = static_cast(data_bit_length_digits[i]); + } + } + std::vector buffer_; + word_t data_length_digits_[4]; // as 64bit integer (16bit x 4 integer) + word_t h_[8]; +}; + +inline void get_hash_hex_string(const hash256_one_by_one& hasher, + std::string& hex_str) { + byte_t hash[k_digest_size]; + hasher.get_hash_bytes(hash, hash + k_digest_size); + return bytes_to_hex_string(hash, hash + k_digest_size, hex_str); +} + +inline std::string get_hash_hex_string(const hash256_one_by_one& hasher) { + std::string hex_str; + get_hash_hex_string(hasher, hex_str); + return hex_str; +} + +namespace impl { +template +void hash256_impl(RaIter first, RaIter last, OutIter first2, OutIter last2, int, + std::random_access_iterator_tag) { + hash256_one_by_one hasher; + // hasher.init(); + hasher.process(first, last); + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} + +template +void hash256_impl(InputIter first, InputIter last, OutIter first2, + OutIter last2, int buffer_size, std::input_iterator_tag) { + std::vector buffer(buffer_size); + hash256_one_by_one hasher; + // hasher.init(); + while (first != last) { + int size = buffer_size; + for (int i = 0; i != buffer_size; ++i, ++first) { + if (first == last) { + size = i; + break; + } + buffer[i] = *first; + } + hasher.process(buffer.begin(), buffer.begin() + size); + } + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} +} + +template +void hash256(InIter first, InIter last, OutIter first2, OutIter last2, + int buffer_size = PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR) { + picosha2::impl::hash256_impl( + first, last, first2, last2, buffer_size, + typename std::iterator_traits::iterator_category()); +} + +template +void hash256(InIter first, InIter last, OutContainer& dst) { + hash256(first, last, dst.begin(), dst.end()); +} + +template +void hash256(const InContainer& src, OutIter first, OutIter last) { + hash256(src.begin(), src.end(), first, last); +} + +template +void hash256(const InContainer& src, OutContainer& dst) { + hash256(src.begin(), src.end(), dst.begin(), dst.end()); +} + +template +void hash256_hex_string(InIter first, InIter last, std::string& hex_str) { + byte_t hashed[k_digest_size]; + hash256(first, last, hashed, hashed + k_digest_size); + std::ostringstream oss; + output_hex(hashed, hashed + k_digest_size, oss); + hex_str.assign(oss.str()); +} + +template +std::string hash256_hex_string(InIter first, InIter last) { + std::string hex_str; + hash256_hex_string(first, last, hex_str); + return hex_str; +} + +inline void hash256_hex_string(const std::string& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +void hash256_hex_string(const InContainer& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +std::string hash256_hex_string(const InContainer& src) { + return hash256_hex_string(src.begin(), src.end()); +} +templatevoid hash256(std::ifstream& f, OutIter first, OutIter last){ + hash256(std::istreambuf_iterator(f), std::istreambuf_iterator(), first,last); + +} +}// namespace picosha2 +#endif // PICOSHA2_H diff --git a/examples/features/http_upload_download/run.sh b/examples/features/http_upload_download/run.sh index 6113049a..d312c5bf 100755 --- a/examples/features/http_upload_download/run.sh +++ b/examples/features/http_upload_download/run.sh @@ -2,7 +2,7 @@ # # Tencent is pleased to support the open source community by making tRPC available. # -# Copyright (C) 2023 Tencent. +# Copyright (C) 2023 THL A29 Limited, a Tencent company. # All rights reserved. # # If you have downloaded a copy of the tRPC source code from Tencent, diff --git a/examples/features/http_upload_download/run_cmake.sh b/examples/features/http_upload_download/run_cmake.sh index 43f4fc37..6e354606 100755 --- a/examples/features/http_upload_download/run_cmake.sh +++ b/examples/features/http_upload_download/run_cmake.sh @@ -2,7 +2,7 @@ # # Tencent is pleased to support the open source community by making tRPC available. # -# Copyright (C) 2023 Tencent. +# Copyright (C) 2023 THL A29 Limited, a Tencent company. # All rights reserved. # # If you have downloaded a copy of the tRPC source code from Tencent, diff --git a/examples/features/http_upload_download/server/BUILD b/examples/features/http_upload_download/server/BUILD index 5e98a73b..8a520c48 100644 --- a/examples/features/http_upload_download/server/BUILD +++ b/examples/features/http_upload_download/server/BUILD @@ -21,5 +21,6 @@ cc_library( hdrs = ["file_storage_handler.h"], deps = [ "@trpc_cpp//trpc/util/http/stream:http_stream_handler", + "//examples/features/http_upload_download/common:file_hasher", ], ) diff --git a/examples/features/http_upload_download/server/file_storage_handler.cc b/examples/features/http_upload_download/server/file_storage_handler.cc index d379e7fc..7e98659c 100644 --- a/examples/features/http_upload_download/server/file_storage_handler.cc +++ b/examples/features/http_upload_download/server/file_storage_handler.cc @@ -2,7 +2,7 @@ // // Tencent is pleased to support the open source community by making tRPC available. // -// Copyright (C) 2023 Tencent. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -13,102 +13,389 @@ #include "examples/features/http_upload_download/server/file_storage_handler.h" + #include +#include + +#include "examples/features/http_upload_download/common/file_hasher.h" + +#include "../common/picosha2.h" + +struct FileSegmentHeader { + char filename[128]; // 文件名(null-terminated) + uint64_t file_size; // 文件内容字节数 + char hash_hex[64]; // 文件的 SHA256 +}; +std::string GetBaseName(const std::string& filepath) { + std::size_t pos = filepath.find_last_of("/\\"); + return (pos != std::string::npos) ? filepath.substr(pos + 1) : filepath; +} + +bool enable_limit = true; +constexpr std::size_t rate_limit_bytes_per_sec = 900 * 1024; // 512KB/s namespace http::demo { // Provides file downloading. ::trpc::Status FileStorageHandler::Get(const ::trpc::ServerContextPtr& ctx, const ::trpc::http::RequestPtr& req, ::trpc::http::Response* rsp) { - auto fin = std::ifstream(download_src_path_, std::ios::binary); + + const std::string full_url = req->GetUrl(); + std::string path_only; + std::size_t pos = full_url.find('?'); + + if (pos != std::string::npos) { + path_only = full_url.substr(0, pos); + } else { + path_only = full_url; + } + + if (path_only == "/multi-download") { + TRPC_FMT_INFO("[Multi-File Download] Start processing"); + return DownloadMultipleFiles(ctx, req, rsp); + } + + + TRPC_FMT_INFO("[Download Request] Received download signal"); + //auto fin = std::ifstream(download_src_path_, std::ios::binary); + std::ifstream fin(download_src_path_, std::ios::binary | std::ios::ate); if (!fin.is_open()) { - TRPC_FMT_ERROR("failed to open file: {}", download_src_path_); + TRPC_FMT_ERROR("[Download Request] failed to open file: {}", download_src_path_); rsp->SetStatus(::trpc::http::ResponseStatus::kInternalServerError); return ::trpc::kSuccStatus; } + std::size_t file_size = fin.tellg(); // 获取总字节数 + fin.seekg(0, std::ios::beg); // 重置文件指针以便后续读取 + rsp->SetHeader("X-File-Length", std::to_string(file_size)); + + + std::string hash_tex = CalculateSHA256(download_src_path_); + rsp->SetHeader("X-File-SHA256",hash_tex); + TRPC_FMT_INFO("[Download Request] Calculated SHA256 of downloaded file: {}", hash_tex); // Send response content in chunked. rsp->SetHeader(::trpc::http::kHeaderTransferEncoding, ::trpc::http::kTransferEncodingChunked); auto& writer = rsp->GetStream(); ::trpc::Status status = writer.WriteHeader(); if (!status.OK()) { - TRPC_FMT_ERROR("failed to send response header: {}", status.ToString()); + TRPC_FMT_ERROR("[Download Request] failed to send response header: {}", status.ToString()); return ::trpc::kStreamRstStatus; } - + std::size_t nwrite{0}; ::trpc::BufferBuilder buffer_builder; + + std::size_t sent_bytes = 0; + auto last_send_time = std::chrono::steady_clock::now(); for (;;) { - fin.read(buffer_builder.data(), buffer_builder.SizeAvailable()); + + fin.read(buffer_builder.data(), buffer_builder.SizeAvailable()); // 对当前发送内容进行哈希更新 std::size_t n = fin.gcount(); if (n > 0) { + + ::trpc::NoncontiguousBuffer buffer; - buffer.Append(buffer_builder.Seal(n)); + buffer.Append(buffer_builder.Seal(n)); status = writer.Write(std::move(buffer)); if (status.OK()) { nwrite += n; + sent_bytes+=n; + //limit + /*std::chrono::milliseconds delay( + static_cast(1000.0 * n / rate_limit_bytes_per_sec)); + std::this_thread::sleep_for(delay);*/ + // 限速控制:根据实际发送速率动态延迟 + auto now = std::chrono::steady_clock::now(); + auto duration_ms = std::chrono::duration_cast(now - last_send_time).count(); + + if (enable_limit && duration_ms > 0) { + std::size_t actual_rate = sent_bytes * 1000 / duration_ms; // bytes/sec + if (actual_rate > rate_limit_bytes_per_sec) { + std::size_t expected_duration = sent_bytes * 1000 / rate_limit_bytes_per_sec; + std::size_t extra_delay = expected_duration > duration_ms ? expected_duration - duration_ms : 0; + std::this_thread::sleep_for(std::chrono::milliseconds(extra_delay)); + } + last_send_time = std::chrono::steady_clock::now(); + sent_bytes = 0; + } + + continue; } - TRPC_FMT_ERROR("failed to write content: {}", status.ToString()); + TRPC_FMT_ERROR("[Download Request] failed to write content: {}", status.ToString()); return ::trpc::kStreamRstStatus; } else if (fin.eof()) { status = writer.WriteDone(); if (status.OK()) break; - TRPC_FMT_ERROR("failed to send write-done: {}", status.ToString()); + TRPC_FMT_ERROR("[Download Request] failed to send write-done: {}", status.ToString()); return ::trpc::kStreamRstStatus; } - TRPC_FMT_ERROR("failed to read file"); + TRPC_FMT_ERROR("[Download Request] failed to read file"); return ::trpc::kStreamRstStatus; } - TRPC_FMT_INFO("finish providing file, write size: {}", nwrite); + + TRPC_FMT_INFO("[Download Request] finish providing file, write size: {}", nwrite); return ::trpc::kSuccStatus; } // Provides file uploading. -::trpc::Status FileStorageHandler::Post(const ::trpc::ServerContextPtr& ctx, const ::trpc::http::RequestPtr& req, +// 服务端文件上传处理逻辑 +::trpc::Status FileStorageHandler::Post(const ::trpc::ServerContextPtr& ctx, + const ::trpc::http::RequestPtr& req, ::trpc::http::Response* rsp) { + std::size_t total_size = 0; + + // 从请求头中获取 Content-Length,如果有的话 if (req->HasHeader(::trpc::http::kHeaderContentLength)) { - TRPC_FMT_DEBUG("the request has Content-Length: {}", req->GetHeader(::trpc::http::kHeaderContentLength)); + total_size = std::stoull(req->GetHeader(::trpc::http::kHeaderContentLength)); + TRPC_FMT_INFO("[Upload Request] Total upload size: {} bytes", total_size); } else { - TRPC_FMT_DEBUG("the request has no Content-Length, may be chunked"); + TRPC_FMT_INFO("[Upload Request] No Content-Length header, possibly chunked upload"); } + // 打开目标文件用于接收上传内容 auto fout = std::ofstream(upload_dst_path_, std::ios::binary); if (!fout.is_open()) { - TRPC_FMT_ERROR("failed to open file: {}", download_src_path_); + TRPC_FMT_ERROR("[Upload Request] failed to open file: {}", download_src_path_); rsp->SetStatus(::trpc::http::ResponseStatus::kInternalServerError); return ::trpc::kSuccStatus; } ::trpc::Status status; - auto& reader = req->GetStream(); - constexpr std::size_t kBufferSize{1024 * 1024}; + auto& reader = req->GetStream(); // 获取 HTTP 请求体的流 + constexpr std::size_t kBufferSize{1024 * 1024}; // 每次读取 1MB std::size_t nread{0}; + auto start_time = std::chrono::steady_clock::now(); // 上传开始时间 + + // 数据读取循环 for (;;) { ::trpc::NoncontiguousBuffer buffer; status = reader.Read(buffer, kBufferSize); if (status.OK()) { + // 更新读取字节数 nread += buffer.ByteSize(); + + // 写入磁盘 for (const auto& block : buffer) { fout.write(block.data(), block.size()); } + + // 上传进度日志 + TRPC_FMT_INFO_IF(TRPC_EVERY_N(100), "Uploaded {} bytes so far", nread); + if (total_size > 0) { + const int bar_width = 50; // 控制进度条宽度 + double progress = static_cast(nread) / total_size; + int pos = static_cast(bar_width * progress); + + std::stringstream ss; + ss << "[Upload Request] Progress: ["; + + for (int i = 0; i < bar_width; ++i) { + if (i < pos) + ss << "█"; // 已完成部分 + else + ss << " "; // 未完成部分 + } + + ss << "] " << std::fixed << std::setprecision(2) << (progress * 100.0) << "%"; + + TRPC_FMT_INFO("{}", ss.str()); + } + +/*if (total_size > 0) { + double progress = static_cast(nread) / total_size * 100; + TRPC_FMT_INFO("[Upload Request] Uploaded progress: {:.2f}%", progress); + }*/ continue; } else if (status.StreamEof()) { + // 上传结束时记录速率 + auto end_time = std::chrono::steady_clock::now(); + auto duration_ms = std::chrono::duration_cast(end_time - start_time).count(); + double speed_mb = static_cast(nread) / 1024 / 1024 / (duration_ms / 1000.0); + TRPC_FMT_INFO("[Upload Request] Upload complete: {} bytes in {} ms, avg speed: {:.2f} MB/s", nread, duration_ms, speed_mb); break; } - TRPC_FMT_ERROR("failed to read request content: {}", status.ToString()); + + // 出现异常 + TRPC_FMT_ERROR("[Upload Request] failed to read request content: {}", status.ToString()); return ::trpc::kStreamRstStatus; } + // 提取客户端传来的哈希值用于比对 + std::string expected_hash; + if (req->HasHeader("X-File-Hash")) { + expected_hash = req->GetHeader("X-File-Hash"); + TRPC_FMT_INFO("[Upload Request] Expected file hash from client: {}", expected_hash); + } + + // 重新计算服务端收到的文件的哈希值 + std::string actual_hash = CalculateSHA256(upload_dst_path_); + TRPC_FMT_INFO("[Upload Request] Calculated SHA256 of uploaded file: {}", actual_hash); + + // 校验结果比对 + if (!expected_hash.empty() && actual_hash != expected_hash) { + TRPC_FMT_ERROR("[Upload Request] ❌File hash mismatch! Integrity check failed."); + rsp->SetStatus(::trpc::http::ResponseStatus::kBadRequest); + return ::trpc::kSuccStatus; + } + else{ + TRPC_FMT_INFO("[Upload Request] ✅ SHA256 verified successfully"); + } + // 响应成功 rsp->SetStatus(::trpc::http::ResponseStatus::kOk); auto& writer = rsp->GetStream(); status = writer.WriteHeader(); if (!status.OK()) { - TRPC_FMT_ERROR("failed to send response header: {}", status.ToString()); + TRPC_FMT_ERROR("[Upload Request] failed to send response header: {}", status.ToString()); return ::trpc::kStreamRstStatus; } - TRPC_FMT_INFO("finish storing the file, read size: {}", nread); + + TRPC_FMT_INFO("[Upload Request] File upload and hash verification complete, read size: {}", nread); return ::trpc::kSuccStatus; } -} // namespace http::demo + +#include +#include +#include + +std::string ParseQueryParameter(const std::string& url, const std::string& key) { + // 提取 ? 后面的 query 字符串 + std::size_t qpos = url.find('?'); + if (qpos == std::string::npos || qpos + 1 >= url.size()) return ""; + + std::string query = url.substr(qpos + 1); + std::stringstream qs_stream(query); + std::string kv; + + while (std::getline(qs_stream, kv, '&')) { + std::size_t eq_pos = kv.find('='); + if (eq_pos == std::string::npos) continue; + + std::string param_key = kv.substr(0, eq_pos); + std::string param_val = kv.substr(eq_pos + 1); + + if (param_key == key) return param_val; + } + + return ""; // 未找到指定参数 +} + +std::string ByteDump(const char* data, std::size_t size) { + std::ostringstream oss; + for (std::size_t i = 0; i < size; ++i) { + oss << std::hex << std::setw(2) << std::setfill('0') + << (static_cast(data[i]) & 0xff) << " "; + } + return oss.str(); +} + +::trpc::Status FileStorageHandler::DownloadMultipleFiles( + const ::trpc::ServerContextPtr& ctx, + const ::trpc::http::RequestPtr& req, + ::trpc::http::Response* rsp) { + + // 设置为 chunked 编码响应 + rsp->SetHeader(::trpc::http::kHeaderTransferEncoding, ::trpc::http::kTransferEncodingChunked); + auto& writer = rsp->GetStream(); + + ::trpc::Status status = writer.WriteHeader(); + if (!status.OK()) { + TRPC_FMT_ERROR("[Multi-File Download] failed to write response header"); + return ::trpc::kStreamRstStatus; + } + + + std::string files_param = ParseQueryParameter(req->GetUrl(), "files"); + std::vector file_paths; + + std::stringstream ss(files_param); + std::string item; + while (std::getline(ss, item, ',')) { + if (!item.empty()) { + file_paths.push_back( item); + TRPC_FMT_INFO("{}", item); + } + } + + for (const auto& path : file_paths) { + std::string hash_before = CalculateSHA256(path); + TRPC_FMT_INFO("[Multi-File Download] 📦 Pre-read hash of {}: {}", path, hash_before); + + std::ifstream fin; + fin.open(path, std::ios::binary | std::ios::ate); + + //std::ifstream fin(path, std::ios::binary | std::ios::ate); + if (!fin.is_open()) { + TRPC_FMT_ERROR("[Multi-File Download] failed to open file: {}", path); + continue; // 可跳过或直接返回错误 + } + + std::size_t file_size = fin.tellg(); + fin.clear(); // 清除状态 + fin.seekg(0, std::ios::beg); // 保证从头读取 + + // 构造 header + FileSegmentHeader header; + std::memset(&header, 0, sizeof(header)); + //TRPC_FMT_INFO("📦 filename before strncpy: {}", GetBaseName(path).c_str()); + std::strncpy(header.filename, GetBaseName(path).c_str(), sizeof(header.filename)); + //TRPC_FMT_INFO("📦 header.filername after strncpy: {}", std::string(header.filename)); + std::string hash_hex = CalculateSHA256(path); + //TRPC_FMT_INFO("📦 hash_hex before strncpy: {}", hash_hex); + std::strncpy(header.hash_hex, hash_hex.c_str(), sizeof(header.hash_hex)); + //std::snprintf(header.hash_hex, sizeof(header.hash_hex), "%s", hash_hex.c_str()); + + //TRPC_FMT_INFO("📦 header.hash_hex after strncpy: {}", std::string(header.hash_hex)); + //TRPC_FMT_INFO("🧬 header.hash_hex raw dump: {}", ByteDump(header.hash_hex, sizeof(header.hash_hex))); + + header.file_size = file_size; + + // 发送 header + ::trpc::NoncontiguousBufferBuilder builder; + + + builder.Append(static_cast(&header), sizeof(header)); + writer.Write(builder.DestructiveGet()); + + + + + for (;;) { + + + + ::trpc::BufferBuilder buffer_builder; + std::size_t nwrite{0}; + fin.read(buffer_builder.data(), buffer_builder.SizeAvailable()); + std::size_t n = fin.gcount(); + if (n > 0) { + ::trpc::NoncontiguousBuffer buffer; + buffer.Append(buffer_builder.Seal(n)); + //TRPC_FMT_INFO("📦 First 16 bytes of chunk: {}", ByteDump(buffer.begin()->data(), std::min(16, buffer.begin()->size()))); + + status = writer.Write(std::move(buffer)); + if (!status.OK()) { + TRPC_FMT_ERROR("failed to write content: {}", status.ToString()); + return ::trpc::kStreamRstStatus; + } + nwrite += n; + } else if (fin.eof()) { + break; + } else { + TRPC_FMT_ERROR("failed to read file: {}", path); + return ::trpc::kStreamRstStatus; + } + } + + fin.close(); + TRPC_FMT_INFO("✅ Sent {}, size: {}, hash: {}", header.filename, header.file_size, hash_hex); + } + + writer.WriteDone(); + return ::trpc::kSuccStatus; +} + +} + + diff --git a/examples/features/http_upload_download/server/file_storage_handler.h b/examples/features/http_upload_download/server/file_storage_handler.h index 9386194e..ef2d8e58 100644 --- a/examples/features/http_upload_download/server/file_storage_handler.h +++ b/examples/features/http_upload_download/server/file_storage_handler.h @@ -2,7 +2,7 @@ // // Tencent is pleased to support the open source community by making tRPC available. // -// Copyright (C) 2023 Tencent. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -29,6 +29,12 @@ class FileStorageHandler : public ::trpc::http::HttpStreamHandler { // Provides file uploading. ::trpc::Status Post(const ::trpc::ServerContextPtr& ctx, const ::trpc::http::RequestPtr& req, ::trpc::http::Response* rsp) override; + + ::trpc::Status DownloadMultipleFiles( + const ::trpc::ServerContextPtr& ctx, + const ::trpc::http::RequestPtr& req, + ::trpc::http::Response* rsp); + private: std::string upload_dst_path_{"upload_dst.bin"}; diff --git a/examples/features/http_upload_download/server/http_server.cc b/examples/features/http_upload_download/server/http_server.cc index 636c969c..8c1f5c06 100644 --- a/examples/features/http_upload_download/server/http_server.cc +++ b/examples/features/http_upload_download/server/http_server.cc @@ -2,7 +2,7 @@ // // Tencent is pleased to support the open source community by making tRPC available. // -// Copyright (C) 2023 Tencent. +// Copyright (C) 2023 THL A29 Limited, a Tencent company. // All rights reserved. // // If you have downloaded a copy of the tRPC source code from Tencent, @@ -31,11 +31,19 @@ class HttpdServer : public ::trpc::TrpcApp { int Initialize() override { auto file_storage_handler = std::make_shared(FLAGS_upload_dst_path, FLAGS_download_src_path); + + + + auto SetHttpRoutes = [file_storage_handler](::trpc::http::HttpRoutes& r) -> void { // Provides file downloading. r.Add(::trpc::http::MethodType::GET, ::trpc::http::Path("/download"), file_storage_handler); // Provides file uploading. r.Add(::trpc::http::MethodType::POST, ::trpc::http::Path("/upload"), file_storage_handler); + + // 多文件下载的路由注册,注意 lambda 捕获 & 返回类型 + + r.Add(::trpc::http::MethodType::GET, ::trpc::http::Path("/multi-download"), file_storage_handler); }; auto http_service = std::make_shared<::trpc::HttpService>(); diff --git a/examples/features/http_upload_download/server/picosha2.h b/examples/features/http_upload_download/server/picosha2.h new file mode 100644 index 00000000..ef965e65 --- /dev/null +++ b/examples/features/http_upload_download/server/picosha2.h @@ -0,0 +1,388 @@ +/* +The MIT License (MIT) + +Copyright (C) 2017 okdshin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +#ifndef PICOSHA2_H +#define PICOSHA2_H +// picosha2:20140213 + +#ifndef PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR +#define PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR \ + 1048576 //=1024*1024: default is 1MB memory +#endif + +#include +#include +#include +#include +#include +#include +namespace picosha2 { +typedef unsigned long word_t; +typedef unsigned char byte_t; + +static const size_t k_digest_size = 32; + +namespace detail { +inline byte_t mask_8bit(byte_t x) { return x & 0xff; } + +inline word_t mask_32bit(word_t x) { return x & 0xffffffff; } + +const word_t add_constant[64] = { + 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, + 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, + 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, 0xe49b69c1, 0xefbe4786, + 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da, + 0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, + 0x06ca6351, 0x14292967, 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, + 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, 0xa2bfe8a1, 0xa81a664b, + 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, + 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, + 0x5b9cca4f, 0x682e6ff3, 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, + 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2}; + +const word_t initial_message_digest[8] = {0x6a09e667, 0xbb67ae85, 0x3c6ef372, + 0xa54ff53a, 0x510e527f, 0x9b05688c, + 0x1f83d9ab, 0x5be0cd19}; + +inline word_t ch(word_t x, word_t y, word_t z) { return (x & y) ^ ((~x) & z); } + +inline word_t maj(word_t x, word_t y, word_t z) { + return (x & y) ^ (x & z) ^ (y & z); +} + +inline word_t rotr(word_t x, std::size_t n) { + assert(n < 32); + return mask_32bit((x >> n) | (x << (32 - n))); +} + +inline word_t bsig0(word_t x) { return rotr(x, 2) ^ rotr(x, 13) ^ rotr(x, 22); } + +inline word_t bsig1(word_t x) { return rotr(x, 6) ^ rotr(x, 11) ^ rotr(x, 25); } + +inline word_t shr(word_t x, std::size_t n) { + assert(n < 32); + return x >> n; +} + +inline word_t ssig0(word_t x) { return rotr(x, 7) ^ rotr(x, 18) ^ shr(x, 3); } + +inline word_t ssig1(word_t x) { return rotr(x, 17) ^ rotr(x, 19) ^ shr(x, 10); } + +template +void hash256_block(RaIter1 message_digest, RaIter2 first, RaIter2 last) { + assert(first + 64 == last); + static_cast(last); // for avoiding unused-variable warning + word_t w[64]; + std::fill(w, w + 64, word_t(0)); + for (std::size_t i = 0; i < 16; ++i) { + w[i] = (static_cast(mask_8bit(*(first + i * 4))) << 24) | + (static_cast(mask_8bit(*(first + i * 4 + 1))) << 16) | + (static_cast(mask_8bit(*(first + i * 4 + 2))) << 8) | + (static_cast(mask_8bit(*(first + i * 4 + 3)))); + } + for (std::size_t i = 16; i < 64; ++i) { + w[i] = mask_32bit(ssig1(w[i - 2]) + w[i - 7] + ssig0(w[i - 15]) + + w[i - 16]); + } + + word_t a = *message_digest; + word_t b = *(message_digest + 1); + word_t c = *(message_digest + 2); + word_t d = *(message_digest + 3); + word_t e = *(message_digest + 4); + word_t f = *(message_digest + 5); + word_t g = *(message_digest + 6); + word_t h = *(message_digest + 7); + + for (std::size_t i = 0; i < 64; ++i) { + word_t temp1 = h + bsig1(e) + ch(e, f, g) + add_constant[i] + w[i]; + word_t temp2 = bsig0(a) + maj(a, b, c); + h = g; + g = f; + f = e; + e = mask_32bit(d + temp1); + d = c; + c = b; + b = a; + a = mask_32bit(temp1 + temp2); + } + *message_digest += a; + *(message_digest + 1) += b; + *(message_digest + 2) += c; + *(message_digest + 3) += d; + *(message_digest + 4) += e; + *(message_digest + 5) += f; + *(message_digest + 6) += g; + *(message_digest + 7) += h; + for (std::size_t i = 0; i < 8; ++i) { + *(message_digest + i) = mask_32bit(*(message_digest + i)); + } +} + +} // namespace detail + +template +void output_hex(InIter first, InIter last, std::ostream& os) { + os.setf(std::ios::hex, std::ios::basefield); + while (first != last) { + os.width(2); + os.fill('0'); + os << static_cast(*first); + ++first; + } + os.setf(std::ios::dec, std::ios::basefield); +} + +template +void bytes_to_hex_string(InIter first, InIter last, std::string& hex_str) { + std::ostringstream oss; + output_hex(first, last, oss); + hex_str.assign(oss.str()); +} + +template +void bytes_to_hex_string(const InContainer& bytes, std::string& hex_str) { + bytes_to_hex_string(bytes.begin(), bytes.end(), hex_str); +} + +template +std::string bytes_to_hex_string(InIter first, InIter last) { + std::string hex_str; + bytes_to_hex_string(first, last, hex_str); + return hex_str; +} + +template +std::string bytes_to_hex_string(const InContainer& bytes) { + std::string hex_str; + bytes_to_hex_string(bytes, hex_str); + return hex_str; +} + +class hash256_one_by_one { + public: + hash256_one_by_one() { init(); } + + void init() { + buffer_.clear(); + std::fill(data_length_digits_, data_length_digits_ + 4, word_t(0)); + std::copy(detail::initial_message_digest, + detail::initial_message_digest + 8, h_); + } + + template + void process(RaIter first, RaIter last) { + add_to_data_length(static_cast(std::distance(first, last))); + std::copy(first, last, std::back_inserter(buffer_)); + std::size_t i = 0; + for (; i + 64 <= buffer_.size(); i += 64) { + detail::hash256_block(h_, buffer_.begin() + i, + buffer_.begin() + i + 64); + } + buffer_.erase(buffer_.begin(), buffer_.begin() + i); + } + + void finish() { + byte_t temp[64]; + std::fill(temp, temp + 64, byte_t(0)); + std::size_t remains = buffer_.size(); + std::copy(buffer_.begin(), buffer_.end(), temp); + assert(remains < 64); + + // This branch is not executed actually (`remains` is always lower than 64), + // but needed to avoid g++ false-positive warning. + // See https://github.com/okdshin/PicoSHA2/issues/25 + // vvvvvvvvvvvvvvvv + if(remains >= 64) { + remains = 63; + } + // ^^^^^^^^^^^^^^^^ + + temp[remains] = 0x80; + + if (remains > 55) { + std::fill(temp + remains + 1, temp + 64, byte_t(0)); + detail::hash256_block(h_, temp, temp + 64); + std::fill(temp, temp + 64 - 4, byte_t(0)); + } else { + std::fill(temp + remains + 1, temp + 64 - 4, byte_t(0)); + } + + write_data_bit_length(&(temp[56])); + detail::hash256_block(h_, temp, temp + 64); + } + + template + void get_hash_bytes(OutIter first, OutIter last) const { + for (const word_t* iter = h_; iter != h_ + 8; ++iter) { + for (std::size_t i = 0; i < 4 && first != last; ++i) { + *(first++) = detail::mask_8bit( + static_cast((*iter >> (24 - 8 * i)))); + } + } + } + + private: + void add_to_data_length(word_t n) { + word_t carry = 0; + data_length_digits_[0] += n; + for (std::size_t i = 0; i < 4; ++i) { + data_length_digits_[i] += carry; + if (data_length_digits_[i] >= 65536u) { + carry = data_length_digits_[i] >> 16; + data_length_digits_[i] &= 65535u; + } else { + break; + } + } + } + void write_data_bit_length(byte_t* begin) { + word_t data_bit_length_digits[4]; + std::copy(data_length_digits_, data_length_digits_ + 4, + data_bit_length_digits); + + // convert byte length to bit length (multiply 8 or shift 3 times left) + word_t carry = 0; + for (std::size_t i = 0; i < 4; ++i) { + word_t before_val = data_bit_length_digits[i]; + data_bit_length_digits[i] <<= 3; + data_bit_length_digits[i] |= carry; + data_bit_length_digits[i] &= 65535u; + carry = (before_val >> (16 - 3)) & 65535u; + } + + // write data_bit_length + for (int i = 3; i >= 0; --i) { + (*begin++) = static_cast(data_bit_length_digits[i] >> 8); + (*begin++) = static_cast(data_bit_length_digits[i]); + } + } + std::vector buffer_; + word_t data_length_digits_[4]; // as 64bit integer (16bit x 4 integer) + word_t h_[8]; +}; + +inline void get_hash_hex_string(const hash256_one_by_one& hasher, + std::string& hex_str) { + byte_t hash[k_digest_size]; + hasher.get_hash_bytes(hash, hash + k_digest_size); + return bytes_to_hex_string(hash, hash + k_digest_size, hex_str); +} + +inline std::string get_hash_hex_string(const hash256_one_by_one& hasher) { + std::string hex_str; + get_hash_hex_string(hasher, hex_str); + return hex_str; +} + +namespace impl { +template +void hash256_impl(RaIter first, RaIter last, OutIter first2, OutIter last2, int, + std::random_access_iterator_tag) { + hash256_one_by_one hasher; + // hasher.init(); + hasher.process(first, last); + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} + +template +void hash256_impl(InputIter first, InputIter last, OutIter first2, + OutIter last2, int buffer_size, std::input_iterator_tag) { + std::vector buffer(buffer_size); + hash256_one_by_one hasher; + // hasher.init(); + while (first != last) { + int size = buffer_size; + for (int i = 0; i != buffer_size; ++i, ++first) { + if (first == last) { + size = i; + break; + } + buffer[i] = *first; + } + hasher.process(buffer.begin(), buffer.begin() + size); + } + hasher.finish(); + hasher.get_hash_bytes(first2, last2); +} +} + +template +void hash256(InIter first, InIter last, OutIter first2, OutIter last2, + int buffer_size = PICOSHA2_BUFFER_SIZE_FOR_INPUT_ITERATOR) { + picosha2::impl::hash256_impl( + first, last, first2, last2, buffer_size, + typename std::iterator_traits::iterator_category()); +} + +template +void hash256(InIter first, InIter last, OutContainer& dst) { + hash256(first, last, dst.begin(), dst.end()); +} + +template +void hash256(const InContainer& src, OutIter first, OutIter last) { + hash256(src.begin(), src.end(), first, last); +} + +template +void hash256(const InContainer& src, OutContainer& dst) { + hash256(src.begin(), src.end(), dst.begin(), dst.end()); +} + +template +void hash256_hex_string(InIter first, InIter last, std::string& hex_str) { + byte_t hashed[k_digest_size]; + hash256(first, last, hashed, hashed + k_digest_size); + std::ostringstream oss; + output_hex(hashed, hashed + k_digest_size, oss); + hex_str.assign(oss.str()); +} + +template +std::string hash256_hex_string(InIter first, InIter last) { + std::string hex_str; + hash256_hex_string(first, last, hex_str); + return hex_str; +} + +inline void hash256_hex_string(const std::string& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +void hash256_hex_string(const InContainer& src, std::string& hex_str) { + hash256_hex_string(src.begin(), src.end(), hex_str); +} + +template +std::string hash256_hex_string(const InContainer& src) { + return hash256_hex_string(src.begin(), src.end()); +} +templatevoid hash256(std::ifstream& f, OutIter first, OutIter last){ + hash256(std::istreambuf_iterator(f), std::istreambuf_iterator(), first,last); + +} +}// namespace picosha2 +#endif // PICOSHA2_H diff --git a/examples/features/http_upload_download/server/trpc_cpp_fiber.yaml b/examples/features/http_upload_download/server/trpc_cpp_fiber.yaml index 4e58f091..0de98af9 100644 --- a/examples/features/http_upload_download/server/trpc_cpp_fiber.yaml +++ b/examples/features/http_upload_download/server/trpc_cpp_fiber.yaml @@ -24,3 +24,5 @@ plugins: local_file: eol: true filename: http_upload_download_server.log + stdout: # 添加这一块 + format: "[%Y-%m-%d %H:%M:%S.%e] [thread %t] [%l] [%@] %v"