In [1]:
import polars as pl
import findspark

from pyspark.sql import SparkSession, functions as F

In [2]:
findspark.init("C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")

In [3]:
spark = (
    SparkSession.builder
        .master("yarn")
        .appName("Transformation Part")
        .config("spark.sql.adaptive.enabled", True)
        .enableHiveSupport()
        .getOrCreate()
)

In [4]:
from spark_utils import *

In [5]:
hive = read_hive(spark, db_name="nifi", table_name="armut")

2023-03-01 22:57:27,351 - spark_utils - INFO - read_hive function is started.
2023-03-01 22:57:31,733 - spark_utils - INFO - read_hive function was finished.


In [6]:
hive.show()

+------+---------+----------+----------+----------+
|userid|serviceid|categoryid|createdate|createtime|
+------+---------+----------+----------+----------+
|  5200|       18|         4|2017-08-06|  19:40:00|
|  5537|       18|         4|2017-08-07|  05:38:00|
|  5676|        2|         0|2017-08-07|  08:21:00|
|  5853|       48|         5|2017-08-07|  13:11:00|
|  6289|       46|         4|2017-08-07|  12:48:00|
|  5304|       18|         4|2017-08-07|  19:37:00|
|  5390|       19|         6|2017-08-08|  05:17:00|
|  5358|       48|         5|2017-08-08|  10:12:00|
|  5358|       32|         4|2017-08-08|  10:26:00|
|  5358|        6|         7|2017-08-08|  10:30:00|
|  5588|        0|         8|2017-08-08|  16:37:00|
|  5358|       48|         5|2017-08-09|  01:45:00|
|  6379|       49|         1|2017-08-09|  03:04:00|
|  5358|       32|         4|2017-08-09|  10:58:00|
|  5980|        2|         0|2017-08-09|  12:18:00|
|  5855|       31|         6|2017-08-09|  14:22:00|
|  5368|    

In [7]:
hive.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- serviceid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- createdate: string (nullable = true)
 |-- createtime: string (nullable = true)



# Merge Time Values as Timestamp

In [8]:
hive = hive.withColumn("timestamp", F.concat(F.col("createdate"), F.lit(" ") , F.col("createtime")))

In [9]:
hive.show(5)

+------+---------+----------+----------+----------+-------------------+
|userid|serviceid|categoryid|createdate|createtime|          timestamp|
+------+---------+----------+----------+----------+-------------------+
|  5200|       18|         4|2017-08-06|  19:40:00|2017-08-06 19:40:00|
|  5537|       18|         4|2017-08-07|  05:38:00|2017-08-07 05:38:00|
|  5676|        2|         0|2017-08-07|  08:21:00|2017-08-07 08:21:00|
|  5853|       48|         5|2017-08-07|  13:11:00|2017-08-07 13:11:00|
|  6289|       46|         4|2017-08-07|  12:48:00|2017-08-07 12:48:00|
+------+---------+----------+----------+----------+-------------------+
only showing top 5 rows



In [10]:
hive = hive.withColumn("timestamp", F.to_timestamp("timestamp"))

In [11]:
hive = hive.drop("createdate", "createtime")

In [12]:
hive.show(5)

+------+---------+----------+-------------------+
|userid|serviceid|categoryid|          timestamp|
+------+---------+----------+-------------------+
|  5200|       18|         4|2017-08-06 19:40:00|
|  5537|       18|         4|2017-08-07 05:38:00|
|  5676|        2|         0|2017-08-07 08:21:00|
|  5853|       48|         5|2017-08-07 13:11:00|
|  6289|       46|         4|2017-08-07 12:48:00|
+------+---------+----------+-------------------+
only showing top 5 rows



In [13]:
hive.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- serviceid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



# Understanding Dataframe Columns

| Columns | Description |
| --- | --- |
| userid | Customer unique id. |
| serviceid | Category service's name id. |
| categoryid | Category's name id. |
| timestamp | Time |


For example: If category is education, service can be math course.

# Data Preparation
Lets concat serviceid to categoryid.

In [14]:
hive = hive.withColumn("job", F.concat(F.col("serviceid"), F.lit("_"), F.col("categoryid")))

In [15]:
hive.show(5)

+------+---------+----------+-------------------+----+
|userid|serviceid|categoryid|          timestamp| job|
+------+---------+----------+-------------------+----+
|  5200|       18|         4|2017-08-06 19:40:00|18_4|
|  5537|       18|         4|2017-08-07 05:38:00|18_4|
|  5676|        2|         0|2017-08-07 08:21:00| 2_0|
|  5853|       48|         5|2017-08-07 13:11:00|48_5|
|  6289|       46|         4|2017-08-07 12:48:00|46_4|
+------+---------+----------+-------------------+----+
only showing top 5 rows



For apply association rule learning, data has to has shopping card. Can to define shopping cart at monthly. Firstly lets define year-month column.

In [16]:
hive = hive.withColumn("year_month", 
                       F.concat(F.year("timestamp"), F.lit("-"), F.month("timestamp")))

In [17]:
hive.show(5)

+------+---------+----------+-------------------+----+----------+
|userid|serviceid|categoryid|          timestamp| job|year_month|
+------+---------+----------+-------------------+----+----------+
|  5200|       18|         4|2017-08-06 19:40:00|18_4|    2017-8|
|  5537|       18|         4|2017-08-07 05:38:00|18_4|    2017-8|
|  5676|        2|         0|2017-08-07 08:21:00| 2_0|    2017-8|
|  5853|       48|         5|2017-08-07 13:11:00|48_5|    2017-8|
|  6289|       46|         4|2017-08-07 12:48:00|46_4|    2017-8|
+------+---------+----------+-------------------+----+----------+
only showing top 5 rows



For make group on shopping cart, Lets concat userid to year_month values.

In [18]:
hive = hive.withColumn("cardid",
               F.concat(F.col("userid"), F.lit("_"), F.col("year_month")))

In [19]:
hive.show(5)

+------+---------+----------+-------------------+----+----------+-----------+
|userid|serviceid|categoryid|          timestamp| job|year_month|     cardid|
+------+---------+----------+-------------------+----+----------+-----------+
|  5200|       18|         4|2017-08-06 19:40:00|18_4|    2017-8|5200_2017-8|
|  5537|       18|         4|2017-08-07 05:38:00|18_4|    2017-8|5537_2017-8|
|  5676|        2|         0|2017-08-07 08:21:00| 2_0|    2017-8|5676_2017-8|
|  5853|       48|         5|2017-08-07 13:11:00|48_5|    2017-8|5853_2017-8|
|  6289|       46|         4|2017-08-07 12:48:00|46_4|    2017-8|6289_2017-8|
+------+---------+----------+-------------------+----+----------+-----------+
only showing top 5 rows



# Associate Rule Learning
## Creating Matrix

In [20]:
pivot = hive \
    .groupby("cardid") \
    .pivot("job") \
    .count() \
    .na.fill(0)

In [21]:
pivot.limit(5).toPandas()

Unnamed: 0,cardid,0_8,10_9,11_11,12_7,13_11,14_7,15_1,16_8,17_5,18_4,19_6,1_4,20_5,21_5,22_0,23_10,24_10,25_0,26_7,27_7,28_4,29_0,2_0,30_2,31_6,32_4,33_4,34_6,35_11,36_1,37_0,38_4,39_10,3_5,40_8,41_3,42_1,43_2,44_0,45_6,46_4,47_7,48_5,49_1,4_5,5_11,6_7,7_3,8_5,9_4
0,12037_2018-1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0
1,7376_2017-11,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,9626_2017-12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,8674_2018-4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,12108_2017-12,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [22]:
set_score = lambda x: 1 if x > 0 else 0

In [23]:
from pyspark.sql.types import IntegerType

In [24]:
set_score_udf = F.udf(lambda x: set_score(x), IntegerType())

In [25]:
pivot_columns = pivot.columns[1:]

In [26]:
for column in pivot_columns:
    pivot = pivot.withColumn(column, set_score_udf(F.col(column)).alias(column))

In [27]:
pivot.orderBy(F.rand()).limit(20).toPandas()

Unnamed: 0,cardid,0_8,10_9,11_11,12_7,13_11,14_7,15_1,16_8,17_5,18_4,19_6,1_4,20_5,21_5,22_0,23_10,24_10,25_0,26_7,27_7,28_4,29_0,2_0,30_2,31_6,32_4,33_4,34_6,35_11,36_1,37_0,38_4,39_10,3_5,40_8,41_3,42_1,43_2,44_0,45_6,46_4,47_7,48_5,49_1,4_5,5_11,6_7,7_3,8_5,9_4
0,21874_2018-5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,15360_2018-4,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,6120_2017-9,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0
3,9459_2017-10,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,15039_2017-9,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
5,2960_2018-7,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
6,3698_2018-5,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
7,13879_2018-7,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
8,2553_2018-3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1
9,1414_2018-5,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [28]:
pivot_columns == pivot.columns[1:]

True

In [29]:
pivot.describe().filter("summary == 'max' ").toPandas()

Unnamed: 0,summary,cardid,0_8,10_9,11_11,12_7,13_11,14_7,15_1,16_8,17_5,18_4,19_6,1_4,20_5,21_5,22_0,23_10,24_10,25_0,26_7,27_7,28_4,29_0,2_0,30_2,31_6,32_4,33_4,34_6,35_11,36_1,37_0,38_4,39_10,3_5,40_8,41_3,42_1,43_2,44_0,45_6,46_4,47_7,48_5,49_1,4_5,5_11,6_7,7_3,8_5,9_4
0,max,9_2018-4,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1


## Applying Association Rules

In [30]:
from mlxtend.frequent_patterns import apriori, association_rules

In [31]:
pivot = pivot.toPandas()

  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):


In [32]:
pivot.set_index("cardid", inplace=True)

In [33]:
frequence_itemsets = apriori(pivot, min_support=0.01, use_colnames=True)



In [34]:
frequence_itemsets.sort_values("support", ascending=False).head()

Unnamed: 0,support,itemsets
8,0.238131,(18_4)
19,0.129366,(2_0)
5,0.119469,(15_1)
39,0.066906,(49_1)
28,0.065505,(38_4)


In [56]:
rules = association_rules(frequence_itemsets, metric="support", min_threshold=0.01)

In [57]:
rules.sort_values("lift", ascending=False)

Unnamed: 0,antecedents,consequents,antecedent support,consequent support,support,confidence,lift,leverage,conviction
8,(25_0),(22_0),0.042193,0.046411,0.010461,0.247925,5.341936,0.008503,1.267944
9,(22_0),(25_0),0.046411,0.042193,0.010461,0.225394,5.341936,0.008503,1.236508
4,(15_1),(33_4),0.119469,0.027241,0.011024,0.092276,3.387454,0.00777,1.071647
5,(33_4),(15_1),0.027241,0.119469,0.011024,0.404695,3.387454,0.00777,1.479127
10,(2_0),(22_0),0.129366,0.046411,0.015699,0.121351,2.614705,0.009695,1.08529
11,(22_0),(2_0),0.046411,0.129366,0.015699,0.338255,2.614705,0.009695,1.315663
12,(2_0),(25_0),0.129366,0.042193,0.012775,0.098752,2.340477,0.007317,1.062756
13,(25_0),(2_0),0.042193,0.129366,0.012775,0.302779,2.340477,0.007317,1.24872
2,(15_1),(2_0),0.119469,0.129366,0.032174,0.269309,2.081758,0.016719,1.191521
3,(2_0),(15_1),0.129366,0.119469,0.032174,0.248705,2.081758,0.016719,1.172018


* antecedents: First product
* consequents: Second product
* antecedent support: Probability of seen first product
* consequent support: Probability of seen second product
* support: Probability of seen (first and second) product
* confidence: Probability of seen second product if taken first product
* lift: If taken first product, what will second product increase for taken

In [58]:
rules.dtypes

antecedents            object
consequents            object
antecedent support    float64
consequent support    float64
support               float64
confidence            float64
lift                  float64
leverage              float64
conviction            float64
dtype: object

In [59]:
rules.columns

Index(['antecedents', 'consequents', 'antecedent support',
       'consequent support', 'support', 'confidence', 'lift', 'leverage',
       'conviction'],
      dtype='object')

## Import Dataframe to Elasticsearch

In [60]:
from elasticsearch import Elasticsearch, helpers

In [61]:
elas = Elasticsearch("localhost:9200")

In [62]:
body = {
    "mappings": {
        "properties": {
            "antecedents": { "type": "keyword" },
            "consequents": { "type": "keyword" },
            "antecedent support": { "type": "float" },
            "consequent support": { "type": "float" },
            "support": { "type": "float" },
            "confidence": { "type": "float" },
            "lift": { "type": "float" },
            "leverage": { "type": "float" },
            "conviction": { "type": "float" }
        }
    }
}

In [43]:
elas.indices.create("armut", body=body)

  return response.status, response.getheaders(), raw_data
  return response.status, response.getheaders(), raw_data


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'armut'}

In [52]:
def to_elasticsearch(df, index_name: str):
    for row in df.itertuples():
        yield {
            "_index": index_name,
            "_id": row.Index,
            "_source": {
                "antecedents":  row.antecedents,  
                "consequents": row.consequents, 
                "antecedent support": row._3,
                "consequent support": row._4,
                "support": row.support,
                "confidence": row.confidence,
                "lift": row.lift,
                "leverage": row.leverage,
                "conviction": row.conviction
            }
        }

In [68]:
rules = rules.explode("antecedents")
rules = rules.explode("consequents")

In [69]:
rules.head()

Unnamed: 0,antecedents,consequents,antecedent support,consequent support,support,confidence,lift,leverage,conviction
0,13_11,2_0,0.055745,0.129366,0.012334,0.221251,1.710268,0.005122,1.11799
1,2_0,13_11,0.129366,0.055745,0.012334,0.095339,1.710268,0.005122,1.043767
2,15_1,2_0,0.119469,0.129366,0.032174,0.269309,2.081758,0.016719,1.191521
3,2_0,15_1,0.129366,0.119469,0.032174,0.248705,2.081758,0.016719,1.172018
4,15_1,33_4,0.119469,0.027241,0.011024,0.092276,3.387454,0.00777,1.071647


In [70]:
rules.dtypes

antecedents            object
consequents            object
antecedent support    float64
consequent support    float64
support               float64
confidence            float64
lift                  float64
leverage              float64
conviction            float64
dtype: object

In [71]:
helpers.bulk(elas, to_elasticsearch(rules, "armut"), raise_on_error=False)

  return response.status, response.getheaders(), raw_data
  return response.status, response.getheaders(), raw_data


(16, [])