# Working with Spark DataFrames in Synapse Spark

## Working with schemas and lake databases

Use `spark.read.csv()` with an explicit schema (`manualSchema`) to load the data from the public source blob storage account, then display its schema and shape.

```python
from pyspark.sql.types import *
import numpy as np
import pandas as pd
from delta.tables import DeltaTable

# Explicit schema for the CSV file; the third StructField argument marks each column as nullable
manualSchema = StructType([
  StructField("CustomerId", StringType(), True),
  StructField("ProductId", StringType(), True),
  StructField("Rating", LongType(), True),
  StructField("Cost", FloatType(), True),
  StructField("Size", FloatType(), True),
  StructField("Price", FloatType(), True),
  StructField("PrimaryBrandId", LongType(), True),
  StructField("GenderId", LongType(), True),
  StructField("MaritalStatus", LongType(), True),
  StructField("LowerIncomeBound", FloatType(), True),
  StructField("UpperIncomeBound", FloatType(), True)
])

url = "wasbs://files@synapsemlpublic.blob.core.windows.net/PersonalizedData.csv"
df = spark.read.csv(url, header=True, schema=manualSchema)

print("Schema: ")
df.printSchema()

# Shape as (rows, columns); count() is an action and scans the whole file
print("Shape: ", (df.count(), len(df.columns)))
```
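`spark.read.csv()` also accepts the schema as a DDL-formatted string instead of a `StructType`. The plain-Python sketch below builds the DDL equivalent of `manualSchema`. In that mapping, `StringType` becomes `STRING`, `LongType` becomes `BIGINT`, and `FloatType` becomes `FLOAT`. The `CUSTOMER_COLUMNS` list and the `to_ddl` helper are illustrative only and are not part of any library. DDL columns are nullable by default, which matches the `True` flags above.

```python
from typing import List, Tuple

# Illustrative column list mirroring manualSchema: (name, Spark SQL DDL type)
CUSTOMER_COLUMNS: List[Tuple[str, str]] = [
    ("CustomerId", "STRING"),
    ("ProductId", "STRING"),
    ("Rating", "BIGINT"),
    ("Cost", "FLOAT"),
    ("Size", "FLOAT"),
    ("Price", "FLOAT"),
    ("PrimaryBrandId", "BIGINT"),
    ("GenderId", "BIGINT"),
    ("MaritalStatus", "BIGINT"),
    ("LowerIncomeBound", "FLOAT"),
    ("UpperIncomeBound", "FLOAT"),
]


def to_ddl(columns: List[Tuple[str, str]]) -> str:
    """Join (name, type) pairs into a DDL schema string such as '`a` STRING, `b` BIGINT'."""
    # Backticks quote each identifier so names that clash with SQL keywords still parse
    return ", ".join(f"`{name}` {type_name}" for name, type_name in columns)


ddl_schema = to_ddl(CUSTOMER_COLUMNS)
print(ddl_schema)

# In the notebook, with an active `spark` session, the string can replace the StructType:
# df = spark.read.csv(url, header=True, schema=ddl_schema)
```

The `StructType` form is easier to build programmatically. The DDL string is more compact when the schema is written by hand.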

Create a new lake database. Note the `%%spark` magic at the top of the cell: it switches the language of this cell to Scala, while the rest of the notebook uses PySpark.

```scala
%%spark
spark.sql("CREATE DATABASE IF NOT EXISTS Customers")
```

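Save the DataFrame as a table named `Customer` in the newly created database. Because no path is given, `saveAsTable()` creates a managed table, and `mode("overwrite")` replaces the table if it already exists.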

```python
df.write.mode("overwrite").saveAsTable("Customers.Customer")
```

Observe the newly created database and table in Synapse Studio: open the `Data` hub, then select the `Lake database` group in the `Workspace` section.

If you right-click the table and select `New SQL script`, you can query the table with the serverless SQL pool. This works because the Spark and serverless SQL runtimes share the lake database metadata. A table created from Spark therefore becomes queryable from serverless SQL, after a short asynchronous synchronization delay.

![Newly created lake database and table](https://solliancepublicdata.blob.core.windows.net/synapse-l400/notebook-images/customers-lake-database.png)

## Spark DataFrame operations

Load another DataFrame, this time from multiple folders located in the Synapse workspace data lake.

```python
df_sales = spark.read.load('abfss://wwi-02@#DATA_LAKE_ACCOUNT_NAME#.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet')
display(df_sales.limit(10))
```

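Parquet files store their schema in the file metadata. Spark therefore reads the column names and types from the files rather than inferring them by scanning the data, as it does for CSV sources read with `inferSchema`. This schema is sufficient for data exploration and most transformation tasks. However, when you load data into an external resource such as a SQL pool table, you may need to declare your own schema, or cast columns, so that the column types match the target.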

```python
df_sales.printSchema()
```
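When the target is a SQL pool table, you can compare `df_sales.dtypes` against the types the target expects to find the columns that need an explicit cast. `df_sales.dtypes` returns a list of `(column, type)` pairs. The sketch below is plain Python. The `schema_mismatches` helper and both sample type lists are hypothetical, because the actual column types of the `sale-small` data and of any target table are not given here. The expected types are written as Spark type names.

```python
from typing import Dict, List, Tuple


def schema_mismatches(actual: List[Tuple[str, str]], expected: Dict[str, str]) -> List[str]:
    """Compare (column, type) pairs, shaped like DataFrame.dtypes, with expected types."""
    actual_types = dict(actual)
    problems: List[str] = []
    for column, expected_type in expected.items():
        found = actual_types.get(column)
        if found is None:
            problems.append(f"missing column {column}")
        elif found != expected_type:
            problems.append(f"{column}: found {found}, expected {expected_type}")
    return problems


# Hypothetical values: in the notebook, `actual` would be df_sales.dtypes and
# `expected` would be derived from the target SQL pool table definition
actual = [("TransactionDate", "int"), ("ProductId", "int"),
          ("Quantity", "int"), ("ProfitAmount", "decimal(38,18)")]
expected = {"TransactionDate": "int", "ProductId": "int",
            "Quantity": "smallint", "ProfitAmount": "decimal(10,2)"}

mismatches = schema_mismatches(actual, expected)
print(mismatches)
```

A reported mismatch could then be resolved with a cast, for example `df_sales.withColumn("Quantity", col("Quantity").cast("smallint"))`, where `col` comes from `pyspark.sql.functions`.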

Group the sales by transaction date and product, then aggregate to find the total profit, average quantity, and total quantity for each product per day.

Running the next cell returns almost immediately and produces no output. Transformations such as `groupBy()`, `agg()`, and `orderBy()` are lazy: Spark only records them in a query plan. Nothing executes until an action, such as `display()`, `count()`, or a write, forces the plan to run.

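```python
from pyspark.sql.types import *
# Note: this star import shadows Python's built-in sum() and round() with the Spark column functions used below
from pyspark.sql.functions import *

# Transformations only: this builds a query plan but does not read any data yet
profitByDateProduct = (df_sales.groupBy("TransactionDate","ProductId")
    .agg(
        sum("ProfitAmount").alias("(sum)ProfitAmount"),     # total profit
        round(avg("Quantity"), 4).alias("(avg)Quantity"),   # average quantity, 4 decimals
        sum("Quantity").alias("(sum)Quantity"))             # total quantity
    .orderBy("TransactionDate"))
```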

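Display the first 100 rows. `limit()` is itself a lazy transformation that caps the result at 100 rows. The `display()` call is the action that triggers execution of the whole chain.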

```python
display(profitByDateProduct.limit(100))
```
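The define-then-act pattern above can be mimicked in plain Python to show what lazy evaluation means: transformations only append a step to a plan, and nothing runs until an action executes that plan. `LazyPipeline` is a hypothetical toy class, not Spark's implementation. Spark additionally optimizes the whole plan with its Catalyst optimizer before running it. For brevity, the example groups only by date.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, int]
Step = Callable[[List[Row]], List[Row]]

execution_log: List[str] = []  # appended to each time a plan actually runs


class LazyPipeline:
    """Toy model of lazy evaluation: transformations record steps, actions run them."""

    def __init__(self, source: List[Row], steps: Tuple[Step, ...] = ()):
        self._source = source
        self._steps = steps

    def _add(self, step: Step) -> "LazyPipeline":
        # Return a new pipeline with one more step; no data is touched here
        return LazyPipeline(self._source, self._steps + (step,))

    def group_sum(self, key: str, value: str) -> "LazyPipeline":
        def step(rows: List[Row]) -> List[Row]:
            totals: Dict[int, int] = {}
            for row in rows:
                totals[row[key]] = totals.get(row[key], 0) + row[value]
            return [{key: k, value: v} for k, v in sorted(totals.items())]
        return self._add(step)

    def limit(self, n: int) -> "LazyPipeline":
        # Like Spark's limit(), this is just another recorded transformation
        return self._add(lambda rows: rows[:n])

    def collect(self) -> List[Row]:
        # Action: run every recorded step in order
        execution_log.append("run")
        rows = self._source
        for step in self._steps:
            rows = step(rows)
        return rows


sales = [
    {"TransactionDate": 20191201, "ProductId": 1, "Quantity": 2},
    {"TransactionDate": 20191201, "ProductId": 2, "Quantity": 5},
    {"TransactionDate": 20191202, "ProductId": 1, "Quantity": 3},
]

plan = LazyPipeline(sales).group_sum("TransactionDate", "Quantity").limit(100)
runs_before_action = len(execution_log)  # building the plan ran nothing
result = plan.collect()
print(result)
```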

## DataFrame partitions

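Check how many partitions Spark created when reading the files. For file sources, this number depends mainly on three factors: the total size and number of the files, the `spark.sql.files.maxPartitionBytes` setting (128 MB by default), and the default parallelism of the Spark pool.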

```python
df_sales.rdd.getNumPartitions()
```
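For splittable formats such as Parquet, Spark first computes a maximum split size and then packs file chunks into read partitions. The function below is a simplified model of that logic and is not an exact reproduction. It assumes the default `spark.sql.files.maxPartitionBytes` (128 MB) and `spark.sql.files.openCostInBytes` (4 MB), with `parallelism` standing in for the pool's default parallelism. The function name and the file sizes in the example are hypothetical, and the real count can differ under other configuration settings.

```python
from typing import List

MB = 1024 * 1024


def estimate_file_partitions(file_sizes: List[int], parallelism: int,
                             max_partition_bytes: int = 128 * MB,
                             open_cost_bytes: int = 4 * MB) -> int:
    """Simplified model of how Spark packs splittable files into read partitions."""
    # 1. Target split size: spread the data over the cores, capped at max_partition_bytes
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    max_split = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

    # 2. Cut each file into chunks of at most max_split bytes, largest chunks first
    splits = [min(max_split, size - offset)
              for size in file_sizes
              for offset in range(0, size, max_split)]
    splits.sort(reverse=True)

    # 3. Greedily fill partitions; every chunk also adds open_cost_bytes to the running size
    partitions, current_size, current_count = 0, 0, 0
    for split in splits:
        if current_count > 0 and current_size + split > max_split:
            partitions += 1
            current_size, current_count = 0, 0
        current_size += split + open_cost_bytes
        current_count += 1
    return partitions + (1 if current_count else 0)


# Hypothetical inputs: one 1 GB file, or 31 small 1 MB files, on 8 cores
print(estimate_file_partitions([1024 * MB], parallelism=8))
print(estimate_file_partitions([1 * MB] * 31, parallelism=8))
```

The open cost is what keeps many small files from each becoming their own partition. Every file is charged as if it were 4 MB larger, so several small files get packed into a single partition.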

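Repartition the DataFrame into 10 partitions. Calling `repartition()` with only a partition count triggers a full shuffle that distributes the rows evenly (round-robin) across the new partitions.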

```python
df_sales = df_sales.repartition(10)
df_sales.rdd.getNumPartitions()
```
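Because `repartition(10)` without column arguments uses round-robin partitioning, the new partitions end up nearly equal in size. The plain-Python sketch below models that distribution. `round_robin_partition` is a hypothetical helper, and the model is simplified to a single input partition. The seeded random start stands in for the random starting partition that Spark picks for each input partition. The count of non-empty partitions also predicts how many files a subsequent write produces.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin_partition(rows: Sequence[T], num_partitions: int, seed: int = 0) -> List[List[T]]:
    """Assign rows to partitions in turn, starting from a pseudo-random partition."""
    # Spark picks a random starting partition per input partition; a seeded RNG mimics that
    position = random.Random(seed).randrange(num_partitions)
    partitions: List[List[T]] = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[position % num_partitions].append(row)
        position += 1
    return partitions


parts = round_robin_partition(list(range(1003)), num_partitions=10)
sizes = [len(p) for p in parts]
non_empty = sum(1 for p in parts if p)  # a write produces one file per non-empty partition
print(sizes, non_empty)
```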

Write the DataFrame to the data lake. Once execution completes, check the `/temp/sales1` location to confirm that the write produced 10 Parquet files, one per partition, because each partition is written by a separate task.

```python
df_sales.write.mode('overwrite').parquet('abfss://wwi-02@#DATA_LAKE_ACCOUNT_NAME#.dfs.core.windows.net/temp/sales1')
```

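Repartition the DataFrame again, this time by the values of the `TransactionDate` column, and write it with `partitionBy('TransactionDate')`. Once execution completes, check the `/temp/sales2` location. It should contain one subfolder per date, named `TransactionDate=<value>` (31 for December), each holding a single Parquet file. Without the `repartition('TransactionDate')` call, each of the 10 existing partitions would write its own file into every date folder it has rows for, producing up to 10 files per folder.

```python
# Hash-partition rows by date so all rows for a given date land in the same partition,
# then write one subfolder per date (TransactionDate=<value>)
(df_sales
    .repartition('TransactionDate')
    .write
    .partitionBy('TransactionDate')
    .mode('overwrite')
    .parquet('abfss://wwi-02@#DATA_LAKE_ACCOUNT_NAME#.dfs.core.windows.net/temp/sales2'))
```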
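A small plain-Python model shows the effect of repartitioning by `TransactionDate` before `partitionBy`. In this model, each write task produces one file for every distinct date it holds. The data is synthetic: 31 December dates with 100 rows each. Spark uses a Murmur3 hash for hash partitioning, and `zlib.crc32` stands in for it here. Only one property matters for this model: equal keys always land in the same partition. The 200 partitions match the default `spark.sql.shuffle.partitions`. The helper names are hypothetical.

```python
import zlib
from typing import List

Row = dict


def count_output_files(partitions: List[List[Row]], column: str) -> int:
    """partitionBy writes one file per (task, distinct column value) pair."""
    return sum(len({row[column] for row in part}) for part in partitions)


def hash_partition(rows: List[Row], column: str, num_partitions: int = 200) -> List[List[Row]]:
    """Place each row by hashing its key, so equal keys always share a partition."""
    parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in rows:
        key = str(row[column]).encode("utf-8")
        parts[zlib.crc32(key) % num_partitions].append(row)  # crc32 stands in for Murmur3
    return parts


# Synthetic data: 31 December dates, 100 rows each, in date order
rows = [{"TransactionDate": f"2019-12-{day:02d}"} for day in range(1, 32) for _ in range(100)]

round_robin = [rows[i::10] for i in range(10)]     # 10 mixed partitions, as after repartition(10)
by_date = hash_partition(rows, "TransactionDate")  # as after repartition('TransactionDate')

files_round_robin = count_output_files(round_robin, "TransactionDate")
files_by_date = count_output_files(by_date, "TransactionDate")
folders = sorted({f"TransactionDate={row['TransactionDate']}" for row in rows})
print(files_round_robin, files_by_date, len(folders))
```

In this model, every one of the 10 round-robin partitions holds rows for all 31 dates, giving 310 files. After hash partitioning, each date lives in exactly one partition, giving one file per date folder.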