### Parse and clean domains

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import string
import re
from tld import get_tld
WWW = re.compile('^www\.')

def parse_domain(domain) :
    if not domain:
        return domain
    domain = domain.lower() # force lowercase to use binary collate in the db
    # app ids come clean
    if domain.isdigit():
        return domain
    if domain.startswith('com.'):
        return domain.split('&',1)[0]
    domain = ''.join(filter(string.printable.__contains__, domain)) # remove non printable characters
    domain_cleaned = WWW.subn('',domain.replace('"','').strip().split('//',1)[-1].split('/',1)[0],count=1)[0]
    # return cleaned top level domain or discard
    try:
        return get_fld("http://" + domain_cleaned )
    except:
        return domain_cleaned
    
udfParseDomain = udf(parse_domain, StringType())

### Read file and filter null values

In [12]:
logsSchema = StructType([StructField("ip",StringType(),True),
                     StructField("uuid",StringType(),True),
                     StructField("useragent",StringType(),True),
                     StructField("domain",StringType(),True),
                     StructField("ssp",StringType(),True),
                     StructField("timestamp",TimestampType(),False)])

df = (spark.read.csv("file:///srv-zompopo3/nameles/datasets/daily/171007/ssp_bid_compressed_00000000020{0,1,2,3,4,5}.csv.gz", schema=logsSchema,
                          header="true", timestampFormat="yyyy-MM-dd HH:mm:ss", escape='"',
                          ignoreLeadingWhiteSpace="true", ignoreTrailingWhiteSpace="true",
                          mode="FAILFAST")
        .where(col("ip").isNotNull())
        .where(col("domain").isNotNull())
        .withColumn("domain", udfParseDomain("domain"))
        )

In [13]:
#Return domains and the number of different IPs that visit the domain
def unique_tuples(df):
    unique_df = df.drop_duplicates(subset=('domain', 'ip')).drop('timestamp', 'useragent', 'ssp', 'uuid')
    unique_df = unique_df.groupby(['domain']).count()
    unique_df = unique_df.orderBy(desc('count'))
    return unique_df

In [14]:
df_unique = unique_tuples(df)

In [15]:
# count_tuples groups by domain and ip
def count_tuples(df):
    count_df = df.groupby(['domain', 'ip']).count()
    return count_df


# count_by_domain groups all visits a domain has
def count_by_domain(df):
    count_df = df.groupby(['domain']).count()
    count_df = count_df.orderBy(desc("count"))
    return count_df


# count_visits groups all visits by domain, differencing by IP
def count_visits(df):
    count_df = df.groupby(['domain']).pivot('ip').count()
    return count_df

In [16]:
df_tuples = count_tuples(df)
df_domain = count_by_domain(df)

### Co-visit rate
Receives as parameters: dataframe, minimum rate of co-visit, minimum count of visitations in common, boolean value if it will use the unique dataframe (True) or the dataframe with common values (False)

In [17]:
def covisit(input_df, rate, min_count, unique):
    #Duplicate domain column to perform self-join
    input2_df = input_df.withColumnRenamed('domain', 'domain2')
    #joined_df = self-join to get all domain combinations with same ip
    joined_df = (input_df.join(input2_df, 'ip', 'inner')
                .where(input_df.domain != input2_df.domain2)
                .drop('useragent', 'ssp', 'uuid')
                # unique dataframe - TO DO: Test this works properly
                .drop_duplicates(subset=('domain', 'domain2', 'ip')))
                

    #count ips that have visited both domains 
    count_df = (joined_df.groupBy(['domain', 'domain2']).count()
                        .withColumnRenamed('count', 'covisit'))
    count_df = count_df.where(count_df.covisit > 1)
    
    if (unique == False):
        count_df = count_df.join(df_domain, 'domain', 'inner').withColumnRenamed('count', 'visits')
    else:
        count_df = count_df.join(df_unique, 'domain', 'inner').withColumnRenamed('count', 'visits')

    #calculate covisitation rate
    count_df = count_df.withColumn('covisit', col('covisit')/col('visits'))
    
    count_df = count_df.where(count_df.covisit > rate).where(count_df.visits > min_count)
    count_df = count_df.drop('visits')
    return count_df

In [18]:
rate_df = covisit(df, 0.5, 6, False)
rate_dfshow()

+--------------------+--------------------+------------------+
|              domain|             domain2|           covisit|
+--------------------+--------------------+------------------+
|        mastimon.com|com.jb.emoji.goke...|0.5925925925925926|
|        gogoanime.tv|      dramameong.com|0.5063291139240507|
|        m.meczyki.pl| sportowefakty.wp.pl|0.5121951219512195|
|        m.meczyki.pl|          m.kwejk.pl|0.5121951219512195|
|        m.meczyki.pl|               wp.pl|0.7317073170731707|
|        m.meczyki.pl|              olx.pl|0.5609756097560976|
| sigortahaber.com.tr| womenshealth.com.tr|0.5454545454545454|
|          arslege.pl|              olx.pl|0.5161290322580645|
|  finanzfernsehen.de|         yieldlab.de|0.5555555555555556|
|        mylekarze.pl|               wp.pl|             0.625|
|        mylekarze.pl| sportowefakty.wp.pl|             0.625|
|        mylekarze.pl|              olx.pl|             0.625|
|        mylekarze.pl|          pudelek.pl|            

In [21]:
rate_df.orderBy(desc("covisit")).show()

+--------------------+--------------------+------------------+
|              domain|             domain2|           covisit|
+--------------------+--------------------+------------------+
|      dom2reality.ru|           drive2.ru|               1.0|
|      dom2reality.ru|            avito.ru|               1.0|
|      dom2reality.ru|com.cleanmaster.m...|               1.0|
| sportsexcellent.net|com.juteralabs.pe...|               1.0|
|           506795511|               wp.pl|               1.0|
|      dom2reality.ru|org.aastudio.game...|               1.0|
|      dom2reality.ru|com.cleanmaster.s...|               1.0|
|surabaya.tribunne...|com.jb.emoji.goke...|               1.0|
|   tradisikita.my.id|com.jb.emoji.goke...|               0.9|
|        otorider.com|com.jb.emoji.goke...|               0.9|
|  gazetka-oferta.com|               wp.pl|0.8888888888888888|
|      koreanindo.net|com.jb.emoji.goke...|0.8888888888888888|
|   m.cosmopolitan.pl|          pudelek.pl|0.8888888888