# Databricks notebook source

## Spark project : Marketing campaign for members with loyalty score

This notebook covers:
* Reading a CSV file with PySpark using an explicit delimiter (a comma in this file)
* Wrangling and deriving features from existing columns
* Writing the DataFrame to Delta Lake, which stores the data as Parquet files
* Moving between Spark, pandas, and Koalas DataFrames

Databricks already provides Spark and a ready-made `spark` session.

In [0]:
spark.version

Out[1]: '3.3.0'

## About Dataset

**Context**
A response model can provide a significant boost to the efficiency of a marketing campaign by increasing responses or reducing expenses. The objective is to predict who will respond to an offer for a product or service.

**Content**
AcceptedCmp1 - 1 if customer accepted the offer in the 1st campaign, 0 otherwise

AcceptedCmp2 - 1 if customer accepted the offer in the 2nd campaign, 0 otherwise

AcceptedCmp3 - 1 if customer accepted the offer in the 3rd campaign, 0 otherwise

AcceptedCmp4 - 1 if customer accepted the offer in the 4th campaign, 0 otherwise

AcceptedCmp5 - 1 if customer accepted the offer in the 5th campaign, 0 otherwise

Response (target) - 1 if customer accepted the offer in the last campaign, 0 otherwise

Complain - 1 if customer complained in the last 2 years

Dt_Customer - date of customer’s enrolment with the company

Education - customer’s level of education

Marital_Status - customer’s marital status

Kidhome - number of small children in customer’s household

Teenhome - number of teenagers in customer’s household

Income - customer’s yearly household income
 
MntFishProducts - amount spent on fish products in the last 2 years

MntMeatProducts - amount spent on meat products in the last 2 years

MntFruits - amount spent on fruits products in the last 2 years

MntSweetProducts - amount spent on sweet products in the last 2 years

MntWines - amount spent on wine products in the last 2 years

MntGoldProds - amount spent on gold products in the last 2 years

NumDealsPurchases - number of purchases made with discount

NumCatalogPurchases - number of purchases made using catalogue

NumStorePurchases - number of purchases made directly in stores

NumWebPurchases - number of purchases made through company’s web site

NumWebVisitsMonth - number of visits to company’s web site in the last month

Recency - number of days since the last purchase

**Acknowledgements**
O. Parr-Rud. Business Analytics Using SAS Enterprise Guide and SAS Enterprise Miner. SAS Institute, 2014.

In [0]:
# DBTITLE 1,Read csv with Spark
# Import only what this notebook uses; importing `round` would shadow Python's built-in round()
from pyspark.sql.functions import to_date, col, year

df = spark.read.option("delimiter", ",").csv('dbfs:/FileStore/datasets/marketing_campaign-1.csv',inferSchema=True, header=True)

display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
5524,1957,Graduation,Single,58138,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,3,11,1
2174,1954,Graduation,Single,46344,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,3,11,0
4141,1965,Graduation,Together,71613,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,3,11,0
6182,1984,Graduation,Together,26646,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,3,11,0
5324,1981,PhD,Married,58293,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,3,11,0
7446,1967,Master,Together,62513,0,1,2013-09-09,16,520,42,98,0,42,14,2,6,4,10,6,0,0,0,0,0,0,3,11,0
965,1971,Graduation,Divorced,55635,0,1,2012-11-13,34,235,65,164,50,49,27,4,7,3,7,6,0,0,0,0,0,0,3,11,0
6177,1985,PhD,Married,33454,1,0,2013-05-08,32,76,10,56,3,1,23,2,4,0,4,8,0,0,0,0,0,0,3,11,0
4855,1974,PhD,Together,30351,1,0,2013-06-06,19,14,0,24,3,3,2,1,3,0,2,9,0,0,0,0,0,0,3,11,1
5899,1950,PhD,Together,5648,1,1,2014-03-13,68,28,0,6,1,1,13,1,1,0,0,20,1,0,0,0,0,0,3,11,0
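
To see what the `delimiter` and `header` options control, here is a small standard-library sketch. It is not Spark's CSV reader; it only parses two of the rows shown above, trimmed to four columns. The names `SAMPLE` and `parse_rows` are hypothetical, and the integer conversion is only a rough stand-in for `inferSchema=True`.

```python
import csv
import io

# Two rows from the output above, trimmed to four columns for brevity.
SAMPLE = (
    "ID,Year_Birth,Education,Income\n"
    "5524,1957,Graduation,58138\n"
    "2174,1954,Graduation,46344\n"
)

def parse_rows(text, delimiter=","):
    """Parse delimited text with a header row into a list of dicts.

    Values made up only of digits are converted to int, a rough
    stand-in for schema inference.
    """
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    rows = []
    for raw in reader:
        rows.append({k: int(v) if v.isdigit() else v for k, v in raw.items()})
    return rows

rows = parse_rows(SAMPLE)                   # correct delimiter: four columns
wrong = parse_rows(SAMPLE, delimiter="\t")  # wrong delimiter: one fused column
```

With the wrong delimiter every line collapses into a single string column, which is the usual symptom of a delimiter mismatch when reading a CSV.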


In [0]:
# DBTITLE 1,Delete unwanted columns
deleted_columns = ['AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1', 'AcceptedCmp2', 'Complain', \
                   'Z_CostContact', 'Z_Revenue', 'Response']

df = df.drop(*deleted_columns)

display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth
5524,1957,Graduation,Single,58138,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7
2174,1954,Graduation,Single,46344,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5
4141,1965,Graduation,Together,71613,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4
6182,1984,Graduation,Together,26646,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6
5324,1981,PhD,Married,58293,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5
7446,1967,Master,Together,62513,0,1,2013-09-09,16,520,42,98,0,42,14,2,6,4,10,6
965,1971,Graduation,Divorced,55635,0,1,2012-11-13,34,235,65,164,50,49,27,4,7,3,7,6
6177,1985,PhD,Married,33454,1,0,2013-05-08,32,76,10,56,3,1,23,2,4,0,4,8
4855,1974,PhD,Together,30351,1,0,2013-06-06,19,14,0,24,3,3,2,1,3,0,2,9
5899,1950,PhD,Together,5648,1,1,2014-03-13,68,28,0,6,1,1,13,1,1,0,0,20


#### Determine number of years of membership
Formula of Calculation : 2021 - ( extract_year( Column["Dt_Customer"] ) )

In [0]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: date (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)



In [0]:
# Dt_Customer is already a date column (see the schema above), so year() can be applied directly
df = df.withColumn('Num_Year_Of_Customer', 2021 - year(col('Dt_Customer'))).drop('Dt_Customer')
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Num_Year_Of_Customer
5524,1957,Graduation,Single,58138,0,0,58,635,88,546,172,88,88,3,8,10,4,7,9
2174,1954,Graduation,Single,46344,1,1,38,11,1,6,2,1,6,2,1,1,2,5,7
4141,1965,Graduation,Together,71613,0,0,26,426,49,127,111,21,42,1,8,2,10,4,8
6182,1984,Graduation,Together,26646,1,0,26,11,4,20,10,3,5,2,2,0,4,6,7
5324,1981,PhD,Married,58293,1,0,94,173,43,118,46,27,15,5,5,3,6,5,7
7446,1967,Master,Together,62513,0,1,16,520,42,98,0,42,14,2,6,4,10,6,8
965,1971,Graduation,Divorced,55635,0,1,34,235,65,164,50,49,27,4,7,3,7,6,9
6177,1985,PhD,Married,33454,1,0,32,76,10,56,3,1,23,2,4,0,4,8,8
4855,1974,PhD,Together,30351,1,0,19,14,0,24,3,3,2,1,3,0,2,9,8
5899,1950,PhD,Together,5648,1,1,68,28,0,6,1,1,13,1,1,0,0,20,7
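
The membership formula can be checked by hand against the first three customers. This is a plain-Python sketch, not the Spark code; `REFERENCE_YEAR` and `years_of_membership` are illustrative names.

```python
from datetime import date

REFERENCE_YEAR = 2021  # fixed reference year used in the notebook's formula

def years_of_membership(enrolled, reference_year=REFERENCE_YEAR):
    """Return reference_year minus the enrolment year (calendar years only)."""
    return reference_year - enrolled.year

# Enrolment dates of the first three customers in the raw data.
enrolment_dates = (date(2012, 9, 4), date(2014, 3, 8), date(2013, 8, 21))
years = [years_of_membership(d) for d in enrolment_dates]
```

Because only the calendar year is used, a customer who enrolled in January 2013 and one who enrolled in December 2013 both get 8.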


#### Determine member score
Formula of Calculation : ( income / 1000 ) - ( recency / 365 * 100 ) + ( Num_Year_Of_Customer )

In [0]:
df = df.withColumn('Member_Score', ((col('income')/1000) - (col('recency')/365*100) + col('Num_Year_Of_Customer')).cast('decimal(12,2)'))
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Num_Year_Of_Customer,Member_Score
5524,1957,Graduation,Single,58138,0,0,58,635,88,546,172,88,88,3,8,10,4,7,9,51.25
2174,1954,Graduation,Single,46344,1,1,38,11,1,6,2,1,6,2,1,1,2,5,7,42.93
4141,1965,Graduation,Together,71613,0,0,26,426,49,127,111,21,42,1,8,2,10,4,8,72.49
6182,1984,Graduation,Together,26646,1,0,26,11,4,20,10,3,5,2,2,0,4,6,7,26.52
5324,1981,PhD,Married,58293,1,0,94,173,43,118,46,27,15,5,5,3,6,5,7,39.54
7446,1967,Master,Together,62513,0,1,16,520,42,98,0,42,14,2,6,4,10,6,8,66.13
965,1971,Graduation,Divorced,55635,0,1,34,235,65,164,50,49,27,4,7,3,7,6,9,55.32
6177,1985,PhD,Married,33454,1,0,32,76,10,56,3,1,23,2,4,0,4,8,8,32.69
4855,1974,PhD,Together,30351,1,0,19,14,0,24,3,3,2,1,3,0,2,9,8,33.15
5899,1950,PhD,Together,5648,1,1,68,28,0,6,1,1,13,1,1,0,0,20,7,-5.98
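
As a sanity check on the member score, the same arithmetic can be written in plain Python. This is a sketch under assumptions: `member_score` is a hypothetical helper, it rounds with Python's `round` rather than Spark's decimal cast (the two can differ on exact ties), and it returns `None` for a missing income to mimic how a null propagates through Spark arithmetic.

```python
def member_score(income, recency, years_as_customer):
    """income / 1000 - recency / 365 * 100 + years, rounded to 2 decimals."""
    if income is None:
        return None  # a missing income propagates, as a null does in Spark
    return round(income / 1000 - recency / 365 * 100 + years_as_customer, 2)

# First and last customers from the table above.
first = member_score(58138, 58, 9)
last = member_score(5648, 68, 7)
```

A low income combined with a long time since the last purchase can push the score below zero, as it does for customer 5899.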


#### Determine product score
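Formula of Calculation : ( MntWines + MntFruits + MntMeatProducts + MntFishProducts + MntSweetProducts ) / 100. Note that MntGoldProds is not part of the sum.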

In [0]:
df = df.withColumn('Product_Score', ((col('MntWines')+col('MntFruits')+col('MntMeatProducts')+col('MntFishProducts')+col('MntSweetProducts'))/100).cast('decimal(12,2)'))
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Num_Year_Of_Customer,Member_Score,Product_Score
5524,1957,Graduation,Single,58138,0,0,58,635,88,546,172,88,88,3,8,10,4,7,9,51.25,15.29
2174,1954,Graduation,Single,46344,1,1,38,11,1,6,2,1,6,2,1,1,2,5,7,42.93,0.21
4141,1965,Graduation,Together,71613,0,0,26,426,49,127,111,21,42,1,8,2,10,4,8,72.49,7.34
6182,1984,Graduation,Together,26646,1,0,26,11,4,20,10,3,5,2,2,0,4,6,7,26.52,0.48
5324,1981,PhD,Married,58293,1,0,94,173,43,118,46,27,15,5,5,3,6,5,7,39.54,4.07
7446,1967,Master,Together,62513,0,1,16,520,42,98,0,42,14,2,6,4,10,6,8,66.13,7.02
965,1971,Graduation,Divorced,55635,0,1,34,235,65,164,50,49,27,4,7,3,7,6,9,55.32,5.63
6177,1985,PhD,Married,33454,1,0,32,76,10,56,3,1,23,2,4,0,4,8,8,32.69,1.46
4855,1974,PhD,Together,30351,1,0,19,14,0,24,3,3,2,1,3,0,2,9,8,33.15,0.44
5899,1950,PhD,Together,5648,1,1,68,28,0,6,1,1,13,1,1,0,0,20,7,-5.98,0.36
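
The product score for the first customer can be reproduced in plain Python. `PRODUCT_COLUMNS` and `product_score` are illustrative names; the column list mirrors the Spark expression, which leaves out `MntGoldProds`.

```python
# Columns summed by the Spark expression; MntGoldProds is deliberately absent.
PRODUCT_COLUMNS = (
    "MntWines",
    "MntFruits",
    "MntMeatProducts",
    "MntFishProducts",
    "MntSweetProducts",
)

def product_score(row):
    """Sum the five product amounts and scale the total down by 100."""
    return sum(row[name] for name in PRODUCT_COLUMNS) / 100

first = {
    "MntWines": 635,
    "MntFruits": 88,
    "MntMeatProducts": 546,
    "MntFishProducts": 172,
    "MntSweetProducts": 88,
    "MntGoldProds": 88,  # present in the data but ignored by the score
}
score = product_score(first)
```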


#### Determine purchase score
Formula of Calculation : ( NumDealsPurchases ) + ( NumWebPurchases ) + ( NumCatalogPurchases ) + ( NumStorePurchases )

In [0]:
df = df.withColumn('Purchase_Score',(col('NumDealsPurchases')+col('NumWebPurchases')+col('NumCatalogPurchases')+col('NumStorePurchases')))
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score
5524,1957,Graduation,Single,58138,0,0,58,635,88,546,172,88,88,3,8,10,4,7,9,51.25,15.29,25
2174,1954,Graduation,Single,46344,1,1,38,11,1,6,2,1,6,2,1,1,2,5,7,42.93,0.21,6
4141,1965,Graduation,Together,71613,0,0,26,426,49,127,111,21,42,1,8,2,10,4,8,72.49,7.34,21
6182,1984,Graduation,Together,26646,1,0,26,11,4,20,10,3,5,2,2,0,4,6,7,26.52,0.48,8
5324,1981,PhD,Married,58293,1,0,94,173,43,118,46,27,15,5,5,3,6,5,7,39.54,4.07,19
7446,1967,Master,Together,62513,0,1,16,520,42,98,0,42,14,2,6,4,10,6,8,66.13,7.02,22
965,1971,Graduation,Divorced,55635,0,1,34,235,65,164,50,49,27,4,7,3,7,6,9,55.32,5.63,21
6177,1985,PhD,Married,33454,1,0,32,76,10,56,3,1,23,2,4,0,4,8,8,32.69,1.46,10
4855,1974,PhD,Together,30351,1,0,19,14,0,24,3,3,2,1,3,0,2,9,8,33.15,0.44,6
5899,1950,PhD,Together,5648,1,1,68,28,0,6,1,1,13,1,1,0,0,20,7,-5.98,0.36,2


#### Determine loyalty score
Formula of Calculation : ( member_score ) + ( product_score ) + ( purchase_score )

In [0]:
df = df.withColumn('Loyalty_Score', (col('Member_Score')+col('Product_Score')+col('Purchase_Score')).cast('decimal(12,2)'))
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,0,0,58,635,88,546,172,88,88,3,8,10,4,7,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,1,1,38,11,1,6,2,1,6,2,1,1,2,5,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,0,0,26,426,49,127,111,21,42,1,8,2,10,4,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,1,0,26,11,4,20,10,3,5,2,2,0,4,6,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,1,0,94,173,43,118,46,27,15,5,5,3,6,5,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,0,1,16,520,42,98,0,42,14,2,6,4,10,6,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,0,1,34,235,65,164,50,49,27,4,7,3,7,6,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,1,0,32,76,10,56,3,1,23,2,4,0,4,8,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,1,0,19,14,0,24,3,3,2,1,3,0,2,9,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,1,1,68,28,0,6,1,1,13,1,1,0,0,20,7,-5.98,0.36,2,-3.62
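
The purchase and loyalty scores combine as follows. This plain-Python sketch uses `Decimal` to mirror the `decimal(12,2)` columns; `purchase_score` and `loyalty_score` are hypothetical helper names.

```python
from decimal import Decimal

def purchase_score(deals, web, catalog, store):
    """Total number of purchases across the four purchase columns."""
    return deals + web + catalog + store

def loyalty_score(member, product, purchase):
    """Add the three partial scores; member and product are decimal strings."""
    return Decimal(member) + Decimal(product) + purchase

# First customer: 3 deal, 8 web, 10 catalog and 4 store purchases.
purchases = purchase_score(3, 8, 10, 4)
loyalty = loyalty_score("51.25", "15.29", purchases)
```

The member score dominates the total, so a negative member score can still leave the loyalty score below zero, as for customer 5899.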


In [0]:
# DBTITLE 1,Delete columns already used to derive features
deleted_columns = [ 'Kidhome', 'Teenhome', 'Recency', 'MntWines', 'MntFruits', 'MntMeatProducts', 'MntFishProducts', \
                   'MntSweetProducts', 'MntGoldProds', 'NumDealsPurchases', 'NumWebPurchases', 'NumCatalogPurchases', \
                   'NumStorePurchases', 'NumWebVisitsMonth']

df = df.drop(*deleted_columns)
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,7,-5.98,0.36,2,-3.62


In [0]:
from pyspark.sql.functions import when, lit, col

df = df.withColumn("Loyalty_Score", when(col('Loyalty_Score') < 0, lit(0)).otherwise(col('Loyalty_Score')))

In [0]:
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,7,-5.98,0.36,2,0.0


In [0]:
df = df.na.fill(0, ["Loyalty_Score"])
display(df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,7,-5.98,0.36,2,0.0
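
The two cleaning steps, clamping negatives and then filling missing values, can be sketched in plain Python. `clean_loyalty` is a hypothetical helper. It keeps the steps separate to show why both are needed: in Spark a comparison with null is not true, so the `when(...)` branch leaves nulls untouched and `na.fill` has to handle them.

```python
from decimal import Decimal

def clean_loyalty(score):
    """Mirror the two cleaning steps applied to Loyalty_Score."""
    # Step 1: when(score < 0, 0).otherwise(score).
    # A missing score fails the comparison and is passed through unchanged.
    if score is not None and score < 0:
        score = Decimal("0")
    # Step 2: na.fill(0) replaces whatever is still missing.
    if score is None:
        score = Decimal("0")
    return score

cleaned = [clean_loyalty(s) for s in (Decimal("91.54"), Decimal("-3.62"), None)]
```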


#### Finished wrangling data in the Spark DataFrame

### Create external table with Delta Lake on DBFS path

A SQL command can be used to create the table.

In [0]:
%sql

drop table if exists default.member_scoring;

create external table default.member_scoring
(
   ID integer,
   Year_Birth integer,
   Education string,
   Marital_Status string,
   Income integer,
   Num_Year_Of_Customer integer,
   Member_Score decimal(12,2),
   Product_Score decimal(12,2),
   Purchase_Score integer,
   Loyalty_Score decimal(12,2)
 )
 USING DELTA
 LOCATION 'dbfs:/FileStore/tables/member_scoring';

### Write the DataFrame to the table's path

In [0]:
df.write.format('delta').mode('overwrite').save('dbfs:/FileStore/tables/member_scoring')

## Query table with Spark SQL

In [0]:
read_df = spark.sql("select * from default.member_scoring")
display(read_df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,7,-5.98,0.36,2,0.0


In [0]:
read_df.count()

Out[23]: 2240

## Delta Lake time travel

In [0]:
df.write.format('delta').mode('append').save('dbfs:/FileStore/tables/member_scoring')

read_df = spark.sql("select * from default.member_scoring")
display(read_df.limit(10))

read_df.count()

ID,Year_Birth,Education,Marital_Status,Income,Num_Year_Of_Customer,Member_Score,Product_Score,Purchase_Score,Loyalty_Score
5524,1957,Graduation,Single,58138,9,51.25,15.29,25,91.54
2174,1954,Graduation,Single,46344,7,42.93,0.21,6,49.14
4141,1965,Graduation,Together,71613,8,72.49,7.34,21,100.83
6182,1984,Graduation,Together,26646,7,26.52,0.48,8,35.0
5324,1981,PhD,Married,58293,7,39.54,4.07,19,62.61
7446,1967,Master,Together,62513,8,66.13,7.02,22,95.15
965,1971,Graduation,Divorced,55635,9,55.32,5.63,21,81.95
6177,1985,PhD,Married,33454,8,32.69,1.46,10,44.15
4855,1974,PhD,Together,30351,8,33.15,0.44,6,39.59
5899,1950,PhD,Together,5648,7,-5.98,0.36,2,0.0


Out[30]: 4480

View the history of table versions:

In [0]:
%sql

describe history default.member_scoring

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2023-03-24T08:51:15.000+0000,7252763392678599,srichaiperksiv@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1513197760899196),0324-073658-nts8mv0k,3.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2240, numOutputBytes -> 54528)",,Databricks-Runtime/11.3.x-scala2.12
3,2023-03-24T08:26:42.000+0000,7252763392678599,srichaiperksiv@gmail.com,RESTORE,"Map(version -> 1, timestamp -> null)",,List(1513197760899196),0324-073658-nts8mv0k,2.0,Serializable,False,"Map(numRestoredFiles -> 0, removedFilesSize -> 54492, numRemovedFiles -> 1, restoredFilesSize -> 0, numOfFilesAfterRestore -> 1, tableSizeAfterRestore -> 54492)",,Databricks-Runtime/11.3.x-scala2.12
2,2023-03-24T08:23:15.000+0000,7252763392678599,srichaiperksiv@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1513197760899196),0324-073658-nts8mv0k,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2240, numOutputBytes -> 54492)",,Databricks-Runtime/11.3.x-scala2.12
1,2023-03-24T08:21:44.000+0000,7252763392678599,srichaiperksiv@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1513197760899196),0324-073658-nts8mv0k,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 2240, numOutputBytes -> 54492)",,Databricks-Runtime/11.3.x-scala2.12
0,2023-03-24T08:20:20.000+0000,7252763392678599,srichaiperksiv@gmail.com,CREATE TABLE,"Map(isManaged -> false, description -> null, partitionBy -> [], properties -> {})",,List(1513197760899196),0324-073658-nts8mv0k,,WriteSerializable,True,Map(),,Databricks-Runtime/11.3.x-scala2.12


In [0]:
# Count the rows as of version 1; the table itself stays at its latest version
spark.read.format("delta").option("versionAsOf", "1").load('dbfs:/FileStore/tables/member_scoring').count()

Out[32]: 2240

### Restore the table to version 1

In [0]:
%sql
RESTORE TABLE default.member_scoring TO VERSION AS OF 1

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
54492,1,1,0,54528,0


In [0]:
read_df = spark.sql("select * from default.member_scoring")
read_df.count()

Out[34]: 2240
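
The row counts and the history output above follow one rule: every write or restore adds a new table version, and older versions stay readable. The toy model below illustrates only that rule. It is not how Delta Lake stores data (Delta keeps a transaction log over Parquet files), and `VersionedTable` is a hypothetical class.

```python
class VersionedTable:
    """Toy model of a versioned table: every operation adds a new snapshot."""

    def __init__(self):
        self.snapshots = [[]]  # version 0: table created, no rows yet

    @property
    def version(self):
        return len(self.snapshots) - 1

    def overwrite(self, rows):
        self.snapshots.append(list(rows))

    def append(self, rows):
        self.snapshots.append(self.snapshots[-1] + list(rows))

    def restore(self, version):
        # A restore is itself a new version; older snapshots are kept.
        self.snapshots.append(list(self.snapshots[version]))

    def count(self, version=None):
        snapshot = self.snapshots[-1] if version is None else self.snapshots[version]
        return len(snapshot)

rows = list(range(2240))  # stand-in for the 2240 member rows
table = VersionedTable()
table.overwrite(rows)  # version 1
table.append(rows)     # version 2
table.restore(1)       # version 3
table.append(rows)     # version 4, the latest entry in the history above
```

Reading `table.count(version=1)` corresponds to the `versionAsOf` read, and a further `table.restore(1)` corresponds to the `RESTORE TABLE` cell, which brings the count back to 2240 as a new version.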

#### Finished querying data from Delta Lake

## Use Koalas in Databricks

Koalas is a library that exposes the pandas API but runs the work on Spark. Since Spark 3.2 the same API also ships with PySpark as `pyspark.pandas`.

In [0]:
%pip install koalas

Python interpreter will be restarted.
Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl (390 kB)
Installing collected packages: koalas
Successfully installed koalas-1.8.2
Python interpreter will be restarted.


In [0]:
import databricks.koalas as ks

ks_df = ks.read_csv('dbfs:/FileStore/datasets/marketing_campaign.csv', header=0, sep=';')

In [0]:
ks_df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,AcceptedCmp3,AcceptedCmp4,AcceptedCmp5,AcceptedCmp1,AcceptedCmp2,Complain,Z_CostContact,Z_Revenue,Response
0,5524,1957,Graduation,Single,58138,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5,0,0,0,0,0,0,3,11,0


In [0]:
deleted_columns = ['AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1', 'AcceptedCmp2', 'Complain', \
                   'Z_CostContact', 'Z_Revenue', 'Response']
 
ks_df = ks_df.drop(columns=deleted_columns)
 
ks_df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth
0,5524,1957,Graduation,Single,58138,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7
1,2174,1954,Graduation,Single,46344,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5
2,4141,1965,Graduation,Together,71613,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4
3,6182,1984,Graduation,Together,26646,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6
4,5324,1981,PhD,Married,58293,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5


### Koalas to Spark

In [0]:
spark_df = ks_df.to_spark()
display(spark_df.limit(10))

ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth
5524,1957,Graduation,Single,58138,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7
2174,1954,Graduation,Single,46344,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5
4141,1965,Graduation,Together,71613,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4
6182,1984,Graduation,Together,26646,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6
5324,1981,PhD,Married,58293,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5
7446,1967,Master,Together,62513,0,1,2013-09-09,16,520,42,98,0,42,14,2,6,4,10,6
965,1971,Graduation,Divorced,55635,0,1,2012-11-13,34,235,65,164,50,49,27,4,7,3,7,6
6177,1985,PhD,Married,33454,1,0,2013-05-08,32,76,10,56,3,1,23,2,4,0,4,8
4855,1974,PhD,Together,30351,1,0,2013-06-06,19,14,0,24,3,3,2,1,3,0,2,9
5899,1950,PhD,Together,5648,1,1,2014-03-13,68,28,0,6,1,1,13,1,1,0,0,20


### Koalas to pandas

In [0]:
import pandas as pd
pd_df = ks_df.to_pandas()



In [0]:
pd_df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth
0,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4,7
1,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2,5
2,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10,4
3,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4,6
4,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6,5


`pd_df` is now a plain pandas DataFrame, so operations on it run locally on the driver and no longer use Spark.

In [0]:
deleted_columns = ['NumWebVisitsMonth']
 
pd_df = pd_df.drop(columns=deleted_columns)

pd_df.head()

Unnamed: 0,ID,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,MntFruits,MntMeatProducts,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases
0,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,88,546,172,88,88,3,8,10,4
1,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,1,6,2,1,6,2,1,1,2
2,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,49,127,111,21,42,1,8,2,10
3,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,4,20,10,3,5,2,2,0,4
4,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,43,118,46,27,15,5,5,3,6
