In [1]:
import numpy as np
import pandas as pd

import findspark
findspark.init()
import pyspark
import random


In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, udf, explode, monotonically_increasing_id

In [3]:
# import os
# os.environ['SPARK_HOME'] = "C:\spark-2.4.0-bin-hadoop2.7"
# os.environ['JAVA_HOME'] = "C:\Program Files\Java\jdk-9.0.4"

In [4]:
# create session
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("SPADE") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# create context
sc = spark.sparkContext

In [6]:
num_samples = 100000000

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

inside_udf = udf(inside)

count = sc.parallelize(range(0, num_samples)).count()
pi = 4 * count / num_samples
print(pi)

4.0


## Read

In [7]:
# read
review_data = spark.read.text('reviews_sample.txt')
min_sup = review_data.count()*0.01

In [8]:
# split into list of words and add sid column

review_split = review_data.withColumn('value', split(review_data.value, ' '))\
.select('value', monotonically_increasing_id().alias('sid'))

## Process 1 length

In [10]:
# create sid index

cols = ('value', 'sid')
review_explode = review_split.select(*cols, explode('value').alias('word') )

In [11]:
# create eid by using window
from pyspark.sql.functions import row_number
from pyspark.sql import Window

w = Window.partitionBy("value").orderBy("word")
review_enumerate = review_explode.withColumn("value", row_number().over(w))
review_enumerate.show()

+-----+----+----------+
|value| sid|      word|
+-----+----+----------+
|    1|5776|  adequate|
|    2|5776|      also|
|    3|5776|      area|
|    4|5776|       ask|
|    5|5776|      back|
|    6|5776| beautiful|
|    7|5776|      best|
|    8|5776|      best|
|    9|5776|      best|
|   10|5776|    blonde|
|   11|5776|    brassy|
|   12|5776|    bumble|
|   13|5776|    bumble|
|   14|5776|      came|
|   15|5776|      came|
|   16|5776|     could|
|   17|5776|  customer|
|   18|5776|     decor|
|   19|5776|definetely|
|   20|5776|definetely|
+-----+----+----------+
only showing top 20 rows



In [14]:
# group by word and cut length
from pyspark.sql.functions import collect_list, struct, col, size
from pyspark.sql.types import ArrayType, IntegerType

def distinct_size(x):
    return len(pd.unique(x))

def zip_spark(x, y):
    return list(zip(x,y))

zip_spark_udf = udf(zip_spark, returnType=ArrayType(ArrayType(IntegerType(), IntegerType())))

distinct_size_udf = udf(distinct_size, returnType=IntegerType())

# combine sid, value(eid) and group based on word
review_zip = review_enumerate.groupby('word').agg(collect_list('sid').alias('sid'), collect_list('value').alias('eid'))\
.where(distinct_size_udf('sid') >=min_sup) # zip columns

In [15]:
%%time
review_zip.show()

+------------+--------------------+--------------------+
|        word|                 sid|                 eid|
+------------+--------------------+--------------------+
|        hope|[1617, 2761, 7627...|[58, 13, 54, 33, ...|
|       still|[1613, 1613, 9600...|[144, 145, 66, 67...|
|         art|[2699, 6140, 2988...|[8, 4, 4, 5, 6, 7...|
|      online|[723, 723, 4390, ...|[72, 73, 14, 11, ...|
|       crust|[4266, 4284, 2732...|[15, 30, 6, 21, 1...|
|      filled|[943, 9857, 1931,...|[35, 81, 22, 23, ...|
|    received|[7215, 6306, 631,...|[47, 51, 37, 95, ...|
|    slightly|[1903, 6475, 470,...|[43, 34, 26, 27, ...|
|       staff|[5832, 3294, 7884...|[132, 75, 78, 79,...|
|conversation|[6743, 2424, 9568...|[28, 21, 15, 7, 4...|
|       often|[9958, 2699, 2699...|[15, 118, 119, 74...|
|    positive|[4003, 1651, 50, ...|[15, 45, 88, 56, ...|
|      taking|[9857, 8732, 3809...|[166, 80, 113, 39...|
|      worked|[1075, 4032, 4413...|[125, 50, 101, 40...|
|       watch|[810, 7807, 1863,

In [None]:
# check
data = pd.read_table('reviews_sample.txt', header=None)
np.sort(data[0][5776].split(' '))

In [None]:
review_zip = review_enumerate.select('word', struct('sid', 'value').alias('id'))\
.groupby('word').agg(collect_list('id').alias('id'))

In [None]:
spark.stop()
sc.stop()