In [None]:
from pyspark import StorageLevel
from pyspark.sql import Row

In [None]:
# Raw converter history: one record per line, tab-separated
# (split into fields in the next cell).
CONVERTERS_PATH = "/insight_converter_site_history.txt"
rdd = sc.textFile(CONVERTERS_PATH)

In [None]:
# Split each converter line on tabs and wrap the first five fields in a Row.
# NOTE(review): a line with fewer than five fields raises IndexError when an
# action runs.
rdd2 = rdd.map(lambda line: line.split("\t"))
rdd3 = (
    rdd2
    .map(lambda fields: Row(date=fields[0],
                            userid=fields[1],
                            siteid=fields[2],
                            timestamp=fields[3],
                            visit=fields[4]))
    .setName("conv_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)
# Expose the parsed records to Spark SQL as the `converters` table.
df = sqlContext.createDataFrame(rdd3)
df.registerTempTable("converters")

In [None]:
# Users table: the same five tab-separated fields as the converters file,
# registered for Spark SQL as `users`.
users_rdd = (
    sc.textFile("/insight_site_staging_10M.txt")
    .map(lambda line: line.split("\t"))
    .map(lambda fields: Row(date=fields[0],
                            userid=fields[1],
                            siteid=fields[2],
                            timestamp=fields[3],
                            visit=fields[4]))
    .setName("users_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)
users_df = sqlContext.createDataFrame(users_rdd)
users_df.registerTempTable("users")

In [None]:
# Count users per site, skipping records whose siteid is the literal string
# 'NULL'. DISTINCT matters here: rows carry date/timestamp/visit, so the same
# user can appear several times on one site and must still count once.
siteid_nusers = sqlContext.sql("""
    SELECT siteid, count(DISTINCT userid) AS num_users
    FROM users WHERE siteid <> 'NULL'
    GROUP BY siteid
    ORDER BY num_users DESC
    """)
siteid_nusers.registerTempTable("siteid_nusers")
siteid_nusers_rdd = (
    siteid_nusers.rdd
    .setName("siteid_nusers_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Keep only sites whose user count lies strictly between 150 and 50000.
siteid_nusers_select = sqlContext.sql("""
    SELECT siteid, num_users
    FROM siteid_nusers
    WHERE num_users < 50000 and num_users > 150
    """)
siteid_nusers_select.registerTempTable("siteid_nusers_select")


def _site_count_line(row):
    """Render a selected site Row as "<siteid> <num_users>"."""
    return " ".join([row.siteid, str(row.num_users)])


# (siteid, "siteid num_users") pairs keyed by siteid.
siteid_nusers_select_rdd = (
    siteid_nusers_select.rdd
    .map(lambda row: (row.siteid, _site_count_line(row)))
    .setName("siteid_nusers_select_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)
# Plain "siteid num_users" lines (exported later as the per-site user counts).
siteid_nusers_select_rdd1 = (
    siteid_nusers_select.rdd
    .map(_site_count_line)
    .setName("siteid_nusers_select_rdd1")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Restrict the users table to records on the sites selected above.
users_sampled = sqlContext.sql("""
    SELECT users.userid, users.siteid,users.timestamp,users.visit
    FROM users,siteid_nusers_select
    WHERE users.siteid = siteid_nusers_select.siteid
    """)
users_sampled.registerTempTable("users_sampled")
users_sampled_rdd = (
    users_sampled.rdd
    .setName("users_sampled_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Restrict the converters table to records on the sites selected above.
conv_sampled = sqlContext.sql("""
    SELECT userid,converters.siteid,timestamp,visit 
    FROM siteid_nusers_select,converters 
    WHERE siteid_nusers_select.siteid = converters.siteid""")
conv_sampled.registerTempTable("conv_sampled")
conv_sampled_rdd = (
    conv_sampled.rdd
    .setName("conv_sampled_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Site co-occurrence matrix: for every ordered pair of different sites, the
# number of distinct users seen on both. Each unordered pair appears twice,
# as (a, b) and (b, a).
# (userid, siteid) is deduplicated before the self-join. Without that, a user
# with k1 rows on site a and k2 rows on site b would add k1*k2 to `shared`
# instead of 1, and the join input would be needlessly large.
siteids_shared_users = sqlContext.sql("""
    SELECT c1.siteid AS siteid1, c2.siteid AS siteid2, count(*) AS shared
    FROM (SELECT DISTINCT userid, siteid FROM users_sampled) AS c1,
         (SELECT DISTINCT userid, siteid FROM users_sampled) AS c2
    WHERE c1.userid = c2.userid AND c1.siteid <> c2.siteid
    GROUP BY c1.siteid, c2.siteid""")
siteids_shared_users.registerTempTable("siteids_shared_users")
siteids_shared_users_rdd = (
    siteids_shared_users.rdd
    .setName("siteids_shared_users")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Export the site-similarity inputs and per-site user counts, each as a single
# text file (repartition(1) -> one part file).
# All outputs of this run share one tag: users from insight_site_staging_10M,
# sites with 150 < num_users < 50000. Keep it in sync with the site-filter
# thresholds. (The PIC export previously used a stale "50M_100000_100" tag.)
RUN_TAG = "10M_50000_150"

# "siteid1, siteid2, shared" lines for PIC clustering.
rdd6 = siteids_shared_users_rdd.map(
    lambda r: ", ".join([r.siteid1, r.siteid2, str(r.shared)])
).repartition(1)
rdd6.saveAsTextFile("hdfs:///data/pic_users_" + RUN_TAG)

# "siteid num_users" lines.
rdd7 = siteid_nusers_select_rdd1.repartition(1)
rdd7.saveAsTextFile("hdfs:///data/siteids_n_users_" + RUN_TAG + "_n")

# (siteid1, siteid2, shared) tuples for the matrix. saveAsTextFile writes each
# tuple's string conversion, not a delimited line.
rdd5 = siteids_shared_users_rdd.map(
    lambda r: (r.siteid1, r.siteid2, r.shared)
).repartition(1)
rdd5.saveAsTextFile("hdfs:///data/siteids_shared_users_" + RUN_TAG)

In [None]:
# Flatten each sampled user Row to "userid siteid timestamp visit" and key it
# by its first space-separated field (the userid) for the converter match.
users_sampled_join = users_sampled_rdd.map(
    lambda row: " ".join([row.userid, row.siteid, row.timestamp, row.visit]))
users_pairs = users_sampled_join.map(
    lambda line: (line.partition(" ")[0], line))

In [None]:
# Flatten each sampled converter Row to "userid siteid timestamp visit" and key
# it by its first space-separated field (the userid) for the user match.
conv_sampled_join = conv_sampled_rdd.map(
    lambda row: " ".join([row.userid, row.siteid, row.timestamp, row.visit]))
conv_pairs = conv_sampled_join.map(
    lambda line: (line.partition(" ")[0], line))

In [None]:
# Sampled user records whose userid never appears among the sampled converters.
users_subtract_conv = (
    users_pairs
    .subtractByKey(conv_pairs)
    .setName("users_subtract_conv")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)

In [None]:
# Converter records whose (userid, siteid) also occurs in the sampled users.
# LEFT SEMI JOIN keeps each converter row at most once. A plain inner join
# would repeat it once per matching users_sampled row, and users_sampled is
# not deduplicated (it keeps one row per timestamp/visit).
users_conv_joinC = sqlContext.sql("""
    SELECT
        conv_sampled.userid,
        conv_sampled.siteid,
        conv_sampled.timestamp,
        conv_sampled.visit
    FROM conv_sampled
    LEFT SEMI JOIN users_sampled
    ON users_sampled.userid = conv_sampled.userid
    AND users_sampled.siteid = conv_sampled.siteid
    """)

users_conv_joinC_rdd = (
    users_conv_joinC.rdd
    .setName("users_conv_joinC_rdd")
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
)
users_conv_joinC.registerTempTable("users_conv_joinC")

In [None]:
# PCA projection per site: "key,v1,v2,..." -> (key, "v1 v2 ...").
# The key is joined against siteids below.
# NOTE(review): a header line, if present, becomes a spurious pair — confirm.


def _key_by_first_field(line):
    """Split "a b c ..." into ("a", "b c ..."); ("a", "") if there is no space."""
    head, _, tail = line.partition(" ")
    return head, tail


rdd20 = sc.textFile("/pca.csv")
rdd21 = rdd20.map(lambda line: line.replace(",", " "))
rdd22 = rdd21.map(_key_by_first_field)

In [None]:
# Cluster label per site: "siteid label ..." -> (siteid, label); any extra
# fields are ignored.


def _site_label_pair(line):
    """Return the first two space-separated fields of a label line."""
    fields = line.split(" ")
    return fields[0], fields[1]


rdd30 = sc.textFile("/clustering_labels6")
rdd31 = rdd30.map(_site_label_pair)

In [None]:
# Enrich each matched converter record with its site's cluster label, then
# with its site's PCA coordinates. Each join appends the joined value to the
# space-separated line. Sites missing from either table are dropped (inner join).
conv_pairs1 = users_conv_joinC_rdd.map(
    lambda row: (row.siteid,
                 " ".join([row.userid, row.siteid, row.timestamp, row.visit])))
conv_out = conv_pairs1.join(rdd31).values().map(lambda pair: " ".join(pair))
# Re-key by siteid, the second space-separated field.
conv_pairs2 = conv_out.map(lambda line: (line.split(" ")[1], line))
conv_out1 = conv_pairs2.join(rdd22).values().map(lambda pair: " ".join(pair))
conv_out1.take(2)

In [None]:
# Enrich each non-converting user record with its site's cluster label, then
# with its site's PCA coordinates. siteid is the second space-separated field
# of every line. Sites missing from either table are dropped (inner join).
users_pairs1 = users_subtract_conv.values().map(
    lambda line: (line.split(" ")[1], line))
users_out = users_pairs1.join(rdd31).values().map(lambda pair: " ".join(pair))
users_pairs2 = users_out.map(lambda line: (line.split(" ")[1], line))
users_out1 = users_pairs2.join(rdd22).values().map(lambda pair: " ".join(pair))
users_out1.take(2)

In [None]:
# Converters set as CSV: every space separator becomes a comma; written as a
# single part file.
rdd41 = conv_out1.map(lambda line: line.replace(" ", ",")).repartition(1)
rdd41.saveAsTextFile("hdfs:///data/converters_10M_50000_150")

In [None]:
# Non-converting users set as CSV: every space separator becomes a comma;
# written as a single part file.
rdd42 = users_out1.map(lambda line: line.replace(" ", ",")).repartition(1)
rdd42.saveAsTextFile("hdfs:///data/users_10M_50000_150")