In [71]:
import os
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))

JAVA_HOME = /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home


#### Verified which Python my notebook is using

In [72]:
import sys
print(sys.executable)

/Users/pernebayarailym/anaconda3/envs/myenv/bin/python


#### Ensured PySpark installs in the same Python environment my notebook is using

In [73]:
import sys
!{sys.executable} -m pip install pyspark



In [74]:
import pyspark
print(pyspark.__version__)

4.0.1


##### Used these commands to install apache-spark because it couldn't find from homebrew , so I manually set the SPARK_HOME path using the path from the command below:

1) brew install apache-spark
2) brew --prefix apache-spark



In [75]:
import os
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home'

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp_cleaning") \
    .getOrCreate()

print(spark.version)

4.0.1


In [76]:
# Import necessary PySpark modules
from pyspark.sql import SparkSession #is the entry point to PySpark
from pyspark.sql import functions as F 
from pyspark.sql import types as T

In [None]:
# Read CSV files
csv_path = '/Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/notebooks/data/ventes.csv'
#'/Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/data/ventes.csv'
df = spark.read \
    .option("header", True) \
    .option("inferSchema", False) \
    .csv(csv_path)

df.printSchema()
df.show(5)

25/09/29 17:07:19 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/data/ventes.csv.
java.io.FileNotFoundException: File /Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/data/ventes.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/data/ventes.csv. SQLSTATE: 42K03

In [None]:
#1 task. Normalize string columns (lowercase + remove extra spaces)
#detecting which columns are string
string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, T.StringType)]

# for each string column, convert to lowercase and trim spaces
for c in string_cols:
    df = df.withColumn(c, F.lower(F.trim(F.col(c))))
df.show(5)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|           800| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|          1200| boutique lyon|    physique|auvergne-rhône-alpes|2023-01-27|       5|         NULL|


In [None]:
#2 task. Replace empty strings with None (NULL) to detect missing values
for c in df.columns:
    df = df.withColumn(c, F.when(F.col(c) == "", None).otherwise(F.col(c)))

#  Drop rows with missing values, except 'montant_total because it will be recalculated later and for now it has NULL values everywhere'
cols_to_check = [c for c in df.columns if c != "montant_total"]
df = df.dropna(subset=cols_to_check, how="any") #drop rows with any NULL values
df.show(10)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|           800| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|          1200| boutique lyon|    physique|auvergne-rhône-alpes|2023-01-27|       5|         NULL|


In [None]:
#3 task. Handle purchase date 
#parse the purchase date column into a proper date type
df = df.withColumn(
    "purchase_date", 
    F.coalesce(
        F.to_date(F.col("date"), "yyyy-MM-dd"),
        F.to_date(F.col("date"), "dd/MM/yyy"),
        F.to_date(F.col("date"), "MM/dd/yyyy"),
        F.to_date(F.col("date"), "yyyy/MM/dd"),

    )
)
df.show(10)
#coalesce() tries several formats and picks the first that works


+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|purchase_date|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|           800| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|   2023-03-12|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|          1200| boutique lyon|    physique|

In [None]:
#3 task. Remove extravagant dates
df =df.filter(
    (F.col("purchase_date") >= F.to_date(F.lit("2023-01-01"))) &
    (F.col("purchase_date") <= F.current_date())
)

df.show(5)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|purchase_date|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|           800| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|   2023-03-12|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|          1200| boutique lyon|    physique|

In [None]:
#4 task. handle numeric anomalies (checking the columns like age, quantite, prix_unitaire are numbers not strings
numeric_cols = {
    "client_age": T.IntegerType(),
    "quantite": T.IntegerType(),
    "prix_catalogue": T.DoubleType(),
    #"montant_total": T.DoubleType()
}

for col_name, col_type in numeric_cols.items():
    if col_name in df.columns: 
        df = df.withColumn(col_name, F.col(col_name).cast(col_type))


#cast() converts a column to a given type , if
#the value can't be converted then it becomes NULL like if it's "abc"

df.show(5)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|purchase_date|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|         800.0| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|   2023-03-12|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|        1200.0| boutique lyon|    physique|

In [None]:
#drop rows where numeric columns could not be converted
for col_name in numeric_cols.keys():
    if col_name in df.columns:
        df = df.filter(F.col(col_name).isNotNull())

df.show(5)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|purchase_date|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|         800.0| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|   2023-03-12|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|        1200.0| boutique lyon|    physique|

In [None]:
#5 task. Handle negative ages
if "client_age" in df.columns:
    df =df.withColumn("client_age", F.abs(F.col("client_age")))
df.show(5)

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|montant_total|purchase_date|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|         800.0| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|         NULL|   2023-03-12|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|        1200.0| boutique lyon|    physique|

In [None]:
#6 task.Calculate total amount per sale
# drop the old useless NULL column if it exists
# add the computed column

if "montant_total" in df.columns:
    df = df.drop("montant_total")   # remove the placeholder column

# create new montant_total column
df = df.withColumn("montant_total", F.col("quantite") * F.col("prix_catalogue"))

df.show(10)


+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|      date|quantite|purchase_date|montant_total|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+----------+--------+-------------+-------------+
|             1|     alice|        25|       paris|      ordinateur|     informatique|          dell|         800.0| boutique lyon|    physique|auvergne-rhône-alpes|2023-03-12|       2|   2023-03-12|       1600.0|
|             2|       bob|        34|        lyon|      smartphone|       téléphonie|         apple|        1200.0| boutique lyon|    physique|

In [None]:
import sys
!{sys.executable} -m pip install pandas


Collecting pandas
  Downloading pandas-2.3.2-cp39-cp39-macosx_11_0_arm64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m835.2 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting numpy>=1.22.4 (from pandas)
  Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl.metadata (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.9/60.9 kB[0m [31m566.7 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.3.2-cp39-cp39-macosx_11_0_arm64.whl (10.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.8/10.8 MB[0m [31m723.8 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl (5.3 MB)
[2K   [90m━━━━━━━━━

In [None]:
import os

# Always resolve relative to the project folder
project_root = os.getcwd()   # prints current working directory
print("Current working dir:", project_root)

output_path = os.path.join(project_root, "data", "ventes_clean.csv")

df.toPandas().to_csv(output_path, index=False)
print("Saved to:", output_path)


Current working dir: /Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/notebooks


OSError: Cannot save file into a non-existent directory: '/Users/pernebayarailym/Documents/Portfolio_Projects_AP/Simplon_DE_Projects/Python_Projects/Project_Pyspark_DBT/notebooks/data'