diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 04818055..97d52432 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -63,8 +63,38 @@ jobs: fail_ci_if_error: false token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos + core-test-win: + needs: prep-testbed + runs-on: windows-latest + strategy: + fail-fast: false + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version}} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version}} + - name: Prepare environment + shell: pwsh + run: | + python -m pip install --upgrade pip + python -m pip install wheel + pip install --pre docarray + pip install -e ".[test]" + - name: Test + id: test + shell: pwsh + run: | + $env:PYTHONIOENCODING='utf-8' + cd tests/ + pytest -v -s -m "not gpu" -k "test" + echo "::set-output name=codecov_flag::annlite" + timeout-minutes: 30 + prerelease: - needs: core-test + needs: [core-test, core-test-win] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01aa4279..0c58b030 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -134,9 +134,39 @@ jobs: fail_ci_if_error: false token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos + core-test-win: + needs: prep-testbed + runs-on: windows-latest + strategy: + fail-fast: false + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version}} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version}} + - name: Prepare environment + shell: pwsh + run: | + python -m pip install --upgrade pip + python -m pip install wheel + pip install --pre docarray + pip install -e ".[test]" + - name: Test + id: test + shell: pwsh + run: | + $env:PYTHONIOENCODING='utf-8' + cd tests/ + pytest -v -s -m "not gpu" -k "test" + echo 
"::set-output name=codecov_flag::annlite" + timeout-minutes: 30 + # just for blocking the merge until all parallel core-test are successful success-all-test: - needs: core-test + needs: [core-test, core-test-win] if: always() runs-on: ubuntu-latest steps: diff --git a/annlite/index.py b/annlite/index.py index c6627bb5..d7c6a37b 100644 --- a/annlite/index.py +++ b/annlite/index.py @@ -1,6 +1,7 @@ import hashlib import logging import warnings +import platform from pathlib import Path from typing import TYPE_CHECKING, Dict, List, Optional, Union @@ -598,6 +599,8 @@ def index_hash(self): latest_commit = self.meta_table.get_latest_commit() date_time = latest_commit[-1] if latest_commit else None if date_time: + if platform.system() == 'Windows': + return date_time.isoformat('#', 'hours') return date_time.isoformat('#', 'seconds') return None diff --git a/annlite/storage/table.py b/annlite/storage/table.py index 846514bf..562d78c8 100644 --- a/annlite/storage/table.py +++ b/annlite/storage/table.py @@ -414,7 +414,7 @@ def create_table(self): def iter_addresses( self, time_since: 'datetime.datetime' = datetime.datetime(2020, 2, 2, 0, 0) ): - sql = f'SELECT _doc_id, cell_id, offset from {self.name} WHERE time_at > ? AND _deleted = 0 ORDER BY time_at ASC;' + sql = f'SELECT _doc_id, cell_id, offset from {self.name} WHERE time_at >= ? AND _deleted = 0 ORDER BY time_at ASC;' cursor = self._conn.cursor() for doc_id, cell_id, offset in cursor.execute(sql, (time_since,)): diff --git a/include/hnswlib/fusefilter.h b/include/hnswlib/fusefilter.h index a1fbd9e3..47e0ca31 100644 --- a/include/hnswlib/fusefilter.h +++ b/include/hnswlib/fusefilter.h @@ -12,6 +12,9 @@ 100 // probabillity of success should always be > 0.5 so 100 iterations is // highly unlikely #endif +#ifdef _MSC_VER +#include +#endif /** * We start with a few utilities. 
@@ -62,9 +65,9 @@ typedef struct binary_fuse8_s { #ifdef _MSC_VER // Windows programmers who target 32-bit platform may need help: -uint64_t binary_fuse_mulhi(uint64_t a, uint64_t b) { return __umulh(a, b); } +static inline uint64_t binary_fuse_mulhi(uint64_t a, uint64_t b) { return __umulh(a, b); } #else -uint64_t binary_fuse_mulhi(uint64_t a, uint64_t b) { +static inline uint64_t binary_fuse_mulhi(uint64_t a, uint64_t b) { return ((__uint128_t)a * b) >> 64; } #endif @@ -75,8 +78,8 @@ typedef struct binary_hashes_s { uint32_t h2; } binary_hashes_t; -static inline binary_hashes_t -binary_fuse8_hash_batch(uint64_t hash, const binary_fuse8_t *filter) { +static inline binary_hashes_t binary_fuse8_hash_batch(uint64_t hash, + const binary_fuse8_t *filter) { uint64_t hi = binary_fuse_mulhi(hash, filter->SegmentCountLength); binary_hashes_t ans; ans.h0 = (uint32_t)hi; @@ -113,8 +116,7 @@ static inline uint32_t binary_fuse_calculate_segment_length(uint32_t arity, // These parameters are very sensitive. Replacing 'floor' by 'round' can // substantially affect the construction time. 
if (arity == 3) { - return ((uint32_t)1) << (int)(floor(log((double)(size)) / log(3.33) + - 2.25)); + return ((uint32_t)1) << (int)(floor(log((double)(size)) / log(3.33) + 2.25)); } else if (arity == 4) { return ((uint32_t)1) << (int)(floor(log((double)(size)) / log(2.91) - 0.5)); } else { @@ -122,7 +124,7 @@ static inline uint32_t binary_fuse_calculate_segment_length(uint32_t arity, } } -double binary_fuse8_max(double a, double b) { +static inline double binary_fuse8_max(double a, double b) { if (a < b) { return b; } @@ -132,11 +134,9 @@ double binary_fuse8_max(double a, double b) { static inline double binary_fuse_calculate_size_factor(uint32_t arity, uint32_t size) { if (arity == 3) { - return binary_fuse8_max(1.125, - 0.875 + 0.25 * log(1000000.0) / log((double)size)); + return binary_fuse8_max(1.125, 0.875 + 0.25 * log(1000000.0) / log((double)size)); } else if (arity == 4) { - return binary_fuse8_max(1.075, - 0.77 + 0.305 * log(600000.0) / log((double)size)); + return binary_fuse8_max(1.075, 0.77 + 0.305 * log(600000.0) / log((double)size)); } else { return 2.0; } @@ -148,15 +148,13 @@ static inline double binary_fuse_calculate_size_factor(uint32_t arity, static inline bool binary_fuse8_allocate(uint32_t size, binary_fuse8_t *filter) { uint32_t arity = 3; - filter->SegmentLength = - size == 0 ? 4 : binary_fuse_calculate_segment_length(arity, size); + filter->SegmentLength = size == 0 ? 4 : binary_fuse_calculate_segment_length(arity, size); if (filter->SegmentLength > 262144) { filter->SegmentLength = 262144; } filter->SegmentLengthMask = filter->SegmentLength - 1; double sizeFactor = binary_fuse_calculate_size_factor(arity, size); - uint32_t capacity = - size <= 1 ? 0 : (uint32_t)(round((double)size * sizeFactor)); + uint32_t capacity = size <= 1 ? 
0 : (uint32_t)(round((double)size * sizeFactor)); uint32_t initSegmentCount = (capacity + filter->SegmentLength - 1) / filter->SegmentLength - (arity - 1); @@ -192,17 +190,22 @@ static inline void binary_fuse8_free(binary_fuse8_t *filter) { filter->ArrayLength = 0; } -static inline uint8_t binary_fuse_mod3(uint8_t x) { return x > 2 ? x - 3 : x; } +static inline uint8_t binary_fuse_mod3(uint8_t x) { + return x > 2 ? x - 3 : x; +} // construct the filter, returns true on success, false on failure. // most likely, a failure is due to too high a memory usage // size is the number of keys // The caller is responsable for calling binary_fuse8_allocate(size,filter) -// before. The caller is responsible to ensure that there are not too many -// duplicated keys. The inner loop will run up to XOR_MAX_ITERATIONS times -// (default on 100), it should never fail, except if there are many duplicated -// keys. If it fails, a return value of false is provided. +// before. The caller is responsible to ensure that there are not too many duplicated +// keys. The inner loop will run up to XOR_MAX_ITERATIONS times (default on +// 100), it should never fail, except if there are many duplicated keys. If it fails, +// a return value of false is provided. +// // +// If there are many duplicated keys and you do not want to remove them, you can first +// sort your input, the algorithm will then work adequately. bool binary_fuse8_populate(const uint64_t *keys, uint32_t size, binary_fuse8_t *filter) { uint64_t rng_counter = 0x726b2b9d438b9d4d; @@ -297,6 +300,10 @@ bool binary_fuse8_populate(const uint64_t *keys, uint32_t size, error = (t2count[h2] < 4) ? 
1 : error; } if (error) { + memset(reverseOrder, 0, sizeof(uint64_t)*size); + memset(t2count, 0, sizeof(uint8_t)*capacity); + memset(t2hash, 0, sizeof(uint64_t)*capacity); + filter->Seed = binary_fuse_rng_splitmix64(&rng_counter); continue; } @@ -344,9 +351,9 @@ bool binary_fuse8_populate(const uint64_t *keys, uint32_t size, size = stacksize; break; } - memset(reverseOrder, 0, sizeof(uint64_t[size])); - memset(t2count, 0, sizeof(uint8_t[capacity])); - memset(t2hash, 0, sizeof(uint64_t[capacity])); + memset(reverseOrder, 0, sizeof(uint64_t)*size); + memset(t2count, 0, sizeof(uint8_t)*capacity); + memset(t2hash, 0, sizeof(uint64_t)*capacity); filter->Seed = binary_fuse_rng_splitmix64(&rng_counter); } @@ -431,8 +438,8 @@ static inline uint64_t binary_fuse16_fingerprint(uint64_t hash) { return hash ^ (hash >> 32); } -static inline binary_hashes_t -binary_fuse16_hash_batch(uint64_t hash, const binary_fuse16_t *filter) { +static inline binary_hashes_t binary_fuse16_hash_batch(uint64_t hash, + const binary_fuse16_t *filter) { uint64_t hi = binary_fuse_mulhi(hash, filter->SegmentCountLength); binary_hashes_t ans; ans.h0 = (uint32_t)hi; @@ -464,22 +471,67 @@ static inline bool binary_fuse16_contain(uint64_t key, return f == 0; } +// allocate enough capacity for a set containing up to 'size' elements +// caller is responsible to call binary_fuse16_free(filter) +// size should be at least 2. +static inline bool binary_fuse16_allocate(uint32_t size, + binary_fuse16_t *filter) { + uint32_t arity = 3; + filter->SegmentLength = size == 0 ? 4 : binary_fuse_calculate_segment_length(arity, size); + if (filter->SegmentLength > 262144) { + filter->SegmentLength = 262144; + } + filter->SegmentLengthMask = filter->SegmentLength - 1; + double sizeFactor = size <= 1 ? 
0 : binary_fuse_calculate_size_factor(arity, size); + uint32_t capacity = (uint32_t)(round((double)size * sizeFactor)); + uint32_t initSegmentCount = + (capacity + filter->SegmentLength - 1) / filter->SegmentLength - + (arity - 1); + filter->ArrayLength = (initSegmentCount + arity - 1) * filter->SegmentLength; + filter->SegmentCount = + (filter->ArrayLength + filter->SegmentLength - 1) / filter->SegmentLength; + if (filter->SegmentCount <= arity - 1) { + filter->SegmentCount = 1; + } else { + filter->SegmentCount = filter->SegmentCount - (arity - 1); + } + filter->ArrayLength = + (filter->SegmentCount + arity - 1) * filter->SegmentLength; + filter->SegmentCountLength = filter->SegmentCount * filter->SegmentLength; + filter->Fingerprints = (uint16_t*)malloc(filter->ArrayLength * sizeof(uint16_t)); + return filter->Fingerprints != NULL; +} + // report memory usage static inline size_t binary_fuse16_size_in_bytes(const binary_fuse16_t *filter) { return filter->ArrayLength * sizeof(uint16_t) + sizeof(binary_fuse16_t); } +// release memory +static inline void binary_fuse16_free(binary_fuse16_t *filter) { + free(filter->Fingerprints); + filter->Fingerprints = NULL; + filter->Seed = 0; + filter->SegmentLength = 0; + filter->SegmentLengthMask = 0; + filter->SegmentCount = 0; + filter->SegmentCountLength = 0; + filter->ArrayLength = 0; +} + // construct the filter, returns true on success, false on failure. // most likely, a failure is due to too high a memory usage // size is the number of keys // The caller is responsable for calling binary_fuse8_allocate(size,filter) -// before. The caller is responsible to ensure that there are not too many -// duplicated keys. The inner loop will run up to XOR_MAX_ITERATIONS times -// (default on 100), it should never fail, except if there are many duplicated -// keys. If it fails, a return value of false is provided. +// before. The caller is responsible to ensure that there are not too many duplicated +// keys. 
The inner loop will run up to XOR_MAX_ITERATIONS times (default on +// 100), it should never fail, except if there are many duplicated keys. If it fails, +// a return value of false is provided. // -bool binary_fuse16_populate(const uint64_t *keys, uint32_t size, +// If there are many duplicated keys and you do not want to remove them, you can first +// sort your input, the algorithm will then work adequately. +inline bool binary_fuse16_populate(const uint64_t *keys, uint32_t size, binary_fuse16_t *filter) { uint64_t rng_counter = 0x726b2b9d438b9d4d; filter->Seed = binary_fuse_rng_splitmix64(&rng_counter); @@ -573,6 +625,10 @@ bool binary_fuse16_populate(const uint64_t *keys, uint32_t size, error = (t2count[h2] < 4) ? 1 : error; } if (error) { + memset(reverseOrder, 0, sizeof(uint64_t)*size); + memset(t2count, 0, sizeof(uint8_t)*capacity); + memset(t2hash, 0, sizeof(uint64_t)*capacity); + filter->Seed = binary_fuse_rng_splitmix64(&rng_counter); continue; } @@ -620,9 +676,9 @@ bool binary_fuse16_populate(const uint64_t *keys, uint32_t size, size = stacksize; break; } - memset(reverseOrder, 0, sizeof(uint64_t[size])); - memset(t2count, 0, sizeof(uint8_t[capacity])); - memset(t2hash, 0, sizeof(uint64_t[capacity])); + memset(reverseOrder, 0, sizeof(uint64_t)*size); + memset(t2count, 0, sizeof(uint8_t)*capacity); + memset(t2hash, 0, sizeof(uint64_t)*capacity); filter->Seed = binary_fuse_rng_splitmix64(&rng_counter); } @@ -649,4 +705,4 @@ bool binary_fuse16_populate(const uint64_t *keys, uint32_t size, return true; } -#endif +#endif \ No newline at end of file diff --git a/tests/test_table.py b/tests/test_table.py index 52d82ed8..6a750507 100644 --- a/tests/test_table.py +++ b/tests/test_table.py @@ -128,7 +128,7 @@ def test_create_meta_table(tmpdir): addr = table.get_latest_commit() assert addr[:3] == ('0', 1, 2) - assert addr[-1] > time_since + assert addr[-1] >= time_since time_since = datetime.datetime.utcnow() table.delete_address('0') @@ -136,4 +136,4 @@ def 
test_create_meta_table(tmpdir): assert addresses == [] addr = table.get_latest_commit() - assert addr[-1] > time_since + assert addr[-1] >= time_since