In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true')\
    .load('file:///home/sha/dev/books/Spark-The-Definitive-Guide/data/retail-data/by-day/2010-12-01.csv')

In [3]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [4]:
df.createOrReplaceTempView('dfTable')

In [5]:
from pyspark.sql.functions import col, instr, expr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [6]:
DOTCodeFilter = col('StockCode') == 'DOT'
priceFilter = col('UnitPrice') > 600
descripFilter = instr(col('Description'), 'POSTAGE') >= 1
df.withColumn('isExpensive', DOTCodeFilter & (priceFilter | descripFilter)).where('isExpensive').select('unitPrice', 'isExpensive').show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [7]:
df.withColumn('isExpensive', expr('NOT UnitPrice <= 250')).where('isExpensive').select('Description', 'UnitPrice').show(5)

+--------------+---------+
|   Description|UnitPrice|
+--------------+---------+
|DOTCOM POSTAGE|   569.77|
|DOTCOM POSTAGE|   607.49|
+--------------+---------+



In [8]:
from pyspark.sql.functions import pow
fabricatedQuantity = pow(col('Quantity') * col('UnitPrice'), 2) + 5
df.select(expr('CustomerId'), fabricatedQuantity.alias('realQuantity')).show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [9]:
df.selectExpr('CustomerId', '(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity').show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [10]:
from pyspark.sql.functions import lit, round, bround
df.select(round(lit('2.5')), bround(lit('2.5'))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [11]:
from pyspark.sql.functions import corr
df.stat.corr('Quantity', 'UnitPrice')
df.select(corr('Quantity', 'UnitPrice')).show()

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



In [12]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [13]:
df.stat.crosstab('StockCode', 'Quantity').show()

+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|StockCode_Quantity| -1|-10|-12| -2|-24| -3| -4| -5| -6| -7|  1| 10|100| 11| 12|120|128| 13| 14|144| 15| 16| 17| 18| 19|192|  2| 20|200| 21|216| 22| 23| 24| 25|252| 27| 28|288|  3| 30| 32| 33| 34| 36|384|  4| 40|432| 47| 48|480|  5| 50| 56|  6| 60|600| 64|  7| 70| 72|  8| 80|  9| 96|
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|             22578|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0| 

In [14]:
colName = 'UnitPrice'
quantileProbs = [0.0, 0.25, 0.5, 0.75, 1.0]
relError = 0.05
df.stat.approxQuantile('UnitPrice', quantileProbs, relError)

[0.0, 1.65, 2.51, 4.21, 607.49]

In [15]:
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
+-----------------------------+
only showing top 2 rows



In [16]:
from pyspark.sql.functions import initcap
df.select(initcap(col('Description'))).show()

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+
only showing top 20 rows



In [17]:
from pyspark.sql.functions import lower, upper
df.select(col('Description'), lower(col('Description')), upper(lower(col('Description')))).show(2)

+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows



In [18]:
from pyspark.sql.functions import ltrim, rtrim, rpad, lpad, trim
df.select(ltrim(lit('    Hello    ')).alias('ltrim'),
         rtrim(lit('    Hello    ')).alias('rtrim'),
         trim(lit('    Hello    ')).alias('trim'),
         lpad(lit('Hello'), 3, '*').alias('lpad'),
         rpad(lit('Hello'), 10, '*').alias('rpad')).show(2)

+---------+---------+-----+----+----------+
|    ltrim|    rtrim| trim|lpad|      rpad|
+---------+---------+-----+----+----------+
|Hello    |    Hello|Hello| Hel|Hello*****|
|Hello    |    Hello|Hello| Hel|Hello*****|
+---------+---------+-----+----+----------+
only showing top 2 rows



In [19]:
from pyspark.sql.functions import regexp_replace
regex_string = 'BLACK|WHITE|RED|GREEN|BLUE'
df.select(
    regexp_replace(col('Description'), regex_string, 'COLOR').alias('color_clean'),
    col('Description')).show(2)

+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows



In [20]:
from pyspark.sql.functions import translate
df.select(translate(col('Description'), 'LEET', '1337'), col('Description')).show(2)

+----------------------------------+--------------------+
|translate(Description, LEET, 1337)|         Description|
+----------------------------------+--------------------+
|              WHI73 HANGING H3A...|WHITE HANGING HEA...|
|               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
+----------------------------------+--------------------+
only showing top 2 rows



In [21]:
from pyspark.sql.functions import regexp_extract
extract_str = '(BLACK|WHITE|RED|GREEN|BLUE)'
df.select(
    regexp_extract(col('Description'), extract_str, 1).alias('color_clean'), col('Description')).show(2)

+-----------+--------------------+
|color_clean|         Description|
+-----------+--------------------+
|      WHITE|WHITE HANGING HEA...|
|      WHITE| WHITE METAL LANTERN|
+-----------+--------------------+
only showing top 2 rows



In [22]:
from pyspark.sql.functions import instr
containsBlack = instr(col('Description'), 'BLACK') >= 1
containsWhite = instr(col('Description'), 'WHITE') >= 1
df.withColumn('hasSimpleColor', containsBlack | containsWhite).where('hasSimpleColor').select('Description').show(3, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



In [23]:
from pyspark.sql.functions import expr, locate
simpleColors = ['black', 'white', 'red', 'green', 'blue']
def color_locator(column, color_string):
    return locate(color_string.upper(), column).cast('boolean').alias('is_' + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr('*'))

df.select(*selectedColumns).where(expr('is_white OR is_red')).select('Description').show(3, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



In [24]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10).withColumn('today', current_date()).withColumn('now', current_timestamp())
dateDF.createOrReplaceTempView('dateTable')
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [25]:
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col('today'), 5), date_add(col('today'), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2018-12-22|        2019-01-01|
+------------------+------------------+
only showing top 1 row



In [26]:
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn('week_ago', date_sub(col('today'), 7)).select(datediff(col('week_ago'), col('today'))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [27]:
dateDF.select(to_date(lit('2016-01-01')).alias('start'), to_date(lit('2017-05-22')).alias('end')).select(months_between(col('start'), col('end'))).show(1)

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row



In [28]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col('Description'), col('CustomerId'))).show()

+---------------------------------+
|coalesce(Description, CustomerId)|
+---------------------------------+
|             WHITE HANGING HEA...|
|              WHITE METAL LANTERN|
|             CREAM CUPID HEART...|
|             KNITTED UNION FLA...|
|             RED WOOLLY HOTTIE...|
|             SET 7 BABUSHKA NE...|
|             GLASS STAR FROSTE...|
|             HAND WARMER UNION...|
|             HAND WARMER RED P...|
|             ASSORTED COLOUR B...|
|             POPPY'S PLAYHOUSE...|
|             POPPY'S PLAYHOUSE...|
|             FELTCRAFT PRINCES...|
|             IVORY KNITTED MUG...|
|             BOX OF 6 ASSORTED...|
|             BOX OF VINTAGE JI...|
|             BOX OF VINTAGE AL...|
|             HOME BUILDING BLO...|
|             LOVE BUILDING BLO...|
|             RECIPE BOX WITH M...|
+---------------------------------+
only showing top 20 rows



In [29]:
cDf = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))

In [30]:
cDf.show()

+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+



In [31]:
cDf.select(coalesce(cDf["a"], cDf["b"])).show()

+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+



In [32]:
dd = cDf.na.drop('any')
dd.show()

+---+---+
|  a|  b|
+---+---+
+---+---+



In [33]:
dd = df.na.drop('all', subset=['StockCode', 'InvoiceNo'])
dd.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [34]:
dd = df.na.fill('all', subset=['StockCode', 'InvoiceNo'])
dd.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [35]:
fill_cols_vals = {'StockCode': 5, 'Description': 'No Value'}
dd = df.na.fill(fill_cols_vals)
dd.where(expr('Description = "No Value"')).select('InvoiceNo', 'Description').show()

+---------+-----------+
|InvoiceNo|Description|
+---------+-----------+
|   536414|   No Value|
|   536545|   No Value|
|   536546|   No Value|
|   536547|   No Value|
|   536549|   No Value|
|   536550|   No Value|
|   536552|   No Value|
|   536553|   No Value|
|   536554|   No Value|
|   536589|   No Value|
+---------+-----------+



In [36]:
df.where(expr('Description = null')).select('InvoiceNo', 'Description').show()

+---------+-----------+
|InvoiceNo|Description|
+---------+-----------+
+---------+-----------+



In [37]:
df.select(expr('Description = null')).show()

+--------------------+
|(Description = NULL)|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [38]:
df.where(col('Description') != None).select('InvoiceNo', 'Description').show()

+---------+-----------+
|InvoiceNo|Description|
+---------+-----------+
+---------+-----------+



In [39]:
df.na.replace([""], ["UNKNOWN"], 'Description').where(expr('Description = "UNKNOWN"')).select('InvoiceNo', 'Description').show()

+---------+-----------+
|InvoiceNo|Description|
+---------+-----------+
+---------+-----------+



In [40]:
udfExampleDF = spark.range(5).toDF('num')

def power3(double_value):
    return double_value ** 3

In [41]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [42]:
udfExampleDF.select(power3udf(col('num'))).show(2)

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
+-----------+
only showing top 2 rows



In [43]:
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register('power3py', power3, DoubleType())

<function __main__.power3(double_value)>

In [44]:
udfExampleDF.selectExpr('power3py(num)').show(2)

+-------------+
|power3py(num)|
+-------------+
|         null|
|         null|
+-------------+
only showing top 2 rows



In [45]:
df.count()

3108

In [46]:
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true')\
.load('file:///home/sha/dev/books/Spark-The-Definitive-Guide/data/retail-data/all/*.csv')\
.coalesce(5)

In [47]:
df.count()

541909

In [48]:
df.head()

Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country='United Kingdom')

In [49]:
df.cache()
df.createOrReplaceTempView('dfTable')

In [50]:
from pyspark.sql.functions import count
df.select(count('StockCode')).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [51]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct('StockCode')).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [52]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct('StockCode', 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [53]:
from pyspark.sql.functions import first, last
df.select(first('StockCode'), last('StockCode')).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [54]:
from pyspark.sql.functions import min, max
df.select(min('Quantity'), max('Quantity')).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [55]:
from pyspark.sql.functions import sum, sumDistinct
df.select(sum('Quantity')).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [56]:
df.select(sumDistinct('Quantity')).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [57]:
from pyspark.sql.functions import avg, expr
df.select(
    count('Quantity').alias('total_transactions'),
    sum('Quantity').alias('total_purchases'),
    avg('Quantity').alias('avg_purchases'),
    expr('mean(Quantity)').alias('mean_purchases'))\
.selectExpr('total_purchases/total_transactions', 'avg_purchases', 'mean_purchases').show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [58]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop('Quantity'), var_samp('Quantity'),
         stddev_pop('Quantity'), stddev_samp('Quantity')).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609354| 47559.39140929905|  218.08095663447864|   218.08115785023486|
+------------------+------------------+--------------------+---------------------+



In [59]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness('Quantity'), kurtosis('Quantity')).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610527843|119768.05495536518|
+--------------------+------------------+



In [60]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr('InvoiceNo', 'Quantity'), covar_samp('InvoiceNo', 'Quantity'), covar_pop('InvoiceNo', 'Quantity')).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085637639E-4|             1052.7280543913773|            1052.7260778752732|
+-------------------------+-------------------------------+------------------------------+



In [61]:
df.groupBy('InvoiceNo', 'CustomerId').count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   545165|     16339|   20|
|   545289|     14732|   30|
+---------+----------+-----+
only showing top 20 rows



In [62]:
df.groupBy('InvoiceNo').agg(count('Quantity').alias('quan'), expr('count(Quantity)')).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [63]:
df.groupBy('InvoiceNo').agg(expr('avg(Quantity)'), expr('stddev_pop(Quantity)')).show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [64]:
from pyspark.sql.functions import to_date
dfWithDate = df.withColumn('date', to_date(col('InvoiceDate'), 'MM/d/yyyy H:mm'))
dfWithDate.createOrReplaceTempView('dfWithDate')

In [65]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

In [66]:
windowSpec = Window.partitionBy('CustomerId', 'date').orderBy(desc('Quantity'))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [67]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col('Quantity')).over(windowSpec)

In [68]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [70]:
dfWithDate.where('CustomerId IS NULL').orderBy('CustomerId').select(col('CustomerId'), col('date'), col('Quantity'))

DataFrame[CustomerId: int, date: date, Quantity: int]

In [71]:
dfWithDate.where('CustomerId IS NOT NULL').orderBy('CustomerId')\
    .select(
        col('CustomerId'),
        col('date'),
        col('Quantity'),
        purchaseDenseRank.alias('quantityDenseRank'),
        maxPurchaseQuantity.alias('maxPurchaseQuantity')
).show()

+----------+----------+--------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+-----------------+-------------------+
|     12346|2011-01-18|   74215|                1|              74215|
|     12346|2011-01-18|  -74215|                2|              74215|
|     12347|2010-12-07|      36|                1|                 36|
|     12347|2010-12-07|      30|                2|                 36|
|     12347|2010-12-07|      24|                3|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     12347|2010-12-07|      12|                4|                 36|
|     

In [72]:
dfWithDate.count()

541909

In [73]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView('dfNoNull')

In [74]:
dfNoNull.count()

541909

In [75]:
rolledUpDF = dfNoNull.rollup('Date', 'Country').agg(sum('Quantity'))\
    .selectExpr('Date', 'Country', '`sum(Quantity)` as total_quantity')\
    .orderBy('Date')
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|       Germany|           117|
|2010-12-01|   Netherlands|            97|
|2010-12-01|        France|           449|
|2010-12-01|     Australia|           107|
|2010-12-01|          null|         26814|
|2010-12-01|          EIRE|           243|
|2010-12-01|        Norway|          1852|
|2010-12-01|United Kingdom|         23949|
|2010-12-02|       Germany|           146|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-02|          EIRE|             4|
|2010-12-03|          null|         14830|
|2010-12-03|          EIRE|          2575|
|2010-12-03|       Germany|           170|
|2010-12-03|   Switzerland|           110|
|2010-12-03|       Belgium|           528|
|2010-12-03|      Portugal|            65|
|2010-12-03|        France|           239|
+----------

In [76]:
from pyspark.sql.functions import sum
dfNoNull.cube('Date', 'Country').agg(sum(col('Quantity')))\
    .select('Date', 'Country', 'sum(Quantity)').orderBy('Date').show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|               Japan|        25218|
|null|           Australia|        83653|
|null|            Portugal|        16180|
|null|                 RSA|          352|
|null|           Hong Kong|         4769|
|null|         Unspecified|         3300|
|null|             Germany|       117448|
|null|                 USA|         1034|
|null|             Finland|        10666|
|null|                null|      5176450|
|null|           Singapore|         5234|
|null|United Arab Emirates|          982|
|null|              Cyprus|         6317|
|null|               Spain|        26824|
|null|     Channel Islands|         9479|
|null|  European Community|          497|
|null|              Norway|        19247|
|null|             Denmark|         8188|
|null|             Lebanon|          386|
|null|      Czech Republic|          592|
+----+--------------------+-------

In [82]:
# dfDateNoNull = dfNoNull.selectExpr('Date IS NOT NULL')
# dfDateNoNull.cube('Date', 'Country').agg(sum(col('Quantity')))\
#     .select('Date', 'Country', 'sum(Quantity)').orderBy('Date').show()

In [83]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [84]:
# pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()

AnalysisException: "cannot resolve '`USA_sum(Quantity)`' given input columns: [Cyprus_sum(CAST(Quantity AS BIGINT)), Australia_sum(UnitPrice), Canada_sum(UnitPrice), Lithuania_sum(CAST(Quantity AS BIGINT)), EIRE_sum(UnitPrice), Bahrain_sum(CAST(Quantity AS BIGINT)), Norway_sum(CAST(CustomerID AS BIGINT)), USA_sum(CAST(CustomerID AS BIGINT)), RSA_sum(CAST(Quantity AS BIGINT)), Canada_sum(CAST(CustomerID AS BIGINT)), United Kingdom_sum(CAST(CustomerID AS BIGINT)), Finland_sum(CAST(CustomerID AS BIGINT)), Israel_sum(CAST(CustomerID AS BIGINT)), Belgium_sum(CAST(Quantity AS BIGINT)), Lithuania_sum(CAST(CustomerID AS BIGINT)), Singapore_sum(CAST(CustomerID AS BIGINT)), Channel Islands_sum(UnitPrice), Belgium_sum(CAST(CustomerID AS BIGINT)), Malta_sum(CAST(Quantity AS BIGINT)), Netherlands_sum(CAST(CustomerID AS BIGINT)), Finland_sum(CAST(Quantity AS BIGINT)), Australia_sum(CAST(Quantity AS BIGINT)), Japan_sum(UnitPrice), Saudi Arabia_sum(UnitPrice), Switzerland_sum(CAST(CustomerID AS BIGINT)), United Arab Emirates_sum(CAST(CustomerID AS BIGINT)), Denmark_sum(CAST(CustomerID AS BIGINT)), Poland_sum(CAST(Quantity AS BIGINT)), Greece_sum(UnitPrice), Germany_sum(CAST(CustomerID AS BIGINT)), Netherlands_sum(UnitPrice), Portugal_sum(UnitPrice), Netherlands_sum(CAST(Quantity AS BIGINT)), Israel_sum(UnitPrice), Bahrain_sum(CAST(CustomerID AS BIGINT)), Iceland_sum(CAST(Quantity AS BIGINT)), Sweden_sum(UnitPrice), Israel_sum(CAST(Quantity AS BIGINT)), Switzerland_sum(UnitPrice), Norway_sum(UnitPrice), Hong Kong_sum(CAST(Quantity AS BIGINT)), Austria_sum(CAST(Quantity AS BIGINT)), Malta_sum(UnitPrice), United Kingdom_sum(UnitPrice), Australia_sum(CAST(CustomerID AS BIGINT)), Brazil_sum(UnitPrice), Belgium_sum(UnitPrice), France_sum(UnitPrice), Brazil_sum(CAST(CustomerID AS BIGINT)), Unspecified_sum(CAST(CustomerID AS BIGINT)), Unspecified_sum(UnitPrice), Hong Kong_sum(CAST(CustomerID AS BIGINT)), European Community_sum(UnitPrice), Spain_sum(CAST(Quantity AS BIGINT)), Japan_sum(CAST(CustomerID AS BIGINT)), Austria_sum(CAST(CustomerID AS BIGINT)), Czech Republic_sum(UnitPrice), Lithuania_sum(UnitPrice), Switzerland_sum(CAST(Quantity AS BIGINT)), Norway_sum(CAST(Quantity AS BIGINT)), Czech Republic_sum(CAST(CustomerID AS BIGINT)), Poland_sum(UnitPrice), Singapore_sum(UnitPrice), Italy_sum(CAST(CustomerID AS BIGINT)), Malta_sum(CAST(CustomerID AS BIGINT)), Spain_sum(UnitPrice), Sweden_sum(CAST(CustomerID AS BIGINT)), Poland_sum(CAST(CustomerID AS BIGINT)), Saudi Arabia_sum(CAST(Quantity AS BIGINT)), Denmark_sum(CAST(Quantity AS BIGINT)), USA_sum(CAST(Quantity AS BIGINT)), Germany_sum(CAST(Quantity AS BIGINT)), Spain_sum(CAST(CustomerID AS BIGINT)), date, Greece_sum(CAST(Quantity AS BIGINT)), United Arab Emirates_sum(UnitPrice), Lebanon_sum(UnitPrice), RSA_sum(CAST(CustomerID AS BIGINT)), Cyprus_sum(CAST(CustomerID AS BIGINT)), Saudi Arabia_sum(CAST(CustomerID AS BIGINT)), France_sum(CAST(Quantity AS BIGINT)), Hong Kong_sum(UnitPrice), RSA_sum(UnitPrice), Unspecified_sum(CAST(Quantity AS BIGINT)), Germany_sum(UnitPrice), Portugal_sum(CAST(CustomerID AS BIGINT)), European Community_sum(CAST(CustomerID AS BIGINT)), Iceland_sum(UnitPrice), Finland_sum(UnitPrice), France_sum(CAST(CustomerID AS BIGINT)), Brazil_sum(CAST(Quantity AS BIGINT)), Italy_sum(UnitPrice), Channel Islands_sum(CAST(Quantity AS BIGINT)), Denmark_sum(UnitPrice), Sweden_sum(CAST(Quantity AS BIGINT)), Portugal_sum(CAST(Quantity AS BIGINT)), Bahrain_sum(UnitPrice), EIRE_sum(CAST(Quantity AS BIGINT)), Austria_sum(UnitPrice), United Arab Emirates_sum(CAST(Quantity AS BIGINT)), Lebanon_sum(CAST(Quantity AS BIGINT)), Japan_sum(CAST(Quantity AS BIGINT)), United Kingdom_sum(CAST(Quantity AS BIGINT)), Canada_sum(CAST(Quantity AS BIGINT)), Iceland_sum(CAST(CustomerID AS BIGINT)), Czech Republic_sum(CAST(Quantity AS BIGINT)), Lebanon_sum(CAST(CustomerID AS BIGINT)), Italy_sum(CAST(Quantity AS BIGINT)), Channel Islands_sum(CAST(CustomerID AS BIGINT)), USA_sum(UnitPrice), Singapore_sum(CAST(Quantity AS BIGINT)), EIRE_sum(CAST(CustomerID AS BIGINT)), European Community_sum(CAST(Quantity AS BIGINT)), Cyprus_sum(UnitPrice), Greece_sum(CAST(CustomerID AS BIGINT))];;\n'Project [date#3281, 'USA_sum(Quantity)]\n+- Filter (cast(date#3281 as string) > 2011-12-05)\n   +- Project [date#3281, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[0] AS Australia_sum(CAST(Quantity AS BIGINT))#3989L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[0] AS Australia_sum(UnitPrice)#3990, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[0] AS Australia_sum(CAST(CustomerID AS BIGINT))#3991L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[1] AS Austria_sum(CAST(Quantity AS BIGINT))#3992L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[1] AS Austria_sum(UnitPrice)#3993, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[1] AS Austria_sum(CAST(CustomerID AS BIGINT))#3994L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[2] AS Bahrain_sum(CAST(Quantity AS BIGINT))#3995L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[2] AS Bahrain_sum(UnitPrice)#3996, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[2] AS Bahrain_sum(CAST(CustomerID AS BIGINT))#3997L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[3] AS Belgium_sum(CAST(Quantity AS BIGINT))#3998L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[3] AS Belgium_sum(UnitPrice)#3999, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[3] AS Belgium_sum(CAST(CustomerID AS BIGINT))#4000L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[4] AS Brazil_sum(CAST(Quantity AS BIGINT))#4001L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[4] AS Brazil_sum(UnitPrice)#4002, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[4] AS Brazil_sum(CAST(CustomerID AS BIGINT))#4003L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[5] AS Canada_sum(CAST(Quantity AS BIGINT))#4004L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[5] AS Canada_sum(UnitPrice)#4005, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[5] AS Canada_sum(CAST(CustomerID AS BIGINT))#4006L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[6] AS Channel Islands_sum(CAST(Quantity AS BIGINT))#4007L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[6] AS Channel Islands_sum(UnitPrice)#4008, __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988[6] AS Channel Islands_sum(CAST(CustomerID AS BIGINT))#4009L, __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832[7] AS Cyprus_sum(CAST(Quantity AS BIGINT))#4010L, __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910[7] AS Cyprus_sum(UnitPrice)#4011, ... 91 more fields]\n      +- Aggregate [date#3281], [date#3281, pivotfirst(Country#1659, sum(CAST(`Quantity` AS BIGINT))#3752L, Australia, Austria, Bahrain, Belgium, Brazil, Canada, Channel Islands, Cyprus, Czech Republic, Denmark, EIRE, European Community, Finland, France, Germany, Greece, Hong Kong, Iceland, Israel, Italy, Japan, Lebanon, Lithuania, Malta, Netherlands, Norway, Poland, Portugal, RSA, Saudi Arabia, Singapore, Spain, Sweden, Switzerland, USA, United Arab Emirates, United Kingdom, Unspecified, 0, 0) AS __pivot_sum(CAST(`Quantity` AS BIGINT)) AS `sum(CAST(``Quantity`` AS BIGINT))`#3832, pivotfirst(Country#1659, sum(`UnitPrice`)#3753, Australia, Austria, Bahrain, Belgium, Brazil, Canada, Channel Islands, Cyprus, Czech Republic, Denmark, EIRE, European Community, Finland, France, Germany, Greece, Hong Kong, Iceland, Israel, Italy, Japan, Lebanon, Lithuania, Malta, Netherlands, Norway, Poland, Portugal, RSA, Saudi Arabia, Singapore, Spain, Sweden, Switzerland, USA, United Arab Emirates, United Kingdom, Unspecified, 0, 0) AS __pivot_sum(`UnitPrice`) AS `sum(``UnitPrice``)`#3910, pivotfirst(Country#1659, sum(CAST(`CustomerID` AS BIGINT))#3754L, Australia, Austria, Bahrain, Belgium, Brazil, Canada, Channel Islands, Cyprus, Czech Republic, Denmark, EIRE, European Community, Finland, France, Germany, Greece, Hong Kong, Iceland, Israel, Italy, Japan, Lebanon, Lithuania, Malta, Netherlands, Norway, Poland, Portugal, RSA, Saudi Arabia, Singapore, Spain, Sweden, Switzerland, USA, United Arab Emirates, United Kingdom, Unspecified, 0, 0) AS __pivot_sum(CAST(`CustomerID` AS BIGINT)) AS `sum(CAST(``CustomerID`` AS BIGINT))`#3988]\n         +- Aggregate [date#3281, Country#1659], [date#3281, Country#1659, sum(cast(Quantity#1655 as bigint)) AS sum(CAST(`Quantity` AS BIGINT))#3752L, sum(UnitPrice#1657) AS sum(`UnitPrice`)#3753, sum(cast(CustomerID#1658 as bigint)) AS sum(CAST(`CustomerID` AS BIGINT))#3754L]\n            +- Project [InvoiceNo#1652, StockCode#1653, Description#1654, Quantity#1655, InvoiceDate#1656, UnitPrice#1657, CustomerID#1658, Country#1659, to_date('InvoiceDate, Some(MM/d/yyyy H:mm)) AS date#3281]\n               +- Repartition 5, false\n                  +- Relation[InvoiceNo#1652,StockCode#1653,Description#1654,Quantity#1655,InvoiceDate#1656,UnitPrice#1657,CustomerID#1658,Country#1659] csv\n"