### Build the model

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier, RandomForestClassifier,GBTClassifier

# One Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("Term Project")
    .getOrCreate()
)

# Turn off automatic broadcast joins and keep the driver log at WARN level.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.sparkContext.setLogLevel("WARN")

# Labelled training data: first line is the header, column types are inferred.
csv_file_path = "/home/hadoop/Downloads/train.csv"
df = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

df.show()
df.printSchema()

                                                                                

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|  

In [3]:
# 30 % validation / 70 % training. A fixed seed makes the split, and therefore every
# metric reported below, reproducible across kernel restarts.
SPLIT_SEED = 42
valid_df, train_df = df.randomSplit([0.3, 0.7], seed=SPLIT_SEED)

In [4]:
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier, RandomForestClassifier,GBTClassifier
from pyspark.ml.feature import VectorAssembler

# Classifier. Alternatives already imported above (DecisionTreeClassifier,
# RandomForestClassifier, GBTClassifier) can be swapped in with labelCol="Response".
lr = LogisticRegression(maxIter=10, regParam=0.01, labelCol="Response")

# Map each string column to a numeric index column "<name>_index".
# handleInvalid="keep": records streamed in later may carry a category that never
# appeared in the training split; the default ("error") would make Model.transform
# raise on such a record. Seen categories get exactly the same indices as before.
categorical_columns = ['Gender', 'Vehicle_Age', 'Vehicle_Damage']
indexers = [
    StringIndexer(inputCol=col_name, outputCol=col_name + '_index', handleInvalid='keep')
    for col_name in categorical_columns
]

# Numeric columns plus the indexed categoricals, packed into one "features" vector.
feature_columns = ["Age", "Driving_License", "Region_Code",
                   "Previously_Insured", "Gender_index", "Vehicle_Age_index",
                   "Vehicle_Damage_index", "Annual_Premium",
                   "Policy_Sales_Channel", "Vintage"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [5]:
# Stage order: categorical indexers, then the feature assembler, then the classifier.
myStages = [*indexers, assembler, lr]

pipeline = Pipeline(stages=myStages)

# Fit on the training split only; `Model` is reused for validation and for the
# Kafka-sourced predictions further down.
Model = pipeline.fit(train_df)
trainingPred = Model.transform(train_df)

                                                                                

#### Evaluate the model

In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
validationPred = Model.transform(valid_df)

# A single evaluator; each metric is selected per call through a param-map override.
# Note: "f1" in MulticlassClassificationEvaluator is the label-weighted F1.
# NOTE(review): in the recorded output, weightedPrecision ≈ accuracy² and
# weightedRecall == accuracy. That pattern suggests the model predicts Response=0 for
# (almost) every row. A ranking metric such as areaUnderROC may be more informative
# on this imbalanced label — TODO confirm.
metric_evaluator = MulticlassClassificationEvaluator(
    labelCol="Response",
    predictionCol="prediction",
)
accuracy, precision, recall, f1 = (
    metric_evaluator.evaluate(validationPred, {metric_evaluator.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

# Combined summary line
print("Accuracy = %g, Precision = %g, Recall = %g, F1 Score = %g" % (accuracy, precision,recall,f1))




Accuracy = 0.878245, Precision = 0.771329, Recall = 0.878245, F1 Score = 0.821322


                                                                                

### 將 test.csv 逐筆發送至 Kafka，模擬 streaming data

In [15]:
import csv
from confluent_kafka import Producer

kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "test_data"

csv_file_path = "/home/hadoop/Downloads/test.csv"

# Kafka producer configuration
producer_config = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'client.id': 'python-producer'
}

# Create the Kafka producer; later cells reuse `producer` to publish predictions
# and validation rows.
producer = Producer(producer_config)

# Errors reported through delivery callbacks (stays empty when every message lands).
delivery_errors = []


def _on_delivery(err, msg):
    """Delivery-report callback: record any per-message delivery error."""
    if err is not None:
        delivery_errors.append(err)


# Read the CSV and publish one message per data row, in file order.
with open(csv_file_path, newline='') as csvfile:
    csv_reader = csv.reader(csvfile)

    # Skip the header line
    next(csv_reader, None)

    for row in csv_reader:
        # Re-join the row's fields into one comma-separated string
        message_value = ','.join(row)

        while True:
            try:
                producer.produce(kafka_topic, value=message_value.encode('utf-8'),
                                 key=None, on_delivery=_on_delivery)
                break
            except BufferError:
                # Local send queue is full: let in-flight messages drain, then retry.
                producer.poll(1.0)

        # Serve delivery callbacks without blocking. flush() per message would wait
        # for a broker acknowledgement on every single row.
        producer.poll(0)

# Block once until everything queued has been delivered (or has failed).
undelivered = producer.flush()
if undelivered or delivery_errors:
    print(f"WARNING: {undelivered} message(s) undelivered, "
          f"{len(delivery_errors)} delivery error(s) on topic '{kafka_topic}'")

In [8]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define the schema for the Kafka message value
# Schema of one unlabelled test.csv record as carried in a Kafka message value.
# Field order matters: the consumer cells map the comma-separated values by position.
# Every field is nullable.
message_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("Gender", StringType()),
        ("Age", IntegerType()),
        ("Driving_License", IntegerType()),
        ("Region_Code", DoubleType()),
        ("Previously_Insured", IntegerType()),
        ("Vehicle_Age", StringType()),
        ("Vehicle_Damage", StringType()),
        ("Annual_Premium", DoubleType()),
        ("Policy_Sales_Channel", DoubleType()),
        ("Vintage", IntegerType()),
    )
])

### 讀取 Kafka 的資料並送入模型進行預測

#### 單筆資料預測結果

In [9]:
from confluent_kafka import Consumer, KafkaError, KafkaException

# Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker address
    'group.id': 'my_consumer_group',        # Replace with your consumer group ID
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer instance and subscribe to the test-data topic
consumer = Consumer(consumer_config)
consumer.subscribe(['test_data'])  # Replace with your Kafka topic

try:
    # Fetch a single message; poll() returns None when nothing arrives in time.
    msg = consumer.poll(10.0)  # Timeout set to 10 seconds
    if msg is None:
        raise TimeoutError("No message received from topic 'test_data' within 10 seconds")
    if msg.error():
        raise KafkaException(msg.error())

    # Split the comma-separated values and convert them to the schema's types
    values = msg.value().decode('utf-8').split(',')
    row_values = [
        int(values[0]),
        values[1],
        int(values[2]),
        int(values[3]),
        float(values[4]),
        int(values[5]),
        values[6],
        values[7],
        float(values[8]),
        float(values[9]),
        int(values[10])
    ]
finally:
    # Always release the consumer, even when polling or parsing fails.
    consumer.close()

# One-row DataFrame. A separate name keeps the training `df` intact, so the split
# cell can be re-run after this one.
message_df = spark.createDataFrame([row_values], schema=message_schema)

# Apply the model for prediction
prediction_df = Model.transform(message_df)

# Show the predictions
prediction_df.select('id','prediction','probability').show(truncate=False)

+------+----------+-----------------------------------------+
|id    |prediction|probability                              |
+------+----------+-----------------------------------------+
|381110|0.0       |[0.9921324268746067,0.007867573125393323]|
+------+----------+-----------------------------------------+



#### 128筆資料預測結果

In [10]:
from confluent_kafka import Consumer, KafkaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import json



# Kafka consumer configuration
# Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',  # replace with your Kafka broker address
    'group.id': 'my_consumer_group',        # replace with your consumer group ID
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer instance and subscribe to the test-data topic
consumer = Consumer(consumer_config)
consumer.subscribe(['test_data'])  # replace with your Kafka topic

# Number of messages to collect before predicting
num_messages_to_process = 128
# Give up after this many consecutive empty 10 s polls, so the loop cannot spin
# forever when the topic holds fewer messages than requested.
max_idle_polls = 3

messages_processed = 0
idle_polls = 0
batch_rows = []

try:
    while messages_processed < num_messages_to_process and idle_polls < max_idle_polls:
        msg = consumer.poll(10.0)  # Timeout set to 10 seconds
        if msg is None:
            idle_polls += 1
            continue
        idle_polls = 0

        if msg.error():
            # Report broker/partition errors instead of dropping them silently.
            print(f"Skipping Kafka error: {msg.error()}")
            continue

        # Decode the message value, split the comma-separated values and convert
        # them to the schema's types
        values = msg.value().decode('utf-8').split(',')
        batch_rows.append([
            int(values[0]),
            values[1],
            int(values[2]),
            int(values[3]),
            float(values[4]),
            int(values[5]),
            values[6],
            values[7],
            float(values[8]),
            float(values[9]),
            int(values[10])
        ])
        messages_processed += 1
finally:
    # Always release the consumer, even when polling or parsing fails.
    consumer.close()

if messages_processed < num_messages_to_process:
    print(f"Only received {messages_processed} of {num_messages_to_process} messages")

# Build the DataFrame once from all collected rows. Chaining one union per message
# would grow the query plan with every row.
result_df = spark.createDataFrame(batch_rows, schema=message_schema)

# Apply the model for prediction on the entire batch
prediction_df = Model.transform(result_df)

# Show the predictions
prediction_df.select('id', 'prediction', 'probability').show(truncate=False)

                                                                                

+------+----------+-----------------------------------------+
|id    |prediction|probability                              |
+------+----------+-----------------------------------------+
|381111|0.0       |[0.6756448130459972,0.3243551869540028]  |
|381112|0.0       |[0.7538274162506944,0.24617258374930562] |
|381113|0.0       |[0.9572579333713735,0.04274206662862645] |
|381114|0.0       |[0.9919025061478679,0.008097493852132098]|
|381115|0.0       |[0.9920444588801505,0.00795554111984953] |
|381116|0.0       |[0.988666871295324,0.011333128704675954] |
|381117|0.0       |[0.9923716775231054,0.0076283224768946]  |
|381118|0.0       |[0.7033330150495118,0.29666698495048816] |
|381119|0.0       |[0.9914504674319976,0.008549532568002394]|
|381120|0.0       |[0.9908595855091915,0.00914041449080849] |
|381121|0.0       |[0.8231777872736302,0.17682221272636978] |
|381122|0.0       |[0.989762100084346,0.010237899915654047] |
|381123|0.0       |[0.7563029356957962,0.2436970643042038]  |
|381124|

### 將預測結果逐筆發送至 Kafka

In [11]:
import csv
import io

kafka_topic = 'prediction'


def _to_csv_line(fields):
    """Render fields as one CSV record, quoting any field that itself contains a comma."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator='').writerow(fields)
    return buffer.getvalue()


# Publish each prediction as one CSV message: id, prediction, probability.
# str() of the probability vector looks like "[0.67,0.32]". Its inner comma is
# quoted so a CSV reader still sees exactly three fields; id and prediction are
# rendered unchanged.
for row in prediction_df.select('id', 'prediction', 'probability').collect():
    csv_string = _to_csv_line(str(x) for x in row)

    # Send via the Kafka producer
    producer.produce(kafka_topic, value=csv_string)

    # Serve delivery callbacks without blocking on every message
    producer.poll(0)

# Wait once for all queued messages to be delivered
undelivered = producer.flush()
if undelivered:
    print(f"WARNING: {undelivered} message(s) still undelivered to topic '{kafka_topic}'")

                                                                                

### 將新資料加入 training data 重新建立模型（以 validation data 作為模擬的新資料進行示範）

In [12]:
kafka_topic = 'valid'

# Publish every validation row as one comma-separated message (column order as in valid_df)
for row in valid_df.collect():
    # Convert the row to a CSV-formatted string
    csv_string = ",".join([str(x) for x in row])

    while True:
        try:
            producer.produce(kafka_topic, value=csv_string)
            break
        except BufferError:
            # Local send queue is full: let in-flight messages drain, then retry.
            producer.poll(1.0)

    # Serve delivery callbacks without blocking. A flush() per message would wait
    # for a broker acknowledgement on each of the many validation rows.
    producer.poll(0)

# Wait once for all queued messages to be delivered
undelivered = producer.flush()
if undelivered:
    print(f"WARNING: {undelivered} message(s) still undelivered to topic '{kafka_topic}'")

In [13]:
# Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',  # 替換為你的 Kafka broker 位址
    'group.id': 'my_consumer_group',        # 替換為你的 consumer group ID
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer instance
consumer = Consumer(consumer_config)

# Subscribe to the Kafka topic
consumer.subscribe(['valid'])  # Replace with your Kafka topic


# Define the schema for the Kafka message value
message_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Driving_License", IntegerType(), True),
    StructField("Region_Code", DoubleType(), True),
    StructField("Previously_Insured", IntegerType(), True),
    StructField("Vehicle_Age", StringType(), True),
    StructField("Vehicle_Damage", StringType(), True),
    StructField("Annual_Premium", DoubleType(), True),
    StructField("Policy_Sales_Channel", DoubleType(), True),
    StructField("Vintage", IntegerType(), True),
    StructField("Response", IntegerType(), True)
])

# Set the number of messages you want to process
num_messages_to_process = 128
messages_processed = 0

while messages_processed < num_messages_to_process:
    # Poll for and print the first message
    msg = consumer.poll(10.0)  # Timeout set to 10 seconds
    
    # Split the comma-separated values
    values = msg.value().decode('utf-8').split(',')
    
    # Map values to the schema
    row_values = [
        int(values[0]),
        values[1],
        int(values[2]),
        int(values[3]),
        float(values[4]),
        int(values[5]),
        values[6],
        values[7],
        float(values[8]),
        float(values[9]),
        int(values[10]),
        int(values[11])
    ]
    
    # Create a PySpark DataFrame from the row values
    df = spark.createDataFrame([row_values], schema=message_schema)
    train_df=df.union(train_df)
    messages_processed += 1

New_Model = pipeline.fit(train_df)
trainingPred= New_Model.transform(train_df)

consumer.close()

24/01/09 13:51:18 WARN DAGScheduler: Broadcasting large task binary with size 1026.7 KiB
24/01/09 13:51:30 WARN DAGScheduler: Broadcasting large task binary with size 1027.9 KiB
24/01/09 13:51:30 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/01/09 13:51:42 WARN DAGScheduler: Broadcasting large task binary with size 1029.4 KiB
24/01/09 13:51:42 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/01/09 13:51:45 WARN DAGScheduler: Broadcasting large task binary with size 1029.4 KiB
24/01/09 13:51:45 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/01/09 13:51:47 WARN DAGScheduler: Broadcasting large task binary with size 1029.4 KiB
24/01/09 13:51:47 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/01/09 13:51:49 WARN DAGScheduler: Broadcasting large task binary with size 1029.4 KiB
24/01/09 13:51:50 WARN DAGScheduler: Broadcasting large task binary with size 1028.3 KiB
24/01/09 13:51:52 WAR

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
validationPred = New_Model.transform(valid_df)

# NOTE(review): New_Model was trained on rows consumed from the 'valid' topic, and
# that topic was filled from valid_df. This score is therefore partly computed on
# training data (leakage). Consider excluding the consumed ids before comparing it
# with the original model's score.
metric_evaluator = MulticlassClassificationEvaluator(
    labelCol="Response",
    predictionCol="prediction",
)

# Accuracy, weighted precision, weighted recall and weighted F1, evaluated in that order
scores = {}
for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1"):
    scores[metric] = metric_evaluator.evaluate(
        validationPred, {metric_evaluator.metricName: metric}
    )

accuracy = scores["accuracy"]
precision = scores["weightedPrecision"]
recall = scores["weightedRecall"]
f1 = scores["f1"]

# Combined summary line
print("Accuracy = %g, Precision = %g, Recall = %g, F1 Score = %g" % (accuracy, precision,recall,f1))




Accuracy = 0.878245, Precision = 0.771329, Recall = 0.878245, F1 Score = 0.821322


                                                                                