In [0]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import *

In [0]:
%sql
Create database if not exists Sample

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS Sample")
#//fetch metadata data from the catalog. your database name will be listed here
dbs = spark.catalog.listDatabases()
display(dbs)

name,description,locationUri
default,Default Hive database,dbfs:/user/hive/warehouse
sample,,dbfs:/user/hive/warehouse/sample.db


In [0]:
%sql

CREATE TABLE IF NOT EXISTS Sample.Transactions ( AccountId INT, TranDate DATE, TranAmt DECIMAL(8, 2));

CREATE TABLE IF NOT EXISTS Sample.Logical (RowID INT,FName VARCHAR(20), Salary SMALLINT);

In [0]:
sch=StructType([
  StructField("AccountId", IntegerType(), True),
  StructField("TranDate", DateType(), True),
  StructField("TranAmt", DecimalType(), True)])
#spark.catalog.createTable doesn't work for me

In [0]:
spark.sql("CREATE TABLE IF NOT EXISTS Sample.Transactions ( AccountId INT, TranDate DATE, TranAmt DECIMAL(8, 2));")
spark.sql("CREATE TABLE IF NOT EXISTS Sample.Logical (RowID INT,FName VARCHAR(20), Salary SMALLINT);")

Out[41]: DataFrame[]

In [0]:
spark.sql("CREATE TABLE IF NOT EXISTS Sample.Logical (RowID INT,FName VARCHAR(20), Salary SMALLINT);")

In [0]:
%sql

INSERT INTO Sample.Transactions VALUES 
( 1, '2011-01-01', 500),
( 1, '2011-01-15', 50),
( 1, '2011-01-22', 250),
( 1, '2011-01-24', 75),
( 1, '2011-01-26', 125),
( 1, '2011-01-28', 175),
( 2, '2011-01-01', 500),
( 2, '2011-01-15', 50),
( 2, '2011-01-22', 25),
( 2, '2011-01-23', 125),
( 2, '2011-01-26', 200),
( 2, '2011-01-29', 250),
( 3, '2011-01-01', 500),
( 3, '2011-01-15', 50 ),
( 3, '2011-01-22', 5000),
( 3, '2011-01-25', 550),
( 3, '2011-01-27', 95 ),
( 3, '2011-01-30', 2500)



num_affected_rows,num_inserted_rows
18,18


In [0]:
data_trans = [( 1, "2011-01-01", 500),
( 1, "2011-01-15", 50),
( 1, "2011-01-22", 250),
( 1, "2011-01-24", 75),
( 1, "2011-01-26", 125),
( 1, "2011-01-28", 175),
( 2, "2011-01-01", 500),
( 2, "2011-01-15", 50),
( 2, "2011-01-22", 25),
( 2, "2011-01-23", 125),
( 2, "2011-01-26", 200),
( 2, "2011-01-29", 250),
( 3, "2011-01-01", 500),
( 3, "2011-01-15", 50 ),
( 3, "2011-01-22", 5000),
( 3, "2011-01-25", 550),
( 3, "2011-01-27", 95 ),
( 3, "2011-01-30", 2500)]
columns=["AccountId","TranDate", "TranAmt"]
df_trans = spark.createDataFrame(data_trans, columns)
df_trans.write.insertInto("Sample.Transactions",overwrite=True)
spark.sql('select * from Sample.Transactions').show()

+---------+----------+-------+
|AccountId|  TranDate|TranAmt|
+---------+----------+-------+
|        3|2011-01-22|5000.00|
|        3|2011-01-25| 550.00|
|        3|2011-01-27|  95.00|
|        3|2011-01-30|2500.00|
|        1|2011-01-01| 500.00|
|        1|2011-01-15|  50.00|
|        1|2011-01-22| 250.00|
|        1|2011-01-24|  75.00|
|        1|2011-01-26| 125.00|
|        1|2011-01-28| 175.00|
|        2|2011-01-01| 500.00|
|        2|2011-01-15|  50.00|
|        2|2011-01-22|  25.00|
|        2|2011-01-23| 125.00|
|        2|2011-01-26| 200.00|
|        2|2011-01-29| 250.00|
|        3|2011-01-01| 500.00|
|        3|2011-01-15|  50.00|
+---------+----------+-------+



In [0]:
%sql
INSERT INTO Sample.Logical
VALUES (1,'George', 800),
(2,'Sam', 950),
(3,'Diane', 1100),
(4,'Nicholas', 1250),
(5,'Samuel', 1250),
(6,'Patricia', 1300),
(7,'Brian', 1500),
(8,'Thomas', 1600),
(9,'Fran', 2450),
(10,'Debbie', 2850),
(11,'Mark', 2975),
(12,'James', 3000),
(13,'Cynthia', 3000),
(14,'Christopher', 5000);

num_affected_rows,num_inserted_rows
14,14


In [0]:
data_logical = [(1,'George', 800),
(2,'Sam', 950),
(3,'Diane', 1100),
(4,'Nicholas', 1250),
(5,'Samuel', 1250),
(6,'Patricia', 1300),
(7,'Brian', 1500),
(8,'Thomas', 1600),
(9,'Fran', 2450),
(10,'Debbie', 2850),
(11,'Mark', 2975),
(12,'James', 3000),
(13,'Cynthia', 3000),
(14,'Christopher', 5000)]
columns=["RowID","FName", "Salary"]
df_logic = spark.createDataFrame(data_logical, columns)
df_logic.write.insertInto("Sample.Logical",overwrite=True)
spark.sql('select * from Sample.Logical').show()

+-----+-----------+------+
|RowID|      FName|Salary|
+-----+-----------+------+
|    4|   Nicholas|  1250|
|    5|     Samuel|  1250|
|   13|    Cynthia|  3000|
|   14|Christopher|  5000|
|    6|   Patricia|  1300|
|    7|      Brian|  1500|
|    1|     George|   800|
|    8|     Thomas|  1600|
|    9|       Fran|  2450|
|   10|     Debbie|  2850|
|   11|       Mark|  2975|
|   12|      James|  3000|
|    2|        Sam|   950|
|    3|      Diane|  1100|
+-----+-----------+------+



Totals based on previous row

In [0]:
%sql

SELECT AccountId,
TranDate,
TranAmt,
-- running total of all transactions
SUM(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunTotalAmt
FROM Sample.Transactions ORDER BY AccountId, TranDate;

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500.0,500.0
1,2011-01-15,50.0,550.0
1,2011-01-22,250.0,800.0
1,2011-01-24,75.0,875.0
1,2011-01-26,125.0,1000.0
1,2011-01-28,175.0,1175.0
2,2011-01-01,500.0,500.0
2,2011-01-15,50.0,550.0
2,2011-01-22,25.0,575.0
2,2011-01-23,125.0,700.0


In [0]:
#A group by normally reduces the number of rows returned by rolling them up and calculating averages or sums for each row. partition by does not affect the number of rows returned, but it changes how a window function's result is calculated.

In [0]:
windowSpec  = Window.partitionBy("AccountId").orderBy("TranDate")
new=df_trans.withColumn("RunTotalAmt", sum(col("TranAmt")).over(windowSpec)).orderBy("AccountId", "TranDate")
display(new)

AccountId,TranDate,TranAmt,RunTotalAmt
1,2011-01-01,500,500
1,2011-01-15,50,550
1,2011-01-22,250,800
1,2011-01-24,75,875
1,2011-01-26,125,1000
1,2011-01-28,175,1175
2,2011-01-01,500,500
2,2011-01-15,50,550
2,2011-01-22,25,575
2,2011-01-23,125,700


In [0]:
%sql
SELECT AccountId,
TranDate,
TranAmt,
-- running average of all transactions
AVG(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunAvg,
-- running total # of transactions
COUNT(*) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunTranQty,
-- smallest of the transactions so far
MIN(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunSmallAmt,
-- largest of the transactions so far
MAX(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) as RunLargeAmt,
-- running total of all transactions
SUM(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate) RunTotalAmt
FROM Sample.Transactions 
ORDER BY AccountId,TranDate;

AccountId,TranDate,TranAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt,RunTotalAmt
1,2011-01-01,500.0,500.0,1,500.0,500.0,500.0
1,2011-01-15,50.0,275.0,2,50.0,500.0,550.0
1,2011-01-22,250.0,266.666667,3,50.0,500.0,800.0
1,2011-01-24,75.0,218.75,4,50.0,500.0,875.0
1,2011-01-26,125.0,200.0,5,50.0,500.0,1000.0
1,2011-01-28,175.0,195.833333,6,50.0,500.0,1175.0
2,2011-01-01,500.0,500.0,1,500.0,500.0,500.0
2,2011-01-15,50.0,275.0,2,50.0,500.0,550.0
2,2011-01-22,25.0,191.666667,3,25.0,500.0,575.0
2,2011-01-23,125.0,175.0,4,25.0,500.0,700.0


In [0]:
windowSpec = Window.partitionBy("AccountId").orderBy("TranDate")
new2 = df_trans.withColumn("RunAvg",avg("TranAmt").over(windowSpec)) \
  .withColumn("RunTranQty",count("*").over(windowSpec)) \
  .withColumn("RunSmallAmt",min("TranAmt").over(windowSpec)) \
  .withColumn("RunLargeAmt",max("TranAmt").over(windowSpec)) \
  .withColumn("RunTotalAmt",sum("TranAmt").over(windowSpec)) \
  .orderBy("AccountId", "TranDate")
display(new2)

AccountId,TranDate,TranAmt,RunAvg,RunTranQty,RunSmallAmt,RunLargeAmt,RunTotalAmt
1,2011-01-01,500,500.0,1,500,500,500
1,2011-01-15,50,275.0,2,50,500,550
1,2011-01-22,250,266.6666666666667,3,50,500,800
1,2011-01-24,75,218.75,4,50,500,875
1,2011-01-26,125,200.0,5,50,500,1000
1,2011-01-28,175,195.83333333333331,6,50,500,1175
2,2011-01-01,500,500.0,1,500,500,500
2,2011-01-15,50,275.0,2,50,500,550
2,2011-01-22,25,191.66666666666663,3,25,500,575
2,2011-01-23,125,175.0,4,25,500,700


* Calculating Totals Based Upon a Subset of Rows

* CURRENT ROW, the current row
* UNBOUNDED PRECEDING, all rows before the current row 
* UNBOUNDED FOLLOWING, all rows after the current row 
* x PRECEDING, x rows before the current row -> relative
* y FOLLOWING, y rows after the current row -> relative

In [0]:
%sql
SELECT AccountId,
TranDate,
TranAmt,
-- average of the current and previous 2 transactions
AVG(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideAvg,
-- total # of the current and previous 2 transactions
COUNT(*) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideQty,
-- smallest of the current and previous 2 transactions
MIN(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideMin,
-- largest of the current and previous 2 transactions
MAX(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideMax,
-- total of the current and previous 2 transactions
SUM(TranAmt) OVER (PARTITION BY AccountId ORDER BY TranDate ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as SlideTotal,
ROW_NUMBER() OVER (PARTITION BY AccountId ORDER BY TranDate) AS RN
FROM Sample.Transactions 
ORDER BY AccountId, TranDate, RN

AccountId,TranDate,TranAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal,RN
1,2011-01-01,500.0,500.0,1,500.0,500.0,500.0,1
1,2011-01-15,50.0,275.0,2,50.0,500.0,550.0,2
1,2011-01-22,250.0,266.666667,3,50.0,500.0,800.0,3
1,2011-01-24,75.0,125.0,3,50.0,250.0,375.0,4
1,2011-01-26,125.0,150.0,3,75.0,250.0,450.0,5
1,2011-01-28,175.0,125.0,3,75.0,175.0,375.0,6
2,2011-01-01,500.0,500.0,1,500.0,500.0,500.0,1
2,2011-01-15,50.0,275.0,2,50.0,500.0,550.0,2
2,2011-01-22,25.0,191.666667,3,25.0,500.0,575.0,3
2,2011-01-23,125.0,66.666667,3,25.0,125.0,200.0,4


In [0]:
windowSpec = Window.partitionBy("AccountId").orderBy("TranDate").rowsBetween(-2,Window.currentRow)
windowSpec2 = Window.partitionBy("AccountId").orderBy("TranDate")
new3 = df_trans.withColumn("SlideAvg",avg("TranAmt").over(windowSpec)) \
  .withColumn("SlideQty",count("*").over(windowSpec)) \
  .withColumn("SlideMin",min("TranAmt").over(windowSpec)) \
  .withColumn("SlideMax",max("TranAmt").over(windowSpec)) \
  .withColumn("SlideTotal",sum("TranAmt").over(windowSpec)) \
  .withColumn("RN", row_number().over(windowSpec2)) \
  .orderBy("AccountId", "TranDate")
display(new3)

AccountId,TranDate,TranAmt,SlideAvg,SlideQty,SlideMin,SlideMax,SlideTotal,RN
1,2011-01-01,500,500.0,1,500,500,500,1
1,2011-01-15,50,275.0,2,50,500,550,2
1,2011-01-22,250,266.6666666666667,3,50,500,800,3
1,2011-01-24,75,125.0,3,50,250,375,4
1,2011-01-26,125,150.0,3,75,250,450,5
1,2011-01-28,175,125.0,3,75,175,375,6
2,2011-01-01,500,500.0,1,500,500,500,1
2,2011-01-15,50,275.0,2,50,500,550,2
2,2011-01-22,25,191.66666666666663,3,25,500,575,3
2,2011-01-23,125,66.66666666666667,3,25,125,200,4


* Logical Window

In [0]:
%sql
SELECT RowID,
FName,
Salary,
SUM(Salary) OVER (ORDER BY Salary ROWS UNBOUNDED PRECEDING) as SumByRows,
SUM(Salary) OVER (ORDER BY Salary RANGE UNBOUNDED PRECEDING) as SumByRange,

FROM Sample.Logical
ORDER BY RowID;

RowID,FName,Salary,SumByRows,SumByRange
1,George,800,800,800
2,Sam,950,1750,1750
3,Diane,1100,2850,2850
4,Nicholas,1250,4100,5350
5,Samuel,1250,5350,5350
6,Patricia,1300,6650,6650
7,Brian,1500,8150,8150
8,Thomas,1600,9750,9750
9,Fran,2450,12200,12200
10,Debbie,2850,15050,15050


In [0]:
windowSpec1 = Window.orderBy("Salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)
windowSpec2 = Window.orderBy("Salary").rangeBetween(Window.unboundedPreceding, Window.currentRow)
new4 = df_logic.withColumn("SumByRows",sum("Salary").over(windowSpec1)) \
  .withColumn("SumByRange",sum("Salary").over(windowSpec2)) \
  .orderBy("RowID")
display(new4)

RowID,FName,Salary,SumByRows,SumByRange
1,George,800,800,800
2,Sam,950,1750,1750
3,Diane,1100,2850,2850
4,Nicholas,1250,4100,5350
5,Samuel,1250,5350,5350
6,Patricia,1300,6650,6650
7,Brian,1500,8150,8150
8,Thomas,1600,9750,9750
9,Fran,2450,12200,12200
10,Debbie,2850,15050,15050


In [0]:
%sql
SELECT
AccountNumber,
OrderDate,
TotalDue,
ROW_NUMBER() OVER (PARTITION BY AccountNumber ORDER BY OrderDate) AS RN
FROM Sales.SalesOrderHeader
ORDER BY AccountNumber
LIMIT 10; 

In [0]:
filePath = "dbfs:/user/hive/warehouse/salesorderheader"
salesorderheader = spark.read.format("delta") \
              .option("header","true") \
              .option("inferSchema","true") \
              .load(filePath)

display(salesorderheader)

SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ShipToAddressID,BillToAddressID,ShipMethod,CreditCardApprovalCode,SubTotal,TaxAmt,Freight,TotalDue,Comment,rowguid,ModifiedDate
71774,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71774,PO348186287,10-4020-000609,29847,1092,1092,CARGO TRANSPORT 5,,880.3484,70.4279,22.0087,972.785,,89E42CDC-8506-48A2-B89B-EB3E64E3554E,2008-06-08T00:00:00.000+0000
71776,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71776,PO19952192051,10-4020-000106,30072,640,640,CARGO TRANSPORT 5,,78.81,6.3048,1.9703,87.0851,,8A3448C5-E677-4158-A29B-DD33069BE0B0,2008-06-08T00:00:00.000+0000
71780,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71780,PO19604173239,10-4020-000340,30113,653,653,CARGO TRANSPORT 5,,38418.6895,3073.4952,960.4672,42452.6519,,A47665D2-7AC9-4CF3-8A8B-2A3883554284,2008-06-08T00:00:00.000+0000
71782,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71782,PO19372114749,10-4020-000582,29485,1086,1086,CARGO TRANSPORT 5,,39785.3304,3182.8264,994.6333,43962.7901,,F1BE45A5-5C57-4A50-93C6-5F8BE44CB7CB,2008-06-08T00:00:00.000+0000
71783,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71783,PO19343113609,10-4020-000024,29957,992,992,CARGO TRANSPORT 5,,83858.4261,6708.6741,2096.4607,92663.5609,,7DB2329E-6446-42A8-8915-9C8370B68ED8,2008-06-08T00:00:00.000+0000
71784,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71784,PO19285135919,10-4020-000448,29736,659,659,CARGO TRANSPORT 5,,108561.8317,8684.9465,2714.0458,119960.824,,CA31F324-2C32-4F8D-95EB-596E7F343027,2008-06-08T00:00:00.000+0000
71796,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71796,PO17052159664,10-4020-000420,29660,1058,1058,CARGO TRANSPORT 5,,57634.6342,4610.7707,1440.8659,63686.2708,,917EF5BA-F32D-4563-8588-66DB0BCDC846,2008-06-08T00:00:00.000+0000
71797,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71797,PO16501134889,10-4020-000142,29796,642,642,CARGO TRANSPORT 5,,78029.6898,6242.3752,1950.7422,86222.8072,,BB3FEE84-C8BF-4DD2-BCCA-675AB6A11C38,2008-06-08T00:00:00.000+0000
71815,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71815,PO13021155785,10-4020-000276,30089,1034,1034,CARGO TRANSPORT 5,,1141.5782,91.3263,28.5395,1261.444,,2AA5F39B-1096-4A4B-B17B-F10504A397CE,2008-06-08T00:00:00.000+0000
71816,2,2008-06-01T00:00:00.000+0000,2008-06-13T00:00:00.000+0000,2008-06-08T00:00:00.000+0000,5,False,SO71816,PO12992180445,10-4020-000295,30027,1038,1038,CARGO TRANSPORT 5,,3398.1659,271.8533,84.9541,3754.9733,,E3C189E7-98DE-4C40-B6C2-0D1D13F9BB33,2008-06-08T00:00:00.000+0000


In [0]:
windowSpec  = Window.partitionBy("AccountNumber").orderBy("OrderDate")
new5=salesorderheader.select(col("AccountNumber"), col("OrderDate"), col("TotalDue") ,row_number().over(windowSpec).alias("RN") ).orderBy("AccountNumber").limit(10)
display(new5)

AccountNumber,OrderDate,TotalDue,RN
10-4020-000006,2008-06-01T00:00:00.000+0000,2361.6403,1
10-4020-000016,2008-06-01T00:00:00.000+0000,98138.2131,1
10-4020-000024,2008-06-01T00:00:00.000+0000,92663.5609,1
10-4020-000025,2008-06-01T00:00:00.000+0000,2669.3183,1
10-4020-000052,2008-06-01T00:00:00.000+0000,70698.9922,1
10-4020-000061,2008-06-01T00:00:00.000+0000,81834.9826,1
10-4020-000088,2008-06-01T00:00:00.000+0000,39531.6085,1
10-4020-000106,2008-06-01T00:00:00.000+0000,87.0851,1
10-4020-000142,2008-06-01T00:00:00.000+0000,86222.8072,1
10-4020-000151,2008-06-01T00:00:00.000+0000,272.6468,1
