In [None]:
!pip install pandas
!pip install pyspark
!pip install pyarrow

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse, if one already exists in this kernel) a SparkSession whose
# application name is "pandas to spark".
spark = (
    SparkSession.builder
    .appName("pandas to spark")
    .getOrCreate()
)

# Enable Arrow-based columnar transfer for pandas <-> Spark conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [21]:
import pandas as pd

# Toy event log: every (machine_id, process_id) pair has exactly one 'start'
# row and one 'end' row, with the event time in `timestamp`.
data = [
    [0, 0, 'start', 0.712], [0, 0, 'end', 1.52],
    [0, 1, 'start', 3.14],  [0, 1, 'end', 4.12],
    [1, 0, 'start', 0.55],  [1, 0, 'end', 1.55],
    [1, 1, 'start', 0.43],  [1, 1, 'end', 1.42],
    [2, 0, 'start', 4.1],   [2, 0, 'end', 4.512],
    [2, 1, 'start', 2.5],   [2, 1, 'end', 5],
    [2, 2, 'start', 2.5],   [2, 2, 'end', 5],
]

# Build the frame, then cast the numeric columns to pandas' nullable
# extension dtypes (Int64 / Float64); activity_type stays a plain object column.
activity = (
    pd.DataFrame(data, columns=['machine_id', 'process_id', 'activity_type', 'timestamp'])
    .astype({
        'machine_id': 'Int64',
        'process_id': 'Int64',
        'activity_type': 'object',
        'timestamp': 'Float64',
    })
)

In [22]:
# Convert the pandas `activity` frame from the previous cell into a Spark
# DataFrame (same name, so later cells use the Spark version).
# The isinstance guard makes this cell safe to re-run: after the first run
# `activity` is already a Spark DataFrame, and passing it to
# spark.createDataFrame again would raise a TypeError.
if isinstance(activity, pd.DataFrame):
    activity = spark.createDataFrame(activity)

activity.show()

+----------+----------+-------------+---------+
|machine_id|process_id|activity_type|timestamp|
+----------+----------+-------------+---------+
|         0|         0|        start|    0.712|
|         0|         0|          end|     1.52|
|         0|         1|        start|     3.14|
|         0|         1|          end|     4.12|
|         1|         0|        start|     0.55|
|         1|         0|          end|     1.55|
|         1|         1|        start|     0.43|
|         1|         1|          end|     1.42|
|         2|         0|        start|      4.1|
|         2|         0|          end|    4.512|
|         2|         1|        start|      2.5|
|         2|         1|          end|      5.0|
|         2|         2|        start|      2.5|
|         2|         2|          end|      5.0|
+----------+----------+-------------+---------+



In [25]:
# Import the functions module under an alias rather than
# `from pyspark.sql.functions import sum, round`: those bare names would
# shadow Python's built-in `sum`/`round` for the rest of the kernel session.
from pyspark.sql import functions as F

# One row per (machine_id, process_id) start event and one per end event.
# Only the needed columns are kept, so the join below does not produce two
# ambiguous `activity_type` columns.
process_starts = (
    activity
    .where(F.col("activity_type") == "start")
    .select("machine_id", "process_id", F.col("timestamp").alias("start_time"))
)
process_ends = (
    activity
    .where(F.col("activity_type") == "end")
    .select("machine_id", "process_id", F.col("timestamp").alias("end_time"))
)

# Per-machine average of (end_time - start_time) over its processes, rounded
# to 3 decimals. groupBy gives no ordering guarantee, so sort explicitly for a
# reproducible displayed table.
processing_time_by_machine = (
    process_starts
    .join(process_ends, ["machine_id", "process_id"], "inner")
    .withColumn("difference", F.col("end_time") - F.col("start_time"))
    .groupBy("machine_id")
    .agg(F.round(F.avg("difference"), 3).alias("processing_time"))
    .orderBy("machine_id")
)

processing_time_by_machine.show()

+----------+---------------+
|machine_id|processing_time|
+----------+---------------+
|         0|          0.894|
|         1|          0.995|
|         2|          1.804|
+----------+---------------+

