In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("DataTransformer").getOrCreate()
df = spark.read.csv("../extractor_consumer/data/raw/*", header=True)
df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/11 21:33:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----------+----+------------------+---------+----------+
|      date|hour|          visitors|    unite|id_capteur|
+----------+----+------------------+---------+----------+
|2023-12-01|   0|               6.2|visiteurs|      NULL|
|2023-12-01|   1|             887.0|visiteurs|      NULL|
|2023-12-01|   2|1767.8000000000002|visiteurs|      NULL|
|2023-12-01|   3|2648.6000000000004|visiteurs|      NULL|
|2023-12-01|   4|          -17647.0|visiteurs|      NULL|
+----------+----+------------------+---------+----------+


In [2]:
ag_df = df.groupby("date").agg(F.sum(F.col("visitors")).alias("visitors"),
                       F.first(F.col("unite")).alias("unite"),
                       F.first(F.col("id_capteur")).alias("id_capteur"))
ag_df.show(5)

+----------+------------------+---------+----------+
|      date|          visitors|    unite|id_capteur|
+----------+------------------+---------+----------+
|2023-01-01|           53108.0|visiteurs|      NULL|
|2023-01-02|           37780.0|visiteurs|      NULL|
|2023-01-03|           38226.4|visiteurs|      NULL|
|2023-01-04|38672.799999999996|visiteurs|      NULL|
|2023-01-05|39119.200000000004|visiteurs|      NULL|
+----------+------------------+---------+----------+


In [3]:
print(f"""nombre de lignes sans compte de visiteurs :  {ag_df.where(F.col("visitors").isNull()).count()}""")
ag_df = ag_df.drop("id_capteur") # drops na col
ag_df.count()

nombre de lignes sans compte de visiteurs :  0


373

In [4]:
from pyspark.sql.window import Window

win_df = (( 
    ag_df
    .withColumn("day_of_week", F.dayofweek("date"))
    .withColumn("moyenne_roulante", F.mean("visitors").over(Window.partitionBy("day_of_week").orderBy("date").rowsBetween(-4, -1)))
    .drop("day_of_week")
    .orderBy("date")   
))
win_df.show(5)

+----------+------------------+---------+----------------+
|      date|          visitors|    unite|moyenne_roulante|
+----------+------------------+---------+----------------+
|2023-01-01|           53108.0|visiteurs|            NULL|
|2023-01-02|           37780.0|visiteurs|            NULL|
|2023-01-03|           38226.4|visiteurs|            NULL|
|2023-01-04|38672.799999999996|visiteurs|            NULL|
|2023-01-05|39119.200000000004|visiteurs|            NULL|
+----------+------------------+---------+----------------+


In [18]:
pct_df = ((
    win_df
    .withColumn("pct_change", F.round((100 * (F.col("visitors") - F.col("moyenne_roulante"))/(F.col("moyenne_roulante"))), 2))
))
pct_df.show()

+----------+------------------+---------+------------------+----------+
|      date|          visitors|    unite|  moyenne_roulante|pct_change|
+----------+------------------+---------+------------------+----------+
|2023-01-01|           53108.0|visiteurs|              NULL|      NULL|
|2023-01-02|           37780.0|visiteurs|              NULL|      NULL|
|2023-01-03|           38226.4|visiteurs|              NULL|      NULL|
|2023-01-04|38672.799999999996|visiteurs|              NULL|      NULL|
|2023-01-05|39119.200000000004|visiteurs|              NULL|      NULL|
|2023-01-06|           39565.6|visiteurs|              NULL|      NULL|
|2023-01-07|           40012.0|visiteurs|              NULL|      NULL|
|2023-01-08|           31154.0|visiteurs|           53108.0|    -41.34|
|2023-01-09|           40904.8|visiteurs|           37780.0|      8.27|
|2023-01-10|41133.600000000006|visiteurs|           38226.4|      7.61|
|2023-01-11|           39284.8|visiteurs|38672.799999999996|    