# PySpark & SQL Techniques for Data Manipulation

## Khaliil Bouda

## References

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html


# Introduction
This project explores data manipulation techniques using PySpark, Spark SQL, and pandas.

## Packages and Libraries
The necessary packages, PySpark and pandas, are installed and imported first.

## Data Exploration & Manipulation
Four questions about a small sales dataset (products, order lines, orders, and customers) are each answered three ways: with a Spark SQL query, a pandas method chain, and the PySpark DataFrame API.

## Loading Packages

In [None]:
#install PySpark
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# create a SparkSession from pyspark.sql, which is the entry point to Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# verify
spark

In [None]:
# Spark SQL functions (note: importing `sum` shadows Python's built-in sum())
from pyspark.sql.functions import countDistinct, sum, col, expr, concat, coalesce, lit, count
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

In [None]:
# import pandas
import pandas as pd

## Loading Data


In [None]:
# load each JSON file and register it as a temporary view for Spark SQL queries
spark.read.json('hw1_product.json').createOrReplaceTempView('Product')
spark.read.json('hw1_orderline.json').createOrReplaceTempView('Orderline')
spark.read.json('hw1_order.json').createOrReplaceTempView('Order')
spark.read.json('hw1_customer.json').createOrReplaceTempView('Customer')
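
The third view is named `Order`, which is also the SQL keyword in `ORDER BY`. Spark accepted the bare name in this notebook (the queries that use it produced output), but stricter SQL engines reject it, and quoting the identifier with backticks removes the ambiguity. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, with a hypothetical one-column table, to show both the failure and the quoted fix.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# A bare reserved word as a table name is a syntax error in SQLite.
try:
    conn.execute("CREATE TABLE Order (orderId INTEGER)")
    bare_name_ok = True
except sqlite3.OperationalError:
    bare_name_ok = False

# Quoting the identifier with backticks (accepted by SQLite and Spark SQL) works.
conn.execute("CREATE TABLE `Order` (orderId INTEGER)")
conn.executemany("INSERT INTO `Order` VALUES (?)", [(1001,), (1002,)])
rows = conn.execute("SELECT o.orderId FROM `Order` o ORDER BY o.orderId").fetchall()
print(bare_name_ok, rows)
```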

In [None]:
# load the same files into pandas DataFrames
pd_product = pd.read_json('hw1_product.json')
pd_orderline = pd.read_json('hw1_orderline.json')
pd_order = pd.read_json('hw1_order.json')
pd_customer = pd.read_json('hw1_customer.json')

In [None]:
# load the same files as PySpark DataFrames
sp_product = spark.read.json('hw1_product.json')
sp_orderline = spark.read.json('hw1_orderline.json')
sp_order = spark.read.json('hw1_order.json')
sp_customer = spark.read.json('hw1_customer.json')
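
The two readers make different default assumptions about file layout: `spark.read.json` expects JSON Lines (one object per line) unless `multiLine=True` is set, while `pandas.read_json` parses a single JSON document unless `lines=True` is passed. The sketch below uses only the standard `json` module and two hypothetical product records (not the contents of the `hw1_*.json` files) to show that both layouts carry the same rows.

```python
import json

# Hypothetical records; the real files' contents are not reproduced here.
records = [
    {"productId": 1, "productLineId": 1},
    {"productId": 2, "productLineId": 2},
]

# JSON Lines: one complete JSON object per line.
json_lines = "\n".join(json.dumps(r) for r in records)

# A single JSON document: one array holding every object.
json_array = json.dumps(records)

parsed_lines = [json.loads(line) for line in json_lines.splitlines()]
parsed_array = json.loads(json_array)
print(parsed_lines == parsed_array)
```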

## 1. How many distinct products were ordered in each product line, and what was the total ordered quantity per product line (in ascending order of product line id)?

In [None]:
# using SQL
spark.sql('''
    SELECT p.productLineId,
           COUNT(DISTINCT l.productId) AS `Product Count`,
           SUM(l.orderedQuantity) AS `Ordered Quantity`
    FROM Product p
    JOIN Orderline l ON p.productId = l.productId
    GROUP BY p.productLineId
    ORDER BY p.productLineId ASC
''').show()

+-------------+-------------+----------------+
|productLineId|Product Count|Ordered Quantity|
+-------------+-------------+----------------+
|            1|            5|              29|
|            2|            1|               2|
|            3|            2|              20|
+-------------+-------------+----------------+



In [None]:
# using pandas (join on the shared productId column)
pd.merge(pd_product, pd_orderline, on='productId')\
    .groupby('productLineId')\
    .agg({'productId': 'nunique', 'orderedQuantity': 'sum'})\
    .reset_index()\
    .rename(columns={'productId': 'Product Count',
                     'orderedQuantity': 'Total Ordered Quantity'})\
    .sort_values('productLineId')



|   | productLineId | Product Count | Total Ordered Quantity |
|---|---|---|---|
| 0 | 1 | 5 | 29 |
| 1 | 2 | 1 | 2 |
| 2 | 3 | 2 | 20 |


In [None]:
# using PySpark (count distinct products, not orders, per product line)
sp_product.join(sp_orderline, 'productId')\
    .groupBy('productLineId')\
    .agg(F.countDistinct('productId').alias('Product Count'),
         F.sum('orderedQuantity').alias('Total Ordered Quantity'))\
    .orderBy('productLineId').show()

+-------------+-------------+----------------------+
|productLineId|Product Count|Total Ordered Quantity|
+-------------+-------------+----------------------+
|            1|            5|                    29|
|            2|            1|                     2|
|            3|            2|                    20|
+-------------+-------------+----------------------+
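
Question 1 asks for distinct products per product line; counting distinct `orderId` values instead answers a different question (how many orders touched each line). The sketch below uses `sqlite3` as a stand-in for Spark SQL and hypothetical order lines, not the homework data, to compute both counts side by side. For brevity it stores `productLineId` directly on each order line; in the real data that column comes from the join with `Product`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Orderline (orderId INTEGER, productId INTEGER, "
             "productLineId INTEGER, orderedQuantity INTEGER)")
# Hypothetical rows: product 1 appears in three orders, product 2 in one.
conn.executemany(
    "INSERT INTO Orderline VALUES (?, ?, ?, ?)",
    [(1001, 1, 1, 2), (1002, 1, 1, 3), (1003, 1, 1, 1), (1003, 2, 1, 4)],
)
row = conn.execute("""
    SELECT productLineId,
           COUNT(DISTINCT productId) AS product_count,
           COUNT(DISTINCT orderId)   AS order_count,
           SUM(orderedQuantity)      AS total_quantity
    FROM Orderline
    GROUP BY productLineId
""").fetchone()
print(row)  # (1, 2, 3, 10)
```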



## 2. What are the id, name (no truncation), finish, and revenue (0 if the product was never ordered) of every product (in ascending order of product id)?

In [None]:
# using SQL
spark.sql('''
    SELECT p.productId,
           p.productDescription AS `Product Name`,
           p.productFinish AS `Product Finish`,
           COALESCE(SUM(p.productPrice * l.orderedQuantity), 0) AS `Revenue`
    FROM Product p
    LEFT JOIN Orderline l ON p.productId = l.productId
    GROUP BY p.productId, p.productDescription, p.productFinish
    ORDER BY p.productId ASC
''').show(truncate=False)  # truncate=False prints full product names


+---------+--------------------+--------------+-------+
|productId|        Product Name|Product Finish|Revenue|
+---------+--------------------+--------------+-------+
|        1|    Cherry End Table|        Cherry|    875|
|        2| Birch Coffee Tables|         Birch|    800|
|        3|   Oak Computer Desk|           Oak|   8250|
|        4|Entertainment Center|        Cherry|  11550|
|        5|       Writer's Desk|           Oak|    650|
|        6|    8-Drawer Dresser|         Birch|   1500|
|        7|         48 Bookcase|        Walnut|    750|
|        8|         48 Bookcase|           Oak|   2625|
|        9|         96 Bookcase|        Walnut|      0|
|       10|         96 Bookcase|           Oak|      0|
|       11|    4-Drawer Dresser|           Oak|      0|
|       12|    8-Drawer Dresser|           Oak|      0|
|       13|          Nightstand|        Cherry|      0|
|       14|       Writer's Desk|         Birch|      0|
|       17|High Back Leather...|       Leather| 
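
By default `DataFrame.show()` shortens any cell longer than 20 characters to its first 17 characters plus `...`, which is why product 17's name is printed as `High Back Leather...` even though the question asks for untruncated names; `show(truncate=False)` prints cells in full. The function below is a hypothetical plain-Python re-implementation of that display rule, written only to illustrate it; it is not part of PySpark and ignores Spark's special case for widths under 4.

```python
def spark_style_truncate(value: str, width: int = 20) -> str:
    """Hypothetical helper mimicking how DataFrame.show() shortens a cell."""
    if len(value) <= width:
        return value
    # Keep width - 3 characters and mark the cut with an ellipsis.
    return value[: width - 3] + "..."


print(spark_style_truncate("Contemporary Casuals"))      # Contemporary Casuals
print(spark_style_truncate("American Euro Lifestyles"))  # American Euro Lif...
```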

In [None]:
# using pandas
# a left merge keeps unordered products; sum() skips their NaN quantities and returns 0
pd.merge(pd_product, pd_orderline, on='productId', how='left')\
  .groupby(['productId', 'productDescription', 'productFinish'])\
  .agg(productPrice=('productPrice', 'first'),
       orderedQuantity=('orderedQuantity', 'sum'))\
  .assign(Revenue=lambda x: x['productPrice'] * x['orderedQuantity'])\
  .fillna(0)\
  .sort_values('productId')\
  .reset_index()\
  .drop(columns=['productPrice', 'orderedQuantity'])\
  .rename(columns={'productId': 'Product ID',
                   'productDescription': 'Product Name',
                   'productFinish': 'Product Finish'})





|   | Product ID | Product Name | Product Finish | Revenue |
|---|---|---|---|---|
| 0 | 1 | Cherry End Table | Cherry | 875.0 |
| 1 | 2 | Birch Coffee Tables | Birch | 800.0 |
| 2 | 3 | Oak Computer Desk | Oak | 8250.0 |
| 3 | 4 | Entertainment Center | Cherry | 11550.0 |
| 4 | 5 | Writer's Desk | Oak | 650.0 |
| 5 | 6 | 8-Drawer Dresser | Birch | 1500.0 |
| 6 | 7 | 48 Bookcase | Walnut | 750.0 |
| 7 | 8 | 48 Bookcase | Oak | 2625.0 |
| 8 | 9 | 96 Bookcase | Walnut | 0.0 |
| 9 | 10 | 96 Bookcase | Oak | 0.0 |


In [None]:
# using PySpark
sp_product.join(sp_orderline, 'productId', 'left')\
  .groupBy('productId', 'productDescription', 'productFinish')\
  .agg(expr('COALESCE(SUM(productPrice * orderedQuantity), 0)').alias('Revenue'))\
  .orderBy('productId')\
  .withColumnRenamed('productId', 'Product ID')\
  .withColumnRenamed('productDescription', 'Product Name')\
  .withColumnRenamed('productFinish', 'Product Finish')\
  .show(truncate=False)  # full product names, as the question requires


+----------+--------------------+--------------+-------+
|Product ID|        Product Name|Product Finish|Revenue|
+----------+--------------------+--------------+-------+
|         1|    Cherry End Table|        Cherry|    875|
|         2| Birch Coffee Tables|         Birch|    800|
|         3|   Oak Computer Desk|           Oak|   8250|
|         4|Entertainment Center|        Cherry|  11550|
|         5|       Writer's Desk|           Oak|    650|
|         6|    8-Drawer Dresser|         Birch|   1500|
|         7|         48 Bookcase|        Walnut|    750|
|         8|         48 Bookcase|           Oak|   2625|
|         9|         96 Bookcase|        Walnut|      0|
|        10|         96 Bookcase|           Oak|      0|
|        11|    4-Drawer Dresser|           Oak|      0|
|        12|    8-Drawer Dresser|           Oak|      0|
|        13|          Nightstand|        Cherry|      0|
|        14|       Writer's Desk|         Birch|      0|
|        17|High Back Leather..
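
The left join keeps products that never appear in `Orderline`; for those rows every order-line column is NULL, so `SUM(productPrice * orderedQuantity)` is NULL and `COALESCE(..., 0)` turns it into the required 0. The sketch below reproduces that behaviour with `sqlite3` standing in for Spark SQL, on two hypothetical products, one ordered and one not.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Product (productId INTEGER, productPrice INTEGER)")
conn.execute("CREATE TABLE Orderline (productId INTEGER, orderedQuantity INTEGER)")
# Hypothetical data: product 1 is ordered twice, product 2 never.
conn.executemany("INSERT INTO Product VALUES (?, ?)", [(1, 100), (2, 250)])
conn.executemany("INSERT INTO Orderline VALUES (?, ?)", [(1, 2), (1, 3)])

rows = conn.execute("""
    SELECT p.productId,
           SUM(p.productPrice * l.orderedQuantity)              AS raw_revenue,
           COALESCE(SUM(p.productPrice * l.orderedQuantity), 0) AS revenue
    FROM Product p
    LEFT JOIN Orderline l ON p.productId = l.productId
    GROUP BY p.productId
    ORDER BY p.productId
""").fetchall()
print(rows)  # [(1, 500, 500), (2, None, 0)]
```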

## 3. What are the id, date, customer name, and overall price of each order (in ascending order of order id)?

In [None]:
# inspect the order-line table
pd_orderline

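|   | orderId | productId | orderedQuantity |
|---|---|---|---|
| 0 | 1001 | 1 | 2 |
| 1 | 1001 | 2 | 2 |
| 2 | 1001 | 4 | 1 |
| 3 | 1002 | 3 | 5 |
| 4 | 1003 | 3 | 3 |
| 5 | 1004 | 6 | 2 |
| 6 | 1004 | 8 | 2 |
| 7 | 1005 | 4 | 3 |
| 8 | 1006 | 4 | 1 |
| 9 | 1006 | 5 | 2 |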


In [None]:
# using SQL (`Order` is quoted because ORDER is an SQL keyword)
spark.sql('''
    SELECT o.orderId AS `Order ID`,
           o.orderDate AS `Order Date`,
           c.customerName AS `Customer Name`,
           COALESCE(SUM(l.orderedQuantity * p.productPrice), 0) AS `Overall Price`
    FROM `Order` o
    JOIN Customer c ON o.customerId = c.customerId
    JOIN Orderline l ON o.orderId = l.orderId
    JOIN Product p ON l.productId = p.productId
    GROUP BY o.orderId, o.orderDate, c.customerName
    ORDER BY o.orderId ASC
''').show()

+--------+----------+--------------------+-------------+
|Order ID|Order Date|       Customer Name|Overall Price|
+--------+----------+--------------------+-------------+
|    1001|2010-10-21|Contemporary Casuals|         2400|
|    1002|2010-10-21| California Classics|         3750|
|    1003|2010-10-22|     Mountain Scenes|         2250|
|    1004|2010-10-22|         Impressions|         1850|
|    1005|2010-10-24|    Home Furnishings|         4950|
|    1006|2010-10-24|     Value Furniture|         2600|
|    1007|2010-10-27|American Euro Lif...|          925|
|    1008|2010-10-30|Battle Creek Furn...|         2775|
|    1009|2010-11-05|   Eastern Furniture|         3750|
|    1010|2010-11-05|Contemporary Casuals|         1750|
+--------+----------+--------------------+-------------+
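
Listing tables after `FROM` with the join conditions in `WHERE` (the implicit style) and writing explicit `JOIN ... ON` clauses return the same rows for inner joins; the explicit form keeps each condition beside the table it connects. The sketch below checks that equivalence with `sqlite3` standing in for Spark SQL, on hypothetical orders and customers; the table is called `Orders` here only to sidestep the reserved word.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Customer (customerId INTEGER, customerName TEXT)")
conn.execute("CREATE TABLE Orders (orderId INTEGER, customerId INTEGER)")
# Hypothetical rows; order 1003 points at a customer that does not exist.
conn.executemany("INSERT INTO Customer VALUES (?, ?)", [(1, "A"), (2, "B")])
conn.executemany("INSERT INTO Orders VALUES (?, ?)",
                 [(1001, 1), (1002, 2), (1003, 9)])

implicit = conn.execute("""
    SELECT o.orderId, c.customerName
    FROM Orders o, Customer c
    WHERE o.customerId = c.customerId
    ORDER BY o.orderId
""").fetchall()

explicit = conn.execute("""
    SELECT o.orderId, c.customerName
    FROM Orders o
    JOIN Customer c ON o.customerId = c.customerId
    ORDER BY o.orderId
""").fetchall()

print(implicit == explicit, explicit)  # True [(1001, 'A'), (1002, 'B')]
```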



In [None]:
# using pandas
pd_order.merge(pd_customer, on='customerId')\
    .merge(pd_orderline, on='orderId')\
    .merge(pd_product, on='productId')\
    .assign(Overall_Price=lambda x: x['orderedQuantity'] * x['productPrice'])\
    .groupby(['orderId', 'orderDate', 'customerName'])['Overall_Price'].sum()\
    .reset_index()\
    .sort_values(by='orderId')\
    .rename(columns={'orderId': 'Order ID', 'orderDate': 'Order Date',
                     'customerName': 'Customer Name',
                     'Overall_Price': 'Overall Price'})

|   | Order ID | Order Date | Customer Name | Overall Price |
|---|---|---|---|---|
| 0 | 1001 | 2010-10-21 | Contemporary Casuals | 2400 |
| 1 | 1002 | 2010-10-21 | California Classics | 3750 |
| 2 | 1003 | 2010-10-22 | Mountain Scenes | 2250 |
| 3 | 1004 | 2010-10-22 | Impressions | 1850 |
| 4 | 1005 | 2010-10-24 | Home Furnishings | 4950 |
| 5 | 1006 | 2010-10-24 | Value Furniture | 2600 |
| 6 | 1007 | 2010-10-27 | American Euro Lifestyles | 925 |
| 7 | 1008 | 2010-10-30 | Battle Creek Furniture | 2775 |
| 8 | 1009 | 2010-11-05 | Eastern Furniture | 3750 |
| 9 | 1010 | 2010-11-05 | Contemporary Casuals | 1750 |


In [None]:
# using PySpark
sp_order.alias("o") \
    .join(sp_customer.alias("c"), col("o.customerId") == col("c.customerId")) \
    .join(sp_orderline.alias("l"), col("o.orderId") == col("l.orderId")) \
    .join(sp_product.alias("p"), col("l.productId") == col("p.productId")) \
    .groupBy("o.orderId", "o.orderDate", "c.customerName") \
    .agg(F.sum(col("p.productPrice") * col("l.orderedQuantity")).alias("Overall Price")) \
    .orderBy("orderId") \
    .show(truncate=False)

+-------+----------+------------------------+-------------+
|orderId|orderDate |customerName            |Overall Price|
+-------+----------+------------------------+-------------+
|1001   |2010-10-21|Contemporary Casuals    |2400         |
|1002   |2010-10-21|California Classics     |3750         |
|1003   |2010-10-22|Mountain Scenes         |2250         |
|1004   |2010-10-22|Impressions             |1850         |
|1005   |2010-10-24|Home Furnishings        |4950         |
|1006   |2010-10-24|Value Furniture         |2600         |
|1007   |2010-10-27|American Euro Lifestyles|925          |
|1008   |2010-10-30|Battle Creek Furniture  |2775         |
|1009   |2010-11-05|Eastern Furniture       |3750         |
|1010   |2010-11-05|Contemporary Casuals    |1750         |
+-------+----------+------------------------+-------------+
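
All three versions multiply price by quantity on each order line before summing per order. Summing prices and quantities separately and multiplying the totals gives a different, wrong number whenever an order has more than one line. The plain-Python sketch below shows the gap on one hypothetical two-line order; it calls `builtins.sum` explicitly because this notebook's `from pyspark.sql.functions import sum` shadows the built-in.

```python
import builtins

# Hypothetical order with two lines: (productPrice, orderedQuantity).
order_lines = [(175, 2), (200, 1)]

# Correct: line totals first, then the sum (what SUM(price * qty) computes).
overall_price = builtins.sum(price * qty for price, qty in order_lines)

# Incorrect: SUM(price) * SUM(qty) pairs prices with other lines' quantities.
wrong_price = (builtins.sum(p for p, _ in order_lines)
               * builtins.sum(q for _, q in order_lines))

print(overall_price, wrong_price)  # 550 1125
```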



## 4. What are the id, name (no truncation), address (full address in one column), and number of orders (0 if never ordered) of each customer (in ascending order of customer id)?



In [None]:
# using SQL
spark.sql('''
    SELECT c.customerId AS `Customer ID`,
           c.customerName AS `Customer Name`,
           CONCAT(c.customerStreet, ', ', c.customerCity, ', ',
                  c.customerState, ', ', c.customerZip) AS `Address`,
           COUNT(o.orderId) AS `Order Count`  -- COUNT skips NULLs, so no orders gives 0
    FROM Customer c
    LEFT JOIN `Order` o ON c.customerId = o.customerId
    GROUP BY c.customerId, c.customerName,
             c.customerStreet, c.customerCity, c.customerState, c.customerZip
    ORDER BY c.customerId ASC
''').show(truncate=False)  # full names and addresses

+-----------+--------------------+--------------------+-----------+
|Customer ID|       Customer Name|             Address|Order Count|
+-----------+--------------------+--------------------+-----------+
|          1|Contemporary Casuals|1355 S Hines Blvd...|          2|
|          2|     Value Furniture|15145 S.W. 17th S...|          1|
|          3|    Home Furnishings|1900 Allard Ave.,...|          1|
|          4|   Eastern Furniture|1925 Beltline Rd....|          1|
|          5|         Impressions|5585 Westcott Ct....|          1|
|          6|   Furniture Gallery|325 Flatiron Dr.,...|          0|
|          7|    Period Furniture|394 Rainbow Dr., ...|          0|
|          8| California Classics|816 Peach Rd., Sa...|          1|
|          9|M and H Casual Fu...|3709 First Street...|          0|
|         10|  Seminole Interiors|2400 Rocky Point ...|          0|
|         11|American Euro Lif...|2424 Missouri Ave...|          1|
|         12|Battle Creek Furn...|345 Capitol Av
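
`COUNT(o.orderId)` counts only non-NULL values, so a customer without orders already gets 0 from the left join and wrapping it in `COALESCE(..., 0)` is unnecessary (the wrapper matters for `SUM`, which returns NULL over no values). `COUNT(*)`, by contrast, counts the NULL-padded row and reports 1. The sketch below shows all three with `sqlite3` standing in for Spark SQL, on hypothetical data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Customer (customerId INTEGER)")
conn.execute("CREATE TABLE Orders (orderId INTEGER, customerId INTEGER)")
# Hypothetical data: customer 1 has two orders, customer 2 has none.
conn.executemany("INSERT INTO Customer VALUES (?)", [(1,), (2,)])
conn.executemany("INSERT INTO Orders VALUES (?, ?)", [(1001, 1), (1010, 1)])

rows = conn.execute("""
    SELECT c.customerId,
           COUNT(o.orderId)              AS order_count,
           COALESCE(COUNT(o.orderId), 0) AS order_count_coalesced,
           COUNT(*)                      AS row_count
    FROM Customer c
    LEFT JOIN Orders o ON c.customerId = o.customerId
    GROUP BY c.customerId
    ORDER BY c.customerId
""").fetchall()
print(rows)  # [(1, 2, 2, 2), (2, 0, 0, 1)]
```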

In [None]:
# using pandas
# build the address from all four fields; astype(str) covers a zip code loaded as a number
pd_customer.merge(pd_order, how='left', on='customerId')\
    .groupby(['customerId', 'customerName', 'customerStreet',
              'customerCity', 'customerState', 'customerZip'])\
    .agg(order_count=('orderId', 'count'))\
    .reset_index()\
    .assign(Address=lambda x: x['customerStreet'].astype(str) + ', '
            + x['customerCity'].astype(str) + ', '
            + x['customerState'].astype(str) + ', '
            + x['customerZip'].astype(str))\
    .sort_values(by='customerId')\
    .rename(columns={'customerId': 'Customer ID',
                     'customerName': 'Customer Name',
                     'order_count': 'Order Count'})\
    [['Customer ID', 'Customer Name', 'Address', 'Order Count']]

|   | Customer ID | Customer Name | Address | Order Count |
|---|---|---|---|---|
| 0 | 1 | Contemporary Casuals | 1355 S Hines Blvd, 1355 S Hines Blvd, 1355 S H... | 2 |
| 1 | 2 | Value Furniture | 15145 S.W. 17th St., 15145 S.W. 17th St., 1514... | 1 |
| 2 | 3 | Home Furnishings | 1900 Allard Ave., 1900 Allard Ave., 1900 Allar... | 1 |
| 3 | 4 | Eastern Furniture | 1925 Beltline Rd., 1925 Beltline Rd., 1925 Bel... | 1 |
| 4 | 5 | Impressions | 5585 Westcott Ct., 5585 Westcott Ct., 5585 Wes... | 1 |
| 5 | 6 | Furniture Gallery | 325 Flatiron Dr., 325 Flatiron Dr., 325 Flatir... | 0 |
| 6 | 7 | Period Furniture | 394 Rainbow Dr., 394 Rainbow Dr., 394 Rainbow ... | 0 |
| 7 | 8 | California Classics | 816 Peach Rd., 816 Peach Rd., 816 Peach Rd., 8... | 1 |
| 8 | 9 | M and H Casual Furniture | 3709 First Street, 3709 First Street, 3709 Fir... | 0 |
| 9 | 10 | Seminole Interiors | 2400 Rocky Point Dr., 2400 Rocky Point Dr., 24... | 0 |


In [None]:
# using PySpark
sp_customer.join(sp_order, "customerId", "left_outer")\
    .groupBy(
        col("customerId").alias("Customer ID"),
        col("customerName").alias("Customer Name"),
        concat(col("customerStreet"), lit(", "), col("customerCity"), lit(", "),
               col("customerState"), lit(", "), col("customerZip")).alias("Address"))\
    .agg(count("orderId").alias("Order Count"))\
    .orderBy("Customer ID")\
    .show(truncate=False)  # full names and addresses




+-----------+--------------------+--------------------+-----------+
|Customer ID|       Customer Name|             Address|Order Count|
+-----------+--------------------+--------------------+-----------+
|          1|Contemporary Casuals|1355 S Hines Blvd...|          2|
|          2|     Value Furniture|15145 S.W. 17th S...|          1|
|          3|    Home Furnishings|1900 Allard Ave.,...|          1|
|          4|   Eastern Furniture|1925 Beltline Rd....|          1|
|          5|         Impressions|5585 Westcott Ct....|          1|
|          6|   Furniture Gallery|325 Flatiron Dr.,...|          0|
|          7|    Period Furniture|394 Rainbow Dr., ...|          0|
|          8| California Classics|816 Peach Rd., Sa...|          1|
|          9|M and H Casual Fu...|3709 First Street...|          0|
|         10|  Seminole Interiors|2400 Rocky Point ...|          0|
|         11|American Euro Lif...|2424 Missouri Ave...|          1|
|         12|Battle Creek Furn...|345 Capitol Av
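
Both the SQL `CONCAT` and PySpark's `concat` return NULL when any input is NULL, so a customer missing, say, a zip code would get no address at all. PySpark also provides `concat_ws(sep, *cols)`, which skips NULL inputs. The sketch below shows the propagation with SQLite's `||` operator (same NULL rule, used here as a stand-in) and a hypothetical `concat_ws_like` helper that mimics the skipping behaviour in plain Python; the address values are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite's || operator, like Spark's CONCAT, yields NULL if any operand is NULL.
full = conn.execute("SELECT '1 Main St' || ', ' || NULL").fetchone()[0]
print(full)  # None


def concat_ws_like(sep, *parts):
    """Hypothetical helper: join the non-None parts, like Spark's concat_ws."""
    return sep.join(str(p) for p in parts if p is not None)


print(concat_ws_like(", ", "1 Main St", "Springfield", None, 12345))
# 1 Main St, Springfield, 12345
```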

# Summary
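This project answered four questions about products, orders, and customers in three ways: Spark SQL queries over temporary views, pandas merges and group-bys, and the PySpark DataFrame API. The same building blocks recur throughout: joining tables, grouping and aggregating (COUNT(DISTINCT), SUM, COUNT), replacing missing results with 0 (COALESCE in Spark, fillna in pandas), concatenating columns into a single field, and sorting the output.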