<a href="https://colab.research.google.com/github/rodmellot/spark_mini_project/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BLABLABLA

In [1]:
import os
import requests
import yaml
import pandas as pd

## Data Collection

In [2]:
# As mentioned, i created a config.yaml stocked on the github page.
#The link by (raw) url allows us to work easily on colab and do not have to clone the repo.
YAML_URL = "https://raw.githubusercontent.com/rodmellot/spark_mini_project/refs/heads/main/config.yaml"

# Download the YAML file to the local Colab environment and load config globally
print("--- Fetching Configuration ---")
response = requests.get(YAML_URL)
config = yaml.safe_load(response.content)
print("Configuration loaded successfully.\n")

def setup_project_data(data_dir, base_url, files_to_download):
    os.makedirs(data_dir, exist_ok=True)
    print("--- Downloading Datasets ---")
    for file_name in files_to_download:
        file_url = f"{base_url}/{file_name}"
        target_path = os.path.join(data_dir, file_name)

        print(f"Downloading: {file_name}...")
        file_response = requests.get(file_url)

        with open(target_path, 'wb') as f:
            f.write(file_response.content)
        print(f"Saved to {target_path}")

    print("\nData collection complete. Files ready for Spark processing.")

#Initialization in our case using the globally loaded config
data_dir = config.get('data_directory', './data')
base_url = config.get('base_url', 'https://raw.githubusercontent.com/rvm-courses/GasPrices/main')
files_to_download = [
        "Prix2022_part1.csv",
        "Prix2022_part2.csv", # 2022 is in two parts
        "Prix2023.csv",
        "Prix2024.csv",
        "Stations2024.csv",
        "Services2024.csv"
    ]

# Do the actual collection
setup_project_data(data_dir, base_url, files_to_download)

--- Fetching Configuration ---
Configuration loaded successfully.

--- Downloading Datasets ---
Downloading: Prix2022_part1.csv...
Saved to ./data/Prix2022_part1.csv
Downloading: Prix2022_part2.csv...
Saved to ./data/Prix2022_part2.csv
Downloading: Prix2023.csv...
Saved to ./data/Prix2023.csv
Downloading: Prix2024.csv...
Saved to ./data/Prix2024.csv
Downloading: Stations2024.csv...
Saved to ./data/Stations2024.csv
Downloading: Services2024.csv...
Saved to ./data/Services2024.csv

Data collection complete. Files ready for Spark processing.


# Initialization Spark Session

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, lit

spark = SparkSession.builder \
    .appName("FuelPriceAnalysis") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

# Data Preparation – step 1

## Defining the Schema


---


*Column Name,Type,Description*

id_station,String,Unique identifier for the station

code_postal,String,Postal code (kept as string to preserve leading zeros)

date,Timestamp,Date and time of the price update

valeur,Double,"The price of the fuel (e.g., 1.859)"

nom_carburant,String,"Type of fuel (Gazole, E10, etc.)"

In [4]:
# Loading price data with a wildcard to capture all years
prices_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .load("./data/Prix202*.csv") \
    .withColumn("source_file", input_file_name())

# Loading reference data
stations_df = spark.read.option("header", "true").option("sep", ";").csv("./data/Stations2024.csv")
services_df = spark.read.option("header", "true").option("sep", ";").csv("./data/Services2024.csv")

In [17]:
import os
import requests

# 1. Configuration
# Base URL for the raw files
base_url = "https://raw.githubusercontent.com/rvm-courses/GasPrices/master"
output_folder = "./data"
filenames = [
    "Prix2022S1.csv.gz",
    "Prix2022S2.csv.gz",
    "Prix2023.csv.gz",
    "Prix2024.csv.gz",
    "Stations2024.csv.gz",
    "Services2024.csv.gz"
]

# 2. Create the directory if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# 3. Download the files
print(f"--- Starting Download into {output_folder} ---")

for filename in filenames:
    # Construct the full URL and local path
    url = f"{base_url}/{filename}"
    output_path = os.path.join(output_folder, filename)

    print(f"Downloading {filename}...")

    try:
        response = requests.get(url, stream=True)

        # Check if the request was successful (Status Code 200)
        if response.status_code == 200:
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f" -> Success! Saved to: {output_path}")

            # Optional: Verify it's not an HTML error page by checking size
            file_size = os.path.getsize(output_path)
            if file_size < 1000:
                print("    [Warning] File is very small. Check if it contains an error message.")

        else:
            print(f" -> Failed. Status Code: {response.status_code} (URL: {url})")

    except Exception as e:
        print(f" -> Error occurred: {e}")

print("\nAll downloads finished.")

--- Starting Download into ./data ---
Downloading Prix2022S1.csv.gz...
 -> Success! Saved to: ./data/Prix2022S1.csv.gz
Downloading Prix2022S2.csv.gz...
 -> Success! Saved to: ./data/Prix2022S2.csv.gz
Downloading Prix2023.csv.gz...
 -> Success! Saved to: ./data/Prix2023.csv.gz
Downloading Prix2024.csv.gz...
 -> Success! Saved to: ./data/Prix2024.csv.gz
Downloading Stations2024.csv.gz...
 -> Success! Saved to: ./data/Stations2024.csv.gz
Downloading Services2024.csv.gz...
 -> Success! Saved to: ./data/Services2024.csv.gz

All downloads finished.


In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from pyspark.sql.functions import col, to_timestamp, year, month, weekofyear, desc, regexp_replace

# Initialize Spark
spark = SparkSession.builder.appName("GasPriceProject").getOrCreate()

# 1. Define Schemas
price_schema = StructType([
    StructField("id_station", StringType(), True),
    StructField("code_postal", StringType(), True),
    StructField("pop", StringType(), True),
    StructField("latitude", LongType(), True),
    StructField("longitude", LongType(), True),
    StructField("date", StringType(), True),
    StructField("id_carburant", StringType(), True),
    StructField("nom_carburant", StringType(), True),
    StructField("valeur", DoubleType(), True)
])

station_schema = StructType([
    StructField("id_station", StringType(), True),
    StructField("code_postal", StringType(), True),
    StructField("type", StringType(), True),
    StructField("latitude", LongType(), True),
    StructField("longitude", LongType(), True),
    StructField("address", StringType(), True),
    StructField("ville", StringType(), True)
])

# 2. Read Data
print("--- Reading Data ---")
raw_prices_df = spark.read \
    .option("header", "false") \
    .option("sep", ";") \
    .schema(price_schema) \
    .csv("./data/Prix*.csv.gz")

stations_df = spark.read \
    .option("header", "false") \
    .option("sep", "|") \
    .schema(station_schema) \
    .csv("./data/Stations*.csv*")

# 3. Process Data
print("--- Processing Dates & Locations ---")

# 'T' in place of ' ' so all dates match "yyyy-MM-dd HH:mm:ss"
# This prevents the ANSI strict mode crash.
prices_processed_df = raw_prices_df \
    .withColumn("date_clean", regexp_replace(col("date"), "T", " ")) \
    .withColumn("date_ts", to_timestamp(col("date_clean"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("year", year("date_ts")) \
    .withColumn("month", month("date_ts")) \
    .withColumn("week_of_year", weekofyear("date_ts")) \
    .drop("date_clean") # Clean up temporary column

# Process Lat/Long
stations_processed_df = stations_df \
    .withColumn("latitude_clean", col("latitude") / 100000) \
    .withColumn("longitude_clean", col("longitude") / 100000)

# 4. Filter & Create Tables

prices_processed_df.createOrReplaceTempView("gas_prices_raw")
stations_processed_df.createOrReplaceTempView("stations_clean")

print("--- Gas Type Distribution ---")
gas_stats = prices_processed_df.groupBy("nom_carburant").count().orderBy(desc("count"))
gas_stats.show()




# Take the top 4 found in the data.
all_fuels = [row['nom_carburant'] for row in gas_stats.collect() if row['nom_carburant'] is not None]
selected_fuels = all_fuels[:4]

print(f"Keeping Top 4 Fuels: {selected_fuels}")

final_prices_df = prices_processed_df.filter(col("nom_carburant").isin(selected_fuels))
final_prices_df.createOrReplaceTempView("gas_prices")

print("Data Preparation Step 1 Complete.")
final_prices_df.show(5)
stations_processed_df.show(5)

--- Reading Data ---
--- Processing Dates & Locations ---
--- Gas Type Distribution ---
+-------------+-------+
|nom_carburant|  count|
+-------------+-------+
|       Gazole|4245380|
|          E10|3559498|
|         SP98|3425844|
|          E85|1390580|
|         SP95| 961020|
|         GPLc| 619759|
|         NULL|  12756|
+-------------+-------+

Keeping Top 4 Fuels: ['Gazole', 'E10', 'SP98', 'E85']
Data Preparation Step 1 Complete.
+----------+-----------+---+--------+---------+-------------------+------------+-------------+------+-------------------+----+-----+------------+
|id_station|code_postal|pop|latitude|longitude|               date|id_carburant|nom_carburant|valeur|            date_ts|year|month|week_of_year|
+----------+-----------+---+--------+---------+-------------------+------------+-------------+------+-------------------+----+-----+------------+
|   1000001|      01000|  R| 4620100|   519800|2023-01-02T07:53:26|           1|       Gazole| 1.867|2023-01-02 07:53:26|

In [25]:
from pyspark.sql.functions import avg, col, min as min_, lit, round


# 1. Compute Week Index


#(Year - StartYear) * 52 + WeekOfYear

#get the first year in the dataset
min_year = final_prices_df.select(min_("year")).first()[0]
print(f"Starting Year for Indexing: {min_year}")

#calculate week index
df_with_weeks = final_prices_df.withColumn(
    "week_index",
    (col("year") - min_year) * 52 + col("week_of_year")
)


# 2. National Daily Average

# average price per gas type, per day, across all of France

daily_avg_df = df_with_weeks.groupBy("date_ts", "nom_carburant") \
    .agg(avg("valeur").alias("avg_day_price"))

# 3. Compute Price Index
#join the averages back to the main data

#join on Date and Gas Type
df_indexed = df_with_weeks.join(
    daily_avg_df,
    on=["date_ts", "nom_carburant"],
    how="inner"
)

#Apply Formula: 100 * ((Price - Avg) / Avg + 1)
final_df = df_indexed.withColumn(
    "price_index",
    round(100 * ((col("valeur") - col("avg_day_price")) / col("avg_day_price") + 1), 2)
)
# 4. Final Review
# Make this final rich dataset available for SQL
final_df.createOrReplaceTempView("gas_prices_enhanced")

print("Data Preparation Step 2 Complete.")
final_df.select("date_ts", "nom_carburant", "valeur", "avg_day_price", "price_index", "week_index").show(10)

Starting Year for Indexing: 2022
Data Preparation Step 2 Complete.
+-------------------+-------------+------+------------------+-----------+----------+
|            date_ts|nom_carburant|valeur|     avg_day_price|price_index|week_index|
+-------------------+-------------+------+------------------+-----------+----------+
|2022-01-01 00:01:00|       Gazole| 1.662|1.5978782122905013|     104.01|        52|
|2022-01-01 00:01:00|       Gazole| 1.554|1.5978782122905013|      97.25|        52|
|2022-01-01 00:01:00|       Gazole| 1.577|1.5978782122905013|      98.69|        52|
|2022-01-01 00:01:00|       Gazole| 1.566|1.5978782122905013|       98.0|        52|
|2022-01-01 00:01:00|       Gazole| 1.535|1.5978782122905013|      96.06|        52|
|2022-01-01 00:01:00|       Gazole| 1.535|1.5978782122905013|      96.06|        52|
|2022-01-01 00:01:00|       Gazole| 1.637|1.5978782122905013|     102.45|        52|
|2022-01-01 00:01:00|       Gazole| 1.647|1.5978782122905013|     103.07|        52