# Setting Up Spark
----------------------


In [10]:
# Initialise findspark() and pyspark Context, Session and Configuration
#import findspark
#findspark.init()
import secrets
import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import round
import pyspark.sql.functions as Func 
from pyspark.sql.types import *
# Create Pyspark Configuration and App Name and cluster URL
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/Users/vprasad/spark-warehouse").appName("Categorical Analysis").getOrCreate()
sc = spark.sparkContext
# Read the data from the csv file
data = spark.read.csv("./data.csv", inferSchema=True, multiLine=True, header=True)
data.show(10, False)

# Create Spark Session and Context

In [11]:
# Create Pyspark Configuration and App Name and cluster URL
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/Users/vprasad/spark-warehouse").appName("Categorical Analysis").getOrCreate()
sc = spark.sparkContext

# Read the CSV and load the data

In [12]:
# Read the data from the csv file
data = spark.read.csv("./data.csv", inferSchema=True, multiLine=True, header=True)
data.show(10, False)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |22752    |SET 7 BABUSHKA NESTING BOXES       

# Create *transaction_item_mft* and *card_dim_c* dataframes
----------------
* `card_dim_c` 
    - `cardid: integer (nullable = true)`
    - `Country: string (nullable = true)`
* `transaction_item_mft`
    - `TRANSACTION_ID: string (nullable = true)` 
    - `Product_Code: string (nullable = true)` 
    - `Product_desc: string (nullable = true)` 
    - `date_id: string (nullable = true)`
    - `net_spend_amt: double (nullable = true)`
    - `item_qty: integer (nullable = true)`
    - `CustomerID: integer (nullable = true)`
    - `Country: string (nullable = true)`

In [13]:
# Rename and split the DF into 2 tables
card_dim_c = data.selectExpr(['CustomerID as cardid', 'Country']).distinct() # Getting only the distinct customers

# Removing the UnitPrice Column and calculating the net_aspend_amt
transaction_item_mft = data.selectExpr(
    ['InvoiceNo as TRANSACTION_ID','StockCode as Product_Code',
     'Description as Product_desc', 'InvoiceDate as date_id',
     'round(Quantity * UnitPrice, 2) as net_spend_amt',
     'Quantity as item_qty', 'CustomerID',
     'Country']).distinct()

# A function to convert any Column of String Objects into Date Objects
def convertColToDate(df, colName, currentDateFormat):
    return df.withColumn(colName,Func.to_date(Func.unix_timestamp(Func.col(colName), currentDateFormat).cast("timestamp")) )

# Create the new column and apply the UDF
card_dim_c = card_dim_c.withColumn("card_code",Func.lit(udf_add_card_code()))
card_dim_c.show(10)

# Helper Functions
---------------------------
* `add_card_code()` to create a new column with Unique `card_code`.
* `convertColToDate()` to convert a string column into `Date Object`.
* `spendSummaryByCustomer()` to find the summary of Amount spent by a customer within a **time range**

In [14]:
# Creating a function to generate Unique Identifiers of length 6
LENGTH = 6 # Should be an even number
def add_card_code():
    # Generate the token and append it to "C0"
    return "C0"+secrets.token_hex(int(LENGTH/2))

# Convert the function to a Spark UDF
udf_add_card_code = Func.udf(add_card_code, StringType())

In [15]:
# A function to convert any Column of String Objects into Date Objects
def convertColToDate(df, colName, currentDateFormat):
    return df.withColumn(colName,Func.to_date(Func.unix_timestamp(Func.col(colName), currentDateFormat).cast("timestamp")) )

In [16]:
# A function to find the Amount spend by all the customers b/w a date range
def spendSummaryByCustomer(initial_date, final_date):
    if(initial_date == final_date):
        return transaction_item_mft.filter(Func.col('date_id') == Func.lit(initial_date)).groupBy('CustomerID').agg(Func.round(Func.sum('net_spend_amt'), 2).alias('net_spend_amt'))
    else:
        return transaction_item_mft.filter((Func.col("date_id") >= Func.lit(initial_date)) & (Func.col("date_id") <= Func.lit(final_date))).groupBy('CustomerID').agg(Func.round(Func.sum('net_spend_amt'), 2).alias('net_spend_amt'))
    

# Add a new column in *card_dim_c* with Unique Identifiers as *card_code*
----------------------------------------
* create the column
* validate for any duplicate entry

In [13]:
# Create the new column and apply the UDF
card_dim_c = card_dim_c.withColumn("card_code",Func.lit(udf_add_card_code()))
card_dim_c.show(10)

+------+--------------+---------+
|cardid|       Country|card_code|
+------+--------------+---------+
| 16143|United Kingdom| C0d3c6c9|
| 13983|United Kingdom| C0335b34|
| 15854|United Kingdom| C02104f7|
| 17634|United Kingdom| C0ba921c|
| 12528|       Germany| C0488963|
| 14555|United Kingdom| C0693ecd|
| 13751|United Kingdom| C0588176|
| 12933|United Kingdom| C0ea6d5f|
| 17747|United Kingdom| C093dacb|
| 15782|United Kingdom| C09f918f|
+------+--------------+---------+
only showing top 10 rows



root
 |-- cardid: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- card_code: string (nullable = true)

No Duplicates Found!


In [None]:
# Rename and split the DF into 2 tables
card_dim_c = data.selectExpr(['CustomerID as cardid', 'Country']).distinct() # Getting only the distinct customers

# Removing the UnitPrice Column and calculating the net_aspend_amt
transaction_item_mft = data.selectExpr(
    ['InvoiceNo as TRANSACTION_ID','StockCode as Product_Code',
     'Description as Product_desc', 'InvoiceDate as date_id',
     'round(Quantity * UnitPrice, 2) as net_spend_amt',
     'Quantity as item_qty', 'CustomerID',
     'Country']).distinct()

# A function to convert any Column of String Objects into Date Objects
def convertColToDate(df, colName, currentDateFormat):
    return df.withColumn(colName,Func.to_date(Func.unix_timestamp(Func.col(colName), currentDateFormat).cast("timestamp")) )

# Create the new column and apply the UDF
card_dim_c = card_dim_c.withColumn("card_code",Func.lit(udf_add_card_code()))
card_dim_c.show(10)

card_dim_c.printSchema()
# Verify that there are no Duplicates in the Column
print("No Duplicates Found!") if card_dim_c.select('card_code').distinct().count() == card_dim_c.select('card_code').count() else print("Duplicates Found!")

# Define the schema for the DF
fields = [
    StructField('store_id', IntegerType(), False),
    StructField('store_code', StringType() ,False)
]

schema = StructType(fields)
# Create the DF from the list
store_dim_c = spark.createDataFrame(store_data, schema)

store_dim_c.show()

# Creating *store_dim_c* Table
----------------
* Create 13 `store_id`'s and Unique `store_code`
* Create a DF from the Dict()

In [15]:

# A function to convert any Column of String Objects into Date Objects
def convertColToDate(df, colName, currentDateFormat):
    return df.withColumn(colName,Func.to_date(Func.unix_timestamp(Func.col(colName), currentDateFormat).cast("timestamp")) )

# Create the new column and apply the UDF
card_dim_c = card_dim_c.withColumn("card_code",Func.lit(udf_add_card_code()))
card_dim_c.show(10)

card_dim_c.printSchema()
# Verify that there are no Duplicates in the Column
print("No Duplicates Found!") if card_dim_c.select('card_code').distinct().count() == card_dim_c.select('card_code').count() else print("Duplicates Found!")


# Create a static Dict for the store_dim_c table and generate unique store_codes
store_data = [ {'store_id': x, 'store_code': str("S000"+secrets.token_hex(2))} for x in range(1, 14)]
store_data



[{'store_id': 1, 'store_code': 'S000efcb'},
 {'store_id': 2, 'store_code': 'S00014d3'},
 {'store_id': 3, 'store_code': 'S000afdf'},
 {'store_id': 4, 'store_code': 'S00087e5'},
 {'store_id': 5, 'store_code': 'S000a08f'},
 {'store_id': 6, 'store_code': 'S0007026'},
 {'store_id': 7, 'store_code': 'S000470b'},
 {'store_id': 8, 'store_code': 'S000f0e0'},
 {'store_id': 9, 'store_code': 'S000b874'},
 {'store_id': 10, 'store_code': 'S00008dc'},
 {'store_id': 11, 'store_code': 'S000da5b'},
 {'store_id': 12, 'store_code': 'S000f5e5'},
 {'store_id': 13, 'store_code': 'S0000f66'}]

In [16]:
# Define the schema for the DF
fields = [
    StructField('store_id', IntegerType(), False),
    StructField('store_code', StringType() ,False)
]
schema = StructType(fields)
# Create the DF from the list
store_dim_c = spark.createDataFrame(store_data, schema)

store_dim_c.show()

+--------+----------+
|store_id|store_code|
+--------+----------+
|       1|  S000efcb|
|       2|  S00014d3|
|       3|  S000afdf|
|       4|  S00087e5|
|       5|  S000a08f|
|       6|  S0007026|
|       7|  S000470b|
|       8|  S000f0e0|
|       9|  S000b874|
|      10|  S00008dc|
|      11|  S000da5b|
|      12|  S000f5e5|
|      13|  S0000f66|
+--------+----------+



# Converting the string date column into Date Object
------------------------
* Remove the HH:MM component from the string
* Convert it into Date Object

In [17]:
# split the entries of the column by " "
splits = Func.split(transaction_item_mft['date_id'], ' ')
# removing the HH:MM from the timestamp
transaction_item_mft = transaction_item_mft.withColumn('date_id', splits.getItem(0))
transaction_item_mft.select('date_id').show(1)

+---------+
|  date_id|
+---------+
|12/1/2010|
+---------+
only showing top 1 row



In [18]:
# Apply the new Date Format
transaction_item_mft = convertColToDate(transaction_item_mft, 'date_id', 'MM/dd/yyyy')

# Drop the rows with null values and duplicates
---------------------

In [21]:
# Drop rows with null values in CustomerID
transaction_item_mft = transaction_item_mft.dropna(subset=['CustomerID'])

# Drop the rows with TRANSACTION_ID Strating with 'C'
transaction_item_mft = transaction_item_mft.filter(~Func.col('TRANSACTION_ID').startswith('C'))

transaction_item_mft.show(20, False)

+--------------+------------+----------------------------------+---------------+-------------+--------+----------+--------------+
|TRANSACTION_ID|Product_Code|Product_desc                      |date_id        |net_spend_amt|item_qty|CustomerID|Country       |
+--------------+------------+----------------------------------+---------------+-------------+--------+----------+--------------+
|536401        |22100       |SKULLS SQUARE TISSUE BOX          |12/1/2010 11:21|2.5          |2       |15862     |United Kingdom|
|536464        |85095       |THREE CANVAS LUGGAGE TAGS         |12/1/2010 12:23|1.95         |1       |17968     |United Kingdom|
|536527        |20712       |JUMBO BAG WOODLAND ANIMALS        |12/1/2010 13:04|19.5         |10      |12662     |Germany       |
|536569        |22818       |CARD CHRISTMAS VILLAGE            |12/1/2010 15:35|5.04         |12      |16274     |United Kingdom|
|536578        |22866       |HAND WARMER SCOTTY DOG DESIGN     |12/1/2010 16:15|50.4      

+----------------+----+----+-----------+
|            date|week|year|fis_week_id|
+----------------+----+----+-----------+
| 1/31/2011 12:21|null|null|       null|
|  2/1/2011 12:27|null|null|       null|
| 5/12/2011 16:29|null|null|       null|
|10/21/2011 14:23|null|null|       null|
| 12/4/2011 12:52|null|null|       null|
| 12/1/2010 11:45|null|null|       null|
|  7/7/2011 18:29|null|null|       null|
|  8/3/2011 12:20|null|null|       null|
| 9/20/2011 10:06|null|null|       null|
|11/11/2011 16:58|null|null|       null|
| 12/2/2011 11:16|null|null|       null|
| 12/5/2011 13:10|null|null|       null|
|  1/5/2011 15:30|null|null|       null|
|  6/3/2011 12:17|null|null|       null|
|10/12/2011 15:27|null|null|       null|
|11/27/2011 11:15|null|null|       null|
| 12/6/2010 11:01|null|null|       null|
|   2/1/2011 8:31|null|null|       null|
| 4/28/2011 11:38|null|null|       null|
| 6/24/2011 14:16|null|null|       null|
+----------------+----+----+-----------+
only showing top

#  Create a new column named *fis_week_id* in the *transaction_item_mft* DF

In [21]:
#create a new column named fis_week in transaction_item_mft table
# transaction_item_mft = transaction_item_mft.withColumn("cycle_list", Func.collect_set("fis_week_id").over(Window.partitionBy("CustomerID")))
#transaction_item_mft.printSchema()

In [23]:
transaction_item_mft.show()

+--------------+------------+--------------------+----------+-------------+--------+----------+--------------+-----------+
|TRANSACTION_ID|Product_Code|        Product_desc|   date_id|net_spend_amt|item_qty|CustomerID|       Country|fis_week_id|
+--------------+------------+--------------------+----------+-------------+--------+----------+--------------+-----------+
|        536401|       22100|SKULLS SQUARE TIS...|2010-12-01|          2.5|       2|     15862|United Kingdom|     492010|
|        536464|       85095|THREE CANVAS LUGG...|2010-12-01|         1.95|       1|     17968|United Kingdom|     492010|
|        536527|       20712|JUMBO BAG WOODLAN...|2010-12-01|         19.5|      10|     12662|       Germany|     492010|
|        536569|       22818|CARD CHRISTMAS VI...|2010-12-01|         5.04|      12|     16274|United Kingdom|     492010|
|        536578|       22866|HAND WARMER SCOTT...|2010-12-01|         50.4|      24|     17690|United Kingdom|     492010|
|        536579|

# Create customer summary at customer and fis_week level

In [22]:
x.show(20, False)

+----------+-----------+-----------------+-----+-------+-------------------------------------------------------+
|CustomerID|fis_week_id|transaction_count|qty  |spend  |cycle_list                                             |
+----------+-----------+-----------------+-----+-------+-------------------------------------------------------+
|12346     |42011      |1                |74215|77183.6|[42011]                                                |
|12347     |242011     |18               |196  |382.52 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|
|12347     |502010     |31               |319  |711.79 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|
|12347     |502011     |11               |192  |224.82 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|
|12347     |152011     |24               |483  |636.25 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|
|12347     |322011     |22               |277  |584.91 |[502010, 322011, 502011, 152011, 52011, 

In [40]:
#x.filter(Func.col('CustomerID')==12347).select('spend').show()

+-------+
|  spend|
+-------+
| 224.82|
| 636.25|
| 382.52|
| 584.91|
| 711.79|
| 475.39|
|1294.32|
+-------+



In [22]:
import pyspark.sql.window as W
import pyspark.sql.types as T
#(2*(RowNumber-1)/Cnt)+1
_window = W.Window().partitionBy(
        'CustomerID').orderBy('spend')
ntile = x.withColumn('ntile', Func.ntile(2).over(_window))
ntile.show(20,False)

+----------+-----------+-----------------+-----+-------+-------------------------------------------------------+-----+
|CustomerID|fis_week_id|transaction_count|qty  |spend  |cycle_list                                             |ntile|
+----------+-----------+-----------------+-----+-------+-------------------------------------------------------+-----+
|12346     |42011      |1                |74215|77183.6|[42011]                                                |1    |
|12347     |502011     |11               |192  |224.82 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|1    |
|12347     |242011     |18               |196  |382.52 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|1    |
|12347     |52011      |29               |315  |475.39 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|1    |
|12347     |322011     |22               |277  |584.91 |[502010, 322011, 502011, 152011, 52011, 242011, 452011]|1    |
|12347     |152011     |24               |483  |

In [24]:
partition_column='CustomerID'
calculation_column ='spend'
ntile.groupBy(partition_column, 'ntile').agg(
        Func.count(Func.col(calculation_column)).alias('count'),
        Func.max(Func.col(calculation_column)).alias('max'),
        Func.min(Func.col(calculation_column)).alias('min')).show(20,False)

+----------+-----+-----+-------+-------+
|CustomerID|ntile|count|max    |min    |
+----------+-----+-----+-------+-------+
|12346     |1    |1    |77183.6|77183.6|
|12347     |1    |4    |584.91 |224.82 |
|12347     |2    |3    |1294.32|636.25 |
|12348     |1    |2    |310.0  |227.44 |
|12348     |2    |2    |892.8  |367.0  |
|12349     |1    |1    |1757.55|1757.55|
|12350     |1    |1    |334.4  |334.4  |
|12352     |1    |4    |296.5  |120.33 |
|12352     |2    |3    |984.65 |311.73 |
|12353     |1    |1    |89.0   |89.0   |
|12354     |1    |1    |1079.4 |1079.4 |
|12355     |1    |1    |459.4  |459.4  |
|12356     |1    |2    |481.46 |58.35  |
|12356     |2    |1    |2271.62|2271.62|
|12357     |1    |1    |6207.67|6207.67|
|12358     |1    |1    |484.86 |484.86 |
|12358     |2    |1    |683.2  |683.2  |
|12359     |1    |2    |1109.32|547.5  |
|12359     |2    |2    |2814.3 |1838.91|
|12360     |1    |2    |1043.78|534.7  |
+----------+-----+-----+-------+-------+
only showing top

In [26]:
median_data =ntile.groupBy(partition_column, 'ntile').agg(
        Func.count(Func.col(calculation_column)).alias('count'),
        Func.max(Func.col(calculation_column)).alias('max'),
        Func.min(Func.col(calculation_column)).alias('min'))

median_data = median_data.groupBy(partition_column).agg(
        Func.min('max').alias('1st_tile'),
        Func.max('min').alias('2nd_tile'),
        Func.sum('count').alias('count'))
median_data.show(20,False)

+----------+--------+--------+-----+
|CustomerID|1st_tile|2nd_tile|count|
+----------+--------+--------+-----+
|12346     |77183.6 |77183.6 |1    |
|12347     |584.91  |636.25  |7    |
|12348     |310.0   |367.0   |4    |
|12349     |1757.55 |1757.55 |1    |
|12350     |334.4   |334.4   |1    |
|12352     |296.5   |311.73  |7    |
|12353     |89.0    |89.0    |1    |
|12354     |1079.4  |1079.4  |1    |
|12355     |459.4   |459.4   |1    |
|12356     |481.46  |2271.62 |3    |
|12357     |6207.67 |6207.67 |1    |
|12358     |484.86  |683.2   |2    |
|12359     |1109.32 |1838.91 |4    |
|12360     |1043.78 |1083.58 |3    |
|12361     |189.9   |189.9   |1    |
|12362     |479.1   |495.24  |9    |
|12363     |252.9   |299.1   |2    |
|12364     |299.06  |310.32  |4    |
|12365     |641.38  |641.38  |1    |
|12367     |168.9   |168.9   |1    |
+----------+--------+--------+-----+
only showing top 20 rows



In [38]:
median(x,'CustomerID','spend').show(10,False)

+----------+---------+
|CustomerID|med_spend|
+----------+---------+
|12346     |77183.6  |
|12347     |584.91   |
|12348     |338.5    |
|12349     |1757.55  |
|12350     |334.4    |
|12352     |296.5    |
|12353     |89.0     |
|12354     |1079.4   |
|12355     |459.4    |
|12356     |481.46   |
+----------+---------+
only showing top 10 rows



In [24]:
get_cust_summary(x).show()

+----------+--------------------+---------+---------+---------+-----------+-------+-------+-------+----------------+----------------+---------+-------------+---------+
|CustomerID|          cycle_list|avg_spend|max_spend|min_spend|total_spend|avg_qty|max_qty|min_qty|distinct_periods|max_consec_weeks|total_qty|weeks_shopped|avg_visit|
+----------+--------------------+---------+---------+---------+-----------+-------+-------+-------+----------------+----------------+---------+-------------+---------+
|     12346|             [42011]|  77183.6|  77183.6|  77183.6|    77183.6|74215.0|  74215|  74215|               1|               1|    74215|            1|        1|
|     12347|[502010, 322011, ...|   615.71|  1294.32|   224.82|     4310.0| 351.14|    676|    192|               6|               2|     2458|            7|       26|
|     12348|[402011, 52011, 1...|   449.31|    892.8|   227.44|    1797.24| 585.25|   1254|    217|               4|               1|     2341|            4|   

In [6]:
# Initialise findspark() and pyspark Context, Session and Configuration
#import findspark
#findspark.init()
import secrets
import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import round
import pyspark.sql.functions as Func
import pyspark.sql.types as T
import pyspark.sql.window as W
# Create Pyspark Configuration and App Name and cluster URL
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/Users/vprasad/spark-warehouse").appName("Categorical Analysis").getOrCreate()
sc = spark.sparkContext
# Read the data from the csv file
data = spark.read.csv("./data.csv", inferSchema=True, multiLine=True, header=True)

shabitCodes = {
        'PR': 'Premium',
        'VL': 'Valuable',
        'PO': 'Potential',
        'UN': 'Uncommitted',
        'LP': 'Lapsing',
        'GO': 'GoneAway',
        '': 'Missing'}

shab_lookup = {'11': 'PR', '12': 'PR', '13': 'PR',
                            '14': 'VL', '15': 'VL', '16': 'PO', '19': 'LP',
                            '21': 'VL', '22': 'VL', '23': 'VL',
                            '24': 'PO', '25': 'PO', '26': 'UN', '29': 'LP',
                            '31': 'PO', '32': 'PO', '33': 'PO',
                            '34': 'UN', '35': 'UN', '36': 'UN', '39': 'LP'}

spend_lookup = {'1': '1.High', '2': '2.Medium', '3': '3.Low'}

visit_pattern_lookup = {'1': '1.Daily', '2': '2.Twice Weekly',
                             '3': '3.Weekly', '4': '4.Stop Start',
                             '5': '5.Now & Then', '6': '6.Hardly Ever',
                             '9': '7.Lapsing'}


In [7]:
def get_loyalty(transactions):
    customer_summary = customer_metrics(transactions)
    visit_pattern_udf = Func.udf(flag_visit_pattern1, T.StringType())
    spend_level_udf = Func.udf(flag_spend, T.StringType())
    flagged_summary = customer_summary.withColumn(
            'visit_pattern',
            visit_pattern_udf(
                              Func.col('weeks_shopped'),
                              Func.col('max_consec_weeks'),
                              Func.col('avg_visit'),
                              Func.col('distinct_periods')))
        # assign spend flag
    flagged_summary = flagged_summary.withColumn(
        'spend_flag',
        spend_level_udf(Func.col('med_spend'),
                        Func.lit(band_high),
                        Func.lit(band_low)))
    def higher_level_cat(visit_pattern, spend_flag):
                category = str(spend_flag) + str(visit_pattern)
                return category
    higher_level_udf = Func.udf(higher_level_cat, T.StringType())

    def lookup_shab(val):
        return shab_lookup[val]
    lookup_shab_udf = Func.udf(lookup_shab, T.StringType())

    def lookup_spend(val):
        return spend_lookup[val]
    lookup_spend_udf = Func.udf(lookup_spend, T.StringType())

    def lookup_visit_pattern(val):
        return visit_pattern_lookup[val]
    lookup_visit_pattern_udf = Func.udf(lookup_visit_pattern, T.StringType())

    flagged_summary = flagged_summary.withColumn(
        'higher_level_cat',
        higher_level_udf(Func.col('visit_pattern'),
                         Func.col('spend_flag')))

    flagged_summary = flagged_summary.withColumn(
        'shabit',
        lookup_shab_udf(Func.col('higher_level_cat'))).withColumn(
            'spend_desc',
            lookup_spend_udf(Func.col('spend_flag'))).withColumn(
                'visit_desc',
                lookup_visit_pattern_udf(Func.col('visit_pattern')))

    return flagged_summary

In [10]:
#suppose a dataframe

def distinct_periods(weeks):
        """ Retuns distinct period shopped"""
        weeks.sort()
        differences = [j - i for i, j in zip(weeks[:-1], weeks[1:])]
        numperiods = 1
        for k in differences:
            if k > 1:
                numperiods += 1
        return numperiods
distinct_periods_udf = Func.udf(distinct_periods, T.IntegerType())

def max_consec_weeks(weeks):
        """ Retruns max consecutive weeks shopped"""
        weeks.sort()
        differences = [j - i for i, j in zip(weeks[:-1], weeks[1:])]
        max_weeks = 1
        tempweeks = 1
        for k in differences:
            if k == 1:
                tempweeks += 1
            elif k > 1:
                tempweeks = 1
            if tempweeks > max_weeks:
                max_weeks = tempweeks
        return max_weeks
max_consec_weeks_udf = Func.udf(max_consec_weeks, T.IntegerType())

def median(data_frame, partition_column, calculation_column):
    """Returns median accross a partition"""

    _window = W.Window().partitionBy(
        partition_column).orderBy(calculation_column)
    
    ntile = data_frame.withColumn('ntile', Func.ntile(2).over(_window))

    median_data = ntile.groupBy(partition_column, 'ntile').agg(
        Func.count(Func.col(calculation_column)).alias('count'),
        Func.max(Func.col(calculation_column)).alias('max'),
        Func.min(Func.col(calculation_column)).alias('min'))

    median_data = median_data.groupBy(partition_column).agg(
        Func.min('max').alias('1st_tile'),
        Func.max('min').alias('2nd_tile'),
        Func.sum('count').alias('count'))

    def med(tile_1, tile_2, count):
        if count % 2 != 0:
            return float(tile_1)
        else:
            return (float(tile_1) + float(tile_2)) / 2.0

    median_udf = Func.udf(med, T.FloatType())

    median_data = median_data.select(
            partition_column,
            median_udf(Func.col('1st_tile'),
                       Func.col('2nd_tile'),
                       Func.col('count')).alias('med_spend'))

    return median_data

def flag_visit_pattern1(weeks_shopped,
                            max_consec_weeks, avg_visit, distinct_periods):
        """ Assign Flag according to visit pattern """

        if (weeks_shopped >= 7) or ((weeks_shopped == 6) and (max_consec_weeks == 6)):

            if avg_visit < 2:
                return '3'
            elif 2 <= avg_visit < 3:
                return '2'
            elif avg_visit >= 3:
                return '1'

        elif (weeks_shopped >= 4):
            return '5'

        elif 1 <= weeks_shopped <= 3:
            return '6'

        else:
            return '99'
visit_pattern_udf = Func.udf(flag_visit_pattern1,T.StringType())

def flag_spend(med_spend, band_high, band_low):
        """ Assign Flag according to spend """

        if med_spend < band_low:
            return '3'

        if band_low <= med_spend <= band_high:
            return '2'

        if med_spend > band_high:
            return '1'

In [11]:
def customer_metrics(df):

    cust_week_summary= df.groupBy('CustomerID', 'fis_week_id').agg(
        Func.count('TRANSACTION_ID').alias('transaction_count'),
        Func.sum('item_qty').alias('qty'),
        Func.round(Func.sum('net_spend_amt'), 2).alias('spend'),
        Func.collect_set("fis_week_id").over(Window.partitionBy(
            "CustomerID")).alias("cycle_list")).orderBy('CustomerID')

    cust_summary= cust_week_summary.groupBy('CustomerID', 'cycle_list').agg(
        Func.round(Func.avg('spend'), 2).alias('avg_spend'),
        Func.max('spend').alias('max_spend'),
        Func.min('spend').alias('min_spend'),
        Func.round(Func.sum('spend'), 2).alias('total_spend'),
        Func.round(Func.avg('qty'), 2).alias('avg_qty'),
        Func.max('qty').alias('max_qty'),
        Func.min('qty').alias('min_qty'),
        distinct_periods_udf('cycle_list').alias('distinct_periods'),
        max_consec_weeks_udf('cycle_list').alias('max_consec_weeks'),
        Func.sum('qty').alias('total_qty'),
        Func.count('fis_week_id').alias('weeks_shopped'),
        Func.round(Func.avg('transaction_count')).cast(
            IntegerType()).alias('avg_visit'))
    
    customer_summary = cust_summary.withColumn(
        'distinct_periods',
        distinct_periods_udf(F.col('cycle_list'))).withColumn(
            'max_consec_weeks',
            max_consec_weeks_udf(F.col('cycle_list')))

    # determine median spend
    median_spend = median(
        data_frame=cust_week_summary)

    customer_summary = customer_summary.join(
        median_spend, on='CustomerID', how='left')

    return customer_summary

In [12]:
data.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [28]:
import time
import datetime

def recency(fis_week_id,offeset):
    final_date=fis_week_id-lapsing_day
    
    print(final_date)

In [29]:
recency(201901,7)

201894


In [57]:

calander = transaction_item_mft.select(
    Func.col("date_id").alias("date"),
    Func.date_format(Func.to_date("date_id", "YYYY-MM-dd"), "w").alias("week"),
    Func.year("date_id").alias("year"),
    (Func.year("date_id").cast(StringType())+Func.date_format("date_id", "w").cast(StringType())).alias("fis_week")
).distinct()
calander.show()

+----------+----+----+--------+
|      date|week|year|fis_week|
+----------+----+----+--------+
|2011-06-17|  25|2011|  2036.0|
|2011-10-05|  41|2011|  2052.0|
|2011-01-24|   5|2011|  2016.0|
|2011-04-19|  17|2011|  2028.0|
|2011-10-02|  41|2011|  2052.0|
|2011-01-21|   4|2011|  2015.0|
|2011-04-18|  17|2011|  2028.0|
|2011-05-24|  22|2011|  2033.0|
|2011-10-06|  41|2011|  2052.0|
|2011-01-05|   2|2011|  2013.0|
|2011-11-25|  48|2011|  2059.0|
|2011-03-11|  11|2011|  2022.0|
|2011-10-21|  43|2011|  2054.0|
|2011-02-03|   6|2011|  2017.0|
|2011-10-07|  41|2011|  2052.0|
|2011-08-15|  34|2011|  2045.0|
|2011-06-29|  27|2011|  2038.0|
|2011-09-01|  36|2011|  2047.0|
|2011-06-24|  26|2011|  2037.0|
|2011-07-31|  32|2011|  2043.0|
+----------+----+----+--------+
only showing top 20 rows



In [53]:
calander.printSchema()

root
 |-- date: date (nullable = true)
 |-- week: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- fis_week: double (nullable = true)



In [15]:
from pyspark.sql.functions import *
dd = spark.read.csv('D:\\POC\\calendar.csv',header=True)
df = dd.drop(Func.col('_c0')).drop('fis_week_id')
df.withColumn('fis_week', concat(col('year'),col('week'))).show()

+----------+----+----+--------+
|      date|week|year|fis_week|
+----------+----+----+--------+
|2011-02-10|   7|2011|   20117|
|2011-04-04|  15|2011|  201115|
|2011-05-06|  19|2011|  201119|
|2011-03-10|  11|2011|  201111|
|2011-05-27|  22|2011|  201122|
|2011-10-07|  41|2011|  201141|
|2011-10-02|  41|2011|  201141|
|2011-10-09|  42|2011|  201142|
|2011-08-18|  34|2011|  201134|
|2011-08-22|  35|2011|  201135|
|2011-11-02|  45|2011|  201145|
|2010-12-05|  50|2010|  201050|
|2011-04-28|  18|2011|  201118|
|2011-07-15|  29|2011|  201129|
|2011-01-17|   4|2011|   20114|
|2011-11-04|  45|2011|  201145|
|2011-02-27|  10|2011|  201110|
|2011-07-29|  31|2011|  201131|
|2011-08-14|  34|2011|  201134|
|2011-04-13|  16|2011|  201116|
+----------+----+----+--------+
only showing top 20 rows



In [9]:
query = """select date from calender where fis_week_id = {fis_week_id} """.format(fis_week_id=fis_week_id)


39

In [23]:
def adding_extra_zero(col):
    if len(col)<2:
        return "0"+col
    else:
        return col

In [24]:
dd = udf(adding_extra_zero,StringType())

In [25]:
df.withColumn('newweek',dd(col('week'))).show()

+----------+----+----+-------+
|      date|week|year|newweek|
+----------+----+----+-------+
|2011-02-10|   7|2011|     07|
|2011-04-04|  15|2011|     15|
|2011-05-06|  19|2011|     19|
|2011-03-10|  11|2011|     11|
|2011-05-27|  22|2011|     22|
|2011-10-07|  41|2011|     41|
|2011-10-02|  41|2011|     41|
|2011-10-09|  42|2011|     42|
|2011-08-18|  34|2011|     34|
|2011-08-22|  35|2011|     35|
|2011-11-02|  45|2011|     45|
|2010-12-05|  50|2010|     50|
|2011-04-28|  18|2011|     18|
|2011-07-15|  29|2011|     29|
|2011-01-17|   4|2011|     04|
|2011-11-04|  45|2011|     45|
|2011-02-27|  10|2011|     10|
|2011-07-29|  31|2011|     31|
|2011-08-14|  34|2011|     34|
|2011-04-13|  16|2011|     16|
+----------+----+----+-------+
only showing top 20 rows

