In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import os
import sys
from pyspark.sql.functions import *
from pyspark.sql.types import *
from calendar import month_name
import re

In [2]:
# Settings applied to the session builder, in this order.
spark_settings = [
    ('spark.sql.execution.arrow.pyspark.enabled', True),
    ('spark.sql.session.timeZone', 'UTC'),
    ('spark.driver.memory', '10G'),
    ('spark.ui.showConsoleProgress', True),
    ('spark.sql.repl.eagerEval.enabled', True),
]

# Local-mode session that uses every available core (local[*]).
session_builder = SparkSession.builder.appName('app_name').master('local[*]')
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()

In [3]:
# Folder that is scanned for input files.
folder_path = "DE_CaseStudy_Dataset/Sales_Data/"

# Month names taken from calendar.month_name; its first entry (index 0) is skipped.
valid_month_names = [name for name in month_name[1:]]
# Expected shape: Sales_<word>_<four digits starting with 1 or 2>.csv
file_name_pattern = re.compile(r"Sales_(\w+)_([1-2]\d{3})\.csv")

In [4]:
def is_valid_file_name(file_name):
    """Return True when ``file_name`` is exactly ``Sales_<Month>_<Year>.csv``.

    The month part must be one of ``valid_month_names`` and the year must lie
    in the inclusive range 2000-2100. Any other name yields False.
    """
    # fullmatch instead of match: match only anchors at the start, so names
    # with trailing text such as "Sales_April_2019.csv.bak" used to pass.
    match = file_name_pattern.fullmatch(file_name)
    if match is None:
        return False
    month, year = match.groups()
    return month in valid_month_names and 2000 <= int(year) <= 2100

In [5]:
# Collect the names of the regular files found directly inside folder_path
file_names = []
for entry_name in os.listdir(folder_path):
    entry_path = os.path.join(folder_path, entry_name)
    if os.path.isfile(entry_path):
        file_names.append(entry_name)

print(file_names)

['books_data.csv', 'Books_rating.csv', 'Liquor_Sales.csv', 'Sales_April_2019.csv', 'Sales_August_2019.csv', 'Sales_December_2019.csv', 'Sales_February_2019.csv', 'Sales_January_2019.csv', 'Sales_July_2019.csv', 'Sales_June_2019.csv', 'Sales_March_2019.csv', 'Sales_May_2019.csv', 'Sales_November_2019.csv', 'Sales_October_2019.csv', 'Sales_September_2019.csv']


In [6]:
# Keep only the names accepted by is_valid_file_name
valid_file_names = []
for candidate_name in file_names:
    if is_valid_file_name(candidate_name):
        valid_file_names.append(candidate_name)

print(valid_file_names)

['Sales_April_2019.csv', 'Sales_August_2019.csv', 'Sales_December_2019.csv', 'Sales_February_2019.csv', 'Sales_January_2019.csv', 'Sales_July_2019.csv', 'Sales_June_2019.csv', 'Sales_March_2019.csv', 'Sales_May_2019.csv', 'Sales_November_2019.csv', 'Sales_October_2019.csv', 'Sales_September_2019.csv']


In [7]:
# CSV reader settings: the first row is the header, column types are inferred,
# and the double quote is used as both the quote and the escape character.
csv_read_options = {
    "header": "true",
    "inferSchema": "true",
    "quote": "\"",
    "escape": "\"",
}

# folder_path already ends with a separator, so plain concatenation yields the file paths.
sales_file_paths = [folder_path + valid_name for valid_name in valid_file_names]

sales_data_df = spark.read.format("csv").options(**csv_read_options).load(sales_file_paths)

# sales_data_df.count()

In [8]:
# sales_data_df.show()

In [9]:
# sales_data_df.describe().show()

In [10]:
# Parse the "Order Date" column into a timestamp using the MM/dd/yy HH:mm layout
parsed_order_date = to_timestamp(col("Order Date"), "MM/dd/yy HH:mm")
sales_data_df = sales_data_df.withColumn("Order Date", parsed_order_date)

# Single-column projection holding only the parsed dates
result = sales_data_df.select(col("Order Date"))

sales_data_df.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|         Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+-------------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29 07:03:00|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|2019-12-12 18:21:00|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|2019-12-22 15:13:00|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|2019-12-31 22:58:00|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|2019-12-16 15:10:00|928 12th St,

In [11]:
# (source column, snake_case output name) pairs, listed in output order
column_renames = [
    ("Order ID", "order_id"),
    ("Product", "product"),
    ("Quantity Ordered", "quantity_ordered"),
    ("Price Each", "price_each"),
    ("Order Date", "order_date"),
    ("Purchase Address", "purchase_address"),
]

# trim() every column and expose it under its new name
trimmed_columns = [
    trim(col(source_name)).alias(target_name)
    for source_name, target_name in column_renames
]
sales_mapped_df = sales_data_df.select(*trimmed_columns)

sales_mapped_df.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|
+--------+--------------------+----------------+----------+-------------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29 07:03:00|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|2019-12-12 18:21:00|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|2019-12-22 15:13:00|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|2019-12-18 12:38:00|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|2019-12-31 22:58:00|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|2019-12-16 15:10:00|928 12th St,

In [12]:
# Discard every row in which at least one column is null
sales_df_cleaned = sales_mapped_df.dropna(how="any")

In [13]:
# One boolean "<column> IS NULL" expression per column, aliased with the column's own name
columns_with_nulls = [
    col(column_name).isNull().alias(column_name)
    for column_name in sales_df_cleaned.columns
]

print(columns_with_nulls)

[Column<'(order_id IS NULL) AS order_id'>, Column<'(product IS NULL) AS product'>, Column<'(quantity_ordered IS NULL) AS quantity_ordered'>, Column<'(price_each IS NULL) AS price_each'>, Column<'(order_date IS NULL) AS order_date'>, Column<'(purchase_address IS NULL) AS purchase_address'>]


In [14]:
# OR all per-column IS NULL checks into a single flag, starting from
# lit(0) cast to boolean (i.e. false).
any_null_flag = lit(0).cast("boolean")
for null_check in columns_with_nulls:
    any_null_flag = any_null_flag.cast("boolean") | null_check

df_with_nulls = sales_df_cleaned.withColumn("contains_null", any_null_flag)

has_null = col("contains_null")

# Rows in which at least one column is null
rows_with_nulls = df_with_nulls.where(has_null == True)
rows_with_nulls.show()

# Keep the fully populated rows and discard the helper flag
sales_df_cleaned = df_with_nulls.where(has_null == False).drop("contains_null")
sales_df_cleaned.show()

+--------+-------+----------------+----------+----------+----------------+-------------+
|order_id|product|quantity_ordered|price_each|order_date|purchase_address|contains_null|
+--------+-------+----------------+----------+----------+----------------+-------------+
+--------+-------+----------------+----------+----------+----------------+-------------+

+--------+--------------------+----------------+----------+-------------------+--------------------+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|
+--------+--------------------+----------------+----------+-------------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29 07:03:00|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|2019-12-12 18:21:00|277 Main St, New ...|
|  295668|    27in FHD Monitor|      

In [15]:
# Break the address into its ", "-separated parts
# (e.g. "136 Church St" / "New York City" / "NY 10001" in the sample rows)
address_parts = split(col("purchase_address"), ", ")
sales_df_cleaned = sales_df_cleaned.withColumn("address_components", address_parts)

# Positional parts: 0 = street, 1 = city, 2 = state and zip
sales_df_cleaned = (
    sales_df_cleaned
    .withColumn("street", col("address_components")[0])
    .withColumn("city", col("address_components")[1])
    .withColumn("state_zip", col("address_components")[2])
)

# "<city>, <state zip>" label
city_state_label = concat_ws(", ", col("city"), col("state_zip"))
sales_df_cleaned = sales_df_cleaned.withColumn("city_state", city_state_label)

# First space-separated token of state_zip is the state abbreviation
state_abbreviation = split(col("state_zip"), " ")[0]
sales_df_cleaned = sales_df_cleaned.withColumn("state", state_abbreviation)

In [16]:
# Preview the frame after the address-derived columns were added
sales_df_cleaned.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|  address_components|          street|         city|state_zip|          city_state|state|
+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|[136 Church St, N...|   136 Church St|New York City| NY 10001|New York City, NY...|   NY|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29 07:03:00|562 2nd St, New Y...|[562 2nd St, New ...|      562 2nd St|New York City| NY 10001|New York City, NY...|   NY|
|  295667|USB-C Charging Cable|               1|  

In [17]:
# Two-letter state abbreviation -> full state name (50 entries)
state_abbr_to_full_name = {
    "AL": "Alabama",
    "AK": "Alaska",
    "AZ": "Arizona",
    "AR": "Arkansas",
    "CA": "California",
    "CO": "Colorado",
    "CT": "Connecticut",
    "DE": "Delaware",
    "FL": "Florida",
    "GA": "Georgia",
    "HI": "Hawaii",
    "ID": "Idaho",
    "IL": "Illinois",
    "IN": "Indiana",
    "IA": "Iowa",
    "KS": "Kansas",
    "KY": "Kentucky",
    "LA": "Louisiana",
    "ME": "Maine",
    "MD": "Maryland",
    "MA": "Massachusetts",
    "MI": "Michigan",
    "MN": "Minnesota",
    "MS": "Mississippi",
    "MO": "Missouri",
    "MT": "Montana",
    "NE": "Nebraska",
    "NV": "Nevada",
    "NH": "New Hampshire",
    "NJ": "New Jersey",
    "NM": "New Mexico",
    "NY": "New York",
    "NC": "North Carolina",
    "ND": "North Dakota",
    "OH": "Ohio",
    "OK": "Oklahoma",
    "OR": "Oregon",
    "PA": "Pennsylvania",
    "RI": "Rhode Island",
    "SC": "South Carolina",
    "SD": "South Dakota",
    "TN": "Tennessee",
    "TX": "Texas",
    "UT": "Utah",
    "VT": "Vermont",
    "VA": "Virginia",
    "WA": "Washington",
    "WV": "West Virginia",
    "WI": "Wisconsin",
    "WY": "Wyoming",
}


In [18]:
from pyspark.sql.functions import create_map, col, lit
from itertools import chain

# Flatten the dictionary into alternating key/value literals: [k1, v1, k2, v2, ...]
state_map_entries = []
for state_abbr, state_full_name in state_abbr_to_full_name.items():
    state_map_entries.append(lit(state_abbr))
    state_map_entries.append(lit(state_full_name))
state_map = create_map(state_map_entries)

# Look up the full state name by abbreviation
full_name_lookup = state_map[col("state")]
sales_df_cleaned = sales_df_cleaned.withColumn("state_full_name", full_name_lookup)

sales_df_cleaned.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+---------------+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|  address_components|          street|         city|state_zip|          city_state|state|state_full_name|
+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+---------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|[136 Church St, N...|   136 Church St|New York City| NY 10001|New York City, NY...|   NY|       New York|
|  295666|  LG Washing Machine|               1|     600.0|2019-12-29 07:03:00|562 2nd St, New Y...|[562 2nd St, New ...|      562 2nd St|New York City| NY 10001|New York City,

In [19]:
# (new column, extractor function) pairs, applied to order_date in this order
order_date_parts = [
    ("order_year", year),
    ("order_month", month),
    ("order_day", dayofmonth),
    ("order_hour", hour),
    ("order_minute", minute),
    ("order_second", second),
]

for part_column, extract_part in order_date_parts:
    sales_df_cleaned = sales_df_cleaned.withColumn(part_column, extract_part("order_date"))

sales_df_cleaned.show()

+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+---------------+----------+-----------+---------+----------+------------+------------+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|  address_components|          street|         city|state_zip|          city_state|state|state_full_name|order_year|order_month|order_day|order_hour|order_minute|order_second|
+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------------+-------------+---------+--------------------+-----+---------------+----------+-----------+---------+----------+------------+------------+
|  295665|  Macbook Pro Laptop|               1|    1700.0|2019-12-30 00:01:00|136 Church St, Ne...|[136 Church St, N...|   136 Church St|New York City| NY 10001|New York C

In [20]:
# Inspect column types; the trimmed source columns show up as strings here
sales_df_cleaned.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: string (nullable = true)
 |-- price_each: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- address_components: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_zip: string (nullable = true)
 |-- city_state: string (nullable = false)
 |-- state: string (nullable = true)
 |-- state_full_name: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_day: integer (nullable = true)
 |-- order_hour: integer (nullable = true)
 |-- order_minute: integer (nullable = true)
 |-- order_second: integer (nullable = true)



In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler

# Tokenize the product column into the product_words column
tokenizer = Tokenizer().setInputCol("product").setOutputCol("product_words")
tokenizedTrain = tokenizer.transform(sales_df_cleaned)

In [22]:
# Remove stop words from the tokenizer's output column
swr = (
    StopWordsRemover()
    .setInputCol(tokenizer.getOutputCol())
    .setOutputCol("product_words_insightful_words")
)
sales_df_cleaned = swr.transform(tokenizedTrain)

In [23]:
# Show the first 5 rows without truncating cell contents
sales_df_cleaned.show(truncate= False, n=5)

+--------+--------------------+----------------+----------+-------------------+--------------------------------------+----------------------------------------+-------------+-------------+---------+-----------------------+-----+---------------+----------+-----------+---------+----------+------------+------------+------------------------+------------------------------+
|order_id|product             |quantity_ordered|price_each|order_date         |purchase_address                      |address_components                      |street       |city         |state_zip|city_state             |state|state_full_name|order_year|order_month|order_day|order_hour|order_minute|order_second|product_words           |product_words_insightful_words|
+--------+--------------------+----------------+----------+-------------------+--------------------------------------+----------------------------------------+-------------+-------------+---------+-----------------------+-----+---------------+----------+------

In [24]:
from transformers import BertModel, BertTokenizer
import torch

In [25]:
# Checkpoint shared by the tokenizer and the model
model_name = "bert-base-uncased"

# Note: `tokenizer` is rebound here; it previously held the Spark ML Tokenizer.
tokenizer, model = (
    BertTokenizer.from_pretrained(model_name),
    BertModel.from_pretrained(model_name),
)

In [26]:
def get_bert_embeddings(product_name):
    """Embed ``product_name`` as the mean of the model's last hidden states.

    Reads the module-level ``tokenizer`` and ``model``. The hidden states are
    averaged over dimension 1 and returned as a plain Python list.
    """
    token_ids = tokenizer.encode(product_name, add_special_tokens=True)
    # Add a leading batch axis before calling the model.
    batch = torch.tensor(token_ids).unsqueeze(0)
    with torch.no_grad():  # inference only, no gradient tracking
        hidden_states = model(batch).last_hidden_state
        pooled = hidden_states.mean(dim=1)
    return pooled.squeeze().tolist()


In [27]:
# udf() defaults to StringType when no return type is given; spelled out here
# because it means the list returned by get_bert_embeddings is stored as text,
# not as an array column.
get_bert_embeddings_udf = udf(get_bert_embeddings, StringType())

In [28]:
df_with_embeddings = sales_df_cleaned.withColumn("bert_embeddings", get_bert_embeddings_udf("product"))

In [29]:
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType


In [30]:
def embeddings_to_vector(embeddings):
    """Convert an embedding into a dense ML vector.

    ``embeddings`` may be a sequence of numbers or its bracketed text form
    ``"[v1, v2, ...]"``. The text form is what the ``bert_embeddings`` column
    carries, because the udf that produces it is registered without a return
    type and therefore yields strings; ``Vectors.dense`` cannot convert that
    text to floats on its own.
    """
    if isinstance(embeddings, str):
        # Drop the surrounding brackets, then parse each comma-separated number.
        inner_text = embeddings.strip().strip("[]")
        embeddings = [float(piece) for piece in inner_text.split(",") if piece.strip()]
    return Vectors.dense(embeddings)

# Register the converter with a vector return type
embeddings_to_vector_udf = udf(f=embeddings_to_vector, returnType=VectorUDT())

# Add the vector form of the embeddings next to the existing bert_embeddings column
df_with_embeddings = df_with_embeddings.withColumn(
    "bert_embeddings_vector",
    embeddings_to_vector_udf(col("bert_embeddings")),
)

In [31]:
# df_with_embeddings.show()

In [32]:
# Inspect the schema after the embedding columns were added
df_with_embeddings.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: string (nullable = true)
 |-- price_each: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- address_components: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_zip: string (nullable = true)
 |-- city_state: string (nullable = false)
 |-- state: string (nullable = true)
 |-- state_full_name: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_day: integer (nullable = true)
 |-- order_hour: integer (nullable = true)
 |-- order_minute: integer (nullable = true)
 |-- order_second: integer (nullable = true)
 |-- product_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- product_words_insightful_words: array

In [33]:
from pyspark.sql.functions import split, col

# bert_embeddings is a string column (its udf has no declared return type).
# NOTE(review): the text is expected to look like "[v1, v2, ...]" - confirm by
# inspecting the column. Brackets and whitespace are stripped before splitting;
# otherwise the first and last tokens ("[v1" / "vn]") cannot be cast to float
# and become null.
embeddings_text = regexp_replace(col("bert_embeddings"), r"[\[\]\s]", "")
df_with_embeddings = df_with_embeddings.withColumn(
    "bert_embeddings_array",
    split(embeddings_text, ",").cast("array<float>"),
)

limited_df = df_with_embeddings

In [34]:
# limited_df is the same frame as df_with_embeddings, so this prints the same schema plus bert_embeddings_array
limited_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: string (nullable = true)
 |-- price_each: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- address_components: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state_zip: string (nullable = true)
 |-- city_state: string (nullable = false)
 |-- state: string (nullable = true)
 |-- state_full_name: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_day: integer (nullable = true)
 |-- order_hour: integer (nullable = true)
 |-- order_minute: integer (nullable = true)
 |-- order_second: integer (nullable = true)
 |-- product_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- product_words_insightful_words: array

In [35]:
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans

# Reduce the embedding vectors to 2 principal components
pca = (
    PCA()
    .setK(2)
    .setInputCol("bert_embeddings_vector")
    .setOutputCol("pca_features")
)
# Note: `model` is rebound here; it previously held the BERT model.
model = pca.fit(limited_df)
limited_df = model.transform(limited_df)

In [36]:
# Clustering: group the PCA features into 10 clusters; the cluster id of each
# row is written to the "category" column.
# A fixed seed is passed so the cluster ids are reproducible from run to run.
kmeans = KMeans(k=10, featuresCol="pca_features", predictionCol="category", seed=42)
model = kmeans.fit(limited_df)
limited_df = model.transform(limited_df)

# limited_df.select("order_id", "product", "category").show()

In [37]:
# NOTE(review): `limited_df_cat` and its "category_label" column are not created
# in any cell visible in this notebook (only `limited_df` with "category" is).
# Unless they are defined elsewhere, this cell fails on Restart & Run All -
# TODO restore the cell that derives limited_df_cat from limited_df.
limited_df_cat.select("order_id", "product", "category", "category_label").show()

In [38]:
# limited_df_cat.printSchema()

In [39]:
# Columns removed from both frames before they are written out
intermediate_columns = [
    "bert_embeddings",
    "bert_embeddings_vector",
    "bert_embeddings_array",
    "pca_features",
    "product_words",
    "product_words_insightful_words",
    "address_components",
]

to_load_limited_df_cat = limited_df_cat.drop(*intermediate_columns)

sales_df_cleaned = sales_df_cleaned.drop(*intermediate_columns)

In [40]:
# Target type for each column that is still a string after the earlier trim()
column_casts = {
    "quantity_ordered": IntegerType(),
    "price_each": DoubleType(),
    "order_date": TimestampType(),
}

sales_df_cleaned_type = sales_df_cleaned
for cast_column, cast_type in column_casts.items():
    sales_df_cleaned_type = sales_df_cleaned_type.withColumn(
        cast_column, col(cast_column).cast(cast_type)
    )

In [41]:
jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
table_name = "ecomm.sales"
table_name2 = "ecomm.sales_category"
# Credentials come from the environment (PGUSER / PGPASSWORD) so real secrets
# do not have to be committed with the notebook; the previous literal values
# remain the fallback when the variables are unset.
properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "postgres"),
    "driver": "org.postgresql.Driver"
}

# Write to PostgreSQL. mode="overwrite" replaces any existing contents of the target tables.
sales_df_cleaned_type.write.jdbc(url=jdbc_url, table=table_name, mode="overwrite", properties=properties)
to_load_limited_df_cat.write.jdbc(url=jdbc_url, table=table_name2, mode="overwrite", properties=properties)