**Initial imports & configuration**

In [11]:
pip install pyarrow

Collecting pyarrowNote: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\Users\Maciek\AppData\Local\Programs\Python\Python310\python.exe -m pip install --upgrade pip' command.



  Downloading pyarrow-13.0.0-cp310-cp310-win_amd64.whl (24.3 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-13.0.0


In [12]:
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [13]:
# Reuse the active Spark session if one exists, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName("PysparkCustomerSegmentation")
    .getOrCreate()
)

**JSON configuration**

In [14]:
# The config file is read with the "multiline" option enabled, so one JSON
# document may span several lines (rather than one record per line).
config = (
    spark.read
    .option("multiline", "true")
    .json("C:/Users/Maciek/Desktop/pyspark/config_file.json")
)
config.printSchema()

root
 |-- sources: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- path: string (nullable = true)
 |    |    |-- schema: struct (nullable = true)
 |    |    |    |-- fields: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- table_name: string (nullable = true)



In [15]:
# One row per element of the `sources` array, flattened into its struct fields
# (path, schema, table_name).
source_config_df = (
    config
    .select(explode(col("sources")).alias("sources"))
    .select("sources.*")
)
source_config_df.show()

# Bring the (small) list of source definitions to the driver as Row objects.
source_config = source_config_df.collect()

+--------------------+--------------------+-----------+
|                path|              schema| table_name|
+--------------------+--------------------+-----------+
|C:/Users/Maciek/D...|{[{CustomerID, in...|DimCustomer|
|C:/Users/Maciek/D...|{[{OrderID, integ...|SalesOnline|
|C:/Users/Maciek/D...|{[{OrderID, integ...|     vSales|
+--------------------+--------------------+-----------+



**Ingest data from CSV files using the JSON configuration**

In [16]:
# Config type name -> Spark SQL type class. Any type name not listed here
# (including a missing one) falls back to StringType.
_CONFIG_TYPES = {
    "integer": IntegerType,
    "string": StringType,
    "date": DateType,
    "timestamp": TimestampType,
    "float": FloatType,
    "boolean": BooleanType,
}


def load_data(sources=None):
    """Read every CSV source described in the JSON config into a global DataFrame.

    For each source, a schema of nullable fields is built from its
    ``schema.fields`` entries (``name`` / ``type``). The file at ``path`` is
    then read as a semicolon-delimited CSV with a header row. Each result is
    bound in the notebook's global namespace as ``df_<table_name>``
    (e.g. ``df_DimCustomer``).

    Args:
        sources: Iterable of config rows, each with ``path``, ``schema`` and
            ``table_name``. Defaults to the global ``source_config``.

    Returns:
        None. DataFrames are published via ``globals()``.
    """
    if sources is None:
        sources = source_config

    for source in sources:
        schema = StructType([
            StructField(
                field['name'],
                _CONFIG_TYPES.get(field['type'], StringType)(),
                True,
            )
            for field in source['schema']['fields']
        ])

        # .csv() already selects the CSV format, so no separate .format("csv") is needed.
        globals()[f"df_{source['table_name']}"] = (
            spark.read
            .option("header", "true")
            .option("delimiter", ";")
            .schema(schema)
            .csv(source['path'])
        )

In [17]:
load_data()

# Preview each DataFrame that load_data() published as a global.
for loaded_df in (df_DimCustomer, df_SalesOnline, df_vSales):
    loaded_df.show()

+----------+----------+----------+----------+------+----+-------------+--------------------+---------------------+---------+-------+
|CustomerID| FirstName|  LastName| BirthDate|Gender|Kids|MartialStatus|     DeliveryAddress|CorrespondenceAddress|StartTime|EndTime|
+----------+----------+----------+----------+------+----+-------------+--------------------+---------------------+---------+-------+
|       151|      Anna|Drzewiecka|1993-03-15|     F|   0|       single|ul. Bohra-Komorow...| ul. Wyścigowa 10,...|     NULL|   NULL|
|       152|       Jan|  Kowalski|1988-07-02|     M|   2|      married|ul. Mickiewicza 1...| ul. Mickiewicza 1...|     NULL|   NULL|
|       153|    Michał| Jankowski|1998-11-10|     M|   0|       single|ul. Piękna 9, 00-...| ul. Piękna 9, 00-...|     NULL|   NULL|
|       154|    Alicja|       Lis|2002-04-20|     F|   1|       single|ul. Górna 15/2, 7...| ul. Górna 15/2, 7...|     NULL|   NULL|
|       155|     Marek| Zieliński|1985-06-30|     M|   0|     divorce

**Data transformation & aggregations for customer segmentation**

In [18]:
# Remove OrderDate from vSales so the joined frame has a single, unambiguous OrderDate.
df_vSales = df_vSales.drop(col("OrderDate"))

# Per-customer purchase metrics (round/sum/min/max/count here are the pyspark
# functions brought in by the star import, not the Python builtins).
df = (
    df_SalesOnline
    .join(df_vSales, on="OrderID", how="left")
    # NOTE(review): `!=` evaluates to NULL when Customer is NULL, so such rows are
    # dropped too (e.g. unmatched left-join rows, if Customer comes from vSales).
    # Confirm this is intended.
    .filter(col("Customer") != 'purchase without registration')
    .groupby("CustomerID")
    .agg(
        round(sum(col("SalesAmount")), 2).alias("TotalSpends"),
        sum(col("PositionCount")).alias("PurchasedItems"),
        count(col("OrderID")).alias("OrdersCount"),
        min(col("OrderDate")).alias("FirstOrder"),
        max(col("OrderDate")).alias("LastOrder"),
    )
    # Per-order averages, rounded to two decimals.
    .withColumn("AverageBasketSize", round(col("PurchasedItems") / col("OrdersCount"), 2))
    .withColumn("AverageBasketValue", round(col("TotalSpends") / col("OrdersCount"), 2))
)
df.show()

+----------+-----------+--------------+-----------+--------------------+--------------------+-----------------+------------------+
|CustomerID|TotalSpends|PurchasedItems|OrdersCount|          FirstOrder|           LastOrder|AverageBasketSize|AverageBasketValue|
+----------+-----------+--------------+-----------+--------------------+--------------------+-----------------+------------------+
|       155|     617.93|             6|          5| 2023-06-02 00:00:00| 2023-09-15 00:00:00|              1.2|            123.59|
|       183|     837.33|             5|          4| 2023-06-12 00:00:00| 2023-10-06 00:00:00|             1.25|            209.33|
|       159|     199.89|             9|          5| 2023-06-03 00:00:00| 2023-09-18 00:00:00|              1.8|             39.98|
|       157|      865.1|             9|          5| 2023-06-02 00:00:00| 2023-09-16 00:00:00|              1.8|            173.02|
|       190|     390.97|             2|          1|2023-10-14 19:04:...|2023-10-14 

In [19]:
df_DimCustomer = df_DimCustomer.drop("DeliveryAddress")

# Predicates shared by the segment rules below.
is_male = col("Gender") == "M"
is_female = col("Gender") == "F"
is_married = col("MartialStatus") == "married"
has_kids = col("Kids") > 0
no_kids = col("Kids") == 0

df_DimCustomer = (
    df_DimCustomer
    # City = last space-separated token of the correspondence address.
    # NOTE(review): multi-word city names (e.g. "Zielona Góra") keep only their last word.
    .withColumn("City", substring_index("CorrespondenceAddress", " ", -1))
    # Age in completed years. Flooring (rather than rounding) keeps a 25.6-year-old at 25,
    # so the customer lands in the correct age bracket.
    .withColumn("Age", floor(months_between(current_date(), col("BirthDate")) / 12))
    # Brackets: <26 -> 18, 26-34 -> 26, 35-49 -> 35, 50+ -> 50 (`>= 50` so age 50 is covered).
    # Gender values other than M/F (or a NULL age) leave the segment NULL.
    .withColumn(
        "AgeSexSegment",
        when(is_male & (col("Age") < 26), "M18")
        .when(is_male & (col("Age") < 35), "M26")
        .when(is_male & (col("Age") < 50), "M35")
        .when(is_male & (col("Age") >= 50), "M50")
        .when(is_female & (col("Age") < 26), "F18")
        .when(is_female & (col("Age") < 35), "F26")
        .when(is_female & (col("Age") < 50), "F35")
        .when(is_female & (col("Age") >= 50), "F50"),
    )
    .withColumn(
        "FamilySegment",
        when(is_male & is_married & has_kids, "head of the family")
        .when(is_male & is_married & no_kids, "husband")
        .when(is_male & ~is_married & has_kids, "father")
        .when(is_male & ~is_married, "single")
        .when(is_female & is_married & has_kids, "mother&wife")
        .when(is_female & is_married & no_kids, "wife")
        .when(is_female & ~is_married & has_kids, "single mother")
        .when(is_female & ~is_married, "single"),
    )
    .withColumn(
        "DemographicSegment",
        when(col("City").isin(["Warszawa", "Kraków", "Poznań", "Wrocław", "Łódź", "Gdańsk"]), "big city")
        .when(col("City").isin(["Katowice", "Szczecin", "Bydgoszcz", "Częstochowa", "Lublin", "Białystok"]), "city")
        .otherwise("small town/village"),
    )
)

# NOTE: this rebinds `df`. Re-running this cell without first re-running the
# aggregation cell would join the customer columns onto an already-joined frame.
df = df_DimCustomer.join(df, on="CustomerID", how="inner")

df.show()

+----------+----------+------------+----------+------+----+-------------+---------------------+---------+-------+---------+----+-------------+------------------+------------------+-----------+--------------+-----------+--------------------+--------------------+-----------------+------------------+
|CustomerID| FirstName|    LastName| BirthDate|Gender|Kids|MartialStatus|CorrespondenceAddress|StartTime|EndTime|     City| Age|AgeSexSegment|     FamilySegment|DemographicSegment|TotalSpends|PurchasedItems|OrdersCount|          FirstOrder|           LastOrder|AverageBasketSize|AverageBasketValue|
+----------+----------+------------+----------+------+----+-------------+---------------------+---------+-------+---------+----+-------------+------------------+------------------+-----------+--------------+-----------+--------------------+--------------------+-----------------+------------------+
|       155|     Marek|   Zieliński|1985-06-30|     M|   0|     divorced| ul. Przybyszewski...|     NUL

**Save results to a Parquet file**

In [21]:
RESULT_PATH = Path("C:/Users/Maciek/Desktop/pyspark/result/customer_segmentation.parquet")

# Create the output folder up front so the write does not depend on it already existing.
RESULT_PATH.parent.mkdir(parents=True, exist_ok=True)

# toPandas() collects every row to the driver. This assumes the segmented result is small.
pandas_df = df.toPandas()
table = pa.Table.from_pandas(pandas_df)
pq.write_table(table, str(RESULT_PATH))

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
