diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/README.md b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/README.md
index f74e919b50..c66454b764 100644
--- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/README.md
+++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/README.md
@@ -76,12 +76,18 @@ This design leverages concepts discussed in the [FPGA tutorials](/DirectProgramm
 
 ### Query Implementations
 
-The following sections describe at a high level how queries 1, 11 and 12 are implemented on the FPGA using a set of generalized database operators (found in `db_utils/`). In the block diagrams below, the blocks are oneAPI kernels, and the arrows represent `pipes` that shows the flow of data from one kernel to another.
+The following sections describe at a high level how queries 1, 9, 11 and 12 are implemented on the FPGA using a set of generalized database operators (found in `db_utils/`). In the block diagrams below, the blocks are oneAPI kernels, and the arrows represent `pipes` that show the flow of data from one kernel to another.
 
 #### Query 1
 
 Query 1 is the simplest of the four queries and only uses the `Accumulator` database operator. The query streams in each row of the LINEITEM table and performs computation on each row.
 
+#### Query 9
+
+Query 9 is the most complicated of the four queries and uses all of the database operators (`LikeRegex`, `Accumulator`, `MapJoin`, `MergeJoin`, `DuplicateMergeJoin`, and `FifoSort`). The block diagram of the design is shown below.
+
+![](assets/q9.png)
+
 #### Query 11
 
 Query 11 showcases the `MapJoin` and `FifoSort` database operators. The block diagram of the design is shown below.
@@ -101,6 +107,8 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d
 |`dbdata.cpp` | Contains code to parse the database input files and validate the query output
 |`dbdata.hpp` | Definitions of database related data structures and parsing functions
 |`query1/query1_kernel.cpp` | Contains the kernel for Query 1
+|`query9/query9_kernel.cpp` | Contains the kernel for Query 9
+|`query9/pipe_types.cpp` | All data types and instantiations for pipes used in query 9
 |`query11/query11_kernel.cpp` | Contains the kernel for Query 11
 |`query11/pipe_types.cpp` | All data types and instantiations for pipes used in query 11
 |`query12/query12_kernel.cpp` | Contains the kernel for Query 12
@@ -142,7 +150,7 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d
    cd build
    cmake .. -DQUERY=1
    ```
-   `-DQUERY=` can be any of the following query numbers: `1`, `11` or `12`.
+   `-DQUERY=` can be any of the following query numbers: `1`, `9`, `11` or `12`.
 
 3. Compile the design. (The provided targets match the recommended development flow.)
 
@@ -156,12 +164,14 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d
    ```
    The report resides at `db_report.prj/reports/report.html`.
 
+   >**Note**: If you are compiling Query 9 (`-DQUERY=9`), expect a long report generation time. You can download pre-generated reports from [https://iotdk.intel.com/fpga-precompiled-binaries/latest/db.fpga.tar.gz](https://iotdk.intel.com/fpga-precompiled-binaries/latest/db.fpga.tar.gz).
+
 3. Compile for FPGA hardware (longer compile time, targets FPGA device).
 
    ```
    make fpga
   ```
-   When building for hardware, the default scale factor is **1**. To use the smaller scale factor of 0.01, add the flag `-DSF_SMALL=1` to the original `cmake` command. For example: `cmake .. -DQUERY=11 -DSF_SMALL=1`. 
See the [Database files](#database-files) for more information. + When building for hardware, the default scale factor is **1**. To use the smaller scale factor of 0.01, add the flag `-DSF_SMALL=1` to the original `cmake` command. For example: `cmake .. -DQUERY=9 -DSF_SMALL=1`. See the [Database files](#database-files) for more information. (Optional) The hardware compile may take several hours to complete. You can download a pre-compiled binary (compatible with Linux* Ubuntu* 18.04) for an Intel® FPGA PAC D5005 (with Intel Stratix® 10 SX) from [https://iotdk.intel.com/fpga-precompiled-binaries/latest/db.fpga.tar.gz](https://iotdk.intel.com/fpga-precompiled-binaries/latest/db.fpga.tar.gz). @@ -176,7 +186,7 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d cd build cmake -G "NMake Makefiles" -DQUERY=1 ``` - `-DQUERY=` can be any of the following query numbers: `1`, `11` or `12`. + `-DQUERY=` can be any of the following query numbers: `1`, `9`, `11` or `12`. 3. Compile the design. (The provided targets match the recommended development flow.) @@ -191,6 +201,8 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d ``` The report resides at `db_report.prj/reports/report.html` directory. + >**Note**: If you are compiling Query 9 (`-DQUERY=9`), expect a long report generation time. + 3. Compile for FPGA hardware (longer compile time, targets FPGA device): ``` nmake fpga @@ -216,7 +228,7 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d ``` ./db.fpga_emu --dbroot=../data/sf0.01 --test ``` - (Optional) Run the design for queries `11` and `12`. + (Optional) Run the design for queries `9`, `11` and `12`. 2. Run the design on an FPGA device. ``` @@ -229,7 +241,7 @@ Query 12 showcases the `MergeJoin` database operator. The block diagram of the d ``` db.fpga_emu.exe --dbroot=../data/sf0.01 --test ``` - (Optional) Run the design for queries `11` and `12`. + (Optional) Run the design for queries `9`, `11` and `12`. 2. Run the sample on an FPGA device. ``` diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/assets/q9.png b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/assets/q9.png new file mode 100644 index 0000000000..67c479f2f0 Binary files /dev/null and b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/assets/q9.png differ diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/sample.json b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/sample.json index 85b4f220b1..b0a302aff6 100755 --- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/sample.json +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/sample.json @@ -30,6 +30,17 @@ "./db.fpga_emu --dbroot=../data/sf0.01 --test" ] }, + { + "id": "fpga_emu_q9", + "steps": [ + "icpx --version", + "mkdir build-q9", + "cd build-q9", + "cmake .. 
-DQUERY=9", + "make fpga_emu", + "./db.fpga_emu --dbroot=../data/sf0.01 --test" + ] + }, { "id": "fpga_emu_q11", "steps": [ @@ -97,6 +108,19 @@ "db.fpga_emu.exe --dbroot=../data/sf0.01 --test" ] }, + { + "id": "fpga_emu_q9", + "steps": [ + "icpx --version", + "cd ../..", + "mkdir build-q9", + "cd build-q9", + "xcopy /E ..\\ReferenceDesigns\\db\\data ..\\data\\", + "cmake -G \"NMake Makefiles\" ../ReferenceDesigns/db -DQUERY=9", + "nmake fpga_emu", + "db.fpga_emu.exe --dbroot=../data/sf0.01 --test" + ] + }, { "id": "fpga_emu_q11", "steps": [ diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/CMakeLists.txt b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/CMakeLists.txt index 1841854cbb..30849bf784 100755 --- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/CMakeLists.txt +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/CMakeLists.txt @@ -15,6 +15,9 @@ endif() if(${QUERY} EQUAL 1) set(DEFAULT_BOARD "intel_a10gx_pac:pac_a10") set(DEFAULT_BOARD_STR "Intel Arria(R) 10 GX") +elseif(${QUERY} EQUAL 9) + set(DEFAULT_BOARD "intel_s10sx_pac:pac_s10") + set(DEFAULT_BOARD_STR "Intel Stratix(R) 10 SX") elseif(${QUERY} EQUAL 11) set(DEFAULT_BOARD "intel_s10sx_pac:pac_s10") set(DEFAULT_BOARD_STR "Intel Stratix(R) 10 SX") @@ -42,8 +45,8 @@ else() endif() # ensure a supported query was requested -if(NOT ${QUERY} EQUAL 1 AND NOT ${QUERY} EQUAL 11 AND NOT ${QUERY} EQUAL 12) - message(FATAL_ERROR "\tQUERY ${QUERY} not supported (supported queries are 1, 11 and 12)") +if(NOT ${QUERY} EQUAL 1 AND NOT ${QUERY} EQUAL 9 AND NOT ${QUERY} EQUAL 11 AND NOT ${QUERY} EQUAL 12) + message(FATAL_ERROR "\tQUERY ${QUERY} not supported (supported queries are 1, 9, 11 and 12)") endif() # Pick the default seed if the user did not specify one to CMake. 
@@ -52,6 +55,8 @@ if(NOT DEFINED SEED) if(${FPGA_DEVICE} MATCHES ".*a10.*") if(${QUERY} EQUAL 1) set(SEED "-Xsseed=2") + elseif(${QUERY} EQUAL 9) + set(SEED "-Xsseed=2") elseif(${QUERY} EQUAL 11) set(SEED "-Xsseed=4") elseif(${QUERY} EQUAL 12) @@ -60,6 +65,8 @@ if(NOT DEFINED SEED) elseif(${FPGA_DEVICE} MATCHES ".*s10.*") if(${QUERY} EQUAL 1) set(SEED "-Xsseed=3") + elseif(${QUERY} EQUAL 9) + set(SEED "-Xsseed=2") elseif(${QUERY} EQUAL 11) set(SEED "-Xsseed=3") elseif(${QUERY} EQUAL 12) @@ -68,6 +75,8 @@ if(NOT DEFINED SEED) elseif(${FPGA_DEVICE} MATCHES ".*agilex.*") if(${QUERY} EQUAL 1) set(SEED "-Xsseed=2") + elseif(${QUERY} EQUAL 9) + set(SEED "-Xsseed=2") elseif(${QUERY} EQUAL 11) set(SEED "-Xsseed=4") elseif(${QUERY} EQUAL 12) @@ -82,10 +91,10 @@ if(IGNORE_DEFAULT_SEED) set(SEED "") endif() -# Error out if trying to run Q11 on Arria 10 +# Error out if trying to run Q9 or Q11 on Arria 10 if (${FPGA_DEVICE} MATCHES ".*a10.*") - if(${QUERY} EQUAL 11) - message(FATAL_ERROR "Query 11 is not supported on Arria 10 devices") + if(${QUERY} EQUAL 9 OR ${QUERY} EQUAL 11) + message(FATAL_ERROR "Queries 9 and 11 are not supported on Arria 10 devices") endif() endif() @@ -106,6 +115,9 @@ endif() if(${QUERY} EQUAL 1) set(DEVICE_SOURCE query1/query1_kernel.cpp) set(DEVICE_HEADER query1/query1_kernel.hpp) +elseif(${QUERY} EQUAL 9) + set(DEVICE_SOURCE query9/query9_kernel.cpp) + set(DEVICE_HEADER query9/query9_kernel.hpp) elseif(${QUERY} EQUAL 11) set(DEVICE_SOURCE query11/query11_kernel.cpp) set(DEVICE_HEADER query11/query11_kernel.hpp) @@ -113,7 +125,7 @@ elseif(${QUERY} EQUAL 12) set(DEVICE_SOURCE query12/query12_kernel.cpp) set(DEVICE_HEADER query12/query12_kernel.hpp) else() - message(FATAL_ERROR "\tQUERY ${QUERY} not supported (supported queries are 1, 11 and 12)") + message(FATAL_ERROR "\tQUERY ${QUERY} not supported (supported queries are 1, 9, 11 and 12)") endif() # A SYCL ahead-of-time (AoT) compile processes the device code in two stages. diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/db.cpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/db.cpp index d41a3bf1e3..fee2020eb8 100644 --- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/db.cpp +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/db.cpp @@ -42,6 +42,11 @@ using namespace sycl; bool DoQuery1(queue& q, Database& dbinfo, std::string& db_root_dir, std::string& args, bool test, bool print, double& kernel_latency, double& total_latency); +#elif (QUERY == 9) +#include "query9/query9_kernel.hpp" +bool DoQuery9(queue& q, Database& dbinfo, std::string& db_root_dir, + std::string& args, bool test, bool print, double& kernel_latency, + double& total_latency); #elif (QUERY == 11) #include "query11/query11_kernel.hpp" bool DoQuery11(queue& q, Database& dbinfo, std::string& db_root_dir, @@ -168,9 +173,9 @@ int main(int argc, char* argv[]) { } // make sure the query is supported - if (!(query == 1 || query == 11 || query == 12)) { + if (!(query == 1 || query == 9 || query == 11 || query == 12)) { std::cerr << "ERROR: unsupported query (" << query << "). 
" - << "Only queries 1, 11 and 12 are supported\n"; + << "Only queries 1, 9, 11 and 12 are supported\n"; return 1; } @@ -224,6 +229,13 @@ int main(int argc, char* argv[]) { success = DoQuery1(q, dbinfo, db_root_dir, args, test_query, print_result, kernel_latency[run], total_latency[run]); +#endif + } else if (query == 9) { + // query9 +#if (QUERY == 9) + success = DoQuery9(q, dbinfo, db_root_dir, args, + test_query, print_result, + kernel_latency[run], total_latency[run]); #endif } else if (query == 11) { // query11 @@ -351,6 +363,51 @@ bool DoQuery1(queue& q, Database& dbinfo, std::string& db_root_dir, } #endif +#if (QUERY == 9) +bool DoQuery9(queue& q, Database& dbinfo, std::string& db_root_dir, + std::string& args, bool test, bool print, double& kernel_latency, + double& total_latency) { + // the default colour regex based on the TPCH documents + std::string colour = "GREEN"; + + // parse the query arguments + if (!test && !args.empty()) { + std::stringstream ss(args); + std::getline(ss, colour, ','); + } else { + if (!args.empty()) { + std::cout << "Testing query 9, therefore ignoring the '--args' flag\n"; + } + } + + // convert the colour regex to uppercase characters (convention) + transform(colour.begin(), colour.end(), colour.begin(), ::toupper); + + std::cout << "Running Q9 with colour regex: " << colour << std::endl; + + // the output of the query + std::array sum_profit; + + // perform the query + bool success = SubmitQuery9(q, dbinfo, colour, sum_profit, kernel_latency, + total_latency); + + if (success) { + // validate the results of the query, if requested + if (test) { + success = dbinfo.ValidateQ9(db_root_dir, sum_profit); + } + + // print the results of the query, if requested + if (print) { + dbinfo.PrintQ9(sum_profit); + } + } + + return success; +} +#endif + #if (QUERY == 11) bool DoQuery11(queue& q, Database& dbinfo, std::string& db_root_dir, std::string& args, bool test, bool print, double& kernel_latency, diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.cpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.cpp index a4640e513b..09efcf86ee 100644 --- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.cpp +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.cpp @@ -625,6 +625,59 @@ bool Database::ValidateQ1(std::string db_root_dir, return valid; } +// +// validate the results of Query 9 +// +bool Database::ValidateQ9(std::string db_root_dir, + std::array& sum_profit) { + std::cout << "Validating query 9 test results" << std::endl; + + // populate date row by row (as presented in the file) + std::string path(db_root_dir + kSeparator + "answers" + kSeparator + "q9.out"); + std::ifstream ifs(path); + std::string line; + + bool valid = true; + + if (!ifs.is_open()) { + std::cout << "Failed to open " << path << "\n"; + return false; + } + + // do nothing with the first line, it is a header line + std::getline(ifs, line); + + while (std::getline(ifs, line)) { + // split row into column strings by separator ('|') + std::vector column_data = SplitRowStr(line); + assert(column_data.size() == 3); + + std::string nationname_gold = column_data[0]; + trim(nationname_gold); + transform(nationname_gold.begin(), nationname_gold.end(), + nationname_gold.begin(), ::toupper); + + assert(n.name_key_map.find(nationname_gold) != n.name_key_map.end()); + + unsigned char nationkey_gold = n.name_key_map[nationname_gold]; + + unsigned int year_gold = std::stoi(column_data[1]); + double sum_profit_gold = 
std::stod(column_data[2]); + + double sum_profit_res = + (double)(sum_profit[year_gold * 25 + nationkey_gold]) / (100.0 * 100.0); + + if (!AlmostEqual(sum_profit_gold, sum_profit_res, 0.01f)) { + std::cerr << "ERROR: sum_profit for " << nationname_gold << " in " + << year_gold << " did not match (Expected=" << sum_profit_gold + << ", Result=" << sum_profit_res << ")\n"; + valid = false; + } + } + + return valid; +} + // // validate the results of Query 11 // @@ -782,6 +835,56 @@ void Database::PrintQ1(std::array& sum_qty, } } +// +// print the results of Query 9 +// +void Database::PrintQ9(std::array& sum_profit) { + // row of Q9 output for local sorting + struct Row { + Row(std::string& nation, int year, DBDecimal sum_profit) + : nation(nation), year(year), sum_profit(sum_profit) {} + std::string nation; + int year; + DBDecimal sum_profit; + + void print() { + std::cout << nation << "|" << year << "|" + << (double)(sum_profit) / (100.0 * 100.0) << "\n"; + } + }; + + // create the rows + std::vector outrows; + for (unsigned char nat = 0; nat < kNationTableSize; nat++) { + std::string nation_name = n.key_name_map[nat]; + for (int y = 1992; y <= 1998; y++) { + outrows.push_back(Row(nation_name, y, sum_profit[y * 25 + nat])); + } + } + + // sort rows by year + std::sort(outrows.begin(), outrows.end(), + [](const Row& a, const Row& b) -> bool { + return a.year > b.year; + }); + + // sort rows by nation + // stable_sort() preserves the order of the previous sort + std::stable_sort(outrows.begin(), outrows.end(), + [](const Row& a, const Row& b) -> bool { + return a.nation < b.nation; + }); + + // print the header + std::cout << "nation|o_year|sum_profit\n"; + + // print the results + std::cout << std::fixed << std::setprecision(2); + for (int i = 0; i < outrows.size(); i++) { + outrows[i].print(); + } +} + // // print the results of Query 11 // diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.hpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.hpp index 3fb9a3852c..5e560372ca 100644 --- a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.hpp +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/dbdata.hpp @@ -181,6 +181,9 @@ struct Database { std::array& avg_discount, std::array& count); + bool ValidateQ9(std::string db_root_dir, + std::array& sum_profit); + bool ValidateQ11(std::string db_root_dir, std::vector& partkeys, std::vector& partkey_values); @@ -198,6 +201,8 @@ struct Database { std::array& avg_discount, std::array& count); + void PrintQ9(std::array& sum_profit); + void PrintQ11(std::vector& partkeys, std::vector& partkey_values); diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/pipe_types.hpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/pipe_types.hpp new file mode 100644 index 0000000000..a6adbfc0fa --- /dev/null +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/pipe_types.hpp @@ -0,0 +1,272 @@ +#ifndef __PIPE_TYPES_H__ +#define __PIPE_TYPES_H__ +#pragma once + +#include +#include + +#include "../db_utils/StreamingData.hpp" +#include "../dbdata.hpp" + +using namespace sycl; + +// +// A single row of the PARTSUPPLIER table +// with a subset of the columns (needed for this query) +// +class PartSupplierRow { + public: + PartSupplierRow() : valid(false), partkey(0), suppkey(0), supplycost(0) {} + PartSupplierRow(bool v_valid, DBIdentifier v_partkey, DBIdentifier v_suppkey, + DBDecimal v_supplycost) + : valid(v_valid), + partkey(v_partkey), + suppkey(v_suppkey), + 
supplycost(v_supplycost) {} + + // NOTE: this is not true, but is key to be used by MapJoin + DBIdentifier PrimaryKey() const { return suppkey; } + + bool valid; + DBIdentifier partkey; + DBIdentifier suppkey; + DBDecimal supplycost; +}; + +// +// A row of the join SUPPLIER and PARTSUPPLIER table +// +class SupplierPartSupplierJoined { + public: + SupplierPartSupplierJoined() + : valid(false), partkey(0), suppkey(0), supplycost(0), nationkey(0) {} + SupplierPartSupplierJoined(bool v_valid, DBIdentifier v_partkey, + DBIdentifier v_suppkey, DBDecimal v_supplycost, + unsigned char v_nationkey) + : valid(v_valid), + partkey(v_partkey), + suppkey(v_suppkey), + supplycost(v_supplycost), + nationkey(v_nationkey) {} + + DBIdentifier PrimaryKey() const { return partkey; } + + void Join(const unsigned char nation_key, const PartSupplierRow& ps_row) { + partkey = ps_row.partkey; + suppkey = ps_row.suppkey; + supplycost = ps_row.supplycost; + nationkey = nation_key; + } + + bool valid; + DBIdentifier partkey; + DBIdentifier suppkey; + DBDecimal supplycost; + unsigned char nationkey; +}; + +// +// A single row of the ORDERS table +// with a subset of the columns (needed for this query) +// +class OrdersRow { + public: + OrdersRow() : valid(false), orderkey(0), orderdate(0) {} + OrdersRow(bool v_valid, DBIdentifier v_orderkey, DBDate v_orderdate) + : valid(v_valid), orderkey(v_orderkey), orderdate(v_orderdate) {} + + DBIdentifier PrimaryKey() const { return orderkey; } + + bool valid; + DBIdentifier orderkey; + DBDate orderdate; +}; + +// +// A single row of the LINEITEM table +// with a subset of the columns (needed for this query) +// +class LineItemMinimalRow { + public: + LineItemMinimalRow() + : valid(false), idx(0), orderkey(0), partkey(0), suppkey(0) {} + LineItemMinimalRow(bool v_valid, unsigned int v_idx, DBIdentifier v_orderkey, + DBIdentifier v_partkey, DBIdentifier v_suppkey) + : valid(v_valid), + idx(v_idx), + orderkey(v_orderkey), + partkey(v_partkey), + suppkey(v_suppkey) {} + + DBIdentifier PrimaryKey() const { return orderkey; } + + bool valid; + unsigned int idx; + DBIdentifier orderkey, partkey, suppkey; +}; + +// +// A row of the join LINEITEM and ORDERS table +// +class LineItemOrdersMinimalJoined { + public: + LineItemOrdersMinimalJoined() + : valid(false), lineitemIdx(0), partkey(0), suppkey(0), orderdate(0) {} + LineItemOrdersMinimalJoined(bool v_valid, unsigned int v_lineitem_idx, + DBIdentifier v_partkey, DBIdentifier v_suppkey, + DBDate v_orderdate) + : valid(v_valid), + lineitemIdx(v_lineitem_idx), + partkey(v_partkey), + suppkey(v_suppkey), + orderdate(v_orderdate) {} + + DBIdentifier PrimaryKey() { return partkey; } + + void Join(const OrdersRow& o_row, const LineItemMinimalRow& li_row) { + lineitemIdx = li_row.idx; + partkey = li_row.partkey; + suppkey = li_row.suppkey; + orderdate = o_row.orderdate; + } + + bool valid; + unsigned int lineitemIdx; + DBIdentifier partkey; + DBIdentifier suppkey; + DBDate orderdate; +}; + +// +// Datatype to be sent to be sorted by the FifoSorter +// +class SortData { + public: + SortData() {} + SortData(unsigned int v_lineitem_idx, DBIdentifier v_partkey, + DBIdentifier v_suppkey, DBDate v_orderdate) + : lineitemIdx(v_lineitem_idx), + partkey(v_partkey), + suppkey(v_suppkey), + orderdate(v_orderdate) {} + SortData(const LineItemOrdersMinimalJoined& d) + : lineitemIdx(d.lineitemIdx), + partkey(d.partkey), + suppkey(d.suppkey), + orderdate(d.orderdate) {} + + bool operator<(const SortData& t) const { return partkey < t.partkey; } + bool 
operator>(const SortData& t) const { return partkey > t.partkey; } + bool operator<=(const SortData& t) const { return partkey <= t.partkey; } + bool operator>=(const SortData& t) const { return partkey >= t.partkey; } + bool operator==(const SortData& t) const { return partkey == t.partkey; } + bool operator!=(const SortData& t) const { return partkey != t.partkey; } + + unsigned int lineitemIdx; + DBIdentifier partkey; + DBIdentifier suppkey; + DBDate orderdate; +}; + +// +// The final data used to compute the 'amount' +// +class FinalData { + public: + FinalData() + : valid(false), + partkey(0), + lineitemIdx(0), + orderdate(0), + supplycost(0), + nationkey(0) {} + + FinalData(bool v_valid, DBIdentifier v_partkey, unsigned int v_lineitem_idx, + DBDate v_orderdate, DBDecimal v_supplycost, + unsigned char v_nationkey) + : valid(v_valid), + partkey(v_partkey), + lineitemIdx(v_lineitem_idx), + orderdate(v_orderdate), + supplycost(v_supplycost), + nationkey(v_nationkey) {} + + DBIdentifier PrimaryKey() { return partkey; } + + void Join(const SupplierPartSupplierJoined& s_ps_row, + const LineItemOrdersMinimalJoined& li_o_row) { + valid = s_ps_row.suppkey == li_o_row.suppkey; + + partkey = s_ps_row.partkey; + lineitemIdx = li_o_row.lineitemIdx; + orderdate = li_o_row.orderdate; + supplycost = s_ps_row.supplycost; + nationkey = s_ps_row.nationkey; + } + + bool valid; + DBIdentifier partkey; + unsigned int lineitemIdx; + DBDate orderdate; + DBDecimal supplycost; + unsigned char nationkey; +}; + +// joining window sizes +constexpr int kRegexFilterElementsPerCycle = 1; +constexpr int kOrdersJoinWinSize = 1; +constexpr int kLineItemJoinWinSize = 2; +constexpr int kLineItemOrdersJoinWinSize = kLineItemJoinWinSize; +constexpr int kLineItemOrdersSortedWinSize = 1; +constexpr int kPartSupplierDuplicatePartkeys = 4; +constexpr int kFinalDataMaxSize = + kPartSupplierDuplicatePartkeys * kLineItemOrdersSortedWinSize; + +// pipe data +using LineItemMinimalRowPipeData = + StreamingData; + +using OrdersRowPipeData = + StreamingData; + +using LineItemOrdersMinimalJoinedPipeData = + StreamingData; + +using LineItemOrdersMinimalSortedPipeData = + StreamingData; + +using PartSupplierRowPipeData = + StreamingData; + +using SupplierPartSupplierJoinedPipeData = + StreamingData; + +using FinalPipeData = + StreamingData; + +// pipes +using LineItemPipe = + sycl::pipe; + +using OrdersPipe = + sycl::pipe; + +using LineItemOrdersPipe = + sycl::pipe; + +using LineItemOrdersSortedPipe = + sycl::pipe; + +using PartSupplierPartsPipe = + sycl::pipe; + +using PartSupplierPipe = + sycl::pipe; + +using FinalPipe = + sycl::pipe; + +#endif /* __PIPE_TYPES_H__ */ diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.cpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.cpp new file mode 100644 index 0000000000..d1cf24a9fe --- /dev/null +++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.cpp @@ -0,0 +1,644 @@ +#include +#include +#include +#include + +#include "query9_kernel.hpp" +#include "pipe_types.hpp" + +#include "onchip_memory_with_cache.hpp" // DirectProgramming/DPC++FPGA/include + +#include "../db_utils/Accumulator.hpp" +#include "../db_utils/LikeRegex.hpp" +#include "../db_utils/MapJoin.hpp" +#include "../db_utils/MergeJoin.hpp" +#include "../db_utils/Misc.hpp" +#include "../db_utils/ShannonIterator.hpp" +#include "../db_utils/Tuple.hpp" +#include "../db_utils/Unroller.hpp" +#include "../db_utils/fifo_sort.hpp" + +using namespace 
std::chrono; + +// +// NOTE: See the README file for a diagram of how the different kernels are +// connected +// + +// kernel class names +class ProducerOrders; +class FilterParts; +class ProducePartSupplier; +class JoinPartSupplierSupplier; +class JoinLineItemOrders; +class FeedSort; +class FifoSort; +class ConsumeSort; +class JoinEverything; +class Compute; + +///////////////////////////////////////////////////////////////////////////// +// sort configuration +using SortType = SortData; + +// need to sort at most 6% of the lineitem table +constexpr int kNumSortStages = CeilLog2(kLineItemTableSize * 0.06); +constexpr int kSortSize = Pow2(kNumSortStages); + +using SortInPipe = pipe; +using SortOutPipe = pipe; + +static_assert(kLineItemTableSize * 0.06 <= kSortSize, + "Must be able to sort all part keys"); +///////////////////////////////////////////////////////////////////////////// + +// +// Helper function to shuffle the valid values in 'input' into 'output' using +// the bits template +// For example, consider this simple case: +// input = {7,8} +// if bits = 1 (2'b01), then output = {0,7} +// if bits = 2 (2'b01), then output = {0,8} +// if bits = 3 (2'b01), then output = {7,8} +// +template +void Shuffle(NTuple& input, + NTuple& output) { + // get number of ones (number of valid entries) in the input + constexpr char kNumOnes = CountOnes(bits); + + // static asserts + static_assert(tuple_size > 0, + "tuple_size must strictly positive"); + static_assert(kNumOnes <= tuple_size, + "Number of valid bits in bits cannot exceed the size of the tuple"); + + // full crossbar to reorder valid entries of 'input' + UnrolledLoop<0, kNumOnes>([&](auto i) { + constexpr char pos = PositionOfNthOne(i + 1, bits) - 1; + output.template get() = input.template get(); + }); +} + +bool SubmitQuery9(queue& q, Database& dbinfo, std::string colour, + std::array& sum_profit, + double& kernel_latency, double& total_latency) { + // copy the regex string to character array, pad with NULL characters + std::array regex_word; + for (size_t i = 0; i < 11; i++) { + regex_word[i] = (i < colour.size()) ? 
colour[i] : '\0'; + } + + // create space for the input buffers + // the REGEX + buffer regex_word_buf(regex_word); + + // PARTS + buffer p_name_buf(dbinfo.p.name); + + // SUPPLIER + buffer s_nationkey_buf(dbinfo.s.nationkey); + + // PARTSUPPLIER + buffer ps_partkey_buf(dbinfo.ps.partkey); + buffer ps_suppkey_buf(dbinfo.ps.suppkey); + buffer ps_supplycost_buf(dbinfo.ps.supplycost); + + // ORDERS + buffer o_orderkey_buf(dbinfo.o.orderkey); + buffer o_orderdate_buf(dbinfo.o.orderdate); + + // LINEITEM + buffer l_orderkey_buf(dbinfo.l.orderkey); + buffer l_partkey_buf(dbinfo.l.partkey); + buffer l_suppkey_buf(dbinfo.l.suppkey); + buffer l_quantity_buf(dbinfo.l.quantity); + buffer l_extendedprice_buf(dbinfo.l.extendedprice); + buffer l_discount_buf(dbinfo.l.discount); + + // setup the output buffer (the profit for each nation and year) + buffer sum_profit_buf(sum_profit); + + // number of producing iterations depends on the number of elements per cycle + const size_t l_rows = dbinfo.l.rows; + const size_t l_iters = + (l_rows + kLineItemJoinWinSize - 1) / kLineItemJoinWinSize; + const size_t o_rows = dbinfo.o.rows; + const size_t o_iters = + (o_rows + kOrdersJoinWinSize - 1) / kOrdersJoinWinSize; + const size_t ps_rows = dbinfo.ps.rows; + const size_t ps_iters = + (ps_rows + kPartSupplierDuplicatePartkeys - 1) + / kPartSupplierDuplicatePartkeys; + const size_t p_rows = dbinfo.p.rows; + const size_t p_iters = + (p_rows + kRegexFilterElementsPerCycle - 1) + / kRegexFilterElementsPerCycle; + + // start timer + high_resolution_clock::time_point host_start = high_resolution_clock::now(); + + ///////////////////////////////////////////////////////////////////////////// + //// FilterParts Kernel: + //// Filter the PARTS table and produce the filtered LINEITEM table + auto filter_parts_event = q.submit([&](handler& h) { + // REGEX word accessor + accessor regex_word_accessor(regex_word_buf, h, read_only); + + // PARTS table accessors + accessor p_name_accessor(p_name_buf, h, read_only); + + // LINEITEM table accessors + accessor l_orderkey_accessor(l_orderkey_buf, h, read_only); + accessor l_partkey_accessor(l_partkey_buf, h, read_only); + accessor l_suppkey_accessor(l_suppkey_buf, h, read_only); + + // kernel to filter parts table based on REGEX + h.single_task([=]() [[intel::kernel_args_restrict]] { + // a map where the key is the partkey and the value is whether + // that partkeys name matches the given regex + bool partkeys_matching_regex[kPartTableSize + 1]; + + /////////////////////////////////////////////// + //// Stage 1 + // find valid parts with REGEX + LikeRegex<11, 55> regex[kRegexFilterElementsPerCycle]; + + // initialize regex word + for (size_t i = 0; i < 11; i++) { + const char c = regex_word_accessor[i]; + UnrolledLoop<0, kRegexFilterElementsPerCycle>([&](auto re) { + regex[re].word[i] = c; + }); + } + + // stream in rows of PARTS table and check partname against REGEX + [[intel::initiation_interval(1), intel::ivdep]] + for (size_t i = 0; i < p_iters; i++) { + UnrolledLoop<0, kRegexFilterElementsPerCycle>([&](auto re) { + const size_t idx = i * kRegexFilterElementsPerCycle + re; + const bool idx_range = idx < p_rows; + + // read in partkey + // valid partkeys in range [1,kPartTableSize] + const DBIdentifier partkey = idx_range ? 
idx + 1 : 0; + + // read in regex string + UnrolledLoop<0, 55>([&](auto k) { + regex[re].str[k] = p_name_accessor[idx * 55 + k]; + }); + + // run regex matching + regex[re].Match(); + + // mark valid partkey + if (idx_range) { + partkeys_matching_regex[partkey] = regex[re].Contains(); + } + }); + } + /////////////////////////////////////////////// + + /////////////////////////////////////////////// + //// Stage 2 + // read in the LINEITEM table (kLineItemJoinWinSize rows at a time) + // row is valid if its PARTKEY matched the REGEX + [[intel::initiation_interval(1)]] + for (size_t i = 0; i < l_iters + 1; i++) { + bool done = (i == l_iters); + bool valid = (i != l_iters); + + // bulk read of data from global memory + NTuple data; + + UnrolledLoop<0, kLineItemJoinWinSize>([&](auto j) { + size_t idx = i * kLineItemJoinWinSize + j; + bool in_range = idx < l_rows; + + DBIdentifier orderkey = l_orderkey_accessor[idx]; + DBIdentifier partkey = l_partkey_accessor[idx]; + DBIdentifier suppkey = l_suppkey_accessor[idx]; + + bool matches_partkey_name_regex = partkeys_matching_regex[partkey]; + bool data_is_valid = in_range && matches_partkey_name_regex; + + data.get() = LineItemMinimalRow(data_is_valid, idx, orderkey, + partkey, suppkey); + }); + + // write to pipe + LineItemPipe::write(LineItemMinimalRowPipeData(done, valid, data)); + } + /////////////////////////////////////////////// + }); + }); + /////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////// + //// ProducerOrders Kernel: produce the ORDERS table + auto producer_orders_event = q.submit([&](handler& h) { + // ORDERS table accessors + accessor o_orderkey_accessor(o_orderkey_buf, h, read_only); + accessor o_orderdate_accessor(o_orderdate_buf, h, read_only); + + // produce ORDERS table (kOrdersJoinWinSize rows at a time) + h.single_task([=]() [[intel::kernel_args_restrict]] { + [[intel::initiation_interval(1)]] + for (size_t i = 0; i < o_iters + 1; i++) { + bool done = (i == o_iters); + bool valid = (i != o_iters); + + // bulk read of data from global memory + NTuple data; + + UnrolledLoop<0, kOrdersJoinWinSize>([&](auto j) { + size_t idx = i * kOrdersJoinWinSize + j; + bool in_range = idx < l_rows; + + DBIdentifier orderkey_tmp = o_orderkey_accessor[idx]; + DBDate orderdate = o_orderdate_accessor[idx]; + + DBIdentifier orderkey = + in_range ? 
orderkey_tmp : std::numeric_limits::max(); + + data.get() = OrdersRow(in_range, orderkey, orderdate); + }); + + // write to pipe + OrdersPipe::write(OrdersRowPipeData(done, valid, data)); + } + }); + }); + /////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////// + //// JoinLineItemOrders Kernel: join the LINEITEM and ORDERS table + auto join_lineitem_orders_event = q.submit([&](handler& h) { + // kernel to join LINEITEM and ORDERS table + h.single_task([=]() [[intel::kernel_args_restrict]] { + // JOIN LINEITEM and ORDERS table + MergeJoin(); + + // join is done, tell downstream + LineItemOrdersPipe::write( + LineItemOrdersMinimalJoinedPipeData(true, false)); + }); + }); + /////////////////////////////////////////////////////////////////////////// + + /////////////////////////////////////////////////////////////////////////// + //// JoinPartSupplierSupplier Kernel: join the PARTSUPPLIER and SUPPLIER tables + auto join_partsupplier_supplier_event = q.submit([&](handler& h) { + // SUPPLIER table accessors + size_t s_rows = dbinfo.s.rows; + accessor s_nationkey_accessor(s_nationkey_buf, h, read_only); + + // kernel to join partsupplier and supplier tables + h.single_task( + [=]() [[intel::kernel_args_restrict]] { + // +1 is to account for fact that SUPPKEY is [1,kSF*10000] + unsigned char nation_key_map_data[kSupplierTableSize + 1]; + bool nation_key_map_valid[kSupplierTableSize + 1]; + for (int i = 0; i < kSupplierTableSize + 1; i++) { + nation_key_map_valid[i] = false; + } + + /////////////////////////////////////////////// + //// Stage 1 + // populate the array map + [[intel::initiation_interval(1)]] + for (size_t i = 0; i < s_rows; i++) { + // NOTE: based on TPCH docs, SUPPKEY is guaranteed + // to be unique in range [1:kSF*10000] + DBIdentifier s_suppkey = i + 1; + unsigned char s_nationkey = s_nationkey_accessor[i]; + + nation_key_map_data[s_suppkey] = s_nationkey; + nation_key_map_valid[s_suppkey] = true; + } + /////////////////////////////////////////////// + + /////////////////////////////////////////////// + //// Stage 2 + // MAPJOIN PARTSUPPLIER and SUPPLIER tables by suppkey + MapJoin(nation_key_map_data, + nation_key_map_valid); + + // tell downstream we are done + PartSupplierPartsPipe::write( + SupplierPartSupplierJoinedPipeData(true, false)); + /////////////////////////////////////////////// + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// ProducePartSupplier Kernel: produce the PARTSUPPLIER table + auto produce_part_supplier_event = q.submit([&](handler& h) { + // PARTSUPPLIER table accessors + accessor ps_partkey_accessor(ps_partkey_buf, h, read_only); + accessor ps_suppkey_accessor(ps_suppkey_buf, h, read_only); + accessor ps_supplycost_accessor(ps_supplycost_buf, h, read_only); + + // kernel to produce the PARTSUPPLIER table + h.single_task([=]() [[intel::kernel_args_restrict]] { + [[intel::initiation_interval(1)]] + for (size_t i = 0; i < ps_iters + 1; i++) { + bool done = (i == ps_iters); + bool valid = (i != ps_iters); + + // bulk read of data from global memory + NTuple data; + + UnrolledLoop<0, kPartSupplierDuplicatePartkeys>([&](auto j) { + size_t idx = i * kPartSupplierDuplicatePartkeys + j; + bool in_range = idx < ps_rows; + DBIdentifier partkey = ps_partkey_accessor[idx]; + DBIdentifier suppkey = ps_suppkey_accessor[idx]; + DBDecimal 
supplycost = ps_supplycost_accessor[idx]; + + data.get() = + PartSupplierRow(in_range, partkey, suppkey, supplycost); + }); + + // write to pipe + PartSupplierPipe::write(PartSupplierRowPipeData(done, valid, data)); + } + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// Compute Kernel: do the final computation on the data + auto computation_kernel_event = q.submit([&](handler& h) { + // LINEITEM table accessors + accessor l_quantity_accessor(l_quantity_buf, h, read_only); + accessor l_extendedprice_accessor(l_extendedprice_buf, h, read_only); + accessor l_discount_accessor(l_discount_buf, h, read_only); + + // output accessors + accessor sum_profit_accessor(sum_profit_buf, h, write_only, no_init); + + h.single_task([=]() [[intel::kernel_args_restrict]] { + // the accumulators + constexpr int kAccumCacheSize = 8; + NTuple> + sum_profit_local; + + // initialize the accumulators + UnrolledLoop<0, kFinalDataMaxSize>([&](auto j) { + sum_profit_local.template get().init(0); + }); + + bool done = false; + [[intel::initiation_interval(1)]] + do { + FinalPipeData pipe_data = FinalPipe::read(); + done = pipe_data.done; + + const bool pipeDataValid = !pipe_data.done && pipe_data.valid; + + UnrolledLoop<0, kFinalDataMaxSize>([&](auto j) { + FinalData D = pipe_data.data.get(); + + bool D_valid = pipeDataValid && D.valid; + unsigned int D_idx = D.lineitemIdx; + + // grab LINEITEM data from global memory and compute 'amount' + DBDecimal quantity=0, extendedprice=0, discount=0, supplycost=0; + if(D_valid) { + quantity = l_quantity_accessor[D_idx]; + extendedprice = l_extendedprice_accessor[D_idx]; + discount = l_discount_accessor[D_idx]; + supplycost = D.supplycost; + } + + // Why quantity x 100? So we can divide 'amount' by 100*100 later + DBDecimal amount = (extendedprice * (100 - discount)) - + (supplycost * quantity * 100); + + // compute index based on order year and nation + // See Date.hpp + unsigned int orderyear = (D.orderdate >> 9) & 0x07FFFFF; + unsigned int nation = D.nationkey; + unsigned char idx = (orderyear - 1992) * 25 + nation; + + unsigned char idx_final = D_valid ? idx : 0; + DBDecimal amount_final = D_valid ? 
amount : 0; + + auto current_amount = sum_profit_local.template get().read(idx_final); + auto computed_amount = current_amount + amount_final; + sum_profit_local.template get().write(idx_final, computed_amount); + }); + } while (!done); + + // push back the accumulated data to global memory + for (size_t n = 0; n < 25; n++) { + for (size_t y = 0; y < 7; y++) { + size_t in_idx = y * 25 + n; + size_t out_idx = (y + 1992) * 25 + n; + + DBDecimal amount = 0; + + UnrolledLoop<0, kFinalDataMaxSize>([&](auto j) { + amount += sum_profit_local.template get().read(in_idx); + }); + + sum_profit_accessor[out_idx] = amount; + } + } + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// FeedSort Kernel: kernel to filter out invalid data and feed the sorter + auto feed_sort_event = q.submit([&](handler& h) { + h.single_task([=]() [[intel::kernel_args_restrict]] { + bool done = false; + size_t num_rows = 0; + + [[intel::initiation_interval(1)]] + do { + // get data from upstream + bool valid; + LineItemOrdersMinimalJoinedPipeData pipe_data = + LineItemOrdersPipe::read(valid); + done = pipe_data.done && valid; + + if (!done && valid && pipe_data.valid) { + NTuple + shuffle_data; + unsigned char valid_count = 0; + char valid_bits = 0; + + // convert the 'valid' bits in the tuple to a bitset (valid_bits) + UnrolledLoop<0, kLineItemOrdersJoinWinSize>([&](auto i) { + constexpr char mask = 1 << i; + valid_bits |= pipe_data.data.get().valid ? mask : 0; + }); + + // full crossbar to do the shuffling from pipe_data to shuffle_data + UnrolledLoop<0, Pow2(kLineItemOrdersJoinWinSize)>([&](auto i) { + if (valid_bits == i) { + Shuffle(pipe_data.data, + shuffle_data); + valid_count = CountOnes(i); + } + }); + + // Send the data to sorter. + // The idea here is that this loop executes in the range + // [0,kLineItemOrdersJoinWinSize] times. + // However, we know that at most 6% of the data will match the filter + // and go to the sorter. So, that means for every ~16 pieces of + // data, we expect <1 will match the filter and go to the sorter. + // Therefore, so long as kLineItemOrdersJoinWinSize <= 16 + // this loop will, on average, execute ONCE per outer loop iteration + // (i.e. statistically, valid_count=1 for every 16 pieces of data). + // NOTE: for this loop to get good throughput it is VERY important to: + // A) Apply the [[intel::speculated_iterations(0)]] attribute + // B) Explicitly bound the loop iterations + // For an explanation why, see the optimize_inner_loops tutorial. 
+ [[intel::initiation_interval(1), intel::speculated_iterations(0)]] + for (char i = 0; i < valid_count && + i < kLineItemOrdersJoinWinSize; i++) { + UnrolledLoop<0, kLineItemOrdersJoinWinSize>([&](auto j) { + if (j == i) { + SortInPipe::write(SortData(shuffle_data.get())); + } + }); + } + + num_rows += valid_count; + } + } while (!done); + + // send in pad data to ensure we send in exactly kSortSize elements + ShannonIterator i(num_rows, kSortSize); + + while (i.InRange()) { + SortInPipe::write( + SortData(0, std::numeric_limits::max(), 0, 0)); + + i.Step(); + } + + // drain the input pipe + while (!done) { + bool valid; + LineItemOrdersMinimalJoinedPipeData pipe_data = + LineItemOrdersPipe::read(valid); + done = pipe_data.done && valid; + } + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// ConsumeSort Kernel: consume the output of the sorter + auto consume_sort_event = q.submit([&](handler& h) { + h.single_task([=]() [[intel::kernel_args_restrict]] { + bool done = false; + size_t num_rows = 0; + + // read out data from the sorter until 'done' signal from upstream + [[intel::initiation_interval(1)]] + do { + bool valid; + SortData in_data = SortOutPipe::read(valid); + done = (in_data.partkey == std::numeric_limits::max()) && + valid; + num_rows += valid ? 1 : 0; + + if (!done && valid) { + NTuple<1, LineItemOrdersMinimalJoined> out_data; + out_data.get<0>() = LineItemOrdersMinimalJoined( + true, in_data.lineitemIdx, in_data.partkey, in_data.suppkey, + in_data.orderdate); + + LineItemOrdersSortedPipe::write( + LineItemOrdersMinimalSortedPipeData(false, true, out_data)); + } + } while (!done); + + // tell downstream kernel that the sort is done + LineItemOrdersSortedPipe::write( + LineItemOrdersMinimalSortedPipeData(true, false)); + + // drain the data we don't care about from the sorter + ShannonIterator i(num_rows, kSortSize); + while (i.InRange()) { + bool valid; + (void)SortOutPipe::read(valid); + + if (valid) { + i.Step(); + } + } + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// FifoSort Kernel: the sorter + auto sort_event = q.submit([&](handler& h) { + h.single_task([=]() [[intel::kernel_args_restrict]] { + ihc::sort(ihc::LessThan()); + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + ///////////////////////////////////////////////////////////////////////////// + //// JoinEverything Kernel: join the sorted + //// LINEITEM+ORDERS with SUPPLIER+PARTSUPPLIER + auto join_li_o_s_ps_event = q.submit([&](handler& h) { + h.single_task([=]() [[intel::kernel_args_restrict]] { + DuplicateMergeJoin(); + + // join is done, tell downstream + FinalPipe::write(FinalPipeData(true, false)); + }); + }); + ///////////////////////////////////////////////////////////////////////////// + + // wait for kernel to finish + filter_parts_event.wait(); + computation_kernel_event.wait(); + join_li_o_s_ps_event.wait(); + sort_event.wait(); + consume_sort_event.wait(); + feed_sort_event.wait(); + produce_part_supplier_event.wait(); + join_partsupplier_supplier_event.wait(); + join_lineitem_orders_event.wait(); + producer_orders_event.wait(); + + high_resolution_clock::time_point host_end = high_resolution_clock::now(); + duration diff = host_end - host_start; + + // gather profiling info + auto filter_parts_start 
=
+      filter_parts_event
+          .get_profiling_info<info::event_profiling::command_start>();
+  auto computation_end =
+      computation_kernel_event
+          .get_profiling_info<info::event_profiling::command_end>();
+
+  // calculating the kernel execution time in ms
+  auto kernel_execution_time = (computation_end - filter_parts_start) * 1e-6;
+
+  kernel_latency = kernel_execution_time;
+  total_latency = diff.count();
+
+  return true;
+}
diff --git a/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.hpp b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.hpp
new file mode 100644
index 0000000000..1103ff301f
--- /dev/null
+++ b/DirectProgramming/C++SYCL_FPGA/ReferenceDesigns/db/src/query9/query9_kernel.hpp
@@ -0,0 +1,17 @@
+#ifndef __QUERY9_KERNEL_HPP__
+#define __QUERY9_KERNEL_HPP__
+#pragma once
+
+#include <sycl/sycl.hpp>
+#include <sycl/ext/intel/fpga_extensions.hpp>
+
+#include "../dbdata.hpp"
+
+using namespace sycl;
+
+bool SubmitQuery9(queue& q, Database& dbinfo,
+                  std::string colour,
+                  std::array<DBDecimal, 25 * 2020>& sum_profit,
+                  double& kernel_latency, double& total_latency);
+
+#endif //__QUERY9_KERNEL_HPP__
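
Query 9's `Compute` kernel folds each joined row into a flat `(order year, nation)` accumulator using `index = orderyear * 25 + nationkey`, and `ValidateQ9`/`PrintQ9` only convert to a floating-point value at the end by dividing the accumulated sum by `100 * 100`. The sketch below is a host-only illustration of that indexing and fixed-point convention, not code from the patch; the `DBDecimal` alias, the `25 * 2020` array size, and the sample row values are assumptions made for the example.

```c++
// Host-only sketch of Query 9's accumulation layout and fixed-point scaling.
// Assumptions for illustration: DBDecimal is a 64-bit integer and the result
// array is sized 25 * 2020 so that every index up to 1998 * 25 + 24 fits.
#include <array>
#include <cstdint>
#include <iomanip>
#include <iostream>

using DBDecimal = std::int64_t;

int main() {
  // Flat (year, nation) accumulator: index = orderyear * 25 + nationkey.
  static std::array<DBDecimal, 25 * 2020> sum_profit{};

  // One hypothetical joined LINEITEM/ORDERS/PARTSUPPLIER row (values made up).
  const unsigned nationkey = 7;
  const unsigned orderyear = 1995;
  const DBDecimal extendedprice = 123456;  // price scaled by 100
  const DBDecimal discount = 5;            // 0.05 scaled by 100
  const DBDecimal quantity = 3;
  const DBDecimal supplycost = 2500;       // cost scaled by 100

  // Same formula as the Compute kernel: quantity is multiplied by 100 so both
  // terms carry two extra decimal digits, letting the host divide the final
  // sum by 100 * 100 exactly once.
  const DBDecimal amount =
      (extendedprice * (100 - discount)) - (supplycost * quantity * 100);

  sum_profit[orderyear * 25 + nationkey] += amount;

  // Host-side readback, mirroring what ValidateQ9/PrintQ9 do with the result.
  const double profit =
      static_cast<double>(sum_profit[orderyear * 25 + nationkey]) /
      (100.0 * 100.0);

  std::cout << std::fixed << std::setprecision(2) << "nation=" << nationkey
            << " o_year=" << orderyear << " sum_profit=" << profit << "\n";
  return 0;
}
```

Keeping the per-row accumulation in scaled integers on the device and deferring the single division by `100 * 100` to the host means the kernel never performs floating-point arithmetic on the profit values.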