In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, weekofyear, dayofweek, to_date
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Load transactional data
data = spark.read.csv("/home/ccp/Desktop/task1/Stores_Transactions .csv", header=True, inferSchema=True, sep=",")

# Convert `transaction_date` to date format
data = data.withColumn("transaction_date", to_date(col("transaction_date"), "yyyy-MM-dd"))

# Add additional date features
data = data.withColumn("week", weekofyear(col("transaction_date")))
data = data.withColumn("day_of_week", dayofweek(col("transaction_date")))

# Aggregate data per day, store, and product
daily_sales = data.groupBy("store", "product_name", "transaction_date", "week", "day_of_week")\
    .agg({"quantity": "sum"})\
    .withColumnRenamed("sum(quantity)", "quantity")

# Split data into train and test sets
split_date = "1403-01-05"  # Adjust based on your needs
train_data = daily_sales.filter(col("transaction_date") <= split_date)
test_data = daily_sales.filter(col("transaction_date") > split_date)

# Convert to Pandas DataFrame
pd_train_data = train_data.toPandas()

# Encode categorical features
store_indexer = StringIndexer(inputCol="store", outputCol="store_index").setHandleInvalid("keep")
product_indexer = StringIndexer(inputCol="product_name", outputCol="product_index").setHandleInvalid("keep")

# Feature selection
feature_cols = ["store_index", "product_index", "week", "day_of_week"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Initialize Random Forest model
rf = RandomForestRegressor(featuresCol="features", labelCol="quantity", maxBins=179)

# Create pipeline
pipeline = Pipeline(stages=[store_indexer, product_indexer, assembler, rf])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
test_predictions = model.transform(test_data)

# Evaluate MAE
mae_evaluator = RegressionEvaluator(labelCol="quantity", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(test_predictions)

# Get total sales for week 39
weekly_sales = test_predictions.groupBy("week").agg({"prediction": "sum"})
predicted_sales_w39 = weekly_sales.filter(col("week") == 20).select("sum(prediction)").collect()[0][0]

# Stop Spark session
spark.stop()


In [11]:
test_predictions.toPandas().head(10)


AttributeError: 'NoneType' object has no attribute 'setCallSite'