# 2D PointCloud Indexing

# IGI Spark
Inverted Grid Index for PointClouds with Apache Spark 2.0.1

## Import Libraries

In [None]:
# From Apache Spark - Pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Python
import time
from statistics import mode

## Apache Spark Session

In [None]:
# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
# Adjust the values to match your own server/cluster.
# NOTE: despite its name, `sc` is a SparkSession, not a SparkContext.
sc = (SparkSession.builder.appName("IGI Spark")
      .master("Set master URL")  # placeholder: replace with the real master URL
      .config("spark.executor.cores", "5")
      .config("spark.executor.instances", "12")
      .config("spark.executor.memory", "20g")
      .config("spark.driver.memory", "8g")
      .config("spark.driver.cores", "1")
      # NOTE(review): "70g" is larger than spark.executor.memory ("20g") set
      # above - confirm this is intended.
      .config("spark.python.worker.memory", "70g")
      .enableHiveSupport()
      .getOrCreate())

# (Optional, for large files) add this to the builder chain above:
#   .config("spark.sql.shuffle.partitions", "10000")

## IGI Parameters
Set inverted grid index parameters

In [None]:
# Cell size of the grid: the X and Y coordinates of every point are divided
# by `delta` and floored to obtain integer cell coordinates.
delta = 10

# Extent used to linearise the grid: a cell (cx, cy) is stored under the
# single key cx + (cmax / delta) * cy.
cmax = 10000

## Loading PointClouds from CSV File for Indexing


In [None]:
# Expected CSV layout (Example.csv): a header row followed by one point per
# row. Presumably rows sharing the same ID belong to the same point cloud.
#   1st row:      ID, X, Y
#   2nd row:      0, 2.4, 1.2
#   3rd row:      0, 4.2, 11.5
#   4th row:      1, 4.3, 12.5
#   5th row:      1, 64.2, 12.6
#   ...
#   ith row:      501, 52.5,12.5
#   (i+1)th row:  501, 23.5,213.5
#
# NOTE(review): the indexing code selects the column named 'I', while the
# example header above shows 'ID' - confirm the real header of the file.

# Read the file with a header row and let Spark infer the column types.
df = (
    sc.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .csv('Path from the CSV file')
)

## IGI Spark - Construction

In [None]:
# Step 1: map every point to integer grid coordinates by dividing X and Y by
# the cell size `delta` and flooring the result.
datos = df.select(
    df['I'].cast(IntegerType()).alias('I'),
    floor(df['X'] / delta).alias("X"),
    floor(df['Y'] / delta).alias("Y"),
)

# Step 2: collapse the (X, Y) cell coordinates into one cell key ("Celda")
# and keep the identifier of the point under the name "ID".
idx = datos.select(
    (datos['X'] + (cmax / delta) * datos['Y']).alias("Celda"),
    datos['I'].alias('ID'),
)

# Step 3: inverted index - one row per cell, holding the list ("Listas") of
# every ID that has a point in that cell. The cell key is stored as an integer.
indice = (
    idx.groupBy('Celda')
    .agg(collect_list(col('ID')).alias("Listas"))
    .select(col('Celda').cast(IntegerType()).alias('Celda'), col('Listas'))
)

# (Optional) Repartition by cell to avoid uneven partitions:
# indice = indice.repartition(10000, indice['Celda']).sortWithinPartitions(indice['Celda'])

# Mark the index for caching so later queries reuse it.
indice.cache()

# count() is an action: it forces the lazy execution graph to run.
indice.count()

## (Optional) Save and Load index - Hard Drive

In [None]:
# Persist the index to disk in ORC format.
indice.write.format("orc").save("Path to Write File")

# Read an index back from an ORC file; `indice` is re-bound to the loaded
# DataFrame, so later cells query the on-disk copy.
indice = sc.read.format("orc").load("Path from ORC File")

## UDFs for DataFrames

In [None]:
# UDFs: the mode of a list, and the mode across a list of lists
def moda_vector(x):
    """Return the most frequent value across a list of lists.

    Args:
        x: Iterable of iterables of hashable values (for example the
            array-of-arrays column passed in through the ``moda_2`` UDF).

    Returns:
        The value with the highest total count over all inner lists. When
        several values are tied, the one met first while flattening wins
        (on Python 3.7+, where counters keep insertion order).

    Raises:
        StatisticsError: If ``x`` contains no values at all.
    """
    # Imported locally so the function stays self-contained when Spark
    # pickles it and ships it to the Python workers.
    from collections import Counter
    from statistics import StatisticsError

    counts = Counter(val for sublist in x for val in sublist)
    if not counts:
        raise StatisticsError("no mode for empty data")

    # statistics.mode() raises on ties before Python 3.8, which would abort
    # the whole Spark job on a single tied vote. most_common() never raises
    # on ties. Builtins such as max() are avoided on purpose: the star import
    # from pyspark.sql.functions shadows them at module level.
    return counts.most_common(1)[0][0]
    
# Spark UDF: most frequent element of a single array column.
# NOTE(review): statistics.mode raises StatisticsError on empty input and,
# before Python 3.8, on ties - confirm the Python version of the workers.
# NOTE(review): no returnType is passed to udf(), so PySpark's default return
# type applies (StringType according to the PySpark docs) - confirm intended.
moda = udf(lambda x:mode(x))
# Spark UDF: most frequent element across an array-of-arrays column; it
# simply delegates to moda_vector.
moda_2 = udf(lambda x:moda_vector(x))

## Loading PointClouds Queries

In [None]:
# Load the query point clouds from a CSV file (header row, inferred types).
dfq = (
    sc.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .csv('path from CSV File')
)

# Map every query point to integer grid coordinates (same `delta` as the index).
datosq = dfq.select(
    dfq['I'].cast(IntegerType()).alias('I'),
    floor(dfq['X'] / delta).alias("X"),
    floor(dfq['Y'] / delta).alias("Y"),
)

# Linearise the cell coordinates into the integer key "Celda" and keep the
# query identifier as the integer column "ID".
dq = datosq.select(
    (datosq['X'] + (cmax / delta) * datosq['Y']).cast(IntegerType()).alias("Celda"),
    datosq['I'].cast(IntegerType()).alias("ID"),
)

# (Optional) Repartition by cell to avoid uneven partitions:
# dq = dq.repartition(10000, dq['Celda']).sortWithinPartitions(dq['Celda'])

# Mark the query cells for caching.
dq.cache()

# count() is an action: it forces the lazy execution graph to run.
dq.count()

## Querying Process

In [None]:
# Batch query: join the index with the query cells on the cell key, so every
# query point picks up the ID list stored for its cell.
answer = (
    indice.join(dq, "Celda")
    .select(col("ID").alias("QueryID"), col("Listas").alias("ListasCloudID"))
)

# Gather, per query, all the ID lists it matched, then apply 1-NN voting:
# moda_2 returns the most frequent ID over those lists ("NubeID").
anw = (
    answer.groupBy("QueryID")
    .agg(collect_list(col("ListasCloudID")).alias("ListasID"))
    .select(col("QueryID"), moda_2(col("ListasID")).alias("NubeID"))
)

# Recall@1: fraction of queries whose voted NubeID equals their own QueryID.
recall = anw.select(
    avg((col("QueryID") == col("NubeID")).cast(IntegerType())).alias("Recall@1")
)

# Print the Recall@1 value.
recall.show()

## Stop Spark Session

In [None]:
# Stop the Spark session (`sc`) once all queries have finished.
sc.stop()