From Parquet in Blob Storage to Azure SQL using Spark

In [0]:
%python
%pip install pyodbc
%pip install azure-storage-blob
%pip install azure-data-tables 


In [0]:
# Restart the Python process so the packages installed with %pip above can be imported
dbutils.library.restartPython()

In [0]:
from azure.data.tables import TableServiceClient, TableClient
from azure.storage.blob import BlobServiceClient
import json
import io
import pandas as pd

import os

# Azure Blob Storage Config
# The connection string comes from the environment so no secret is stored in the notebook.
BLOB_CONNECTION_STRING = os.getenv("BLOB_CONNECTION_STRING")
if not BLOB_CONNECTION_STRING:
    # Fail fast with an actionable message instead of an opaque SDK error on None
    raise RuntimeError("BLOB_CONNECTION_STRING environment variable is not set")
BLOB_CONTAINER = "weather-data"
BLOB_NAME = "weather-data.parquet"

# Blob Container Config: one client for the account, scoped down to the container
blob_service_client = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(BLOB_CONTAINER)

def read_parquet_from_blob(blob_name=BLOB_NAME, client=None):
    """Download a Parquet blob and load it into a pandas DataFrame.

    Args:
        blob_name: Blob to read inside the container; defaults to ``BLOB_NAME``.
        client: Container client to read from; defaults to the module-level
            ``container_client`` created for ``BLOB_CONTAINER``.

    Returns:
        pandas.DataFrame with the blob's contents. The frame is returned instead
        of being written to a global, so the caller decides what name it gets.
    """
    if client is None:
        client = container_client
    blob_client = client.get_blob_client(blob_name)
    buffer = io.BytesIO()
    blob_client.download_blob().readinto(buffer)
    # readinto leaves the cursor at the end of the buffer; rewind before parsing
    buffer.seek(0)
    return pd.read_parquet(buffer)


df = read_parquet_from_blob()
display(df)

In [0]:
import pyodbc
import json
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

import os

# JDBC Config
jdbc_hostname = "weather-database.database.windows.net"
jdbc_database = "weather-db"
jdbc_port = 1433

# JDBC credentials for Azure SQL (see the database's connection strings / connectors page).
# They are read from environment variables so no secret is stored in the notebook.
jdbc_username = os.getenv("jdbc_username")
jdbc_password = os.getenv("jdbc_password")
# Fall back to the Microsoft SQL Server JDBC driver class when the env var is unset
driver_class = os.getenv("driver_class", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

# Only report whether each credential is present: cell output is saved with the
# notebook, so printing the values would leak them.
print(f"jdbc_username set: {bool(jdbc_username)}")
print(f"jdbc_password set: {bool(jdbc_password)}")
print(f"driver_class: {driver_class}")

# Credentials are deliberately NOT embedded in the URL. The write cell passes them via
# connection_properties and the read cell via the user/password options, so keeping them
# out of the URL avoids exposing the password wherever the URL is echoed.
jdbc_url = (
    f"jdbc:sqlserver://{jdbc_hostname}:{jdbc_port};database={jdbc_database};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": driver_class
}

# Databricks normally ships the SQL Server JDBC driver; this just confirms the class loads
try:
    jvm = spark._jvm
    driver = jvm.Class.forName(driver_class)
    print(f"✅ The JDBC driver {driver_class} is available!")
except Exception as e:
    print(f"❌ JDBC driver not found: {e}")

In [0]:
# Azure SQL table that receives the processed rows
table_name = "dbo.WeatherTable"

# Convert the pandas DataFrame loaded from Blob Storage into a Spark DataFrame
spark_df = spark.createDataFrame(data=df)

# To drop the table, it is easier to do it from the database's Query editor (preview)
# in the Azure portal than from this notebook.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType
from datetime import datetime
import json
from pyspark.sql.functions import col, from_unixtime

# Function to process one weather record and return a flat tuple
def parse_weather(row_json):
    """Flatten one weather record into a tuple of scalar fields.

    Args:
        row_json: The record as a JSON string/bytes, a ``dict``, or an object
            exposing ``asDict(recursive=True)`` such as the Spark ``Row`` values
            produced by ``spark_df.collect()``.

    Returns:
        A 16-tuple ``(city_id, city_name, temp, feels_like, humidity, pressure,
        wind_speed, wind_deg, weather_id, cloud_coverage, sunrise, sunset,
        timestamp, lat, lon, country)``.

        Missing fields default to ``None``, with these exceptions:
        ``temp``/``feels_like`` default to 0, and missing or null epoch fields
        default to 0 (1970-01-01). Timestamps are naive ``datetime`` values in UTC.
        A missing or empty ``weather`` list yields ``None`` for ``weather_id``.

        Returns ``None`` (after printing the error) if the record cannot be parsed.
    """
    from datetime import timezone  # local import keeps this cell self-contained

    def _section(record, key):
        # Nested object as a dict; missing or null sections behave like an empty dict
        value = record.get(key)
        return value if isinstance(value, dict) else {}

    def _utc(seconds):
        # Epoch seconds -> naive UTC datetime (same result as the deprecated utcfromtimestamp)
        return datetime.fromtimestamp(seconds or 0, tz=timezone.utc).replace(tzinfo=None)

    try:
        if isinstance(row_json, (str, bytes, bytearray)):
            data = json.loads(row_json)
        elif hasattr(row_json, "asDict"):
            # Spark Row: convert nested Rows (structs, arrays of structs) to dicts/lists as well
            data = row_json.asDict(recursive=True)
        else:
            data = row_json
        if not isinstance(data, dict):
            raise TypeError(f"expected a JSON object or mapping, got {type(data).__name__}")

        main = _section(data, "main")
        wind = _section(data, "wind")
        sys_info = _section(data, "sys")
        coord = _section(data, "coord")
        weather = data.get("weather")
        # Only the first weather condition is kept
        first_weather = (
            weather[0]
            if isinstance(weather, list) and weather and isinstance(weather[0], dict)
            else {}
        )

        return (
            data.get("id"),
            data.get("name"),
            main.get("temp", 0),
            main.get("feels_like", 0),
            main.get("humidity"),
            main.get("pressure"),
            wind.get("speed"),
            wind.get("deg"),
            first_weather.get("id"),
            _section(data, "clouds").get("all"),
            _utc(sys_info.get("sunrise")),
            _utc(sys_info.get("sunset")),
            _utc(data.get("dt")),
            coord.get("lat"),
            coord.get("lon"),
            sys_info.get("country"),
        )
    except Exception as e:
        # Best-effort parsing: report the bad record and let the caller skip it
        print(f"Erro ao processar JSON: {e}")
        return None

# Spark schema for a flattened weather record (like the columns of a database table).
# NOTE(review): `schema` is not used by the cells shown here, and its column order/names
# differ from both the tuple returned by parse_weather and the columns selected into
# processed_df (it has weather_description but no cloud_coverage). Confirm before relying on it.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("latitude", FloatType()),
            ("longitude", FloatType()),
            ("weather_description", StringType()),
            ("temperature", FloatType()),
            ("feels_like", FloatType()),
            ("humidity", IntegerType()),
            ("pressure", IntegerType()),
            ("wind_speed", FloatType()),
            ("wind_deg", IntegerType()),
            ("id", IntegerType()),
            ("sunrise", TimestampType()),
            ("sunset", TimestampType()),
            ("timestamp", TimestampType()),
            ("city_id", IntegerType()),
            ("city_name", StringType()),
            ("country", StringType()),
        )
    ]
)

# Parse every row of spark_df on the driver (collect() brings all rows over).
# Rows that parse_weather cannot handle come back as None and are dropped.
# NOTE(review): parsed_data is not used by the cells shown here.
parsed_data = [
    parsed
    for parsed in map(parse_weather, spark_df.collect())
    if parsed
]

# Select and flatten the nested weather fields into a tabular Spark DataFrame.
# Epoch-second fields go through timestamp_seconds, so sunrise/sunset/timestamp are real
# TIMESTAMP columns. from_unixtime would return formatted *strings* (in the session time
# zone), and a table created by Spark would then store them as text.
# NOTE: if dbo.WeatherTable already exists from an earlier run with text columns for these
# fields, recreate it so appended rows keep one consistent type.
from pyspark.sql.functions import timestamp_seconds

processed_df = spark_df.select(
    col("coord.lat").alias("latitude"),
    col("coord.lon").alias("longitude"),
    col("weather")[0]["id"].alias("id"),  # id of the first weather condition
    col("main.temp").alias("temperature"),
    col("main.feels_like").alias("feels_like"),
    col("main.humidity").alias("humidity"),
    col("main.pressure").alias("pressure"),
    col("wind.speed").alias("wind_speed"),
    col("wind.deg").alias("wind_deg"),
    col("clouds.all").alias("cloud_coverage"),
    timestamp_seconds(col("sys.sunrise").cast("long")).alias("sunrise"),
    timestamp_seconds(col("sys.sunset").cast("long")).alias("sunset"),
    timestamp_seconds(col("dt").cast("long")).alias("timestamp"),
    col("id").alias("city_id"),  # the record's top-level id
    col("name").alias("city_name"),
    col("sys.country").alias("country"),
)

display(processed_df)

In [0]:
# Append processed_df to the Azure SQL table through the JDBC connector.
# A failure is reported in the cell output instead of stopping the notebook.
try:
    (
        processed_df.write
        .mode("append")
        .jdbc(jdbc_url, table_name, properties=connection_properties)
    )
except Exception as e:
    print(f"❌ Error in data insertion: {e}")
else:
    print("✅ New data added with sucess!")

In [0]:
# Read the target table back from Azure SQL to check the inserted rows
try:
    sql_query = f"SELECT * FROM {table_name}"
    reader = (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", f"({sql_query}) as tmp")
    )
    # Reuse the same user/password/driver as the write cell; unset (None) values are skipped
    for option_name, option_value in connection_properties.items():
        if option_value is not None:
            reader = reader.option(option_name, option_value)
    df_new = reader.load()
    display(df_new)
except Exception as e:
    # Any failure lands here (authentication, network, missing table, ...),
    # so report it without assuming the table is missing.
    print(f"❌ Could not read {table_name}: {e}")


Exercise: a small example just to test Spark

In [0]:

# Sample rows shaped like a flattened weather record (values are illustrative only)
example_rows = [
    (41.1496, -8.611, "Clouds", "few clouds", "02d", "stations", 289.88, 289.15, 289.88, 289.88, 1005, 59,
     10000, 10.29, 160, 20, 1742393294, "PT", 1742366366, 1742409896, 0, "Porto", 200),

    (38.7169, -9.1399, "Clear", "clear sky", "01n", "stations", 285.32, 284.01, 283.0, 286.5, 1012, 72,
     9000, 5.5, 90, 5, 1742393294, "PT", 1742366366, 1742409896, 0, "Lisboa", 200)
]

# Column names, in the same order as the values in each sample row
example_columns = [
    "coord_lat", "coord_lon", "weather_main", "weather_description", "weather_icon", "base",
    "main_temp", "main_feels_like", "main_temp_min", "main_temp_max",
    "main_pressure", "main_humidity", "visibility",
    "wind_speed", "wind_deg", "clouds_all", "dt",
    "sys_country", "sys_sunrise", "sys_sunset", "timezone",
    "name", "cod"
]

# Build a Spark DataFrame from the local rows and column names (types are inferred).
# It gets its own name so it does not overwrite df_new, which holds the rows read
# back from Azure SQL in the previous cell.
example_df = spark.createDataFrame(example_rows, example_columns)

# TODO: some articles suggest spark.sql can be more efficient than spark.createDataFrame;
# this has not been evaluated here.
# Display the created DataFrame
display(example_df)
