In [1]:
from pyspark.ml.pipeline import  Transformer,Estimator
from pyspark.ml.param.shared import HasInputCol,HasOutputCol,HasInputCols,HasOutputCols
from pyspark import keyword_only
from pyspark.sql.functions import udf
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.ml.feature import Imputer
from pyspark.sql import SparkSession
from pyspark.sql import *

class LabelEncode(Transformer,HasInputCol,HasOutputCol,HasInputCols,HasOutputCols):
    @keyword_only
    def __init__(self,outputCols=None):
        super(LabelEncode,self).__init__()
        kwargs=self._input_kwargs
        self.setParams(**kwargs)
    @keyword_only
    def setParams(self,inputCol=None,outputCols=None):
        kwargs=self._input_kwargs
        return self._set(**kwargs)
    #def xv(self):
        #print('xv')
    def _transform(self,df):
        label_col=self.getOutputCols()
        independent_df=df.select(*list(set(df.columns)-set(label_col)))
        from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer
        cc=[cat[0] for cat in independent_df.dtypes if cat[1]=='string']
        for column in cc:
            sti=StringIndexer(inputCol=column,outputCol='index_'+column)
            df=sti.fit(df).transform(df)
            df=df.drop(column)
        return df
    
class OHEncode(Transformer,HasInputCol,HasOutputCol,HasOutputCols):
    @keyword_only
    def __init__(self):
        super(OHEncode,self).__init__()
        kwargs=self._input_kwargs
        self.setParams(**kwargs)
    @keyword_only
    def setParams(self,inputCol=None,outputCol=None):
        kwargs=self._input_kwargs
        return self._set(**kwargs)
    #def xv(self):
        #print('xv')
    def _transform(self,df):
        from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer
        #label_column=df.select('TARGET')
        #df=df.drop('TARGET')
        ohe_columns=[col for col in df.columns if col.startswith('index_')]
        ohe_columns=[col for col in ohe_columns if df.select(col).distinct().count()]
        for column in ohe_columns:
            sti=OneHotEncoder(inputCol=column,outputCol='ohe_'+column)
            df=sti.transform(df)
            df=df.drop(column)
        #print(df.columns)
        #df=df.join(label_column)
        return df
    
class VectorChange(Transformer,HasInputCol,HasOutputCol,HasOutputCols):
    @keyword_only
    def __init__(self,outputCols=None):
        super(VectorChange,self).__init__()
        kwargs=self._input_kwargs
        self.setParams(**kwargs)
        
    @keyword_only
    def setParams(self,inputCol=None,outputCols=None):
        kwargs=self._input_kwargs
        return self._set(**kwargs)
    #def xv(self):
        #print('xv')
    def _transform(self,df):
        
        target_col=self.getOutputCols()
        from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer
        #print(list(set(df.columns)-set(target_col)))
        assem=VectorAssembler(inputCols=list(set(df.columns)-set(target_col)),outputCol='Feature')
        df=assem.transform(df)
        #print(df.select('Feature').show())
        return df

In [2]:
spark = SparkSession.builder.appName('product').getOrCreate()

In [3]:
data1=spark.read.parquet('cleaned_file_v1.parquet')

In [4]:
data=data1.limit(700000)

In [5]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[ncodpers: double, ind_empleado: string, pais_residencia: string, sexo: string, age: string, ind_nuevo: string, antiguedad: string, indrel: string, indrel_1mes: string, tiprel_1mes: string, indresi: string, indext: string, canal_entrada: string, indfall: string, tipodom: string, cod_prov: string, nomprov: string, ind_actividad_cliente: string, segmento: string, ind_ahor_fin_ult1: int, ind_aval_fin_ult1: int, ind_cco_fin_ult1: int, ind_cder_fin_ult1: int, ind_cno_fin_ult1: int, ind_ctju_fin_ult1: int, ind_ctma_fin_ult1: int, ind_ctop_fin_ult1: int, ind_ctpp_fin_ult1: int, ind_deco_fin_ult1: int, ind_deme_fin_ult1: int, ind_dela_fin_ult1: int, ind_ecue_fin_ult1: int, ind_fond_fin_ult1: int, ind_hip_fin_ult1: int, ind_plan_fin_ult1: int, ind_pres_fin_ult1: int, ind_reca_fin_ult1: int, ind_tjcr_fin_ult1: int, ind_valo_fin_ult1: int, ind_viv_fin_ult1: int, ind_nomina_ult1: string, ind_nom_pens_ult1: string, ind_recibo_ult1: int, imputed_renta:

In [6]:
len(data.columns)

44

In [7]:
thisList=[col for col in data.columns if col.startswith('ind_') and col.endswith('ult1')]

In [8]:
data = data.drop(*['age','ind_nuevo','antiguedad','indrel','indrel_1mes'])

In [9]:
data=data.drop(*['ind_nomina_ult1','ind_nom_pens_ult1'])

In [10]:
le=LabelEncode(outputCols=thisList)

In [11]:
data = le.transform(data)

In [12]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[ncodpers: double, ind_ahor_fin_ult1: int, ind_aval_fin_ult1: int, ind_cco_fin_ult1: int, ind_cder_fin_ult1: int, ind_cno_fin_ult1: int, ind_ctju_fin_ult1: int, ind_ctma_fin_ult1: int, ind_ctop_fin_ult1: int, ind_ctpp_fin_ult1: int, ind_deco_fin_ult1: int, ind_deme_fin_ult1: int, ind_dela_fin_ult1: int, ind_ecue_fin_ult1: int, ind_fond_fin_ult1: int, ind_hip_fin_ult1: int, ind_plan_fin_ult1: int, ind_pres_fin_ult1: int, ind_reca_fin_ult1: int, ind_tjcr_fin_ult1: int, ind_valo_fin_ult1: int, ind_viv_fin_ult1: int, ind_recibo_ult1: int, imputed_renta: double, index_canal_entrada: double, index_indresi: double, index_indfall: double, index_ind_actividad_cliente: double, index_tipodom: double, index_cod_prov: double, index_segmento: double, index_sexo: double, index_nomprov: double, index_pais_residencia: double, index_ind_empleado: double, index_indext: double, index_tiprel_1mes: double]>

In [13]:
ohe = OHEncode()

In [14]:
data=ohe.transform(data)

In [15]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[ncodpers: double, ind_ahor_fin_ult1: int, ind_aval_fin_ult1: int, ind_cco_fin_ult1: int, ind_cder_fin_ult1: int, ind_cno_fin_ult1: int, ind_ctju_fin_ult1: int, ind_ctma_fin_ult1: int, ind_ctop_fin_ult1: int, ind_ctpp_fin_ult1: int, ind_deco_fin_ult1: int, ind_deme_fin_ult1: int, ind_dela_fin_ult1: int, ind_ecue_fin_ult1: int, ind_fond_fin_ult1: int, ind_hip_fin_ult1: int, ind_plan_fin_ult1: int, ind_pres_fin_ult1: int, ind_reca_fin_ult1: int, ind_tjcr_fin_ult1: int, ind_valo_fin_ult1: int, ind_viv_fin_ult1: int, ind_recibo_ult1: int, imputed_renta: double, ohe_index_canal_entrada: vector, ohe_index_indresi: vector, ohe_index_indfall: vector, ohe_index_ind_actividad_cliente: vector, ohe_index_tipodom: vector, ohe_index_cod_prov: vector, ohe_index_segmento: vector, ohe_index_sexo: vector, ohe_index_nomprov: vector, ohe_index_pais_residencia: vector, ohe_index_ind_empleado: vector, ohe_index_indext: vector, ohe_index_tiprel_1mes: vector]>

In [16]:
len(data.columns)

37

In [17]:
from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer


In [18]:
assembler=VectorAssembler(inputCols=list(set(data.columns)-set(thisList)),outputCol='features')

In [19]:
data=assembler.transform(data)

In [20]:
data.printSchema()

root
 |-- ncodpers: double (nullable = true)
 |-- ind_ahor_fin_ult1: integer (nullable = true)
 |-- ind_aval_fin_ult1: integer (nullable = true)
 |-- ind_cco_fin_ult1: integer (nullable = true)
 |-- ind_cder_fin_ult1: integer (nullable = true)
 |-- ind_cno_fin_ult1: integer (nullable = true)
 |-- ind_ctju_fin_ult1: integer (nullable = true)
 |-- ind_ctma_fin_ult1: integer (nullable = true)
 |-- ind_ctop_fin_ult1: integer (nullable = true)
 |-- ind_ctpp_fin_ult1: integer (nullable = true)
 |-- ind_deco_fin_ult1: integer (nullable = true)
 |-- ind_deme_fin_ult1: integer (nullable = true)
 |-- ind_dela_fin_ult1: integer (nullable = true)
 |-- ind_ecue_fin_ult1: integer (nullable = true)
 |-- ind_fond_fin_ult1: integer (nullable = true)
 |-- ind_hip_fin_ult1: integer (nullable = true)
 |-- ind_plan_fin_ult1: integer (nullable = true)
 |-- ind_pres_fin_ult1: integer (nullable = true)
 |-- ind_reca_fin_ult1: integer (nullable = true)
 |-- ind_tjcr_fin_ult1: integer (nullable = true)
 |-- ind

In [29]:
from pyspark.ml.classification import LogisticRegression

In [30]:
models = {}
model_preds = {}
id_preds1 = defaultdict(list)

In [31]:
train,test=data.randomSplit([0.7,0.3])

In [None]:
!free -m

In [32]:
tra=train.limit(2000)

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [34]:
for c in thisList:
    print(c)
    lr = LogisticRegression(featuresCol="features", labelCol=c, regParam=0.1)
    print("...fitting")
    lrModel = lr.fit(tra)
    print('fitted')
    trans = lrModel.transform(test)
    print("transformed")
    binary_eval=BinaryClassificationEvaluator(labelCol=c)
    print(binary_eval.evaluate(trans))
    print('------------------------------------------------------------------------')

ind_ahor_fin_ult1
...fitting
fitted
transformed
0.1964087583548715
------------------------------------------------------------------------
ind_aval_fin_ult1
...fitting
fitted
transformed
0.8205403549596558
------------------------------------------------------------------------
ind_cco_fin_ult1
...fitting
fitted
transformed
0.6683549162657136
------------------------------------------------------------------------
ind_cder_fin_ult1
...fitting
fitted
transformed
0.705103734891928
------------------------------------------------------------------------
ind_cno_fin_ult1
...fitting
fitted
transformed
0.6751380726718121
------------------------------------------------------------------------
ind_ctju_fin_ult1
...fitting
fitted
transformed
0.5
------------------------------------------------------------------------
ind_ctma_fin_ult1
...fitting
fitted
transformed
0.7092633071912446
------------------------------------------------------------------------
ind_ctop_fin_ult1
...fitting
fitted
tr

IllegalArgumentException: 'Field "ind_nomina_ult1" does not exist.\nAvailable fields: ncodpers, ind_ahor_fin_ult1, ind_aval_fin_ult1, ind_cco_fin_ult1, ind_cder_fin_ult1, ind_cno_fin_ult1, ind_ctju_fin_ult1, ind_ctma_fin_ult1, ind_ctop_fin_ult1, ind_ctpp_fin_ult1, ind_deco_fin_ult1, ind_deme_fin_ult1, ind_dela_fin_ult1, ind_ecue_fin_ult1, ind_fond_fin_ult1, ind_hip_fin_ult1, ind_plan_fin_ult1, ind_pres_fin_ult1, ind_reca_fin_ult1, ind_tjcr_fin_ult1, ind_valo_fin_ult1, ind_viv_fin_ult1, ind_recibo_ult1, imputed_renta, ohe_index_canal_entrada, ohe_index_indresi, ohe_index_indfall, ohe_index_ind_actividad_cliente, ohe_index_tipodom, ohe_index_cod_prov, ohe_index_segmento, ohe_index_sexo, ohe_index_nomprov, ohe_index_pais_residencia, ohe_index_ind_empleado, ohe_index_indext, ohe_index_tiprel_1mes, features'

In [36]:
import pandas as pd
from collections import defaultdict

In [40]:
data1=data.limit(1000)

In [41]:
df=data1.toPandas() 

In [50]:
thisList.remove('ind_nomina_ult1')
thisList.remove('ind_nom_pens_ult1')

In [51]:
df_recent = df[['ncodpers']+thisList]

In [54]:
df_recent.head(1)

Unnamed: 0,ncodpers,ind_ahor_fin_ult1,ind_aval_fin_ult1,ind_cco_fin_ult1,ind_cder_fin_ult1,ind_cno_fin_ult1,ind_ctju_fin_ult1,ind_ctma_fin_ult1,ind_ctop_fin_ult1,ind_ctpp_fin_ult1,...,ind_ecue_fin_ult1,ind_fond_fin_ult1,ind_hip_fin_ult1,ind_plan_fin_ult1,ind_pres_fin_ult1,ind_reca_fin_ult1,ind_tjcr_fin_ult1,ind_valo_fin_ult1,ind_viv_fin_ult1,ind_recibo_ult1
0,1375586.0,0,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [55]:
sample = pd.read_csv('sample_submission.csv')

In [None]:
# check if customer already have each product or not. 
already_active = {}
for row in df_recent.values:
    row = list(row)
    id = row.pop(0)
    active = [c[0] for c in zip(tuple(thisList), row) if c[1] > 0]
    already_active[id] = active

# add 7 products(that user don't have yet), higher probability first -> train_pred   
train_preds = {}
for id, p in id_preds.items():
    # Here be dragons
    preds = [i[0] for i in sorted([i for i in zip(tuple(product_col), p) if i[0] not in already_active[id]],
                                  key=lambda i:i [1], 
                                  reverse=True)[:7]]
    train_preds[id] = preds
    
test_preds = []
for row in sample.values:
    id = row[0]
    p = train_preds[id]
    test_preds.append(' '.join(p))