# Imports

In [1]:
import pandas as pd
from pretrainer import *
from fsspec.implementations import hdfs

import xgboost as xgb
from sklearn.feature_selection import GenericUnivariateSelect, chi2, f_classif
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.model_selection import train_test_split
import pickle

import os
from datetime import date
import socket
import sys
from glob import glob

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, DoubleType, StringType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql import types as T
from datetime import date, timedelta
from pyspark.sql.types import LongType
import pyspark.sql.functions as func

In [2]:
pd.set_option('display.max_colwidth', None)

# Spark setup

In [2]:
os.environ["PYSPARK_PYTHON"] = "/usr/share/anaconda3/python3.7/bin/python"
queue = 'hddq-exprce-perso-high-mem'

spark = (
    SparkSession.builder.appName(os.environ['KRYLOV_PRINCIPAL'])
    .master("yarn")
    .config("spark.driver.maxResultSize", "10g")
    .config("spark.driver.host", socket.gethostbyname(socket.gethostname()))
    .config("spark.driver.port", "30202")
    .config("spark.executor.memoryOverhead", "8g") # In case a specific exectuer need extra memory, give this as extra.
    .config("spark.executor.cores", "3") # each worker has multiple exectuers, and each executer has multiple cores (=partitions), Dima suggested '8'
    .config("spark.driver.memory", "8g") # Dima set it to 24g, Nir = 8g
    .config("spark.executor.memory", "24g") # The regular RAM per exectuer (in contrast to memoryOverhead which is the extra). 
    .config("spark.rdd.compress", True)
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "300s")
    .config("spark.sql.broadcastTimeout", "1200s")
    .config("spark.dynamicAllocation.enabled", "true") # take and give executors whenever needed - based on the actual data
    .config("spark.dynamicAllocation.minExecutors", 20) # 0 is problematic, yarn will use a worker with nothing
#     .config("spark.dynamicAllocation.initialExecutors", 10) # it is not needed 
    .config("spark.dynamicAllocation.maxExecutors", 10000) # Nir suggested 200, as he said it is partitions/executer.cores (which equal to 3) - 512/3 < 200
    .config("spark.sql.shuffle.partitions", 512) # number of partitions after groupby/windowing, can be even 1000-2000 (per use-case)
    .config("spark.kryoserializer.buffer.max", "1g")
    .config("spark.yarn.queue", queue)
    .config("spark.speculation", False) # spark calcualte time per partition, when it is too long comparing to avg/median yarn will kill the partition, but the data is often skewed 
    .config("spark.rpc.message.maxSize", 1024)
    .config("spark.pyspark.python", "/usr/share/anaconda3/python3.7/bin/python")
    .enableHiveSupport()
    .getOrCreate()
)

# spark.sparkContext.addPyFile(pretrainer_hdfs_path)
spark.sparkContext.addPyFile('./pretrainer.py')
spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir("checkpoint/")
print("The executors' logs link:")
print(f'https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/{spark.sparkContext.applicationId}')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/02 13:44:30 WARN HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect.  Use hive.hmshandler.retry.* instead
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.metastore.local does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.enforce.sorting does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.server2.proxyuser.hue.groups does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.server2.proxyuser.hue.hosts does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.metastore.ds.retry.interval does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.enforce.bucketing does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.metastore.ds.retry.attempts does not exist
22/06/02 13:44:30 WARN HiveConf: HiveConf of name hive.server2.enable.impersonation

The executors' logs link:
https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1653723268643_327601


In [4]:
# .config("spark.driver.extraJavaOptions", '-Dhttp.proxyHost=httpproxy.vip.ebay.com -Dhttp.proxyPort=80 -Dhttps.proxyHost=httpproxy.vip.ebay.com -Dhttps.proxyPort=80') \
# .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.1") \

In [5]:
# spark.stop()

# Constants

#### Required constants

PersonalizedTopicsV2WithMLR 2021/06/01 - 2021/10/27
PersonalizedTopicsV2WithMultiCoviewWithPrice 2021/10/28 - 2021/12/19
PersonalizedTopicsV2WithMoreCoviewRecall 2021/12/20 - 2022/01/13

PersonalizedTopicsV2WithPurchaseSuppression  2022/01/14 - 2022/02/15
PersonalizedTopicsV2WithSimilarCategoriesPurchaseSuppression 2022/02/16 - 2022/03/25
PersonalizedTopicsV2WithMetaOrganicPRecall 2022/03/26 - 2022/04/16 
As they have different features and data need to combine them all together


In [3]:
algo_variants = {'PersonalizedTopicsV2WithMLR': pd.date_range(start='2021/06/01', end='2021/10/27'),
                 'PersonalizedTopicsV2WithMultiCoviewWithPrice': pd.date_range(start='2021/10/28', end='2021/12/19'),
                 'PersonalizedTopicsV2WithMoreCoviewRecall': pd.date_range(start='2021/12/20', end='2022/01/13'),
                 'PersonalizedTopicsV2WithPurchaseSuppression': pd.date_range(start='2022/01/14', end='2022/02/15'),
                 'PersonalizedTopicsV2WithSimilarCategoriesPurchaseSuppression': pd.date_range(start='2022/02/16', end='2022/03/25'),
                 'PersonalizedTopicsV2WithMetaOrganicPRecall': pd.date_range(start='2022/03/26', end='2022/05/22')
                }

In [25]:
date_to_variant = {}
for algo, dates in algo_variants.items():
    for d in dates:
        date_to_variant[d] = algo

In [26]:
date_to_variant

{Timestamp('2021-06-01 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-02 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-03 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-04 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-05 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-06 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-07 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-08 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-09 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-10 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-11 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-12 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('2021-06-13 00:00:00', freq='D'): 'PersonalizedTopicsV2WithMLR',
 Timestamp('

In [27]:
dates

DatetimeIndex(['2022-03-26', '2022-03-27', '2022-03-28', '2022-03-29',
               '2022-03-30', '2022-03-31', '2022-04-01', '2022-04-02',
               '2022-04-03', '2022-04-04', '2022-04-05', '2022-04-06',
               '2022-04-07', '2022-04-08', '2022-04-09', '2022-04-10',
               '2022-04-11', '2022-04-12', '2022-04-13', '2022-04-14',
               '2022-04-15', '2022-04-16', '2022-04-17', '2022-04-18',
               '2022-04-19', '2022-04-20', '2022-04-21', '2022-04-22',
               '2022-04-23', '2022-04-24', '2022-04-25', '2022-04-26',
               '2022-04-27', '2022-04-28', '2022-04-29', '2022-04-30',
               '2022-05-01', '2022-05-02', '2022-05-03', '2022-05-04',
               '2022-05-05', '2022-05-06', '2022-05-07', '2022-05-08',
               '2022-05-09', '2022-05-10', '2022-05-11', '2022-05-12',
               '2022-05-13', '2022-05-14', '2022-05-15', '2022-05-16',
               '2022-05-17', '2022-05-18', '2022-05-19', '2022-05-20',
      

In [5]:
start_date = '2022-04-15'
end_date = '2022-05-15'


In [9]:
#mkdir /data/shpx/data/olivyatan/piyi/pa

In [10]:
#mkdir /data/shpx/data/olivyatan/piyi/pa/model_training

In [11]:
#mkdir /data/shpx/data/ozinman/my_files/piyi/refurb

In [12]:
#mkdir /data/shpx/data/ozinman/my_files/piyi/refurb/model_training

In [6]:
pretrainer_hdfs_base_path = '/apps/b_perso/hp/simplark/pretrainer'

output_hdfs_base_path = '/apps/b_perso/team/olivyatan/piyi/pa'
output_kry_base_path = '/data/shpx/data/olivyatan/piyi/pa/model_training/'

output_file_name_dataset = 'df_pa_2022_05_15_2022_04_15.parquet'
output_file_name_dataset_purchases = 'df_pa_purchases_2022_05_15_2022_04_15.parquet'

In [7]:
piyi_v5_cols = [
'BibowatchRelPosition',
'RecallSourceBullseye',
'TitleCosineSimilarityToShoppingcartCentroid',
'ItemTimeOnSiteV2',
'MaxViewedItemTitleJaccardBigrams',
'BullseyeAbsRVILeafCatMedianPriceDiffV2',
'ItemSalesOverImpPricePrior7DayDecayLogSmoothDomesticWebAndMobile',
'ItemVariantSalesOverImpressions7DayDecayLogSmoothDomesticWebAndMobileV2',
'FreqWatchPriceBellowItemPrice',
'MaxViewedItemTitleJaccard',
'ItemWatchesOverImp7DayDecayLogSmoothDomesticWebAndMobileV2',
'BullseyeRelRVILeafCatMedianPriceDiffV2',
'ItemSalesOverImpPricePrior7DayDecayLogSmoothInternationalWebAndMobileNorm',
'NumSameRviInLastWeek',
'MerchImpressionsDecayed',
'FreqSameItemInWatchBadge',
'RecallSourceTora',
'PriceDiffMedianRecall',
'PlImpressionsDecayed',
'PoissonNextEventProbSameItemInWatch',
'FreqSameLeafCatIdInWatchBadge',
'AvgSameLeafRviPriceRatio',
'RecallSourceBestMatch',
'LeafCatRVICondition',
'AvgSameLeafRviPriceDiff',
'BullseyeRVILeafCatMedianPriceV2',
'ItemSalesOverImpPricePrior7DayDecayLogSmoothDomesticWebAndMobileNorm',
'MaxSameLeafRvihPriceDiff',
'ItemConditionOrdinal',
'SameItemConditionInRvi',
'ItemConditionNorm'
]

feature_cols = piyi_v5_cols

# Fetching train data

## Explore

In [23]:
def load_npy_file(filepath):
    if filepath.endswith(".gz"):
        with gzip.GzipFile(filepath, 'rb') as f:
            return np.load(f)
    else:
        return np.load(filepath)

In [24]:
!ls /data/shpx/data/s_perso/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithSimilarCategoriesPurchaseSuppression/

20220216  20220221  20220226  20220303	20220308  20220313  20220318  20220323
20220217  20220222  20220227  20220304	20220309  20220314  20220319  20220324
20220218  20220223  20220228  20220305	20220310  20220315  20220320  20220325
20220219  20220224  20220301  20220306	20220311  20220316  20220321
20220220  20220225  20220302  20220307	20220312  20220317  20220322


# Explore:

In [13]:
# npy_files = glob("/data/shpx/data/s_perso/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMoreCoviewRecall/*/**/*.npy.gz", recursive=True)
npy_files = glob("/data/shpx/data/s_perso/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithSimilarCategoriesPurchaseSuppression//*/**/*.npy.gz", recursive=True)
len(npy_files)
npy_data_array = np.concatenate([load_npy_file(f) for f in npy_files[:1]])

In [20]:
179852

179852

In [21]:
pd.Series(npy_data_array['siteId']).value_counts()

0      22946
3      15264
77      7969
101     5452
15      1985
71      1467
2       1200
186      868
210      120
dtype: int64

In [22]:
a = pd.DataFrame({"categ" : npy_data_array['category'],"l1Cat" : npy_data_array['l1Cat'], \
                  "itemId": npy_data_array['itemId'], "siteId" : npy_data_array['siteId']})

In [19]:
a.shape

(57271, 4)

In [20]:
a.columns

Index(['categ', 'l1Cat', 'itemId', 'siteId'], dtype='object')

In [21]:
a[a.l1Cat == b'0.131090']

Unnamed: 0,categ,l1Cat,itemId,siteId
80,b'82335',b'0.131090',313859253858,77
81,b'82335',b'0.131090',403442225478,77
82,b'82335',b'0.131090',403461658402,77
83,b'82335',b'0.131090',403439670138,77
84,b'82335',b'0.131090',133996649720,77
...,...,...,...,...
57226,b'33656',b'0.131090',393504509201,3
57227,b'33656',b'0.131090',165318680569,3
57228,b'33656|14769',b'0.131090',175064796074,3
57229,b'33656',b'0.131090',255121475748,3


In [22]:
npy_data_array['l1Cat'][:2]

array([b'0.11700', b'0.11700'], dtype='|S8')

In [23]:
npy_data_array.dtype.names

('meid',
 'algoVariant',
 'siteId',
 'seedId',
 'l1Cat',
 'l2Cat',
 'l3Cat',
 'category',
 'categoryPath',
 'itemId',
 'placementId',
 'seedProduct',
 'PLXModelScore',
 'labels',
 'weight',
 'counterWeight',
 'score',
 'mlrModelScore',
 'rank',
 'postExplorationRank',
 'preMfeRank',
 'impressionClicked',
 'clickPropensity',
 'features',
 'mlrRank',
 'adRate',
 'adRateWeight',
 'userId')

# Spark fetch

<font color=blue>Notice that Fetcher init will print each file it doesn't find

In [8]:
ft = Fetcher(pretrainer_hdfs_base_path, date_to_variant, start_date, end_date, hdfs.HadoopFileSystem(), num_workers=256)

  """Entry point for launching an IPython kernel.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/apache/releases/hadoop-2.7.3.2.6.4.2.0.18/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apache/releases/hadoop-2.7.3.2.6.4.2.0.18/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apache/releases/hadoop-2.7.3.2.6.4.2.0.18/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
22/06/02 13:46:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/02 13:46:33 WARN shortcircuit.Domai

In [11]:
ft.paths

['/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00000-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00001-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00002-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00003-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00004-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrganicPRecall/2022/04/15/data/part-00005-09cb5993-6d07-4d60-af94-83e4f48d6fee.npy.gz',
 '/apps/b_perso/hp/simplark/pretrainer/PersonalizedTopicsV2WithMetaOrg

In [13]:
len(ft.paths) / 50

31.0

In [9]:
df = ft.fetch_spark_df(spark, feature_col_prefix='f_', feature_cols = feature_cols)

                                                                                

In [27]:
#df.count() 

                                                                                

1184796029

# Get simplex Data for HB/PA

#### Simplex data from the vlp categories:
#### Hand bags:

In [None]:
vlp_hb_categories = [
    '169291',
    '52357'
    
]

df_vlp_hb = df \
    .withColumn("meid", F.col("meid").cast("string")) \
    .withColumn("category", F.col("category").cast("string")) \
    .filter(F.col("category").isin(vlp_hb_categories))

### Motors data

In [10]:
vlp_l1_categories=[b'0.131090', b'0.6000' ]
#cast l1 categ to string: 
df_vlp = df \
    .withColumn("meid", F.col("meid").cast("string")) \
    .filter(F.col("l1Cat").isin(vlp_l1_categories))

In [21]:
#df_vlp.count()


##### Filter for purchases:

In [11]:
vlp_purchased_meid = (df_vlp
                           .groupBy("meid")
                           .agg(func.sum("labelPurchase").alias("sum_purchase"))
                           .filter(F.col("sum_purchase") > 0)
                           .select("meid")
                           .dropDuplicates())

vlp_pa_purchased_meid = vlp_purchased_meid.select("meid").dropDuplicates()

df_vlp_purchased = df_vlp.join(vlp_pa_purchased_meid, on = ["meid"], how = "inner")

In [31]:
#vlp_pa_purchased_meid.count()

                                                                                

147724

In [32]:
#df_vlp_purchased.count() 
#1447170

                                                                                

1447170

##### Apply flters for motors data:

In [12]:
df_items = df_vlp_purchased\
.select("itemId", "siteId", "category", "l1Cat")\
.withColumnRenamed("itemId", "ITEM_ID")\
.withColumnRenamed("siteId", "SITE_ID")\
.withColumnRenamed("category", "LEAF_CATEG_ID").dropDuplicates()
#keep unique items only 

In [34]:
#df_items.count()
#1105440 unique items

                                                                                

1105440

In [13]:
#vlp_l1_categories=[131090, 6000 ]

from pyspark.sql import functions
from pyspark.sql.types import IntegerType
binary_to_int = functions.udf(lambda x: int(x, 2), IntegerType())

data = df_items.withColumn("LEAF_CATEG_ID", binary_to_int("LEAF_CATEG_ID"))

In [75]:
#data_cast.printSchema()

root
 |-- ITEM_ID: long (nullable = true)
 |-- SITE_ID: long (nullable = true)
 |-- LEAF_CATEG_ID: integer (nullable = true)
 |-- l1Cat: integer (nullable = true)



[Stage 19:=====>                                               (28 + 228) / 256]

#### Add site id in order to join with relevant categories:

In [14]:
auc_end_date = "2022-04-15"
df_items_lstg = spark.table("access_views.DW_LSTG_ITEM").alias('i') \
        .where( (F.col("AUCT_END_DT") >= auc_end_date))   \
        .join(data, "ITEM_ID")\
        .select(
        'ITEM_ID',
        'AUCT_END_DT',
        'i.LEAF_CATEG_ID',
        "l1Cat",
        'i.ITEM_SITE_ID').withColumnRenamed("ITEM_SITE_ID", "SITE_ID").withColumnRenamed("l1Cat", "L1CATEG")

In [None]:
#df_items_lstg.select("SITE_ID").distinct().show()

##### add categories of dw_category_g for filters:

In [15]:
condition= ( (F.col("MOVE_TO") == F.col("LEAF_CATEG_ID")) &  \
            ( ((F.col("META_CATEG_ID").isin(6000))  & (F.col("SITE_ID").isin(100)) ) | 
               
             ((F.col("META_CATEG_ID").isin(131090)) & (F.col("SITE_ID").isin(3,77,15))) ) )  
  

In [16]:
#motors site id is 100 
from pyspark.sql.functions import col

# Add LEAF_CATEG_ID from DW_LSTG_ITEM
#decimal and integer in join suppose to work - cast automaticly. 
# returns only DW_CATEGORY_GROUPINGS columns where joining with data data frame. - format issue.
df_items_categ = (
    spark.table("ACCESS_VIEWS.DW_CATEGORY_GROUPINGS").alias('c')
    .where(    condition   ) 
    .join(df_items_lstg.alias('i'), ["LEAF_CATEG_ID", "SITE_ID"])
    .select(  'ITEM_ID','AUCT_END_DT', "SITE_ID","L1CATEG", "LEAF_CATEG_ID","META_CATEG_ID", "LEAF_CATEG_NAME","META_CATEG_NAME" )
  
)


In [17]:
df_items_categ=df_items_categ.repartition(2000)

In [None]:
#df_items_categ.count()

In [18]:
df_pa = df_vlp_purchased.join(
    df_items_categ, df.itemId == df_items_categ.ITEM_ID , how="inner"
)

In [19]:
df_pa=df_pa.repartition(500)

In [20]:
df_pa_dedup=df_pa.dropDuplicates()

In [21]:
df_pa_dedup.write.parquet('/user/b_perso/olivyatan/simplex_data_pa', mode='overwrite')

22/06/02 13:49:38 ERROR Utils: Uncaught exception in thread Thread-7
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/b_perso/olivyatan/simplex_data_pa/.spark-staging-0/_LOCK (inode 33678766564): File does not exist. Holder DFSClient_NONMAPREDUCE_-155819839_17 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3863)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3951)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3918)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:989)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:605)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:731)
	at or

In [22]:
df_pa_dedup.count()
#test pa:252128 
#size: 1, 377, 661

                                                                                ]]]]

252128

In [28]:
df_pa_data=spark.read.parquet('/user/b_perso/olivyatan/simplex_data_pa')

                                                                                

In [29]:
df_pa_dedup_purchased = df_pa_data.repartition(50, "itemId").toPandas()

                                                                                

### Generate test for motors

In [30]:
#dedup data for test comparing to hp
df_pa_dedup_purchased.to_parquet('/data/shpx/data/olivyatan/pa_data_april_mai_22_dedup.parquet')

In [None]:
df_pa_dedup_purchased.shape

[Stage 1:>(9 + 247) / 256][Stage 2:(10 + 246) / 256][Stage 4:(9989 + 11) / 10000]]]

In [31]:
df_pa_dedup_data=pd.read_parquet('/data/shpx/data/olivyatan/pa_data_april_mai_22_dedup.parquet')

In [32]:
df_pa_dedup_data.shape

(252128, 76)

In [33]:
df_pa_dedup_data['meid'].nunique()

25959

In [6]:
#df_pa_dedup_data=pd.read_parquet('/data/shpx/data/olivyatan/pa_data_15.10_21_22_dedup.parquet')

In [7]:
df_pa_dedup_data.shape 

(1377661, 76)

In [19]:
df_pa_dedup_data.head(3)

Unnamed: 0,meid,algoVariant,siteId,seedId,l1Cat,l2Cat,l3Cat,category,categoryPath,itemId,...,f_MaxSameLeafRvihPriceDiff,f_FreqWatchPriceBellowItemPrice,ITEM_ID,AUCT_END_DT,SITE_ID,L1CATEG,LEAF_CATEG_ID,META_CATEG_ID,LEAF_CATEG_NAME,META_CATEG_NAME
0,75233dc351db403d9532751ffb3aa366,b'BEPERSONAL.PersonalizedTopicsV2WithMultiCovi...,3,0,b'0.131090',b'0.131090.6030',b'0.131090.6030.33637',b'52638',b'0.131090.6030.33637.52638',174782750032,...,-2.644,0.0,174782750032,2022-05-26,3,b'0.131090',52638,131090,Vehicle Parts & Accessories:Car Parts & Access...,Vehicle Parts & Accessories
1,9e774bb9d4ca4b82a7c7548245c72bb5,b'BEPERSONAL.PersonalizedTopicsV2WithMultiCovi...,77,0,b'0.131090',b'0.131090.64826',b'0.131090.64826.82100',b'82110|184226',b'0.131090.64826.82100.135876.82110',294067078678,...,4.283682,6.346194e-10,294067078678,2021-11-11,77,b'0.131090',82110,131090,Auto & Motorrad: Teile:Spezielle Fahrzeug-Teil...,Auto & Motorrad: Teile
2,e2e01e6d818e4d7d8fb897bf00c7e5d2,b'BEPERSONAL.PersonalizedTopicsV2WithMultiCovi...,0,0,b'0.6000',b'0.6000.6028',b'0.6000.6028.6030',b'33654|180120',b'0.6000.6028.6030.33637.33654',203539668828,...,-2.0,-1.0,203539668828,2022-05-28,100,b'0.6000',33654,6000,eBay Motors:Parts & Accessories:Car & Truck Pa...,eBay Motors


In [8]:
df_pa_dedup_data['labelPurchase'].value_counts()

0    1237151
1     140510
Name: labelPurchase, dtype: int64

In [9]:
140510/(1237151+140510)

0.10199170913599209

In [11]:
df_pa_dedup_data["meid"].nunique()

141830

In [14]:
1377661/  141830

9.713466826482408

In [16]:
sub_meids=df_pa_dedup_data.groupby(["SITE_ID", "META_CATEG_NAME"])["meid"].nunique()\
.sort_values(ascending=False).to_frame()
sub_meids['%meid']=(sub_meids['meid']/sub_meids['meid'].sum())*100
sub_meids

Unnamed: 0_level_0,Unnamed: 1_level_0,meid,%meid
SITE_ID,META_CATEG_NAME,Unnamed: 2_level_1,Unnamed: 3_level_1
100,eBay Motors,78934,47.187881
3,Vehicle Parts & Accessories,46655,27.89103
77,Auto & Motorrad: Teile,26292,15.717736
15,Vehicle Parts & Accessories,15395,9.203353


In [15]:
df_pa_dedup_data \
    .groupby('meid') \
    .size() \
    .to_frame("meid_count") \
    .groupby('meid_count') \
    .size() / df_pa_dedup_data.meid.drop_duplicates().shape[0]

meid_count
1     0.004407
2     0.003264
3     0.003427
4     0.012973
5     0.013333
6     0.012571
7     0.013671
8     0.017324
9     0.042431
10    0.794197
11    0.061087
12    0.014242
13    0.004153
14    0.001650
15    0.000613
16    0.000338
17    0.000212
18    0.000071
19    0.000014
20    0.000021
dtype: float64

In [64]:
!hdfs dfs -ls -h /user/b_perso/olivyatan

Found 2 items
drwxr-xr-x   - b_perso hdmi-mm          0 2022-04-14 14:49 /user/b_perso/olivyatan/piyi_sneakers
drwxr-xr-x   - b_perso hdmi-mm          0 2022-05-17 08:37 /user/b_perso/olivyatan/simplex_data_pa


In [None]:
df_items_lstg.count() 

In [45]:
df_pa.count()

                                                                                ]]]

37420660

In [None]:
df_pa_data=spark.read.parquet('/user/b_perso/olivyatan/simplex_data_pa')

In [46]:
df_pa.repartition(50, "itemId").write.parquet('/user/b_perso/olivyatan/simplex_data_pa', mode='overwrite')

22/05/17 18:05:15 ERROR Utils: Uncaught exception in thread Thread-7
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/b_perso/olivyatan/simplex_data_pa/.spark-staging-0/_LOCK (inode 33007439009): File does not exist. Holder DFSClient_NONMAPREDUCE_1323127984_17 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3810)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3898)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3865)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:989)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:731)
	at or

In [4]:
df_pa_data=spark.read.parquet('/user/b_perso/olivyatan/simplex_data_pa')

                                                                                

In [None]:
sample:

In [47]:
meid_list = df_pa_data.select('meid').distinct()

In [49]:
meid_list.show

<bound method DataFrame.show of DataFrame[meid: binary]>

In [48]:
df_pa_data.count()

                                                                                

37420660

### Sample Motors purchased data 
#### as pa data is of 37M, sampled randomly 15%

In [5]:
vlp_purchased_meid = (df_pa_data
                           .groupBy("meid")
                           .agg(func.sum("labelPurchase").alias("sum_purchase"))
                           .filter(F.col("sum_purchase") > 0)
                           .select("meid")
                           .dropDuplicates())
https://45.aihub.krylov.vip.ebay.com/workspace/olivyatan/shpx/simplex_pa/lab/tree/Fetcher_multiple_variations/fetch_data_pa_variations.ipynb#as-pa-data-is-of-37M,-sampled-randomly-15%


In [6]:
vlp_purchased_meid.count()

                                                                                

139527

In [17]:
20000/139527

0.14334143212424835

In [13]:
pa_purchased_meid=vlp_purchased_meid.limit(20000)

In [14]:
pa_purchased_meid.count()

                                                                                

20000

In [49]:
vlp_purchased_meid_sample=vlp_purchased_meid.sample(withReplacement=False, fraction=0.2)
#0.1433

In [50]:
vlp_purchased_meid_sample.count()

                                                                                

28052

In [34]:
type(vlp_purchased_meid_sample)

pyspark.sql.dataframe.DataFrame

In [51]:
df_pa_sample_purchased = df_pa_data.join(vlp_purchased_meid_sample, on = ["meid"], how = "inner")

In [52]:
df_pa_sample_purchased.count()

                                                                                

281025

In [54]:
df_pa_smpl_purchased = df_pa_sample_purchased.repartition(50, "itemId").toPandas()

                                                                                

In [55]:
df_pa_smpl_purchased.shape

(280661, 76)

In [56]:
df_pa_smpl_purchased.to_parquet('/data/shpx/data/olivyatan/pa_data_15.10_21_22_sample.parquet')

In [57]:
df_pa_smpl_purchased=pd.read_parquet('/data/shpx/data/olivyatan/pa_data_15.10_21_22_sample.parquet')

In [58]:
df_pa_smpl_purchased.head(2)

Unnamed: 0,meid,algoVariant,siteId,seedId,l1Cat,l2Cat,l3Cat,category,categoryPath,itemId,...,f_MaxSameLeafRvihPriceDiff,f_FreqWatchPriceBellowItemPrice,ITEM_ID,AUCT_END_DT,SITE_ID,L1CATEG,LEAF_CATEG_ID,META_CATEG_ID,LEAF_CATEG_NAME,META_CATEG_NAME
0,b'01fa127a85e84ddc82d2833863c928e2',b'BEPERSONAL.PersonalizedTopicsV2WithMultiCovi...,77,0,b'0.131090',b'0.131090.34998',b'0.131090.34998.179469',b'179471',b'0.131090.34998.179469.179471',304205202609,...,-1.0,1.323628e-09,304205202609,2022-02-16,77,b'0.131090',179471,131090,Auto & Motorrad: Teile:Werkzeuge:Batterieladeg...,Auto & Motorrad: Teile
1,b'346b749f3784465792736e8630150b6d',b'BEPERSONAL.PersonalizedTopicsV2WithMLR.Perso...,0,0,b'0.6000',b'0.6000.6028',b'0.6000.6028.6030',b'33633',b'0.6000.6028.6030.33605.33633',264941937599,...,-1.0,1.16544e-08,264941937599,2021-11-23,100,b'0.6000',33633,6000,eBay Motors:Parts & Accessories:Car & Truck Pa...,eBay Motors


In [59]:
df_pa_smpl_purchased['meid'].nunique()

28052

In [60]:
sub_meids=df_pa_smpl_purchased.groupby(["SITE_ID", "META_CATEG_NAME"])["meid"].nunique()\
.sort_values(ascending=False).to_frame()
sub_meids['%meid']=(sub_meids['meid']/sub_meids['meid'].sum())*100
sub_meids

In [62]:
sub_meids

Unnamed: 0_level_0,Unnamed: 1_level_0,meid,%meid
SITE_ID,META_CATEG_NAME,Unnamed: 2_level_1,Unnamed: 3_level_1
100,eBay Motors,15714,47.671632
3,Vehicle Parts & Accessories,9132,27.703789
77,Auto & Motorrad: Teile,5043,15.298972
15,Vehicle Parts & Accessories,3074,9.325607


In [63]:
df_pa_smpl_purchased \
    .groupby('meid') \
    .size() \
    .to_frame("meid_count") \
    .groupby('meid_count') \
    .size() / df_pa_smpl_purchased.meid.drop_duplicates().shape[0]

meid_count
1     0.000784
2     0.000570
3     0.000784
4     0.009875
5     0.010908
6     0.010587
7     0.010587
8     0.014117
9     0.032689
10    0.742229
11    0.110117
12    0.033545
13    0.012156
14    0.005704
15    0.002103
16    0.001212
17    0.001105
18    0.000392
19    0.000178
20    0.000143
21    0.000107
24    0.000071
27    0.000036
dtype: float64

In [None]:
 #df_vlp_cached.sort("meid").limit(100000).toPandas()

In [11]:
pdf_pa = df_pa_data.sort("meid").limit(100000).toPandas()

                                                                                

In [11]:
pdf_pa = df_pa_data.sort("meid").limit(100000).toPandas()

                                                                                

In [11]:
pdf_pa = df_pa_data.sort("meid").limit(100000).toPandas()

                                                                                

In [7]:
37420660/139527

268.19654977172877

In [None]:
pdf_pa \
    .groupby('meid') \
    .size() \
    .to_frame("meid_count") \
    .groupby('meid_count') \
    .size() / pdf_pa.meid.drop_duplicates().shape[0]

In [12]:
pdf_pa.to_parquet('/data/shpx/data/olivyatan/pa_data_15.10_21_22.parquet')
#wrriten to file system of krylov as one parquet file, spark writes file per partition. 

In [9]:
pdf_pa=pd.read_parquet('/data/shpx/data/olivyatan/pa_data_15.10_21_22.parquet')

In [10]:

pdf_pa.shape

(100000, 76)

In [41]:
pdf_pa.iloc[2,0:4].head(3)

meid                         b'000000b7833b4550accc8840eacd7765'
algoVariant    b'BEPERSONAL.PersonalizedTopicsV2WithMultiCovi...
siteId                                                        77
Name: 2, dtype: object

In [None]:
convert all byte to in

In [42]:
pdf_pa['meid'][1]

b'000000b7833b4550accc8840eacd7765'

In [None]:
str_df = pdf_pa.select_dtypes([np.object])
str_df = str_df.stack().str.decode('utf-8').unstack()
for col in str_df:
    print(str_df[col])

In [38]:
pdf_pa['meid'][1]

nan

In [11]:
pdf_pa \
    .groupby('meid') \
    .size() \
    .to_frame("meid_count") \
    .groupby('meid_count') \
    .size() / pdf_pa.meid.drop_duplicates().shape[0]

meid_count
1     0.220056
2     0.169460
3     0.134829
4     0.114496
5     0.094917
6     0.076648
7     0.062828
8     0.045949
9     0.032367
10    0.030977
11    0.009452
12    0.004210
13    0.002105
14    0.001112
15    0.000318
16    0.000119
17    0.000040
18    0.000040
19    0.000040
20    0.000040
dtype: float64

In [32]:
for i in pdf_vlp.sample(10).itemId.values:
    print(f'https://www.ebay.com/itm/{i}')

https://www.ebay.com/itm/144263164496
https://www.ebay.com/itm/114959183016
https://www.ebay.com/itm/353880903376
https://www.ebay.com/itm/154809110164
https://www.ebay.com/itm/304191762210
https://www.ebay.com/itm/175156536619
https://www.ebay.com/itm/124558931121
https://www.ebay.com/itm/255196211621
https://www.ebay.com/itm/393428243485
https://www.ebay.com/itm/334274351463


#### Keep only meid w/ purchases

In [53]:
vlp_purchased_meid = (df_vlp_cached
                           .groupBy("meid")
                           .agg(func.sum("labelPurchase").alias("sum_purchase"))
                           .filter(F.col("sum_purchase") > 0)
                           .select("meid")
                           .dropDuplicates())

df_vlp_purchased = df_vlp_cached.join(vlp_purchased_meid, on = ["meid"], how = "inner")

In [54]:
df_vlp_purchased.coalesce(1).write.parquet(f"{output_hdfs_base_path}/{output_file_name_dataset_purchases}")

22/05/16 16:09:34 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
	at org.apache.spark.scheduler.EventLoggingListener.redactPrope

In [55]:
df_vlp_purchased_cached = spark.read.parquet(f"{output_hdfs_base_path}/{output_file_name_dataset_purchases}")

                                                                                

In [56]:
df_vlp_purchased_cached.count()

                                                                                

0

In [57]:
pdf_vlp_purchased = df_vlp_purchased_cached.toPandas()

                                                                                

In [58]:
pdf_vlp_purchased.to_parquet(f'{output_kry_base_path}/{output_file_name_dataset_purchases}')

In [59]:
pdf_vlp_purchased.labelPurchase.value_counts()

Series([], Name: labelPurchase, dtype: int64)