In [1]:
from __future__ import division

import itertools
import math
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

# Build (or reuse) the Spark session that backs every relation in this notebook.
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark documentation example - presumably safe to drop; confirm.
spark = (
    SparkSession.builder
    .appName("Project 6910A")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# The later cells read files through `sqlContext`, which the visible cells never
# define (it is only pre-defined when the kernel is a pyspark shell). Create it
# explicitly on the same session so a fresh kernel can run top to bottom.
sqlContext = SQLContext(spark.sparkContext, spark)


In [10]:
#<------------------------------Experiment relations------------------------------>#
#--------------------------------------------------------------------------------->#
#--------------------------------------------------------------------------------->#

# Fixed seed so the down-sampled relations (and therefore the relation sizes and
# join-size estimates derived from them) are the same on every run.
SAMPLE_SEED = 42


def _load_tbl(path, tbl_schema, fraction=None):
    """Read a '|'-delimited file into a DataFrame using the given schema.

    Args:
        path: location of the file to load.
        tbl_schema: StructType describing the columns.
        fraction: when given, return a sample without replacement drawn with
            this sampling fraction and SAMPLE_SEED instead of the full relation.

    Returns:
        The loaded (and possibly sampled) DataFrame.
    """
    relation = (
        sqlContext.read
        .format("com.databricks.spark.csv")
        .schema(tbl_schema)
        .option("mode", "PERMISSIVE")
        .option('delimiter', '|')
        .load(path)
    )
    if fraction is not None:
        relation = relation.sample(False, fraction, SAMPLE_SEED)
    return relation


#nation.tbl (loaded in full)
schema = StructType([
    StructField("nationkey", IntegerType()),
    StructField("n_name", StringType()),
    StructField("regionkey", IntegerType()),
    StructField("n_comment", StringType())
])

df = _load_tbl("dbgen_database/nation.tbl", schema)
df.createOrReplaceTempView("nation")

#supplier.tbl (10% sample)
schema2 = StructType([
    StructField("suppkey", IntegerType()),
    StructField("s_name", StringType()),
    StructField("address", StringType()),
    StructField("nationkey", IntegerType()),
    StructField("phone", StringType()),
    StructField("acctbal", DoubleType()),
    StructField("s_comment", StringType())
])

df2 = _load_tbl("dbgen_database/supplier.tbl", schema2, 0.1)
df2.createOrReplaceTempView("supplier")

#partsupp.tbl (1% sample)
schema3 = StructType([
    StructField("partkey", IntegerType()),
    StructField("suppkey", IntegerType()),
    StructField("availqty", IntegerType()),
    StructField("supplycost", DoubleType()),
    StructField("ps_comment", StringType())
])

df3 = _load_tbl("dbgen_database/partsupp.tbl", schema3, 0.01)
df3.createOrReplaceTempView("partsupp")

#orders.tbl (1% sample)
schema4 = StructType([
    StructField("orderkey", IntegerType()),
    StructField("custkey", IntegerType()),
    StructField("orderstatus", StringType()),
    StructField("totalprice", DoubleType()),
    StructField("orderdate", StringType()),
    StructField("orderpriority", StringType()),
    StructField("clerk", StringType()),
    StructField("shippriority", IntegerType()),
    StructField("o_comment", StringType())
])

df4 = _load_tbl("dbgen_database/orders.tbl", schema4, 0.01)
df4.createOrReplaceTempView("orders")

#lineitem.tbl (0.5% sample)
schema5 = StructType([
    StructField("orderkey", IntegerType()),
    StructField("partkey", IntegerType()),
    StructField("suppkey", IntegerType()),
    StructField("linenumber", IntegerType()),
    StructField("quantity", DoubleType()),
    StructField("extendedprice", DoubleType()),
    StructField("discount", DoubleType()),
    StructField("tax", DoubleType()),
    StructField("returnflag", StringType()),
    StructField("linestatus", StringType()),
    StructField("shipdate", StringType()),
    StructField("commitdate", StringType()),
    StructField("receiptdate", StringType()),
    StructField("shipinstruct", StringType()),
    StructField("shipmode", StringType()),
    StructField("l_comment", StringType())
])

df5 = _load_tbl("dbgen_database/lineitem.tbl", schema5, 0.005)
df5.createOrReplaceTempView("lineitem")

print("Sizes of each relation: nation, supplier, partsupp, orders, lineitem")
print(df.count(), df2.count(), df3.count(), df4.count(), df5.count())


Sizes of each relation: nation, supplier, partsupp, orders, lineitem
(25, 981, 7903, 14754, 30032)


In [2]:
#<--------------------------------Example relations------------------------------->#
# Example 1: A(courseid, grading_scheme), B(courseid, professor, student),
#            C(student, club). Each relation is read from a '|'-delimited text
#            file and registered as a temp view for the optimizer.

# Relation A -> view "example1_a"
schema_eg1_1 = (
    StructType()
    .add("courseid", IntegerType())
    .add("grading_scheme", StringType())
)
dfeg1_1 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg1_1)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_1/Relation_A.txt")
)
dfeg1_1.createOrReplaceTempView("example1_a")

# Relation B -> view "example1_b"
schema_eg1_2 = (
    StructType()
    .add("courseid", IntegerType())
    .add("professor", StringType())
    .add("student", StringType())
)
dfeg1_2 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg1_2)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_1/Relation_B.txt")
)
dfeg1_2.createOrReplaceTempView("example1_b")

# Relation C -> view "example1_c"
schema_eg1_3 = (
    StructType()
    .add("student", StringType())
    .add("club", StringType())
)
dfeg1_3 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg1_3)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_1/Relation_C.txt")
)
dfeg1_3.createOrReplaceTempView("example1_c")

# Report the row count of each relation.
print("Example 1: Sizes of each relation: example1_a, example1_b, example1_c")
print(dfeg1_1.count(), dfeg1_2.count(), dfeg1_3.count())


Example 1: Sizes of each relation: example1_a, example1_b, example1_c
(10, 20, 10)


In [3]:
#<--------------------------------Example relations------------------------------->#
# Example 2: A(personid, sport), B(personid, country, gender),
#            C(country, cities). Each relation is read from a '|'-delimited
#            text file and registered as a temp view for the optimizer.

# Relation A -> view "example2_a"
schema_eg2_1 = (
    StructType()
    .add("personid", IntegerType())
    .add("sport", StringType())
)
dfeg2_1 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg2_1)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_2/Relation_A.txt")
)
dfeg2_1.createOrReplaceTempView("example2_a")

# Relation B -> view "example2_b"
schema_eg2_2 = (
    StructType()
    .add("personid", IntegerType())
    .add("country", StringType())
    .add("gender", StringType())
)
dfeg2_2 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg2_2)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_2/Relation_B.txt")
)
dfeg2_2.createOrReplaceTempView("example2_b")

# Relation C -> view "example2_c"
schema_eg2_3 = (
    StructType()
    .add("country", StringType())
    .add("cities", StringType())
)
dfeg2_3 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg2_3)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_2/Relation_C.txt")
)
dfeg2_3.createOrReplaceTempView("example2_c")

# Report the row count of each relation.
print("Example 2: Sizes of each relation: example2_a, example2_b, example2_c")
print(dfeg2_1.count(), dfeg2_2.count(), dfeg2_3.count())


Example 2: Sizes of each relation: example2_a, example2_b, example2_c
(18, 7, 8)


In [4]:
#<--------------------------------Example relations------------------------------->#
# Example 3: A(shape, colour, transparency), B(shape, age), C(age, names).
#            Each relation is read from a '|'-delimited text file and
#            registered as a temp view for the optimizer.

# Relation A -> view "example3_a"
schema_eg3_1 = (
    StructType()
    .add("shape", StringType())
    .add("colour", StringType())
    .add("transparency", StringType())
)
dfeg3_1 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg3_1)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_3/Relation_A.txt")
)
dfeg3_1.createOrReplaceTempView("example3_a")

# Relation B -> view "example3_b"
# NOTE(review): "shape" is declared as an integer here but as a string in
# Relation A - confirm against the data files that this is intended.
schema_eg3_2 = (
    StructType()
    .add("shape", IntegerType())
    .add("age", IntegerType())
)
dfeg3_2 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg3_2)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_3/Relation_B.txt")
)
dfeg3_2.createOrReplaceTempView("example3_b")

# Relation C -> view "example3_c"
schema_eg3_3 = (
    StructType()
    .add("age", IntegerType())
    .add("names", StringType())
)
dfeg3_3 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg3_3)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_3/Relation_C.txt")
)
dfeg3_3.createOrReplaceTempView("example3_c")

# Report the row count of each relation.
print("Example 3: Sizes of each relation: example3_a, example3_b, example3_c")
print(dfeg3_1.count(), dfeg3_2.count(), dfeg3_3.count())


Example 3: Sizes of each relation: example3_a, example3_b, example3_c
(25, 5, 50)


In [5]:
#<--------------------------------Example relations------------------------------->#
# Example 4: A(names, birthyear), B(grading, movienames), and a relation C
#            declared with no columns at all. Each relation is read from a
#            '|'-delimited text file and registered as a temp view.

# Relation A -> view "example4_a"
schema_eg4_1 = (
    StructType()
    .add("names", StringType())
    .add("birthyear", IntegerType())
)
dfeg4_1 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg4_1)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_4/Relation_A.txt")
)
dfeg4_1.createOrReplaceTempView("example4_a")

# Relation B -> view "example4_b"
schema_eg4_2 = (
    StructType()
    .add("grading", IntegerType())
    .add("movienames", StringType())
)
dfeg4_2 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg4_2)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_4/Relation_B.txt")
)
dfeg4_2.createOrReplaceTempView("example4_b")

# Relation C -> view "example4_c" (schema has no fields)
schema_eg4_3 = StructType([])
dfeg4_3 = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema_eg4_3)
    .option("mode", "PERMISSIVE")
    .option('delimiter', '|')
    .load("Example_data/Example_4/Relation_C.txt")
)
dfeg4_3.createOrReplaceTempView("example4_c")

# Report the row count of each relation.
print("Example 4: Sizes of each relation:  example4_a, example4_b, example4_c")
print(dfeg4_1.count(), dfeg4_2.count(), dfeg4_3.count())


Example 4: Sizes of each relation:  example4_a, example4_b, example4_c
(26, 6, 0)


In [6]:
#<-----------------------------------Optimizer------------------------------------>#
#--------------------------------------------------------------------------------->#
#--------------------------------------------------------------------------------->#

def join_size_equality(df_A, df_B, attributes_list):
    """Estimate the number of rows produced by equi-joining two relations.

    The estimate is |A| * |B| divided by, for every join attribute, the larger
    of the two relations' distinct-value counts for that attribute, rounded up.

    Args:
        df_A: first relation; must expose ``count()`` and be usable with
            ``count_Distinct``.
        df_B: second relation, same requirements as ``df_A``.
        attributes_list: names of the attributes the relations are joined on.

    Returns:
        int: the estimated join size. With no join attributes this is the
        cross-product size |A| * |B|; when either relation is empty it is 0.
    """
    num = df_A.count() * df_B.count()
    # No join attributes -> cross product. An empty input -> empty join; returning
    # here also avoids dividing by a zero distinct-count when both sides are empty.
    if not attributes_list or num == 0:
        return num
    denom = 1
    for attribute in attributes_list:
        denom *= max(count_Distinct(df_A, attribute), count_Distinct(df_B, attribute))
    # Integer ceiling division: exact, always an int (the cross-product branch
    # already returns an int), and independent of whether
    # `from __future__ import division` is in effect.
    return -(-num // denom)
    

def count_Distinct(df, attribute):
    """Return how many distinct values `attribute` takes across the rows of `df`.

    All rows are fetched with ``collect()`` and de-duplicated locally on the
    string form of each value. Hand-rolled because, per the original author,
    Spark's own implementation was slower.
    """
    return len({str(row[attribute]) for row in df.collect()})

def matching_attributes(df_A, df_B):
    """List the attribute names that appear in both relations' schemas.

    These are the natural-join attributes. Names are emitted in the order they
    occur in df_A's schema, once per matching field in df_B's schema.
    """
    return [
        left.name
        for left in df_A.schema
        for right in df_B.schema
        if left.name == right.name
    ]

def query_optimizer(relation_pairs, relations_list):
    """Pick the pair of relations with the smallest estimated join size.

    Intended for the three-relation case: every candidate pair is scored with
    ``join_size_equality`` on the attributes the two relations share, and each
    score is printed.

    Args:
        relation_pairs: sequence of 2-tuples of relation (temp view) names.
        relations_list: all relation names that occur in ``relation_pairs``;
            each is resolved with ``spark.table``.

    Returns:
        tuple: ``(size, pair)`` for the cheapest pair; the first one wins on
        ties. When ``relation_pairs`` is empty the result is
        ``(float("inf"), ())``.
    """
    df_dict = dict()
    for relation in relations_list:
        df_dict[relation] = spark.table(relation)

    performance_dict = dict()
    for relation_pair in relation_pairs:
        left = df_dict[relation_pair[0]]
        right = df_dict[relation_pair[1]]
        attributes_list = matching_attributes(left, right)
        performance = join_size_equality(left, right, attributes_list)
        print(relation_pair[0] + " + " + relation_pair[1] + " Join Size: " + str(performance))
        performance_dict[relation_pair] = performance

    # Start from +infinity so any real estimate beats it. The previous sentinel,
    # `99999 ^ 9`, is a bitwise XOR (= 99990), so no pair was ever selected when
    # every estimate was at least that large.
    shortest = float("inf")
    optimal_order = tuple()
    for relation_pair in relation_pairs:
        if performance_dict[relation_pair] < shortest:
            shortest = performance_dict[relation_pair]
            optimal_order = relation_pair
    return shortest, optimal_order


In [13]:
#<---------------------------------Query Digest----------------------------------->#
#--------------------------------------------------------------------------------->#
#--------------------------------------------------------------------------------->#

def input_query():
    """Prompt for a SQL query on standard input and return it as a string.

    Example query: "SELECT * FROM supplier JOIN orders JOIN lineitem"
    """
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3, where raw_input was renamed to input
    return str(read_line("Enter your SQL query here:"))

def query_digest(query):
    """Extract the relation names referenced by a SQL join query.

    The query is lower-cased and split on whitespace; the relation names are the
    token following FROM and the token following each JOIN, in that order. A
    trailing ';' attached to a name is dropped. ON conditions are ignored: the
    optimizer matches relations on identically named attributes instead.

    Assumes the query is well formed.

    Args:
        query: SQL text such as "SELECT * FROM a JOIN b JOIN c".

    Returns:
        list of str: lower-cased relation names.

    Raises:
        ValueError: if the query contains no FROM keyword.
        IndexError: if FROM or a JOIN is the last token.
    """
    tokens = query.lower().split()
    from_index = tokens.index("from")
    join_indices = [i for i, token in enumerate(tokens) if token == "join"]
    # Each relation name sits right after its FROM/JOIN keyword.
    return [tokens[i + 1].rstrip(";") for i in [from_index] + join_indices]

# Parse the query, estimate every pairwise join, and report the cheapest pair.
query_relations = query_digest(input_query())
query_combinations = list(itertools.combinations(query_relations, 2))
do_first = query_optimizer(query_combinations, query_relations)
if do_first[1]:
    print("Optimal: Join " + do_first[1][0] + " and " + do_first[1][1] +" first with join size: " + str(do_first[0]))
else:
    # An empty pair means there was nothing to choose from; indexing into it
    # would raise IndexError.
    print("No optimal join pair found: the query must reference at least two relations.")


Enter your SQL query here:SELECT * FROM nation JOIN supplier JOIN partsupp 
nation + supplier Join Size: 981.0
nation + partsupp Join Size: 197575
supplier + partsupp Join Size: 1423.0
Optimal: Join nation and supplier first with join size: 981.0


In [8]:
#<-----------------Comparing actual join runtimes (in seconds)-------------------->#
#<-----------------------------Not part of Optimizer------------------------------>#
#--------------------------------------------------------------------------------->#

#s = time.time()
#df.join(df2.join(df3,['suppkey']),['nationkey']).show()
#e = time.time ()
#df3.join(df.join(df2,['nationkey']),['suppkey']).show()
#e2 = time.time()
#print(e-s,e2-e)
