# Gas Prices Analysis and Forecasting Project

**Author: Mehdi FATHALLAH**

## Project Overview
In this project, we will conduct a comprehensive analysis of gas price data using Apache Spark. The objectives include:
- Data acquisition from a designated GitHub repository, covering three years of gas prices.
- Data processing to enable analysis, including merging files and restructuring date information.
- Exploratory data analysis to filter gas types and compute indices like 'Price Index' and 'Week Index'.
- Data visualization to observe trends over time.
- Building and evaluating a predictive model for next-day gas price forecasting.
- Documenting the process in a structured and detailed manner.
___


### Repository Cloning Function Explanation

The following Python function `clone_and_move` is designed to automate the process of cloning a GitHub repository and relocating it to a target directory on the local file system.

**Functionality:**
- It takes two parameters: the URL of the GitHub repository (`repo_url`) and the path to the target directory (`target_directory`).
- The function uses the `git clone` command to clone the repository from GitHub.
- It then extracts the name of the repository from the URL, strips the '.git' extension if present, and moves the cloned repository to the specified target directory using the `shutil.move` method.

This function streamlines the workflow for setting up a local working environment with data from a remote repository.
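
The name-extraction step described above can be checked on its own, without running `git`. The helper below is a minimal sketch; `repo_name_from_url` is a hypothetical name that does not appear in the project, and the trailing-slash handling is an added assumption about how URLs might be written.

```python
def repo_name_from_url(repo_url: str) -> str:
    """Return the directory name that `git clone` would create for this URL."""
    # Drop a trailing slash so the last path segment is never empty
    name = repo_url.rstrip("/").split("/")[-1]
    # Strip the optional '.git' suffix
    if name.endswith(".git"):
        name = name[:-4]
    return name


print(repo_name_from_url("https://github.com/rvm-courses/GasPrices.git"))
```

For the project's repository URL this yields `GasPrices`, which is the folder that is then moved.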

In [None]:
import subprocess
import shutil
import os

def clone_and_move(repo_url, target_directory):
    """
    Clones a GitHub repository and moves it into the specified target directory.

    Args:
    repo_url (str): The URL of the GitHub repository.
    target_directory (str): The directory that will contain the cloned repository.
    """
    # Clone the repository; check=True raises CalledProcessError if git fails
    subprocess.run(["git", "clone", repo_url], check=True)

    # Extract the repository name from the URL
    repo_name = repo_url.rstrip('/').split('/')[-1]
    if repo_name.endswith('.git'):
        repo_name = repo_name[:-4]

    # Create the target directory if needed so the repository lands inside it
    os.makedirs(target_directory, exist_ok=True)

    # Move the repository
    source_directory = os.path.join(os.getcwd(), repo_name)
    shutil.move(source_directory, target_directory)

### Cloning and Moving the GitHub Repository

The `clone_and_move` function is called here to clone the "GasPrices" repository from GitHub and move it into a local directory named `spark_project/`.

**Details of the Operation:**
- `repo_url`: `"https://github.com/rvm-courses/GasPrices.git"` - This is the URL of the GitHub repository containing the gas price data.
- `target_directory`: `"spark_project/"` - The cloned repository is moved into a folder named `spark_project` in the current working directory.

After this line runs, the project data is available locally under the `spark_project` directory.

In [None]:
clone_and_move("https://github.com/rvm-courses/GasPrices.git", "spark_project/")

### Handling of Warnings in the Python Environment

This cell uses Python's built-in `warnings` module to suppress all warnings raised during execution. This keeps the notebook output readable, but it also hides deprecation notices.

In [None]:
import warnings
warnings.filterwarnings('ignore')

### Loading and Previewing Gas Stations and Services Data

In this section, we load and inspect two key datasets for the project: Gas Stations data and Services data for the year 2022.

**Process Overview:**
1. **Import Pandas**: We start by importing the `pandas` library, which is essential for data manipulation and analysis in Python.
2. **File Paths**: We define the paths to the gzipped CSV files for both stations and services data.
3. **Data Loading**:
   - We use `pandas.read_csv` to load the data. 
   - The `compression='gzip'` parameter indicates that the files are gzipped.
   - We specify `sep='|'` as the delimiter, as the data in these files is pipe-separated.
4. **Data Preview**: We display the first few rows of each DataFrame (`stations_data` and `services_data`) using the `head()` function to get an initial understanding of the data structure and contents.

This preview confirms that the files load with the expected delimiter and shows each dataset's columns before further analysis.
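
To illustrate what the `compression='gzip'` and `sep='|'` settings mean, the sketch below reads a gzipped, pipe-separated sample using only the standard library. The sample rows are made up for illustration and are not taken from the project's files.

```python
import csv
import gzip
import io

# Hypothetical sample: two pipe-separated rows, gzip-compressed in memory
raw = "1000001|01000|R\n1000002|01000|A\n".encode("utf-8")
compressed = gzip.compress(raw)

# Decompress as text, then split each line on '|'
with gzip.open(io.BytesIO(compressed), mode="rt", encoding="utf-8", newline="") as handle:
    rows = list(csv.reader(handle, delimiter="|"))

print(rows)
```

`pandas.read_csv` performs the same two steps (decompression, then splitting on the delimiter) and additionally builds a DataFrame from the result.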


In [None]:
import pandas as pd

# Paths to the files
stations_file_path = 'spark_project/GasPrices/Stations2022.csv.gz'
services_file_path = 'spark_project/GasPrices/Services2022.csv.gz'

# Loading the gzipped CSV files using the pipe '|' as a delimiter
stations_data = pd.read_csv(stations_file_path, compression='gzip', sep='|')
services_data = pd.read_csv(services_file_path, compression='gzip', sep='|')

# Displaying the first few rows of each dataframe
print("Stations Data:")
print(stations_data.head())
print("\nServices Data:")
print(services_data.head())

In [None]:
#pip install pyspark

### Initializing Apache Spark Session for Gas Prices Analysis

This segment of the code initializes a Spark session, a fundamental step for using Spark's functionality in data processing and analysis:

- `from pyspark.sql import SparkSession`: This line imports the `SparkSession` class from PySpark, which is the entry point to programming Spark with the DataFrame API.
- `SparkSession.builder`: This attribute returns a builder object used to construct a SparkSession.
- `.appName("GasPricesAnalysis")`: Sets the name of the application to "GasPricesAnalysis". This name will be shown in the Spark cluster UI and is useful for identifying the application.
- `.config("spark.driver.memory", "20g")`: Configures the memory allocation for the Spark driver to 20 gigabytes. The driver is responsible for various tasks, including collecting data from executors and performing further transformations.
- `.config("spark.executor.memory", "20g")`: Sets the memory for each executor to 20 gigabytes. Executors run the tasks and return results to the driver. When Spark runs in local mode, which is the default when no master is configured, tasks run inside the driver process, so the driver setting is the one that takes effect.
- `.getOrCreate()`: This method either retrieves an existing SparkSession or, if none exists, creates a new one based on the configuration set.

Overall, this configuration reserves a large amount of memory for the driver and executors to accommodate the three years of price data. The values should be lowered on machines with less RAM.


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GasPricesAnalysis") \
    .config("spark.driver.memory", "20g") \
    .config("spark.executor.memory", "20g") \
    .getOrCreate()

### Loading and Preparing Gas Price Data with Spark

In this part of the project, we reuse the Spark session and define a function to load and prepare the gas price data for analysis.

**Key Steps Involved:**

1. **Import Spark Functions and Types**: Essential modules for handling date-time functions and defining the schema are imported.
2. **Spark Session Retrieval**: `getOrCreate()` returns the "GasPricesAnalysis" session created earlier rather than starting a new one.
3. **Schema Definition**: 
   - A schema is defined to map to the structure of the CSV files. It includes fields for station ID, postal code, type, latitude, longitude, date, gas type ID, gas type name, and price.
   - Latitude and longitude are read as `DoubleType` and are assumed to be stored scaled by 100000, so they are divided by 100000 to obtain decimal degrees.
   - The date field is defined as `TimestampType` for ease of date-time operations.
4. **Data Loading and Preparation Function**: 
   - `load_and_prepare_gas_data`: This function reads the CSV files, applies the schema, merges them, and applies the necessary transformations.
   - Latitude and longitude are rescaled, and the year, month, and week of the year are extracted from the date field into separate columns.
   - The DataFrame is registered as a temporary view named `gas_prices` for Spark SQL queries.

By executing this code, we set up a robust data foundation for our gas price analysis and forecasting tasks.
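
The two transformations described above (rescaling coordinates and splitting the date) can be illustrated on a single record in plain Python. The values below are hypothetical. The week number uses the ISO 8601 definition, which is also the definition Spark's `weekofyear` follows.

```python
from datetime import datetime

# Hypothetical raw record: coordinates scaled by 100000, ISO-like timestamp
raw_latitude = 4862000.0
raw_longitude = 235000.0
raw_date = "2019-01-04T10:37:00"

# Rescale to decimal degrees
latitude = raw_latitude / 100000
longitude = raw_longitude / 100000

# Parse the timestamp and extract the date parts
parsed = datetime.strptime(raw_date, "%Y-%m-%dT%H:%M:%S")
year, month = parsed.year, parsed.month
iso_week = parsed.isocalendar()[1]  # ISO 8601 week number

print(latitude, longitude, year, month, iso_week)
```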


In [None]:
from pyspark.sql.functions import year, month, weekofyear
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Reuse the existing Spark session (getOrCreate returns it if one is already active)
spark = SparkSession.builder.appName("GasPricesAnalysis").getOrCreate()

# Define the schema according to the CSV file's content
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("PostalCode", StringType(), True),  # Read as string to preserve leading zeros
    StructField("Type", StringType(), True),
    StructField("Latitude", DoubleType(), True),  # Assuming these need to be divided by 100000
    StructField("Longitude", DoubleType(), True),  # Assuming these need to be divided by 100000
    StructField("Date", TimestampType(), True),
    StructField("GasTypeID", StringType(), True),
    StructField("GasTypeName", StringType(), True),
    StructField("Price", DoubleType(), True)
])

# Function to load and prepare the gas data
def load_and_prepare_gas_data(file_paths):
    # Read and merge gas files: passing the list of paths loads all files into one DataFrame
    df = spark.read.csv(file_paths, schema=schema, sep=';', header=False, timestampFormat="yyyy-MM-dd'T'HH:mm:ss")
    
    # Prepare latitude & longitude
    df = df.withColumn("Latitude", df["Latitude"] / 100000)
    df = df.withColumn("Longitude", df["Longitude"] / 100000)
    
    # Split date into year, month, week of the year
    df = df.withColumn('Year', year('Date'))
    df = df.withColumn('Month', month('Date'))
    df = df.withColumn('Week', weekofyear('Date'))

    # Register the DataFrame as a table for Spark SQL
    df.createOrReplaceTempView("gas_prices")

    return df

# Replace with the paths to your files
file_paths = [
    'spark_project/GasPrices/Prix2020.csv.gz',
    'spark_project/GasPrices/Prix2019.csv.gz',
    'spark_project/GasPrices/Prix2018.csv.gz'
]

# Load and prepare the data
gas_prices_df = load_and_prepare_gas_data(file_paths)

# Show the prepared DataFrame
gas_prices_df.show()

### Counting Rows in the Gas Prices DataFrame

This code snippet calculates and prints the total number of rows in the `gas_prices_df` DataFrame, providing a quick overview of the dataset's size.

In [None]:
row_count = gas_prices_df.count()
print("Number of rows:", row_count)

### Data Transformation and Analysis in Spark for Gas Prices Project

This cell, part of a school project on analyzing gas prices with Apache Spark, derives the two indices used in the rest of the analysis.

- **Spark Session Retrieval**: `getOrCreate()` returns the active "GasPricesAnalysis" session.
- **Date Truncation**: The 'Date' timestamp is truncated to a calendar day and stored in a column named 'Timestamp', so that prices can be grouped by day.
- **Average Daily Price Calculation**: The average price for each type of gas is calculated for each day.
- **Dataframe Join**: This average daily price is then joined back to the original dataframe on gas type and day, so every row carries its daily average.
- **Price Index Calculation**: A new column, 'PriceIndex', expresses each price as a percentage deviation from the daily average for its gas type.
- **Week Index Calculation**: A 'WeekIndex' column is added, which represents the number of weeks elapsed since the start of the dataset.

These columns prepare the data for the weekly trend plots and the forecasting model that follow.
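
As a worked example of the two indices, the sketch below applies the same arithmetic to a handful of made-up observations in plain Python. It assumes that the week index counts 7-day periods from the earliest date, so it keeps increasing across year boundaries instead of restarting each January.

```python
from datetime import date

# Hypothetical prices for one gas type: (day, price)
observations = [
    (date(2018, 1, 1), 1.40),
    (date(2018, 1, 1), 1.60),
    (date(2018, 1, 9), 1.50),
    (date(2019, 1, 1), 1.45),
]

# Average price per day
by_day = {}
for day, price in observations:
    by_day.setdefault(day, []).append(price)
daily_avg = {day: sum(prices) / len(prices) for day, prices in by_day.items()}

first_day = min(day for day, _ in observations)

results = []
for day, price in observations:
    avg = daily_avg[day]
    price_index = 100 * (price - avg) / avg       # percent deviation from the daily average
    week_index = (day - first_day).days // 7 + 1  # 1-based count of 7-day periods
    results.append((day, round(price_index, 2), week_index))

for row in results:
    print(row)
```

The two prices on the first day deviate by about 6.67% below and above their average of 1.50, and the observation one year later falls in week 53 rather than back in week 1.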


In [None]:
from pyspark.sql.functions import col, avg, to_date, datediff, floor, lit
from pyspark.sql.functions import min as spark_min

# Reuse the active Spark session
spark = SparkSession.builder.appName("GasPricesAnalysis").getOrCreate()

# Truncate the 'Date' timestamp to a calendar day (the column keeps the name 'Timestamp')
gas_prices_df = gas_prices_df.withColumn("Timestamp", to_date(col("Date")))

# Calculate the average price for each gas type for each day
avg_price_per_day = gas_prices_df.groupBy('GasTypeName', 'Timestamp').agg(avg('Price').alias('AvgPricePerDay'))

# Join this average price back to the original dataframe
gas_prices_df = gas_prices_df.join(avg_price_per_day, ['GasTypeName', 'Timestamp'])

# Calculate the Price Index: percent deviation of each price from that day's average
gas_prices_df = gas_prices_df.withColumn("PriceIndex", 100 * ((col("Price") - col("AvgPricePerDay")) / col("AvgPricePerDay")))

# Find the earliest day in the dataset
first_day = gas_prices_df.agg(spark_min("Timestamp")).first()[0]

# Calculate Week Index as the 1-based number of 7-day periods since the earliest day.
# weekofyear alone restarts at 1 every January, which would merge weeks from different years.
gas_prices_df = gas_prices_df.withColumn("WeekIndex", floor(datediff(col("Timestamp"), lit(first_day)) / 7) + 1)

# Show the results
gas_prices_df.select("GasTypeName", "Timestamp", "Price", "AvgPricePerDay", "PriceIndex", "WeekIndex").show()

In [None]:
#pip install seaborn

### Visualizing Average Gas Prices Per Week Using Seaborn

This code segment is focused on visualizing the weekly evolution of average gas prices for different gas types in France:

1. **Data Aggregation**: 
   - The code aggregates the data to calculate the average price per week for each gas type, using 'WeekIndex' as the time axis.
   - The `groupBy` method groups the rows by week and gas type, and the `avg` function computes the average price of each group.

2. **Data Collection and Conversion**: 
   - The aggregated result, which is far smaller than the original DataFrame, is brought to the driver and converted into a Pandas DataFrame for plotting.

3. **Plotting with Seaborn and Matplotlib**:
   - Seaborn and Matplotlib libraries are used for plotting.
   - A line plot is created where each gas type's average price trend over weeks is visualized.
   - Styling is set to "darkgrid" for better readability, and the plot size is adjusted for clarity.

4. **Visualization Output**:
   - The plot displays trends in average gas prices, segmented by gas type, over different weeks.
   - This visualization aids in understanding the temporal dynamics of gas prices in France.


In [None]:
from pyspark.sql import functions as F

# Perform the aggregation to get the average price per week per gas type
avg_prices = gas_prices_df.groupBy('WeekIndex', 'GasTypeName').agg(F.avg('Price').alias('AvgPricePerWeek'))

# Bring the result, which is much smaller than the original DataFrame, to the driver as a Pandas DataFrame
df = avg_prices.toPandas()

# Now plot using Seaborn
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_theme(style="darkgrid")

# Create a lineplot for each type of gas
plt.figure(figsize=(14, 7))  # Adjust the size as needed
lineplot = sns.lineplot(data=df, x='WeekIndex', y='AvgPricePerWeek', hue='GasTypeName', marker="o")

plt.title('Weekly Evolution of Average Gas Price Over France')
plt.xlabel('Week Index')
plt.ylabel('Average Price for Gas Type in France')
plt.legend(title='Gas Type')

# Show the plot
plt.show()

### Implementing Lag Function in Spark for Gas Price Analysis

This code segment applies the lag function in Spark to the `gas_prices_df` DataFrame, which provides the input feature for the forecasting model:

- **Window Specification**: 
  - A window is defined using `Window.partitionBy('GasTypeName').orderBy('Date')`, which groups data by gas type and orders it chronologically.
  
- **Applying the Lag Function**: 
  - The `lag` function creates a new column, 'LagPrice', holding the price from the preceding row of the window, that is, the most recent earlier observation for the same gas type. When a day holds several observations, this is the previous recorded price rather than strictly the previous day's price.

- **Handling Null Values**: 
  - The first row of each partition has no predecessor, so its 'LagPrice' is null. These rows are dropped.

The resulting column lets each price be compared with, or predicted from, the observation that came before it.
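
The behaviour of a partitioned lag can be reproduced in plain Python, which makes it easier to see where the null values come from. This is a minimal sketch on made-up rows; `add_lag` is a hypothetical helper and the gas type labels are sample values only.

```python
from collections import defaultdict

# Hypothetical rows: (gas_type, date, price)
rows = [
    ("Gazole", "2018-01-01", 1.40),
    ("SP95", "2018-01-01", 1.50),
    ("Gazole", "2018-01-02", 1.42),
    ("SP95", "2018-01-02", 1.51),
]


def add_lag(rows):
    """Attach the previous price within each gas type, ordered by date."""
    partitions = defaultdict(list)
    for gas_type, day, price in rows:
        partitions[gas_type].append((day, price))

    lagged = []
    for gas_type, series in partitions.items():
        previous = None  # the first row of a partition has no predecessor
        for day, price in sorted(series):
            lagged.append((gas_type, day, price, previous))
            previous = price
    return lagged


with_lag = add_lag(rows)
# Keep only rows where the lag is defined
complete = [row for row in with_lag if row[3] is not None]
print(complete)
```

Each gas type loses exactly one row (its earliest observation), which mirrors the rows removed after the lag is applied in Spark.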


In [None]:
from pyspark.sql.window import Window

# Partition by gas type and order chronologically
windowSpec = Window.partitionBy('GasTypeName').orderBy('Date')

# LagPrice is the price of the preceding row within the same gas type
gas_prices_df = gas_prices_df.withColumn('LagPrice', F.lag('Price').over(windowSpec))

# Drop rows missing the price or its lag; a bare na.drop() would also
# discard rows that have a null in any unrelated column
gas_prices_df = gas_prices_df.na.drop(subset=['Price', 'LagPrice'])

### Building and Evaluating a Random Forest Regression Model in Spark

This code cell is dedicated to creating and evaluating a Random Forest Regression model using Spark's MLlib for forecasting gas prices:

- **Spark Session Retrieval**: `getOrCreate()` returns the session that is already active, so no new session is started.

- **Feature Engineering with Lag Function**:
  - The `lag` function is applied to the `gas_prices_df` DataFrame to include the previous recorded price ('LagPrice') for each gas type.
  - Rows where this operation produces a null value are removed.

- **Data Preparation**: 
  - A `VectorAssembler` is used to transform 'LagPrice' into a feature vector.
  
- **Data Splitting**: 
  - The dataset is split into training (80%) and testing (20%) sets.

- **Model Training**:
  - A `RandomForestRegressor` model is initialized and trained using the training data within a Pipeline.

- **Model Prediction and Evaluation**:
  - Predictions are made on the test data.
  - The model's performance is evaluated using Root Mean Squared Error (RMSE) metric.

- **Visualization**:
  - A scatter plot is created to visualize the actual vs. predicted prices, aiding in assessing the model's accuracy.
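
For reference, the RMSE metric is the square root of the mean squared difference between actual and predicted values. The function below is a small stand-alone sketch of that formula with made-up numbers. It is not the MLlib implementation.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical prices and predictions: only the last one is off, by 2.0
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))
```

Because errors are squared before averaging, a single large miss weighs more heavily than several small ones. RMSE is expressed in the same unit as the price.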


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Reuse the active Spark session (getOrCreate returns the existing one, so its app name is unchanged)
spark = SparkSession.builder.appName("gas_price_forecasting").getOrCreate()

# Compute the lag feature on the Spark DataFrame 'gas_prices_df'
windowSpec = Window.partitionBy('GasTypeName').orderBy('Date')
gas_prices_df = gas_prices_df.withColumn('LagPrice', F.lag('Price').over(windowSpec))

# Drop rows missing the label or the lag feature
gas_prices_df = gas_prices_df.na.drop(subset=['Price', 'LagPrice'])

# Data Preparation
# Assemble the single feature column into the vector format MLlib expects
vectorAssembler = VectorAssembler(inputCols=["LagPrice"], outputCol="features")

# Data Splitting (fixed seed for a reproducible split)
(trainingData, testData) = gas_prices_df.randomSplit([0.8, 0.2], seed=42)

# Model Building
# Initialize the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="Price")

# Chain assembler and forest in a Pipeline
pipeline = Pipeline(stages=[vectorAssembler, rf])

# Train model. This also runs the assembler.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "Price", "Date", "GasTypeName").show(5)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# Plotting
# For plotting, we need to convert the Spark DataFrame to a Pandas DataFrame
pandas_df = predictions.select("Price", "prediction").toPandas()

# Now use seaborn to plot
sns.scatterplot(data=pandas_df, x="Price", y="prediction")
plt.xlabel("Actual Price")
plt.ylabel("Predicted Price")
plt.title("Scatter Plot: Actual vs Predicted Price")
plt.show()


### Conclusion and Future Work

This project showed how Apache Spark can be used to analyze and forecast gas prices: the data was loaded and transformed, price and week indices were computed, weekly trends were plotted, and a Random Forest regressor was trained on the lagged price. The model relies on a single feature and is best read as a baseline for more elaborate predictive models.

Future work could explore enhancing the model's accuracy through advanced feature engineering, hyperparameter tuning, and incorporating external economic indicators. There's also potential in deploying the model for real-time predictions, using live data feeds. This project establishes a solid foundation for more sophisticated analyses in energy sector forecasting.
