In [1]:
import numpy as np
import csv
import decimal
import math
import os
import re
import time
import random
from pyspark.sql.functions import udf
from pyspark.sql.functions import isnan, when, count, col, to_date, row_number
import pyspark.sql.functions
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
from pyspark.sql.types import DateType
from pyspark.sql import HiveContext
from pyspark.storagelevel import StorageLevel

In [2]:
from pyspark.sql.types import *

from pyspark_llap.sql import HiveWarehouseBuilder
from pyspark_llap.sql.session import CreateTableBuilder, HiveWarehouseSessionImpl
# Hive Warehouse Connector (HWC) session built on the active SparkSession; used further down
# via hive.executeQuery(...) to pull the 2022+ extract from dsc60180_ici_sde_tz_db.vomart.
hive = HiveWarehouseBuilder.session(spark).build()

In [3]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
# Inspect the effective Spark configuration of this session.
# getConf() is the public SparkContext accessor (it returns a copy of the SparkConf);
# `_conf` is a private attribute and should not be relied on.
# NOTE(review): this output lists internal hostnames and Kerberos principals -- clear it
# before sharing the notebook.
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.historyServer.address', 'http://hpchdp2i3.hpc.ford.com:18081'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.repl.local.jars',
  'file:///s/hadoop/user/jars/hive-warehouse-connector-assembly-hpchdp2.jar,file:///u/mpartha9/scoring-code-spark-api_2.4.3-0.0.22.jar,file:///u/mpartha9/64132f079417a607d5972c42-64119c37f60675e3f6972a3d.jar'),
 ('spark.yarn.am.extraLibraryPath',
  '/opt/cloudera/parcels/CDH-7.1.7-1.cdh7.1.7.p1000.24102687/lib/hadoop/lib/native'),
 ('spark.driver.port', '43720'),
 ('spark.yarn.dist.pyFiles',
  'file:///opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.1000-141.zip'),
 ('spark.sql.hive.hiveserver2.jdbc.url.principal', 'hive/_HOST@HPC.FORD.COM'),
 ('spark.shuffle.io.serverThreads', '128'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.RM_HA_URLS',
  'hpchdp2.hpc.ford.com:8088,hpchdp2i4.hpc.ford.com:8088'),
 ('spark.serializer', 'org.apache.spa

In [4]:
# Session-level Hive settings, applied in order through the HiveContext wrapper.
# NOTE(review): HiveContext is deprecated since Spark 2.0; spark.conf.set(...) is the modern route.
hive_context = HiveContext(spark)
hive_settings = [
    ("hive.exec.dynamic.partition", "true"),
    ("hive.execution.engine", "spark"),
    ("hive.prewarm.enabled", "true"),
    ("hive.vectorized.execution.enabled", "true"),
    ("hive.exec.dynamic.partition.mode", "nonstrict"),
]
for setting_key, setting_value in hive_settings:
    hive_context.setConf(setting_key, setting_value)

## VOMART data pull for Ford USA, 2003 through 2021

In [7]:
#Save the original extract as parquet file
#mpartha9_NA_data_1_external_p1.write.parquet("mpartha9_vomart_2003_21_current_model.parquet")

In [5]:
#Read the parquet file
# Reload the 2003-2021 extract that the commented-out write cell above persisted as parquet.
mpartha9_NA_data_1_external_p1 = (
    spark.read
    .format('parquet')
    .load('mpartha9_vomart_2003_21_current_model.parquet')
)

In [6]:
# Expose the extract to spark.sql under the same name.
mpartha9_NA_data_1_external_p1.createOrReplaceTempView(
    name="mpartha9_NA_data_1_external_p1"
)

In [7]:
# Rows vs. distinct consumer_id values (a consumer can own several vehicles).
print(
    mpartha9_NA_data_1_external_p1.count(),
    mpartha9_NA_data_1_external_p1.select("consumer_id").dropDuplicates().count(),
)

66271637 41125090


In [8]:
# Column names and types of the 2003-2021 extract (printed below).
mpartha9_NA_data_1_external_p1.printSchema()

root
 |-- vin: string (nullable = true)
 |-- consumer_id: decimal(11,0) (nullable = true)
 |-- vehicle_ownership_cycle_num: short (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- vehicle_model_year: string (nullable = true)
 |-- acquisition_date: date (nullable = true)
 |-- acquisition_year: short (nullable = true)
 |-- vehicle_age: short (nullable = true)
 |-- fmcc_lease_purchase_ind: byte (nullable = true)
 |-- platform: string (nullable = true)
 |-- lifestage_value: string (nullable = true)
 |-- consumer_type_code: string (nullable = true)
 |-- contribution_margin: decimal(15,2) (nullable = true)



# Retail-only (consumer_type_code 'I') slice of the 2003-2021 extract, registered as a view.
# NOTE(review): no later cell shown reads this view -- the queries below re-apply
# consumer_type_code='I' to the base view instead. Either use it or drop it.
Query_retail_only = """select *
from mpartha9_NA_data_1_external_p1
where consumer_type_code='I'
"""
mpartha9_NA_data_1_external_p1_retail = spark.sql(Query_retail_only)
mpartha9_NA_data_1_external_p1_retail.createOrReplaceTempView(
    name="mpartha9_NA_data_1_external_p1_retail"
)

### CURRENT MODEL: per-consumer average of all non-zero CM (retail)

In [9]:
#All NON-ZERO data records
# Per-consumer average of non-zero, non-NULL retail CM over the 2003-2021 extract.
# Consumers whose CM is only zero/NULL produce no row here.
Query2 = """select consumer_id, avg(contribution_margin) as avg_cm 
from mpartha9_NA_data_1_external_p1
where contribution_margin !=0
and contribution_margin is NOT NULL
and consumer_type_code='I'
group by consumer_id
"""
mpartha9_NA_combo_all_nonzero_records_p2 = spark.sql(Query2)
mpartha9_NA_combo_all_nonzero_records_p2.createOrReplaceTempView(
    name="mpartha9_NA_combo_all_nonzero_records_p2"
)

In [10]:
# GROUP BY consumer_id -> both numbers should match.
print(
    mpartha9_NA_combo_all_nonzero_records_p2.count(),
    mpartha9_NA_combo_all_nonzero_records_p2.select("consumer_id").dropDuplicates().count(),
)

15086551 15086551


In [11]:
mpartha9_NA_combo_all_nonzero_records_p2.show(n=2)  # peek at two rows

+-----------+-----------+
|consumer_id|     avg_cm|
+-----------+-----------+
|   18730602|5588.395000|
|  120674850|8706.660000|
+-----------+-----------+
only showing top 2 rows



### CURRENT MODEL: fallback CM for consumers with only zero/missing CM (average non-zero retail CM in 2021)

In [12]:
#only zero CM  records
# Fallback CM: average NON-zero retail CM over acquisitions dated in calendar 2021.
# This one value is used as avg_cm for consumers with no non-zero CM in the 2003-2021 extract
# (see Query6). Despite the view name ("all_zero_records") and column name ("cm_365"), it
# aggregates non-zero rows only, for the calendar year 2021 -- not a rolling 365 days.
# The date bounds live in WHERE instead of a CASE inside avg(): same result (avg() ignored the
# NULLs the CASE produced), but non-2021 rows are discarded before aggregation and the intent
# is explicit.
Query3="""select avg(contribution_margin) as cm_365
from mpartha9_NA_data_1_external_p1
where contribution_margin !=0
and consumer_type_code='I'
and acquisition_date>='2021-01-01' and acquisition_date<='2021-12-31'
"""
mpartha9_NA_combo_all_zero_records_p3=spark.sql(Query3)
mpartha9_NA_combo_all_zero_records_p3.createOrReplaceTempView("mpartha9_NA_combo_all_zero_records_p3")

In [13]:
# Aggregate with no GROUP BY -> exactly one row expected.
print(mpartha9_NA_combo_all_zero_records_p3.count())

1


In [14]:
mpartha9_NA_combo_all_zero_records_p3.show(n=2)  # the single fallback value

+------------+
|      cm_365|
+------------+
|13591.790626|
+------------+



### Distinct Customers

In [15]:
# Every distinct retail consumer in the 2003-2021 extract, whatever their CM.
Query4 = """select distinct consumer_id
from mpartha9_NA_data_1_external_p1
where consumer_type_code='I'
"""
mpartha9_NA_distinct_records_p4 = spark.sql(Query4)
mpartha9_NA_distinct_records_p4.createOrReplaceTempView(
    name="mpartha9_NA_distinct_records_p4"
)

In [16]:
mpartha9_NA_distinct_records_p4.show(n=2)  # peek at two consumer ids

+-----------+
|consumer_id|
+-----------+
|   70511805|
|  113921504|
+-----------+
only showing top 2 rows



In [17]:
# Number of distinct retail consumers in the 2003-2021 extract.
mpartha9_NA_distinct_records_p4.count()

38169329

### Join both tables

In [18]:
# Attach avg_cm to every retail consumer; the left join leaves avg_cm NULL for consumers
# with no non-zero CM (they are absent from Query2's output).
Query5 = """select a.consumer_id, b.avg_cm
from mpartha9_NA_distinct_records_p4 a
left join mpartha9_NA_combo_all_nonzero_records_p2 b
on a.consumer_id = b.consumer_id
"""
mpartha9_NA_vomart_2003_21_with_avg_cm = spark.sql(Query5)
mpartha9_NA_vomart_2003_21_with_avg_cm.createOrReplaceTempView(
    name="mpartha9_NA_vomart_2003_21_with_avg_cm"
)

In [19]:
# Left join on a distinct key -> row count should equal the distinct consumer count.
print(
    mpartha9_NA_vomart_2003_21_with_avg_cm.count(),
    mpartha9_NA_vomart_2003_21_with_avg_cm.select("consumer_id").dropDuplicates().count(),
)

38169329 38169329


In [20]:
mpartha9_NA_vomart_2003_21_with_avg_cm.show(n=2)  # peek at two rows

+-----------+-----------+
|consumer_id|     avg_cm|
+-----------+-----------+
|    3402087|1386.070000|
|    3807907|7085.770000|
+-----------+-----------+
only showing top 2 rows



In [21]:
# Consumers with no non-zero CM (left-join misses) -- these need the fallback value.
mpartha9_NA_vomart_2003_21_with_avg_cm.filter(col('avg_cm').isNull()).count()

23082778

In [22]:
# Expected 0: Query2 already excluded zero CM.
mpartha9_NA_vomart_2003_21_with_avg_cm.filter(col('avg_cm') == 0).count()

0

### Impute fixed single value for NULL CMs

In [23]:
# Fill avg_cm for consumers with no non-zero CM using the 2021 fallback cm_365 from the
# one-row view mpartha9_NA_combo_all_zero_records_p3, instead of a literal pasted from that
# cell's printed output. This keeps the fill value in sync if the extract is rebuilt, and keeps
# Restart-&-Run-All honest.
# The BROADCAST hint ships the single fallback row to every executor, so the large
# per-consumer side is not shuffled. coalesce() keeps avg_cm's decimal type, as the CASE did.
Query6="""select /*+ BROADCAST(f) */ a.consumer_id,
coalesce(a.avg_cm, f.cm_365) as avg_cm_imputed
from mpartha9_NA_vomart_2003_21_with_avg_cm a
cross join mpartha9_NA_combo_all_zero_records_p3 f
"""
mpartha9_NA_vomart_2003_21_with_avg_cm_imputed=spark.sql(Query6)
mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.createOrReplaceTempView("mpartha9_NA_vomart_2003_21_with_avg_cm_imputed")

In [24]:
# Imputation must not change row or consumer counts.
print(
    mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.count(),
    mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.select("consumer_id").dropDuplicates().count(),
)

38169329 38169329


In [25]:
mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.show(n=2)  # peek at two rows

+-----------+--------------+
|consumer_id|avg_cm_imputed|
+-----------+--------------+
|    3402087|   1386.070000|
|    3807907|   7085.770000|
+-----------+--------------+
only showing top 2 rows



In [26]:
# Expected 0 after imputation.
mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.filter(col('avg_cm_imputed').isNull()).count()

0

In [27]:
# Expected 0: neither source of avg_cm_imputed contains zero CM.
mpartha9_NA_vomart_2003_21_with_avg_cm_imputed.filter(col('avg_cm_imputed') == 0).count()

0

### Customers who bought a new vehicle in 2022 or later from Ford USA

# 2022+ Ford USA acquisitions pulled straight from Hive through the HWC session.
# Zero and NULL contribution_margin rows are dropped at extraction time.
# NOTE(review): this DataFrame is replaced by the parquet re-read a few cells below.
Query_2022_beyond = """select vin, 
consumer_id, 
vehicle_ownership_cycle_num,
vehicle_model, 
vehicle_model_year, 
acquisition_date, 
acquisition_year,
vehicle_age, 
fmcc_lease_purchase_ind, 
platform, 
lifestage_value,
consumer_type_code,
contribution_margin
from dsc60180_ici_sde_tz_db.vomart
where acquisition_date >= '2022-01-01'
and country_code  = 'USA' 
and vehicle_make = 'FORD'
and consumer_id is not NULL
and vin is not NULL
and contribution_margin is NOT NULL
and contribution_margin != 0
"""
mpartha9_vomart_2022_beyond = hive.executeQuery(
    Query_2022_beyond
)

In [63]:
#Save the original extract as parquet file
#mpartha9_vomart_2022_beyond.write.parquet("mpartha9_vomart_2022_beyond.parquet")

In [28]:
#Read the parquet file
# Reload the 2022+ extract that the commented-out write cell above persisted as parquet.
mpartha9_vomart_2022_beyond = (
    spark.read
    .format('parquet')
    .load('mpartha9_vomart_2022_beyond.parquet')
)

In [29]:
# Expose the 2022+ extract to spark.sql under the same name.
mpartha9_vomart_2022_beyond.createOrReplaceTempView(
    name="mpartha9_vomart_2022_beyond"
)

In [30]:
# Rows vs. distinct consumers in the 2022+ extract.
print(
    mpartha9_vomart_2022_beyond.count(),
    mpartha9_vomart_2022_beyond.select("consumer_id").dropDuplicates().count(),
)

2061970 1385035


### Only Retail

In [31]:
# Keep retail (consumer_type_code 'I') purchases only.
Query_retail = """select *
from mpartha9_vomart_2022_beyond
where consumer_type_code='I'
"""
mpartha9_vomart_2022_beyond_retail = spark.sql(Query_retail)
mpartha9_vomart_2022_beyond_retail.createOrReplaceTempView(
    name="mpartha9_vomart_2022_beyond_retail"
)

In [32]:
# Rows vs. distinct consumers among 2022+ retail purchases.
print(
    mpartha9_vomart_2022_beyond_retail.count(),
    mpartha9_vomart_2022_beyond_retail.select("consumer_id").dropDuplicates().count(),
)

1266000 1196833


### Row Numbering based on Acquisition date

In [33]:
# Number each retail consumer's 2022+ purchases from earliest to latest acquisition_date.
# vin is a secondary sort key so same-day purchases get a deterministic order; ordering by
# acquisition_date alone lets ROW_NUMBER() break ties arbitrarily, so the "first" CM picked
# downstream (ROW_numbering = 1) could change between runs. vin is non-NULL by extraction.
# The former `select * from (...) a` wrapper added nothing and is removed; the output columns
# are unchanged.
Query_row_numbering = """select *,
ROW_NUMBER() OVER (PARTITION BY consumer_id ORDER BY acquisition_date, vin) as ROW_numbering
from mpartha9_vomart_2022_beyond_retail
"""
mpartha9_NA_row_numbering = spark.sql(Query_row_numbering)
mpartha9_NA_row_numbering.createOrReplaceTempView("mpartha9_NA_row_numbering")

In [34]:
# Numbering adds a column only -- counts must match the retail extract.
print(
    mpartha9_NA_row_numbering.count(),
    mpartha9_NA_row_numbering.select("consumer_id").dropDuplicates().count(),
)

1266000 1196833


In [35]:
mpartha9_NA_row_numbering.show(n=2)  # peek at two rows

+-----------------+-----------+---------------------------+-------------+------------------+----------------+----------------+-----------+-----------------------+--------+---------------+------------------+-------------------+-------------+
|              vin|consumer_id|vehicle_ownership_cycle_num|vehicle_model|vehicle_model_year|acquisition_date|acquisition_year|vehicle_age|fmcc_lease_purchase_ind|platform|lifestage_value|consumer_type_code|contribution_margin|ROW_numbering|
+-----------------+-----------+---------------------------+-------------+------------------+----------------+----------------+-----------+-----------------------+--------+---------------+------------------+-------------------+-------------+
|1FTFW1E84NFB10664|   11936403|                          1|        F-150|              2022|      2022-05-25|            2022|          1|                      0|   TRUCK|Early Ownership|                 I|           18407.93|            1|
|1FT8W3DTXNEF02600|  100214113|     

### Only First Transaction in 2022 and beyond

In [36]:
# One row per consumer: their FIRST (earliest) 2022+ retail purchase, since ROW_numbering
# counts up from the earliest acquisition_date. The "latest"/"Latest_CM" names are kept
# because later cells and the exported CSV reference them.
Query_only_latest_transaction = """select consumer_id, acquisition_date, 
contribution_margin as Latest_CM
from mpartha9_NA_row_numbering
where ROW_numbering = 1
"""
mpartha9_NA_only_latest_transaction = spark.sql(Query_only_latest_transaction)
mpartha9_NA_only_latest_transaction.createOrReplaceTempView(
    name="mpartha9_NA_only_latest_transaction"
)

In [37]:
# Exactly one row per consumer expected.
print(
    mpartha9_NA_only_latest_transaction.count(),
    mpartha9_NA_only_latest_transaction.select("consumer_id").dropDuplicates().count(),
)

1196833 1196833


In [38]:
mpartha9_NA_only_latest_transaction.show(n=5)  # peek at five rows

+-----------+----------------+---------+
|consumer_id|acquisition_date|Latest_CM|
+-----------+----------------+---------+
|   11936403|      2022-05-25| 18407.93|
|  100214113|      2022-10-13| 27408.44|
|  101523550|      2022-06-27|  3670.71|
|  101817003|      2022-05-23| 11817.37|
|  110802703|      2023-03-08| 18169.33|
+-----------+----------------+---------+
only showing top 5 rows



### Match 2022+ buyers back to their 2003–2021 history (by consumer_id)

In [39]:
# Pair each 2022+ buyer's first-purchase CM with their (imputed) 2003-2021 average CM.
# The inner join keeps only consumers present in both periods.
Match_query = """
select a.consumer_id, a.avg_cm_imputed, b.Latest_CM
from mpartha9_NA_vomart_2003_21_with_avg_cm_imputed a
inner join mpartha9_NA_only_latest_transaction b
on a.consumer_id = b.consumer_id
"""
mpartha9_NA_Match_query = spark.sql(Match_query)
mpartha9_NA_Match_query.createOrReplaceTempView(
    name="mpartha9_NA_Match_query"
)

In [40]:
# Both keys are unique, so the join should yield one row per matched consumer.
print(
    mpartha9_NA_Match_query.count(),
    mpartha9_NA_Match_query.select("consumer_id").dropDuplicates().count(),
)

550794 550794


In [41]:
mpartha9_NA_Match_query.show(n=5)  # peek at five matched consumers

+-----------+--------------+---------+
|consumer_id|avg_cm_imputed|Latest_CM|
+-----------+--------------+---------+
|  100214113|  14370.590000| 27408.44|
|  101523550|   8727.925000|  3670.71|
|  101817003|   5231.456250| 11817.37|
|  110802703|  12221.760000| 18169.33|
|  124304305|  16265.140000| 13561.91|
+-----------+--------------+---------+
only showing top 5 rows



In [48]:
# Export the current-model comparison (consumer_id, avg_cm_imputed, Latest_CM) as one CSV.
# toPandas() collects every row to the driver before writing.
mpartha9_NA_Match_query.toPandas().to_csv(
    path_or_buf='/s/mpartha9/RETAIL_current_model_performance_2022_beyond.csv',
    index=False,
)

### Load the new model's consumer file (CSV) into Spark

In [42]:
# Consumers scored by the newly developed model (CSV produced outside this notebook).
# - spark.read is the SparkSession entry point used everywhere else here; `sqlContext` is the
#   legacy handle injected only by the pyspark shell.
# - CSV columns load as strings, so consumer_id is cast to the extract's decimal(11,0) key type.
#   The later join against mpartha9_NA_Match_query then compares like-for-like instead of
#   relying on implicit string/decimal coercion.
# NOTE(review): a file:/// path must exist on every executor node when running on YARN.
df = (
    spark.read.csv("file:///s/mpartha9/RETAIL_NEW_developed_model_performance_2022_beyond.csv", header=True)
    .withColumn("consumer_id", col("consumer_id").cast("decimal(11,0)"))
)

In [43]:
# Rows vs. distinct consumers in the new-model file.
print(
    df.count(),
    df.select("consumer_id").dropDuplicates().count(),
)

549396 549396


In [44]:
# Expose the new-model consumers to spark.sql.
df.createOrReplaceTempView(name="New_model_consumers")

In [45]:
# Restrict the current-model comparison to consumers also present in the new-model file,
# so both models are evaluated on the same population.
Match = """
select a.*
from mpartha9_NA_Match_query a
inner join New_model_consumers b
on a.consumer_id = b.consumer_id
"""
mpartha9_match = spark.sql(Match)
mpartha9_match.createOrReplaceTempView(
    name="mpartha9_match"
)

In [46]:
# One row per common consumer expected.
print(
    mpartha9_match.count(),
    mpartha9_match.select("consumer_id").dropDuplicates().count(),
)

549396 549396


In [47]:
mpartha9_match.show(n=5)  # peek at five common consumers

+-----------+--------------+---------+
|consumer_id|avg_cm_imputed|Latest_CM|
+-----------+--------------+---------+
|  100214113|  14370.590000| 27408.44|
|  101523550|   8727.925000|  3670.71|
|  101817003|   5231.456250| 11817.37|
|  110802703|  12221.760000| 18169.33|
|  124304305|  16265.140000| 13561.91|
+-----------+--------------+---------+
only showing top 5 rows



In [None]:
#------------ CHECK

In [48]:
# Check: recover the raw (pre-imputation) avg_cm for the common consumers, so the share
# whose value was imputed can be counted.
Check1 = """
select a.consumer_id, b.avg_cm
from mpartha9_match a
inner join mpartha9_NA_vomart_2003_21_with_avg_cm b
on a.consumer_id = b.consumer_id
"""
mpartha9_check1 = spark.sql(Check1)
mpartha9_check1.createOrReplaceTempView(
    name="mpartha9_check1"
)

In [49]:
# Should equal the common-consumer count above.
print(
    mpartha9_check1.count(),
    mpartha9_check1.select("consumer_id").dropDuplicates().count(),
)

549396 549396


In [50]:
mpartha9_check1.show(n=5)  # peek at five rows

+-----------+------------+
|consumer_id|      avg_cm|
+-----------+------------+
|  100214113|14370.590000|
|  101523550| 8727.925000|
|  101817003| 5231.456250|
|  110802703|12221.760000|
|  124304305|16265.140000|
+-----------+------------+
only showing top 5 rows



In [51]:
# Common consumers whose pre-2022 avg_cm was NULL, i.e. who received the fallback value.
mpartha9_check1.filter(col('avg_cm').isNull()).count()

85599

In [None]:
# About 15.6% of the 549,396 matched IDs (85,599) had no non-zero CM in their pre-2022 purchases (CM was 0 or missing), so their avg_cm was imputed.