In [1]:
import os
import sys

import pandas as pd
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, date_format, format_string

In [5]:
def prompt():
    """Interactively collect the parameters of a search.

    The user is asked, in order, for:

    1. The search type(s): any of ``1`` (title), ``2`` (column) and
       ``3`` (content), separated by commas. Whitespace around each code
       is ignored.
    2. The keywords, separated by commas. Blank entries are dropped and at
       least one non-blank keyword is required.
    3. The minimum number of rows per table: a non-negative integer, or
       ``n``/``N`` to apply no minimum.

    Each question is repeated until a valid answer is given.

    Returns
    -------
    tuple
        ``(search_type, words, row_filter)`` where ``search_type`` is the
        comma-joined string of selected codes (e.g. ``'1,2'``), ``words`` is
        a list of stripped keyword strings, and ``row_filter`` is either an
        ``int`` or the string ``'n'``.
    """
    valid_codes = ('1', '2', '3')
    type_question = ("Select type(s) of search you want to do:\n"
                     " 1 = Title, 2= Column, 3= Content.\n"
                     " Separate by comma. You can select multiple\n")
    keyword_question = "Enter keywords for your search separated by comma"
    filter_question = ("Filter. Please enter minimum number of rows per table. "
                       "Enter n to ignore")

    # --- search type(s) ---------------------------------------------------
    while True:
        codes = [code.strip() for code in input(type_question).split(',')]
        if all(code in valid_codes for code in codes):
            break
        # Printed once per bad answer, not once per bad token.
        print('Input can only be integers: 1,2,3 separated by comma')
    search_type = ','.join(codes)

    # --- keywords ---------------------------------------------------------
    while True:
        words = [word.strip() for word in input(keyword_question).split(',')]
        # A blank keyword would later turn into the pattern '%%', which
        # matches every key, so blanks are discarded here.
        words = [word for word in words if word]
        if words:
            break
        print('Please enter valid input')

    # --- minimum row filter -----------------------------------------------
    while True:
        answer = input(filter_question).strip()
        if answer.lower() == 'n':
            row_filter = 'n'
            break
        try:
            row_filter = int(answer)
        except ValueError:
            row_filter = None
        if row_filter is not None and row_filter >= 0:
            break
        print('Please enter valid input')

    return search_type, words, row_filter

                    
                  
                    

In [10]:
# Collect the search type(s), keywords and row filter interactively.
search_type, words, row_filter = prompt()

Select type(s) of search you want to do:
 1 = Title, 2= Column, 3= Content.
 Separate by space. You can select multiple
1,2
Enter keywords for your search separated by commanew
Filter. Please enter minimum number of rows per table. Enter n to ignore4


In [11]:
# Display the search-type string returned by prompt().
search_type

'1,2'

    Functions that search the index tables through Spark SQL
   

In [None]:
# Create (or reuse) the SparkSession that the search functions rely on.
spark = (
    SparkSession.builder
    .appName("Python Spark Search Engine Project")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext


def _load_index_rdd(path):
    """Read one inverted-index text file into an RDD of Row(key, count, docs).

    Every line is split on tabs: field 0 is the key, field 1 the count and
    field 2 a comma-separated list of document ids. Single quotes inside a
    document id are removed before it is converted to ``int``.
    """
    def parse(line):
        fields = line.split("\t")
        doc_ids = [int(doc_id.replace("'", "")) for doc_id in fields[2].split(",")]
        return Row(key=fields[0], count=int(fields[1]), docs=doc_ids)

    return sc.textFile(path).map(parse)


#read all inverted_index files into rdd
#all inverted_index should have the following format:
#key\tcount\t(doc1,doc2,...)
# NOTE(review): the paths come from sys.argv[1..4]; confirm how they are meant
# to be supplied when this cell is run from a notebook kernel.
title_rdd = _load_index_rdd(sys.argv[1])
column_rdd = _load_index_rdd(sys.argv[2])
content_rdd = _load_index_rdd(sys.argv[3])


#create table from rdds
title_search_index = spark.createDataFrame(title_rdd)
column_search_index = spark.createDataFrame(column_rdd)
content_search_index = spark.createDataFrame(content_rdd)

#create temp view
title_search_index.createOrReplaceTempView("title_search_index")
column_search_index.createOrReplaceTempView("column_search_index")
content_search_index.createOrReplaceTempView("content_search_index")


#read master_index
# NOTE(review): sc.textFile returns an RDD, and an RDD has no
# createOrReplaceTempView method, so the second line below fails as written.
# The search functions use master_index as a DataFrame with Doc_ID,
# Table_Length and Table_Name columns; the layout of this file is not visible
# here, so the conversion to a DataFrame still has to be filled in.
master_index = sc.textFile(sys.argv[4])
master_index.createOrReplaceTempView("master_index")

#read all rdds for raw data

In [13]:
def getInput(search_type,words,row_filter):
    """Run the search handler for every selected search type.

    ``search_type`` is split on commas. Each resulting code is looked up in
    ``search_type_dict`` and the value found there is used as the key into
    ``function_dict``; the entry obtained is called with
    ``(words, row_filter)``. Both dictionaries are module-level names that
    are not defined in this part of the notebook.

    Nothing is returned; whatever the handlers return is discarded.
    """
    for type_code in search_type.split(','):
        handler = function_dict[search_type_dict[type_code]]
        handler(words, row_filter)

In [8]:
def title_search(words,row_filter):
    """Show the names of tables whose title index matches the keywords.

    Relies on module-level names: ``spark`` (the SparkSession holding the
    ``title_search_index`` view), ``col`` from ``pyspark.sql.functions`` and
    ``master_index`` (used as a DataFrame with ``Doc_ID``, ``Table_Length``
    and ``Table_Name`` columns).

    Parameters
    ----------
    words : iterable of str
        Keywords. Each one is stripped, lower-cased and matched as
        ``key LIKE '%keyword%'``. Blank keywords are ignored.
    row_filter : int or str
        Minimum ``Table_Length``. ``'n'``/``'N'`` (or 0) applies no minimum.

    Returns
    -------
    None
        The matching table names are printed with ``DataFrame.show()``; when
        nothing matches, a short message is printed instead.

    Raises
    ------
    ValueError
        If ``row_filter`` is neither ``'n'``/``'N'`` nor convertible to int.
    """
    no_match = 'Sorry, nothing matches, try a different keyword'

    # `row_filter == 'n' or 'N'` is always true, so compare explicitly.
    if isinstance(row_filter, str) and row_filter.strip().lower() == 'n':
        min_row = 0
    else:
        min_row = int(row_filter)

    # The DataFrame API is used instead of a formatted SQL string so that a
    # keyword containing quotes cannot break (or alter) the query.
    index = spark.table("title_search_index")
    doc_id_sets = []
    for word in words:
        term = word.strip().lower()
        if not term:
            continue  # '%%' would match every key
        rows = index.where(col("key").like('%' + term + '%')).select("docs").collect()
        if not rows:
            continue  # a keyword without any hit is skipped, not fatal
        # LIKE can match several keys; merge the documents of all of them
        # rather than keeping whichever row happens to come first.
        matched = set()
        for row in rows:
            matched.update(row[0])
        doc_id_sets.append(matched)

    # Keep only documents that contain every keyword that had a hit.
    doc_ids = set.intersection(*doc_id_sets) if doc_id_sets else set()
    if not doc_ids:
        print(no_match)
        return None

    result = master_index.where(col("Doc_ID").isin(sorted(doc_ids)))
    if min_row > 0:
        result = result.filter(col("Table_Length") >= min_row)
    return result.select("Table_Name").show()

In [9]:
# Run the title search with the collected inputs. `spark` must already be
# defined by the Spark setup cell, otherwise this raises NameError.
title_search(words,row_filter)

NameError: name 'spark' is not defined

In [6]:
def column_search(words,row_filter):
    """Show the names of tables whose column index matches the keywords.

    Relies on module-level names: ``spark`` (the SparkSession holding the
    ``column_search_index`` view), ``col`` from ``pyspark.sql.functions`` and
    ``master_index`` (used as a DataFrame with ``Doc_ID``, ``Table_Length``
    and ``Table_Name`` columns).

    Parameters
    ----------
    words : iterable of str
        Keywords. Each one is stripped, lower-cased and matched as
        ``key LIKE '%keyword%'``. Blank keywords are ignored.
    row_filter : int or str
        Minimum ``Table_Length``. ``'n'``/``'N'`` (or 0) applies no minimum.

    Returns
    -------
    None
        The matching table names are printed with ``DataFrame.show()``; when
        nothing matches, a short message is printed instead.

    Raises
    ------
    ValueError
        If ``row_filter`` is neither ``'n'``/``'N'`` nor convertible to int.
    """
    no_match = 'Sorry, nothing matches, try a different keyword'

    # `row_filter == 'n' or 'N'` is always true, so compare explicitly.
    if isinstance(row_filter, str) and row_filter.strip().lower() == 'n':
        min_row = 0
    else:
        min_row = int(row_filter)

    # The DataFrame API is used instead of a formatted SQL string so that a
    # keyword containing quotes cannot break (or alter) the query.
    index = spark.table("column_search_index")
    doc_id_sets = []
    for word in words:
        term = word.strip().lower()
        if not term:
            continue  # '%%' would match every key
        rows = index.where(col("key").like('%' + term + '%')).select("docs").collect()
        if not rows:
            continue  # a keyword without any hit is skipped instead of raising IndexError
        # LIKE can match several keys; merge the documents of all of them
        # rather than keeping whichever row happens to come first.
        matched = set()
        for row in rows:
            matched.update(row[0])
        doc_id_sets.append(matched)

    # Keep only documents that contain every keyword that had a hit.
    doc_ids = set.intersection(*doc_id_sets) if doc_id_sets else set()
    if not doc_ids:
        print(no_match)
        return None

    result = master_index.where(col("Doc_ID").isin(sorted(doc_ids)))
    if min_row > 0:
        result = result.filter(col("Table_Length") >= min_row)
    return result.select("Table_Name").show()

In [1]:
def content_search(words,row_filter):
    """Show the names of tables whose content index matches the keywords.

    Relies on module-level names: ``spark`` (the SparkSession holding the
    ``content_search_index`` view), ``col`` from ``pyspark.sql.functions``
    and ``master_index`` (used as a DataFrame with ``Doc_ID``,
    ``Table_Length`` and ``Table_Name`` columns).

    Parameters
    ----------
    words : iterable of str
        Keywords. Each one is stripped, lower-cased and matched as
        ``key LIKE '%keyword%'``. Blank keywords are ignored.
    row_filter : int or str
        Minimum ``Table_Length``. ``'n'``/``'N'`` (or 0) applies no minimum.

    Returns
    -------
    None
        The matching table names are printed with ``DataFrame.show()``; when
        nothing matches, a short message is printed instead.

    Raises
    ------
    ValueError
        If ``row_filter`` is neither ``'n'``/``'N'`` nor convertible to int.
    """
    no_match = 'Sorry, nothing matches, try a different keyword'

    # `row_filter == 'n' or 'N'` is always true, so compare explicitly.
    if isinstance(row_filter, str) and row_filter.strip().lower() == 'n':
        min_row = 0
    else:
        min_row = int(row_filter)

    # The DataFrame API is used instead of a formatted SQL string so that a
    # keyword containing quotes cannot break (or alter) the query.
    index = spark.table("content_search_index")
    doc_id_sets = []
    for word in words:
        term = word.strip().lower()
        if not term:
            continue  # '%%' would match every key
        rows = index.where(col("key").like('%' + term + '%')).select("docs").collect()
        if not rows:
            continue  # a keyword without any hit is skipped instead of raising IndexError
        # LIKE can match several keys; merge the documents of all of them
        # rather than keeping whichever row happens to come first.
        matched = set()
        for row in rows:
            matched.update(row[0])
        doc_id_sets.append(matched)

    # Keep only documents that contain every keyword that had a hit.
    doc_ids = set.intersection(*doc_id_sets) if doc_id_sets else set()
    if not doc_ids:
        print(no_match)
        return None

    result = master_index.where(col("Doc_ID").isin(sorted(doc_ids)))
    if min_row > 0:
        result = result.filter(col("Table_Length") >= min_row)
    return result.select("Table_Name").show()

In [8]:
if __name__== "__main__":
    # Collect the search parameters interactively and dump them to a file.
    search_type, words, row_filter = prompt()
    # `with` guarantees the file is closed even if one of the writes fails.
    # NOTE(review): the three values are written back to back with no
    # separator between them; confirm whether anything parses this file.
    with open('testfile.txt', 'w') as out_file:
        out_file.write('%s' % search_type)
        out_file.write('%s' % words)
        out_file.write('%s' % row_filter)

Select type(s) of search you want to do:
 1 = Title, 2= Column, 3= Content.
 Separate by space. You can select multiple
1,2
Enter keywords for your search separated by commanew
Filter. Please enter minimum number of rows per table. Enter n to ignoren
