In [4]:

# TODO: tune this threshold.
# Minimum support: a query is kept only when its occurrence count divided by the
# number of users is strictly greater than this value.
SUPPORT_TRASHOLD=0.0002
import pyspark.sql.functions as F
from pyspark.sql import Row
import time
# df = spark.read.text("user-searches-min.txt")
import re

def removeTimestamp(row):
    """Cut a raw log line off at its first ``YYYY-MM-DD`` date.

    Args:
        row: One line of the search log, as a string.

    Returns:
        The part of ``row`` that precedes the first date-like token. A line
        that contains no such token is returned unchanged instead of raising
        ``AttributeError`` on the missing regex match.
    """
    match = re.search(r'\d{4}-\d{2}-\d{2}', row)
    if match is None:
        # Nothing to strip: keep the line as is rather than crash the Spark task.
        return row
    return row[:match.start()]

def spliteToUserIdAndUserSearch(row):
    """Parse a ``"<user id>\\t<query>"`` line into a ``(user id, query)`` tuple.

    The line is split at its first tab only; trailing tab characters are
    stripped from the query part and the id part is converted with ``int``.

    Raises:
        IndexError: if ``row`` contains no tab.
        ValueError: if the id part is not an integer literal.
    """
    fields = row.split("\t", 1)
    # Read the query first so a line without a tab fails on the missing field.
    query = fields[1].rstrip('\t')
    user_id = int(fields[0])
    return (user_id, query)

def uniqueList(line):
    """Return ``[line[0], values]`` where ``values`` is ``line[1]`` without duplicates.

    De-duplication goes through a ``set``, so the order of the returned values
    is not guaranteed to match the input order.
    """
    deduplicated = list(set(line[1]))
    return [line[0], deduplicated]

# Read the raw log; its first line is treated as the header.
log_txt = sc.textFile("user-searches-min.txt")
header = log_txt.first()

# Drop every line equal to the header so only data rows remain.
log_txt = log_txt.filter(lambda raw_line: raw_line != header)

# Strip the timestamp, parse each row into (user id, query) and de-duplicate the pairs.
logSearch = (
    log_txt
    .map(removeTimestamp)
    .map(spliteToUserIdAndUserSearch)
    .distinct()
)
print(logSearch.take(2))

[(1, 'a'), (2, 'b')]


In [5]:
# Start of the wall-clock measurement.
start = time.time()
# Number of users: count the distinct user ids directly instead of grouping every
# query under its user just to count the groups (same result, far less shuffled data).
totalOfTransactions = logSearch.keys().distinct().count()

In [6]:
# Show the number of users.
print(totalOfTransactions)

3


In [7]:
# Keep only the query text of each distinct (user id, query) pair.
all_queries = logSearch.map(lambda user_and_query: user_and_query[1])

# Support of a query = number of pairs that contain it divided by the number of users.
query_occurrences = all_queries.map(lambda query: (query, 1)).reduceByKey(lambda left, right: left + right)
query_support = query_occurrences.map(lambda kv: (kv[0], kv[1] / totalOfTransactions))

# rdd_query_count holds the (query, support) entries strictly above the support threshold.
rdd_query_count = query_support.filter(lambda kv: kv[1] > SUPPORT_TRASHOLD)
rdd_query_count.take(10)


[('a', 1.0),
 ('b', 0.6666666666666666),
 ('c', 0.6666666666666666),
 ('myspace.com', 0.3333333333333333),
 ('dfdf', 0.3333333333333333),
 ('vaniqa.comh', 0.3333333333333333),
 ('www.collegeucla.edu', 0.3333333333333333),
 ('www.elaorg', 0.3333333333333333)]

In [10]:
# Bring the names of all supported queries back to the driver as a plain list.
# TODO: use a join instead of collect.
validItems = rdd_query_count.keys().collect()
print(validItems)

['a', 'b', 'c', 'myspace.com', 'dfdf', 'vaniqa.comh', 'www.collegeucla.edu', 'www.elaorg']


In [9]:
def include_queries_by_support_thrasholds(inValidList,validItems):
    """Return the entries of ``inValidList`` that are contained in ``validItems``.

    Input order and duplicates are preserved; ``validItems`` may be any
    container that supports the ``in`` operator.
    """
    kept = []
    for query in inValidList:
        if query in validItems:
            kept.append(query)
    return kept


In [13]:
# Broadcast the supported queries once as a hash set: the executors share one read-only
# copy and each membership test is a hash lookup instead of a scan of the whole list.
supportedQueries = sc.broadcast(frozenset(validItems))

# Keep only the users that searched more than one distinct query.
user_query = logSearch.groupByKey().mapValues(list).filter(lambda kv: len(kv[1]) > 1)
# Per user, drop every query that did not pass the support threshold.
# NOTE: the length filter runs before this step, so a user can end up with fewer than
# two supported queries; such a user simply contributes no pairs.
user_query = user_query.map(lambda t: (t[0],include_queries_by_support_thrasholds(t[1],supportedQueries.value)))
user_query.take(2)

[(1, ['a', 'b', 'c', 'dfdf', 'www.elaorg']),
 (2, ['b', 'a', 'www.collegeucla.edu'])]

In [14]:
def get_all_pairs(arr):
    """Return all pairs ``(arr[i], arr[j])`` with ``i < j`` as a list of 2-tuples.

    Pairs are emitted in index order: first by ``i``, then by ``j``. A sequence
    with fewer than two elements yields an empty list.
    """
    size = len(arr)
    return [
        (arr[first], arr[second])
        for first in range(size)
        for second in range(first + 1, size)
    ]
# 
# Expand each user's query list into all of its pairs and flatten them into one RDD.
query_lists = user_query.map(lambda user_and_queries: user_and_queries[1])
all_queries_pairs_tuples = query_lists.flatMap(get_all_pairs)
all_queries_pairs_tuples.take(50)
# userId | a, b ,c ,d
# 1| 1,1,0,0->a,b
# 2|0,1,1,0->b,c

[('a', 'b'),
 ('a', 'c'),
 ('a', 'dfdf'),
 ('a', 'www.elaorg'),
 ('b', 'c'),
 ('b', 'dfdf'),
 ('b', 'www.elaorg'),
 ('c', 'dfdf'),
 ('c', 'www.elaorg'),
 ('dfdf', 'www.elaorg'),
 ('b', 'a'),
 ('b', 'www.collegeucla.edu'),
 ('a', 'www.collegeucla.edu'),
 ('a', 'c'),
 ('a', 'myspace.com'),
 ('a', 'vaniqa.comh'),
 ('c', 'myspace.com'),
 ('c', 'vaniqa.comh'),
 ('myspace.com', 'vaniqa.comh')]

In [15]:
# (a,b) is the same as (b,a)- so we need to remove the same pairs
def sort_small_list(arr):
    """Put a two-element sequence in ascending order.

    Returns ``arr`` itself when ``arr[0] <= arr[1]``; otherwise returns a new
    list ``[arr[1], arr[0]]``.
    """
    first, second = arr[0], arr[1]
    return arr if first <= second else [second, first]
# Normalise every pair to ascending order so that (a,b) and (b,a) share the key (a,b)
# and are counted together, e.g. (a,b), (b,a) => ((a,b), 2).
ordered_pairs = all_queries_pairs_tuples.map(lambda pair: sort_small_list(list(pair)))
all_queries_tuples_sorted = ordered_pairs.map(lambda ordered: (ordered[0], ordered[1]))

pair_occurrences = all_queries_tuples_sorted.map(lambda pair: (pair, 1)).reduceByKey(lambda left, right: left + right)
# NOTE(review): pairs are pruned on an absolute count (> 1), whereas single queries are
# pruned on SUPPORT_TRASHOLD - confirm this difference is intended.
repeated_pairs = pair_occurrences.filter(lambda kv: kv[1] > 1)
# Turn each remaining count into a support value (count / number of users).
all_queries_pairs_tuples_count = repeated_pairs.map(lambda kv: (kv[0], kv[1] / totalOfTransactions))

all_queries_pairs_tuples_count.take(10)


[(('a', 'b'), 0.6666666666666666), (('a', 'c'), 0.6666666666666666)]

In [16]:
# TODO: replace cartesian with join.
# Every combination of a ((x, y), pair support) entry with a (query, support) entry,
# i.e. |pairs| x |queries| rows.

rdd_queries_tuples_cartesian = all_queries_pairs_tuples_count.cartesian(rdd_query_count)
rdd_queries_tuples_cartesian.take(3)

[((('a', 'b'), 0.6666666666666666), ('a', 1.0)),
 ((('a', 'b'), 0.6666666666666666), ('b', 0.6666666666666666)),
 ((('a', 'b'), 0.6666666666666666), ('c', 0.6666666666666666))]

In [17]:
# Confidence of the rule x -> y is support(x U y) / support(x).
# Each pair is keyed by the query used as the antecedent and joined with the single-query
# supports. This yields the same rows as filtering the full cartesian product on equal
# keys, without generating |pairs| x |queries| combinations.

# Rules x -> y: key ((x, y), support) by x, giving (x, ((y, pair support), support(x))).
pairs_by_left = all_queries_pairs_tuples_count.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
rdd_join_left = pairs_by_left.join(rdd_query_count) \
                             .map(lambda kv: (kv[0], kv[1][0][0], float(kv[1][0][1]) / kv[1][1]))

print(rdd_join_left.take(5))

# Rules y -> x: key ((x, y), support) by y, giving (y, ((x, pair support), support(y))).
pairs_by_right = all_queries_pairs_tuples_count.map(lambda kv: (kv[0][1], (kv[0][0], kv[1])))
rdd_join_right = pairs_by_right.join(rdd_query_count) \
                               .map(lambda kv: (kv[0], kv[1][0][0], float(kv[1][0][1]) / kv[1][1]))
print(rdd_join_right.take(5))

# All rules in both directions as (antecedent, consequent, confidence).
rdd_query_conf = sc.union([rdd_join_left, rdd_join_right])

[('a', 'b', 0.6666666666666666), ('a', 'c', 0.6666666666666666)]
[('b', 'a', 1.0), ('c', 'a', 1.0)]


In [20]:
# Report the wall-clock time since `start` was taken, formatted as HH:MM:SS.
end = time.time()
elapsed_seconds = end - start
elapsed_time = time.strftime("%H:%M:%S", time.gmtime(elapsed_seconds))
print("elapsed time: %s" % elapsed_time)

elapsed time: 00:00:06


Py4JJavaError: An error occurred while calling o399.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/yo/test/Untitled Folder/ex02_all_confs already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:287)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1096)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1067)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$1(PairRDDFunctions.scala:958)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1499)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
