<a href="https://colab.research.google.com/github/rohansb10/fully_data_engineer/blob/main/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

# Build the notebook-wide SparkSession (reusing an existing one if present).
spark = (
    SparkSession.builder
    .appName("Simple PySpark Example")
    .getOrCreate()
)

In [None]:
import kagglehub
from kagglehub import KaggleDatasetAdapter

# CSV file inside the Kaggle dataset that should be loaded.
file_path = "sales_data.csv"

# Fetch the latest version of the dataset and read the CSV through the
# pandas adapter, so `df` here is a pandas DataFrame.
df = kagglehub.load_dataset(
    KaggleDatasetAdapter.PANDAS,
    "atomicd/retail-store-inventory-and-demand-forecasting",
    file_path,
)


In [None]:
# Convert the pandas DataFrame into a Spark DataFrame and preview its rows.
spark_df = spark.createDataFrame(data=df)
spark_df.show()

In [None]:
# Persist the Spark DataFrame as Parquet. mode("overwrite") keeps this cell
# re-runnable: the default save mode ("errorifexists") raises when the target
# directory already exists from a previous run.
spark_df.write.mode("overwrite").parquet("/rohan/input/df_parquet")

In [None]:
# Reload the data from Parquet. Parquet files store their own schema, so the
# CSV-only options header/inferSchema are not needed (and were ignored).
df = spark.read.parquet("/rohan/input/df_parquet")

In [None]:
# Column names of the reloaded DataFrame (displayed as the cell output).
column_names = df.columns
column_names

In [None]:
# Print the schema of the reloaded DataFrame (column names and their types).
df.printSchema()

In [None]:
# Number of rows in the reloaded DataFrame (displayed as the cell output).
n_rows = df.count()
n_rows

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# Explicit schema for the retail dataset; every field is nullable.
# NOTE(review): this schema is not passed to any reader in the visible cells —
# either apply it (e.g. spark.read.csv(..., schema=schema)) or drop it.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ("Date", DateType()),
        ("Store ID", IntegerType()),
        ("Product ID", IntegerType()),
        ("Category", StringType()),
        ("Region", StringType()),
        ("Inventory Level", IntegerType()),
        ("Units Sold", IntegerType()),
        ("Units Ordered", IntegerType()),
        ("Price", DoubleType()),
        ("Discount", DoubleType()),
        ("Weather Condition", StringType()),
        ("Promotion", StringType()),
        ("Competitor Pricing", DoubleType()),
        ("Seasonality", StringType()),
        ("Epidemic", StringType()),
        ("Demand", IntegerType()),
    )
])

In [None]:
from pyspark.sql.functions import date_format

# Total units sold per calendar month ("yyyy-MM"), in chronological order.
(
    df.withColumn("Month", date_format("Date", "yyyy-MM"))
    .groupBy("Month")
    .sum("Units Sold")
    .orderBy("Month")
    .show()
)

In [None]:
# By Product: total units sold per product, best sellers first.
(
    df.groupBy("Product ID")
    .sum("Units Sold")
    .orderBy("sum(Units Sold)", ascending=False)
    .withColumnRenamed("sum(Units Sold)", "Total Units Sold")
    .show()
)

In [None]:
# Preview the first three rows.
df.show(n=3)

In [None]:
# Import Spark's sum under an alias: a bare `sum` import would shadow Python's
# built-in sum() for every later cell in this notebook. avg/desc are still
# imported by name because later cells use them directly.
from pyspark.sql.functions import avg, desc, sum as spark_sum

# Total units sold per product *category*; show the five largest.
top_products = df.groupBy("Category").agg(spark_sum("Units Sold").alias("Total Sold"))
top_products.orderBy(desc("Total Sold")).show(5)

In [None]:
# Average demand per region, highest first.
region_demand = (
    df.groupBy("Region")
    .agg(avg("Demand").alias("Avg Demand"))
)
region_demand.orderBy(desc("Avg Demand")).show()

# Spark SQL queries

In [None]:
# Register the DataFrame as the temporary view "retail_data" for Spark SQL.
df.createOrReplaceTempView(name="retail_data")

In [None]:
# Top five products by total units sold, computed over the retail_data view.
top_products_query = """
SELECT `Product ID`, SUM(`Units Sold`) AS Total_Units_Sold
FROM retail_data
GROUP BY `Product ID`
ORDER BY Total_Units_Sold DESC
LIMIT 5
"""
spark.sql(top_products_query).show()


In [None]:
# Average demand per season over the retail_data view, highest first.
seasonality_query = """
SELECT Seasonality, AVG(Demand) AS Avg_Demands
FROM retail_data
GROUP BY Seasonality
ORDER BY Avg_Demands DESC
"""
spark.sql(seasonality_query).show()


# Load data from GitHub

In [None]:
import requests

# Download the CSV from GitHub. The timeout keeps the cell from hanging on a
# stalled connection, and raise_for_status() fails fast on HTTP errors (e.g.
# 404) instead of silently saving the error body as data.csv.
url = "https://raw.githubusercontent.com/rohansb10/fully_data_engineer/main/data/data.csv"
response = requests.get(url, timeout=60)
response.raise_for_status()
with open("data.csv", "wb") as f:
    f.write(response.content)

# Then load it with Spark: first row as header, column types inferred.
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.show()


In [None]:
# Number of rows in the CSV loaded from GitHub (displayed as the cell output).
n_rows = df.count()
n_rows