In [1]:
%run test_pyspark.py

In [5]:
from pyspark.sql import SparkSession

spark_session = (SparkSession.builder
                              .master("local[*]")
                              .appName("ElectricChargePointsETLJob")
                              .getOrCreate())

In [6]:
input_path = '../data/input/electric-chargepoint-analysis-2017-domestics-incomplete-anomalies.csv'
df = spark_session.read.csv(input_path, header=True)

In [7]:
df

DataFrame[ChargingEvent: string, CPID: string, StartDate: string, StartTime: string, EndDate: string, EndTime: string, Energy: string, PluginDuration: string]

In [8]:
df_prepare = (df.withColumn("PluginDuration_Hrs", 
                                     col("PluginDuration").cast("double"))
                       .dropna(subset=["PluginDuration_Hrs"]))
df_prepare.show()

+-------------+-------+----------+---------+----------+--------+------+--------------------+--------------------+
|ChargingEvent|   CPID| StartDate|StartTime|   EndDate| EndTime|Energy|      PluginDuration|  PluginDuration_Hrs|
+-------------+-------+----------+---------+----------+--------+------+--------------------+--------------------+
|     12238891|AN00038|2017-03-23| 22:30:37|2017-03-23|22:32:27|   1.1|0.030555555555555555|0.030555555555555555|
|     12003349|AN00043|2017-03-09| 15:35:54|2017-03-09|15:38:34|   0.3|0.044444444444444446|0.044444444444444446|
|     11756712|AN00093|2017-02-21| 17:33:26|2017-02-21|17:34:50|   2.6|0.023333333333333334|0.023333333333333334|
|     11757015|AN00093|2017-02-21| 17:46:15|2017-02-21|17:48:35|   2.9| 0.03888888888888889| 0.03888888888888889|
|     11761831|AN00093|2017-02-21| 22:29:33|2017-02-21|22:30:53|   2.7|0.022222222222222223|0.022222222222222223|
|     11762138|AN00093|2017-02-21| 23:03:26|2017-02-21|23:05:04|   2.5| 0.02722222222222

In [26]:
df_agg = df.dropna(subset=["PluginDuration"]) \
            .groupBy(col("CPID").alias("chargepoint_id")) \
            .agg(max("PluginDuration").alias("avg_duration"),
                 avg("PluginDuration").alias("max_duration"),
                 count("*").alias("count")
                )
                         
df_agg.show()

+--------------+--------------------+--------------------+-----+
|chargepoint_id|        avg_duration|        max_duration|count|
+--------------+--------------------+--------------------+-----+
|       AN00001|   36.36666666666667|  -69767.35833333334|    6|
|       AN00002|                7.15|   6.039583333333332|    8|
|       AN00003|   9.466666666666667|  -4787.757471264367|   87|
|       AN00004|   85.11666666666666| -32184.096153846152|   13|
|       AN00005| -416614.43333333335| -416614.43333333335|    1|
|       AN00006|   6.566666666666666|  11.361458333333337|   32|
|       AN00008|0.016666666666666666|         -208822.975|    2|
|       AN00010|   8.983333333333333|  -9521.719308943084|   82|
|       AN00011|0.016666666666666666|          -48439.665|   10|
|       AN00012|   4.933333333333334|  -4600.608058608058|   91|
|       AN00014|  10.083333333333334|   3.305555555555556|    3|
|       AN00015|                   0| -209319.95833333334|    2|
|       AN00016|0.0166666

In [2]:
pipeline = ChargePointsETLJob()

# 3. Execute the pipeline
print("Starting ETL job...")
pipeline.run()
print("ETL job finished successfully.")

Starting ETL job...
ETL job finished successfully.
