In [27]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

from constants import *

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [13]:
from helpers.solr_client import *
from helpers.fusion_client import *

# Clients shared by the rest of the notebook:
#   solr           - queried below for document counts and the feature-store feature list
#   signals_client - queried below for signal rows (constructed with a base filter query)
#   fusion         - used by do_extract_features to run searches through a Fusion query pipeline
# The UPPER_CASE names are not defined in this notebook; presumably they come from
# `from constants import *` - TODO confirm.
solr = LTRSolrClient(COLLECTION_NAME)
signals_client = SignalsSolrClient(SIGNALS_COLLECTION_NAME, base_fq=SIGNALS_BASE_FQ)
# NOTE(review): the Fusion host and port are hard-coded to localhost:8764; consider moving
# them to a constant if this notebook is ever run against a remote Fusion instance.
fusion = FusionAPIClient('http://localhost:8764/api/apollo/apps/{}'.format(FUSION_APP_NAME))

In [10]:
# Sanity check: report how many documents each collection returns for the
# match-all query "*:*" before running feature extraction.
# NOTE(review): signals_client was built with base_fq=SIGNALS_BASE_FQ, so its count is
# presumably restricted by that filter - confirm in helpers.solr_client.
print "# products:", solr.num_hits("*:*")
print "# click signals:", signals_client.num_hits("*:*")

# products: 1275077
# click signals: 3273140


# Run Feature Extraction
* Grab the N most recent signals from Solr. These serve as input queries to the Solr LTR feature extraction process.
* Extract features for each row (per https://lucene.apache.org/solr/guide/7_0/learning-to-rank.html#extracting-features)
* Save results to disk as tab-delimited text (one line per query/document pair)
* Use Spark to parallelize, otherwise this will take forever

In [None]:
# Needed to ship dependent modules to spark cluster: the functions mapped over the
# RDD further down use code from the local `helpers` package.
from helpers.spark_utils import spark_add_py_module
import helpers

# NOTE(review): `spark` is not defined anywhere in the visible cells; presumably it is the
# SparkSession injected by the PySpark kernel - TODO confirm.
spark_add_py_module(spark, helpers)

In [14]:
from collections import Counter

def do_extract_features(tup):
    """
    Run one signal's query through the Fusion query pipeline and yield one
    labelled feature row per returned document.

    Args:
        tup: a ``(query_id, query, doc_id, solr_date)`` tuple describing one signal.

    Yields:
        list: ``[query_id, epoch_seconds, query, returned_doc_id, target]``
        followed by the document's feature values as floats. ``target`` is 1
        when the returned document's id equals the signal's ``doc_id``
        (both compared as strings) and 0 otherwise.

    Raises:
        AssertionError: if two documents in the same response report different
            feature names (or the same names in a different order).
    """
    query_id, query, doc_id, solr_date = tup
    # The timestamp and the signal's document id are the same for every returned
    # document, so convert them once instead of once per document.
    epoch_seconds = datetime_to_epoch_seconds(solr_date_to_datetime(solr_date))
    signal_doc_id = str(doc_id)

    results = fusion.search_query_pipeline(FUSION_QUERY_PIPELINE_NAME, COLLECTION_NAME, query,
                                           rows=STAGE_A_TOP_N)

    feature_dictionary = None
    for doc in results['response']['docs']:
        # '[features]' is parsed as comma-separated "name=value" pairs. Splitting on
        # the LAST '=' keeps a feature name that itself contains '=' intact; the
        # value side is a number and never contains one.
        feature_names, values = zip(
            *[featval.rsplit('=', 1) for featval in doc['[features]'].split(',')])
        vector = map(float, values)
        if not feature_dictionary:
            feature_dictionary = feature_names
        else:
            # Every row must share one column layout, otherwise the saved
            # vectors could not be read back with a single header.
            assert feature_dictionary == feature_names, (
                "Inconsistent feature names for query %r: expected %r, got %r"
                % (query, feature_dictionary, feature_names))

        returned_doc_id = str(doc['id'])
        sample = [
            query_id,
            epoch_seconds,
            query,
            returned_doc_id,
            int(returned_doc_id == signal_doc_id),
        ]
        sample.extend(vector)

        yield sample
            
def tab_delimit(data):
    """
    Join the items of ``data`` into a single tab-separated line.

    ``unicode`` items are encoded to UTF-8 byte strings; every other item is
    converted with ``str()``.
    """
    return "\t".join(
        item.encode('utf8') if isinstance(item, unicode) else str(item)
        for item in data
    )

def get_feature_names():
    """
    Return the ``'name'`` of every feature that ``solr.get_features`` reports
    for ``FEATURE_STORE_NAME``, in the order they are returned.
    """
    names = []
    for feature in solr.get_features(FEATURE_STORE_NAME):
        names.append(feature['name'])
    return names

def save_feature_names(path):
    """
    Write the current feature names, JSON-encoded, to ``feature_names.txt``
    inside the directory ``path``.
    """
    names = get_feature_names()
    target_file = os.path.join(path, 'feature_names.txt')
    with open(target_file, 'w') as out:
        serialized = json.dumps(names)
        out.write(serialized)

## Sample of Feature Extraction output for a single query

In [26]:
import seaborn as sns

def get_signal_query_object(query_text):
    """
    Return the first row that ``signals_client.get_most_recent_signals`` yields
    for ``query_text``, or ``None`` when it yields nothing.
    """
    matching_signals = signals_client.get_most_recent_signals(10000,
                                                              q=query_text,
                                                              fq="*:*")
    # Only the first row is needed; the default covers the no-match case.
    return next(iter(matching_signals), None)


def get_single_query_results(query):
    """
    For testing only.

    Look up one signal for ``query``, run feature extraction for it and return
    the extracted rows as a DataFrame.

    Args:
        query: query text used to look up a signal.

    Returns:
        pandas.DataFrame with the columns ``query_id``, ``date``, ``query``,
        ``doc_id``, ``target`` followed by one column per name returned by
        ``get_feature_names()``. The frame is empty (but still has these
        columns) when the search returns no documents.

    Raises:
        ValueError: if no signal is found for ``query``.
    """
    row = get_signal_query_object(query)
    if row is None:
        # Fail with a clear message instead of the opaque
        # "'NoneType' object is not subscriptable" raised by indexing None.
        raise ValueError("No signal found for query: {!r}".format(query))

    schema = signals_client.schema
    samples = do_extract_features((
        row[schema.query_id_field_name],
        row[schema.query_field_name],
        row[schema.doc_id_field_name],
        row[schema.date_field_name]))

    # do_extract_features is a generator; consume it here so the search runs
    # before the feature names are fetched, as in the original ordering.
    rows = list(samples)

    cols = ['query_id', 'date', 'query', 'doc_id', 'target']
    cols.extend(get_feature_names())

    # Passing the names to the constructor (rather than assigning df.columns
    # afterwards) keeps the header when there are zero rows, where the
    # assignment would fail with a length mismatch.
    return pd.DataFrame(rows, columns=cols)

# Preview the extraction output for one example query.
df = get_single_query_results('ipad 2')
# Light-to-dark green colour map used to shade the table cells by value.
cm = sns.light_palette("green", as_cmap=True)
# Show at most the first 25 rows; the styled frame is the cell's displayed output.
df.head(25).style.background_gradient(cmap=cm)

Unnamed: 0,query_id,date,query,doc_id,target,tfidf_text,tfidf_name,tfidf_longDescription,tfidf_shortDescription,tfidf_features_t,tfidf_categoryNames_t,tfidf_previous_click_queries,salesRankLongTerm,salesRankMediumTerm,salesRankShortTerm,customerReviewAverage,customerReviewCount,regularPrice,salePrice
0,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1945531,0,8.41338,8.41338,9.77511,0.0,5.21184,9.12001,66645.6,38,38,94,4.6,653,399.99,399.99
1,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,2339322,1,8.41338,8.41338,9.77511,0.0,5.21184,9.12001,17548.1,44,55,110,4.7,451,399.99,399.99
2,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1945595,0,8.41338,8.41338,9.77511,0.0,5.21184,9.12001,17284.6,48524,29837,0,4.7,324,499.99,449.99
3,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1945674,0,8.41338,8.41338,9.77511,0.0,5.21184,9.12001,7517.14,7906,4686,9305,4.6,200,599.99,499.99
4,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,2339386,0,8.41338,8.41338,9.77511,0.0,5.21184,9.12001,7014.7,44881,32287,32596,4.7,164,499.99,449.99
5,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1918265,0,7.09755,7.09755,9.77511,0.0,5.02115,9.12001,4133.0,14702,8194,7275,4.8,60,729.99,599.99
6,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1918159,0,7.09755,7.09755,9.77511,0.0,5.02115,9.12001,3872.18,3666,2901,3112,4.4,49,529.99,529.99
7,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,2408224,0,8.96756,8.96756,9.37046,7.45163,6.69544,9.26712,6222.09,8444,6066,7290,3.3,310,29.99,29.99
8,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,1918229,0,7.09755,7.09755,9.77511,0.0,5.02115,9.12001,3176.95,27648,20133,22824,4.8,62,629.99,529.99
9,5d9aa133-7fc1-461b-aca7-3be8c56551d0,1320060000.0,ipad 2,2701307,0,7.92372,7.92372,8.40326,5.363,5.91783,0.0,3893.93,226768,0,0,4.0,11,80.99,80.99


# Extract Features

In [23]:
import shutil

# Start from a clean output directory. Only failures of the removal itself are
# ignored (e.g. the directory does not exist yet); unlike a bare `except:`, this
# no longer hides KeyboardInterrupt or an undefined/misspelled name.
shutil.rmtree(FEATURE_VECTORS_PATH, ignore_errors=True)

# Lazily build one (query_id, query, doc_id, date) tuple per signal row; this is
# the input shape do_extract_features unpacks.
signals_to_vectorize = (
    (row[signals_client.schema.query_id_field_name],
     row[signals_client.schema.query_field_name],
     row[signals_client.schema.doc_id_field_name],
     row[signals_client.schema.date_field_name])
    for row in signals_client.get_most_recent_signals(SIGNALS_FEATURE_EXTRACTION_LIMIT,
                                                      fq="*:*",
                                                      dedup=True)
)

# Distribute the signals, expand each one into its per-document feature rows,
# and write every row as one tab-delimited line of text.
# NOTE(review): `spark` is not defined in the visible cells; presumably it is the
# SparkSession provided by the kernel - TODO confirm.
(
    spark.sparkContext
    .parallelize(signals_to_vectorize)
    .flatMap(do_extract_features)
    .map(tab_delimit)
    .saveAsTextFile(FEATURE_VECTORS_PATH)
)

# Store the column names next to the vectors, since the saved rows carry no header.
save_feature_names(FEATURE_VECTORS_PATH)

Fetched 500 signal rows from Solr
Fetched 1000 signal rows from Solr
Fetched 1500 signal rows from Solr
Fetched 2000 signal rows from Solr
Fetched 2500 signal rows from Solr
Fetched 3000 signal rows from Solr
Fetched 3500 signal rows from Solr
Fetched 4000 signal rows from Solr
Fetched 4500 signal rows from Solr
Fetched 5000 signal rows from Solr
