# Importing Spark-specific libraries
## Import the libraries Spark needs to run in this environment

In [1]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import *

# Importing user-defined libraries
### 1. EncodeLib: packages for label encoding, one-hot encoding and vectorizing
### 2. CleanLib: contains DataCleaningLib, which cleans the data passed to it
### 3. S3Serializer: package for remote data access
### 4. MultiLabel: package containing the multi-label classifiers

In [2]:
from Encode_Lib.EncodeLib import LabelEncode,OHEncode,VectorChange
from Data_Cleaning_Lib.CleanLib import  DataCleaningLib
from S3serializer_Lib.S3Serializer import S3Bucket
import Encode_Lib.EncodeLib as eimEncode
import  Data_Cleaning_Lib.CleanLib as eimclean

# Creating the Spark instance
## Create a session named 'product'

In [3]:
# Reuse the active SparkSession if there is one; otherwise start one named 'product'.
spark = (
    SparkSession.builder
    .appName('product')
    .getOrCreate()
)

## Connecting to the remote cluster using our S3Bucket library

In [4]:
# Connect to the remote host that holds the parquet data.
# The host address, user name and key-file path can be supplied through environment
# variables so they do not have to be hard-coded in a shared notebook.
# NOTE(review): the fallback values reproduce the original hard-coded settings;
# consider removing them before the notebook is shared outside the team.
import os

s3 = S3Bucket()
s3.connect(
    host=os.environ.get('EIM_REMOTE_HOST', '18.212.194.194'),
    username=os.environ.get('EIM_REMOTE_USER', 'centos'),
    key=os.environ.get('EIM_REMOTE_KEY', './InternalPOC_Digital.pem'),
)

Connecting to host  18.212.194.194
Connected


## Fetching the complete folder from the remote server

In [5]:
# Copy the whole 'this.parquet' directory from the remote host.
# The same name is used for both arguments (remote and local directory).
s3.get_dir_remote(
    'this.parquet',
    'this.parquet',
)

Getting data...
Getting File--> part-00000-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00001-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00002-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00003-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00004-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00005-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00006-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00007-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00008-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00009-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00010-a3a6f736-9b2b-4010-a7bd-7509e347862c-c000.snappy.parquet
Getting File--> part-00011-a3a6f736-9b2b-4010-a7bd-7509e3478

## Reading the parquet file 

In [6]:
# Load the downloaded parquet directory into a Spark DataFrame.
data = spark.read.parquet(
    './this.parquet'
)

In [7]:
# Work on a fixed-size subset to keep the run manageable.
# Set N_ROWS to None to process the full dataset.
# NOTE: without an explicit ordering, Spark does not guarantee which rows `limit`
# keeps, so the subset may differ between runs or cluster layouts.
N_ROWS = 500000
if N_ROWS is not None:
    data = data.limit(N_ROWS)

In [8]:
# Sanity check: show how many rows the previous cell kept.
data.count()

500000

## Removing the date and time columns

In [9]:
# Drop the date/time columns; they are not used further in this notebook.
data = data.drop('fecha_dato', 'ult_fec_cli_1t', 'fecha_alta')

# Calling DataCleaningLib to clean the data
### 1. Removing null values
### 2. Removing string placeholders such as "NA" / "NAN"
### 3. Imputing categorical columns with the most frequent value
### 4. Imputing numerical columns with the mean value

In [10]:
# Clean the data with the project's DataCleaningLib; it prints each column it processes.
data_cleaning = DataCleaningLib()
data = data_cleaning.cleaning(
    data
)

1.Data Cleaning and Preprocessing
processing--> ncodpers
processing--> ind_empleado
processing--> pais_residencia
processing--> sexo
processing--> age
processing--> ind_nuevo
processing--> antiguedad
processing--> indrel
processing--> indrel_1mes
processing--> tiprel_1mes
processing--> indresi
processing--> indext
processing--> conyuemp
processing--> canal_entrada
processing--> indfall
processing--> tipodom
processing--> cod_prov
processing--> nomprov
processing--> ind_actividad_cliente
processing--> renta
processing--> segmento
processing--> ind_ahor_fin_ult1
processing--> ind_aval_fin_ult1
processing--> ind_cco_fin_ult1
processing--> ind_cder_fin_ult1
processing--> ind_cno_fin_ult1
processing--> ind_ctju_fin_ult1
processing--> ind_ctma_fin_ult1
processing--> ind_ctop_fin_ult1
processing--> ind_ctpp_fin_ult1
processing--> ind_deco_fin_ult1
processing--> ind_deme_fin_ult1
processing--> ind_dela_fin_ult1
processing--> ind_ecue_fin_ult1
processing--> ind_fond_fin_ult1
processing--> ind_h

### Casting columns from string to integer
#### [Automatic casting may become a feature of our library in a future version]

In [11]:
# Cast 'age' to an integer column.
data = data.withColumn('age', data.age.cast('int'))

In [12]:
# Cast these two product-flag columns to integer.
data = data.withColumn('ind_nom_pens_ult1', data.ind_nom_pens_ult1.cast('int'))
data = data.withColumn('ind_nomina_ult1', data.ind_nomina_ult1.cast('int'))

### Saving the cleaned data in parquet format (split into multiple chunks)
#### Why?
#### 1. It reduces the size of the data (2 GB of data is converted to 100 MB, further divided into 5 chunks of 20 MB)
#### 2. Fetching huge data from the remote server at once can consume a lot of memory and time
#### 3. Errors while transferring huge data to or from the remote server can corrupt the file; transferring in chunks helps avoid fatal errors and keeps one failure from corrupting the whole file

In [13]:
# Write the cleaned data locally so the next cell uploads this run's output,
# not a stale copy from an earlier session.
# mode('overwrite') lets the cell be re-run: Spark's default save mode raises an
# error when the target path already exists.
data.write.mode('overwrite').parquet('clean_data_V1.01.parquet')

## Saving the cleaned data to the remote server

In [14]:
# Upload the local 'clean_data_V1.01.parquet' directory to the remote host.
# The same name is used for both arguments.
s3.save_dir_remote(
    'clean_data_V1.01.parquet',
    'clean_data_V1.01.parquet',
)

Saving data...
Saving File--> .part-00000-7a8d8518-0142-4a17-8c96-7cece3cabeae-c000.snappy.parquet.crc
Saving File--> ._SUCCESS.crc
Saving File--> _SUCCESS
Saving File--> part-00000-7a8d8518-0142-4a17-8c96-7cece3cabeae-c000.snappy.parquet


#### Selecting the product (label) columns

In [15]:
# Product/label columns follow the naming pattern 'ind_*ult1'.
label_columns = [
    name
    for name in data.columns
    if name.startswith('ind_') and name.endswith('ult1')
]

In [16]:
# Add the customer id column to the list handed to the encoders below.
# The membership guard keeps the cell idempotent: re-running it must not append
# 'ncodpers' a second time.
if 'ncodpers' not in label_columns:
    label_columns.append('ncodpers')

# Label encoding the values
## Why implement our own?
#### 1. StringIndexer (Spark's native label encoder) requires the user to specify which columns to encode
#### 2. Our LabelEncode automates the whole process
#### 3. The only input it needs is the list of target/label columns

In [17]:
# label_columns (products + 'ncodpers') are passed as outputCols.
# Presumably these are the columns LabelEncode leaves untouched; confirm in EncodeLib.
le = LabelEncode(outputCols=label_columns)

In [18]:
# Apply the label encoder; it prints the name of each column it encodes.
data = le.transform(
    data
)

antiguedad
nomprov
indext
tiprel_1mes
ind_nuevo
indrel_1mes
ind_empleado
ind_actividad_cliente
indresi
sexo
canal_entrada
pais_residencia
indrel
cod_prov
indfall
tipodom
segmento


# One-hot encoding
### Like LabelEncode, our one-hot encoder (OHEncode) is automatic: it one-hot encodes the label-encoded data and also determines whether that transformation is necessary

In [19]:
# One-hot encoder from EncodeLib; it needs no column list (see the note above).
ohe = OHEncode(
)

In [20]:
# NOTE(review): this cell holds only commented-out code and does nothing. Either
# restore the drop (if 'index_tipodom' must be excluded before one-hot encoding)
# or delete the cell.
#data=data.drop('index_tipodom')

In [21]:
# Apply the one-hot encoder to the label-encoded DataFrame.
data = ohe.transform(
    data
)

# Vector assembly
### Combine all the features into a single vector column named 'features', which the models then use as input

In [22]:
# Assemble the features into a single 'features' vector column.
# label_columns are passed as outputCols, presumably to keep them out of the
# feature vector; confirm in EncodeLib.
vec = VectorChange(
    outputCols=label_columns
)
tem = vec.transform(
    data
)

In [23]:
# Inspect the schema of the assembled DataFrame.
tem.printSchema()

root
 |-- ncodpers: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- ind_ahor_fin_ult1: integer (nullable = true)
 |-- ind_aval_fin_ult1: integer (nullable = true)
 |-- ind_cco_fin_ult1: integer (nullable = true)
 |-- ind_cder_fin_ult1: integer (nullable = true)
 |-- ind_cno_fin_ult1: integer (nullable = true)
 |-- ind_ctju_fin_ult1: integer (nullable = true)
 |-- ind_ctma_fin_ult1: integer (nullable = true)
 |-- ind_ctop_fin_ult1: integer (nullable = true)
 |-- ind_ctpp_fin_ult1: integer (nullable = true)
 |-- ind_deco_fin_ult1: integer (nullable = true)
 |-- ind_deme_fin_ult1: integer (nullable = true)
 |-- ind_dela_fin_ult1: integer (nullable = true)
 |-- ind_ecue_fin_ult1: integer (nullable = true)
 |-- ind_fond_fin_ult1: integer (nullable = true)
 |-- ind_hip_fin_ult1: integer (nullable = true)
 |-- ind_plan_fin_ult1: integer (nullable = true)
 |-- ind_pres_fin_ult1: integer (nullable = true)
 |-- ind_reca_fin_ult1: integer (nullable = true)
 |-- ind_tjcr_fin_ult1

## Saving the final cleaned and vectorized data to the remote cluster

In [24]:
# Write the vectorized data locally so the next cell uploads this run's output,
# not a stale copy from an earlier session.
# mode('overwrite') lets the cell be re-run: Spark's default save mode raises an
# error when the target path already exists.
tem.write.mode('overwrite').parquet('product_rec_V1.01.parquet')

In [25]:
# Upload the local 'product_rec_V1.01.parquet' directory to the remote host.
# The same name is used for both arguments.
s3.save_dir_remote(
    'product_rec_V1.01.parquet',
    'product_rec_V1.01.parquet',
)

Saving data...
Saving File--> ._SUCCESS.crc
Saving File--> part-00000-3298b50d-6067-4afe-a3ee-c0f3a1fd69d1-c000.snappy.parquet
Saving File--> .part-00000-3298b50d-6067-4afe-a3ee-c0f3a1fd69d1-c000.snappy.parquet.crc
Saving File--> _SUCCESS


## Dividing the data into train/test splits for model training and validation

In [26]:
# 50/50 random train/test split.
# A fixed seed makes the split reproducible for the same input data and partitioning.
# Without a seed, randomSplit draws a different split every time the cell runs.
SPLIT_SEED = 42
train, test = tem.randomSplit([0.5, 0.5], seed=SPLIT_SEED)

# EimCC-MultiLabel contains classifiers for multi-label classification
## PySpark has no native library built for this kind of problem
## Features:
## 1. Performs multi-label classification
## 2. Recommends products for each customer
## 3. Lists each customer's current products

In [27]:
from EimCC.MultiLabel import BinaryRelevance

In [28]:
# Multi-label classifier reading its inputs from the assembled 'features' column.
b = BinaryRelevance(
    featuresCol='features'
)

In [30]:
# Fit the classifier on the product columns only.
# 'ncodpers' (the customer id) is filtered out by name instead of slicing off the
# last element: slicing assumes 'ncodpers' was appended exactly once and last,
# which breaks if the append cell is re-run.
b.fit(train, [name for name in label_columns if name != 'ncodpers'])

#### Saving and loading models

In [37]:
# Persist the fitted model to 'test.pkl'.
# NOTE(review): in the recorded session this cell ran as In [37], after the load
# cell below (In [31]). Run the notebook top to bottom so the load reads this run's model.
b.save('test.pkl')

In [31]:
# Reload the model from 'test.pkl'.
# NOTE(review): the return value is discarded. This only has an effect if `load`
# updates `b` in place; confirm against EimCC.MultiLabel.
# NOTE(review): if the file is a pickle, only load files from a trusted source
# (unpickling can execute arbitrary code).
b.load('test.pkl')

#### Predictions

In [32]:
# Score the held-out split with the fitted model.
# NOTE(review): `predicted` is not used by the cells below; confirm whether
# find_recommendation relies on state this call leaves on `b`.
predicted = b.transform(
    test
)

In [36]:
# Show one customer's current products and recommended products.
# The id is a float because 'ncodpers' is a double column in the schema printed above.
b.find_recommendation(
    user=15906.0,
    id_column='ncodpers',
)

{'current': ['ind_cno_fin_ult1',
  'ind_ctop_fin_ult1',
  'ind_ctpp_fin_ult1',
  'ind_dela_fin_ult1',
  'ind_hip_fin_ult1',
  'ind_nom_pens_ult1',
  'ind_recibo_ult1'],
 'recommendation': ['ind_cco_fin_ult1']}