In [1]:
import modin

In [2]:
modin.__version__

'0.15.2'

In [3]:
import modin.pandas as pd
import pandas
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions
import haystack
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.nodes import BM25Retriever
import tqdm
import ray
from modin.experimental.batch import PandasQueryPipeline
import numpy as np
import glob
import concurrent.futures
import multiprocessing
import requests

In [4]:
#ray.init(log_to_driver=False)

In [5]:
!ls ../data

Posts.xml     df_raw	      stackoverflow.com-Posts.7z
df_processed  df_raw.parquet  start.sh


In [6]:
ELASTIC_HOST="np-database.c.np-training.internal"
ELASTIC_HOST="localhost"
ELASTIC_INDEX="stackoverflow"
ELASTIC_PORT=9200

In [8]:
requests.put(f"http://{ELASTIC_HOST}:{ELASTIC_PORT}/_template/index_defaults", json = 
    {
      "index_patterns": "*", 
      "settings": {
        "number_of_shards": 20
      }
    } 
).json()





{'acknowledged': True}

In [18]:
requests.delete(f"http://{ELASTIC_HOST}:{ELASTIC_PORT}/{ELASTIC_INDEX}").json()

{'acknowledged': True}

In [10]:
# requests.put(f"http://{ELASTIC_HOST}:{ELASTIC_PORT}/{ELASTIC_INDEX}", json = 
#     {
#         "settings": {
#             "number_of_shards": 25,
#             "number_of_replicas": 2
#       }
#     } 
#     ) .json()





In [19]:
requests.get(f"http://{ELASTIC_HOST}:{ELASTIC_PORT}/_all/_settings").json()

{'label': {'settings': {'index': {'routing': {'allocation': {'include': {'_tier_preference': 'data_content'}}},
    'number_of_shards': '1',
    'provided_name': 'label',
    'creation_date': '1659433835030',
    'number_of_replicas': '1',
    'uuid': '6cnzMQhoSbuJys0jMoyqsQ',
    'version': {'created': '8030399'}}}}}

In [12]:
files = glob.glob("../data/df_processed/*.parquet")
files

['../data/df_processed/part-0004.snappy.parquet',
 '../data/df_processed/part-0011.snappy.parquet',
 '../data/df_processed/part-0015.snappy.parquet',
 '../data/df_processed/part-0021.snappy.parquet',
 '../data/df_processed/part-0024.snappy.parquet',
 '../data/df_processed/part-0001.snappy.parquet',
 '../data/df_processed/part-0012.snappy.parquet',
 '../data/df_processed/part-0010.snappy.parquet',
 '../data/df_processed/part-0018.snappy.parquet',
 '../data/df_processed/part-0008.snappy.parquet',
 '../data/df_processed/part-0028.snappy.parquet',
 '../data/df_processed/part-0009.snappy.parquet',
 '../data/df_processed/part-0006.snappy.parquet',
 '../data/df_processed/part-0022.snappy.parquet',
 '../data/df_processed/part-0016.snappy.parquet',
 '../data/df_processed/part-0003.snappy.parquet',
 '../data/df_processed/part-0013.snappy.parquet',
 '../data/df_processed/part-0027.snappy.parquet',
 '../data/df_processed/part-0014.snappy.parquet',
 '../data/df_processed/part-0025.snappy.parquet',


In [None]:
ds = ray.data.read_parquet("../data/df_processed")


In [None]:
ds

In [None]:
ds.take(1)

In [None]:
df = pd.read_parquet("../data/df_processed")

In [None]:
df.head()

In [None]:
#df.shape

In [None]:
#df.head(5)

In [16]:
def convert_to_document_dict(row):
    d = row.to_dict()
    #d['content']=d['Title'] + "\n" + d['Body']
    
    d['content']=d['Title']
    d['id']= d['Id']
    d['Tags'] = d['Tags'].split(",")
    del d['Id']
    del d['Body']
    del d['AnswerBody']
    d_doc = haystack.schema.Document.from_dict(d)
    
    return d_doc


def fetch_results(query:str, retriever, top_k=10):
    candidate_documents = retriever.retrieve(
        query=query,
        top_k=10,
    #filters={"year": ["2015", "2016", "2017"]}
    )

    for doc in candidate_documents:
        print (doc.meta['Title'], doc.score, doc.id )
        
        
def index_df(df, document_store):
    docs = df.apply(convert_to_document_dict, axis=1).to_numpy().flatten().tolist()
    
    document_store.write_documents(docs, batch_size=1_000)
    
def index_individual_df(df, document_store):
    
    docs = df.apply(convert_to_document_dict, axis=1)\
            .apply(lambda x: document_store.write_documents ( [doc]))
    
    
    
def get_document_store():
    
    document_store = ElasticsearchDocumentStore(host=ELASTIC_HOST
                                            , index=ELASTIC_INDEX, port=ELASTIC_PORT)
    
    return document_store


def transform_batch(df: pd.DataFrame) -> pd.DataFrame:
    
    document_store = get_document_store()
    index_df(df,document_store)
    
    return df


def transform_batch2(df: pd.DataFrame) -> pd.DataFrame:
    
    document_store = ElasticsearchDocumentStore(host=ELASTIC_HOST
                                            , index=ELASTIC_INDEX, port=ELASTIC_PORT)
    
    
    index_df(df,document_store)
    
    return df



def transform_file_batch(path: str) -> pd.DataFrame:
    
    df = pandas.read_parquet(path)
    document_store = get_document_store()
    
    
    index_df(df,document_store)
    
    return len(df)



In [17]:
cores = multiprocessing.cpu_count()-2

In [20]:
# max_workers=10
with concurrent.futures.ProcessPoolExecutor(max_workers = cores) as executor:
    for file_path, num_records in zip(files, executor.map(transform_file_batch, files)):
        print(f"path:{file_path}; records:{num_records}")



path:../data/df_processed/part-0004.snappy.parquet; records:362133
path:../data/df_processed/part-0011.snappy.parquet; records:362133
path:../data/df_processed/part-0015.snappy.parquet; records:362133
path:../data/df_processed/part-0021.snappy.parquet; records:362133
path:../data/df_processed/part-0024.snappy.parquet; records:362133
path:../data/df_processed/part-0001.snappy.parquet; records:362133
path:../data/df_processed/part-0012.snappy.parquet; records:362133
path:../data/df_processed/part-0010.snappy.parquet; records:362133
path:../data/df_processed/part-0018.snappy.parquet; records:362133
path:../data/df_processed/part-0008.snappy.parquet; records:362133
path:../data/df_processed/part-0028.snappy.parquet; records:362133
path:../data/df_processed/part-0009.snappy.parquet; records:362133
path:../data/df_processed/part-0006.snappy.parquet; records:362133
path:../data/df_processed/part-0022.snappy.parquet; records:362133
path:../data/df_processed/part-0016.snappy.parquet; records:36

In [None]:
#curl http://localhost:9200/_cat/indices?v

In [None]:
!ls

In [None]:
df.head()

In [None]:
pipeline = PandasQueryPipeline(df) 
pipeline.add_query(transform_batch, is_output=True)
result_dfs = pipeline.compute_batch() # Begin batch processing.

In [None]:
!ls

In [None]:
result_dfs

In [None]:
ds.

In [None]:
ds.limit(1000).map_batches(transform_batch)

In [None]:
transformed_ds = ds.map_batches(transform_batch)


In [None]:
transformed_ds.show()


In [None]:
#convert_to_document_dict ( df.iloc[0] )

In [None]:
#df.head(5).apply(convert_to_document_dict, axis=1)

In [None]:
document_store.delete_documents()

In [None]:
def apply_func(df):
    print ( len (df))



In [None]:
df.apply(lambda s: apply_func(s.to_frame()), axis=1)

In [None]:
@ray.remote
def index_df_with_remote(partition):
    df_subset = from_partitions( [ partitions ] ,axis=0)
      
    #docs = df_subset.apply(convert_to_document_dict, axis=1).to_numpy().flatten().tolist()
    
    #return  len (docs)
     
    return 5

In [None]:
partitions = unwrap_partitions(df, axis=0, get_ip=True)


In [None]:
futures = [index_df_with_remote.remote(partition) for partition in partitions]


In [None]:
print(ray.get(futures)) # [0, 1, 4, 9]


In [None]:
?document_store.write_documents

In [None]:
index_df(df, document_store)

In [None]:
# index_individual_df(df, document_store)

In [None]:
!ls

In [None]:
#partitions = unwrap_partitions(df.head(500))



In [None]:

# partitions = unwrap_partitions(df, axis=0, get_ip=True)

# for i in tqdm.tqdm (range(len (partitions))):
    
#     df_subset = from_partitions(partitions[i:i+1],axis=0)
    
#     docs = df_subset.apply(convert_to_document_dict, axis=1).to_numpy().flatten().tolist()
    
#     document_store.write_documents(docs)

In [None]:
document_store.write_documents(docs)

In [None]:
document_store.get_document_count()

In [None]:
fetch_results(query="encrypted URL", retriever=retriever)