### Connect to Data Lake

In [0]:
app_id = "Your_App_ID"
tenant_id = "Your_Tenant_ID"

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": app_id,
       "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "hblscope", key ="hblkeyvaultsecretfordatabricks"),
       "fs.azure.account.oauth2.client.endpoint": "https://login.chinacloudapi.cn/"+tenant_id+"/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

dbutils.fs.mount(
source = "abfss://hbldatalake@hbldatalake.dfs.core.chinacloudapi.cn/",
mount_point = "/mnt/az_datalake",
extra_configs = configs)

In [0]:
%fs ls /mnt/az_datalake/salesdata/

path,name,size
dbfs:/mnt/az_datalake/salesdata/call_center/,call_center/,0
dbfs:/mnt/az_datalake/salesdata/catalog_page/,catalog_page/,0
dbfs:/mnt/az_datalake/salesdata/catalog_returns/,catalog_returns/,0
dbfs:/mnt/az_datalake/salesdata/catalog_sales/,catalog_sales/,0
dbfs:/mnt/az_datalake/salesdata/customer/,customer/,0
dbfs:/mnt/az_datalake/salesdata/customer_address/,customer_address/,0
dbfs:/mnt/az_datalake/salesdata/customer_demographics/,customer_demographics/,0
dbfs:/mnt/az_datalake/salesdata/date_dim/,date_dim/,0
dbfs:/mnt/az_datalake/salesdata/household_demographics/,household_demographics/,0
dbfs:/mnt/az_datalake/salesdata/income_band/,income_band/,0


### Data Exploration

In [0]:
customerSchema = '''customer_sk LONG,
customer_id string,
current_cdemo_sk LONG,
current_hdemo_sk LONG,
current_addr_sk LONG,
first_shipto_date_sk LONG,
first_sales_date_sk LONG,
salutation STRING,
firstName STRING,
lastName STRING,
preferred_Cust_Flag STRING,
birth_Day LONG,
birth_Month LONG,
birth_Year LONG,
birth_Country STRING,
login STRING,
emailAddress STRING,
last_review_date LONG
'''

In [0]:
customer_df = (spark.read
.option("sep","|")
.schema(customerSchema)
.option("inferSchema","true")
.csv("/mnt/az_datalake/salesdata/customer/customer_001.dat"))

In [0]:
customer_df.printSchema()

### Let us look at he Customer Demographics Dataset

In [0]:
customerDemographicsSchema = ''' cd_demo_sk long,
cd_gender string,
cd_marital_status string,
cd_education_status string,
cd_purchase_estimate string,
cd_credit_rating string,
cd_dep_count int,
cd_dep_employed_count int,
cd_dep_college_count int
'''

In [0]:
customer_demographics = spark.read.option("sep","|").schema(customerDemographicsSchema).csv("/mnt/az_datalake/salesdata/customer_demographics/customer_demographics_001.dat")

In [0]:
display(customer_demographics)

cd_demo_sk,cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cd_dep_count,cd_dep_employed_count,cd_dep_college_count
1,M,M,Primary,500,Good,0,0,0
2,F,M,Primary,500,Good,0,0,0
3,M,S,Primary,500,Good,0,0,0
4,F,S,Primary,500,Good,0,0,0
5,M,D,Primary,500,Good,0,0,0
6,F,D,Primary,500,Good,0,0,0
7,M,W,Primary,500,Good,0,0,0
8,F,W,Primary,500,Good,0,0,0
9,M,U,Primary,500,Good,0,0,0
10,F,U,Primary,500,Good,0,0,0


### Joined the Customer Data with Demographics Data

In [0]:
joinExpr = customer_df.current_cdemo_sk == customer_demographics.cd_demo_sk
customerWithDemographics = customer_df.join(customer_demographics,joinExpr,"inner")

In [0]:
customerWithDemographics.select("customer_sk","firstName","lastName","cd_credit_rating").show(10)

In [0]:
customerWithDemographics.select("cd_credit_rating").distinct().show()

In [0]:
creditProfile = customerWithDemographics.groupBy("cd_credit_rating").count().withColumnRenamed("count","customerCount").withColumnRenamed("cd_credit_rating","creditRating")

display(creditProfile)

creditRating,customerCount
Low Risk,24133
Good,24038
Unknown,24009
High Risk,24382


In [0]:
customerPurchaseLevel = customerWithDemographics.groupBy("cd_purchase_estimate").count().withColumnRenamed("count","customerCount").withColumnRenamed("cd_purchase_estimate","purchaseLevelEstimate")
display(customerPurchaseLevel)

purchaseLevelEstimate,customerCount
1500,4847
5000,4769
8500,4760
10000,4760
2000,4834
7000,4754
6500,4885
3000,4712
9500,4840
1000,4907


In [0]:
customerPurchaseLevel.coalesce(1).write.mode("overwrite").option("header","true").csv("/Output/hbl/customerPurchaseLevel.csv")
customerPurchaseLevel.write.saveAsTable("customerPurchaseLevel")

In [0]:
# df_CPL = spark.read.option("header","true").csv("/FileStore/output/customerPurchaseLevel.csv")
# df_CPL.show(5)

### Let us run some predictive models to categorize the customers into `preferred`/`unpreferred` flag

In [0]:
df_model = customerWithDemographics.select("salutation","birth_Day","birth_Month","birth_Year","cd_gender","cd_marital_status","cd_education_status","cd_purchase_estimate","cd_credit_rating","cd_dep_count","cd_dep_employed_count","cd_dep_college_count","preferred_Cust_Flag")
df_model = df_model.filter(df_model.preferred_Cust_Flag.isNotNull())
df_model.write.saveAsTable("customerdata")
display(df_model)

salutation,birth_Day,birth_Month,birth_Year,cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cd_dep_count,cd_dep_employed_count,cd_dep_college_count,preferred_Cust_Flag
Sir,17.0,8.0,1936.0,M,W,Secondary,500,Good,0,0,0,N
Sir,6.0,9.0,1926.0,F,U,Secondary,500,Good,0,0,0,Y
Dr.,13.0,9.0,1928.0,M,S,4 yr Degree,500,Good,0,0,0,Y
Ms.,11.0,4.0,1992.0,F,S,4 yr Degree,500,Good,0,0,0,N
Mr.,11.0,7.0,1988.0,M,M,College,1000,Good,0,0,0,N
Dr.,11.0,5.0,1948.0,F,D,4 yr Degree,1000,Good,0,0,0,N
Miss,30.0,6.0,1944.0,M,M,Advanced Degree,1000,Good,0,0,0,N
Mr.,14.0,10.0,1934.0,M,U,Advanced Degree,1000,Good,0,0,0,N
Dr.,28.0,4.0,1939.0,M,M,Unknown,1000,Good,0,0,0,N
Ms.,23.0,5.0,1933.0,M,S,Primary,1500,Good,0,0,0,Y


In [0]:
cat_features = ["salutation","cd_gender","cd_marital_status","cd_education_status","cd_credit_rating"]
for i in cat_features:
  df_model.select(i).distinct().show()

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

for i in cat_features:
  stringindexer = StringIndexer(inputCol=i, outputCol=i+"_Num").setHandleInvalid("keep")  
  encoder = OneHotEncoder(inputCol=i+"_Num", outputCol=i+'_Vector')
  
  pipeline = Pipeline(stages=[stringindexer, encoder])
  pipeline_fit = pipeline.fit(df_model)
  df_model = pipeline_fit.transform(df_model)

In [0]:
display(df_model.head(5))

salutation,birth_Day,birth_Month,birth_Year,cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cd_dep_count,cd_dep_employed_count,cd_dep_college_count,preferred_Cust_Flag,salutation_Num,salutation_Vector,cd_gender_Num,cd_gender_Vector,cd_marital_status_Num,cd_marital_status_Vector,cd_education_status_Num,cd_education_status_Vector,cd_credit_rating_Num,cd_credit_rating_Vector
Sir,17,8,1936,M,W,Secondary,500,Good,0,0,0,N,2.0,"List(0, 6, List(2), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",2.0,"List(0, 5, List(2), List(1.0))",6.0,"List(0, 7, List(6), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))"
Sir,6,9,1926,F,U,Secondary,500,Good,0,0,0,Y,2.0,"List(0, 6, List(2), List(1.0))",1.0,"List(0, 2, List(1), List(1.0))",1.0,"List(0, 5, List(1), List(1.0))",6.0,"List(0, 7, List(6), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))"
Dr.,13,9,1928,M,S,4 yr Degree,500,Good,0,0,0,Y,0.0,"List(0, 6, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",3.0,"List(0, 7, List(3), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))"
Ms.,11,4,1992,F,S,4 yr Degree,500,Good,0,0,0,N,4.0,"List(0, 6, List(4), List(1.0))",1.0,"List(0, 2, List(1), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",3.0,"List(0, 7, List(3), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))"
Mr.,11,7,1988,M,M,College,1000,Good,0,0,0,N,1.0,"List(0, 6, List(1), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",3.0,"List(0, 5, List(3), List(1.0))",4.0,"List(0, 7, List(4), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))"


In [0]:
search_engine_indexer = StringIndexer(inputCol="preferred_Cust_Flag", outputCol="preferred_Cust_Flag_Num").setHandleInvalid("keep").fit(df_model)
df_model = search_engine_indexer.transform(df_model)

In [0]:
display(df_model.head(5))

salutation,birth_Day,birth_Month,birth_Year,cd_gender,cd_marital_status,cd_education_status,cd_purchase_estimate,cd_credit_rating,cd_dep_count,cd_dep_employed_count,cd_dep_college_count,preferred_Cust_Flag,salutation_Num,salutation_Vector,cd_gender_Num,cd_gender_Vector,cd_marital_status_Num,cd_marital_status_Vector,cd_education_status_Num,cd_education_status_Vector,cd_credit_rating_Num,cd_credit_rating_Vector,preferred_Cust_Flag_Num
Sir,17,8,1936,M,W,Secondary,500,Good,0,0,0,N,2.0,"List(0, 6, List(2), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",2.0,"List(0, 5, List(2), List(1.0))",6.0,"List(0, 7, List(6), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))",0.0
Sir,6,9,1926,F,U,Secondary,500,Good,0,0,0,Y,2.0,"List(0, 6, List(2), List(1.0))",1.0,"List(0, 2, List(1), List(1.0))",1.0,"List(0, 5, List(1), List(1.0))",6.0,"List(0, 7, List(6), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))",1.0
Dr.,13,9,1928,M,S,4 yr Degree,500,Good,0,0,0,Y,0.0,"List(0, 6, List(0), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",3.0,"List(0, 7, List(3), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))",1.0
Ms.,11,4,1992,F,S,4 yr Degree,500,Good,0,0,0,N,4.0,"List(0, 6, List(4), List(1.0))",1.0,"List(0, 2, List(1), List(1.0))",0.0,"List(0, 5, List(0), List(1.0))",3.0,"List(0, 7, List(3), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))",0.0
Mr.,11,7,1988,M,M,College,1000,Good,0,0,0,N,1.0,"List(0, 6, List(1), List(1.0))",0.0,"List(0, 2, List(0), List(1.0))",3.0,"List(0, 5, List(3), List(1.0))",4.0,"List(0, 7, List(4), List(1.0))",3.0,"List(0, 4, List(3), List(1.0))",0.0


In [0]:
df_model = df_model.withColumn("cd_purchase_estimate",df_model["cd_purchase_estimate"].cast('integer'))
df_model = df_model.withColumn("preferred_Cust_Flag_Num",df_model["preferred_Cust_Flag_Num"].cast('integer'))
df_model = df_model.withColumn("birth_Day",df_model["birth_Day"].cast('integer'))
df_model = df_model.withColumn("birth_Month",df_model["birth_Month"].cast('integer'))
df_model = df_model.withColumn("birth_Year",df_model["birth_Year"].cast('integer'))

In [0]:
# 剔除冗余、不需要的字段
useful_cols = [
    # label目标值字段
    "preferred_Cust_Flag_Num",  
    
    # 特征值字段
#     "birth_Day", 
#     "birth_Month",
#     "birth_Year",
    "cd_purchase_estimate",
    "cd_dep_count",
    "cd_dep_employed_count",
    "cd_dep_college_count",
    "salutation_Vector",
    "cd_gender_Vector",
    "cd_marital_status_Vector",
    "cd_education_status_Vector",
    "cd_credit_rating_Vector"
]

df_model_select = df_model.select(*useful_cols)
df_model_select.printSchema()

In [0]:
from pyspark.ml.feature import VectorAssembler  
 
df_model_select = df_model.select(*useful_cols)
df_model_select=df_model_select.dropna()

## 构成特征向量
df_assembler = VectorAssembler(inputCols=useful_cols[1:], outputCol="features")
df_model_select = df_assembler.transform(df_model_select)

df_model_select = df_model_select['features','preferred_Cust_Flag_Num']
 
# ## 查看构建后的数据
# model_df=df_model_select.select(['features','preferred_Cust_Flag_Num'])

In [0]:
training_df, test_df= df_model_select.randomSplit([0.7,0.3]) 

### We will apply several classification models to the model_df

In [0]:
from pyspark.ml.classification import LogisticRegression   

lr = LogisticRegression(labelCol='preferred_Cust_Flag_Num').fit(training_df)
# testresult = lr.transform(test_df)
print('{}{}'.format('LogisticRegression Train accuracy：',lr.evaluate(test_df).accuracy))

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer,IndexToString

rf = RandomForestClassifier(labelCol="preferred_Cust_Flag_Num", \
                            featuresCol="features", \
                            numTrees = 50) 
#                             maxDepth = 4, \
#                             maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(training_df)
predictions = rfModel.transform(test_df)

predictions.select("prediction", "preferred_Cust_Flag_Num", "features").show(5)

In [0]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="preferred_Cust_Flag_Num", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

In [0]:
from itertools import chain
featureMetadata = chain(*predictions.schema["features"].metadata["ml_attr"]["attrs"].values())
attrs = sorted((attr["idx"], attr["name"]) for attr in featureMetadata)

name_list = []
fi_list=[]
for idx, name in attrs:
  name_list.append(name)
  fi_list.append(rfModel.featureImportances[idx])

In [0]:
import pandas as pd
feature_importances = pd.DataFrame(data = {'name':name_list, 'importance':fi_list})
display(feature_importances.sort_values('importance', ascending=False))

name,importance
cd_purchase_estimate,0.1692027424131199
cd_dep_count,0.1132997466585983
cd_dep_employed_count,0.1050411403395798
cd_dep_college_count,0.0907365888842826
salutation_Vector_Mr.,0.0330984704318321
cd_education_status_Vector_Advanced Degree,0.0325101883846835
cd_credit_rating_Vector_High Risk,0.0284896964912051
cd_marital_status_Vector_S,0.0284541546155175
cd_marital_status_Vector_D,0.0258627849553057
cd_credit_rating_Vector_Unknown,0.0256049132272033


In [0]:
feature_importances_table = feature_importances.sort_values('importance', ascending=False)
spark_df = spark.createDataFrame(feature_importances_table)
spark_df.write.saveAsTable("hba_customer_featureImportance")

In [0]:
spark_df_table = spark.sql("select * from hba_customer_featureImportance")
display(spark_df_table)

name,importance
cd_dep_college_count,0.0907365888842826
salutation_Vector_Mr.,0.0330984704318321
cd_education_status_Vector_Advanced Degree,0.0325101883846835
cd_credit_rating_Vector_High Risk,0.0284896964912051
cd_education_status_Vector_Primary,0.0153191028070995
cd_education_status_Vector_4 yr Degree,0.0145920598550244
cd_education_status_Vector_2 yr Degree,0.0137360460751455
cd_education_status_Vector_Unknown,0.0096538715154396
cd_credit_rating_Vector_Low Risk,0.0255596141370727
cd_education_status_Vector_Secondary,0.025155748998077


### Connect to Azure ML Workspace

In [None]:
from azureml.core import Workspace
from azureml.core.experiment import Experiment
from azureml.core.authentication import InteractiveLoginAuthentication

subscription_id = 'Your_Subscription_ID'
resource_group = 'herbalifedemo'
workspace_name = 'herbalifeAML'

interactive_auth = InteractiveLoginAuthentication(tenant_id = "Your_Tenant_ID", cloud = "AzureChinaCloud")
try:
    ws = Workspace(subscription_id=subscription_id, resource_group=resource_group,workspace_name=workspace_name, auth=interactive_auth) 
    print('Library configuration succeed')
except:
    print('Workspace not found')
    
experiment = Experiment(ws, "newPrediction")

### Set Compute Target

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your cluster.
amlcompute_cluster_name = "hbltraincluster"

# Verify that cluster does not exit already.
try:
    compute_target = ComputerTarget(workspace=ws, name=amlcompute_cluster-name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.procisioning_configuration(vm_size='STANDARD_DS12_V2',
                                                           max_nodes=6)
    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)

compute_target.wait_for_completion(show_output=True)

### MLFlow

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import mlflow

mlflow.set_experiment("/hbldatalake_mlflow")

def evalMetrics(summary):
  accuracy=summary.accuracy
  auc = summary.areaUnderROC
  return (accuracy, auc)

#begin pipeline run
with mlflow.start_run() as run:
  
  maxIter=100
  
  #split dataset into training and testing
  featureColumns = [
    # label目标值字段
    "preferred_Cust_Flag_Num",  
    # 特征值字段
    "birth_Day", 
    "birth_Month",
    "birth_Year",
    "cd_purchase_estimate",
    "cd_dep_count",
    "cd_dep_employed_count",
    "cd_dep_college_count",
    "salutation_Vector",
    "cd_gender_Vector",
    "cd_marital_status_Vector",
    "cd_education_status_Vector",
    "cd_credit_rating_Vector"
]
  (train,test) = df_model.select(*featureColumns).na.drop().randomSplit([0.8,0.2])
  ftCols = train.columns[1:]
  
  trainCount=train.count()
  testCount=test.count()
  mlflow.log_param("trainCount", trainCount)
  mlflow.log_param("testCount", testCount)
  
  assembler=VectorAssembler(inputCols=ftCols, outputCol="featureVector")
  scaler=StandardScaler(inputCol="featureVector",outputCol="features",withStd=True,withMean=False)
  lr=LogisticRegression(featuresCol="features",labelCol="preferred_Cust_Flag_Num",maxIter=100)
  pipeline=Pipeline(stages=[assembler,scaler,lr])
  
  #fit the model
  model=pipeline.fit(train)
  lrModel=model.stages[-1]
  
  #Get Metrics
  (accuracy, auc) = evalMetrics(lrModel.summary)
  
  #Log mlflow metrics for the LR model
  mlflow.log_metric("accuracy", accuracy)
  mlflow.log_metric("auc", auc)
  
  #log model generated
  mlflow.spark.log_model(model,"lrModel")

In [0]:
 mlflow.end_run()