In [29]:
# Standard-library modules.
import glob
import time
import sys
# findspark.init() runs before `import pyspark`; presumably it is what makes the
# local Spark install importable, so the order matters — confirm against findspark docs.
import findspark
findspark.init()
# NOTE(review): the return value of both find() calls is discarded here.
findspark.find()
import pyspark
findspark.find()
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the session for the "tpch" application. The builder carries
# the MongoDB connector package, the input/output URIs of the local "tpch"
# database, and the executor/driver memory settings.
spark = (
    SparkSession.builder
    .appName("tpch")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/tpch")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/tpch")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [3]:
def _read_mongo(uri):
    """Load the MongoDB collection addressed by ``uri`` through the "mongo" data source."""
    reader = spark.read.format("mongo")
    return reader.option("uri", uri).load()


# One DataFrame per collection of the local "tpch" database.
customer_df = _read_mongo("mongodb://127.0.0.1/tpch.CUSTOMER")
lineitem_df = _read_mongo("mongodb://127.0.0.1/tpch.LINEITEM")
nation_df = _read_mongo("mongodb://127.0.0.1/tpch.NATION")
region_df = _read_mongo("mongodb://127.0.0.1/tpch.REGION")
orders_df = _read_mongo("mongodb://127.0.0.1/tpch.ORDERS")
part_df = _read_mongo("mongodb://127.0.0.1/tpch.PART")
partsupp_df = _read_mongo("mongodb://127.0.0.1/tpch.PARTSUPP")
supplier_df = _read_mongo("mongodb://127.0.0.1/tpch.SUPPLIER")

In [4]:
# Expose every loaded DataFrame to spark.sql() under the view name the queries use.
for _view_name, _frame in (
    ("customer", customer_df),
    ("lineitem", lineitem_df),
    ("nation", nation_df),
    ("region", region_df),
    ("orders", orders_df),
    ("part", part_df),
    ("partsupp", partsupp_df),
    ("supplier", supplier_df),
):
    _frame.createOrReplaceTempView(_view_name)

In [5]:
def q1():
    """Print a LINEITEM summary grouped by return flag and line status.

    The SQL sums and averages quantity, extended price and discount, derives
    discounted price and charge totals, and counts rows for line items whose
    ship date is at most 1998-12-01 minus 108 days. The result is printed with
    ``show()``; nothing is returned.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    LINEITEM
where
    l_shipdate <= date '1998-12-01' - interval '108' day
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
        """
    result = spark.sql(sql_text)
    result.show()

In [6]:
def q2():
    """Print suppliers offering the minimum supply cost in region ASIA.

    Restricted to parts of size 30 whose type ends in ``STEEL``; a correlated
    subquery picks the minimum ``ps_supplycost`` per part within ASIA. Rows are
    ordered by account balance (descending), nation, supplier and part, and
    limited to 100. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    s_acctbal,
    s_name,
    n_name,
    p_partkey,
    p_mfgr,
    s_address,
    s_phone,
    s_comment
from
    PART,
    SUPPLIER,
    PARTSUPP,
    NATION,
    REGION
where
    p_partkey = ps_partkey
    and s_suppkey = ps_suppkey
    and p_size = 30
    and p_type like '%STEEL'
    and s_nationkey = n_nationkey
    and n_regionkey = r_regionkey
    and r_name = 'ASIA'
    and ps_supplycost = (
        select
            min(ps_supplycost)
        from
            PARTSUPP,
            SUPPLIER,
            NATION,
            REGION
        where
            p_partkey = ps_partkey
            and s_suppkey = ps_suppkey
            and s_nationkey = n_nationkey
            and n_regionkey = r_regionkey
            and r_name = 'ASIA'
    )
order by
    s_acctbal desc,
    n_name,
    s_name,
    p_partkey
limit
    100;


        """
    result = spark.sql(sql_text)
    result.show()

In [7]:
def q3():
    """Print the ten highest-revenue orders for the AUTOMOBILE market segment.

    Considers orders placed before 1995-03-13 whose line items shipped after
    that date; revenue is ``l_extendedprice * (1 - l_discount)`` summed per
    order. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    o_orderdate,
    o_shippriority
from
    CUSTOMER,
    ORDERS,
    LINEITEM
where
    c_mktsegment = 'AUTOMOBILE'
    and c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and o_orderdate < date '1995-03-13'
    and l_shipdate > date '1995-03-13'
group by
    l_orderkey,
    o_orderdate,
    o_shippriority
order by
    revenue desc,
    o_orderdate
limit
    10;


        """
    result = spark.sql(sql_text)
    result.show()

In [8]:
def q4():
    """Print order counts per order priority for one quarter.

    Counts orders dated within three months from 1995-01-01 that have at least
    one line item with ``l_commitdate < l_receiptdate``. The result is printed
    with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    o_orderpriority,
    count(*) as order_count
from
    ORDERS
where
    o_orderdate >= date '1995-01-01'
    and o_orderdate < date '1995-01-01' + interval '3' month
    and exists (
        select
            *
        from
            LINEITEM
        where
            l_orderkey = o_orderkey
            and l_commitdate < l_receiptdate
    )
group by
    o_orderpriority
order by
    o_orderpriority;


        """
    result = spark.sql(sql_text)
    result.show()

In [9]:
def q5():
    """Print revenue per nation within the MIDDLE EAST region.

    Only line items whose customer and supplier share a nation are counted,
    for orders dated within one year from 1994-01-01. Rows are ordered by
    revenue, descending. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    n_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue
from
    CUSTOMER,
    ORDERS,
    LINEITEM,
    SUPPLIER,
    NATION,
    REGION
where
    c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and l_suppkey = s_suppkey
    and c_nationkey = s_nationkey
    and s_nationkey = n_nationkey
    and n_regionkey = r_regionkey
    and r_name = 'MIDDLE EAST'
    and o_orderdate >= date '1994-01-01'
    and o_orderdate < date '1994-01-01' + interval '1' year
group by
    n_name
order by
    revenue desc;


        """
    result = spark.sql(sql_text)
    result.show()

In [10]:
def q6():
    """Print the sum of ``l_extendedprice * l_discount`` for selected line items.

    Selects line items shipped within one year from 1994-01-01 with a discount
    between ``0.06 - 0.01`` and ``0.06 + 0.01`` and a quantity below 24. The
    result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    sum(l_extendedprice * l_discount) as revenue
from
    LINEITEM
where
    l_shipdate >= date '1994-01-01'
    and l_shipdate < date '1994-01-01' + interval '1' year
    and l_discount between 0.06 - 0.01
    and 0.06 + 0.01
    and l_quantity < 24;


        """
    result = spark.sql(sql_text)
    result.show()

In [11]:
def q7():
    """Print shipping revenue between JAPAN and INDIA by year.

    The inner query pairs supplier nation and customer nation (JAPAN/INDIA in
    either direction) for line items shipped between 1995-01-01 and 1996-12-31;
    the outer query sums the volume per supplier nation, customer nation and
    ship year. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    supp_nation,
    cust_nation,
    l_year,
    sum(volume) as revenue
from
    (
        select
            n1.n_name as supp_nation,
            n2.n_name as cust_nation,
            extract(
                year
                from
                    l_shipdate
            ) as l_year,
            l_extendedprice * (1 - l_discount) as volume
        from
            SUPPLIER,
            LINEITEM,
            ORDERS,
            CUSTOMER,
            NATION n1,
            NATION n2
        where
            s_suppkey = l_suppkey
            and o_orderkey = l_orderkey
            and c_custkey = o_custkey
            and s_nationkey = n1.n_nationkey
            and c_nationkey = n2.n_nationkey
            and (
                (
                    n1.n_name = 'JAPAN'
                    and n2.n_name = 'INDIA'
                )
                or (
                    n1.n_name = 'INDIA'
                    and n2.n_name = 'JAPAN'
                )
            )
            and l_shipdate between date '1995-01-01'
            and date '1996-12-31'
    ) as shipping
group by
    supp_nation,
    cust_nation,
    l_year
order by
    supp_nation,
    cust_nation,
    l_year;


        """
    result = spark.sql(sql_text)
    result.show()

In [12]:
def q8():
    """Print, per order year, the share of volume supplied from INDIA.

    Covers ``SMALL PLATED COPPER`` parts ordered between 1995-01-01 and
    1996-12-31 by customers whose nation lies in region ASIA; the share is the
    INDIA-supplier volume divided by the total volume. The result is printed
    with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    o_year,
    sum(
        case
            when nation = 'INDIA' then volume
            else 0
        end
    ) / sum(volume) as mkt_share
from
    (
        select
            extract(
                year
                from
                    o_orderdate
            ) as o_year,
            l_extendedprice * (1 - l_discount) as volume,
            n2.n_name as nation
        from
            PART,
            SUPPLIER,
            LINEITEM,
            ORDERS,
            CUSTOMER,
            NATION n1,
            NATION n2,
            REGION
        where
            p_partkey = l_partkey
            and s_suppkey = l_suppkey
            and l_orderkey = o_orderkey
            and o_custkey = c_custkey
            and c_nationkey = n1.n_nationkey
            and n1.n_regionkey = r_regionkey
            and r_name = 'ASIA'
            and s_nationkey = n2.n_nationkey
            and o_orderdate between date '1995-01-01'
            and date '1996-12-31'
            and p_type = 'SMALL PLATED COPPER'
    ) as all_nations
group by
    o_year
order by
    o_year;


        """
    result = spark.sql(sql_text)
    result.show()

In [13]:
def q9():
    """Print summed profit per supplier nation and order year.

    Profit per line item is
    ``l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity``,
    restricted to parts whose name contains ``dim``. Rows are ordered by nation
    and by year descending. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    nation,
    o_year,
    sum(amount) as sum_profit
from
    (
        select
            n_name as nation,
            extract(
                year
                from
                    o_orderdate
            ) as o_year,
            l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
        from
            PART,
            SUPPLIER,
            LINEITEM,
            PARTSUPP,
            ORDERS,
            NATION
        where
            s_suppkey = l_suppkey
            and ps_suppkey = l_suppkey
            and ps_partkey = l_partkey
            and p_partkey = l_partkey
            and o_orderkey = l_orderkey
            and s_nationkey = n_nationkey
            and p_name like '%dim%'
    ) as profit
group by
    nation,
    o_year
order by
    nation,
    o_year desc;


        """
    result = spark.sql(sql_text)
    result.show()

In [14]:
def q10():
    """Print the twenty customers with the most revenue on returned items.

    Considers line items with ``l_returnflag = 'R'`` on orders dated within
    three months from 1993-08-01, grouped per customer together with nation and
    contact columns, ordered by revenue descending. The result is printed with
    ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    c_custkey,
    c_name,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    c_acctbal,
    n_name,
    c_address,
    c_phone,
    c_comment
from
    CUSTOMER,
    ORDERS,
    LINEITEM,
    NATION
where
    c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and o_orderdate >= date '1993-08-01'
    and o_orderdate < date '1993-08-01' + interval '3' month
    and l_returnflag = 'R'
    and c_nationkey = n_nationkey
group by
    c_custkey,
    c_name,
    c_acctbal,
    c_phone,
    n_name,
    c_address,
    c_comment
order by
    revenue desc
limit
    20;


        """
    result = spark.sql(sql_text)
    result.show()

In [15]:
def q11():
    """Print the most valuable part stock held by MOZAMBIQUE suppliers.

    For each part key, sums ``ps_supplycost * ps_availqty`` over MOZAMBIQUE
    suppliers and keeps the parts whose value exceeds 0.0001 times the total
    for that nation, ordered by value descending. The result is printed with
    ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    ps_partkey,
    sum(ps_supplycost * ps_availqty) as value
from
    PARTSUPP,
    SUPPLIER,
    NATION
where
    ps_suppkey = s_suppkey
    and s_nationkey = n_nationkey
    and n_name = 'MOZAMBIQUE'
group by
    ps_partkey
having
    sum(ps_supplycost * ps_availqty) > (
        select
            sum(ps_supplycost * ps_availqty) * 0.0001000000
        from
            PARTSUPP,
            SUPPLIER,
            NATION
        where
            ps_suppkey = s_suppkey
            and s_nationkey = n_nationkey
            and n_name = 'MOZAMBIQUE'
    )
order by
    value desc;


        """
    result = spark.sql(sql_text)
    result.show()

In [16]:
def q12():
    """Print high- and low-priority line counts per ship mode.

    Covers ship modes RAIL and FOB for line items received within one year
    from 1997-01-01 where ``l_shipdate < l_commitdate < l_receiptdate``.
    ``1-URGENT`` and ``2-HIGH`` orders count as high priority, everything else
    as low. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select l_shipmode,
sum(
    case
        when o_orderpriority = '1-URGENT'
        or o_orderpriority = '2-HIGH' then 1
        else 0
    end
) as high_line_count,
sum(
    case
        when o_orderpriority <> '1-URGENT'
        and o_orderpriority <> '2-HIGH' then 1
        else 0
    end
) as low_line_count
from
    ORDERS,
    LINEITEM
where
    o_orderkey = l_orderkey
    and l_shipmode in ('RAIL', 'FOB')
    and l_commitdate < l_receiptdate
    and l_shipdate < l_commitdate
    and l_receiptdate >= date '1997-01-01'
    and l_receiptdate < date '1997-01-01' + interval '1' year
group by
    l_shipmode
order by
    l_shipmode;


        """
    result = spark.sql(sql_text)
    result.show()

In [17]:
def q13():
    """Print the distribution of customers by their number of orders.

    Customers are left-outer-joined to orders whose comment does not match
    ``%pending%deposits%``; the outer query counts how many customers share
    each order count. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    c_count,
    count(*) as custdist
from
    (
        select
            c_custkey,
            count(o_orderkey) as c_count
        from
            CUSTOMER
            left outer join ORDERS on c_custkey = o_custkey
            and o_comment not like '%pending%deposits%'
        group by
            c_custkey
    ) c_orders
group by
    c_count
order by
    custdist desc,
    c_count desc;


        """
    result = spark.sql(sql_text)
    result.show()

In [18]:
def q14():
    """Print the percentage of revenue that comes from ``PROMO`` parts.

    Covers line items shipped within one month from 1996-12-01; the numerator
    only counts parts whose type starts with ``PROMO``. The result is printed
    with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    100.00 * sum(
        case
            when p_type like 'PROMO%' then l_extendedprice * (1 - l_discount)
            else 0
        end
    ) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from
    LINEITEM,
    PART
where
    l_partkey = p_partkey
    and l_shipdate >= date '1996-12-01'
    and l_shipdate < date '1996-12-01' + interval '1' month;


        """
    result = spark.sql(sql_text)
    result.show()

In [19]:
def q15():
    """Print the supplier(s) with the highest revenue in one quarter.

    The ``revenue0`` common table expression sums discounted revenue per
    supplier for line items shipped within three months from 1997-07-01; the
    main query keeps suppliers whose total equals the maximum. The result is
    printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
with revenue0 as
 (select
 l_suppkey as supplier_no,
 sum(l_extendedprice * (1 - l_discount)) as total_revenue
 from
 lineitem
 where
 l_shipdate >= date '1997-07-01'
 and l_shipdate < date '1997-07-01' + interval '3' month
 group by
 l_suppkey)

 select
 s_suppkey,
 s_name,
 s_address,
 s_phone,
 total_revenue
 from
 supplier,
 revenue0
 where
 s_suppkey = supplier_no
 and total_revenue = (
 select
 max(total_revenue)
 from
 revenue0
 )
 order by
 s_suppkey



        """
    result = spark.sql(sql_text)
    result.show()

In [20]:
def q16():
    """Print distinct supplier counts per part brand, type and size.

    Excludes brand ``Brand#34``, types starting with ``LARGE BRUSHED`` and
    suppliers whose comment matches ``%Customer%Complaints%``; only the listed
    sizes are considered. Rows are ordered by supplier count descending. The
    result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    p_brand,
    p_type,
    p_size,
    count(distinct ps_suppkey) as supplier_cnt
from
    PARTSUPP,
    PART
where
    p_partkey = ps_partkey
    and p_brand <> 'Brand#34'
    and p_type not like 'LARGE BRUSHED%'
    and p_size in (48, 19, 12, 4, 41, 7, 21, 39)
    and ps_suppkey not in (
        select
            s_suppkey
        from
            SUPPLIER
        where
            s_comment like '%Customer%Complaints%'
    )
group by
    p_brand,
    p_type,
    p_size
order by
    supplier_cnt desc,
    p_brand,
    p_type,
    p_size;


        """
    result = spark.sql(sql_text)
    result.show()

In [21]:
def q17():
    """Print ``sum(l_extendedprice) / 7.0`` for small-quantity line items.

    Restricted to ``Brand#44`` parts in ``WRAP PKG`` containers and to line
    items whose quantity is below 0.2 times the average quantity for the same
    part. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    LINEITEM,
    PART
where
    p_partkey = l_partkey
    and p_brand = 'Brand#44'
    and p_container = 'WRAP PKG'
    and l_quantity < (
        select
            0.2 * avg(l_quantity)
        from
            LINEITEM
        where
            l_partkey = p_partkey
    );


        """
    result = spark.sql(sql_text)
    result.show()

In [22]:
def q18():
    """Print up to 100 large-quantity orders with their customers.

    Keeps orders whose line items total more than 314 in quantity, reports the
    customer, order and summed quantity, and orders rows by total price
    descending, then order date. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    c_name,
    c_custkey,
    o_orderkey,
    o_orderdate,
    o_totalprice,
    sum(l_quantity)
from
    CUSTOMER,
    ORDERS,
    LINEITEM
where
    o_orderkey in (
        select
            l_orderkey
        from
            LINEITEM
        group by
            l_orderkey
        having
            sum(l_quantity) > 314
    )
    and c_custkey = o_custkey
    and o_orderkey = l_orderkey
group by
    c_name,
    c_custkey,
    o_orderkey,
    o_orderdate,
    o_totalprice
order by
    o_totalprice desc,
    o_orderdate
limit
    100;


        """
    result = spark.sql(sql_text)
    result.show()

In [23]:
def q19():
    """Print discounted revenue for three brand/container/quantity combinations.

    A line item qualifies when it matches any of the three OR-ed predicate
    groups (``Brand#52`` small, ``Brand#11`` medium, ``Brand#51`` large
    containers), each also requiring ship mode AIR or AIR REG and the
    ``DELIVER IN PERSON`` instruction. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    sum(l_extendedprice * (1 - l_discount)) as revenue
from
    LINEITEM,
    PART
where
    (
        p_partkey = l_partkey
        and p_brand = 'Brand#52'
        and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
        and l_quantity >= 4
        and l_quantity <= 4 + 10
        and p_size between 1
        and 5
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    )
    or (
        p_partkey = l_partkey
        and p_brand = 'Brand#11'
        and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
        and l_quantity >= 18
        and l_quantity <= 18 + 10
        and p_size between 1
        and 10
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    )
    or (
        p_partkey = l_partkey
        and p_brand = 'Brand#51'
        and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
        and l_quantity >= 29
        and l_quantity <= 29 + 10
        and p_size between 1
        and 15
        and l_shipmode in ('AIR', 'AIR REG')
        and l_shipinstruct = 'DELIVER IN PERSON'
    );


        """
    result = spark.sql(sql_text)
    result.show()

In [24]:
def q20():
    """Print ALGERIA suppliers holding surplus stock of ``green`` parts.

    A supplier qualifies when, for some part whose name starts with ``green``,
    its available quantity exceeds half of the quantity it shipped for that
    part within one year from 1993-01-01. Rows are ordered by supplier name.
    The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    s_name,
    s_address
from
    SUPPLIER,
    NATION
where
    s_suppkey in (
        select
            ps_suppkey
        from
            PARTSUPP
        where
            ps_partkey in (
                select
                    p_partkey
                from
                    PART
                where
                    p_name like 'green%'
            )
            and ps_availqty > (
                select
                    0.5 * sum(l_quantity)
                from
                    LINEITEM
                where
                    l_partkey = ps_partkey
                    and l_suppkey = ps_suppkey
                    and l_shipdate >= date '1993-01-01'
                    and l_shipdate < date '1993-01-01' + interval '1' year
            )
    )
    and s_nationkey = n_nationkey
    and n_name = 'ALGERIA'
order by
    s_name;




        """
    result = spark.sql(sql_text)
    result.show()

In [25]:
def q21():
    """Print EGYPT suppliers ranked by how often they alone delayed an order.

    Counts, per supplier, line items on status ``F`` orders that were received
    after their commit date, where another supplier also appears on the order
    and no other supplier on that order was late. Ordered by the count
    descending, limited to 100. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    s_name,
    count(*) as numwait
from
    SUPPLIER,
    LINEITEM l1,
    ORDERS,
    NATION
where
    s_suppkey = l1.l_suppkey
    and o_orderkey = l1.l_orderkey
    and o_orderstatus = 'F'
    and l1.l_receiptdate > l1.l_commitdate
    and exists (
        select
            *
        from
            LINEITEM l2
        where
            l2.l_orderkey = l1.l_orderkey
            and l2.l_suppkey <> l1.l_suppkey
    )
    and not exists (
        select
            *
        from
            LINEITEM l3
        where
            l3.l_orderkey = l1.l_orderkey
            and l3.l_suppkey <> l1.l_suppkey
            and l3.l_receiptdate > l3.l_commitdate
    )
    and s_nationkey = n_nationkey
    and n_name = 'EGYPT'
group by
    s_name
order by
    numwait desc,
    s_name
limit
    100;


        """
    result = spark.sql(sql_text)
    result.show()

In [26]:
def q22():
    """Print customer counts and balances per two-character phone prefix.

    Considers customers whose phone prefix is in the listed set, whose account
    balance exceeds the average positive balance of customers with those
    prefixes, and who have no orders. The result is printed with ``show()``.
    """
    # The SQL text (including its trailing whitespace) is kept exactly as written.
    sql_text = """
select
    cntrycode,
    count(*) as numcust,
    sum(c_acctbal) as totacctbal
from
    (
        select
            substring(
                c_phone
                from
                    1 for 2
            ) as cntrycode,
            c_acctbal
        from
            CUSTOMER
        where
            substring(
                c_phone
                from
                    1 for 2
            ) in ('20', '40', '22', '30', '39', '42', '21')
            and c_acctbal > (
                select
                    avg(c_acctbal)
                from
                    CUSTOMER
                where
                    c_acctbal > 0.00
                    and substring(
                        c_phone
                        from
                            1 for 2
                    ) in ('20', '40', '22', '30', '39', '42', '21')
            )
            and not exists (
                select
                    *
                from
                    ORDERS
                where
                    o_custkey = c_custkey
            )
    ) as custsale
group by
    cntrycode
order by
    cntrycode;


        """
    result = spark.sql(sql_text)
    result.show()

In [27]:
def execute_queries(q):
    """Run the module-level query function ``q<q>`` and print its elapsed time.

    Args:
        q: Query number, either an int or a string of digits (``21`` or ``"21"``).

    Raises:
        ValueError: If ``q`` is not a plain number, or no callable named
            ``q<q>`` exists in this module.
    """
    query_id = str(q)
    # Look the function up by name instead of eval(): ``q`` can come straight
    # from sys.argv, and eval() would execute any expression embedded in it.
    if not query_id.isdigit():
        raise ValueError("Invalid query number: %r" % (q,))
    runner = globals().get("q" + query_id)
    if not callable(runner):
        raise ValueError("Unknown query: q%s" % query_id)

    # perf_counter() is monotonic, so the interval cannot be skewed by clock changes.
    start_time = time.perf_counter()
    runner()
    end_time = time.perf_counter()
    print("Time taken to execute query %s : %.2f s" %(q,(end_time - start_time)))

In [28]:
# sys.argv[0] is always the program/launcher path, so a query number is only
# present when there are at least two entries. The previous `n > 0` check was
# always true, which made the usage message unreachable.
# NOTE(review): under a Jupyter kernel sys.argv presumably holds the launcher's
# own arguments rather than a query number — confirm how this cell is driven.
n = len(sys.argv)
if n > 1:
    execute_queries(sys.argv[1])
else:
    print("Mention Query No.")

+------------------+-------+
|            s_name|numwait|
+------------------+-------+
|Supplier#000000246|     15|
|Supplier#000000655|     15|
|Supplier#000000599|     14|
|Supplier#000000208|     13|
|Supplier#000000227|     13|
|Supplier#000000301|     13|
|Supplier#000000618|     13|
|Supplier#000000898|     13|
|Supplier#000000094|     12|
|Supplier#000000343|     12|
|Supplier#000000856|     12|
|Supplier#000000159|     11|
|Supplier#000000664|     11|
|Supplier#000000022|     10|
|Supplier#000000038|     10|
|Supplier#000000105|     10|
|Supplier#000000111|     10|
|Supplier#000000502|     10|
|Supplier#000000938|     10|
|Supplier#000000069|      9|
+------------------+-------+
only showing top 20 rows

Time taken to execute query 21 : 13.35
+---------+-------+-----------------+
|cntrycode|numcust|       totacctbal|
+---------+-------+-----------------+
|       20|     91|666158.0999999997|
|       21|    101|760953.5499999999|
|       22|    106|759804.8099999996|
|       30|

In [38]:
# Shut down the Spark session held in `spark`; it cannot run queries afterwards.
spark.stop()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38009)
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:38009)