In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, year, abs, regexp, lit


## Création de l'environnement spark
Les traitements sont éxécutés uniquement en local.

In [2]:
spark = SparkSession.builder.appName("ventes").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

## Import des donnés
Le typage automatique nous convient.

In [3]:
df = spark.read.csv("ventes.csv", header=True, inferSchema=True)
print(df.dtypes)
df.printSchema()

[('id_transaction', 'int'), ('client_nom', 'string'), ('client_age', 'int'), ('client_ville', 'string'), ('produit_nom', 'string'), ('produit_categorie', 'string'), ('produit_marque', 'string'), ('prix_catalogue', 'int'), ('magasin_nom', 'string'), ('magasin_type', 'string'), ('magasin_region', 'string'), ('date', 'date'), ('quantite', 'int'), ('montant_total', 'string')]
root
 |-- id_transaction: integer (nullable = true)
 |-- client_nom: string (nullable = true)
 |-- client_age: integer (nullable = true)
 |-- client_ville: string (nullable = true)
 |-- produit_nom: string (nullable = true)
 |-- produit_categorie: string (nullable = true)
 |-- produit_marque: string (nullable = true)
 |-- prix_catalogue: integer (nullable = true)
 |-- magasin_nom: string (nullable = true)
 |-- magasin_type: string (nullable = true)
 |-- magasin_region: string (nullable = true)
 |-- date: date (nullable = true)
 |-- quantite: integer (nullable = true)
 |-- montant_total: string (nullable = true)



## Lowercasing des colonnes de type string.

In [4]:
def get_string_columns(df_dtypes):
    return map(lambda x: x[0], filter(lambda x: x[1] == "string", df_dtypes))

In [5]:
string_columns = get_string_columns(df.dtypes)
for column in string_columns:
    df = df.withColumn(column, lower(df[column]))
    
df.show(1)


+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville|produit_nom|produit_categorie|produit_marque|prix_catalogue|  magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|
+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
|             1|     alice|        25|       paris| ordinateur|     informatique|          dell|           800|boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|
+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
only showing top 1 row


## Elimination des lignes où au moins une colonne est manquante


In [6]:
# on exclue la colonne "montant_total" qui contient forcément NULL
df = df.drop(df.montant_total)
df.dtypes

[('id_transaction', 'int'),
 ('client_nom', 'string'),
 ('client_age', 'int'),
 ('client_ville', 'string'),
 ('produit_nom', 'string'),
 ('produit_categorie', 'string'),
 ('produit_marque', 'string'),
 ('prix_catalogue', 'int'),
 ('magasin_nom', 'string'),
 ('magasin_type', 'string'),
 ('magasin_region', 'string'),
 ('date', 'date'),
 ('quantite', 'int')]

In [7]:
# on exclue toutes les lignes qui contienne une valeur null
original_length = df.count()

df = df.dropna(how="any")
without_nulls_length = df.count()

print(original_length - without_nulls_length, "lignes supprimés")


8 lignes supprimés


## Elimination des lignes où la date d'achat est extravagante


In [8]:
# On commence par observer la distribution des données
df.groupBy(year("date")).count().show()

# On s'aperçoit que toutes les commandes ont été effectué en 2023 sauf 6.
# Le matériel informatique n'existait pas à ce moment la.
# On ne garde que les commandes éffectué en 2023
original_length = df.count()

df = df.filter(year("date") == 2023)

print("Après la suppression des années extravagantes:")
df.groupBy(year("date")).count().show()
print(original_length - df.count(), "lignes supprimés")

+----------+-----+
|year(date)|count|
+----------+-----+
|      1745|    1|
|      2023|  481|
|      1632|    1|
|      1742|    1|
|      1852|    1|
|      1821|    1|
|      1702|    1|
+----------+-----+

Après la suppression des années extravagantes:
+----------+-----+
|year(date)|count|
+----------+-----+
|      2023|  481|
+----------+-----+

6 lignes supprimés


## Elimination des lignes présentant des anomalies

In [9]:
df.dtypes

[('id_transaction', 'int'),
 ('client_nom', 'string'),
 ('client_age', 'int'),
 ('client_ville', 'string'),
 ('produit_nom', 'string'),
 ('produit_categorie', 'string'),
 ('produit_marque', 'string'),
 ('prix_catalogue', 'int'),
 ('magasin_nom', 'string'),
 ('magasin_type', 'string'),
 ('magasin_region', 'string'),
 ('date', 'date'),
 ('quantite', 'int')]

In [10]:
# Commes les types des colonnes est inférer automatiquement.
# On a juste besoin de vérifier si les colonnes typées 'string' contiennent des numériques à la place de chaînes de caractères

original_length = df.count()

string_columns = get_string_columns(df.dtypes)
for column in string_columns:
    df = df.filter(~ regexp(df[column], lit("^[0-9]*$")))

print(original_length - df.count(), "lignes supprimés")

10 lignes supprimés


## Normalisation de l'âge (négatif -> positif).

In [11]:
df.select("client_age").where(df.client_age < 0).show()

df = df.withColumn("client_age", abs(df["client_age"]))

print("Après normalisation: ")
df.select("client_age").where(df.client_age < 0).show()

+----------+
|client_age|
+----------+
|       -29|
|       -25|
+----------+

Après normalisation: 
+----------+
|client_age|
+----------+
+----------+



## Calcul du montant total pour chaque vente

In [12]:
# montant_total = prix * quantite
df = df.withColumn("montant_total", df["prix_catalogue"] * df ["quantite"])
df.show(1)

+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville|produit_nom|produit_categorie|produit_marque|prix_catalogue|  magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|
+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
|             1|     alice|        25|       paris| ordinateur|     informatique|          dell|           800|boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         1600|
+--------------+----------+----------+------------+-----------+-----------------+--------------+--------------+-------------+------------+--------------------+----------+--------+-------------+
only showing top 1 row


## Enregistrement dans DuckDB

In [None]:
import duckdb


In [19]:
pandas_df = df.toPandas()
print(df.count(), "lignes dans le dataframe")

# on écrit dans le projet dbt
db_path = "ventes/ventes_clean.duckdb"

with duckdb.connect(db_path) as con:
    con.execute("CREATE OR REPLACE TABLE vente AS SELECT * FROM pandas_df")
    print(con.sql("SELECT count(*) FROM vente;").fetchall(), "lignes dans duckdb")    

471 lignes dans le dataframe
[(471,)] lignes dans duckdb


## Création du schéma en étoile
La donnée à les attributs suivants:

In [18]:
print(", ".join(map(lambda x: x[0], df.dtypes)))

id_transaction, client_nom, client_age, client_ville, produit_nom, produit_categorie, produit_marque, prix_catalogue, magasin_nom, magasin_type, magasin_region, date, quantite, montant_total


On en déduit les tables de dimensions suivantes:
- client (client_nom, client_age, client_ville)
- produit (produit_nom, produit_categorie, produit_marque, prix_catalogue)
- magasin (magasin_nom, magasin_type, magasin_region)

La table de fait contient les attributs: 
- id_transaction, date, quantite, montant_total
