In [None]:
from pyspark.sql import SparkSession

# Build the Spark session used for the SQL Server extraction.
# getOrCreate() reuses an already-active session when one exists.
spark = (
    SparkSession.builder
    .appName("SQLServer_to_Snowflake_Migration")
    .getOrCreate()
)


In [None]:
# Connection details for the source SQL Server database.
# Secrets must not live in the notebook: the JDBC URL, user and password are
# taken from environment variables. The password falls back to an interactive
# prompt when SQLSERVER_PASSWORD is unset, so it is never stored in the file.
import getpass
import os

sql_server_url = os.environ.get(
    "SQLSERVER_JDBC_URL",
    "jdbc:sqlserver://10.0.0.5:1433;databaseName=SalesDB",
)
sql_properties = {
    "user": os.environ.get("SQLSERVER_USER", "admin_user"),
    "password": os.environ.get("SQLSERVER_PASSWORD")
    or getpass.getpass("SQL Server password: "),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# Reading data from SQL Server table
df_sales = spark.read.jdbc(
    url=sql_server_url,
    table="dbo.Sales_Orders",
    properties=sql_properties,
)

# Preview the data (first 5 rows only, to keep the output small)
df_sales.show(5)


In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, upper

# Snowflake connection parameters.
# Account, user and password come from environment variables so that no
# credential is committed with the notebook. The password falls back to an
# interactive prompt when SNOWFLAKE_PASSWORD is unset.
import getpass
import os

connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "xyz12345.ap-south-1"),
    "user": os.environ.get("SNOWFLAKE_USER", "SF_USER"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD")
    or getpass.getpass("Snowflake password: "),
    "role": "SYSADMIN",
    "warehouse": "COMPUTE_WH",
    "database": "TARGET_DB",
    "schema": "PUBLIC",
}

# Create Snowpark session
snowflake_session = Session.builder.configs(connection_parameters).create()

# Convert Spark DataFrame to Pandas for Snowpark processing.
# NOTE(review): toPandas() collects the whole table onto the Spark driver;
# confirm dbo.Sales_Orders fits in driver memory before running at full scale.
pandas_df = df_sales.toPandas()

# Create Snowpark DataFrame from the in-memory pandas frame
snow_df = snowflake_session.create_dataframe(pandas_df)

# Transformation: convert customer name to uppercase and filter high-value orders.
# NOTE(review): assumes the source columns are named exactly CUSTOMER_NAME and
# ORDER_TOTAL (upper case) - verify against the SQL Server schema.
transformed_df = (
    snow_df
    .with_column("CUSTOMER_NAME", upper(col("CUSTOMER_NAME")))
    .filter(col("ORDER_TOTAL") > 10000)
)

transformed_df.show()


In [None]:
# Write transformed data back to Snowflake.
# The default save mode raises when the table already exists, which makes the
# notebook fail on a second "Restart & Run All". "overwrite" replaces the
# derived table on each run so the load is idempotent.
transformed_df.write.save_as_table(
    "TARGET_DB.PUBLIC.SALES_ORDERS_TRANSFORMED",
    mode="overwrite",
)

# Only reached when save_as_table() did not raise.
print("✅ Data successfully loaded into Snowflake!")


In [None]:
# Verify the load by querying the target table.
# Raw SQL is a syntax error in a Python cell, so both statements are submitted
# through the Snowpark session instead.
snowflake_session.sql("USE DATABASE TARGET_DB").collect()
snowflake_session.sql("SELECT * FROM PUBLIC.SALES_ORDERS_TRANSFORMED LIMIT 10").show()


In [None]:
# Read dbo.Sales_Orders from SQL Server, reporting any failure clearly.
try:
    df_sales = spark.read.jdbc(url=sql_server_url, table="dbo.Sales_Orders", properties=sql_properties)
except Exception as e:
    print(f"❌ Error extracting data: {e}")
    # Re-raise so the notebook stops here instead of silently continuing with
    # a stale or undefined df_sales.
    raise
