In [1]:
import org.apache.spark.sql.SaveMode


In [2]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")


spark.sql("show databases").show()

+--------------------+
|        databaseName|
+--------------------+
|             default|
|         default_new|
|        defaultgsdfg|
|          infofusion|
| infofusion_mapr_orc|
|infofusion_mapr_p...|
|              thcp10|
+--------------------+



In [3]:
println(spark.sessionState.conf.bucketingEnabled)

true


In [4]:
spark.sql("use thcp10 ")

[]

In [5]:
spark.sql("show tables").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  thcp10|bucketed_10_c_cus...|      false|
|  thcp10|bucketed_10_l_ord...|      false|
|  thcp10|bucketed_10_o_ord...|      false|
|  thcp10|bucketed_10_witho...|      false|
|  thcp10|bucketed_10_witho...|      false|
|  thcp10|bucketed_16_witho...|      false|
|  thcp10|    custkey_customer|      false|
|  thcp10|            customer|      false|
|  thcp10|            lineitem|      false|
|  thcp10|   orderkey_lineitem|      false|
|  thcp10|     orderkey_orders|      false|
|  thcp10|              orders|      false|
+--------+--------------------+-----------+



In [6]:
val customer_ds = spark.sql("select *from customer")

customer_ds = [ROW_NUM: bigint, c_custkey: int ... 7 more fields]


[ROW_NUM: bigint, c_custkey: int ... 7 more fields]

In [7]:
val lineitem_ds=spark.sql("select *from lineitem")

lineitem_ds = [ROW_NUM: bigint, l_orderkey: int ... 15 more fields]


[ROW_NUM: bigint, l_orderkey: int ... 15 more fields]

In [8]:
val orders_ds=spark.sql("select *from orders")

orders_ds = [ROW_NUM: bigint, o_orderkey: int ... 8 more fields]


[ROW_NUM: bigint, o_orderkey: int ... 8 more fields]

In [None]:
//With 10 buckets
val t1 = System.nanoTime
customer_ds.write.bucketBy(10, "c_custkey").sortBy("c_custkey").mode(SaveMode.Overwrite).saveAsTable("bucketed_10_c_custkey_customer")
lineitem_ds.write.bucketBy(10, "l_suppkey","l_orderkey").sortBy("l_orderkey").mode(SaveMode.Overwrite).saveAsTable("bucketed_10_l_orderkey_lineitem")
orders_ds.write.bucketBy(10, "o_orderkey").sortBy("o_orderkey").mode(SaveMode.Overwrite).saveAsTable("bucketed_10_o_orderkey_orders")
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [None]:
customer_ds.write.mode(SaveMode.Overwrite).saveAsTable("custkey_customer")
lineitem_ds.write.mode(SaveMode.Overwrite).saveAsTable("orderkey_lineitem")
orders_ds.write.mode(SaveMode.Overwrite).saveAsTable("orderkey_orders")

In [9]:
//With total columns in the Tables with out bucketing
val t1 = System.nanoTime
val custds=spark.sql("select* from custkey_customer")
val lineitemds=spark.sql("select *from orderkey_lineitem")
val orderds=spark.sql("select * from orderkey_orders")
spark.sql("select *from orderkey_lineitem ol ,orderkey_orders oo,custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

52.408084595


t1 = 2941192747651739
custds = [ROW_NUM: bigint, c_custkey: int ... 7 more fields]
lineitemds = [ROW_NUM: bigint, l_orderkey: int ... 15 more fields]
orderds = [ROW_NUM: bigint, o_orderkey: int ... 8 more fields]
duration = 52.408084595


52.408084595

In [None]:
//With selected columns in the Tables with out bucketing
val t1 = System.nanoTime
val custds=spark.sql("select c_custkey from custkey_customer")
val lineitemds=spark.sql("select l_suppkey,l_orderkey from orderkey_lineitem")
val orderds=spark.sql("select o_orderkey from orderkey_orders")
spark.sql("select *from orderkey_lineitem ol ,orderkey_orders oo,custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [None]:

//With total columns in the Tables with bucketing
val t1 = System.nanoTime
val custds=spark.sql("select *from bucketed_10_c_custkey_customer")
val lineitemds=spark.sql("select *from bucketed_10_l_orderkey_lineitem")
val orderds=spark.sql("select *from bucketed_10_o_orderkey_orders")
spark.sql("select *from bucketed_10_l_orderkey_lineitem ol ,bucketed_10_o_orderkey_orders oo,bucketed_10_c_custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [None]:
//With selected columns in the Tables with bucketing
val t1 = System.nanoTime
val custds=spark.sql("select c_custkey from bucketed_10_c_custkey_customer")
val lineitemds=spark.sql("select l_suppkey,l_orderkey from bucketed_10_l_orderkey_lineitem")
val orderds=spark.sql("select o_orderkey from bucketed_10_o_orderkey_orders")
spark.sql("select ol.l_orderkey from bucketed_10_l_orderkey_lineitem ol ,bucketed_10_o_orderkey_orders oo,bucketed_10_c_custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [None]:
//With out sort and line item with 16 buckets
val t1 = System.nanoTime
customer_ds.write.bucketBy(3, "c_custkey").saveAsTable("bucketed_10_withoutsort_c_custkey_customer")
lineitem_ds.write.bucketBy(70, "l_suppkey","l_orderkey").saveAsTable("bucketed_16_withoutsort_l_orderkey_lineitem")
orders_ds.write.bucketBy(8, "o_orderkey").saveAsTable("bucketed_10_withoutsort_o_orderkey_orders")
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [None]:
//With selected columns in the Tables with bucketing
val t1 = System.nanoTime
val custds=spark.sql("select *from bucketed_10_withoutsort_c_custkey_customer")
val lineitemds=spark.sql("select *from bucketed_16_withoutsort_l_orderkey_lineitem")
val orderds=spark.sql("select *from bucketed_10_withoutsort_o_orderkey_orders")
spark.sql("select *from bucketed_16_withoutsort_l_orderkey_lineitem ol ,bucketed_10_withoutsort_o_orderkey_orders oo,bucketed_10_withoutsort_c_custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

In [21]:
// Below Modified by Abhishek and the above is remain intact creatd by jalendhar

Name: Syntax Error.
Message: 
StackTrace: 

In [None]:
// crosstab specifications
// dimension->1 (from the different table)
// measure-> 1 count from customer 
// dimension from household

t1 = time()
dfHousehold = spark.table("default.householdengland")
for column in dfHousehold.columns: 
   dfHousehold = dfHousehold.withColumnRenamed(column, "default_householdengland_" + column);
#dfHousehold.printSchema();
dfCustomer = spark.table("default.customerengland")
for column in dfCustomer.columns:
    dfCustomer = dfCustomer.withColumnRenamed(column, "default_customerengland_" + column);
#dfCustomer.printSchema();
dfJoin1 = dfHousehold.join(dfCustomer,dfHousehold.default_householdengland_HouseholdID == dfCustomer.default_customerengland_householdid,"inner")
dfJoin1 = dfJoin1.dropDuplicates(['default_householdengland_propertytypedecoded','default_customerengland_custid'])
rows = dfJoin1.groupBy("default_householdengland_propertytypedecoded").agg(func.count(func.lit(1)).alias("Customers"))\
             .sort("default_householdengland_propertytypedecoded")
print rows.count()
rows = rows.limit(1000);
for row in rows.collect():
    print row[0], row[1]
totals = dfJoin1.agg(func.count(func.lit(1)).alias("Customers"))
firstRow =  totals.first()
print firstRow[0]
     
print time() - t1

In [12]:
//With 10 buckets using the above existing one and just modified the order table to sort it as as per o_custkey and sort by o_custkey

val t1 = System.nanoTime
// customer_ds.write.bucketBy(10, "c_custkey").sortBy("c_custkey").mode(SaveMode.Overwrite).saveAsTable("bucketed_10_c_custkey_customer")
// lineitem_ds.write.bucketBy(10, "l_suppkey","l_orderkey").sortBy("l_orderkey").mode(SaveMode.Overwrite).saveAsTable("bucketed_10_l_orderkey_lineitem")
orders_ds.write.bucketBy(10, "o_custkey").sortBy("o_custkey").mode(SaveMode.Overwrite).saveAsTable("b_10_o_orderkey_orders")
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

67.30013792


t1 = 2941353188575042
duration = 67.30013792


67.30013792

In [16]:
//The below step is just to create the table from takeing the original(loaded) data. 
customer_ds.write.mode(SaveMode.Overwrite).saveAsTable("custkey_customer")
lineitem_ds.write.mode(SaveMode.Overwrite).saveAsTable("orderkey_lineitem")
orders_ds.write.mode(SaveMode.Overwrite).saveAsTable("orderkey_orders")

In [17]:
//With total columns in the Tables with out bucketing
val t1 = System.nanoTime
val custds=spark.sql("select* from custkey_customer")
val lineitemds=spark.sql("select *from orderkey_lineitem")
val orderds=spark.sql("select * from orderkey_orders")
spark.sql("select *from orderkey_lineitem ol ,b_orderkey_orders oo,custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

51.117035235


t1 = 2941540833043172
custds = [ROW_NUM: bigint, c_custkey: int ... 7 more fields]
lineitemds = [ROW_NUM: bigint, l_orderkey: int ... 15 more fields]
orderds = [ROW_NUM: bigint, o_orderkey: int ... 8 more fields]
duration = 51.117035235


51.117035235

In [18]:
//With total columns in the Tables with out bucketing
val t1 = System.nanoTime
val custds=spark.sql("select* from customer")
val lineitemds=spark.sql("select *from lineitem")
val orderds=spark.sql("select * from orders")
spark.sql("select *from lineitem ol ,orders oo,customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

52.480577422


t1 = 2941620631585702
custds = [ROW_NUM: bigint, c_custkey: int ... 7 more fields]
lineitemds = [ROW_NUM: bigint, l_orderkey: int ... 15 more fields]
orderds = [ROW_NUM: bigint, o_orderkey: int ... 8 more fields]
duration = 52.480577422


52.480577422

In [19]:
//With total columns in the Tables with bucketing
val t1 = System.nanoTime
val custds=spark.sql("select *from bucketed_10_c_custkey_customer")
val lineitemds=spark.sql("select *from bucketed_10_l_orderkey_lineitem")
val orderds=spark.sql("select *from b_10_o_orderkey_orders")
spark.sql("select *from bucketed_10_l_orderkey_lineitem ol ,b_10_o_orderkey_orders oo,bucketed_10_c_custkey_customer cc where ol.l_orderkey=oo.o_orderkey and cc.c_custkey=ol.l_suppkey").count()
val duration = (System.nanoTime - t1) / 1e9d
println(duration)

61.818050014


t1 = 2941874693377824
custds = [ROW_NUM: bigint, c_custkey: int ... 7 more fields]
lineitemds = [ROW_NUM: bigint, l_orderkey: int ... 15 more fields]
orderds = [ROW_NUM: bigint, o_orderkey: int ... 8 more fields]
duration = 61.818050014


61.818050014