<a href="https://colab.research.google.com/github/medYousseffathallah/demand-forecast/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import PySpark

In [None]:
from pyspark.sql import SparkSession

# initialize a spark session

In [None]:
 # Get (or reuse) the notebook's SparkSession, registered under the app name 'practice'.
 spark = (
     SparkSession.builder
     .appName('practice')
     .getOrCreate()
 )

#Create a DataFrame

In [None]:
# Load the census CSV. DataFrameReader.csv() only accepts a StructType or a
# DDL string as `schema`, so the column names are assigned with toDF() instead.
# inferSchema=True lets Spark type the numeric columns, which keeps the
# `age > 30` filter below a numeric comparison.
# NOTE(review): assumes census.csv has exactly these four columns and no
# header row — confirm against the file.
census_df = (
    spark.read
    .csv('census.csv', inferSchema=True)
    .toDF("gender", "age", "zipcode", "marriage_status")
)
# show the DataFrame
census_df.show()
# print the DataFrame schema
census_df.printSchema()
# .count() returns the total number of rows in the DataFrame
# (other aggregations: sum(), min(), max())
row_count = census_df.count()
print(f"Total number of rows in the DataFrame: {row_count}")
# groupBy() enables SQL-like aggregations; see also .select() .filter() .agg()
census_df.groupBy('marriage_status').count().show()
# usage of filter and select: keep people older than 30, project two columns
filtered_census_df = census_df.filter(census_df['age'] > 30).select('gender', 'age')
filtered_census_df.show()

In [None]:
#Import
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,ArrayType
# construct the schema: every field is nullable; marriage_status is declared
# as an array of strings
schema = StructType([
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('zipcode', IntegerType(), True),
    StructField('marriage_status', ArrayType(StringType()), True),
])
# Example rows matching the schema above. `data` was not defined in any
# visible cell, so createDataFrame() raised NameError; replace these rows
# with the real records. One row has a null age so that the later
# "drop missing values" cell has something to drop.
data = [
    ("male", 34, 10001, ["married"]),
    ("female", 28, 94105, ["single"]),
    ("female", None, 60614, ["divorced", "married"]),
]
# build the DataFrame with the explicit schema
df = spark.createDataFrame(data, schema=schema)
# print the schema
df.printSchema()

# UDF functions

In [None]:
# pandas UDF
# The imports live in this cell so it runs on its own: neither `pandas_udf`
# nor `pd` is defined earlier in the notebook.
import pandas as pd
from pyspark.sql.functions import pandas_udf


@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    """Return the input series with 1 added to every element."""
    return series + 1

#sort and dropping missing values

In [None]:
# Show the rows ordered by age, highest first.
df.sort(df["age"].desc()).show()
# Show only the rows that contain no null in any column.
df.dropna().show()

#usecase data manipulation

In [None]:

# Load the coffee-shop sales CSV: the first row is the header and Spark
# infers the column types.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/content/Coffee_Shop_Sales.csv')
)
df.show()

In [None]:
# Print the column names and the types Spark inferred for the sales data.
df.printSchema()

In [None]:
# Restrict the data to store 5 and report how many rows remain.
df1 = df.where(df.store_id == 5)
df1.count()

In [None]:
# Keep only the columns used in the rest of the analysis.
store_df = df1.select([
    "transaction_id", "transaction_date", "transaction_qty",
    "store_id", "product_id", "unit_price", "product_category",
])
store_df.show()


In [None]:
from pyspark.sql.functions import sum,when,col
# Count the non-null values of every column in store_df.
# `sum` here is pyspark.sql.functions.sum (imported at the top of this cell),
# not the Python builtin. Each count is aliased to its own column name; the
# transaction_date count used to be mislabelled "transaction_time".
audited_columns = (
    "transaction_id",
    "transaction_date",
    "transaction_qty",
    "store_id",
    "product_id",
    "unit_price",
    "product_category",
)
store_df_count = store_df.select([
    sum(when(col(name).isNotNull(), 1).otherwise(0)).alias(name)
    for name in audited_columns
])
# show() returns None, so it is called separately to keep store_df_count
# bound to the DataFrame of counts.
store_df_count.show()

store_df.printSchema()

In [None]:
from pyspark.sql.functions import sum,when,col,round


# Revenue per transaction: quantity times unit price, rounded to one decimal.
store_df_complete = store_df.withColumn("total_sales", round(col("transaction_qty") * col("unit_price"), 1))
store_df_complete.show()


In [None]:
# Remove unit_price from the frame and display the result.
columns_to_discard = ["unit_price"]
store_df_demo = store_df_complete.drop(*columns_to_discard)
store_df_demo.show()

In [None]:
 # Show Spark's summary statistics for the transaction_date column only.
 store_df_demo.describe(['transaction_date']).show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Take a reproducible 90% sample (without replacement) of store 5's sales
# amounts and dates.
sample_df = (
    store_df_demo
    .filter(store_df_demo['store_id'] == 5)
    .select(['total_sales', 'transaction_date'])
    .sample(withReplacement=False, fraction=0.9, seed=42)
)
# Print the sample size: a bare expression in the middle of a cell is not
# displayed, so the count used to be computed and silently discarded.
print(f"Sampled rows: {sample_df.count()}")
pandas_df = sample_df.toPandas()

# Sum sales per month: parse the dates, index by them and resample to
# month-end ('ME') buckets.
pandas_df['transaction_date'] = pd.to_datetime(pandas_df['transaction_date'])
pandas_df = pandas_df.set_index('transaction_date')
monthly_df = pandas_df.resample('ME').sum()
monthly_df = monthly_df.reset_index()  # makes 'transaction_date' a column again
sns.lineplot(x='transaction_date', y='total_sales', data=monthly_df)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# Last expression of the cell: displays the final rows of the indexed sample.
pandas_df.tail()



In [None]:
# Preview the frame without store_id and transaction_id. drop() returns a new
# DataFrame and the result is not assigned here, so store_df_demo is unchanged.
store_df_demo.drop("store_id","transaction_id")

In [None]:
# Drop the category and identifier columns from store_df_demo and display
# the resulting modelling frame.
unneeded_columns = ("product_category", "transaction_id", "store_id")
final = store_df_demo.drop(*unneeded_columns)
final.show()

#data date transform from String to date  

In [None]:
from pyspark.sql.functions import to_date, col
from pyspark.sql.functions import month, dayofweek, when
from pyspark.sql.functions import lit
from pyspark.sql.functions import sum as spark_sum

# Parse the transaction_date string (M/d/yyyy) into a DateType column and
# derive the calendar features from it in one chain:
#   month       -> 1 = Jan ... 12 = Dec
#   day_of_week -> 1 = Sunday ... 7 = Saturday (Spark convention)
#   is_weekend  -> 1 for Saturday or Sunday, otherwise 0
finalized = (
    final
    .withColumn("date", to_date(col("transaction_date"), "M/d/yyyy"))
    .withColumn("month", month(col("date")))
    .withColumn("day_of_week", dayofweek(col("date")))
    .withColumn("is_weekend", when(col("day_of_week").isin(1, 7), 1).otherwise(0))
)

# Holiday list (New Year / Revolution Day / Independence Day / Martyrs' Day / Labour Day)
holidays = ["2023-01-01","2023-01-14" ,"2023-03-20", "2023-04-09","2023-05-01"]

# Flag the rows whose date is one of the listed holidays.
holiday_flag = when(col("date").isin(holidays), 1).otherwise(0)
finalized = finalized.withColumn("is_holiday", holiday_flag)

# Total quantity sold per calendar day, carrying the calendar features along.
daily_df = (
    finalized
    .groupBy("date", "month", "day_of_week", "is_weekend", "is_holiday")
    .agg(spark_sum("transaction_qty").alias("daily_demand"))
    .orderBy("date")
)
finalized.show()
daily_df.show(10)

In [None]:
# Model training:
# Fits a gradient-boosted-tree regressor on per-transaction quantities using
# the product and calendar features, evaluates it on the last 14 days, then
# scores the 14 days that follow the data for every product.
# NOTE(review): the label is transaction_qty per transaction row, so the
# "forecast" is a predicted quantity per transaction, not a daily total; the
# daily_df aggregate built earlier is not used here — confirm this is intended.
# =========================
# 1) Imports
# =========================
from datetime import timedelta

from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# =========================
# 2) Ensure types and minimal cleaning
# =========================
# Make sure "date" is a DateType. A plain cast handles values that are already
# dates or ISO strings; otherwise the string is parsed as M/d/yyyy, the same
# pattern used when the calendar features were derived. Parsing as d/M/yyyy
# here swapped day and month, so "date" no longer matched day_of_week,
# is_weekend and is_holiday, and rows with a day above 12 became null.
finalized = finalized.withColumn(
    "date",
    F.coalesce(
        F.col("transaction_date").cast(DateType()),
        F.to_date("transaction_date", "M/d/yyyy")
    )
)

# Basic sanity checks
required = ["date", "transaction_qty", "product_id", "day_of_week", "is_weekend", "is_holiday"]
missing = [c for c in required if c not in finalized.columns]
if missing:
    raise ValueError(f"Missing required columns: {missing}")

# Drop rows with nulls in key fields
finalized = finalized.dropna(subset=required)

# =========================
# 3) Time-based train/test split (no leakage)
#    - Train: everything up to (max_date - 14 days)
#    - Test: last 14 days
# =========================
# max_date is collected from Spark as a Python date (None when no rows remain)
max_date = finalized.select(F.max("date").alias("max_date")).collect()[0]["max_date"]
if max_date is None:
    raise ValueError("No rows left after cleaning; cannot build a train/test split")

# subtract 14 days using timedelta
cutoff = max_date - timedelta(days=14)

# rows on or before the cutoff train the model, later rows test it
train_df = finalized.where(F.col("date") <= F.lit(cutoff))
test_df  = finalized.where(F.col("date") >  F.lit(cutoff))


# =========================
# 4) ML Pipeline: product encoding + assemble + model
# =========================
# handleInvalid="keep" puts product ids unseen during fit into an extra bucket
# instead of failing at transform time.
product_indexer = StringIndexer(inputCol="product_id", outputCol="product_id_idx", handleInvalid="keep")
product_ohe     = OneHotEncoder(inputCols=["product_id_idx"], outputCols=["product_id_ohe"])

feature_cols = ["product_id_ohe", "day_of_week", "is_weekend", "is_holiday"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

gbt = GBTRegressor(
    featuresCol="features",
    labelCol="transaction_qty",
    maxIter=120,
    maxDepth=6,
    stepSize=0.1,
    subsamplingRate=0.8,
    seed=42
)

pipeline = Pipeline(stages=[product_indexer, product_ohe, assembler, gbt])

# 5) Train

model = pipeline.fit(train_df)

# 6) Evaluate
pred_test = model.transform(test_df)

rmse = RegressionEvaluator(labelCol="transaction_qty", predictionCol="prediction", metricName="rmse").evaluate(pred_test)
mae  = RegressionEvaluator(labelCol="transaction_qty", predictionCol="prediction", metricName="mae").evaluate(pred_test)
r2   = RegressionEvaluator(labelCol="transaction_qty", predictionCol="prediction", metricName="r2").evaluate(pred_test)

print(f"Test RMSE: {rmse:.3f}")
print(f"Test MAE : {mae:.3f}")
print(f"Test R^2 : {r2:.3f}")

# Optional: see a few predictions
pred_test.select("date", "product_id", "transaction_qty", F.col("prediction").alias("pred_qty")).orderBy("date", "product_id").show(20, truncate=False)


# 7) Forecast next N days per product
N_DAYS = 14

# Create future date list (next day after max_date → next N days)
future_python_dates = [(max_date + timedelta(days=i+1),) for i in range(N_DAYS)]
future_dates_df = spark.createDataFrame(future_python_dates, ["date"])

# Build calendar features for the future dates with the SAME encoding the
# model was trained on: Spark's dayofweek, 1=Sun, ..., 7=Sat. Shifting to
# 0=Mon..6=Sun here fed the model day codes that meant different weekdays
# than during training.
future_feat = (
    future_dates_df
    .withColumn("day_of_week", F.dayofweek("date"))
    .withColumn("is_weekend", F.when(F.col("day_of_week").isin(1, 7), F.lit(1)).otherwise(F.lit(0)))
)

# Add holidays for the future horizon (replace with your real list or a holidays table join)
future_holidays = []  # e.g., ["2023-07-25", ...] as strings
future_feat = future_feat.withColumn("is_holiday", F.when(F.col("date").cast("string").isin(future_holidays), 1).otherwise(0))

# Cross with the products present in the modelled data to get
# (product_id x future_date). `finalized` is used rather than the raw `df`,
# which is not filtered to the modelled store and is rebound by other cells.
products_df = finalized.select("product_id").distinct()
future_scoring_df = (
    products_df.crossJoin(future_feat)
    # Ensure the same column names the pipeline expects
    .select("product_id", "date", "day_of_week", "is_weekend", "is_holiday")
)

# Score future demand
future_pred = (
    model.transform(future_scoring_df)
    .select(
        "date", "product_id",
        F.col("prediction").alias("predicted_transaction_qty")
    )
    .orderBy("date", "product_id")
)

future_pred.show(50, truncate=False)




In [None]:
from pyspark.sql.functions import sum,when,col,round

# Round each predicted quantity to the nearest whole number (pyspark `round`,
# imported at the top of this cell) and display up to 150 rows.
rounded_qty = round(col("predicted_transaction_qty"))
future_pred = future_pred.withColumn("predicted_transaction_qty", rounded_qty)
future_pred.show(150, truncate=False)



#Stream processing


In [None]:
# Reference snippets for Structured Streaming. "localhost:port" and the
# missing Delta path are placeholders to fill in before running.

# create a streaming DataFrame from Kafka
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:port")
    .option("subscribe", "test")
    .load()
)
# write streaming results to Kafka
query = (
    df.writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "localhost:port")
    .option("topic", "test")
    .start()
)
# build a streaming DataFrame from a Delta source
# NOTE(review): load() is called without a path — supply the table location.
df = spark.readStream.option("maxFilesPerTrigger", 1).format("delta").load()
# apply some transformations, producing a new streaming DataFrame
from pyspark.sql.functions import col
# The comparison belongs inside filter(): the misplaced parenthesis compared
# the filtered DataFrame itself with "email", which yields a plain bool.
email_traffic_df = df.filter(col("traffic_source") == "email")
# write streaming query results
email_query = (
    email_traffic_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/email")
    .queryName("email_traffic")
    .trigger(processingTime='1 second')
    .start()
)
# stop the streaming query
email_query.stop()
# wait for termination (blocks until the query ends; listed here for
# reference — after stop() the query has already terminated)
email_query.awaitTermination()


#####Triggers

# default trigger: used when no trigger is specified
df.writeStream.start()
# process micro-batches at a fixed interval
df.writeStream.trigger(processingTime="2 minutes").start()
# one-time processing of all data available now
# (the attribute is writeStream; "writingStream" raised AttributeError)
df.writeStream.trigger(availableNow=True).start()



#### Monitoring via Spark UI


# NOTE(review): `streaming_query` is not defined in any visible cell;
# presumably it stands for a handle returned by writeStream...start()
# (such as `query` or `email_query`) — confirm before running.
#check if DataFrame is streaming
df.isStreaming #returns True/False
#get unique identifier for query
streaming_query.id

#get current state of the query

streaming_query.status
#monitor streaming query progress
streaming_query.lastProgress
#stop the streaming query
streaming_query.stop()


In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Import necessary libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Schema of the incoming JSON records: every field is a nullable string,
# except transaction_time, which is a nullable timestamp.
schema = StructType([
    StructField(
        field_name,
        TimestampType() if field_name == "transaction_time" else StringType(),
        True,
    )
    for field_name in (
        "transaction_id",
        "transaction_date",
        "transaction_time",
        "transaction_qty",
        "store_id",
        "store_location",
        "product_id",
        "unit_price",
        "product_category",
        "product_type",
        "product_detail",
    )
])

# Streaming reader over the Drive folder: JSON files parsed with the schema
# above, picking up one file per trigger.
stream_df = (
    spark.readStream
    .format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .option("path", "/content/drive/MyDrive/stream_json")
    .load()
)


In [None]:
# Print whether stream_df is a streaming DataFrame (its isStreaming flag).
print(f"isStreaming:{stream_df.isStreaming}")

In [None]:
# NOTE(review): `display` is not imported in this notebook; in Colab it is
# presumably IPython's display, which would show the DataFrame's repr rather
# than live streaming rows — confirm.
display(stream_df)

In [None]:
# Batch mode (reads all files immediately)
batch_df = spark.read.schema(schema).json("/content/drive/MyDrive/stream_json")
batch_df.show(20, truncate=False)


In [None]:
from pyspark.sql.functions import col

# Cast the string-typed fields to their numeric types, one column at a time.
for field_name, target_type in (
    ("transaction_id", "int"),
    ("transaction_qty", "int"),
    ("product_id", "int"),
    ("unit_price", "float"),
):
    stream_df = stream_df.withColumn(field_name, col(field_name).cast(target_type))


#Stateless vs Stateful
stateless: no memory / e.g. select & filter / processes each record independently\
stateful: maintains information across batches / requires a checkpoint location / e.g. groupBy, join, dropDuplicates / window operations (covered later)

#Streaming aggregations:
count/sum&avg/min&max
approx_count_distinct()
aggregate based on window

#Window operations
Tumbling windows: fixed-size, non-overlapping windows (e.g. counts every 5 min)\
Sliding windows: overlapping windows where an event can belong to multiple windows\
Session windows: dynamically sized windows based on user activity (with gaps or session timeouts)