In [1]:
from __future__ import print_function
import sys
import re
import numpy as np
import pandas as pd
from numpy import dot
from numpy.linalg import norm
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as func
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, length, collect_set, col, split, explode, lit,max as max_, monotonically_increasing_id, array, size, sum as sum_, pandas_udf, PandasUDFType
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [None]:
# Create the Spark context for this notebook and wrap it in a SQLContext,
# which the loading cell below uses as its DataFrame reader entry point.
APP_NAME = "Large-Scale Customer Data Wrangling_Dataframe"
sc = SparkContext(appName=APP_NAME)
sqlContext = SQLContext(sparkContext=sc)

In [2]:
def _string_schema(*column_names):
    """Build a StructType whose columns are all nullable strings.

    The columns appear in the schema in the order the names are given.
    """
    return StructType(
        [StructField(column_name, StringType(), True) for column_name in column_names]
    )


# One schema per input table. Every column is declared as a nullable string;
# the few columns needed as integers are cast after loading.
# NOTE(review): 'ACCBATL' is kept exactly as originally written; it looks like
# a transposition of "ACCTBAL" -- confirm nothing depends on it before renaming.
schema_Customer = _string_schema(
    'CUSTKEY', 'NAME', 'ADDRESS', 'NATIONKEY',
    'PHONE', 'ACCBATL', 'MKTSEGMENT', 'COMMENT'
)

schema_Orders = _string_schema(
    'ORDERKEY', 'CUSTKEY', 'ORDERSTATUS', 'TOTALPRICE', 'ORDERDATE',
    'ORDER PRIORITY', 'CLERK', 'SHIP_PRIORITY', 'COMMENT'
)

schema_Lineitem = _string_schema(
    'ORDERKEY', 'PARTKEY', 'SUPPKEY', 'LINENUMBER',
    'QUANTITY', 'EXTENDEDPRICE', 'DISCOUNT', 'TAX',
    'RETURNFLAG', 'LINESTATUS', 'SHIPDATE', 'COMMITDATE',
    'RECEIPTDATE', 'SHIPINSTRUCT', 'SHIPMODE', 'COMMENT'
)

schema_Nation = _string_schema('NATIONKEY', 'NAME', 'REGIONKEY', 'COMMENT')

schema_Part = _string_schema(
    'PARTKEY', 'NAME', 'MFGR', 'BRAND', 'TYPE',
    'SIZE', 'CONTAINER', 'RETAILPRICE', 'COMMENT'
)

schema_Partsupp = _string_schema(
    'PARTKEY', 'SUPPKEY', 'AVAILQTY', 'SUPPLYCOST', 'COMMENT'
)

schema_Region = _string_schema('REGIONKEY', 'NAME', 'COMMENT')

schema_Supplier = _string_schema(
    'SUPPKEY', 'NAME', 'ADDRESS', 'NATIONKEY', 'PHONE', 'COMMENT'
)

# Paths of the pipe-delimited input files, one per table.
Customer_file = 'customer.tbl.txt'
Orders_file = 'orders.tbl.txt'
Lineitem_file = 'lineitem.tbl.txt'
Nation_file = 'nation.tbl.txt'
Part_file = 'part.tbl.txt'
Partsupp_file = 'partsupp.tbl.txt'
Region_file = 'region.tbl.txt'
Supplier_file = 'supplier.tbl.txt'


def _read_table(path, schema):
    """Read one pipe-delimited table file into a DataFrame.

    Args:
        path: Location of the file to load.
        schema: StructType applied to the file's columns.

    Returns:
        The DataFrame produced by the CSV reader for ``path``.

    No schema-inference option is passed: every table is loaded with an
    explicit schema, so inference is not needed (the earlier calls passed a
    misspelled ``infoerSchema`` option here).
    """
    # NOTE(review): header='True' tells the reader to treat the first line of
    # each file as a header row -- confirm the input files really have one.
    reader = sqlContext.read.format('csv').options(header='True', sep='|')
    return reader.load(path, schema=schema)


Customer = _read_table(Customer_file, schema_Customer)
Orders = _read_table(Orders_file, schema_Orders)
Nation = _read_table(Nation_file, schema_Nation)
Lineitem = _read_table(Lineitem_file, schema_Lineitem)
Part = _read_table(Part_file, schema_Part)
Partsupp = _read_table(Partsupp_file, schema_Partsupp)
Region = _read_table(Region_file, schema_Region)
Supplier = _read_table(Supplier_file, schema_Supplier)


In [29]:
# The tables are loaded with string columns; convert the ones that are summed
# or compared numerically in the questions below to integers.
Lineitem = (
    Lineitem
    .withColumn("QUANTITY", col("QUANTITY").cast(IntegerType()))
    .withColumn("PARTKEY", col("PARTKEY").cast(IntegerType()))
)
Orders = Orders.withColumn("CUSTKEY", col("CUSTKEY").cast(IntegerType()))
Customer = Customer.withColumn("CUSTKEY", col("CUSTKEY").cast(IntegerType()))


In [4]:
# Question 1: What are the top-10 sold products?
# Every line item contributes 1 to the tally of its part, so 'num' is the
# number of line items per PARTKEY (not the summed QUANTITY).
part_tally = (
    Lineitem
    .select('PARTKEY')
    .withColumn('cnt', lit(1))
    .groupBy('PARTKEY')
    .agg(sum_('cnt').alias('num'))
)
top_product = part_tally.orderBy(col('num').desc()).limit(10)
top_product.show()


+-------+---+
|PARTKEY|num|
+-------+---+
|  10620| 56|
|   6140| 54|
|   8051| 52|
|  15584| 52|
|  10597| 51|
|  10715| 51|
|   2292| 51|
|  19444| 50|
|  14422| 50|
|  17670| 50|
+-------+---+



In [5]:
# Question 2: What are the top-10 customers based on the number of products ordered?
# Attach each line item's quantity to the customer that placed the order,
# then total the quantities per customer.
orders_sub = Orders.select("ORDERKEY", "CUSTKEY")
lineitem_sub = Lineitem.select("ORDERKEY", "QUANTITY")
cust_quantity = (
    orders_sub
    .join(lineitem_sub, on="ORDERKEY", how="inner")
    .drop("ORDERKEY")
)

quantity_per_customer = cust_quantity.groupBy("CUSTKEY").agg(
    sum_("QUANTITY").alias("num")
)
top_10_customers = quantity_per_customer.orderBy(col("num").desc()).limit(10)
top_10_customers.show()

+-------+----+
|CUSTKEY| num|
+-------+----+
|   8362|4082|
|   9454|3870|
|    346|3817|
|   6958|3760|
|   1105|3737|
|  14707|3710|
|  11998|3709|
|  14398|3670|
|   8542|3660|
|   8761|3658|
+-------+----+



In [6]:
# Question 3: What are the top-10 customers that have ordered products from the same supplier?
# Count, for every (customer, supplier) pair, how many line items the customer
# ordered from that supplier, and keep the ten largest counts.
lineitem_sub2 = Lineitem.select('ORDERKEY', 'SUPPKEY')

# Inner join: only line items that belong to a known order are counted. A
# full outer join would also emit rows with a null CUSTKEY or SUPPKEY for
# unmatched orders / line items, and those null groups could enter the ranking.
customer_supp = orders_sub.join(lineitem_sub2, 'ORDERKEY', 'inner').drop('ORDERKEY')

top_customers_supp = (
    customer_supp
    .withColumn('cnt', lit(1))
    .groupBy('CUSTKEY', 'SUPPKEY')
    .agg(sum_('cnt').alias('num'))
    .orderBy('num', ascending=False)
    .limit(10)
)
top_customers_supp.show()


+-------+-------+---+
|CUSTKEY|SUPPKEY|num|
+-------+-------+---+
|   2767|    601|  5|
|  14875|    452|  4|
|   9562|    695|  4|
|   2119|    603|  4|
|  11515|    113|  4|
|  11354|    127|  4|
|  14581|    902|  4|
|   1567|    750|  4|
|   8437|    548|  4|
|   5281|    577|  4|
+-------+-------+---+



In [7]:
# Question 4: Who are the customers that have not ordered products from their own country and have ordered only foreign products
# Approach: tag every ordered line item with the customer's nation and the
# supplier's nation, find the customers with at least one same-nation line
# item, and return all ordering customers that are NOT in that set.

# Customer -> nation, attached to each order.
customer_sub = Customer.select("CUSTKEY", col("NATIONKEY").alias("cus_NATIONKEY"))
order_cust_nation = orders_sub.join(customer_sub, "CUSTKEY", "inner")

# Supplier -> nation, attached to each line item.
supplier_sub = Supplier.select("SUPPKEY", col("NATIONKEY").alias("sup_NATIONKEY"))
supp_order_nation = lineitem_sub2.join(supplier_sub, "SUPPKEY", "inner")

# One row per ordered line item, carrying both nations.
order_nation_cust_nation = order_cust_nation.join(supp_order_nation, "ORDERKEY", "inner").drop("SUPPKEY")

# Customers with at least one line item supplied from their own nation.
cust_OwnCountryCnt = (
    order_nation_cust_nation
    .where("cus_NATIONKEY=sup_NATIONKEY")
    .select(col("CUSTKEY").alias("own_CUSTKEY"))
    .dropDuplicates()
)

# Every customer that appears in the joined order data.
all_cust = order_nation_cust_nation.select("CUSTKEY").dropDuplicates()

# Anti join: keep the customers of all_cust that have no match among the
# own-country customers. The join condition uses the column names of the two
# frames actually being joined, which are distinct and therefore unambiguous.
cust_onlyforeign = all_cust.join(
    cust_OwnCountryCnt,
    col("CUSTKEY") == col("own_CUSTKEY"),
    "left_anti",
).select("CUSTKEY")
cust_onlyforeign.show()

+-------+
|CUSTKEY|
+-------+
|    833|
|   1580|
|   1829|
|   2366|
|   3749|
|   3794|
|   5300|
|   6620|
|   7340|
|   7982|
|  10817|
|  11141|
|  11458|
|  14450|
|    737|
|   1127|
|   1721|
|   2999|
|   3179|
|   3698|
+-------+
only showing top 20 rows



In [8]:
# Question 5: Which top 3 countries produced most of the products that are ordered
# Each line item is attributed to the nation of its supplier; 'num' is the
# number of line items per supplier nation key.
lineitem_sub3 = Lineitem.select("PARTKEY", "SUPPKEY")
nation_part = lineitem_sub3.join(supplier_sub, on="SUPPKEY", how="inner")

items_per_nation = (
    nation_part
    .withColumn("cnt", lit(1))
    .groupBy("sup_NATIONKEY")
    .agg(sum_("cnt").alias("num"))
)
nation_partcnt = items_per_nation.orderBy(col("num").desc()).limit(3)
nation_partcnt.show()


+-------------+-----+
|sup_NATIONKEY|  num|
+-------------+-----+
|           18|31483|
|            7|29975|
|           22|28317|
+-------------+-----+



In [30]:
# Question 6: Who are the top-10 similar customers based on their orders?
# (Use Jaccard similarity to calculate the similarity)
# Consider only customers that have ordered at least 10 products. First collect all the products that each customer ordered.
lineitem_sub4 = Lineitem.select("ORDERKEY", "PARTKEY")
cust_part = (
    orders_sub
    .join(lineitem_sub4, on="ORDERKEY", how="inner")
    .drop("ORDERKEY")
)

# collect_set keeps each part once per customer, so the size filter below
# counts distinct parts.
parts_per_customer = cust_part.groupBy("CUSTKEY").agg(
    collect_set("PARTKEY").alias("part_list")
)
cust_partlist = parts_per_customer.where(size(col("part_list")) >= 10)
cust_partlist.show()

+-------+--------------------+
|CUSTKEY|           part_list|
+-------+--------------------+
|    148|[17525, 18635, 53...|
|    463|[15297, 10142, 10...|
|    496|[16640, 6876, 152...|
|    833|[17902, 2085, 166...|
|   1088|[8785, 5541, 1578...|
|   1238|[13852, 9133, 578...|
|   1342|[19753, 16305, 10...|
|   1580|[9402, 6977, 1963...|
|   1591|[2796, 5249, 1589...|
|   1645|[356, 12705, 5076...|
|   1829|[7413, 14636, 186...|
|   2122|[10425, 1830, 169...|
|   2366|[15797, 2695, 974...|
|   2659|[17525, 8901, 791...|
|   2866|[12001, 102, 3508...|
|   3175|[4160, 6113, 8799...|
|   3749|[1555, 4062, 8743...|
|   3997|[19376, 9744, 128...|
|   4519|[3144, 4364, 1047...|
|   4900|[19682, 17323, 14...|
+-------+--------------------+
only showing top 20 rows



In [19]:

# Pair every qualifying customer with every other one. Keeping only
# cus_a < cus_b drops self-pairs and the mirrored duplicate of each pair.
side_a = cust_partlist.toDF("cus_a", "partlist_a")
side_b = cust_partlist.toDF("cus_b", "partlist_b")
crossjoin_cust_partlist = side_a.crossJoin(side_b).where(col("cus_a") < col("cus_b"))
crossjoin_cust_partlist.show()

+-----+--------------------+-----+--------------------+
|cus_a|          partlist_a|cus_b|          partlist_b|
+-----+--------------------+-----+--------------------+
|  148|[1951, 6880, 1789...|  463|[6823, 141, 4590,...|
|  148|[1951, 6880, 1789...|  496|[495, 12133, 3637...|
|  148|[1951, 6880, 1789...|  833|[11531, 9753, 839...|
|  148|[1951, 6880, 1789...| 1088|[14337, 17378, 50...|
|  148|[1951, 6880, 1789...| 1238|[10757, 8343, 287...|
|  148|[1951, 6880, 1789...| 1342|[11718, 3107, 175...|
|  148|[1951, 6880, 1789...| 1580|[16315, 8478, 171...|
|  148|[1951, 6880, 1789...| 1591|[2401, 4588, 1938...|
|  148|[1951, 6880, 1789...| 1645|[11844, 10050, 16...|
|  148|[1951, 6880, 1789...| 1829|[15007, 814, 1011...|
|  148|[1951, 6880, 1789...| 2122|[11211, 19581, 25...|
|  148|[1951, 6880, 1789...| 2366|[5727, 6843, 9391...|
|  148|[1951, 6880, 1789...| 2659|[13797, 9662, 140...|
|  148|[1951, 6880, 1789...| 2866|[19482, 7373, 338...|
|  148|[1951, 6880, 1789...| 3175|[11849, 10736,

In [20]:
def _jaccard(parts_a, parts_b):
    """Return the Jaccard similarity |A & B| / |A | B| of two part lists.

    Args:
        parts_a: Iterable of part keys ordered by the first customer.
        parts_b: Iterable of part keys ordered by the second customer.

    Returns:
        A float in [0, 1], or None when the similarity is undefined (either
        input is missing or both lists are empty).
    """
    if parts_a is None or parts_b is None:
        return None
    set_a, set_b = set(parts_a), set(parts_b)
    union_size = len(set_a | set_b)
    if union_size == 0:
        # Avoid a ZeroDivisionError inside the UDF for two empty lists.
        return None
    # float() forces true division; a plain int / int would truncate to 0
    # under Python 2, since only print_function is imported from __future__.
    return float(len(set_a & set_b)) / union_size


jaccard_udf = udf(_jaccard, FloatType())
pair_jaccard = crossjoin_cust_partlist.withColumn("jaccard", jaccard_udf("partlist_a", "partlist_b"))
pair_jaccard.show()

+-----+--------------------+-----+--------------------+------------+
|cus_a|          partlist_a|cus_b|          partlist_b|     jaccard|
+-----+--------------------+-----+--------------------+------------+
|  148|[1951, 6880, 1789...|  463|[6823, 141, 4590,...|         0.0|
|  148|[1951, 6880, 1789...|  496|[495, 12133, 3637...|         0.0|
|  148|[1951, 6880, 1789...|  833|[11531, 9753, 839...|         0.0|
|  148|[1951, 6880, 1789...| 1088|[14337, 17378, 50...|         0.0|
|  148|[1951, 6880, 1789...| 1238|[10757, 8343, 287...|         0.0|
|  148|[1951, 6880, 1789...| 1342|[11718, 3107, 175...|         0.0|
|  148|[1951, 6880, 1789...| 1580|[16315, 8478, 171...|         0.0|
|  148|[1951, 6880, 1789...| 1591|[2401, 4588, 1938...|         0.0|
|  148|[1951, 6880, 1789...| 1645|[11844, 10050, 16...|         0.0|
|  148|[1951, 6880, 1789...| 1829|[15007, 814, 1011...|         0.0|
|  148|[1951, 6880, 1789...| 2122|[11211, 19581, 25...|         0.0|
|  148|[1951, 6880, 1789...| 2366|

In [21]:
# Rank the customer pairs by similarity and keep the ten most similar.
pair_scores = pair_jaccard.select("cus_a", "cus_b", "jaccard")
top_10_similar_customers = pair_scores.orderBy(col("jaccard").desc()).limit(10)
top_10_similar_customers.show()

+-----+-----+-----------+
|cus_a|cus_b|    jaccard|
+-----+-----+-----------+
| 4808|10901| 0.06666667|
| 5390| 7532| 0.06451613|
| 2489| 4283| 0.06349207|
| 2768| 4385|     0.0625|
| 5462| 7739|     0.0625|
|  944|11402| 0.06122449|
|   29| 6788|0.060606062|
|  668|12746|0.060606062|
|10418|11432| 0.05882353|
| 1976| 9740| 0.05882353|
+-----+-----+-----------+



In [33]:
# Question 7: What are the top-10 product pairs that customers ordered most often together?
# Pairs are formed per customer (across all of that customer's line items,
# since ORDERKEY is dropped before pairing), not per individual order.
cust_part2 = orders_sub.join(lineitem_sub4, "ORDERKEY", "inner").drop("ORDERKEY")

# Self-join on the customer key instead of a Cartesian product followed by a
# filter: the resulting rows are the same, but no n^2 intermediate is requested.
parts_a = cust_part2.select(col("CUSTKEY").alias("cus_a"), col("PARTKEY").alias("part_a"))
parts_b = cust_part2.select(col("CUSTKEY").alias("cus_b"), col("PARTKEY").alias("part_b"))
cust_part_part = (
    parts_a
    .join(parts_b, col("cus_a") == col("cus_b"), "inner")
    # part_a < part_b keeps each unordered pair once and drops self-pairs.
    .where(col("part_a") < col("part_b"))
)

pair_cnt = cust_part_part.withColumn("cnt", lit(1)).groupBy("part_a", "part_b").agg(sum_("cnt").alias("num"))
top_10_products_pairs = pair_cnt.orderBy("num", ascending=False).limit(10)
top_10_products_pairs.show()

+------+------+---+
|part_a|part_b|num|
+------+------+---+
|  9021| 18846|  6|
|   344| 10147|  5|
|  1160|  9714|  5|
|  3992| 19873|  5|
|  3376|  9230|  5|
|  3285| 16720|  5|
| 10339| 12162|  5|
|  1864| 18222|  5|
|  6336|  9344|  5|
|  5120| 16197|  5|
+------+------+---+



In [None]:
# Shut down the SparkContext created at the top of the notebook; the
# DataFrames defined above cannot be evaluated after this call.
sc.stop()