## Commercial Data Developer - Interview Exercise

### The Problem
We have a stream of event-level data from our audience data platform (see sample below).
Within this are events associating anonymous customer ids with behavioral segment ids.
From this stream we would like to build a customer-segment dataset from which we can find:
- How many customers are in each segment
- For an individual customer, which segments are they in
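To make the target dataset concrete, here is a minimal plain-Python sketch of the customer-segment mapping and the two lookups it has to support. The records are hypothetical. They only mirror the fields the notebook below reads (id.val, country, events[].c and events[].add). The meaning of c is an assumption, not documented here.

```python
from collections import defaultdict

# Hypothetical records shaped like the fields the notebook reads:
# id.val (customer id), country, and events[].c / events[].add (segment ids added)
records = [
    {"id": {"val": "cust-1"}, "country": "GB", "events": [{"c": 1, "add": [10, 20]}]},
    {"id": {"val": "cust-2"}, "country": "GB", "events": [{"c": 1, "add": [20]}]},
    {"id": {"val": "cust-1"}, "country": "GB", "events": [{"c": 1, "add": [20, 30]}]},
]

# Customer-segment dataset: customer id -> set of segment ids (duplicates collapse)
customer_segments = defaultdict(set)
for record in records:
    for event in record["events"]:
        for segment_id in event.get("add") or []:
            customer_segments[record["id"]["val"]].add(segment_id)

# How many customers are in each segment: each customer counted once per segment
segment_counts = defaultdict(int)
for segments in customer_segments.values():
    for segment_id in segments:
        segment_counts[segment_id] += 1

print(sorted(segment_counts.items()))       # [(10, 1), (20, 2), (30, 1)]
# Which segments is an individual customer in
print(sorted(customer_segments["cust-1"]))  # [10, 20, 30]
```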

### The Exercise

I used Apache Spark on the Databricks platform to analyse the data and answer the questions above.

Downloaded the data from the S3 location (https://reach-solutions-interview-data.s3-eu-west-1.amazonaws.com) and placed it at "/FileStore/tables/wagal/".

Loaded the data with Spark into a DataFrame for further analysis.

In [0]:
# File location and type
file_location = "/FileStore/tables/wagal/*"
file_type = "json"

# The files are JSON, so the CSV-only options (inferSchema, header, sep) from the
# Databricks template are not needed: the JSON reader infers the schema by default.
df = spark.read.format(file_type).load(file_location)

display(df)

# Filter the events to GB users only, keeping those that associate customer ids with 1st-party behavioral segment ids

In [0]:
# Display the schema to see the available fields
df.printSchema()

In [0]:
from pyspark.sql.functions import col, explode

# Flatten the events array: one row per event, with the top-level fields (id, country, ...) repeated.
# Rows whose events array is null or empty produce no output rows.
flatten_df = df.withColumn("event", explode("events"))

# Keep GB events that associate customer ids with 1st-party behavioral segment ids.
# No null-filling is needed first: for a null event.c, `event.c != -1` evaluates to null,
# and filter() drops rows whose condition is null, so those events are excluded as well.
res_df = flatten_df.filter((col("country") == "GB") & (col("event.c") != -1)).drop("events")

# Print the schema of the filtered DataFrame
res_df.printSchema()

# Show some sample records
res_df.show()
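The flatten-and-filter step can be mimicked in plain Python to show what explode and the comparison with a null c do. This is a sketch on hypothetical records, not Spark code. The helpers explode_and_filter and sql_not_equal are illustrative names that model two behaviours: one output row per array element, and SQL's rule that a comparison involving NULL is unknown and therefore filtered out.

```python
# Hypothetical records with the fields the cell above reads (id, country, events[].c, events[].add)
records = [
    {"id": {"val": "cust-1"}, "country": "GB",
     "events": [{"c": 1, "add": [10]}, {"c": None, "add": None}]},
    {"id": {"val": "cust-2"}, "country": "FR", "events": [{"c": 1, "add": [20]}]},
    {"id": {"val": "cust-3"}, "country": "GB", "events": []},
]


def sql_not_equal(a, b):
    """Model SQL `a != b`: the result is NULL (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def explode_and_filter(records):
    """Mimic explode("events") followed by the country == "GB" and event.c != -1 filter."""
    rows = []
    for record in records:
        # explode: one output row per array element; a null or empty array yields no rows
        for event in record["events"] or []:
            # filter() keeps a row only when the condition is true; NULL is treated as false
            if record["country"] == "GB" and sql_not_equal(event["c"], -1) is True:
                rows.append({"id": record["id"], "country": record["country"], "event": event})
    return rows


result = explode_and_filter(records)
print([(row["id"]["val"], row["event"]["add"]) for row in result])  # [('cust-1', [10])]
```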

To generate the reports with SQL, we register the DataFrame as a temporary view in Spark.

In [0]:
temp_table_name = "view_customer_segemnt"
res_df.createOrReplaceTempView(temp_table_name)

# Question 1: How many customers are in each segment?

In [0]:
%sql

-- count(DISTINCT id.val) counts each customer once per segment, even if several events added that segment
SELECT indi_segment_id AS segment_id, count(DISTINCT id.val) AS customer_count
FROM view_customer_segemnt
LATERAL VIEW explode(event.add) t_segment AS indi_segment_id
GROUP BY indi_segment_id

The same query, run from Python with spark.sql().

The advantage is that the result is a DataFrame, so after the analysis we can apply further operations to it, such as saving the data to a file.

In [0]:

# count(DISTINCT id.val) counts each customer once per segment, even if several events added that segment
query = """
SELECT indi_segment_id AS segment_id, count(DISTINCT id.val) AS customer_count
FROM view_customer_segemnt
LATERAL VIEW explode(event.add) t_segment AS indi_segment_id
GROUP BY indi_segment_id
"""

res1_df = spark.sql(query)

res1_df.write.mode("overwrite").format("json").json("/FileStore/tables/count_customers_segment")
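For Question 1, counting exploded rows (count(id.val)) and counting distinct customers (count(DISTINCT id.val)) give different answers when one customer adds the same segment in several events. The following plain-Python sketch uses hypothetical rows of view_customer_segemnt, reduced to (id.val, event.add).

```python
from collections import defaultdict

# Hypothetical rows of view_customer_segemnt reduced to (id.val, event.add);
# cust-1 adds segment 20 in two separate events
rows = [
    ("cust-1", [10, 20]),
    ("cust-1", [20]),
    ("cust-2", [20]),
]

# LATERAL VIEW explode(event.add): one (customer, segment) row per array element
exploded = [(customer, segment) for customer, segments in rows for segment in segments or []]

row_count = defaultdict(int)   # like count(id.val): every exploded row counts
customers = defaultdict(set)   # like count(DISTINCT id.val): each customer counts once
for customer, segment in exploded:
    row_count[segment] += 1
    customers[segment].add(customer)
distinct_count = {segment: len(ids) for segment, ids in customers.items()}

print(dict(row_count))  # {10: 1, 20: 3}
print(distinct_count)   # {10: 1, 20: 2}
```

Only the distinct count answers "how many customers are in each segment".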

# Question 2: For an individual customer, which segments are they in?

In [0]:
%sql

-- collect_set removes duplicate segment ids, so the count is taken over distinct ids as well
SELECT id.val AS customer_id,
       collect_set(indi_segment_id) AS related_customer_segments_ids,
       count(DISTINCT indi_segment_id) AS count_related_customer_segments_ids
FROM view_customer_segemnt
LATERAL VIEW explode(event.add) t_segment AS indi_segment_id
GROUP BY id

In [0]:
# The same query, run from Python with spark.sql().
# The result is a DataFrame, so after the analysis we can apply further operations to it, such as saving the data to a file.
query = """
SELECT id.val AS customer_id,
       collect_set(indi_segment_id) AS related_customer_segments_ids,
       count(DISTINCT indi_segment_id) AS count_related_customer_segments_ids
FROM view_customer_segemnt
LATERAL VIEW explode(event.add) t_segment AS indi_segment_id
GROUP BY id
"""

res2_df = spark.sql(query)

res2_df.show()

res2_df.write.mode("overwrite").format("json").json("/FileStore/tables/customers_segment")
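For Question 2, collect_set removes duplicate segment ids, while a plain count over the exploded rows does not, so the two can disagree for the same customer. The following plain-Python sketch uses hypothetical exploded rows. Spark does not guarantee the element order of collect_set, so the sketch sorts the segment ids before printing.

```python
from collections import defaultdict

# Hypothetical exploded rows (id.val, indi_segment_id); cust-1 has segment 20 twice
exploded = [("cust-1", 10), ("cust-1", 20), ("cust-1", 20), ("cust-2", 20)]

segments = defaultdict(set)   # like collect_set(indi_segment_id): duplicates removed
row_count = defaultdict(int)  # like count(indi_segment_id): duplicates included
for customer, segment in exploded:
    segments[customer].add(segment)
    row_count[customer] += 1

for customer in sorted(segments):
    # customer id, its segment ids, distinct segment count, raw row count
    print(customer, sorted(segments[customer]), len(segments[customer]), row_count[customer])
# cust-1 [10, 20] 2 3
# cust-2 [20] 1 1
```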