<a href="https://colab.research.google.com/github/suriarasai/BEAD2024/blob/main/colab/09_Simple_Report_Example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Introduction
In this tutorial, we will learn to use PySpark in a Google Colab notebook, covering broadcast variables, `coalesce`, and a simple report query.

### Spark

In [1]:
# install pyspark using pip
!pip install --ignore-installed -q pyspark
# install findspark using pip
!pip install --ignore-installed -q findspark



In [2]:
from pyspark.sql import SparkSession

# Create a local Spark session with the Spark UI on port 4050
spark = SparkSession.builder \
    .master("local") \
    .appName("Example SQL Use Cases") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

### Broadcast Variable
A broadcast variable in PySpark is a read-only variable that is shared across all the nodes in a Spark cluster. It lets you cache a dataset (typically a lookup table that fits in memory) on each worker node, so that tasks read it locally instead of having it shipped over the network with every task. Broadcast variables offer three main benefits:

1. Reduced Network I/O: Instead of sending the same data repeatedly to each worker node, the data is broadcast once and stored locally on each node. This reduces the amount of data transferred over the network, which matters most when the shared data is large or the job runs many tasks.

2. Efficient Data Sharing: Each node gets a local copy of the data, which tasks can access with minimal latency, improving the overall speed of the computation.

3. Memory Efficiency: Each node holds a single copy of the broadcast data, rather than the multiple copies that would exist if the same data were sent to each task independently.

#### Example 1

In [10]:
# Sample large DataFrame
large_df = spark.range(100)
# Preview the first rows, then print the schema
large_df.show(10)
large_df.printSchema()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+
only showing top 10 rows

root
 |-- id: long (nullable = false)



In [11]:
large_df.select('id').show(10)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+
only showing top 10 rows



In [23]:
from pyspark.sql.functions import col, udf
# Apply filter operation without broadcast variable.
# Combine Column conditions with & (Python's `and` would silently drop the first one)
filtered_data = large_df.filter((col("id") > 1) & (col("id") < 4))
filtered_data.show()

+---+
| id|
+---+
|  2|
|  3|
+---+
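
When combining filter conditions, note that PySpark cannot overload Python's `and`. With a non-empty string on the left, `and` simply evaluates to its right operand, so the first condition is dropped without any error. The plain-Python sketch below illustrates this with a hypothetical stand-in class, `FakeCondition`, which is not part of PySpark; real `Column` expressions are combined with `&`, `|` and `~` instead.

```python
class FakeCondition:
    """Hypothetical stand-in for a PySpark Column expression (illustration only)."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        # Mirrors the idea of Column overloading & to build a combined expression
        return FakeCondition(f"({self.text} AND {other.text})")


lower = FakeCondition("id > 1")
upper = FakeCondition("id < 4")

# A non-empty string is truthy, so `and` evaluates to its right operand
dropped = "id > 1" and upper
print(dropped.text)   # id < 4

# The & operator keeps both conditions
combined = lower & upper
print(combined.text)  # (id > 1 AND id < 4)
```

Wrapping each comparison in parentheses matters with real columns, because `&` binds more tightly than `>` and `<` in Python.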



In [26]:
from pyspark.sql.types import BooleanType
# Broadcast variable example
broadcast_var = spark.sparkContext.broadcast([1, 2, 3, 4, 5])

# Function to filter data using broadcast variable
def filter_data(value):
    return value in broadcast_var.value

# Register UDF
filter_data_udf = udf(filter_data, BooleanType())

# Apply filter operation with broadcast variable
filtered_data = large_df.filter(filter_data_udf(col("id")))
filtered_data.show()



+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+



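#### Example 2

The `broadcast()` function from `pyspark.sql.functions` is a join hint rather than a broadcast variable: it asks Spark to send the small DataFrame to every executor, so the join can avoid shuffling the large one.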

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Initialize Spark session
spark = SparkSession.builder.appName("Broadcast Join Example").getOrCreate()

# Sample small DataFrame
small_df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", "value"])

# Sample large DataFrame
large_df = spark.range(1000).toDF("id")

# Perform broadcast join
joined_df = large_df.join(broadcast(small_df), "id")

# Show the results
joined_df.show()

+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
+---+-----+
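
Conceptually, a broadcast join gives every executor a full copy of the small table, so each partition of the large table can be joined locally. The plain-Python sketch below mimics that idea with a dictionary lookup. It illustrates the principle only and is not Spark's actual implementation; the names `local_hash_join`, `small_rows` and `large_ids` are hypothetical.

```python
def local_hash_join(large_ids, small_rows):
    """Join ids against a small (id, value) table using an in-memory lookup."""
    lookup = dict(small_rows)  # built once from the small side
    # Probe the lookup for each id of the large side (inner join semantics)
    return [(i, lookup[i]) for i in large_ids if i in lookup]


small_rows = [(1, "A"), (2, "B"), (3, "C")]
large_ids = range(1000)

joined = local_hash_join(large_ids, small_rows)
print(joined)  # [(1, 'A'), (2, 'B'), (3, 'C')]
```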



### coalesce
In PySpark, the name coalesce refers to two unrelated things: a SQL function that handles null values, and a DataFrame/RDD method that reduces the number of partitions.

1. SQL Expression: Handling Null Values
The coalesce function in SQL returns the first non-null value from a list of columns, or null if all of them are null. This is useful when a column may contain nulls and you want to fill them with values from another column.

In [29]:
from pyspark.sql.functions import coalesce

# Sample data
data = [(1, None), (None, 2), (None, None), (4, 5)]
columns = ["col1", "col2"]
df = spark.createDataFrame(data, columns)

# Use coalesce to fill null values
df.withColumn("filled_col", coalesce(df["col1"], df["col2"])).show()

+----+----+----------+
|col1|col2|filled_col|
+----+----+----------+
|   1|NULL|         1|
|NULL|   2|         2|
|NULL|NULL|      NULL|
|   4|   5|         4|
+----+----+----------+
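
The row-by-row rule can be sketched in plain Python, with `None` standing in for SQL `NULL`. The helper name `first_non_null` is hypothetical and only serves to illustrate the semantics on the same sample rows.

```python
def first_non_null(*values):
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


rows = [(1, None), (None, 2), (None, None), (4, 5)]
filled = [first_non_null(col1, col2) for col1, col2 in rows]
print(filled)  # [1, 2, None, 4]
```

Note that only nulls are skipped: a value such as 0 or an empty string is kept, which is why the check is `is not None` rather than a truthiness test.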



2. Reducing the Number of Partitions
On DataFrames and RDDs, the coalesce method reduces the number of partitions. Unlike repartition, it merges existing partitions and so avoids a full shuffle. This is often done before writing to disk, to produce fewer output files, or after a filter has left many partitions nearly empty.

In [30]:
# Sample data
data = [(1,), (2,), (3,), (4,)]
columns = ["number"]
df = spark.createDataFrame(data, columns)

# Repartition DataFrame into 4 partitions
df_repartitioned = df.repartition(4)

# Reduce the number of partitions to 2
df_coalesced = df_repartitioned.coalesce(2)

# Show the number of partitions
print("Number of partitions after coalesce: ", df_coalesced.rdd.getNumPartitions())


Number of partitions after coalesce:  2
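
As a rough mental model, `coalesce` merges whole existing partitions instead of redistributing individual rows. The sketch below models partitions as plain Python lists. The function `merge_partitions` and its round-robin grouping are hypothetical and do not reproduce how Spark decides which partitions to combine.

```python
def merge_partitions(partitions, target):
    """Merge whole partitions into at most `target` groups (toy model)."""
    if target >= len(partitions):
        # Like DataFrame.coalesce, this toy model never increases the count
        return [list(p) for p in partitions]
    merged = [[] for _ in range(target)]
    for index, part in enumerate(partitions):
        # Each source partition goes, intact, into exactly one target partition
        merged[index % target].extend(part)
    return merged


partitions = [[1], [2], [3], [4]]
coalesced = merge_partitions(partitions, 2)
print(len(coalesced))  # 2
print(coalesced)       # [[1, 3], [2, 4]]
```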


Write a query to report how many units in each category have been ordered on each day of the week. Categories with no orders should still appear, with 0 for every day.

Return the result table ordered by category.

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dayofweek, sum as _sum, when, coalesce, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Orders and Items DataFrame") \
    .getOrCreate()

# Define schema for Orders table
orders_schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", StringType(), True),  # Temporarily use StringType
    StructField("item_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True)
])

# Define schema for Items table
items_schema = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("item_name", StringType(), True),
    StructField("item_category", StringType(), True)
])

# Sample data for Orders
orders_data = [
    (1, 1, "2024-06-01", 1, 10),
    (2, 1, "2024-06-08", 2, 10),
    (3, 2, "2024-06-02", 1, 5),
    (4, 3, "2024-06-03", 3, 5),
    (5, 4, "2024-06-04", 4, 1),
    (6, 4, "2024-06-05", 5, 5),
    (7, 5, "2024-06-05", 1, 10),
    (8, 5, "2024-06-14", 4, 5),
    (9, 5, "2024-06-21", 3, 5)
]

# Sample data for Items
items_data = [
    (1, "Atomic Habits", "Book"),
    (2, "The little blue book", "Book"),
    (3, "Samsung Smartphone", "Phone"),
    (4, "Some Phone 2020", "Phone"),
    (5, "Google Glass", "Glasses"),
    (6, "Random Uniqlo T-Shirt XL", "T-Shirt")
]

# Create DataFrame for Orders
orders_df = spark.createDataFrame(data=orders_data, schema=orders_schema)

# Convert order_date from string to date type
orders_df = orders_df.withColumn("order_date", orders_df["order_date"].cast("date"))

# Create DataFrame for Items
items_df = spark.createDataFrame(data=items_data, schema=items_schema)

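# Left-join Items to Orders so categories without orders are kept, then sum quantities per weekday.
# dayofweek() returns 1 for Sunday through 7 for Saturday; coalesce() turns NULL sums into 0.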
joined_df = items_df.join(orders_df, items_df["item_id"] == orders_df["item_id"], "left") \
    .groupBy("item_category") \
    .agg(
        coalesce(_sum(when(dayofweek(col("order_date")) == 2, col("quantity"))), lit(0)).alias("Monday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 3, col("quantity"))), lit(0)).alias("Tuesday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 4, col("quantity"))), lit(0)).alias("Wednesday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 5, col("quantity"))), lit(0)).alias("Thursday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 6, col("quantity"))), lit(0)).alias("Friday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 7, col("quantity"))), lit(0)).alias("Saturday"),
        coalesce(_sum(when(dayofweek(col("order_date")) == 1, col("quantity"))), lit(0)).alias("Sunday")
    ) \
    .orderBy("item_category")

# Show the result DataFrame
joined_df.show()



+-------------+------+-------+---------+--------+------+--------+------+
|item_category|Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday|
+-------------+------+-------+---------+--------+------+--------+------+
|         Book|     0|      0|       10|       0|     0|      20|     5|
|      Glasses|     0|      0|        5|       0|     0|       0|     0|
|        Phone|     5|      1|        0|       0|    10|       0|     0|
|      T-Shirt|     0|      0|        0|       0|     0|       0|     0|
+-------------+------+-------+---------+--------+------+--------+------+
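
As a sanity check on the weekday numbering, the report can be recomputed with the Python standard library alone. This is a plain-Python sketch, not Spark code. The helper `spark_dayofweek` is a hypothetical name that converts Python's `isoweekday()` (1 = Monday, 7 = Sunday) to the numbering used by Spark's `dayofweek` (1 = Sunday, 7 = Saturday).

```python
from collections import defaultdict
from datetime import date

DAY_NAMES = {1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
             5: "Thursday", 6: "Friday", 7: "Saturday"}


def spark_dayofweek(d):
    """Map a date to Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1


# (order_date, item_id, quantity) taken from the sample Orders data
orders = [("2024-06-01", 1, 10), ("2024-06-08", 2, 10), ("2024-06-02", 1, 5),
          ("2024-06-03", 3, 5), ("2024-06-04", 4, 1), ("2024-06-05", 5, 5),
          ("2024-06-05", 1, 10), ("2024-06-14", 4, 5), ("2024-06-21", 3, 5)]
# item_id -> item_category taken from the sample Items data
categories = {1: "Book", 2: "Book", 3: "Phone", 4: "Phone",
              5: "Glasses", 6: "T-Shirt"}

# Start every category with an empty tally so unordered ones still appear
report = {category: defaultdict(int) for category in categories.values()}
for order_date, item_id, quantity in orders:
    day = DAY_NAMES[spark_dayofweek(date.fromisoformat(order_date))]
    report[categories[item_id]][day] += quantity

for category in sorted(report):
    print(category, dict(report[category]))
```

The totals agree with the table above: for example, Book has 20 units on Saturday and Phone has 10 on Friday, while T-Shirt has no entries at all.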

