# Zillow Lead Scoring

## Imports & Setup

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, LongType, DoubleType, TimestampType, StructType, StructField
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Bucketizer, VectorAssembler, StandardScaler, OneHotEncoder, Imputer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

In [2]:
spark = SparkSession.builder.appName('leadscoring').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/08 13:12:00 WARN Utils: Your hostname, Jordans-MacBook-Pro-2.local, resolves to a loopback address: 127.0.0.1; using 192.168.10.76 instead (on interface en0)
25/11/08 13:12:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/08 13:12:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
sc = spark.sparkContext # Create spark context

## Reading the Dataset

In [4]:
path = '/Users/jordan/Code/ML/realtime-lead-scoring/data/raw/'

In [5]:
zillowSchema = StructType([
    StructField('ProspectID', StringType(), True),
    StructField('LeadNumber', LongType(), True),
    StructField('LeadCaptureChannel', StringType(), True),
    StructField('ReferralSource', StringType(), True),
    StructField('OptOutEmail', StringType(), True),
    StructField('OptOutCall', StringType(), True),
    StructField('ContactedAgent', LongType(), True),
    StructField('TotalVisits', LongType(), True),
    StructField('TotalBrowsingTime', LongType(), True),
    StructField('AvgListingsViewedPerSession', DoubleType(), True),
    StructField('LastAction', StringType(), True),
    StructField('Country', StringType(), True),
    StructField('Specialization', StringType(), True),
    StructField('How did you hear about Zillow', StringType(), True),
    StructField('What is your current occupation', StringType(), True),
    StructField('What matters most to you in choosing this house', StringType(), True),
    StructField('Search', StringType(), True),
    StructField('Magazine', StringType(), True),
    StructField('Newspaper Article', StringType(), True),
    StructField('Zillow Forums', StringType(), True),
    StructField('Newspaper', StringType(), True),
    StructField('Digital Advertisement', StringType(), True),
    StructField('Through Recommendations', StringType(), True),
    StructField('Receive More Updates About Our Houses', StringType(), True),
    StructField('LeadStatusTag', StringType(), True),
    StructField('LeadQuality', StringType(), True),
    StructField('Update me on Zillow Content', StringType(), True),
    StructField('Get updates on DM Content', StringType(), True),
    StructField('LeadProfile', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Asymmetric_Activity_Index', StringType(), True),
    StructField('Asymmetric_Profile_Index', StringType(), True),
    StructField('Asymmetric_Activity_Score', LongType(), True),
    StructField('Asymmetric_Profile_Score', LongType(), True),
    StructField('I agree to pay the amount through cheque', StringType(), True),
    StructField('a free copy of Mastering The Interview', StringType(), True),
    StructField('FinalEngagementAction', StringType(), True)
])

In [6]:
df = spark.read.option("dropMalformed", True).option("ignoreLeadingWhiteSpace", True).csv(path, header=True, schema=zillowSchema)

In [7]:
df = df.select(
    [
        "ContactedAgent", "TotalVisits", "TotalBrowsingTime", "AvgListingsViewedPerSession",
        "LeadCaptureChannel", "ReferralSource", "LastAction", "FinalEngagementAction",
        "City", "Country", "LeadStatusTag", "ProspectID", "LeadNumber"
    ]
)

In [8]:
df.show()

+--------------+-----------+-----------------+---------------------------+--------------------+--------------+--------------------+---------------------+------------------+-------+--------------------+--------------------+----------+
|ContactedAgent|TotalVisits|TotalBrowsingTime|AvgListingsViewedPerSession|  LeadCaptureChannel|ReferralSource|          LastAction|FinalEngagementAction|              City|Country|       LeadStatusTag|          ProspectID|LeadNumber|
+--------------+-----------+-----------------+---------------------------+--------------------+--------------+--------------------+---------------------+------------------+-------+--------------------+--------------------+----------+
|             0|          0|                0|                        0.0|                 API|    Olark Chat|Page Visited on W...|             Modified|            Select|   NULL|Interested in oth...|7927b2df-8bba-4d2...|    660737|
|             0|          5|              674|                  

25/11/08 13:12:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv


In [9]:
# Split data into training and testing
train, test = df.randomSplit([0.8, 0.2])

In [10]:
train.count(), test.count()

25/11/08 13:12:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv
25/11/08 13:12:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, Tot

(7434, 1806)

## Building the Pipeline Model

In [11]:
pipeline = Pipeline(stages=[
    Imputer(inputCols=['TotalVisits', 'TotalBrowsingTime', 'AvgListingsViewedPerSession'], outputCols=['TotalVisits', 'TotalBrowsingTime', 'AvgListingsViewedPerSession']),
    Bucketizer(inputCol="TotalVisits", outputCol="TotalVisits_bucket", splits=[0, 10, 20, 50, 75, 100, 150, 200, 300, float('Inf')]),
    StringIndexer(inputCol='LeadCaptureChannel', outputCol='LeadCaptureChannel_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='LeadCaptureChannel_index', outputCol='LeadCaptureChannel_ohe'),
    StringIndexer(inputCol='ReferralSource', outputCol='ReferralSource_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='ReferralSource_index', outputCol='ReferralSource_ohe'),
    StringIndexer(inputCol='LastAction', outputCol='LastAction_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='LastAction_index', outputCol='LastAction_ohe'),
    StringIndexer(inputCol='FinalEngagementAction', outputCol='FinalEngagementAction_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='FinalEngagementAction_index', outputCol='FinalEngagementAction_ohe'),
    StringIndexer(inputCol='City', outputCol='City_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='City_index', outputCol='City_ohe'),
    StringIndexer(inputCol='Country', outputCol='Country_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='Country_index', outputCol='Country_ohe'),
    StringIndexer(inputCol='LeadStatusTag', outputCol='LeadStatusTag_index', handleInvalid="keep"),
    OneHotEncoder(inputCol='LeadStatusTag_index', outputCol='LeadStatusTag_ohe'),
    VectorAssembler(inputCols=["TotalVisits_bucket", "TotalBrowsingTime", "AvgListingsViewedPerSession", "LeadCaptureChannel_ohe", "ReferralSource_ohe", "LastAction_ohe", "City_ohe", "Country_ohe", "LeadStatusTag_ohe"], outputCol='features'),
    RandomForestClassifier(featuresCol='features', labelCol='ContactedAgent')
])

In [12]:
leadmodel = pipeline.fit(train)

25/11/08 13:13:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv
25/11/08 13:13:16 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, Tot

In [13]:
pred = leadmodel.transform(test)

In [14]:
# Test Accuracy
pred.filter(pred.ContactedAgent == pred.prediction).count() / pred.count()

25/11/08 13:13:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv
25/11/08 13:13:43 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, Tot

0.8654485049833887

In [15]:
# display the id, probability and prediction data (just the first 20 lines are fine)
pred.select('ProspectID', 'probability', 'prediction', "ContactedAgent").show(20)

+--------------------+--------------------+----------+--------------+
|          ProspectID|         probability|prediction|ContactedAgent|
+--------------------+--------------------+----------+--------------+
|6be73d7d-525d-11e...|[0.79688892366402...|       0.0|             0|
|498b9fe5-4e52-11e...|[0.55725402978900...|       0.0|             0|
|498ba7d0-4e52-11e...|[0.55725402978900...|       0.0|             0|
|f8c8282b-33dd-455...|[0.80462539703303...|       0.0|             0|
|498bc13b-4e52-11e...|[0.74542054844436...|       0.0|             0|
|5e748f60-2b1c-479...|[0.26668235252672...|       1.0|             0|
|ce400e14-966f-44b...|[0.17580445666315...|       1.0|             0|
|6be73f79-525d-11e...|[0.75766836413399...|       0.0|             0|
|f4f486ac-70f9-4d6...|[0.78130911521717...|       0.0|             0|
|13f7d9cd-a47c-402...|[0.81170140374731...|       0.0|             0|
|1520480f-4226-4fe...|[0.81170140374731...|       0.0|             0|
|6f9bdf8d-21ff-461..

25/11/08 13:13:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv


## Saving Test Data to Separate Files for Streaming

In [16]:
test = test.repartition(100)
test.rdd.getNumPartitions()

25/11/08 13:14:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv


100

In [18]:
path = '/Users/jordan/Code/ML/realtime-lead-scoring/data/'

In [None]:
test.write.format("csv").option("header", "true").save(path + "stream/user_events")

## Streaming

In [25]:
# Source
# Files
sourceStream = spark.readStream.format("parquet").option("header", "true").option("maxFilesPerTrigger", 1).schema(test.schema).load(path + "stream/user_events/")

# Query
query = leadmodel.transform(sourceStream).select("ProspectID", "ContactedAgent", "prediction", "probability")

# Sink
streamSink = query.writeStream.outputMode("update").format("memory").queryName("leads").trigger(processingTime='5 seconds').start()

25/11/08 13:17:32 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/qt/4yhgm0j53w96n6z_j5zct2k40000gn/T/temporary-662731ec-1add-4985-8c32-ca3e78ed399c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/11/08 13:17:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [26]:
df.show()

+--------------+-----------+-----------------+---------------------------+--------------------+--------------+--------------------+---------------------+------------------+-------+--------------------+--------------------+----------+
|ContactedAgent|TotalVisits|TotalBrowsingTime|AvgListingsViewedPerSession|  LeadCaptureChannel|ReferralSource|          LastAction|FinalEngagementAction|              City|Country|       LeadStatusTag|          ProspectID|LeadNumber|
+--------------+-----------+-----------------+---------------------------+--------------------+--------------+--------------------+---------------------+------------------+-------+--------------------+--------------------+----------+
|             0|          0|                0|                        0.0|                 API|    Olark Chat|Page Visited on W...|             Modified|            Select|   NULL|Interested in oth...|7927b2df-8bba-4d2...|    660737|
|             0|          5|              674|                  

25/11/08 13:17:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Prospect ID, Lead Number, Lead Origin, Lead Source, Converted, TotalVisits, Total Time Spent on Website, Page Views Per Visit, Last Activity, Country, Tags, City, Last Notable Activity
 Schema: ProspectID, LeadNumber, LeadCaptureChannel, ReferralSource, ContactedAgent, TotalVisits, TotalBrowsingTime, AvgListingsViewedPerSession, LastAction, Country, LeadStatusTag, City, FinalEngagementAction
Expected: ProspectID but found: Prospect ID
CSV file: file:///Users/jordan/Code/ML/realtime-lead-scoring/data/raw/Lead%20Scoring.csv


In [29]:
spark.sql("select * from leads").show(truncate=False)

+------------------------------------+--------------+----------+----------------------------------------+
|ProspectID                          |ContactedAgent|prediction|probability                             |
+------------------------------------+--------------+----------+----------------------------------------+
|055df508-28b1-42ee-8fc3-3816d536af12|0             |0.0       |[0.819131296251421,0.18086870374857905] |
|abcbb1cd-ca33-4213-88d1-eaef1c468cb3|0             |0.0       |[0.7329915333544793,0.26700846664552075]|
|2f93b5df-7a90-4cc4-8790-c50907696f65|0             |0.0       |[0.7476042048202918,0.2523957951797083] |
|4cb1fc3c-1fa3-4a8c-b3fc-24d5a711df88|0             |0.0       |[0.7502400352656777,0.24975996473432227]|
|d7160279-3aef-4c1d-875b-5c08866330de|0             |0.0       |[0.7368582278219317,0.26314177217806833]|
|5fd4ea2d-97fa-4022-b505-5f8dbd40fe41|1             |0.0       |[0.6398048327762564,0.3601951672237437] |
|24e4322e-57b7-4919-ab07-7d01c66a19a8|0       

In [30]:
for s in spark.streams.active:
    if s.name == "leads":
        s.stop()

25/11/08 13:18:02 WARN DAGScheduler: Failed to cancel job group d7b8d440-5325-4c6c-9b05-28253c66331e. Cannot find active jobs for it.
25/11/08 13:18:02 WARN DAGScheduler: Failed to cancel job group d7b8d440-5325-4c6c-9b05-28253c66331e. Cannot find active jobs for it.
