LOADING DATA WITH SPARK FROM **CSV**

In [0]:
 # Authenticate Spark to the ADLS Gen2 account "mjuanworkshopetl" with a service principal
# (OAuth client-credentials token provider). The principal's id, tenant and secret are read
# from the Key Vault-backed secret scope "scope-workshop", so no credential is hardcoded here.

client_id = dbutils.secrets.get(scope='scope-workshop', key='databricks-app-client-id')
tenant_id = dbutils.secrets.get(scope='scope-workshop', key='databricks-app-tenant-id')
client_secret = dbutils.secrets.get(scope='scope-workshop', key='databricks-app-client-secret')

# Spark configuration key -> value; applied in insertion order by the loop below.
adls_oauth_conf = {
    "fs.azure.account.auth.type.mjuanworkshopetl.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.mjuanworkshopetl.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.mjuanworkshopetl.dfs.core.windows.net": client_id,
    "fs.azure.account.oauth2.client.secret.mjuanworkshopetl.dfs.core.windows.net": client_secret,
    "fs.azure.account.oauth2.client.endpoint.mjuanworkshopetl.dfs.core.windows.net": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

for conf_key, conf_value in adls_oauth_conf.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# First read: use the header row for column names; no schema is inferred or supplied yet.
circuits_csv_path = "abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/circuits.csv"
circuits_df = spark.read.csv(circuits_csv_path, header=True)

In [0]:
circuits_df.display()  # preview the header-only read

In [0]:
# printSchema() prints the schema tree itself and returns None, so call it directly;
# passing the uncalled method to display() would not show the schema.
circuits_df.printSchema()

In [0]:
# Re-read the CSV and let Spark infer column types from the data
# (inference costs an extra pass over the file).
# The chain is wrapped in parentheses: a bare line break before ".option" is a SyntaxError.
circuits_df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv("abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/circuits.csv")
)

In [0]:
circuits_df.display()  # preview the read with inferred column types

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Explicit schema for circuits.csv: pins the column types and avoids the inference pass.
# Columns are matched to the file by position, so the field order must follow the CSV.
circuits_schema = StructType(fields=[StructField("circuitId", IntegerType(), False),
                                   StructField("circuitRef", StringType(), True),
                                   StructField("name", StringType(), True),
                                   StructField("location", StringType(), True),
                                   StructField("country", StringType(), True),
                                   StructField("lat", DoubleType(), True),
                                   StructField("lng", DoubleType(), True),
                                   StructField("alt", IntegerType(), True),
                                   StructField("url", StringType(), True)])

# NOTE(review): Spark file sources usually treat user-supplied schemas as nullable, so
# circuitId's nullable=False may not be enforced; confirm with printSchema().
# The chain is parenthesized: a bare line break before ".schema" is a SyntaxError.
circuits_df = (
    spark.read.option("header", "true")
    .schema(circuits_schema)
    .csv("abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/circuits.csv")
)
display(circuits_df)



In [0]:
# Print the current schema of circuits_df (column names, types, nullability).
circuits_df.printSchema()

**SELECTING COLUMNS USING SPARK**

In [0]:
# Select a subset of columns by name (plain strings: no per-column transformations possible).
# Parentheses keep the call on one logical line; a line starting with ".select" is a SyntaxError.
circuit_selected_df = circuits_df.select(
    "circuitId", "circuitRef", "name", "location", "country", "lat", "lng"
)
display(circuit_selected_df)

In [0]:
# Selecting columns through pyspark.sql.functions.col() instead of plain strings gives us
# Column objects, so Column methods such as .alias() can be applied per column.

from pyspark.sql.functions import col

circuit_selected_df = circuits_df.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country").alias("race_country"),
    col("lat").alias("latitude"),
    col("lng").alias("longitude"),
)

display(circuit_selected_df)

In [0]:
# Rename the camelCase source columns to snake_case. The chain is parenthesized; the
# original mix of "\" and bare line breaks was a SyntaxError.
# withColumnRenamed is a no-op when the column does not exist. So with the aliasing
# select in the previous cell (lat/lng already renamed, alt not selected) only the first
# two renames have an effect.
circuits_renamed_df = (
    circuit_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
)

display(circuits_renamed_df)

**Adding a new column**

In [0]:
from pyspark.sql.functions import current_timestamp

# Add an audit column recording when the rows were ingested.
# Parentheses are required; a line starting with ".withColumn" is a SyntaxError.
circuits_final_df = circuits_renamed_df.withColumn("ingestion_date", current_timestamp())

display(circuits_final_df)

# withColumn and withColumnRenamed are DataFrame methods; they are not imported from pyspark.sql.functions.

**Writing this DataFrame as a Parquet file in our Data Lake**

In [0]:
# Persist the final DataFrame as Parquet in the "processed" container, replacing any
# existing output at that path. The call is parenthesized; a bare line break before
# ".parquet" is a SyntaxError.
circuits_final_df.write.mode("overwrite").parquet(
    "abfss://processed@mjuanworkshopetl.dfs.core.windows.net/circuits"
)

In [0]:
# Check that the write succeeded: read the Parquet output back from the processed container
# and display its rows. A bare DataFrame expression would only show the column list.

display(spark.read.parquet("abfss://processed@mjuanworkshopetl.dfs.core.windows.net/circuits"))

Saving this DataFrame as tables in Unity Catalog (UC): managed tables in the silver schema, whose managed location is the silver container of the Storage Account, and external tables in that same container.
To do this, we first need to create the External Locations, the Catalog and the Schemas in UC.
Then we can save the DataFrame by using both Spark and SQL.

In [0]:
%sql
-- Register one Unity Catalog external location per storage-account container (bronze, silver, gold).
-- All three reuse the same storage credential. The storage account must be linked to the
-- Access Connector that backs that credential; one Access Connector can serve several
-- storage accounts.

-- bronze container
CREATE EXTERNAL LOCATION IF NOT EXISTS external_location_bronze_container_form
  URL "abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/"
  WITH (STORAGE CREDENTIAL `databrickcourse-ext-storage-credential`);

-- silver container
CREATE EXTERNAL LOCATION IF NOT EXISTS external_location_silver_container_form
  URL "abfss://silver@mjuanworkshopetl.dfs.core.windows.net/"
  WITH (STORAGE CREDENTIAL `databrickcourse-ext-storage-credential`);

-- gold container
CREATE EXTERNAL LOCATION IF NOT EXISTS external_location_gold_container_form
  URL "abfss://gold@mjuanworkshopetl.dfs.core.windows.net/"
  WITH (STORAGE CREDENTIAL `databrickcourse-ext-storage-credential`);

In [0]:
%sql
-- Inspect the bronze external location's metadata (e.g. URL and storage credential).
DESCRIBE EXTERNAL LOCATION external_location_bronze_container_form

In [0]:
%sql

-- Create the project catalog and make it the session's current catalog.
CREATE CATALOG IF NOT EXISTS formula1_project;
USE CATALOG formula1_project;

-- One schema per medallion layer. Tables created in a schema without an explicit LOCATION
-- store their data in that schema's MANAGED LOCATION, the matching container root.
CREATE SCHEMA IF NOT EXISTS bronze MANAGED LOCATION "abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/";
CREATE SCHEMA IF NOT EXISTS silver MANAGED LOCATION "abfss://silver@mjuanworkshopetl.dfs.core.windows.net/";
CREATE SCHEMA IF NOT EXISTS gold   MANAGED LOCATION "abfss://gold@mjuanworkshopetl.dfs.core.windows.net/";


Managed table using SQL and a source file

In [0]:
%sql

-- Managed table built with SQL directly from the raw file: CTAS over read_files() on the bronze CSV.
-- There is no LOCATION clause, so the data goes to the silver schema's managed location
-- (adding a LOCATION clause would make this an external table instead).
CREATE OR REPLACE TABLE formula1_project.silver.results_sql_file
AS
SELECT *
FROM read_files(
  'abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/circuits.csv',
  format => 'csv',
  header => true
  -- optional explicit schema instead of inference:
  -- , schema => 'circuitId INT, circuitRef STRING, name STRING, location STRING, country STRING'
);

Managed table using SQL and a temporary view created from a DataFrame

In [0]:

# Creating a managed table with SQL from a DataFrame.
# Step 1: expose the DataFrame to SQL as a temporary view named "temp_circuits_view".
# The view is tied to the current Spark session; it is not a stored table.
circuits_final_df.createOrReplaceTempView("temp_circuits_view")


In [0]:
%sql

-- Step 2: materialize the temporary view as a managed table (CTAS, no LOCATION clause),
-- so its data is stored under the silver schema's managed location.
CREATE OR REPLACE TABLE formula1_project.silver.results_sql_view
AS
SELECT * FROM temp_circuits_view;

External table using SQL and a source file

In [0]:
%sql

-- External table built with SQL from the raw file. The LOCATION clause stores the table's
-- data at an explicit ADLS path instead of the schema's managed storage.
-- NOTE(review): the path ends in "circuits.csv" but will presumably hold a table directory,
-- not a CSV file. It is also inside the silver container root, which is the silver schema's
-- MANAGED LOCATION, and Unity Catalog may reject external tables that overlap managed
-- storage. Confirm, and consider a dedicated directory or container.
CREATE OR REPLACE TABLE formula1_project.silver.results_sql_file_ext
LOCATION 'abfss://silver@mjuanworkshopetl.dfs.core.windows.net/circuits.csv'
AS
SELECT *
FROM read_files(
  'abfss://bronze@mjuanworkshopetl.dfs.core.windows.net/circuits.csv',
  format => 'csv',
  header => true
  -- optional explicit schema instead of inference:
  -- , schema => 'circuitId INT, circuitRef STRING, name STRING, location STRING, country STRING'
);

Managed table using Spark and a DataFrame read from a source file

In [0]:
# Save the transformed DataFrame (read from circuits.csv at the beginning of the notebook)
# as a Unity Catalog managed table. No path is given, so UC decides where the data is stored.
(
    circuits_final_df.write
    .mode("overwrite")
    .saveAsTable("formula1_project.silver.results_spark_file")
)


External table using Spark and a DataFrame read from a source file

In [0]:
# Save the same DataFrame as an external table: the "path" option stores the data at an
# explicit ADLS location instead of the schema's managed storage.
# NOTE(review): this path is inside the silver container, which is also the silver schema's
# managed location; Unity Catalog may reject overlapping paths, so confirm.
(
    circuits_final_df.write
    .mode("overwrite")
    .option("path", 'abfss://silver@mjuanworkshopetl.dfs.core.windows.net/results')
    .saveAsTable("formula1_project.silver.results_spark_file_ext")
)