
[FEATURE-REQUEST] Joining on more than one columns. #1948

Closed
ashsharma96 opened this issue Feb 24, 2022 · 5 comments
Labels
needed: more information There is not enough information to reproduce the issue, or understand the problem

@ashsharma96

Hey Vaex Team,
Any idea how we can join on more than one column in vaex? I've tried the merging-column hack, but it gives an error in my case. So how can we join on more than one column?

@ashsharma96 ashsharma96 changed the title [FEATURE-REQUEST] Joining on more than onee columns. [FEATURE-REQUEST] Joining on more than one columns. Feb 24, 2022
@JovanVeljanoski (Member)

Officially, this is not supported.
Unofficially: create a single key column to join on, by concatenating the multiple columns into one.
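A minimal sketch of this single-key workaround, in plain Python for illustration (vaex-agnostic). One pitfall worth noting: when concatenating columns into one key, use a separator that cannot appear in the data, otherwise distinct key pairs can collide. The `composite_key` helper below is hypothetical, not part of vaex.

```python
def composite_key(*parts, sep="\x1f"):
    """Build a collision-safe composite key by joining the column
    values with a separator that does not occur in the data."""
    return sep.join(str(p) for p in parts)

# Without a separator, two distinct pairs can collide:
assert str(12) + str(3) == str(1) + str(23)        # both become "123"

# With a separator they stay distinct:
assert composite_key(12, 3) != composite_key(1, 23)
```

In vaex terms this would presumably look like `df['key'] = df.a.astype(str) + '_' + df.b.astype(str)` (with a separator character known to be absent from both columns) on each dataframe before the join.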

@ashsharma96
Author

Hey @JovanVeljanoski
I tried that, but it's giving abnormal behavior. I'm using this code:

df['merging_column'] = df.tm_mid.astype(str) + df.tm_sku.astype(str)
product['merging_column'] = product.tm_mid.astype(str) + product.tm_sku.astype(str)
mergedStuff = df.join(product, on='merging_column', how='left', rsuffix='_').drop(
    ['merging_column', 'tm_mid_', 'tm_sku_', 'tm_sid_', 'merging_column_'])

and it's throwing this error:

[02/24/22 16:49:59] ERROR    error evaluating: brand at rows 14679100-14679105  [dataframe.py:4001]
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 3993, in table_part
    values = dict(zip(column_names, df.evaluate(column_names)))
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 2998, in evaluate
    return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 6353, in _evaluate_implementation
    df.map_reduce(assign, lambda *_: None, expression_to_evaluate, progress=progress, ignore_filter=False, selection=selection, pre_filter=use_filter, info=True, to_numpy=False, name="evaluate")
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 429, in map_reduce
    return self._delay(delay, task)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 1688, in _delay
    self.execute()
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 412, in execute
    self.executor.execute()
  File "/opt/conda/lib/python3.7/site-packages/vaex/execution.py", line 308, in execute
    for _ in self.execute_generator():
  File "/opt/conda/lib/python3.7/site-packages/vaex/execution.py", line 435, in execute_generator
    cancel=lambda: self._cancel(run), unpack=True, run=run, use_async=use_async)
  File "/opt/conda/lib/python3.7/site-packages/vaex/multithreading.py", line 106, in map
    iterator = super(ThreadPoolIndex, self).map(wrapped, cancellable_iter())
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 575, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 575, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/opt/conda/lib/python3.7/site-packages/vaex/multithreading.py", line 92, in cancellable_iter
    for value in chunk_iterator:
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1167, in chunk_iterator
    yield from self.original.chunk_iterator(columns, chunk_size=chunk_size, reverse=reverse)
  File "/opt/conda/lib/python3.7/site-packages/vaex/join.py", line 64, in chunk_iterator
    yield from self.original.chunk_iterator(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1237, in chunk_iterator
    self.right.chunk_iterator(columns_right, chunk_size, reverse=reverse)):
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1114, in chunk_iterator
    yield from self._default_chunk_iterator(self._columns, columns, chunk_size, reverse=reverse)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 508, in _default_chunk_iterator
    yield i1, i2, reader()
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 499, in reader
    chunks = {k: array_map[k][i1:i2] for k in columns}
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 499, in <dictcomp>
    chunks = {k: array_map[k][i1:i2] for k in columns}
  File "/opt/conda/lib/python3.7/site-packages/vaex/column.py", line 366, in __getitem__
    ar_unfiltered = ar_unfiltered[i1:i2+1]
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 581, in __getitem__
    ds = self.ds.__getitem__(item)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 448, in __getitem__
    return self.slice(item.start or 0, item.stop or self.row_count)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1015, in slice
    assert filter.sum() == expected_length
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 3998, in table_part
    values[name] = df.evaluate(name)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 2998, in evaluate
    return self._evaluate_implementation(expression, i1=i1, i2=i2, out=out, selection=selection, filtered=filtered, array_type=array_type, parallel=parallel, chunk_size=chunk_size, progress=progress)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 6353, in _evaluate_implementation
    df.map_reduce(assign, lambda *_: None, expression_to_evaluate, progress=progress, ignore_filter=False, selection=selection, pre_filter=use_filter, info=True, to_numpy=False, name="evaluate")
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 429, in map_reduce
    return self._delay(delay, task)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 1688, in _delay
    self.execute()
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataframe.py", line 412, in execute
    self.executor.execute()
  File "/opt/conda/lib/python3.7/site-packages/vaex/execution.py", line 308, in execute
    for _ in self.execute_generator():
  File "/opt/conda/lib/python3.7/site-packages/vaex/execution.py", line 435, in execute_generator
    cancel=lambda: self._cancel(run), unpack=True, run=run, use_async=use_async)
  File "/opt/conda/lib/python3.7/site-packages/vaex/multithreading.py", line 106, in map
    iterator = super(ThreadPoolIndex, self).map(wrapped, cancellable_iter())
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 575, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 575, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/opt/conda/lib/python3.7/site-packages/vaex/multithreading.py", line 92, in cancellable_iter
    for value in chunk_iterator:
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1167, in chunk_iterator
    yield from self.original.chunk_iterator(columns, chunk_size=chunk_size, reverse=reverse)
  File "/opt/conda/lib/python3.7/site-packages/vaex/join.py", line 64, in chunk_iterator
    yield from self.original.chunk_iterator(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1237, in chunk_iterator
    self.right.chunk_iterator(columns_right, chunk_size, reverse=reverse)):
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1114, in chunk_iterator
    yield from self._default_chunk_iterator(self._columns, columns, chunk_size, reverse=reverse)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 508, in _default_chunk_iterator
    yield i1, i2, reader()
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 499, in reader
    chunks = {k: array_map[k][i1:i2] for k in columns}
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 499, in <dictcomp>
    chunks = {k: array_map[k][i1:i2] for k in columns}
  File "/opt/conda/lib/python3.7/site-packages/vaex/column.py", line 366, in __getitem__
    ar_unfiltered = ar_unfiltered[i1:i2+1]
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 581, in __getitem__
    ds = self.ds.__getitem__(item)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 448, in __getitem__
    return self.slice(item.start or 0, item.stop or self.row_count)
  File "/opt/conda/lib/python3.7/site-packages/vaex/dataset.py", line 1015, in slice
    assert filter.sum() == expected_length
AssertionError

I'm attaching only part of the error; it showed errors for all the columns. The abnormal behavior is that sometimes this code runs perfectly, but most of the time it throws these errors. Also, the first run throws the error, but if I re-run the same code immediately afterwards it works fine.
Any idea about this?

@JovanVeljanoski (Member)

Without a reproducible example, I really can't offer much advice. I would refer you to this response; it is perfectly applicable here as well: #1729 (comment)

Some things to consider:

  • do you have duplicate values in the join key (left or right)?
  • do you have missing/na/nan values in the join key (left or right)?
  • check whether left, right, and inner joins work (to help with debugging).
  • if you can, try to materialize (df.materialize) the key column to improve performance and iterate faster.
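The first two checks above can be sketched as a quick diagnostic on the key column. This is plain Python (vaex-agnostic); `diagnose_join_key` is a hypothetical helper, and `keys` stands in for the evaluated key column, with `None` representing a missing value:

```python
from collections import Counter

def diagnose_join_key(keys):
    """Report unique, duplicated, and missing entries in a join-key column."""
    counts = Counter(k for k in keys if k is not None)
    duplicates = {k: n for k, n in counts.items() if n > 1}
    missing = sum(1 for k in keys if k is None)
    return {"n_unique": len(counts), "duplicates": duplicates, "missing": missing}

report = diagnose_join_key(["a1", "b2", "a1", None])
# "a1" appears twice (a duplicate), and one entry is missing
```

In vaex one would presumably run the same kind of counts on the key column itself (e.g. via a groupby or value counts) rather than pulling it into Python.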

@JovanVeljanoski JovanVeljanoski added the needed: more information There is not enough information to reproduce the issue, or understand the problem label Feb 24, 2022
@ashsharma96
Author

ashsharma96 commented Feb 28, 2022

@JovanVeljanoski Thanks for the immediate reply. Replying to what you asked:

  1. I may have duplicate values on the left, but not on the right.
  2. No, I don't have any na/nan values in the join key on either the left or the right.
  3. All types of joins show errors.
  4. materialize didn't help me; the same error comes up.

@root-11
Contributor

root-11 commented Mar 1, 2022

Overview:


The LEFT dataset is 25M rows, the RIGHT is 10.5M.

Worst case would be a nested for loop over 25M × 10.5M ≈ 262.5 trillion row pairs. This would never fit into RAM.

The typical workload is a LEFT join with 3-5% duplication and a 20%-100% match rate on RIGHT.

In an attempt to find a method that works for the worst case and is fast enough for the common case, I've settled on:

1/5. Create the LEFT COMPOSITE KEY INDEX using pool.apply(index task, LEFT dataframe), stored as HDF5 in chunks of 500_000 rows (50 tasks).

2/5. Create the RIGHT COMPOSITE KEY INDEX using pool.apply(index task, RIGHT dataframe, limited to entries in the LEFT COMPOSITE KEY INDEX), stored as HDF5 in chunks of 500_000 rows (21 tasks).

3/5. Create a task queue with the 153 join tasks.

4/5. Do the join in chunks (of 500_000 + 500_000 rows) on each processor and store each result as HDF5 in the working directory. As the left and right composite key indexes are stored on disk, all processes can access them in read-only mode; there is no inter-process communication. The tasks can probably be vectorised. The coordination mechanism is the task queue, which only declares the search-index limits.

What about join skewness? Suppose there are no matches in the RIGHT where index % 3 == 0. Whenever a processor picks up a join task, it reads the left composite key index, finds no matches in the right composite key index, and the output chunk is empty. Since IO will probably be the bottleneck, the processor can quickly move on to the next task; the task queue is thus consumed at a rate set by the frequency of matches, and the processor load balances itself.

5/5. Create a virtual dataset (HDF5) from all the chunks. This should take milliseconds.

Preview shortcut for vaex: compute the first 5 hits for grid (0,0) and the first five hits for grid (-1,-1) in a single CPU task.
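The chunked join in steps 2/5 through 5/5 can be sketched in-memory, single-process (no HDF5, no task queue); this is an illustrative hash-join sketch, not the actual implementation, and all names in it are hypothetical:

```python
from itertools import islice

def chunks(rows, size):
    """Yield successive fixed-size blocks from an iterable of rows."""
    it = iter(rows)
    while True:
        block = list(islice(it, size))
        if not block:
            return
        yield block

def chunked_left_join(left, right, chunk_size=2):
    # Step 2-analogue: index the RIGHT side by its composite key,
    # chunk by chunk (RIGHT is assumed unique per key, as in the thread).
    right_index = {}
    for block in chunks(right, chunk_size):
        for key, payload in block:
            right_index.setdefault(key, payload)
    # Step 4-analogue: join LEFT chunks against the index; each chunk's
    # result could be written out (e.g. as HDF5) instead of kept in memory.
    results = []
    for block in chunks(left, chunk_size):
        results.append([(key, val, right_index.get(key)) for key, val in block])
    # Step 5-analogue: concatenate the per-chunk results into one dataset.
    return [row for block in results for row in block]

left = [("k1", 10), ("k2", 20), ("k1", 11), ("k3", 30)]
right = [("k1", "A"), ("k3", "C")]
joined = chunked_left_join(left, right)
# unmatched LEFT rows keep None on the right, as in a LEFT join
```

An empty chunk result (the skew case above) simply contributes nothing to the final concatenation, which is why the task-queue scheme self-balances.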

composite key reference implementations:
