# Spark Notebook

This notebook uses the dataset from the [Hadoop-Hive](https://github.com/leorickli/bike-company-hadoop/blob/main/README.md) project for data analysis and machine learning with PySpark.

In [84]:
from pyspark.sql.functions import desc, round, col, concat, to_date, count, avg

In [85]:
Customer = spark.read.csv("gs://infnet-project-leo/Customer.csv", header=True, inferSchema=True)
Person = spark.read.csv("gs://infnet-project-leo/Person.csv", header=True, inferSchema=True)
Product = spark.read.csv("gs://infnet-project-leo/Product.csv", header=True, inferSchema=True)
SalesOrderDetail = spark.read.csv("gs://infnet-project-leo/SalesOrderDetail.csv", header=True, inferSchema=True)
SalesOrderHeader = spark.read.csv("gs://infnet-project-leo/SalesOrderHeader.csv", header=True, inferSchema=True)
SpecialOfferProduct = spark.read.csv("gs://infnet-project-leo/SpecialOfferProduct.csv", header=True, inferSchema=True)


## Existing Queries

These are the queries used in the [Hadoop-Hive](https://github.com/leorickli/bike-company-hadoop/blob/main/README.md) project. Each one is shown in SQL and then translated to PySpark.

1. Find the different types of "PersonType" in the "Person" table.

SELECT DISTINCT PersonType
FROM Person;

In [None]:
# First Query

# Select distinct values of the "PersonType" column
first_query = Person.select("PersonType").distinct()

first_query.show()

2. Find the ten largest orders placed so far.

SELECT totaldue
FROM salesOrderHeader
ORDER BY totaldue DESC
LIMIT 10;

In [19]:
# Second Query

# Select the "totaldue" column, round it to two decimal places, and keep the ten largest values
second_query = SalesOrderHeader.select(round("totaldue", 2).alias("rounded_totaldue")).orderBy(desc("rounded_totaldue")).limit(10)

second_query.show()

+----------------+
|rounded_totaldue|
+----------------+
|       187487.83|
|       182018.63|
|       170512.67|
|       166537.08|
|       165028.75|
|       158056.54|
|       145741.86|
|       145454.37|
|       142312.22|
|       140042.12|
+----------------+



3. Write a query that returns the number of rows in the Sales.SalesOrderDetail table by the SalesOrderID field, provided they have at least three rows of details.

SELECT SalesOrderID as id, COUNT(*) AS qtd 
FROM salesOrderDetail as sod
GROUP BY SalesOrderID
HAVING qtd >= 3
ORDER BY qtd DESC
LIMIT 10;

In [24]:
# Third Query
# The remaining queries are more complex, so each step is on its own line for clarity and readability

# Rename the "SalesOrderID" column to "id" in a new DataFrame, leaving SalesOrderDetail itself unchanged
sod_renamed = SalesOrderDetail.withColumnRenamed("SalesOrderID", "id")

# Group by "id" and count the detail rows per order (the "count" column plays the role of qtd in the SQL)
grouped_salesOrderDetail = sod_renamed.groupBy("id").count()

# Keep only orders with at least 3 detail rows
filtered_salesOrderDetail = grouped_salesOrderDetail.filter(col("count") >= 3)

# Order by count in descending order
ordered_salesOrderDetail = filtered_salesOrderDetail.orderBy(col("count").desc())

# Limit the result to the top 10 rows
third_query = ordered_salesOrderDetail.limit(10)

third_query.show()



+-----+-----+
|   id|count|
+-----+-----+
|51721|   72|
|51739|   72|
|53465|   71|
|51160|   71|
|47355|   68|
|57046|   67|
|51120|   67|
|51090|   66|
|55297|   66|
|47395|   66|
+-----+-----+
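The group, filter, sort, and limit steps of the third query can also be sketched in plain Python. This is only an illustration on hypothetical toy data (the order IDs below are made up and are not from the dataset); it mirrors `GROUP BY`, `HAVING`, `ORDER BY ... DESC`, and `LIMIT` without needing a Spark session.

```python
from collections import Counter

# Hypothetical toy data: one SalesOrderID per order-detail row
detail_order_ids = [101, 101, 101, 102, 103, 103, 103, 103, 104, 104]

# GROUP BY SalesOrderID with COUNT(*)
rows_per_order = Counter(detail_order_ids)

# HAVING qtd >= 3
at_least_three = {oid: n for oid, n in rows_per_order.items() if n >= 3}

# ORDER BY qtd DESC, then LIMIT 10
top_orders = sorted(at_least_three.items(), key=lambda item: item[1], reverse=True)[:10]

print(top_orders)
```

Orders 102 and 104 drop out because they have fewer than three detail rows, which is the role of the `filter(col("count") >= 3)` step in the PySpark version.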



4. Write a query linking the Person.Person, Sales.Customer, and Sales.SalesOrderHeader tables to get a list of customer names and a count of orders placed.

SELECT c.CustomerID AS id, CONCAT(p.FirstName, ' ', p.LastName) AS nome, COUNT(*) AS qtd
FROM salesOrderHeader soh
JOIN customer c ON soh.CustomerID = c.CustomerID
JOIN person p ON c.PersonID = p.BusinessEntityID 
GROUP BY c.CustomerID, p.FirstName, p.LastName
ORDER BY qtd DESC
LIMIT 10;

In [38]:
# Fourth Query
# Uses 'concat_ws' instead of the regular 'concat' to build the full name
# The 'concat_ws' function takes a separator (in this case, a space) followed by the columns to concatenate

# Join orders to customers on CustomerID, then to people on PersonID = BusinessEntityID
joined_df = SalesOrderHeader.join(Customer, "CustomerID", "inner").join(Person, col("PersonID") == col("BusinessEntityID"), "inner")

# Select the desired columns and perform aggregations
from pyspark.sql.functions import concat_ws

fourth_query = joined_df.select(col("CustomerID").alias("id"),
                            concat_ws(" ", col("FirstName"), col("LastName")).alias("nome")) \
                            .groupBy("id", "nome") \
                            .count() \
                            .orderBy(col("count").desc()) \
                            .limit(10)

fourth_query.show()

+-----+--------------+-----+
|   id|          nome|count|
+-----+--------------+-----+
|29994|Robin McGuigan|   12|
|30117|  Robert Vessa|   12|
|29489| Frances Adams|   12|
|29950|       Yale Li|   12|
|29844|  Nancy Hirota|   12|
|29901|    John Kelly|   12|
|29685|    Pamela Cox|   12|
|29810|  Jean Handley|   12|
|30076| Diane Tibbott|   12|
|29637|Donna Carreras|   12|
+-----+--------------+-----+
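The two inner joins followed by a count can be sketched in plain Python as dictionary lookups. The tables and names below are hypothetical toy data, not rows from the dataset; the sketch only shows how each order row is resolved to a customer, then to a person, before counting.

```python
from collections import Counter

# Hypothetical toy tables, keyed the same way as the notebook's CSVs
person = {1: ("Ana", "Silva"), 2: ("Bo", "Li")}  # BusinessEntityID -> (FirstName, LastName)
customer = {10: 1, 11: 2}                        # CustomerID -> PersonID
order_customer_ids = [10, 11, 10, 10, 11]        # one CustomerID per SalesOrderHeader row

# Inner joins: order -> customer -> person, then COUNT(*) per (id, nome)
orders_per_customer = Counter(
    (cid, " ".join(person[customer[cid]]))
    for cid in order_customer_ids
    if cid in customer and customer[cid] in person
)

# ORDER BY qtd DESC, then LIMIT 10
top_customers = orders_per_customer.most_common(10)

print(top_customers)
```

The `if` clause plays the part of the inner join: an order whose customer or person is missing contributes nothing to the count.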



5. Write a query showing the SalesOrderID, OrderDate, and TotalDue fields from the Sales.SalesOrderHeader table. Get only the lines where the order was placed during September/2011 and the total due is above 1,000. Sort by descending total due.

SELECT 
SalesOrderID as id,
CAST(OrderDate AS DATE) AS data, 
TotalDue AS total_devido
FROM salesOrderHeader
WHERE OrderDate BETWEEN '2011-09-01' AND '2011-09-30' AND TotalDue > 1000
ORDER BY total_devido DESC;

In [44]:
# Fifth Query
# The query returned no rows because there is no data in that time span.

# Select and alias the columns, filter on date range and total due, then sort by total due in descending order
fifth_query = SalesOrderHeader.select(
    col("SalesOrderID").alias("id"),
    to_date(col("OrderDate"), "yyyy-MM-dd").alias("data"),
    col("TotalDue").alias("total_devido")) \
    .filter((col("data").between("2011-09-01", "2011-09-30")) & (col("total_devido") > 1000)) \
    .orderBy(col("total_devido").desc())

fifth_query.show()

+---+----+------------+
| id|data|total_devido|
+---+----+------------+
+---+----+------------+
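To make the filter semantics of this query concrete, here is a minimal plain-Python sketch on hypothetical toy rows (the IDs, dates, and amounts are invented for illustration). It assumes the dates are already parsed; `BETWEEN` is inclusive on both ends, and the task asks for total due in descending order.

```python
from datetime import date

# Hypothetical toy rows: (SalesOrderID, OrderDate, TotalDue)
orders = [
    (1, date(2011, 8, 31), 2500.0),
    (2, date(2011, 9, 1), 1500.0),
    (3, date(2011, 9, 15), 900.0),
    (4, date(2011, 9, 30), 3200.0),
    (5, date(2011, 10, 1), 4100.0),
]

start, end = date(2011, 9, 1), date(2011, 9, 30)

# BETWEEN is inclusive on both ends; TotalDue must be above 1,000
matching = [row for row in orders if start <= row[1] <= end and row[2] > 1000]

# Sort by total due, largest first
matching.sort(key=lambda row: row[2], reverse=True)

print([row[0] for row in matching])
```

Order 3 is inside the date range but is excluded by the amount condition, while orders 1 and 5 fall just outside the range.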



## Extra Queries

These are extra queries for PySpark.

1. Count the number of products by color.

In [63]:
# Count the number of products by color
first_extra = Product.groupBy("color").agg(count("*").alias("total")).orderBy(col("total").desc())

first_extra.show()

+------------+-----+
|       color|total|
+------------+-----+
|        null|  248|
|       Black|   93|
|      Silver|   43|
|         Red|   38|
|      Yellow|   36|
|        Blue|   26|
|       Multi|    8|
|Silver/Black|    7|
|       White|    4|
|        Grey|    1|
+------------+-----+



2. Compute the average unit price for each special offer ID.

In [67]:
# Group by "SpecialOfferID" and calculate the average "UnitPrice" for each
grouped_df = SalesOrderDetail.groupBy("SpecialOfferID").agg(avg("UnitPrice").alias("avg_UnitPrice"))

# Round the average to two decimal places
# (groupBy already yields one row per "SpecialOfferID", so no distinct() is needed)
special_offer_avg_price = grouped_df.select(
    "SpecialOfferID",
    round("avg_UnitPrice", 2).alias("AvgUnitPrice")
    )

# Order the result by the rounded average in descending order
second_extra = special_offer_avg_price.orderBy("AvgUnitPrice", ascending=False)

second_extra.show()

+--------------+------------+
|SpecialOfferID|AvgUnitPrice|
+--------------+------------+
|            14|     1029.84|
|             7|      846.85|
|             2|      656.59|
|             1|      461.82|
|            13|      349.64|
|             9|       234.9|
|            16|       113.0|
|             3|      105.19|
|             4|       61.98|
|             5|        24.3|
|             8|       16.82|
|            11|       15.75|
+--------------+------------+



## Machine Learning

This section builds a linear regression model with PySpark to predict the total sales (the summed "OrderQty") of the top 10 products. It works on a DataFrame, ml_df, with the columns "name", "OrderQty", "UnitPrice", and "OrderDate", built by joining the "Product", "SalesOrderDetail", and "SalesOrderHeader" tables.
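The label the model is trained on, "TotalSales", is a per-product quantity total joined back onto every order row. A minimal plain-Python sketch of that step, on hypothetical toy rows and with a top 2 instead of a top 10, might look like this:

```python
from collections import defaultdict

# Hypothetical toy rows: (name, OrderQty, UnitPrice)
rows = [
    ("Cap", 2, 5.0),
    ("Cap", 4, 5.0),
    ("Helmet", 1, 20.0),
    ("Socks", 7, 5.7),
    ("Helmet", 3, 20.0),
]

# Sum OrderQty per product (groupBy("name") with a sum aggregate)
total_by_name = defaultdict(int)
for name, qty, _price in rows:
    total_by_name[name] += qty

# Keep the top N products by total quantity (N = 2 here, 10 in the notebook)
ranked = sorted(total_by_name.items(), key=lambda kv: kv[1], reverse=True)
top_n = dict(ranked[:2])

# Inner join back: every surviving row carries its product's total as the label
labeled = [(name, qty, price, top_n[name]) for name, qty, price in rows if name in top_n]

print(labeled)
```

Every row of the same product ends up with the same label value, so the regression is effectively asked to recover a per-product constant from "OrderQty" and "UnitPrice" alone.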

In [89]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [90]:
# Join the tables and select the desired columns
ml_df = Product.join(SalesOrderDetail, Product.ProductID == SalesOrderDetail.ProductID, "inner") \
                  .join(SalesOrderHeader, SalesOrderDetail.SalesOrderID == SalesOrderHeader.SalesOrderID, "inner") \
                  .select(col("name"), 
                          col("OrderQty"), 
                          col("UnitPrice"), 
                          col("OrderDate"))

ml_df.show()

+--------------------+--------+---------+-------------------+
|                name|OrderQty|UnitPrice|          OrderDate|
+--------------------+--------+---------+-------------------+
|Mountain-100 Blac...|       1| 2024.994|2011-05-31 00:00:00|
|Mountain-100 Blac...|       3| 2024.994|2011-05-31 00:00:00|
|Mountain-100 Blac...|       1| 2024.994|2011-05-31 00:00:00|
|Mountain-100 Silv...|       1| 2039.994|2011-05-31 00:00:00|
|Mountain-100 Silv...|       1| 2039.994|2011-05-31 00:00:00|
|Mountain-100 Silv...|       2| 2039.994|2011-05-31 00:00:00|
|Mountain-100 Silv...|       1| 2039.994|2011-05-31 00:00:00|
|Long-Sleeve Logo ...|       3|  28.8404|2011-05-31 00:00:00|
|Long-Sleeve Logo ...|       1|  28.8404|2011-05-31 00:00:00|
|Mountain Bike Soc...|       6|      5.7|2011-05-31 00:00:00|
|        AWC Logo Cap|       2|   5.1865|2011-05-31 00:00:00|
|Sport-100 Helmet,...|       4|  20.1865|2011-05-31 00:00:00|
|Road-650 Red, 44 ...|       1| 419.4589|2011-05-31 00:00:00|
|Road-45

In [91]:
# Define the feature columns and create the feature vector
feature_cols = ["OrderQty", "UnitPrice"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
ml_df = assembler.transform(ml_df)

# Sum OrderQty per product, keep the 10 products with the highest totals, and join the totals back as the "TotalSales" label
top_10_products = ml_df.groupBy("name").agg({"OrderQty": "sum"}).withColumnRenamed("sum(OrderQty)", "TotalSales")
top_10_products = top_10_products.sort(col("TotalSales"), ascending=False).limit(10)
ml_df = ml_df.join(top_10_products, on=["name"], how="inner")

# Split the data into training and testing sets (80% training, 20% testing)
(trainingData, testData) = ml_df.randomSplit([0.8, 0.2], seed=123)

# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="TotalSales")

# Train the model using the training data
lr_model = lr.fit(trainingData)

# Make predictions on the testing data
predictions = lr_model.transform(testData)

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol="TotalSales", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)


23/10/13 11:08:57 WARN Instrumentation: [0da3d781] regParam is zero, which might cause numerical instability and overfitting.

Root Mean Squared Error (RMSE): 950.9888478095909
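For reference, RMSE is the square root of the mean squared difference between label and prediction, so it is expressed in the same units as "TotalSales". The sketch below computes it by hand on hypothetical (label, prediction) pairs; the numbers are invented and are not taken from the model above.

```python
import math

# Hypothetical (label, prediction) pairs, not taken from the notebook's model
pairs = [(10.0, 12.0), (20.0, 17.0), (30.0, 30.0), (40.0, 45.0)]

# Squared error for each pair
squared_errors = [(label - prediction) ** 2 for label, prediction in pairs]

# Root of the mean squared error
rmse_example = math.sqrt(sum(squared_errors) / len(squared_errors))

print(rmse_example)
```

Because the errors are squared before averaging, a single large miss (here the last pair) weighs more heavily than several small ones.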
