In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("DataBricks Learning").getOrCreate()

In [0]:
customer = spark.read.table('samples.tpch.customer')
customerdf = customer.selectExpr('c_custkey as CustomerKey','c_name as CustomerName','c_phone as CustomerPhone','c_nationkey as CustomerNationKey')
customerdf.show()





+-----------+------------------+---------------+-----------------+
|CustomerKey|      CustomerName|  CustomerPhone|CustomerNationKey|
+-----------+------------------+---------------+-----------------+
|     412445|Customer#000412445|31-421-403-4333|               21|
|     412446|Customer#000412446|30-487-949-7942|               20|
|     412447|Customer#000412447|17-797-466-6308|                7|
|     412448|Customer#000412448|16-541-510-4964|                6|
|     412449|Customer#000412449|24-710-983-5536|               14|
|     412450|Customer#000412450|30-293-696-5047|               20|
|     412451|Customer#000412451|30-590-724-6711|               20|
|     412452|Customer#000412452|20-492-590-3363|               10|
|     412453|Customer#000412453|31-480-724-9665|               21|
|     412454|Customer#000412454|19-898-261-2669|                9|
|     412455|Customer#000412455|26-667-672-4269|               16|
|     412456|Customer#000412456|29-882-106-8873|              

In [0]:
from pyspark.sql.functions import when,col
order = spark.read.table('samples.tpch.orders')
orderdf = order.selectExpr('o_orderkey as OrderKey','o_custkey as OrderCustomerKey','o_orderstatus as OrderStatus','o_orderdate as OrderDate','o_totalprice as TotalAmt')
orderdf.select('OrderStatus').distinct().show()
orderdf = orderdf.withColumn('Status',when(orderdf.OrderStatus == 'F',"Failed")
                            .when(orderdf.OrderStatus == 'O',"Ordered")
                            .otherwise ("Proccesing")).drop('OrderStatus')
orderdf.show()

+-----------+
|OrderStatus|
+-----------+
|          F|
|          O|
|          P|
+-----------+

+--------+----------------+----------+---------+-------+
|OrderKey|OrderCustomerKey| OrderDate| TotalAmt| Status|
+--------+----------------+----------+---------+-------+
|13710944|          227285|1995-10-11|162169.66|Ordered|
|13710945|          225010|1997-09-29|252273.67|Ordered|
|13710946|          238820|1997-10-31|179947.16|Ordered|
|13710947|          581233|1995-05-25| 33843.49|Ordered|
|13710948|           10033|1995-09-04| 42500.65|Ordered|
|13710949|          615502|1995-07-13| 48225.35|Ordered|
|13710950|          710665|1992-11-29|265761.00| Failed|
|13710951|          382528|1993-05-21|137666.86| Failed|
|13710976|          122618|1998-03-06|158725.42|Ordered|
|13710977|          575623|1998-05-04|178703.66|Ordered|
|13710978|          236596|1993-02-04| 87222.97| Failed|
|13710979|          712924|1992-09-18|178101.35| Failed|
|13710980|           90848|1996-11-14|123597.8

In [0]:
nation = spark.read.table('samples.tpch.nation')
nationdf = nation.selectExpr('n_nationkey as Nationkey','n_name as NationName')
nationdf.show()


+---------+----------+
|Nationkey|NationName|
+---------+----------+
|        0|   ALGERIA|
|        1| ARGENTINA|
|        2|    BRAZIL|
|        3|    CANADA|
|        4|     EGYPT|
|        5|  ETHIOPIA|
|        6|    FRANCE|
|        7|   GERMANY|
|        8|     INDIA|
|        9| INDONESIA|
|       10|      IRAN|
|       11|      IRAQ|
|       12|     JAPAN|
|       13|    JORDAN|
|       14|     KENYA|
|       15|   MOROCCO|
|       16|MOZAMBIQUE|
|       17|      PERU|
|       18|     CHINA|
|       19|   ROMANIA|
+---------+----------+
only showing top 20 rows



In [0]:
res = orderdf.join(customerdf).where(orderdf['OrderCustomerKey'] == customerdf['CustomerKey'])\
              .join(nationdf).where(customerdf['CustomerNationKey'] == nationdf['Nationkey'])\
              .select('OrderKey','CustomerName','NationName','OrderDate','Status','TotalAmt')
              

In [0]:


StatusSalesAmount = res.groupby('Status').sum('TotalAmt').withColumnRenamed('sum(TotalAmt)','SumTotalAmt')
display(StatusSalesAmount)




Status,SumTotalAmt
Ordered,548923692869.55
Proccesing,35333603011.43
Failed,549181919365.27


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import sum,avg,max,count
display(res.filter(res.OrderDate == '1995-06-10').groupBy('NationName','Status').agg(sum('TotalAmt').alias('SumTotalAmt'),count('OrderKey').alias('TotalOrderCount')))

NationName,Status,SumTotalAmt,TotalOrderCount
CHINA,Ordered,13597652.13,90
INDIA,Ordered,15556860.72,108
UNITED KINGDOM,Proccesing,4066783.73,21
GERMANY,Ordered,12688658.02,95
IRAN,Proccesing,5395171.64,28
IRAQ,Proccesing,4263762.24,24
ALGERIA,Ordered,14325611.42,100
CHINA,Proccesing,3537359.49,17
JAPAN,Failed,26833.53,1
KENYA,Ordered,13709912.54,91


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import to_timestamp, date_format, year

display(res.filter((res.Status == "Failed") & (year(res.OrderDate) == 1995)).groupby(
    "OrderDate"
).agg(
    sum("TotalAmt").alias("SumTotalSales"), count("OrderKey").alias("TotalOrderCount")
))

OrderDate,SumTotalSales,TotalOrderCount
1995-02-21,378337429.9,2598
1995-02-01,469403915.34,3066
1995-02-08,475118268.43,3143
1995-01-23,481846486.2,3172
1995-03-17,131206498.52,1189
1995-02-20,404670758.04,2762
1995-05-08,14425381.86,253
1995-03-20,122027910.99,1114
1995-03-28,75385944.95,819
1995-05-05,13337304.95,246


Databricks visualization. Run in Databricks to view.

In [0]:
res.printSchema()

root
 |-- OrderKey: long (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- NationName: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- Status: string (nullable = false)
 |-- TotalAmt: decimal(18,2) (nullable = true)



In [0]:
from pyspark.sql.functions import to_timestamp,date_format,year,month
res = res.withColumn('Year',year(res.OrderDate))
res = res.withColumn('Month',month(res.OrderDate))

In [0]:
res.show()

+--------+------------------+----------+----------+-------+---------+----+-----+
|OrderKey|      CustomerName|NationName| OrderDate| Status| TotalAmt|Year|Month|
+--------+------------------+----------+----------+-------+---------+----+-----+
|13949350|Customer#000000007|     CHINA|1997-12-05|Ordered|217862.05|1997|   12|
| 7325473|Customer#000000007|     CHINA|1992-05-30| Failed|111957.12|1992|    5|
|25815556|Customer#000000007|     CHINA|1992-10-03| Failed|212906.99|1992|   10|
|13668358|Customer#000000007|     CHINA|1997-08-23|Ordered|173327.17|1997|    8|
|29797575|Customer#000000007|     CHINA|1994-02-06| Failed| 71906.44|1994|    2|
|18708064|Customer#000000007|     CHINA|1995-08-22|Ordered|129100.04|1995|    8|
|19696194|Customer#000000007|     CHINA|1996-10-09|Ordered| 37581.90|1996|   10|
| 1763205|Customer#000000007|     CHINA|1994-08-28| Failed| 11814.29|1994|    8|
|26458976|Customer#000000007|     CHINA|1997-07-06|Ordered|165670.04|1997|    7|
|15337408|Customer#000000007

In [0]:
display(res.groupBy('Year').agg(count('TotalAmt').alias("TotalTxnCount")))

Year,TotalTxnCount
1997,1137325
1994,1137944
1996,1141556
1998,668101
1995,1136308
1992,1139873
1993,1138893


Databricks visualization. Run in Databricks to view.

In [0]:
display(res.filter(res.Year == '1995').groupby('Month').agg(count('Orderkey').alias('TotalTxCount')))

Month,TotalTxCount
12,96407
1,97141
6,93670
3,96571
5,96244
9,93445
4,93484
8,96280
7,96731
10,96293


Databricks visualization. Run in Databricks to view.

In [0]:
display(res.groupby('Year','Month').agg(count('Orderkey').alias('TotalTxCount')))

Year,Month,TotalTxCount
1997,11,93049
1998,2,87411
1995,12,96407
1998,7,96736
1994,3,96593
1996,11,93500
1998,1,97175
1992,12,96819
1994,12,96729
1994,8,96741


Databricks visualization. Run in Databricks to view.

In [0]:
display('Order Overview')

'Order Overview'

In [0]:
%sql
select o_orderstatus,count(distinct o_custkey) from samples.tpch.orders
group by o_orderstatus


o_orderstatus,count(DISTINCT o_custkey)
F,498132
O,498182
P,155917


In [0]:
CountCust = spark.sql("select o_orderstatus,count(distinct o_custkey) as custcount from samples.tpch.orders group by o_orderstatus")
CountCust.show()

+-------------+---------+
|o_orderstatus|custcount|
+-------------+---------+
|            F|   498132|
|            O|   498182|
|            P|   155917|
+-------------+---------+

