# Import The Libraries

In [58]:
import pandas as pd
import pyodbc
from dotenv import load_dotenv
import os

## Connect to SQL Server and PySpark

In [85]:
load_dotenv()

# Load the configuration from the environment (load_dotenv() above adds any
# values found in a .env file).
driver = os.getenv('DB_DRIVER')
server = os.getenv('DB_SERVER')
database = os.getenv('DB_NAME')
user = os.getenv('DB_USERNAME')
password = os.getenv('DB_PASSWORD')
path = os.getenv('HDFS_PATH')

# Fail fast on unset variables: os.getenv returns None for them, and None would
# otherwise be formatted into the connection string / HDFS path as the literal
# text "None", producing a misleading driver or filesystem error later on.
_settings = {
    'DB_DRIVER': driver,
    'DB_SERVER': server,
    'DB_NAME': database,
    'DB_USERNAME': user,
    'DB_PASSWORD': password,
    'HDFS_PATH': path,
}
_missing = [name for name, value in _settings.items() if value is None]
if _missing:
    raise RuntimeError(
        f"Missing required environment variables: {', '.join(_missing)}"
    )

# Assemble the ODBC connection string as one compact line so that no newlines
# or indentation are sent to the driver. The SQL Server port is fixed at 1434.
connection_string = (
    f'DRIVER={driver};'
    f'SERVER={server},1434;'
    f'DATABASE={database};'
    f'UID={user};'
    f'PWD={password};'
    'TrustServerCertificate=yes;'
)

try:
    conn = pyodbc.connect(connection_string)
    # Open and close a cursor as a quick check that the connection is usable.
    cursor = conn.cursor()
    print("Success")
    cursor.close()
except Exception as e:
    print(f'Error Occured: {e}')
    # Later cells use `conn`; stop here instead of failing further down with an
    # unrelated NameError.
    raise

Success


In [60]:
from pyspark.sql import SparkSession
# Build the Spark session: the SQL Server JDBC driver jar is put on spark.jars,
# and the Hadoop settings point at the local HDFS namenode and YARN
# resource manager.
spark = (
    SparkSession.builder
    .appName("Medalion Project")
    .config("spark.jars", "sqljdbc_12.8/enu/jars/mssql-jdbc-12.8.1.jre8.jar")
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
    .config("spark.hadoop.yarn.resourcemanager.address", "localhost:8032")
    .getOrCreate()
)

# SQL Server JDBC endpoint (port 1434), assembled from its individual settings.
jdbc_url = ''.join([
    f'jdbc:sqlserver://{server}:1434;',
    f'databaseName={database};',
    'encrypt=true;',
    'trustServerCertificate=true;',
])

# Credentials and driver class passed to every spark.read.jdbc / write.jdbc call.
connection_properties = dict(
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

### Bronze

In [61]:
# Store raw data into bronze
# Bronze layer: pull Sales.Store over JDBC and land it as headered CSV on HDFS,
# replacing whatever a previous run left at the same path.
df_bronze = spark.read.jdbc(jdbc_url, "Sales.Store", properties=connection_properties)
try:
    (
        df_bronze.write
        .mode('overwrite')
        .option('header', True)
        .csv(f'hdfs://Arsylia:9000/{path}/store.csv')
    )
except Exception as e:
    # Report the failure and keep going.
    print(f'Error Occured:\n {e}')

In [62]:
# Read the bronze CSV back from HDFS (first row is the header, column types are
# inferred) and preview it.
bronze_csv_path = f'hdfs://Arsylia:9000/{path}/store.csv'
df_read = spark.read.options(header=True, inferSchema=True).csv(bronze_csv_path)
df_read.show()

+----------------+--------------------+-------------+--------------------+--------------------+--------------------+
|BusinessEntityID|                Name|SalesPersonID|        Demographics|             rowguid|        ModifiedDate|
+----------------+--------------------+-------------+--------------------+--------------------+--------------------+
|             292|Next-Door Bike Store|          279|<StoreSurvey xmln...|A22517E3-848D-4EB...|2014-09-12 11:15:...|
|             294|Professional Sale...|          276|<StoreSurvey xmln...|B50CA50B-C601-4A1...|2014-09-12 11:15:...|
|             296|      Riders Company|          277|<StoreSurvey xmln...|337C3688-1339-4E1...|2014-09-12 11:15:...|
|             298|  The Bike Mechanics|          275|<StoreSurvey xmln...|7894F278-F0C8-4D1...|2014-09-12 11:15:...|
|             300|   Nationwide Supply|          286|<StoreSurvey xmln...|C3FC9705-A8C4-4F3...|2014-09-12 11:15:...|
|             302|Area Bike Accesso...|          281|<StoreSurve

### Silver

In [124]:
# Create new table
# Silver staging: load the source table Sales.Store over JDBC.
store_df = spark.read.jdbc(jdbc_url, 'Sales.Store', properties=connection_properties)

# Copy it into SalesProcessing.StoreProcessing, overwriting any existing table.
try:
    store_df.write.mode('overwrite').jdbc(
        jdbc_url,
        'SalesProcessing.StoreProcessing',
        properties=connection_properties,
    )
except Exception as e:
    print(f"Error Occured: {e}")

# Read the processing table back and print its schema.
df_silver = spark.read.jdbc(
    jdbc_url,
    'SalesProcessing.StoreProcessing',
    properties=connection_properties,
)
df_silver.printSchema()

root
 |-- BusinessEntityID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- SalesPersonID: integer (nullable = true)
 |-- Demographics: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [125]:
# Show a single, untruncated Demographics value to inspect its contents.
demo = df_silver.select(df_silver['Demographics'])
demo.show(1, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Demographics                                                                                                                                                                                                                                                                                                                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------

In [126]:
# DDL that adds ten columns to SalesProcessing.StoreProcessing. Their names and
# types match the values the later silver query extracts from Demographics.
add_columns_query = """
ALTER TABLE SalesProcessing.StoreProcessing
ADD AnnualSales INT,
    AnnualRevenue INT,
    BankName NVARCHAR(100),
    BusinessType NVARCHAR(20),
    YearOpened INT,
    Specialty NVARCHAR(30),
    Area INT,
    Brands NVARCHAR(30),
    Internet NVARCHAR(30),
    NumberEmployees INT;
"""

# Start from None so that `finally` only closes a cursor opened by this cell.
# Previously, a failure in conn.cursor() made `finally` close a stale cursor
# from an earlier cell (or raise NameError), hiding the real error.
cursor = None
try:
    cursor = conn.cursor()
    cursor.execute(add_columns_query)
    conn.commit()
    print('Success')
except Exception as e:
    print(f"Error Occured: {e}")
    if cursor is not None:
        # Discard the failed statement so the shared connection is not left
        # inside an open transaction for the cells that reuse it.
        conn.rollback()
finally:
    if cursor is not None:
        cursor.close()

Success


In [127]:
# T-SQL that flattens Sales.Store: it keeps the scalar columns and extracts ten
# typed values from the Demographics XML column with .value(), using the
# StoreSurvey namespace bound to the prefix `ns`. Each XPath ends in [1] so that
# a single node is selected. The SquareFeet element is returned under the
# column name Area; every other extracted column keeps its element name.
query = """
WITH XMLNAMESPACES (  
  'http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey' AS ns  
)  
SELECT 
  BusinessEntityID,
  Name, 
  SalesPersonID,
  rowguid,
  ModifiedDate,
  Demographics.value('(/ns:StoreSurvey/ns:AnnualSales)[1]', 'INT') as AnnualSales,
  Demographics.value('(/ns:StoreSurvey/ns:AnnualRevenue)[1]', 'INT') as AnnualRevenue,
  Demographics.value('(/ns:StoreSurvey/ns:BankName)[1]', 'NVARCHAR(100)') as BankName,
  Demographics.value('(/ns:StoreSurvey/ns:BusinessType)[1]', 'NVARCHAR(20)') as BusinessType,
  Demographics.value('(/ns:StoreSurvey/ns:YearOpened)[1]', 'INT') as YearOpened,
  Demographics.value('(/ns:StoreSurvey/ns:Specialty)[1]', 'NVARCHAR(30)') as Specialty,
  Demographics.value('(/ns:StoreSurvey/ns:SquareFeet)[1]', 'INT') as Area,
  Demographics.value('(/ns:StoreSurvey/ns:Brands)[1]', 'NVARCHAR(30)') as Brands,
  Demographics.value('(/ns:StoreSurvey/ns:Internet)[1]', 'NVARCHAR(30)') as Internet,
  Demographics.value('(/ns:StoreSurvey/ns:NumberEmployees)[1]', 'INT') as NumberEmployees
FROM 
  Sales.Store
"""

# Run the query over the pyodbc connection and collect the result in a pandas
# DataFrame.
# NOTE(review): pandas warns when given a raw DBAPI connection that is not
# sqlite3; consider a SQLAlchemy engine if that warning should go away.
df_pandas = pd.read_sql(sql=query, con=conn)

# Convert the pandas frame into a PySpark DataFrame.
df_spark = spark.createDataFrame(data=df_pandas)

# Display the result in PySpark.
df_spark.show()

  df_pandas = pd.read_sql(query, conn)


+----------------+--------------------+-------------+--------------------+--------------------+-----------+-------------+--------------------+------------+----------+---------+-----+------+--------+---------------+
|BusinessEntityID|                Name|SalesPersonID|             rowguid|        ModifiedDate|AnnualSales|AnnualRevenue|            BankName|BusinessType|YearOpened|Specialty| Area|Brands|Internet|NumberEmployees|
+----------------+--------------------+-------------+--------------------+--------------------+-----------+-------------+--------------------+------------+----------+---------+-----+------+--------+---------------+
|             292|Next-Door Bike Store|          279|A22517E3-848D-4EB...|2014-09-12 11:15:...|     800000|        80000|     United Security|          BM|      1996| Mountain|21000|     2|    ISDN|             13|
|             294|Professional Sale...|          276|B50CA50B-C601-4A1...|2014-09-12 11:15:...|     800000|        80000|  International Ban

In [132]:
# Replace SalesProcessing.StoreProcessing with the flattened frame.
try:
    df_spark.write.mode('overwrite').jdbc(
        jdbc_url,
        'SalesProcessing.StoreProcessing',
        properties=connection_properties,
    )
except Exception as e:
    print(f"Error Occured: {e}")

# The silver frame is the in-memory df_spark itself (not a re-read of the
# table); preview its first five rows.
update_df_silver = df_spark
update_df_silver.show(n=5)

+----------------+--------------------+-------------+--------------------+--------------------+-----------+-------------+--------------------+------------+----------+---------+-----+------+--------+---------------+
|BusinessEntityID|                Name|SalesPersonID|             rowguid|        ModifiedDate|AnnualSales|AnnualRevenue|            BankName|BusinessType|YearOpened|Specialty| Area|Brands|Internet|NumberEmployees|
+----------------+--------------------+-------------+--------------------+--------------------+-----------+-------------+--------------------+------------+----------+---------+-----+------+--------+---------------+
|             292|Next-Door Bike Store|          279|A22517E3-848D-4EB...|2014-09-12 11:15:...|     800000|        80000|     United Security|          BM|      1996| Mountain|21000|     2|    ISDN|             13|
|             294|Professional Sale...|          276|B50CA50B-C601-4A1...|2014-09-12 11:15:...|     800000|        80000|  International Ban

In [133]:
# pandas is already imported in the first cell; this repeat is harmless.
import pandas as pd

# Collect the silver Spark frame into pandas.
df_silver_pandas = update_df_silver.toPandas()

# Remove pandas' row-display limit. This is a global option, so it also
# affects every DataFrame displayed after this cell.
pd.options.display.max_rows = None

df_silver_pandas.head(5)

Unnamed: 0,BusinessEntityID,Name,SalesPersonID,rowguid,ModifiedDate,AnnualSales,AnnualRevenue,BankName,BusinessType,YearOpened,Specialty,Area,Brands,Internet,NumberEmployees
0,292,Next-Door Bike Store,279,A22517E3-848D-4EBE-B9D9-7437F3432304,2014-09-12 11:15:07.497,800000,80000,United Security,BM,1996,Mountain,21000,2,ISDN,13
1,294,Professional Sales and Service,276,B50CA50B-C601-4A13-B07E-2C63862D71B4,2014-09-12 11:15:07.497,800000,80000,International Bank,BM,1991,Touring,18000,4+,T1,14
2,296,Riders Company,277,337C3688-1339-4E1A-A08A-B54B23566E49,2014-09-12 11:15:07.497,800000,80000,Primary Bank & Reserve,BM,1999,Road,21000,2,DSL,15
3,298,The Bike Mechanics,275,7894F278-F0C8-4D16-BD75-213FDBF13023,2014-09-12 11:15:07.497,800000,80000,International Security,BM,1994,Mountain,18000,2,DSL,16
4,300,Nationwide Supply,286,C3FC9705-A8C4-4F3A-9550-EB2FA4B7B64D,2014-09-12 11:15:07.497,800000,80000,Guardian Bank,BM,1987,Touring,21000,4+,DSL,17


In [135]:
# Count missing values per column of the silver frame.
df_silver_pandas.isna().sum()

BusinessEntityID    0
Name                0
SalesPersonID       0
rowguid             0
ModifiedDate        0
AnnualSales         0
AnnualRevenue       0
BankName            0
BusinessType        0
YearOpened          0
Specialty           0
Area                0
Brands              0
Internet            0
NumberEmployees     0
dtype: int64