In [None]:
from urllib.parse import quote_plus
import pandas as pd
import numpy as np
import datetime
from plotly.subplots import make_subplots
from sqlalchemy import create_engine, MetaData, Table, select, text
import pyodbc
import os
import shutil
import subprocess


#### EXTRACT DATA FROM csv FILES

In [2]:
####################################################### 1.Extract data:
# Define a function to extract data from csv file
def extractCSV(file_path):
    df_data = pd.read_csv(file_path)
    return df_data

df_country = extractCSV('Data/countries.csv')
df_city = extractCSV('Data/cities.csv')
df_customer = extractCSV('Data/customers.csv')
df_category = extractCSV('Data/categories.csv')
df_product = extractCSV('Data/products.csv')
df_sales = extractCSV('Data/sales.csv')


#### TRANSFORM DATA
Follow the bellow steps to transform data:
1. Inspect data
2. Perform transformations to ensure:

|Principle|Description|
|----|-----------|
|Accuracy|Correct errors, cross-check with sources, handle outliers, and fix inconsistent formats.|
|Completeness|Identify and handle missing data through imputation or removal, flag incomplete records.|
|Validity|Apply range checks, format validation, data type validation, and ensure compliance with business rules.|
|Consistency|Standardize formats, resolve duplicates, normalize categories, and handle missing categorical data.|
|Relevance|Perform feature selection, remove noise, and engineer relevant features for the task.|
|Integrity|Ensure referential integrity, check for orphan records, and maintain primary key integrity.|

3. Enrich data

In [3]:
###################################### 2. Data inspection: to know how is the data look like
#----- Country
print(df_country.info())
print(df_country.head())

#----- City
print(df_city.info())
print(df_city.head())

#----- Customer
print(df_customer.info())
print(df_customer.head())

#----- Product Category
print(df_category.info())
print(df_category.head())

#----- Product
print(df_product.info())
print(df_product.head())
print(df_product.describe())

#------ Sales
print(df_sales.info(show_counts=True))
print(df_sales.head())
print(df_sales.describe())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 206 entries, 0 to 205
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   CountryID    206 non-null    int64 
 1   CountryName  206 non-null    object
 2   CountryCode  205 non-null    object
dtypes: int64(1), object(2)
memory usage: 5.0+ KB
None
   CountryID CountryName CountryCode
0          1     Armenia          AN
1          2      Canada          FO
2          3      Belize          MK
3          4      Uganda          LV
4          5    Thailand          VI
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 96 entries, 0 to 95
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   CityID     96 non-null     int64 
 1   CityName   96 non-null     object
 2   Zipcode    96 non-null     int64 
 3   CountryID  96 non-null     int64 
dtypes: int64(3), object(1)
memory usage: 3.1+ KB
None
   CityID        CityNa

In [4]:
########################################## 3. Transform data
####### Relevance: Ensure the data being transformed is useful and relevant for the task at hand
#----- Drop unnecessary columns:
df_customer.drop(columns=["MiddleInitial"], inplace=True)
df_sales.drop(columns=["SalesPersonID", "TransactionNumber"], inplace=True)

In [5]:
####### Accuracy: Ensure the data accurately represents the real-world scenario its models
#----- Correct errors, Cross-check with sources
# Check Country data
df_country.head(20)
df_country["CountryName"].unique()
df_city["CityName"].unique()
df_country_code = extractCSV('Data/Alpha2CountryCode.csv')
df_country_code = df_country_code[["name", "alpha-2"]]
df_country_new = df_country.merge(df_country_code, left_on="CountryName", right_on="name", how="left")
diff = df_country_new.where((df_country_new["CountryCode"] != df_country_new["alpha-2"]) | (df_country_new["alpha-2"].isna()))
print(diff)
# --> Need to correct the Country Code

# Check City data
df_category["CategoryName"].unique()
# --> Don't need to correct

# Check Product data
df_product.head()
print(df_product["Class"].unique())
print(df_product["Resistant"].unique())
print(df_product["IsAllergic"].unique())
# --> Don't need to correct


     CountryID CountryName CountryCode       name alpha-2
0            1     Armenia          AN    Armenia      AM
1            2      Canada          FO     Canada      CA
2            3      Belize          MK     Belize      BZ
3            4      Uganda          LV     Uganda      UG
4            5    Thailand          VI   Thailand      TH
..         ...         ...         ...        ...     ...
201        202   Greenland          MW  Greenland      GL
202        203       Niger          SH      Niger      NE
203        204    Malvinas          GE        NaN     NaN
204        205     Mayotte          PG    Mayotte      YT
205        206    Mongolia          MZ   Mongolia      MN

[206 rows x 5 columns]
['Medium' 'Low' 'High']
['Durable' 'Unknown' 'Weak']
['Unknown' 'False' 'True']


In [6]:
#--- Standardize the CountryCode following Alpha-2 Code
# Fix country code for non-matching country name
#     List of fixing countries
missing_values = {
    "Swaziland":"SZ",
    "Czech Republic":"CZ",
    "Macedonia":"MK",
    "Saint Helena":"SH",
    "United States":"US",
    "Moldova":"MD",
    "Vatican City":"VC",
    "Cape Verde":"CV",
    "Virgin Islands":"VI",
    "South Georgia":"GS",
    "South Korea":"KR",
    "Falklands":"FK",
    "Bolivia":"BO",
    "Venezuela":"VE",
    "Tanzania":"TZ",
    "Iran":"IR",
    "Vietnam":"VN",
    "Eire":"EI",
    "Micronesia":"FM",
    "Netherlands":"NL",
    "United Kingdom":"GB",
    "Taiwan":"TW",
    "Syria":"SY",
    "Trinidad":"TT",
    "North Korea":"KP",
    "Turkey":"TR",
    "Russia":"RU",
    "Malvinas":"MV"
}
# Update country code for the above missing values
for name, code in missing_values.items():
    df_country_new.loc[df_country_new["CountryName"] == name, "alpha-2"] = code

df_country = df_country_new[["CountryID", "CountryName", "alpha-2"]]
df_country.rename(columns={"alpha-2":"CountryCode"}, inplace=True)

# Check again and fix the missing values
df_country.loc[df_country["CountryCode"].isna() == True,:]
df_country.loc[df_country["CountryCode"].isna(),"CountryCode"] = "NB"
# Check again
df_country.loc[df_country["CountryCode"].isna() == True,:]
df_country[df_country.duplicated(subset=["CountryCode"])]

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_country.rename(columns={"alpha-2":"CountryCode"}, inplace=True)


Unnamed: 0,CountryID,CountryName,CountryCode


In [7]:
########### Completeness: Identify and handle missing data through imputation or removal
df_sales.info(show_counts=True) 
#--> There are few rows that have SalesDate be null, the portion is (6,758,125 - 6,690,599)/6,758,125 = 0.99%
#------> Solution: remove those rows
df_sales = df_sales[~df_sales["SalesDate"].isna()]
# Check again
df_sales.info(show_counts=True) 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6758125 entries, 0 to 6758124
Data columns (total 7 columns):
 #   Column      Non-Null Count    Dtype  
---  ------      --------------    -----  
 0   SalesID     6758125 non-null  int64  
 1   CustomerID  6758125 non-null  int64  
 2   ProductID   6758125 non-null  int64  
 3   Quantity    6758125 non-null  int64  
 4   Discount    6758125 non-null  float64
 5   TotalPrice  6758125 non-null  float64
 6   SalesDate   6690599 non-null  object 
dtypes: float64(2), int64(4), object(1)
memory usage: 360.9+ MB
<class 'pandas.core.frame.DataFrame'>
Index: 6690599 entries, 0 to 6758124
Data columns (total 7 columns):
 #   Column      Non-Null Count    Dtype  
---  ------      --------------    -----  
 0   SalesID     6690599 non-null  int64  
 1   CustomerID  6690599 non-null  int64  
 2   ProductID   6690599 non-null  int64  
 3   Quantity    6690599 non-null  int64  
 4   Discount    6690599 non-null  float64
 5   TotalPrice  6690599 non-

In [8]:
########## Consistency: Standardize formats, resolve duplicates, normalize categories, and handle missing categorical data.
#--- Product data:
# Check data
print(df_product[df_product.duplicated(subset=["ProductName"])])
print(df_product.info())
# - Convert ModifyDate to datatime data type.
# - Convert Class, Resistant, IsAllergic to category data type
# - Convert VitalityDays to int
df_product["ModifyDate"] = pd.to_datetime(df_product["ModifyDate"])
df_product = df_product.astype({"Class":"category", "Resistant":"category", "IsAllergic":"category", "VitalityDays":"int"})
# Validate:
print(df_product.info())

#--- Sales data:
# Check data
print(df_sales.info(show_counts=True))
df_sales[df_sales.duplicated(subset=["CustomerID", "ProductID", "SalesDate", "Quantity", "Discount"])]
# - Convert SalesDate to date data type
df_sales["SalesDate"] = pd.to_datetime(df_sales["SalesDate"]).dt.normalize()
# Validate:
print(df_sales.info(show_counts=True))
print(df_sales.head(20))


Empty DataFrame
Columns: [ProductID, ProductName, Price, CategoryID, Class, ModifyDate, Resistant, IsAllergic, VitalityDays]
Index: []
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 452 entries, 0 to 451
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ProductID     452 non-null    int64  
 1   ProductName   452 non-null    object 
 2   Price         452 non-null    float64
 3   CategoryID    452 non-null    int64  
 4   Class         452 non-null    object 
 5   ModifyDate    452 non-null    object 
 6   Resistant     452 non-null    object 
 7   IsAllergic    452 non-null    object 
 8   VitalityDays  452 non-null    float64
dtypes: float64(2), int64(2), object(5)
memory usage: 31.9+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 452 entries, 0 to 451
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   ProductID  

In [9]:
########### Integrity: Ensure referential integrity, check for orphan records, and maintain primary key integrity
#--- If exists --> Delete rows
#Check reference by CountryID
orphan_city = df_city[~df_city['CountryID'].isin(df_country['CountryID'])]
print("Orphan City (Foreign Key Violations):")
if not orphan_city.empty:
    df_city.drop(index=orphan_city.index, inplace=True)
    print(f'Processed {orphan_city.shape[0]} rows!')
else:
    print('There is no orphan records!')

#Check reference by CityID
orphan_customer = df_customer[~df_customer['CityID'].isin(df_city['CityID'])]
print("Orphan Customer (Foreign Key Violations):")
if not orphan_customer.empty:
    df_customer.drop(index=orphan_customer.index, inplace=True)
    print(f'Processed {orphan_customer.shape[0]} rows!')
else:
    print('There is no orphan records!')

#Check reference by CategoryID
orphan_category = df_product[~df_product['CategoryID'].isin(df_category['CategoryID'])]
print("Orphan Product (Foreign Key Violations):")
if not orphan_category.empty:
    df_product.drop(index=orphan_category.index, inplace=True)
    print(f'Processed {orphan_category.shape[0]} rows!')
else:
    print('There is no orphan records!')


#Check reference by ProductID
orphan_sales_product = df_sales[~df_sales['ProductID'].isin(df_product['ProductID'])]
print("Orphan Sales - Product (Foreign Key Violations):")
if not orphan_sales_product.empty:
    df_sales.drop(index=orphan_sales_product.index, inplace=True)
    print(f'Processed {orphan_sales_product.shape[0]} rows!')
else:
    print('There is no orphan records!')

#Check reference by CustomerID
orphan_sales_customer = df_sales[~df_sales['CustomerID'].isin(df_customer['CustomerID'])]
print("Orphan Sales - Customer (Foreign Key Violations):")
if not orphan_sales_customer.empty:
    df_sales.drop(index=orphan_sales_customer.index, inplace=True)
    print(f'Processed {orphan_sales_customer.shape[0]} rows!')
else:
    print('There is no orphan records!')


Orphan City (Foreign Key Violations):
There is no orphan records!
Orphan Customer (Foreign Key Violations):
There is no orphan records!
Orphan Product (Foreign Key Violations):
There is no orphan records!
Orphan Sales - Product (Foreign Key Violations):
There is no orphan records!
Orphan Sales - Customer (Foreign Key Violations):
There is no orphan records!


##### APPLY STAR SCHEMA & DATA ENRICHMENT
- Create UnitPrice column in sales data.
- Calculate the TotalPrice based on UnitPrice and Discount.
- Denormalize data to model data following Star Schema
- Create RFM table to store Recency, Frequency, and Monetary values of each customer.
- Identify outliers on R,F, M values and handle them to make the analysis more accurate.
- Calculate RFM score
- Get definition of Customer Segment: from theory about categorizing Customer for RFM Analysis
- Merge RFM data to Customer 

In [10]:
# Get Price from Product df, then add to Sales df as UnitPrice column:
df_sales_full = df_sales.merge(df_product, on="ProductID", how="left")
df_sales_full.drop(columns=["ProductName", "CategoryID", "Class", "Resistant", "IsAllergic", "VitalityDays", "ModifyDate"], inplace=True)
df_sales_full.rename(columns={"Price":"UnitPrice"}, inplace=True)

# Calculate TotalPrice
df_sales_full["TotalPrice"] = round(df_sales_full["UnitPrice"] * df_sales_full["Quantity"] * (1 - df_sales_full["Discount"]), 2)

#------ Denormalize data
# Combine Product & Category
df_product_full = df_product.merge(df_category, on="CategoryID", how="left")
df_product_full.drop(columns=["CategoryID"], inplace=True)

# Combine Customer, City, and Country
df_customer_full = df_customer.merge(df_city, on="CityID", how="left")
df_customer_full = df_customer_full.merge(df_country, on="CountryID", how="left")
df_customer_full.drop(columns=["CityID", "CountryID"], inplace=True)

# Research and input the Customer Segments data
df_customer_segments = pd.read_excel('Data/CustomerSegment.xlsx', sheet_name="CustomerSegment")
df_customer_segments.rename(columns={"RFMScore":"RFMScoreMap", "Index":"SegmentIndex"}, inplace=True)
df_customer_segments

Unnamed: 0,RFMScoreMap,Segment,SegmentIndex,Color,Description
0,555,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
1,554,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
2,544,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
3,545,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
4,454,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
...,...,...,...,...,...
120,112,Lost customers,11,#744EC2,"Lowest recency, frequency, and monetary scores."
121,121,Lost customers,11,#744EC2,"Lowest recency, frequency, and monetary scores."
122,131,Lost customers,11,#744EC2,"Lowest recency, frequency, and monetary scores."
123,141,Lost customers,11,#744EC2,"Lowest recency, frequency, and monetary scores."


In [62]:
#--------- Create RFM table to store Recency, Frequency, and Monetary values of each customer.
#---- Recency data
# Last purchased date of all sales
LAST_SALES_DATE = df_sales_full["SalesDate"].max() + datetime.timedelta(days=1)
print(f"Anchor date: {LAST_SALES_DATE}")

recency_data = df_sales_full.groupby(["CustomerID"])["SalesDate"].agg(["max"])
recency_data.rename(columns={"max":"LastPurchasedDate"}, inplace=True)
recency_data["Recency"] = (LAST_SALES_DATE - recency_data["LastPurchasedDate"]).dt.days
recency_data

Anchor date: 2018-05-10 00:00:00


Unnamed: 0_level_0,LastPurchasedDate,Recency
CustomerID,Unnamed: 1_level_1,Unnamed: 2_level_1
1,2018-05-06,4
2,2018-05-04,6
3,2018-05-09,1
4,2018-05-08,2
5,2018-05-09,1
...,...,...
98755,2018-05-07,3
98756,2018-05-09,1
98757,2018-05-03,7
98758,2018-05-08,2


In [63]:
#---- Frequency data (weekly)
# Calculate the number of weeks in whole dataset
FIRSTEST_SALES_DATE = df_sales_full["SalesDate"].min()
LASTEST_SALES_DATE = df_sales_full["SalesDate"].max()
count_weeks = (LASTEST_SALES_DATE - FIRSTEST_SALES_DATE).days/7

# Calculate weekly frequency (a week is a suitable period for Grocery Sales area)
frequency_data = df_sales_full.groupby(["CustomerID"])["SalesID"].agg(["nunique"])
frequency_data.rename(columns={"nunique":"Frequency"}, inplace=True)
frequency_data["Frequency"] = (frequency_data["Frequency"]/count_weeks).round(1)
frequency_data.head(10)

Unnamed: 0_level_0,Frequency
CustomerID,Unnamed: 1_level_1
1,3.4
2,3.4
3,3.8
4,3.8
5,3.2
6,4.0
7,3.9
8,3.2
9,3.6
10,3.0


In [64]:
#--- Monetary data
monetary_data = df_sales_full.groupby(["CustomerID"])["TotalPrice"].agg(["sum"])
monetary_data.rename(columns={"sum":"Monetary"}, inplace=True)
monetary_data["Monetary"] = (monetary_data["Monetary"]/count_weeks).round(1)
monetary_data.head()

#--- Create RFM table by combining Recency, Frequency, and Monetary data
df_RFM = pd.concat([recency_data, frequency_data, monetary_data], axis=1)
df_RFM.reset_index(inplace=True)
df_RFM.head()


Unnamed: 0,CustomerID,LastPurchasedDate,Recency,Frequency,Monetary
0,1,2018-05-06,4,3.4,171.5
1,2,2018-05-04,6,3.4,183.5
2,3,2018-05-09,1,3.8,181.5
3,4,2018-05-08,2,3.8,170.8
4,5,2018-05-09,1,3.2,144.9


In [68]:
#--------------- Identify outliers and handle them
#----- Check Recency, Frequency, and Monetary distribution
# Check distribution
fig = make_subplots(
    rows=3, cols=2,
    subplot_titles=(
        "Recency: Distribution", "Recency: Boxplot", 
        "Frequency: Distribution", "Frequency: Boxplot", 
        "Monetary: Distribution", "Monetary: Boxplot"
    ),
    x_title="Customer Count",
)

for r, key in enumerate(["Recency", "Frequency", "Monetary"]):
    fig.add_histogram(
        x=df_RFM[f"{key}"], row=(r+1), col=1,
    )
    fig.add_box(
        x=df_RFM[f"{key}"], row=(r+1), col=2,
    )

fig.update_layout(height=700, width=1000, showlegend=False)

fig.show()

In [None]:
#------- There are some outliers there --> need to handle them.
# These outliers affects to averages, the simple way to handle them is using IQR (Interquartile Range) Adjustment
for key in ["Recency", "Frequency", "Monetary"]:
    Q1 = np.percentile(df_RFM[key], 25)  # First quartile (Q1)
    Q3 = np.percentile(df_RFM[key], 75)  # Third quartile (Q3)
    IQR = Q3 - Q1
    v_flooring = round(Q1 - 1.5 * IQR,2)
    v_capping = round(Q3 + 1.5 * IQR,2)
    df_RFM[key + "_Original"] = df_RFM[key].copy() #keep the original values to compare
    df_RFM[key] = np.clip(df_RFM[key], v_flooring, v_capping)

In [70]:
# Check distribution again by visualizing
fig = make_subplots(
    rows=3, cols=2,
    subplot_titles=(
        "Recency", "Original Recency", 
        "Frequency", "Original Frequency", 
        "Monetary", "Original Monetary"
    ),
    x_title="Customer Count",
)

for r, key in enumerate(["Recency", "Frequency", "Monetary"]):
    fig.add_box(
        x=df_RFM[f"{key}"], row=(r+1), col=1,
    )
    fig.add_box(
        x=df_RFM[f"{key}_Original"], row=(r+1), col=2,
    )

fig.update_layout(height=700, width=1000, showlegend=False)

fig.show()

In [71]:
# Remove original values
df_RFM.drop(columns=["Recency_Original", "Frequency_Original", "Monetary_Original"], inplace=True)
df_RFM.rename(columns={"Recency":"RValue", "Frequency":"FValue", "Monetary":"MValue"}, inplace=True)
df_RFM

Unnamed: 0,CustomerID,LastPurchasedDate,RValue,FValue,MValue
0,1,2018-05-06,4,3.4,171.5
1,2,2018-05-04,6,3.4,183.5
2,3,2018-05-09,1,3.8,181.5
3,4,2018-05-08,2,3.8,170.8
4,5,2018-05-09,1,3.2,144.9
...,...,...,...,...,...
98754,98755,2018-05-07,3,4.0,4592.1
98755,98756,2018-05-09,1,4.5,5260.6
98756,98757,2018-05-03,6,3.3,4761.7
98757,98758,2018-05-08,2,3.4,4276.9


In [90]:
#---- Score R, F, M values using cut() and qcut() method
# Most recent purchase should receive the highest score. Because RValue distribution is strong skewed, so should use cut() method
df_RFM["RScore"] = pd.cut(df_RFM["RValue"], 5, labels=["5", "4", "3", "2", "1"])

# The more purchases a customer has made, the higher the score
df_RFM["FScore"] = pd.qcut(df_RFM["FValue"], 5, labels=["1", "2", "3", "4", "5"])

# the more money a customer has spent, the higher the score
df_RFM["MScore"] = pd.qcut(df_RFM["MValue"], 5, labels=["1", "2", "3", "4", "5"])

# Combine RScore, FScore, MScore to RFMScore
df_RFM["RFMScore"] = (df_RFM["RScore"].astype(str) + df_RFM["FScore"].astype(str) + df_RFM["MScore"].astype(str))
df_RFM

Unnamed: 0,CustomerID,LastPurchasedDate,RValue,FValue,MValue,RScore,FScore,MScore,RFMScore
0,1,2018-05-06,4,3.4,171.5,3,2,1,321
1,2,2018-05-04,6,3.4,183.5,1,2,1,121
2,3,2018-05-09,1,3.8,181.5,5,3,1,531
3,4,2018-05-08,2,3.8,170.8,5,3,1,531
4,5,2018-05-09,1,3.2,144.9,5,1,1,511
...,...,...,...,...,...,...,...,...,...
98754,98755,2018-05-07,3,4.0,4592.1,4,4,5,445
98755,98756,2018-05-09,1,4.5,5260.6,5,5,5,555
98756,98757,2018-05-03,6,3.3,4761.7,1,1,5,115
98757,98758,2018-05-08,2,3.4,4276.9,5,2,5,525


In [None]:
# Merge RFM into Customers data based on CustomerID:
df_customer = df_customer.merge(df_RFM, on="CustomerID", how="left")
# df_customer.drop(columns=["LastPurchasedDate", "RValue", "FValue", "MValue"], inplace=True)
df_customer

Unnamed: 0,CustomerID,FirstName,LastName,CityID,Address,LastPurchasedDate,RValue,FValue,MValue,RScore,FScore,MScore,RFMScore
0,1,Stefanie,Frye,79,97 Oak Avenue,2018-05-06,4,3.4,171.5,3,2,1,321
1,2,Sandy,Kirby,96,52 White First Freeway,2018-05-04,6,3.4,183.5,1,2,1,121
2,3,Lee,Zhang,55,921 White Fabien Avenue,2018-05-09,1,3.8,181.5,5,3,1,531
3,4,Regina,Avery,40,75 Old Avenue,2018-05-08,2,3.8,170.8,5,3,1,531
4,5,Daniel,Mccann,2,283 South Green Hague Avenue,2018-05-09,1,3.2,144.9,5,1,1,511
...,...,...,...,...,...,...,...,...,...,...,...,...,...
98754,98755,Yvette,Campos,27,945 Oak Parkway,2018-05-07,3,4.0,4592.1,4,4,5,445
98755,98756,Angelo,Mc Millan,82,99 Fabien Street,2018-05-09,1,4.5,5260.6,5,5,5,555
98756,98757,Shari,Prince,81,791 Milton Drive,2018-05-03,6,3.3,4761.7,1,1,5,115
98757,98758,Stuart,Cameron,57,149 Clarendon Road,2018-05-08,2,3.4,4276.9,5,2,5,525


In [None]:
# Merge Customer Segment definitions into Customer data based on RFMScore:
# Convert df_customer_segments["RFMScoreMap"] to string data type before merging
df_customer_segments["RFMScoreMap"] = df_customer_segments["RFMScoreMap"].astype(str)
df_customer = df_customer.merge(df_customer_segments, left_on="RFMScore", right_on="RFMScoreMap", how="left")
# Delete duplicated column
df_customer.drop(columns=["RFMScoreMap"], inplace=True)
df_customer

Unnamed: 0,CustomerID,FirstName,LastName,CityID,Address,LastPurchasedDate,RValue,FValue,MValue,RScore,FScore,MScore,RFMScore,RFMScoreMap,Segment,SegmentIndex,Color,Description
0,1,Stefanie,Frye,79,97 Oak Avenue,2018-05-06,4,3.4,171.5,3,2,1,321,321,About To Sleep,7,#118DFF,"Below-average recency, frequency, and monetary..."
1,2,Sandy,Kirby,96,52 White First Freeway,2018-05-04,6,3.4,183.5,1,2,1,121,121,Lost customers,11,#744EC2,"Lowest recency, frequency, and monetary scores."
2,3,Lee,Zhang,55,921 White Fabien Avenue,2018-05-09,1,3.8,181.5,5,3,1,531,531,Potential Loyalists,3,#1AAB40,"A recent customer, who spent a good amount"
3,4,Regina,Avery,40,75 Old Avenue,2018-05-08,2,3.8,170.8,5,3,1,531,531,Potential Loyalists,3,#1AAB40,"A recent customer, who spent a good amount"
4,5,Daniel,Mccann,2,283 South Green Hague Avenue,2018-05-09,1,3.2,144.9,5,1,1,511,511,New Customers,4,#197278,"visited most recently, but not often, and have..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
98754,98755,Yvette,Campos,27,945 Oak Parkway,2018-05-07,3,4.0,4592.1,4,4,5,445,445,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
98755,98756,Angelo,Mc Millan,82,99 Fabien Street,2018-05-09,1,4.5,5260.6,5,5,5,555,555,Champions,1,#6B007B,"have visited most recently, visited most frequ..."
98756,98757,Shari,Prince,81,791 Milton Drive,2018-05-03,6,3.3,4761.7,1,1,5,115,115,Cannot Lose Them,9,#E66C37,has spent a great amount and visited often but...
98757,98758,Stuart,Cameron,57,149 Clarendon Road,2018-05-08,2,3.4,4276.9,5,2,5,525,525,Promising,5,#15C6F4,"Average recency, frequency, and monetary scores"


#### LOADING DATA INTO DATABASE (before loading, please create database and related tables in the database)

In [27]:
connection_string = 'mssql+pyodbc://sa:%s@localhost:1433/GroceryStoreSales?driver=ODBC+Driver+17+for+SQL+Server' % quote_plus("Abc@123456")

sql_engine = create_engine(connection_string)

In [34]:
# load data into Customer table:
cust = pd.read_sql('SELECT * FROM Customer', sql_engine)
col_list = cust.columns
df_customer_ok = df_customer[col_list]
print(col_list)
raw_conn = sql_engine.raw_connection()
try:
    raw_conn.fast_executemany = True
    with sql_engine.begin() as conn:
        conn.execute(text('SET IDENTITY_INSERT Customer ON'))  # Enable inserting explicit identity values
        df_customer_ok.to_sql(name='Customer', con=conn, if_exists='append', index=False)
        conn.execute(text('SET IDENTITY_INSERT Customer OFF'))  # Disable after insert
finally:
    raw_conn.close()
cust_validate = pd.read_sql('SELECT * FROM Customer;', sql_engine)
print(cust_validate.shape)
print(cust_validate.head())

Index(['CustomerID', 'FirstName', 'LastName', 'Address', 'CityID'], dtype='object')
(98759, 5)
   CustomerID FirstName LastName                       Address  CityID
0           1  Stefanie     Frye                 97 Oak Avenue      79
1           2     Sandy    Kirby        52 White First Freeway      96
2           3       Lee    Zhang       921 White Fabien Avenue      55
3           4    Regina    Avery                 75 Old Avenue      40
4           5    Daniel   Mccann  283 South Green Hague Avenue       2


In [None]:
# load data into Product table:
prod = pd.read_sql('SELECT * FROM Product', sql_engine)
col_list = prod.columns
df_product_ok = df_product[col_list]
print(col_list)
raw_conn = sql_engine.raw_connection()
try:
    raw_conn.fast_executemany = True
    with sql_engine.begin() as conn:
        conn.execute(text('SET IDENTITY_INSERT Customer OFF'))
        conn.execute(text('SET IDENTITY_INSERT Product ON'))  # Enable inserting explicit identity values
        df_product_ok.to_sql(name='Product', con=conn, if_exists='append', index=False)
        conn.execute(text('SET IDENTITY_INSERT Product OFF'))  # Disable after insert
finally:
    raw_conn.close()
prod_validate = pd.read_sql('SELECT * FROM Product;', sql_engine)
print(prod_validate.shape)
print(prod_validate.head())

Index(['ProductID', 'ProductName', 'Price', 'CategoryID', 'Class',
       'ModifyDate', 'Resistant', 'IsAllergic', 'VitalityDays'],
      dtype='object')
(452, 9)
   ProductID                 ProductName    Price  CategoryID   Class  \
0          1         Flour - Whole Wheat  74.2988           3  Medium   
1          2  Cookie Chocolate Chip With  91.2329           3  Medium   
2          3          Onions - Cippolini   9.1379           9  Medium   
3          4  Sauce - Gravy, Au Jus, Mix  54.3055           9  Medium   
4          5      Artichokes - Jerusalem  65.4771           2     Low   

               ModifyDate Resistant IsAllergic  VitalityDays  
0 2018-02-16 08:21:49.190   Durable    Unknown           0.0  
1 2017-02-12 11:39:10.970   Unknown    Unknown           0.0  
2 2018-03-15 08:11:51.560      Weak      False         111.0  
3 2017-07-16 00:46:28.880   Durable    Unknown           0.0  
4 2017-08-16 14:13:35.430   Durable       True          27.0  


In [None]:
# Get exactly the column names
prod = pd.read_sql('SELECT * FROM Sales', sql_engine)
col_list = prod.columns
df_sales_ok = df_sales_full[col_list]
print(col_list)
csv_sales_file = "Data/transformed_sales.csv"
container_name = 'azuresqledge'
container_file_path = "/var/opt/mssql-extensibility/data/transformed_sales.csv"
# Because when creating the Docker container, the binded mount was not created, so we must use subprocess to run docker commands
# to access the Docker volume
# Check if the file exists and delete it to prevent conflicts
if os.path.exists(csv_sales_file):
    os.remove(csv_sales_file)  # Remove old file to ensure fresh data
df_sales_ok.to_csv(csv_sales_file, index=False, sep='|')

# Check if the file exists in Docker mounted folder and delete it to prevent conflicts
check_result = subprocess.run(
    ["docker", "exec", container_name, "test", "-f", container_file_path],
    check=False, # Don't raise an exception on non-zero exit code
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)
if check_result.returncode == 0: # if the file exists
    # Remove the file
    subprocess.run(
        ["docker", "exec", container_name, "rm", container_file_path],
        check=True
    )
    print("File removed successfully!")

# Copy csv file to Docker mounted folder
subprocess.run(
    ["docker", "cp", csv_sales_file, f"{container_name}:{container_file_path}"],
    check=True
)
# Check if the file exists --> that means copying was successful
check_result = subprocess.run(
    ["docker", "exec", container_name, "test", "-f", container_file_path],
    check=False, # Don't raise an exception on non-zero exit code
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE
)
if check_result.returncode == 0: # if the file exists
    print("File copied successfully!")
    with sql_engine.begin() as conn:
        bulk_insert_query = f"""BULK INSERT Sales
                                FROM '{container_file_path}'
                                WITH(
                                    FIELDTERMINATOR = '|',
                                    ROWTERMINATOR = '0x0A',
                                    FIRSTROW = 2
                                )
                            """
        conn.execute(text(bulk_insert_query))
        conn.commit()

    sales_validate = pd.read_sql('SELECT * FROM Sales;', sql_engine)
    print(sales_validate.shape)
    print(sales_validate.head())
else:
    print("Copying was fail!")



Index(['SalesID', 'CustomerID', 'SalesDate', 'ProductID', 'Quantity',
       'UnitPrice', 'Discount', 'TotalPrice'],
      dtype='object')
File removed successfully!
File copied successfully!
(6690599, 8)
   SalesID  CustomerID   SalesDate  ProductID  Quantity  UnitPrice  Discount  \
0  6355001       24527  2018-04-04        181         7    64.5718       0.0   
1  6355021       25493  2018-02-28        114         7    79.5638       0.0   
2  6355022       25669  2018-03-19         20         7    85.9011       0.0   
3  6355027       24827  2018-01-09         96         7    62.1993       0.0   
4  6355043       24474  2018-01-08        389         7    62.4447       0.0   

   TotalPrice  
0      452.00  
1      556.95  
2      601.31  
3      435.40  
4      437.11  


In [61]:
# Check field length
#print(df_sales_ok.applymap(lambda x: len(str(x)) if pd.notnull(x) else 0).max())