In [None]:
from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession that the rest of the notebook reads from.
spark = (
    SparkSession.builder
    .appName('data_clean_and_preprocess')
    .getOrCreate()
)

In [None]:
# Both inputs are tab-separated. Only column_headers.tsv is read with its
# first line treated as a header; hit_data.tsv is read without one.
df = spark.read.csv("column_headers.tsv", sep="\t", header='true', inferSchema='true')
df1 = spark.read.csv("hit_data.tsv", sep="\t", header='false', inferSchema='true')

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import functions as sf
# Get the active SparkContext (creating one if needed) and wrap it in a
# SQLContext.
sc = SparkContext.getOrCreate()
sqlc = pyspark.SQLContext(sparkContext=sc)

In [None]:
# Append the rows of df1 (hit_data.tsv) to df (column_headers.tsv). union()
# matches columns by position, so the result carries df's column names.
# NOTE(review): presumably df holds no data rows, only the header - confirm.
result = df.union(df1)
#result.head(1)
#result.count()

# The following steps are taken from Adobe Clickstream Analytics (ACA), page 55

In [None]:
# Keep only the rows whose exclude_hit value equals 0.
NoValuedExcludeHit = result.where(result['exclude_hit'] == 0)
#NoValuedExcludeHit.count()

In [None]:
# Remove hit_source values 5, 7, 9 and 8 one step at a time; every
# intermediate frame stays available under its original name.
ValidHitSourceWithout5 = NoValuedExcludeHit.where(NoValuedExcludeHit['hit_source'] != 5)
ValidHitSourceWithout7 = ValidHitSourceWithout5.where(ValidHitSourceWithout5['hit_source'] != 7)
ValidHitSourceWithout9 = ValidHitSourceWithout7.where(ValidHitSourceWithout7['hit_source'] != 9)
ValidHitSourceWithout8 = ValidHitSourceWithout9.where(ValidHitSourceWithout9['hit_source'] != 8)
#ValidHitSourceWithout8.count()

In [None]:
# Add 'joined_column' holding "<post_visid_high>_<post_visid_low>".
MergedColumns = ValidHitSourceWithout8.withColumn(
    'joined_column',
    sf.concat('post_visid_high', sf.lit('_'), 'post_visid_low'),
)

# Considering post_ columns for preprocessing (ACA, page 57)

In [None]:
# Besides the post_ columns, the joined visitor-id column and mcvisid are also
# kept; give the joined column a post_ name so the post_ filter picks it up.
MergedColumns = MergedColumns.withColumnRenamed(
    existing='joined_column',
    new='post_uniqueId',
)

In [None]:
# Give mcvisid a post_ name as well so the post_ filter retains it.
MergedColumns = MergedColumns.withColumnRenamed(
    existing='mcvisid',
    new='post_mcvisid',
)

In [None]:
# The two source columns are now redundant with post_uniqueId; drop them.
DropPostVHL = MergedColumns.drop(*('post_visid_high', 'post_visid_low'))
#len(DropPostVHL.columns)

In [None]:
# Single-column frame holding only the combined visitor id.
UniqueId = DropPostVHL.select(DropPostVHL['post_uniqueId'])
#UniqueId.count()

In [None]:
# Convert the post_uniqueId column to pandas. Its length is used further down
# as the total row count when reporting the percentage of missing values per
# column.
UID=UniqueId.toPandas()

In [None]:
# Keep only the columns whose name contains 'post_'.
newDataF = DropPostVHL.select(
    *[name for name in DropPostVHL.columns if 'post_' in name]
)
#len(newDataF.columns)
#newDataF.count()

In [None]:
# Display the names of the columns retained in newDataF.
newDataF.columns

# Reducing the post_ subset in 4 groups: mobile_, evar_, prop_, remaining

# reducing post_mobile_ columns

In [None]:
# Sub-group: every column whose name contains 'post_mobile'.
mobileDF = newDataF.select(
    *[name for name in newDataF.columns if 'post_mobile' in name]
)
#len(mobileDF.columns)

In [None]:
# Convert the post_mobile sub-group to pandas for the null checks below.
PandasMobileDF=mobileDF.toPandas()

In [None]:
# For each post_mobile column, show whether it contains at least one null.
import numpy as np
PandasMobileDF.isna().any()

In [None]:
# Print, per post_mobile column, the percentage of rows that are null.
# The denominator is the number of post_uniqueId values collected in UID.
total_rows = len(UID.post_uniqueId)
for column in PandasMobileDF.columns:
    missing_pct = round(PandasMobileDF[column].isnull().sum() * 100 / total_rows, 2)
    print ("{}% of the data from {} column is missing".format(missing_pct, column))

In [None]:
# The post_mobile columns are dropped because all of them contain null values.
Without_post_mobile = newDataF.drop(
    *[name for name in newDataF.columns if 'post_mobile' in name]
)
#len(Without_post_mobile.columns)

# reducing the remaining columns

In [None]:
# Remove every column whose name contains 'post_evar'.
Without_post_evar = Without_post_mobile.drop(
    *[name for name in Without_post_mobile.columns if 'post_evar' in name]
)

In [None]:
# Remove every column whose name contains 'post_prop'.
Without_post_prop = Without_post_evar.drop(
    *[name for name in Without_post_evar.columns if 'post_prop' in name]
)
#len(Without_post_prop.columns)

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# One aggregate per column: the number of rows where that column is not null.
# The single resulting Row is displayed.
Without_post_prop.select(
    [
        count(when(col(name).isNotNull(), name)).alias(name)
        for name in Without_post_prop.columns
    ]
).first()

In [None]:
# Columns kept from the remaining (non mobile/evar/prop) post_ group.
selected = Without_post_prop.select(
    [
        "post_event_list",
        "post_page_event",
        "post_referrer",
        "post_visid_type",
        "post_uniqueId",
    ]
)

# reducing post_prop columns

In [None]:
# Sub-group of the post_prop columns, used to inspect their null counts.
postProp = newDataF.select(
    *[name for name in newDataF.columns if 'post_prop' in name]
)

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# One aggregate per post_prop column: the number of rows where it is null.
postProp.select(
    [
        count(when(col(name).isNull(), name)).alias(name)
        for name in postProp.columns
    ]
).first()

In [None]:
# post_prop columns retained after the null inspection above.
NoNullPostProp = newDataF.select(
    [
        "post_prop2",
        "post_prop3",
        "post_prop5",
        "post_prop6",
        "post_prop13",
        "post_prop35",
        "post_prop60",
    ]
)

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Single-row frame with the non-null row count of each retained post_prop column.
NoNullPostPropC = NoNullPostProp.select(
    [
        count(when(col(name).isNotNull(), name)).alias(name)
        for name in NoNullPostProp.columns
    ]
)

In [None]:
#NoNullPostProp.count()

In [None]:
#for c in NoNullPostPropC.columns:
#    if NoNullPostPropC[[c]].first()[c] > 1000000 :
#        print (" {} for {} ".format(NoNullPostPropC[[c]].first()[c],c))

In [None]:
# Convert the retained post_prop columns to pandas for viewing.
NoNullPostPropPandas=NoNullPostProp.toPandas()

In [None]:
# Preview the first five rows.
NoNullPostPropPandas.head(5)

# reducing post_evar columns

In [None]:
# Sub-group of the post_evar columns, used to inspect their null counts.
eVar = newDataF.select(
    *[name for name in newDataF.columns if 'post_evar' in name]
)

In [None]:
#checking for non-null values in eVar
from pyspark.sql.functions import isnan, when, count, col

# One aggregate per post_evar column: the number of rows where it is not null.
eVar.select(
    [
        count(when(col(name).isNotNull(), name)).alias(name)
        for name in eVar.columns
    ]
).first()

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Single-row frame with the non-null row count of each post_evar column.
eVarC = eVar.select(
    [
        count(when(col(name).isNotNull(), name)).alias(name)
        for name in eVar.columns
    ]
)

In [None]:
#for c in eVarC.columns:
    #if eVarC[[c]].first()[c] > 100000 :
        #print (" {} for {} ".format(eVarC[[c]].first()[c],c))

In [None]:
# post_evar columns retained after the non-null inspection above.
NoNullevar = newDataF.select(
    [
        "post_evar1", "post_evar2", "post_evar8", "post_evar11",
        "post_evar26", "post_evar27", "post_evar28", "post_evar33",
        "post_evar34", "post_evar35", "post_evar38", "post_evar39",
        "post_evar40", "post_evar47", "post_evar48", "post_evar49",
        "post_evar51", "post_evar53", "post_evar63", "post_evar64",
        "post_evar65", "post_evar68", "post_evar70", "post_evar71",
        "post_evar77", "post_evar78", "post_evar80", "post_evar98",
        "post_evar99", "post_evar101", "post_evar102",
    ]
)

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Single-row frame with the non-null row count of each retained post_evar column.
NoNullevarC = NoNullevar.select(
    [
        count(when(col(name).isNotNull(), name)).alias(name)
        for name in NoNullevar.columns
    ]
)

In [None]:
#for c in NoNullevarC.columns:
#    if NoNullevarC[[c]].first()[c] > 1000000 :
#        print (" {} for {} ".format(NoNullevarC[[c]].first()[c],c))

In [None]:
# Narrow the retained post_evar columns down to these eight.
NoNullevar = NoNullevar.select(
    [
        "post_evar27",
        "post_evar33",
        "post_evar34",
        "post_evar35",
        "post_evar39",
        "post_evar47",
        "post_evar71",
        "post_evar78",
    ]
)

In [None]:
# Convert the narrowed post_evar selection to pandas for viewing.
NoNullevarPandas=NoNullevar.toPandas()

In [None]:
# Preview the first five rows.
NoNullevarPandas.head(5)

In [None]:
# Relevance check: remove columns with low variance or high correlation.
#  - post_evar27 has low variance.
#  - post_evar35 combines the values of post_evar34 and post_evar47 (high
#    correlation), so those two are removed and post_evar35 is kept.
#  - post_evar33 is removed here as well.
newPandasDataF = NoNullevarPandas.drop(
    columns=['post_evar27', 'post_evar34', 'post_evar47', 'post_evar33']
)

In [None]:
# Preview the first five rows of the reduced frame.
newPandasDataF.head(5)

In [None]:
# Columns identified as non-null with 100000 as the threshold but not with
# 1000000, selected to check their relevance.
# post_evar101 and post_evar102 look correlated; keeping one seems important.
newDataF.select(
    [
        "post_evar1", "post_evar2", "post_evar8", "post_evar11",
        "post_evar26", "post_evar28", "post_evar38", "post_evar40",
        "post_evar48", "post_evar49", "post_evar51", "post_evar53",
        "post_evar63", "post_evar64", "post_evar65", "post_evar68",
        "post_evar70", "post_evar77", "post_evar80", "post_evar98",
        "post_evar99", "post_evar101", "post_evar102",
    ]
)

In [None]:
# Add back relevant columns that the 1000000 threshold had dropped, including
# post_evar11, post_evar48, post_evar51 and post_evar53.
NoNullevar = newDataF.select(
    [
        "post_evar11",
        "post_evar35",
        "post_evar39",
        "post_evar48",
        "post_evar51",
        "post_evar53",
        "post_evar71",
        "post_evar78",
        "post_evar77",
        "post_evar102",
    ]
)

In [None]:
# Convert the post_evar selection to pandas for viewing.
# NOTE: this rebinds `df`, which until now held the Spark frame read from
# column_headers.tsv; re-running the union cell after this one will not work.
df=NoNullevar.toPandas()

In [None]:
# Preview the first twenty rows.
df.head(n=20)

# Comparing prop_, evar_ and remaining extracted relevant columns

In [None]:
# post_uniqueId, post_evar71 and post_mcvisid all look like identifiers;
# post_evar71 appears to equal post_mcvisid.
selected = newDataF.select(["post_uniqueId", "post_evar71", "post_mcvisid"])

In [None]:
# Print the first twenty rows.
selected.show(n=20)

In [None]:
# The content of post_evar11 is unclear, but all three columns look like
# event lists.
selected = newDataF.select(["post_evar11", "post_event_list", "post_evar71"])

In [None]:
# Return the first twenty rows as a list of Row objects.
selected.take(20)

In [None]:
# The referrer is whichever link the user followed to reach the current page.
# If post_evar51 says "Referrer", post_referrer should contain a link.
# The first column name was misspelled as "psot_evar51", which is not a column
# of newDataF and made the select fail; post_evar51 is the intended column.
selected = newDataF.select("post_evar51", "post_referrer", "post_evar53")

In [None]:
# Print the first fifty rows.
selected.show(n=50)

In [None]:
# Duplicate check: post_prop13 == post_evar53.
selected = newDataF.select(["post_prop13", "post_evar53"])
selected.show(n=20)

In [None]:
# Duplicate check: post_prop6 == post_evar33.
selected = newDataF.select(["post_prop6", "post_evar33"])
selected.show(n=20)

In [None]:
# Final selection: every column judged relevant across the whole dataset.
selected = newDataF.select(
    [
        "post_event_list",
        "post_page_event",
        "post_visid_type",
        "post_referrer",
        "post_evar53",
        "post_evar71",
        "post_evar11",
        "post_prop60",
        "post_uniqueid",
    ]
)

In [None]:
# Convert the final column selection to pandas for viewing.
selectedPandas=selected.toPandas()

In [None]:
# Preview the first five rows.
selectedPandas.head(5)

# Grouping users

In [None]:
# Count the rows for each (user id, event list, referrer) combination.
Dataframe = selected.groupBy(
    ["post_uniqueid", "post_event_list", "post_referrer"]
).count()

In [None]:
# Order the grouped counts by user id, ascending.
df = Dataframe.sort("post_uniqueid", ascending=True)
#df.show()
#df.select("post_event_list").show()

In [None]:
# Show the grouped rows belonging to one particular user.
df.where(df.post_uniqueid == "1658310662_925910793").show(n=20)

In [None]:
# Collect the distinct user ids into a Python list; the grouped frame is
# split on these ids below.
listids = [row[0] for row in df.select("post_uniqueid").distinct().collect()]

In [None]:
# One filtered frame per user id (this takes a long time to load).
dfArray = [df.filter(df["post_uniqueid"] == user_id) for user_id in listids]

In [None]:
# Number of distinct user ids.
distinct = df.select('post_uniqueid').dropDuplicates().count()

In [None]:
# Show the per-user frame of every distinct user id, in list order.
for i in range(distinct):
    user_frame = dfArray[i]
    user_frame.show()

In [None]:
#displaying if the column contains null values
from pyspark.sql.functions import col
# Number of grouped rows whose post_event_list is null.
df.filter(df["post_event_list"].isNull()).count()

In [None]:
# Discard the rows whose post_event_list is null.
WithoutNaEvent = df.dropna(subset=["post_event_list"])
#WithoutNaEvent.show()
#WithoutNaEvent.count()
#WithoutNaEvent.toPandas()

In [None]:
# Extract the event lists that contain event id 1.
# The id is anchored to a list boundary (start of string, end of string or a
# comma). The earlier pattern ',1,' only matched a 1 surrounded by commas, so
# it skipped lists where 1 was the first, last or only entry.
# NOTE(review): assumes post_event_list is a plain comma-separated list of ids - confirm.
SelectedCols = WithoutNaEvent.filter(
    WithoutNaEvent.post_event_list.rlike('(^|,)1(,|$)')
).select("post_event_list", "post_uniqueid")

In [None]:
# Save the distinct user ids whose event list contains event id 1.
# The id is anchored to a list boundary (start of string, end of string or a
# comma). The earlier pattern ',1,' only matched a 1 surrounded by commas, so
# it skipped lists where 1 was the first, last or only entry.
# NOTE(review): assumes post_event_list is a plain comma-separated list of ids - confirm.
listids = [
    row[0]
    for row in WithoutNaEvent.filter(
        WithoutNaEvent.post_event_list.rlike('(^|,)1(,|$)')
    ).select("post_uniqueid").distinct().collect()
]

In [None]:
# Print how many user ids have an event list containing 1's.
matching_user_count = len(listids)
print("{}".format(matching_user_count))

In [None]:
# For the first three saved user ids, show two rows of that user's event lists.
for x in listids[:3]:
    WithoutNaEvent.filter(WithoutNaEvent["post_uniqueid"] == x).show(n=2)