# Clube do Livro - Cap. 06 
# Agrupamentos 

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [0]:
path = "/databricks-datasets/definitive-guide/data/retail-data/all/*.csv"

df = ( 
        spark.read
            .format('csv')
            .option('header', 'true')
            .option('inferSchema', 'true')
            .load(path)
            .coalesce(5)

)

df.cache().count()
df.printSchema()
df.createOrReplaceTempView('dfTable')

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import count, countDistinct, approxCountDistinct, first, last, min, max, sum,sumDistinct, avg, col


display(
  df.select(
    count('*').alias('countAll'),
    count(col('CustomerID')).alias('countCustID'), # count not null 
    countDistinct(col('customerID')).alias('countDistCustID'),
    approxCountDistinct(col('customerID'), 0.1 ).alias('countApproxCustID'),
    first(col('customerID')).alias('firstRow'),
    last(col('customerID')).alias('lastRow'),
    min(col('customerID')).alias('minCustID'),
    max(col('customerID')).alias('maxRow'),
    sum('Quantity').alias('sumQtd'),
    sumDistinct(col('Quantity')).alias('sumDistQtd'),
    avg(col('Quantity')).alias('avgQnt')
  )
)



countAll,countCustID,countDistCustID,countApproxCustID,firstRow,lastRow,minCustID,maxRow,sumQtd,sumDistQtd,avgQnt
541909,406829,4372,4336,17850,12680,12346,18287,5176450,29310,9.55224954743324


In [0]:
# Recommendation : When applying AVG(), use COALESCE to convert NULL values into zeros. This way, we will obtain the correct average, since the AVG() function ignores NULL values.

df_avg = spark.sql("""
            select 1 as col1 
            union all 
            select 2 as col1
            union all 
            select 3 as col1
            union all 
            select null as col1
             """)

df_avg.selectExpr(
    "count(col1) as count", # not null
    "sum(col1) as sum",
    "avg(col1) as avg"  # not null 
).display()

count,sum,avg
3,6,2.0


In [0]:
from pyspark.sql.functions import collect_set, collect_list

df2 = df.limit(200)


display(
  df2.agg(
      collect_set('Country'), # returns array with distinct values
      collect_list('Country') # return all values
  )
)

collect_set(Country),collect_list(Country)
"List(France, Australia, United Kingdom)","List(United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, France, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, United Kingdom, Australia, Australia, Australia)"


Window Function

In [0]:
from pyspark.sql.functions import to_date

dfWithDate = (
  df.withColumn( "date", to_date(col('InvoiceDate'), "MM/d/yyyy H:mm"))

)

dfWithDate.createOrReplaceTempView('dfWithDate')

display(dfWithDate.limit(5))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,date
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom,2010-12-01
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom,2010-12-01
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom,2010-12-01
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom,2010-12-01
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom,2010-12-01


In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, dense_rank, lead

windowSpec = (
    Window
    .partitionBy( "CustomerID", "date").orderBy(col('Quantity').desc())
    .rowsBetween(
        Window.unboundedPreceding,
        Window.currentRow
        )
    )


maxPurchaseQnt = max(col('Quantity')).over(windowSpec)
sumQnt = sum(col('Quantity')).over(windowSpec)
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)


display(
    dfWithDate.where( "CustomerID is not NULL")
    .select(
        col('CustomerID'),
        col('date'),
        col('Quantity'),
        maxPurchaseQnt.alias('maxPurchaseQnt'),
        sumQnt.alias('sumQnt'),
        purchaseDenseRank.alias('purchaseDenseRank'),
        purchaseRank.alias('purchaseRank')

    )
)

CustomerID,date,Quantity,maxPurchaseQnt,sumQnt,purchaseDenseRank,purchaseRank
12346,2011-01-18,74215,74215,74215,1,1
12346,2011-01-18,-74215,74215,0,2,2
12347,2010-12-07,36,36,36,1,1
12347,2010-12-07,30,36,66,2,2
12347,2010-12-07,24,36,90,3,3
12347,2010-12-07,12,36,102,4,4
12347,2010-12-07,12,36,114,4,4
12347,2010-12-07,12,36,126,4,4
12347,2010-12-07,12,36,138,4,4
12347,2010-12-07,12,36,150,4,4


In [0]:
%sql

select 
  CustomerID,
  'date',
  Quantity,
  MAX(quantity) over(partition by CustomerID, 'date' order by Quantity desc rows between unbounded preceding and current row) as max_qnt,
  SUM(quantity) over(partition by CustomerID, 'date' order by Quantity desc rows between unbounded preceding and current row) as sum_qnt,
  dense_rank()  over(partition by CustomerID, 'date' order by Quantity desc rows between unbounded preceding and current row) as dense_rank_qnt,
  rank()        over(partition by CustomerID, 'date' order by Quantity desc rows between unbounded preceding and current row) as rank_qnt
  from dfWithDate
  where CustomerID is not null 

CustomerID,date,Quantity,max_qnt,sum_qnt,dense_rank_qnt,rank_qnt
12346,date,74215,74215,74215,1,1
12346,date,-74215,74215,0,2,2
12347,date,240,240,240,1,1
12347,date,48,240,288,2,2
12347,date,36,240,324,3,3
12347,date,36,240,360,3,3
12347,date,36,240,396,3,3
12347,date,36,240,432,3,3
12347,date,36,240,468,3,3
12347,date,36,240,504,3,3


In [0]:
dfNotNull = dfWithDate.na.drop()
dfNotNull.createOrReplaceTempView('dfNotNull')

In [0]:

%sql 
-- two or more groupings in one table


select 
  customerID,
  null as stockCode,
  sum(Quantity) as qnt
from dfNotNull
group by customerID

union all 

select 
  customerID,
  stockCode,
  sum(Quantity) as qnt
from dfNotNull
group by customerID,stockCode
order by customerID
limit 10 

customerID,stockCode,qnt
12346,,0
12346,23166.0,0
12347,21976.0,48
12347,22771.0,12
12347,22699.0,24
12347,22698.0,12
12347,21171.0,12
12347,22728.0,16
12347,22729.0,8
12347,23174.0,4


### A better way to perform this aggregation is by using grouping sets.

In [0]:
%sql

select 
  customerID, 
  stockCode, 
  grouping_id() as aggID, -- This approach identifies the aggregation level.
  sum(quantity) as qnt
from dfNotNull
group by customerID, stockCode
  grouping sets (
    (customerID),
    (customerID,stockCode)
  )
order by customerID,stockCode

customerID,stockCode,aggID,qnt
12346,,1,0
12346,23166,0,0
12347,,1,2458
12347,16008,0,24
12347,17021,0,36
12347,20665,0,6
12347,20719,0,40
12347,20780,0,12
12347,20782,0,6
12347,20966,0,10


##ROLLUP 
### ROLLUP performs hierarchical grouping

In [0]:
spark.sql("""
      select 
        YEAR(date) as YEAR_,
        MONTH(date) as  MONTH_,
        DAY(date) as DAY_,
        sum(quantity) as qnt
      from dfNotNull
      group by 
        rollup(
            YEAR(date), MONTH(date), DAY(date)
        )
      order by YEAR_, MONTH_, DAY_
""").display()

YEAR_,MONTH_,DAY_,qnt
,,,4906888
2010.0,,,296362
2010.0,12.0,,296362
2010.0,12.0,1.0,24032
2010.0,12.0,2.0,20855
2010.0,12.0,3.0,11548
2010.0,12.0,5.0,16394
2010.0,12.0,6.0,16095
2010.0,12.0,7.0,19351
2010.0,12.0,8.0,21275


#CUBE
CUBE performs cross grouping

In [0]:
spark.sql("""
      select 
        YEAR(date) as YEAR_,
        MONTH(date) as  MONTH_,
        DAY(date) as DAY_,
        sum(quantity) as qnt
      from dfNotNull
      group by 
        cube(
            YEAR(date), MONTH(date), DAY(date)
        )
      order by YEAR_, MONTH_, DAY_
""").display()

YEAR_,MONTH_,DAY_,qnt
,,,4906888
,,1.0,168590
,,2.0,141171
,,3.0,145457
,,4.0,186715
,,5.0,215059
,,6.0,192302
,,7.0,202399
,,8.0,171510
,,9.0,164883


# pivot

In [0]:
data = [
    ("A", "X", 10),
    ("A", "Y", 20),
    ("B", "X", 30),
    ("B", "Y", 40)
]
dataframe = (
    spark.createDataFrame(data, ['id','categ','value'])
)
display(dataframe)

id,categ,value
A,X,10
A,Y,20
B,X,30
B,Y,40


In [0]:
pivoted_df = (
    dataframe.groupBy('id').pivot('categ').sum('value')

)

display(pivoted_df)

id,X,Y
B,30,40
A,10,20
