In [1]:
import pandas as pd

In [2]:
data = pd.read_csv('IOT-temp.csv', index_col="id")
data.head()

Unnamed: 0_level_0,room_id/id,noted_date,temp,out/in
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
__export__.temp_log_196134_bd201015,Room Admin,08-12-2018 09:30,29,In
__export__.temp_log_196131_7bca51bc,Room Admin,08-12-2018 09:30,29,In
__export__.temp_log_196127_522915e3,Room Admin,08-12-2018 09:29,41,Out
__export__.temp_log_196128_be0919cf,Room Admin,08-12-2018 09:29,41,Out
__export__.temp_log_196126_d30b72fb,Room Admin,08-12-2018 09:29,31,In


In [3]:
print(len(data))

97606


In [4]:
print(data.isnull().sum())

room_id/id    0
noted_date    0
temp          0
out/in        0
dtype: int64


In [5]:
data_1 = data.loc["__export__.temp_log_196134_bd201015":"__export__.temp_log_60240_9177689a"] 
print(len(data_1))
data_1.to_csv('batch_1.csv')

32535


In [6]:
data_2 = data.loc["__export__.temp_log_60239_4eb9e90b":"__export__.temp_log_12544_1a76a951"] 
print(len(data_2))
data_2.to_csv('batch_2.csv')

32535


In [7]:
data_3 = data.loc["__export__.temp_log_148002_0a1a4e64":"__export__.temp_log_133741_32958703"] 
print(len(data_3))
data_3.to_csv('batch_3.csv')

32536


In [8]:
for i in [{"title": "In", "boolean": "In"}, {"title": "Out", "boolean": "Out"}]:
    print(f"{i['title']}: {(data_3['out/in'] == i['boolean']).sum()}")

In: 10648
Out: 21888


In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import unix_timestamp
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.appName("RandomForestClassifier").getOrCreate()

# Load data
data = spark.read.csv("IOT-temp.csv")
data = data.filter(data._c0 != 'id')
data.limit(5).show()

+--------------------+----------+----------------+---+---+
|                 _c0|       _c1|             _c2|_c3|_c4|
+--------------------+----------+----------------+---+---+
|__export__.temp_l...|Room Admin|08-12-2018 09:30| 29| In|
|__export__.temp_l...|Room Admin|08-12-2018 09:30| 29| In|
|__export__.temp_l...|Room Admin|08-12-2018 09:29| 41|Out|
|__export__.temp_l...|Room Admin|08-12-2018 09:29| 41|Out|
|__export__.temp_l...|Room Admin|08-12-2018 09:29| 31| In|
+--------------------+----------+----------------+---+---+



In [10]:
data = data.withColumn("_c5", when(data["_c4"] == "Out", 1).otherwise(0))
data = data.drop("_c0", "_c1", "_c4")
data.limit(5).show()

+----------------+---+---+
|             _c2|_c3|_c5|
+----------------+---+---+
|08-12-2018 09:30| 29|  0|
|08-12-2018 09:30| 29|  0|
|08-12-2018 09:29| 41|  1|
|08-12-2018 09:29| 41|  1|
|08-12-2018 09:29| 31|  0|
+----------------+---+---+



In [11]:
data = data.withColumn("_c2", unix_timestamp("_c2", "dd-MM-yyyy HH:mm"))
data.limit(5).show()

+----------+---+---+
|       _c2|_c3|_c5|
+----------+---+---+
|1544236200| 29|  0|
|1544236200| 29|  0|
|1544236140| 41|  1|
|1544236140| 41|  1|
|1544236140| 31|  0|
+----------+---+---+



In [12]:
# Convert _c3 to numeric type (e.g., IntegerType or DoubleType)
data = data.withColumn("_c3", col("_c3").cast("double"))

assembler = VectorAssembler(inputCols=["_c3", "_c2"], outputCol="features")
data = assembler.transform(data).select("features", "_c5")

data.limit(5).show()

+-------------------+---+
|           features|_c5|
+-------------------+---+
| [29.0,1.5442362E9]|  0|
| [29.0,1.5442362E9]|  0|
|[41.0,1.54423614E9]|  1|
|[41.0,1.54423614E9]|  1|
|[31.0,1.54423614E9]|  0|
+-------------------+---+



In [13]:
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [14]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize the RandomForestClassifier with the best hyperparameters
rf = RandomForestClassifier(labelCol="_c5", featuresCol="features", 
                            numTrees=90, maxDepth=19, maxBins=128)

# You can still use the ParamGridBuilder if you want to explore other parameters or use a grid search
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [90]).addGrid(rf.maxDepth, [19]).addGrid(rf.maxBins, [128]).build()

In [15]:
# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="_c5", metricName="areaUnderROC")

# Set up CrossValidator with the updated paramGrid
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Run cross-validation and choose the best set of parameters
cvModel = crossval.fit(train_data)

In [16]:
predictions = cvModel.transform(test_data)

In [17]:
evaluator = BinaryClassificationEvaluator(labelCol="_c5", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"Test Area Under ROC: {roc_auc}")

Test Area Under ROC: 0.8421569839427413


In [18]:
predictions.select("features", "_c5", "prediction").show(5)

+-------------------+---+----------+
|           features|_c5|prediction|
+-------------------+---+----------+
|[22.0,1.53968382E9]|  0|       0.0|
| [22.0,1.5438099E9]|  0|       0.0|
| [22.0,1.5438099E9]|  0|       0.0|
| [22.0,1.5438099E9]|  0|       0.0|
| [22.0,1.5438099E9]|  0|       0.0|
+-------------------+---+----------+
only showing top 5 rows



In [19]:
for i in [{"title": "In", "boolean": 0}, {"title": "Out", "boolean": 1}]:
    print(f"{i['title']}: {predictions.filter(predictions['prediction'] == i['boolean']).count()}")

In: 4805
Out: 24361


In [20]:
predictions = predictions.withColumn("prediction", when(predictions["prediction"] == 1, "Out").otherwise("In"))
predictions.select("features", "_c5", "prediction").show(5)

+-------------------+---+----------+
|           features|_c5|prediction|
+-------------------+---+----------+
|[22.0,1.53968382E9]|  0|        In|
| [22.0,1.5438099E9]|  0|        In|
| [22.0,1.5438099E9]|  0|        In|
| [22.0,1.5438099E9]|  0|        In|
| [22.0,1.5438099E9]|  0|        In|
+-------------------+---+----------+
only showing top 5 rows



In [21]:
# Convert to Pandas DataFrame
pandas_df = predictions.select("features", "_c5", "prediction").toPandas()

pandas_df.to_csv("result_from_train.csv", index=False)

In [None]:
import json
from flask import Flask, jsonify, request
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, unix_timestamp, col
from pyspark.ml.feature import VectorAssembler
import os

app = Flask(__name__)

UPLOAD_FOLDER = os.getcwd()  # Current directory
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Initialize Spark session
spark = SparkSession.builder.appName('FileProcessingApp').getOrCreate()

@app.route('/api_1', methods=['POST'])
def create():
    try:
        # Access the uploaded file and get its filename
        file = request.files['file']
        file_name = file.filename

        # Extract the pure file name (without any path)
        pure_file_name = os.path.basename(file_name)

        # Define the path where the file will be saved (current folder)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], pure_file_name)

        print(pure_file_name)
        
        # Save the file to the current folder
        file.save(file_path)

        # Read the CSV file into a Spark DataFrame
        spark_df = spark.read.csv(pure_file_name)
        spark_df = spark_df.filter(spark_df["_c0"] != 'id')
        spark_df = spark_df.drop("_c0", "_c1")
        spark_df = spark_df.withColumn("_c2", unix_timestamp("_c2", "dd-MM-yyyy HH:mm"))
        spark_df = spark_df.withColumn("_c3", col("_c3").cast("double"))

        # Assemble features
        assembler = VectorAssembler(inputCols=["_c3", "_c2"], outputCol="features")
        spark_df = assembler.transform(spark_df).select("features")

        predictions = cvModel.transform(spark_df)

        # Export predictions to pandas DataFrame
        export = predictions.drop("rawPrediction", "probability").withColumn("prediction", when(predictions["prediction"] == 1, "Out").otherwise("In")).toPandas()

        # Save the results to a CSV file
        export.to_csv("result_from_api.csv", index=False)
        
        return jsonify({"message": "File successfully saved as result_from_api.csv"}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
   app.run(port=5000)