In [1]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rcParams['patch.force_edgecolor'] = True
import seaborn as sns
%matplotlib inline

from plotly import __version__
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import cufflinks as cf
init_notebook_mode(connected=True)
cf.go_offline()
import time

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


In [2]:
dataset = pd.read_csv('logs.csv',parse_dates = [1])

In [3]:
dataset['day_of_month'] = dataset['ts'].apply(lambda x:x.day)
dataset['day_of_week'] = dataset['ts'].apply(lambda x:x.weekday())
dataset['hour_of_visit'] = dataset['ts'].apply(lambda x:x.hour)

In [4]:
dataset.drop(['useragent','ts'],inplace = True, axis = 1)

In [5]:
dataset.head(5)

Unnamed: 0,uuid,hashed_ip,day_of_month,day_of_week,hour_of_visit
0,313908E1F6825D28ADF3FCE451E5B5E5,7C222A735EE6,11,1,9
1,C7F60E6140A59120D9C9854CA87758DF,A4ABB70D677A,11,1,23
2,2DC20DA3585AEDFD846E8679AE5C14C7,4F32D980A86D,11,1,11
3,1C8B0E355480105C5C5B8B466399155F,848DDB6C94DE,11,1,16
4,86F243798AE16A55AFC1D3293279CCD5,BA557225B261,11,1,15


In [46]:
def number_of_unique_hashed_ips(group):
    return len(group['hashed_ip'].unique()) >= 5

In [47]:
def is_weekday_active(group):
    condition_check = 0
    for _,row in group.iterrows():
        if row['day_of_week'] >= 0 and row['day_of_week'] <= 4 and row['hour_of_visit'] >= 9 and row['hour_of_visit'] <= 18:
            condition_check += 1
    return condition_check > 0

In [112]:
grouped_data = dataset.groupby('uuid')
df_new = grouped_data.size().reset_index()
df_new.rename(columns = {0:'highly_active'},inplace = True)
df_new['highly_active'] = df_new['highly_active'].apply(lambda x:x>7)
df_new['weekday_biz'] = grouped_data.apply(is_weekday_active).reset_index()[0]
df_new['globally_active'] = grouped_data.apply(number_of_unique_hashed_ips).reset_index()[0]
df_new.set_index('uuid',inplace=True)
df_new

Unnamed: 0_level_0,highly_active,weekday_biz,globally_active
uuid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0000AE02C6B52538D17DFAF66F2ACAF1,False,False,False
0000CD03517160EAEDB23A93A16489AE,False,False,False
0000F516516CD22AC34384D65FE369B6,False,False,False
0000FBBCD26E0934C890EF3BE71B3122,False,False,False
00011B831B460DD8EF00C3214E1F794F,False,False,False
000124E7DB33683745C16181B51B9400,False,True,False
00015D114458660BDCF15F7B4FA9B933,False,True,False
00017B671B96A7A28B78F965D325B8C2,False,False,False
00021EA3CBA4E3A56AEA6EFC3F93B723,False,False,False
00025C5FB0B4BEC9FB67F431D9BE5ECC,False,True,False


In [113]:
df_new['highly_active'].sum()

14825

In [87]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, types, functions 
from pyspark.sql.functions import collect_list, udf, collect_set

sc = SparkContext()
spark = SparkSession(sc)
spark_DF = spark.createDataFrame(dataset)

In [99]:
def custom_spark_function(list_1, list_2):
    condition_satified = 0
    for day_of_week, hour_of_day in zip(list_1,list_2):
        if day_of_week >=0 and day_of_week <= 4 and hour_of_day >=9 and hour_of_day <=18:
            condition_satified += 1
    return condition_satified > 0

def custom_spark_function_2(hashed_ip):
    return len(hashed_ip) >= 5

def custom_spark_function_3(count_of_records):
    return count_of_records > 7

In [103]:
sparkF = udf(custom_spark_function, types.BooleanType())
sparkF2 = udf(custom_spark_function_2, types.BooleanType())
sparkF3 = udf(custom_spark_function_3, types.BooleanType())

In [107]:
start_time = time.time()
sp_grouped_data = spark_DF.groupBy('uuid')
sp_aggregated_data = sp_grouped_data.agg(functions.count(functions.lit(1)).alias('number_of_visits'),
                                         collect_list('day_of_week').alias('day_of_week'),
                                         collect_list('hour_of_visit').alias('hour_of_visit'),
                                         collect_set('hashed_ip').alias('hashed_ip'))\
                     .withColumn('highly_active', sparkF3('number_of_visits'))\
                     .withColumn('weekday_biz', sparkF('day_of_week','hour_of_visit'))\
                     .withColumn('gloabally_active', sparkF2('hashed_ip'))

rdd_rows = sorted(sp_aggregated_data.drop('day_of_week','hour_of_visit','hashed_ip').collect())
df_new = spark.createDataFrame(rdd_rows).toPandas()
print('Time elapsed: ', time.time() - start_time)

Time elapsed:  18.569083213806152


In [108]:
df_new.set_index('uuid',inplace=True)

In [109]:
df_new

Unnamed: 0_level_0,number_of_visits,highly_active,weekday_biz,gloabally_active
uuid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0000AE02C6B52538D17DFAF66F2ACAF1,1,False,False,False
0000CD03517160EAEDB23A93A16489AE,1,False,False,False
0000F516516CD22AC34384D65FE369B6,4,False,False,False
0000FBBCD26E0934C890EF3BE71B3122,1,False,False,False
00011B831B460DD8EF00C3214E1F794F,1,False,False,False
000124E7DB33683745C16181B51B9400,1,False,True,False
00015D114458660BDCF15F7B4FA9B933,1,False,True,False
00017B671B96A7A28B78F965D325B8C2,1,False,False,False
00021EA3CBA4E3A56AEA6EFC3F93B723,3,False,False,False
00025C5FB0B4BEC9FB67F431D9BE5ECC,1,False,True,False


In [111]:
df_new['highly_active'].sum()

14825