## Lab 01 - Analyze data with Apache Spark

###### 1. Create Workspace
###### 2. Create Lakehouse
###### 3. Set Default Lakehouse

In [None]:
#####################################
# spark best practice
#####################################
# Ask the Parquet writer to apply V-Order.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
# Optimize Write. The documented keys are "optimizeWrite", not
# "optimizationWrite"; Spark accepts unknown keys silently, so the misspelled
# ones had no effect.
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")  # 1 GiB
# NOTE(review): autotune key kept as found; confirm it is still the name
# recognised by the runtime in use.
spark.conf.set('spark.ms.autotune.queryTuning.enabled', 'true')
# Upper bound on bytes packed into one input partition when reading files (1 GiB).
spark.conf.set('spark.sql.files.maxPartitionBytes', '1073741824')

In [None]:
#####################################
# Download Example files
#####################################
# Fetches orders.zip into the default lakehouse's Files/orders folder,
# unpacks it there, and removes the archive afterwards.
import os
import requests
import zipfile

DATA_ROOT = "/lakehouse/default"
DATA_FOLDER = "Files/orders"  # folder with data files
DATA_FILE = "orders.zip"  # data file name

target_dir = f'{DATA_ROOT}/{DATA_FOLDER}'
archive_path = f'{target_dir}/{DATA_FILE}'

os.makedirs(target_dir, exist_ok=True)
remote_url = "https://github.com/MicrosoftLearning/dp-data/raw/main/orders.zip"

r = requests.get(remote_url, timeout=30)
# Stop on 4xx/5xx instead of saving an error page as if it were the archive.
r.raise_for_status()
with open(archive_path, 'wb') as f:
    f.write(r.content)

try:
    with zipfile.ZipFile(archive_path, mode='r') as zipf:
        zipf.extractall(f'{target_dir}/')
finally:
    # Remove the downloaded archive even when extraction fails, so a broken
    # zip is not left next to the data files.
    os.remove(archive_path)

In [None]:
# Load the 2019 orders file with header parsing turned off, so the first
# line is treated as data and Spark assigns its default column names.
df = (
    spark.read
    .option("header", "false")
    .csv("Files/orders/2019.csv")
)
display(df)

In [None]:
#####################################
# Create a dataframe with an explicit schema
#####################################
from pyspark.sql.types import *

# Column name / Spark type pairs, in the order the columns are declared.
orderSchema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("SalesOrderNumber", StringType()),
        ("SalesOrderLineNumber", IntegerType()),
        ("OrderDate", DateType()),
        ("CustomerName", StringType()),
        ("Email", StringType()),
        ("Item", StringType()),
        ("Quantity", IntegerType()),
        ("UnitPrice", FloatType()),
        ("Tax", FloatType()),
    )
])

# Read every CSV file in the orders folder using the schema above.
df = spark.read.schema(orderSchema).csv("Files/orders/*.csv")
display(df)

In [None]:
# Keep only the customer columns, then print the raw row count and the
# de-duplicated row count before displaying the unique customers.
customers = df.select("CustomerName", "Email")
unique_customers = customers.distinct()
print(customers.count())
print(unique_customers.count())
display(unique_customers)

In [None]:
#####################################
# Select from a dataframe with a filter
#####################################
# Customers who ordered one specific product: filter on Item first, then
# keep only the customer columns.
customers = (
    df
    .where(df["Item"] == 'Road-250 Red, 52')
    .select("CustomerName", "Email")
)
unique_customers = customers.distinct()
print(customers.count())
print(unique_customers.count())
display(unique_customers)

In [None]:
#####################################
# Aggregate and group data in a dataframe
#####################################
# Total quantity per item; the result columns are Item and sum(Quantity).
productSales = df.groupBy("Item").sum("Quantity")
display(productSales)

In [None]:
#####################################
# groupBy / orderBy in a dataframe
#####################################
# NOTE: this wildcard import also rebinds names such as sum, min and max to
# the Spark column functions for the rest of the session.
from pyspark.sql.functions import *

# Number of order lines per calendar year, sorted by year.
yearlySales = (
    df
    .groupBy(year(col("OrderDate")).alias("Year"))
    .count()
    .orderBy("Year")
)
display(yearlySales)

In [None]:
#####################################
# Use dataframe methods and functions to transform data (Option: Data Wrangler)
#####################################
from pyspark.sql.functions import *

# Column expressions shared by the derived columns below.
order_date = col("OrderDate")
name_parts = split(col("CustomerName"), " ")

transformed_df = (
    df
    # Year and Month taken from the order date
    .withColumn("Year", year(order_date))
    .withColumn("Month", month(order_date))
    # First and second space-separated tokens of the customer name
    .withColumn("FirstName", name_parts.getItem(0))
    .withColumn("LastName", name_parts.getItem(1))
    # Keep and reorder the output columns (CustomerName is dropped)
    .select(
        "SalesOrderNumber",
        "SalesOrderLineNumber",
        "OrderDate",
        "Year",
        "Month",
        "FirstName",
        "LastName",
        "Email",
        "Item",
        "Quantity",
        "UnitPrice",
        "Tax",
    )
)

# Show the first five orders
display(transformed_df.limit(5))

In [None]:
#####################################
# Save the transformed data, then read it back
#####################################
transformed_path = 'Files/transformed_data/orders'

# Write as Parquet, replacing anything already stored at the target path.
transformed_df.write.mode("overwrite").format("parquet").save(transformed_path)

# Load the Parquet files that were just written.
orders_df = spark.read.parquet(transformed_path)
display(orders_df)


In [None]:
#####################################
# Save the transformed data as partitioned files
#####################################
# One folder level per Year value and, inside it, one per Month value.
(
    orders_df.write
    .mode("overwrite")
    .partitionBy("Year", "Month")
    .parquet("Files/partitioned_data")
)
print("Transformed data saved!")

# Read back only the 2021 partitions. Because the load path points inside
# the partition folders, Year and Month are not columns of the result.
orders_2021_df = spark.read.parquet("Files/partitioned_data/Year=2021/Month=*")
display(orders_2021_df)

In [None]:
#####################################
# Create a new table (default format : delta)
#####################################
# Persist the dataframe as the "salesorders" Delta table, replacing any
# existing contents.
(
    df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("salesorders")
)

# Print the extended table description without truncating long values.
table_description = spark.sql("DESCRIBE EXTENDED salesorders")
table_description.show(truncate=False)


In [None]:
#####################################
# DESCRIBE DETAIL
#####################################
from pyspark.sql.functions import lit

lakehouse = "lakehouse"
tablename = "salesorders"

# Table-level details, tagged with the lakehouse and table names so they
# can serve as join keys.
detail_df = (
    spark.sql(f"DESCRIBE DETAIL {lakehouse}.{tablename}")
    .withColumn("lakehousename", lit(lakehouse))
    .withColumn("tablename", lit(tablename))
)

# Commit history of the same table, tagged the same way.
history_df = (
    spark.sql(f"DESCRIBE HISTORY {lakehouse}.{tablename}")
    .withColumn("lakehousename", lit(lakehouse))
    .withColumn("tablename", lit(tablename))
)

# Attach the table details to every history row.
join = detail_df.join(history_df, on=['lakehousename', 'tablename'])

display(join)

In [None]:
# Query at most 1000 rows from the salesorders table. Note that this
# rebinds df to the query result.
sample_query = "SELECT * FROM lakehouse.salesorders LIMIT 1000"
df = spark.sql(sample_query)
display(df)

In [None]:
#########################################
# Partitioning
# https://learn.microsoft.com/en-us/fabric/data-engineering/tutorial-lakehouse-data-preparation
#########################################
from pyspark.sql.functions import col, year, month, quarter

table_name = 'salesorders_Partition'

# Re-read the full table and derive the partition columns from OrderDate.
df = (
    spark.sql("SELECT * FROM lakehouse.salesorders")
    .withColumn('Year', year(col("OrderDate")))
    .withColumn('Month', month(col("OrderDate")))
)

# Write a Delta table under Tables/, partitioned by Year and Month,
# replacing any existing data at that path.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("Year", "Month")
    .save(f"Tables/{table_name}")
)

In [None]:
%%sql
-----------------------------------------------
-- Partitioning by SparkSQL
-----------------------------------------------
-- Create (or replace) an empty Delta table partitioned by Year and Month,
-- with the delta.targetFileSize table property set to 128mb.
CREATE or REPLACE TABLE lakehouse.salesorders_partition_128
(
    SalesOrderLineNumber BIGINT
    , OrderDate DATE
    , Quantity BIGINT
    , UnitPrice FLOAT
    , Year INT
    , Month INT
)USING DELTA
PARTITIONED BY(Year, Month)
TBLPROPERTIES('delta.targetFileSize' = '128mb');

-- Use dynamic partition overwrite for the INSERT OVERWRITE below.
SET spark.sql.sources.partitionOverwriteMode=dynamic;
-- NOTE(review): presumably a runtime-specific setting; confirm that
-- spark.sql.enable.concurrentWrites is recognised by the Spark runtime in use.
SET spark.sql.enable.concurrentWrites=true;

-- Copy the selected columns from the partitioned source table into the new table.
INSERT OVERWRITE TABLE lakehouse.salesorders_partition_128
    SELECT SalesOrderLineNumber, OrderDate, Quantity, UnitPrice, Year, Month  FROM lakehouse.salesorders_partition;

-- Switch the partition overwrite mode to static once the load is done.
SET spark.sql.sources.partitionOverwriteMode=static;

In [None]:
#########################################
# check_vorder
#########################################
def check_vorder(table_name_path):
    """Report the V-Order flag stored in a table folder's Parquet schema metadata.

    Args:
        table_name_path: Filesystem path of the table folder to inspect.

    Returns:
        None when the path does not exist (a message is printed instead).
        "Table is not V-ordered" when the schema metadata has no key
        containing b'vorder'. Otherwise the str() of the metadata value,
        which is a bytes object, so the text looks like "b'true'".
    """
    import os

    if not os.path.exists(table_name_path):
        print(f'{os.path.basename(table_name_path)} does not exist')
        return None

    # Imported only on this path so the missing-folder case works without pyarrow.
    import pyarrow.dataset as ds

    # Schema.metadata is None when the files carry no key/value metadata;
    # fall back to an empty mapping instead of failing on .keys().
    metadata = ds.dataset(table_name_path).schema.metadata or {}

    vorder_keys = [key for key in metadata if b'vorder' in key]
    if not vorder_keys:
        return "Table is not V-ordered"

    # Prefer the well-known key, but do not raise KeyError if only a
    # differently named vorder key is present.
    preferred_key = b'com.microsoft.parquet.vorder.enabled'
    chosen_key = preferred_key if preferred_key in metadata else vorder_keys[0]
    return str(metadata[chosen_key])

In [None]:
# Print the V-Order status of every table listed in the Spark catalog.
table_list = spark.catalog.listTables()

for table in table_list:
    vorder_status = check_vorder(f'/lakehouse/default/Tables/{table.name}')
    # check_vorder returns None when the table folder is missing; f-string
    # formatting prints that instead of raising TypeError on str + None.
    print(f'{table.name}: {vorder_status}')

In [None]:
%%sql
---------------------------------------------------
--4. Vacuum
--https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-table-maintenance
---------------------------------------------------
-- Run VACUUM on the partitioned table. No RETAIN clause is given, so the
-- default retention threshold applies.
VACUUM salesorders_partition_128
