In [1]:
from pyspark.sql.functions import *
import numpy as np
import math
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, ArrayType
from nltk.stem import PorterStemmer
from nltk.tokenize import sent_tokenize, word_tokenize
import nltk
from nltk.stem.snowball import SnowballStemmer
from nltk.corpus import stopwords
from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window
from nltk.stem import WordNetLemmatizer

#Read two spatio-textual files
df1 = spark.read.csv("C:/Users/nikos/Desktop/partitiondata.csv", header=False).toDF("userId", "Text", "Latitude", "Longitude")
df2 = spark.read.csv("C:/Users/nikos/Desktop/check-ins.csv", header=False).toDF("userId", "Text", "Latitude", "Longitude")

gps = df1.withColumn("Latitude", regexp_replace('Latitude', '["(]', '').cast("float")).withColumn("Longitude", regexp_replace('Longitude', '[)"]', '').cast("float")).withColumn('DatasetID', lit("A"))

check = df2.withColumn("Latitude", regexp_replace('Latitude', '["(]', '').cast("float")).withColumn("Longitude", regexp_replace('Longitude', '[)"]', '').cast("float")).withColumn('DatasetID', lit("B"))

#Union the dataframes and sort by Latitude
datasets = gps.union(check)
sorted_by_latitude = datasets.sort(col("Latitude").asc())

stop = stopwords.words('english')

#Function of lemmatization the text
def lemmatization(x):
    wordnet_lemmatizer = WordNetLemmatizer()
    #stemmer = SnowballStemmer("english")
    lista = []
    words = x.split(',')
    #words = nltk.word_tokenize(x)
    remove = [item for item in words if item not in stop]
    
    for word in remove:
        lista.append(wordnet_lemmatizer.lemmatize(word))   
     
    return lista

udf_lemmatization = udf(lemmatization, ArrayType(StringType()))

lemma_df = sorted_by_latitude.withColumn("Lemma", udf_lemmatization(sorted_by_latitude.Text)).drop('Text')

#γενικά χρειάζομαι το νούμερο της γραμμής για να κάνω το οριζόντιο partitioning με τη συνάρτηση cellID
#διότι με τη διαίρεση του μεγέθους του dataframe με το 10, και παίρνοντας το ακέραιο μέρος της διαίρεσης του με το νούμερο της γραμμής, αντιστοιχίζεται στο αντίστοιχο κελί.
#επομένως ο μόνος τρόπος για να βρίσκω το νούμερο της γραμμής είναι οι δύο επόμενες εντολές, οι οποίες δε ξέρω αν είναι αποδοτικές
#είχα βρει και το monotonically_increasing_id αλλά δε μου βγάζει πάντα με τη σειρά του αριθμούς αλλά διακριτούς αριθμούς που δε μου κάνουν
#αν έχεις κάποια άλλη πρόταση για να βρίσκω πιο αποδοτικά το νούμερο της γραμμής ενημέρωσε με
w = Window().orderBy(lit('A'))

dataframe = lemma_df.withColumn("IncreasingId", (row_number().over(w)-1))

dimension = 10
size = dataframe.count()/dimension

def cellID(x):

    cellid = int(x / size) 
    return cellid

udf_cellID = udf(cellID, IntegerType())

cell_point = dataframe.withColumn("cellid", udf_cellID(dataframe.IncreasingId))
          
y_axis = []

#Find the maximum latitude value of each cell for the duplication of points
for i in range(0,(dimension-1)):
    value = cell_point.where(cell_point.cellid == i).select(max('Latitude')).collect()[0]
    y_axis.append(value)

  
#radius of search
r = 0.5

#Function that duplicates the points of dataset B to the nearest cells based on radius
def find_cell(y, cell, dataset):
    l = []
    l.append(cell)
    if dataset == "B":
        #check the lower cells to duplicate point
        for i in range((cell - 1), -1):
            if (i >= 0) and ((y - r) < y_axis[i][0]):
                l.append(i)
            else:
                break
        #check the upper cells to duplicate point
        for j in range((cell + 1), dimension):
            if (cell <= dimension) and ((y + r) > y_axis[j - 1][0]):
                l.append(j)
            else:
                break
    return l

udf_cell = udf(find_cell, ArrayType(IntegerType()))

duplicate_points = cell_point.withColumn("cell", udf_cell(cell_point.Latitude, cell_point.cellid, cell_point.DatasetID))

#make the duplication of the row
duplicated = duplicate_points.withColumn('grid', explode(duplicate_points.cell)).drop('cell').drop('cellid')

#find the distinct values to partition data
mapping = {k: i for i, k in enumerate(
    duplicated.select("grid").distinct().orderBy(asc("grid")).rdd.flatMap(lambda x: x).collect()
)}
                        
print(mapping)

#partition by the distinct cell id
result = (duplicated
    .select("grid", struct([c for c in duplicated.columns]))
    .rdd.partitionBy(len(mapping), lambda k: mapping[k])
    .values()
    .toDF(duplicated.schema))

print("Number of partitions: {}".format(result.rdd.getNumPartitions()))

partitions = result.rdd.glom().collect()
for i, l in enumerate(partitions): 
    print ("partition #{} length: {}".format(i, len(l)))
   


{0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
Number of partitions: 10
partition #0 length: 300
partition #1 length: 323
partition #2 length: 366
partition #3 length: 491
partition #4 length: 491
partition #5 length: 491
partition #6 length: 373
partition #7 length: 673
partition #8 length: 440
partition #9 length: 524


# Find points of dataset "A" and "B"

In [3]:
#Find points of dataset A and B in each partition
def function1(iterator):
    
    #retain the values of each partition to a pandas dataframe
    d = pd.DataFrame([p[0], p[1], p[2], p[3], p[4], p[5], p[6]] for p in iterator)
    d.columns = ['usedid', 'latitude', 'longitude', 'dataset', 'text', 'increasingid', 'cellid']
    
    a = 0
    b = 0
    number = []
    
    for row1 in d.iterrows():
        if row1[1][3] == "A":
            a += 1
        elif row1[1][3] == "B":
            b += 1
                                    
    number.append(a)
    number.append(b)
    return [number]

nearest_points = result.rdd \
        .mapPartitions(function1) \
        .collect()
    
print("points of A, B in each partition: {}".format(nearest_points))

points of A, B in each partition: [[277, 23], [257, 66], [175, 191], [300, 191], [300, 191], [227, 264], [0, 373], [0, 673], [216, 757], [248, 602]]


# Functions of Jaccard Similarity and Euclidean Distance

In [4]:
import time
import matplotlib.pyplot as plt

#function which computes the jaccard similarity
def jaccard_similarity(list1, list2):
    intersection = len(list(set(list1).intersection(list2)))
    union = (len(list1) + len(list2)) - intersection
    return float(intersection / union)

#function which computes the euclidean distance
def euclidean_distance(object1, object2):
    x1 = object1[2]
    x2 = object2[2]
    y1 = object1[1]
    y2 = object2[1]
    dist = math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
    return dist
    

# Brute-force in each partition

In [6]:
#compute the euclidean distance and jaccard similarity for each point of dataset B against all of dataset A
def function2(iterator):
    start_time = time.time()
    
    d = pd.DataFrame([p[0], p[1], p[2], p[3], p[4], p[5], p[6]] for p in iterator)
    d.columns = ['usedid', 'latitude', 'longitude', 'dataset', 'text', 'increasingid', 'cellid']
    y = []
    
    #textual similarity
    e = 0.7
    #counters of use of functions and join of points
    count_functions= 0
    count_join = 0
    
    for row1 in d.iterrows():
        
        x = []
        if  (row1[1][3] == "B"):
            
            for row2 in d.iterrows():
                if (row2[1][3] == "A"):
                    count_functions += 1
                    
                    if (euclidean_distance(row1[1], row2[1]) <= r) and (jaccard_similarity(row1[1][4], row2[1][4]) >= e):
                        count_join += 1
                        x.append(row1[1][5])
            
        y.append(x)
                                  
    return [[str(time.time() - start_time), count_functions, count_join]]

nearest_points = result.rdd \
        .mapPartitions(function2) \
        .collect()
    
print("Execution time, count of using functions, count of join points: {}".format(nearest_points))

Execution time, count of using functions, count of join points: [['2.9914004802703857', 6371, 0], ['8.918791770935059', 16962, 0], ['23.199210166931152', 33425, 210], ['34.793915033340454', 57300, 0], ['34.433778285980225', 57300, 0], ['42.04338455200195', 59928, 0], ['29.76351571083069', 0, 0], ['78.77758955955505', 0, 0], ['137.3939447402954', 163512, 0], ['101.38583636283875', 149296, 0]]


# Plane sweep in each partition

In [8]:
#sort values by Longitude, apply plane sweep until the longitude of a point is greater than the longitude of the checked point + radius 
def function3(iterator):
    #retain the start time of the execution of each partition
    start_time = time.time()
    
    #retain the values of each partition to a pandas dataframe and sort them based on the longitude
    d = pd.DataFrame([p[0], p[1], p[2], p[3], p[4], p[5], p[6]] for p in iterator)
    d.columns = ['usedid', 'latitude', 'longitude', 'dataset', 'text', 'increasingid', 'cellid']
    d.sort_values(by=['longitude'])
    
    #creates the counters of the metrics as zero
    count_euclidean = 0
    count_jaccard = 0
    count_join = 0
    
    #εδώ θέλω να αρχικοποιήσω μια λίστα στην οποία θα κρατάω στη θέση του id της γραμμής για το dataset A το στοιχείο από το dataset B, το οποίο περνά από τις δύο συναρτήσεις
    #επειδή όμως το id δεν αρχίζει από 0 αλλά αυξάνεται συνέχεια, δε μπορούσα να το κάνω διότι η λίστα ξεκινάει με δείκτη 0 και ανεβαίνει
    #έτσι σκέφτηκα να φτιάχνω μια λίστα με μέγεθος βάσει της μέγιστης τιμής του id και της ελάχιστης, και πιο κάτω στη συνάρτηση να αφαιρώ το id της γραμμής από την ελάχιστη τιμή του id σε κάθε partition
    #για παράδειγμα, αν είχα ελάχιστη τιμή id 1600 και έλεγχα το 1601, θα έβαζα στη θέση 1601-1600 = 1 της λίστας αυτό που ήθελα
    #ξέρω ότι έτσι θα δημιουργήσω μια μεγάλη λίστα που δεν την χρειάζομαι με ακρετά κενά πεδία, αλλά ο έλεγχος που πραγματοποιώ πιο κάτ στη συνάρτηση, γίνεται μ΄΄ονο κατά + r
    #αν έχεις κάτι άλλο να μου προτείνεις ενημέρωσε με σε παρακαλώ
    
    minvalue = d['increasingid'].min()
    maxvalue = d['increasingid'].max()
    points = [[] for x in range(minvalue, maxvalue)]
     
    e = 0.7
    #check each point from its next points, until the longitude of a point is greater than the longitude of the checked point + r
    for i, row1 in enumerate(d.iterrows()):
        
        for j, row2 in enumerate(d[i+1:].iterrows()):
            
            if row2[1][2] < (row1[1][2] + r):
                if row1[1][3] != row2[1][3]:
                    count_euclidean += 1
                    
                    if row1[1][3] == "A" and (euclidean_distance(row1[1], row2[1]) <= r):
                        count_jaccard += 1
                        if jaccard_similarity(row1[1][4], row2[1][4]) >= e:
                            count_join += 1 
                            points[row1[1][5]-minvalue].append(row2[1][5])
                            
                    elif row1[1][3] == "B" and (euclidean_distance(row1[1], row2[1]) <= r):
                        count_jaccard += 1 
                        if jaccard_similarity(row1[1][4], row2[1][4]) >= e:
                            count_join += 1
                            points[row2[1][5]-minvalue].append(row1[1][5])
            else:
                break
                                    
    return [[str(time.time() - start_time), count_euclidean, count_jaccard, count_join]]

nearest_points = result.rdd \
        .mapPartitions(function3) \
        .collect()

#partition_time = [float(i) for i in nearest_points]
print("Execution time, count euclidean function, count jaccard function, count join: {}".format(nearest_points))

Execution time, count euclidean function, count jaccard function, count join: [['16.49152708053589', 6371, 6371, 0], ['22.38470482826233', 16962, 16962, 0], ['32.467488050460815', 33425, 33425, 210], ['57.770145416259766', 57300, 57300, 0], ['57.929160833358765', 57300, 57300, 0], ['44.17149353027344', 43357, 43357, 0], ['16.42401695251465', 0, 0, 0], ['46.989752531051636', 0, 0, 0], ['119.73594975471497', 150336, 417, 0], ['99.18936824798584', 136452, 0, 0]]
