In [1]:
# findspark has to run before the first pyspark import: init() locates the Spark
# installation and puts its Python bindings on sys.path so that pyspark is importable.
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
# %%bash
# echo $JAVA_HOME

In [2]:
# start (or reuse) the Spark session for this notebook, then mute every log message below FATAL
spark = (
    SparkSession.builder
    .appName('AdsAreUs')
    .getOrCreate()
)
spark.sparkContext.setLogLevel("FATAL")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/06 13:53:13 WARN Utils: Your hostname, Pranavs-MacBook-Air-4.local, resolves to a loopback address: 127.0.0.1; using 10.39.39.249 instead (on interface en0)
25/12/06 13:53:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/06 13:53:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


##### data prep and cleaning

In [3]:
from pyspark.sql import Window
from pyspark.sql import functions as f

def _read_trimmed_csv(path, has_header):
    """Read a comma-separated file, ignoring whitespace around values and inferring column types."""
    reader = spark.read\
        .option("ignoreLeadingWhiteSpace", True)\
        .option("ignoreTrailingWhiteSpace", True)
    return reader.csv(path, sep=',', header=has_header, inferSchema=True)

# log.csv is read without a header row, so the column names are attached by hand
header = ['sentiment','publication_URL','product_URL','got_click','gender','age_group']
logs = _read_trimmed_csv('./data/log.csv', has_header=False).toDF(*header)

cats = _read_trimmed_csv('./data/product_categories.csv', has_header=True)

prods = _read_trimmed_csv('./data/products.csv', has_header=True)

In [4]:
# keep a single row per (product, product_URL) pair -- 'Dell laptop' has a duplicate row
unique_product_key = ['product', 'product_URL']
prods = prods.dropDuplicates(unique_product_key)

In [5]:
# every product_URL in logs should have a counterpart in prods;
# the left-anti join keeps only the log rows whose URL has none
logs_without_product = logs.join(
    prods,
    on=logs.product_URL == prods.product_URL,
    how='left_anti'
)
# the 'log' alias is what the fuzzy-matching cell uses to address these URLs
missing_urls = logs_without_product.select('product_URL').distinct().alias('log')
missing_urls.show(truncate=False)


+-------------------------------+
|product_URL                    |
+-------------------------------+
|https://docker.com.pants       |
|https://HamiltonBeach/blenders |
|https://apple.com/tcomputers   |
|https://NordicTrack/elliptical |
|https://samsung.com/telivisions|
|https://covergirl.com/lipsticks|
|https://covergirl.co/lipstdcks |
|https://samsung.comxwashers    |
+-------------------------------+



In [6]:
# find the single best levenshtein distance match for every unmatched URL (row_number = 1).
# dense_rank would keep *every* candidate tied for the smallest distance; those extra rows
# would then duplicate log rows when the corrections are joined back onto logs.
# row_number, with the candidate URL as tie-breaker, always keeps exactly one
# deterministic match per original URL.
best_first = Window.partitionBy('log.product_URL').orderBy('lev_dist', 'prod.product_URL')

match = missing_urls.crossJoin(
    prods.select('product_URL').distinct().alias('prod')
).withColumn(
    'lev_dist', f.levenshtein('log.product_URL', 'prod.product_URL')
).withColumn(
    'rank',
    f.row_number().over(best_first)
).where('rank=1').select(
    f.col('log.product_URL').alias('original'),
    f.col('prod.product_URL').alias('best_match')
)
match.show(truncate=False)

+-------------------------------+----------------------------------+
|original                       |best_match                        |
+-------------------------------+----------------------------------+
|https://HamiltonBeach/blenders |https://HamiltonBeach.com/blenders|
|https://NordicTrack/elliptical |https://NordicTrack.com/elliptical|
|https://apple.com/tcomputers   |https://apple.com/computers       |
|https://covergirl.co/lipstdcks |https://covergirl.co/lipsticks    |
|https://covergirl.com/lipsticks|https://covergirl.co/lipsticks    |
|https://docker.com.pants       |https://docker.com/pants          |
|https://samsung.com/telivisions|https://samsung.com/televisions   |
|https://samsung.comxwashers    |https://samsung.com/washers       |
+-------------------------------+----------------------------------+



In [7]:
# apply the corrections: the left join keeps every log row, and rows that need no
# replacement get a null best_match, so coalesce falls back to their own product_URL
logs_with_fix = logs.join(
    match,
    on=logs.product_URL == match.original,
    how='left'
)
logs = logs_with_fix.withColumn(
    'product_URL',
    f.coalesce(f.col('best_match'), f.col('product_URL'))
).drop('original', 'best_match')

# re-run the anti join: an empty table means every log URL now exists in prods
still_unmatched = logs.join(
    prods,
    on=logs.product_URL == prods.product_URL,
    how='left_anti'
)
missing_urls = still_unmatched.select('product_URL').distinct().alias('log')
missing_urls.show(truncate=False)

+-----------+
|product_URL|
+-----------+
+-----------+



In [8]:
# row count and column types of the cleaned log data
log_row_count = logs.count()
print(f'logs.csv\nrows: {log_row_count}')
logs.printSchema()

logs.csv
rows: 10000
root
 |-- sentiment: string (nullable = true)
 |-- publication_URL: string (nullable = true)
 |-- product_URL: string (nullable = true)
 |-- got_click: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_group: string (nullable = true)



In [9]:
# row count and column types of the product-type -> category lookup
category_row_count = cats.count()
print(f'product_categories.csv\nrows: {category_row_count}')
cats.printSchema()

product_categories.csv
rows: 25
root
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)



In [10]:
# row count and column types of the de-duplicated product catalogue
product_row_count = prods.count()
print(f'products.csv\nrows: {product_row_count}')
prods.printSchema()

products.csv
rows: 50
root
 |-- product: string (nullable = true)
 |-- product_URL: string (nullable = true)
 |-- product_type: string (nullable = true)



##### For each product, compute all the Publication_URLs containing an ad for that product.

In [11]:
# distinct (product, publication_URL) pairs: one row per publication that carried an ad for the product
product_publications = logs.join(
    prods,
    logs.product_URL == prods.product_URL,
    how='inner'
).select('product', 'publication_URL').distinct()

# The question asks for the publications themselves, so keep the URL list next to its size.
# 'count' is still the number of distinct pairs per product, exactly as before.
result = product_publications.groupBy('product').agg(
    f.count('*').alias('count'),
    f.sort_array(f.collect_list('publication_URL')).alias('publication_URLs')
).orderBy(f.col('count').desc())

print(result.count()) # 50 rows returned
# display only the summary; the full URL lists stay available in result['publication_URLs']
result.select('product', 'count').show(truncate=False)

50
+----------------------+-----+
|product               |count|
+----------------------+-----+
|Dell laptop           |22   |
|Maytag washer         |21   |
|Centrum MultiVitamins |19   |
|Samsung washer        |19   |
|NordicTrack treadmill |19   |
|Starbucks Coffee      |19   |
|Coach purse           |18   |
|Lenova laptop         |17   |
|Levis Jeans           |17   |
|Apple iPad            |17   |
|Hamilton Beach blender|17   |
|NordicTrack rower     |17   |
|Givenchy perfume      |17   |
|Tesla                 |17   |
|NemoK blender         |17   |
|Cougar jeans          |17   |
|Covergirl makeup      |16   |
|Dell computer         |16   |
|Broyhill recliner     |16   |
|Clinique moisturizer  |16   |
+----------------------+-----+
only showing top 20 rows


In [12]:
# sanity check that the URL cleaning worked: every product in the catalogue should
# show up in `result`; a smaller number means some log rows still fail to join to prods
n_products_with_ads = result.count()
print(n_products_with_ads)
assert n_products_with_ads == prods.select('product').distinct().count(), \
    'some products have no matching log rows - check the product_URL cleaning step'

50


##### For each product type, compute all the Publication_URLs containing an ad for that product type.

In [13]:
# distinct (product_type, publication_URL) pairs: one row per publication that carried
# an ad for any product of that type
type_publications = logs.join(
    prods,
    logs.product_URL == prods.product_URL,
    how='inner'
).select('product_type','publication_URL').distinct()

# The question asks for the publications themselves, so keep the URL list next to its size.
# 'count' is still the number of distinct pairs per product type, exactly as before.
result = type_publications.groupBy('product_type').agg(
    f.count('*').alias('count'),
    f.sort_array(f.collect_list('publication_URL')).alias('publication_URLs')
).orderBy(f.col('count').desc())

# display only the summary; the full URL lists stay available in result['publication_URLs']
result.select('product_type', 'count').show(25, truncate=False)

+------------------+-----+
|product_type      |count|
+------------------+-----+
|perfume           |36   |
|computer          |35   |
|washer            |34   |
|blender           |32   |
|jeans             |31   |
|dryer             |27   |
|car               |25   |
|television        |24   |
|women's purse     |24   |
|furniture         |23   |
|coffee            |23   |
|speakers          |23   |
|lipstick          |21   |
|vitamin           |19   |
|treadmill         |19   |
|shaver            |18   |
|refrigerator      |17   |
|tablet            |17   |
|rowing machine    |17   |
|face cream        |16   |
|makeup            |16   |
|pressure cooker   |14   |
|elliptical trainer|14   |
|pants             |13   |
+------------------+-----+



##### For each product, compute the click rate for it. 
Click rate is the number of times a display of an ad was clicked on (by any user) divided by the number of times it was displayed (to any user). Note the click rate is not specific to each user.


In [14]:
# one row per logged ad display, enriched with the advertised product's attributes
impressions = logs.join(
    prods,
    on=logs.product_URL == prods.product_URL,
    how='inner'
)

# clicks = sum of got_click, total = number of non-null got_click rows
per_product = impressions.groupBy('product').agg(
    f.sum('got_click').alias('clicks'),
    f.count('got_click').alias('total')
)

result = per_product.select(
    'product',
    'clicks',
    'total',
    f.round(f.col('clicks') / f.col('total'), 4).alias('click_rate')
).orderBy(f.col('click_rate').desc())

result.show(truncate=False)

+---------------------+------+-----+----------+
|product              |clicks|total|click_rate|
+---------------------+------+-----+----------+
|covergirl lipstick   |114   |139  |0.8201    |
|Clinique moisturizer |174   |216  |0.8056    |
|Giorgio perfume      |171   |214  |0.7991    |
|Apple computer       |161   |203  |0.7931    |
|Samsung TV           |117   |160  |0.7313    |
|Gillette shaver      |112   |157  |0.7134    |
|Docker pants         |107   |156  |0.6859    |
|Levis Jeans          |151   |226  |0.6681    |
|Kaai handbags        |132   |200  |0.66      |
|Dell computer        |144   |221  |0.6516    |
|BasilBasel perfume   |100   |154  |0.6494    |
|Lenova laptop        |130   |204  |0.6373    |
|LG dryer             |87    |138  |0.6304    |
|Centrum MultiVitamins|151   |241  |0.6266    |
|Tesla                |142   |240  |0.5917    |
|Ikea sofa            |102   |178  |0.573     |
|NemoK blender        |139   |244  |0.5697    |
|Apple laptop         |70    |124  |0.56

##### For each product, compute the click rate for each sentiment type

In [15]:
impressions = logs.join(prods, on=logs.product_URL == prods.product_URL, how='inner')

# the two aggregates and the ratio derived from them, named once and reused below
click_totals = [
    f.sum('got_click').alias('clicks'),
    f.count('got_click').alias('total'),
]
rate = f.round(f.col('clicks') / f.col('total'), 4)

result = (
    impressions
    .groupBy('sentiment', 'product')
    .agg(*click_totals)
    .withColumn('click_rate', rate)
    # products alphabetically; within a product: positive, neutral, negative
    .orderBy(f.col('product'), f.col('sentiment').desc())
    .select('product', 'sentiment', 'clicks', 'total', 'click_rate')
)

result.show(truncate=False)

+---------------------+---------+------+-----+----------+
|product              |sentiment|clicks|total|click_rate|
+---------------------+---------+------+-----+----------+
|Apple computer       |positive |52    |68   |0.7647    |
|Apple computer       |neutral  |60    |65   |0.9231    |
|Apple computer       |negative |49    |70   |0.7       |
|Apple iPad           |positive |43    |86   |0.5       |
|Apple iPad           |neutral  |54    |86   |0.6279    |
|Apple iPad           |negative |36    |92   |0.3913    |
|Apple laptop         |positive |34    |44   |0.7727    |
|Apple laptop         |neutral  |30    |41   |0.7317    |
|Apple laptop         |negative |6     |39   |0.1538    |
|BasilBasel perfume   |positive |22    |60   |0.3667    |
|BasilBasel perfume   |neutral  |51    |59   |0.8644    |
|BasilBasel perfume   |negative |27    |35   |0.7714    |
|Broyhill recliner    |positive |17    |70   |0.2429    |
|Broyhill recliner    |neutral  |36    |64   |0.5625    |
|Broyhill recl

##### For each product type, compute the click rate for it.

In [16]:
# attach product attributes (including product_type) to every logged ad display
impressions = logs.join(
    prods,
    on=logs.product_URL == prods.product_URL,
    how='inner'
)

per_type = impressions.groupBy('product_type').agg(
    f.sum('got_click').alias('clicks'),
    f.count('got_click').alias('total')
)

# click_rate = clicks / total, rounded to 4 decimals; best-performing types first
result = per_type.select(
    'product_type',
    'clicks',
    'total',
    f.round(f.col('clicks') / f.col('total'), 4).alias('click_rate')
).orderBy(f.col('click_rate').desc())

result.show(truncate=False)

+------------------+------+-----+----------+
|product_type      |clicks|total|click_rate|
+------------------+------+-----+----------+
|face cream        |174   |216  |0.8056    |
|pants             |107   |156  |0.6859    |
|lipstick          |229   |344  |0.6657    |
|vitamin           |151   |241  |0.6266    |
|perfume           |500   |882  |0.5669    |
|computer          |615   |1101 |0.5586    |
|furniture         |212   |382  |0.555     |
|shaver            |162   |300  |0.54      |
|speakers          |216   |403  |0.536     |
|television        |258   |484  |0.5331    |
|elliptical trainer|93    |176  |0.5284    |
|washer            |398   |767  |0.5189    |
|women's purse     |221   |429  |0.5152    |
|tablet            |133   |264  |0.5038    |
|pressure cooker   |96    |192  |0.5       |
|blender           |327   |655  |0.4992    |
|treadmill         |119   |243  |0.4897    |
|dryer             |225   |497  |0.4527    |
|jeans             |321   |711  |0.4515    |
|car      

##### For each product type, compute the click rate for each sentiment type

In [17]:
impressions = logs.join(prods, on=logs.product_URL == prods.product_URL, how='inner')

rate = f.round(f.col('clicks') / f.col('total'), 4)

result = (
    impressions
    .groupBy('product_type', 'sentiment')
    .agg(
        f.sum('got_click').alias('clicks'),
        f.count('got_click').alias('total'),
    )
    .withColumn('click_rate', rate)
    # types alphabetically; within a type: positive, neutral, negative
    .orderBy(f.col('product_type'), f.col('sentiment').desc())
    .select('product_type', 'sentiment', 'clicks', 'total', 'click_rate')
)

result.show(truncate=False)

+------------------+---------+------+-----+----------+
|product_type      |sentiment|clicks|total|click_rate|
+------------------+---------+------+-----+----------+
|blender           |positive |75    |198  |0.3788    |
|blender           |neutral  |73    |216  |0.338     |
|blender           |negative |179   |241  |0.7427    |
|car               |positive |26    |166  |0.1566    |
|car               |neutral  |89    |153  |0.5817    |
|car               |negative |58    |148  |0.3919    |
|coffee            |positive |34    |141  |0.2411    |
|coffee            |neutral  |50    |140  |0.3571    |
|coffee            |negative |67    |144  |0.4653    |
|computer          |positive |219   |362  |0.605     |
|computer          |neutral  |211   |370  |0.5703    |
|computer          |negative |185   |369  |0.5014    |
|dryer             |positive |50    |162  |0.3086    |
|dryer             |neutral  |119   |162  |0.7346    |
|dryer             |negative |56    |173  |0.3237    |
|elliptica

##### For each category, compute the click rate for it.

In [18]:
# logs -> prods on the product URL, then prods -> cats: the 'product' column of cats
# holds the values that prods calls product_type
impressions = logs.join(
    prods,
    on=logs.product_URL == prods.product_URL,
    how='inner'
)
impressions_with_category = impressions.join(
    cats,
    on=f.col('product_type') == cats.product,
    how='inner'
)

per_category = impressions_with_category.groupBy('category').agg(
    f.sum('got_click').alias('clicks'),
    f.count('got_click').alias('total')
)

result = per_category.select(
    'category',
    'clicks',
    'total',
    f.round(f.col('clicks') / f.col('total'), 4).alias('click_rate')
).orderBy(f.col('click_rate').desc())

result.show(truncate=False)

+------------------------+------+-----+----------+
|category                |clicks|total|click_rate|
+------------------------+------+-----+----------+
|health                  |151   |241  |0.6266    |
|beauty products         |954   |1644 |0.5803    |
|household durables      |212   |382  |0.555     |
|consumer electronics    |1384  |2552 |0.5423    |
|accessories             |221   |429  |0.5152    |
|small kitchen appliances|423   |847  |0.4994    |
|apparel                 |428   |867  |0.4937    |
|large kitchen appliances|702   |1539 |0.4561    |
|fitness equipment       |254   |607  |0.4185    |
|transportation          |173   |467  |0.3704    |
|packaged food           |151   |425  |0.3553    |
+------------------------+------+-----+----------+



##### Similar to above, for each category compute the click rate for each sentiment type

In [19]:
# same two-step join as the per-category cell: logs -> prods by URL, prods -> cats by product type
impressions_with_category = (
    logs
    .join(prods, on=logs.product_URL == prods.product_URL, how='inner')
    .join(cats, on=f.col('product_type') == cats.product, how='inner')
)

rate = f.round(f.col('clicks') / f.col('total'), 4)

result = (
    impressions_with_category
    .groupBy('category', 'sentiment')
    .agg(
        f.sum('got_click').alias('clicks'),
        f.count('got_click').alias('total'),
    )
    .withColumn('click_rate', rate)
    # categories alphabetically; within a category: positive, neutral, negative
    .orderBy(f.col('category'), f.col('sentiment').desc())
    .select('category', 'sentiment', 'clicks', 'total', 'click_rate')
)

result.show(truncate=False)

+--------------------+---------+------+-----+----------+
|category            |sentiment|clicks|total|click_rate|
+--------------------+---------+------+-----+----------+
|accessories         |positive |70    |153  |0.4575    |
|accessories         |neutral  |100   |144  |0.6944    |
|accessories         |negative |51    |132  |0.3864    |
|apparel             |positive |144   |298  |0.4832    |
|apparel             |neutral  |161   |249  |0.6466    |
|apparel             |negative |123   |320  |0.3844    |
|beauty products     |positive |300   |525  |0.5714    |
|beauty products     |neutral  |349   |584  |0.5976    |
|beauty products     |negative |305   |535  |0.5701    |
|consumer electronics|positive |474   |839  |0.565     |
|consumer electronics|neutral  |525   |832  |0.631     |
|consumer electronics|negative |385   |881  |0.437     |
|fitness equipment   |positive |84    |207  |0.4058    |
|fitness equipment   |neutral  |104   |199  |0.5226    |
|fitness equipment   |negative 

##### Choose a product randomly
determine if there are any 'significant' differences in the click rate between positive and negative sentiment type of the ad context for that product type `(typo: product?)` given the gender of the viewer

_As a quick note:_ you can see below that the `non-binary` gender category only has 21 entries in `log.csv`. The lack of data for this category becomes an issue later, since it means we cannot run statistical tests.

In [20]:
# number of log rows per viewer gender
gender_counts = logs.groupBy('gender').count()
gender_counts.show(truncate=False)

+----------+-----+
|gender    |count|
+----------+-----+
|non-binary|21   |
|female    |5050 |
|male      |4929 |
+----------+-----+



In [21]:
# click rate for every (product, gender, sentiment) combination present in the logs
impressions = logs.join(prods, on=logs.product_URL == prods.product_URL, how='inner')

rate = f.round(f.col('clicks') / f.col('total'), 4)

click_rates = (
    impressions
    .groupBy('product', 'gender', 'sentiment')
    .agg(
        f.sum('got_click').alias('clicks'),
        f.count('got_click').alias('total'),
    )
    .withColumn('click_rate', rate)
    .orderBy(f.col('product'), f.col('gender'), f.col('sentiment'))
    .select('product', 'gender', 'sentiment', 'clicks', 'total', 'click_rate')
)

In [22]:
# using 'Clinique moisturizer' ,https://clinique.com/moisturizers, face cream, beauty products
# only the positive and negative contexts are compared, so neutral rows are left out
is_polar_sentiment = f.col('sentiment').isin(['positive','negative'])
is_clinique = f.col('product') == 'Clinique moisturizer'
clinique = click_rates.where(is_polar_sentiment & is_clinique)
clinique.show(truncate=False)

+--------------------+----------+---------+------+-----+----------+
|product             |gender    |sentiment|clicks|total|click_rate|
+--------------------+----------+---------+------+-----+----------+
|Clinique moisturizer|female    |negative |39    |44   |0.8864    |
|Clinique moisturizer|female    |positive |21    |39   |0.5385    |
|Clinique moisturizer|male      |negative |26    |28   |0.9286    |
|Clinique moisturizer|male      |positive |20    |32   |0.625     |
|Clinique moisturizer|non-binary|positive |1     |1    |1.0       |
+--------------------+----------+---------+------+-----+----------+



_Note:_ As mentioned above, the non-binary category is limited. In this case, we do not have any data for the "negative" sentiment type, so we can't do a comparison.

In [23]:
import math
from scipy.stats import norm
# using a two-tailed two-proportion z-test; h0 is p1 = p2, and h1: p1 != p2
# p1 is the positive click rate, p2 is the negative click rate
def _two_proportion_p_value(x1, n1, x2, n2):
    """Return the two-tailed p-value of a pooled two-proportion z-test.

    x1/n1 and x2/n2 are the successes/trials of the two samples.
    """
    p_hat = (x1 + x2) / (n1 + n2)
    SE = math.sqrt(p_hat*(1-p_hat)*((1/n1)+(1/n2)))
    if SE == 0:
        # pooled rate is exactly 0 or 1, i.e. both samples have the same rate:
        # z is undefined (0/0) and there is no evidence against h0
        return 1.0
    z = ((x1 / n1) - (x2 / n2)) / SE
    # survival function instead of 1 - cdf: same value, but keeps precision for large |z|
    return float(2 * norm.sf(abs(z)))


def z_test_product(product:str,gender:str,alpha:float):
    """Test whether positive and negative ad contexts differ in click rate.

    Reads the clicks/total counts for `product` and `gender` from the module-level
    `click_rates` DataFrame and runs a two-tailed two-proportion z-test on them.

    Args:
        product: value of the 'product' column to test.
        gender: value of the 'gender' column to test.
        alpha: significance level; h0 is rejected when p <= alpha.

    Returns:
        (p_val, reject): the p-value and whether h0 is rejected at `alpha`.

    Raises:
        ValueError: if `click_rates` has no impressions for the positive or the
            negative sentiment of this product/gender, so no comparison is possible.
    """
    # one Spark job fetches both sentiment rows (at most two rows come back)
    rows = click_rates.where(
        (f.col('product') == product) &
        (f.col('gender') == gender) &
        f.col('sentiment').isin(['positive','negative'])
    ).select('sentiment','clicks','total').collect()

    # sentiments without any counted impression are treated as missing
    counts = {row['sentiment']: (row['clicks'], row['total']) for row in rows if row['total']}
    missing = [s for s in ('positive', 'negative') if s not in counts]
    if missing:
        raise ValueError(
            f'no data for product={product!r}, gender={gender!r}, sentiment(s) {missing}; '
            'cannot run a two-proportion test'
        )

    x1, n1 = counts['positive']
    x2, n2 = counts['negative']

    p_val = _two_proportion_p_value(x1, n1, x2, n2)
    reject = bool(p_val <= alpha)

    return p_val,reject


In [24]:
print('two-proportion z-test ----\nclick rate by positive vs. negative ad context sentiment\n')
product, alpha = 'Clinique moisturizer', 0.05
print(f'using product: {product}')
print(f'testing at alpha: {alpha}\n')

# male and female viewers get the same test and the same report layout
outcomes = {}
for gender in ('male', 'female'):
    print(f'{gender} ----')
    outcomes[gender] = z_test_product(product=product,gender=gender,alpha=alpha)
    p_val, reject = outcomes[gender]
    print(f'p = {round(p_val,5)}\nreject: {reject} at a={alpha}\n')
# keep the per-gender (p_val, reject) tuples under their original names
male, female = outcomes['male'], outcomes['female']

# gender: non-binary
# >> we exclude this category because we don't have enough data for a two-proportion test
print('non-binary ----')
print('not enough data')

two-proportion z-test ----
click rate by positive vs. negative ad context sentiment

using product: Clinique moisturizer
testing at alpha: 0.05

male ----
p = 0.00554
reject: True at a=0.05

female ----
p = 0.00041
reject: True at a=0.05

non-binary ----
not enough data


__Conclusion__: We are testing the `'Clinique moisturizer'` product. According to the two-proportion, two-tailed z-test on the click rates of positive and negative ad context sentiment, we can reject the null hypothesis at a significance level of `alpha=0.05` for both male and female users: the difference in click rate between positive and negative sentiment contexts is statistically significant for each group. There is not enough data about non-binary users to conduct the test, so no conclusion can be drawn for that group.

Since the click rates were consistently higher for ads placed in a negative sentiment context, I would recommend that the company prioritize placing `'Clinique moisturizer'` ads in these contexts, regardless of the gender of the viewer.