In [None]:


---------Delta Overwrite with Schema
adlsCont    = "nonprod"
adlsPath    = "lake.dfs.core.windows.net"

## Name:         Oem Pipeline
## Description:  Loads oem data
## Author:       Monica

##### Begin Notebook Execution #####

# Imports
import pandas as pd
import datetime as dt

# Set Notebook Variables
# Capture the clock once so the date and the day name always describe the
# same instant (two separate now() calls could straddle midnight).
runTs       = dt.datetime.now()
currentDt   = runTs.strftime("%Y-%m-%d")
currentDyNm = runTs.strftime("%A").lower()

# abfss://<container>@<account host> roots for the staging and synergy areas.
adlsStagingPath  = f"abfss://{adlsCont}@{adlsPath}/lakehouse/integration/staging"
adlsSynergyPath  = f"abfss://{adlsCont}@{adlsPath}/lakehouse/synergy"

# Load the staged company parquet files, keep only rows with RowNbr = 1,
# drop exact duplicate rows, then overwrite the Delta table. overwriteSchema
# lets the overwrite replace the table's existing schema as well as its data.
co = spark.read.load(adlsStagingPath + "/company/*.parquet", format="parquet").where("RowNbr = 1").dropDuplicates()
co.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(adlsSynergyPath + "/company")





--Write to Azure SQL Database
# Source: the fleet parquet files to be pushed into Azure SQL.
df = spark.read.load('abfss://lakehouse@devmdpdatalake01.dfs.core.windows.net/xxx/fleet/*.parquet', format='parquet')

# Target Azure SQL database and table.
serverNm = "jdbc:sqlserver://xxxx"
databaseNm = "xxxxx"
sqlSvrPath = f"{serverNm};databaseName={databaseNm};"

tableNm = "x.Fleet"
userNm = "xxxx"
# NOTE(review): credential literal in source — prefer fetching it from Key
# Vault (e.g. mssparkutils.credentials.getSecret) instead of hard-coding it.
password = "xxxx"

# JDBC connection settings handed to the connector in one call.
sqlWriteOptions = {
    "url": sqlSvrPath,
    "dbtable": tableNm,
    "user": userNm,
    "password": password,
}

# NOTE(review): with this connector "overwrite" may drop and recreate the
# table unless a truncate option is set — confirm indexes/permissions survive.
(
    df.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .options(**sqlWriteOptions)
    .save()
)


--MS Spark Utility Secret
#mssparkutils.credentials.getSecret('azure key vault name','secret name')
# Get the password from Key Vault through the Synapse TokenLibrary on the JVM.
import sys

token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
# Arguments presumably (Key Vault name, secret name), matching the
# mssparkutils signature in the comment above — values are redacted here.
password = token_library.getSecret('xxxx','xxxx')
# Secret minus its last character; kept for any later cell that reads strA.
# NOTE(review): the reason for dropping the last character is not visible here.
strA = password[:-1]
# Never print the secret (or most of it): notebook output is persisted and
# readable by anyone with access to the notebook or its run history.
print("Secret retrieved" if password else "Secret is empty")

-----------------------------------------------------------------------------------------------
##### Begin Parameters #####
# Default (empty) values for this notebook's parameters; presumably the
# calling pipeline overrides them at run time — confirm in the pipeline.
#   psSystemId - SystemId whose rows are selected from the API config file.
#   psLoad     - not referenced in the visible cells (NOTE(review): confirm
#                whether it is still needed).
#   adlsCont   - ADLS container, placed right after "abfss://" in paths.
#   adlsPath   - ADLS account host, placed after "@" in the abfss:// paths.

psSystemId  = ""
psLoad      = ""
adlsCont    = ""
adlsPath    = ""

## Name:        Company 1
## Description:  Loads Company 1 Api Data Into Ingest Layer (OAuth 2)
## Author:       Monica

##### Begin Notebook Execution #####

# Imports
import requests
import json
import pandas as pd
import datetime as dt
from oauthlib.oauth2 import BackendApplicationClient
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth2Session

# Set Notebook Variables
# Capture the clock once: the LoadDt column and the yyyy/mm/dd write
# partition must describe the same day (separate now() calls could
# straddle midnight and disagree).
runTs       = dt.datetime.now()
currentDt   = runTs.strftime("%Y-%m-%d")
currentDyNm = runTs.strftime("%A").lower()

adlsReadPath    = f"abfss://{adlsCont}@{adlsPath}"
# NOTE(review): unlike the read/remove roots this omits "@" + adlsPath, so
# the write URL names only the container — confirm the storage account is
# resolved by the environment, or add the host.
adlsWritePath   = f"abfss://{adlsCont}"
adlsRemovePath  = f"abfss://{adlsCont}@{adlsPath}"

# Zero-padded year / month / day used for the ingest partition folders.
yrTxt = runTs.strftime("%Y")
moTxt = runTs.strftime("%m")
dyTxt = runTs.strftime("%d")

# Create Notebook Functions
def dirExists(path):
    """Report whether *path* can be listed with ``mssparkutils.fs.ls``.

    Any exception from the listing (missing path, permissions, or anything
    else) is treated as "does not exist".

    Returns:
        True when the listing succeeds, otherwise False.
    """
    try:
        mssparkutils.fs.ls(path)
    except Exception:
        return False
    return True

# Set the OAuth2 token endpoint Url
provider_url = "xxxxx"

# Load the API configuration (pipe-delimited CSV with a header row) and keep
# the rows for the requested system. The filter is a Column comparison, not a
# SQL string built by concatenation, so a psSystemId containing a quote can
# neither break the expression nor widen it (e.g. "x' OR '1'='1").
apiConfigRaw = spark.read.load(f"{adlsReadPath}/lakehouse/integration/configuration/api/api.txt", format="csv", header=True, sep="|")
config = apiConfigRaw.where(apiConfigRaw["SystemId"] == psSystemId).dropDuplicates()
config.createOrReplaceTempView("Config")

# Select the ids and client credentials needed for each API connection
cfg = spark.sql("Select SystemId,CoId,ClientId,Secret From Config")

# Caterpillar Fleet
# Daily

# Seconds to wait on the token endpoint and on each fleet page; without a
# timeout a stalled connection hangs the notebook indefinitely.
# NOTE(review): 60 is a chosen default, not an API requirement — tune as needed.
requestTimeoutSec = 60

# Output column -> flattened source column produced by pd.json_normalize.
fleetColumnMap = {
    "OemNm":                      "EquipmentHeader.OEMName",
    "EquipmentNbr":               "EquipmentHeader.EquipmentID",
    "SerialNbr":                  "EquipmentHeader.SerialNumber",
    "Model":                      "EquipmentHeader.Model",
    "FuelConsumedDt":             "FuelUsedLast24.Datetime",
    "FuelUnit":                   "FuelUsedLast24.FuelUnits",
    "FuelConsumed":               "FuelUsedLast24.FuelConsumed",
    "FuelRemainingPct":           "FuelRemaining.Percent",
    "DEFRemainingPct":            "DEFRemaining.Percent",
    "FuelConsumedTot":            "FuelUsed.FuelConsumed",
    "CumulativeIdleHoursDt":      "CumulativeIdleHours.Datetime",
    "CumulativeIdleHours":        "CumulativeIdleHours.Hour",
    "CumulativeOperatingHoursDt": "CumulativeOperatingHours.Datetime",
    "CumulativeOperatingHours":   "CumulativeOperatingHours.Hour",
    "DistanceDt":                 "Distance.Datetime",
    "DistanceUnit":               "Distance.OdometerUnits",
    "DistanceOdometer":           "Distance.Odometer",
    "CumulativeLoadCntDt":        "CumulativeLoadCount.Datetime",
    "CumulativeLoadCnt":          "CumulativeLoadCount.Count",
    "CumulativePayloadTotDt":     "CumulativePayloadTotals.Datetime",
    "CumulativePayloadTotUnit":   "CumulativePayloadTotals.PayloadUnits",
    "CumulativePayloadTot":       "CumulativePayloadTotals.Payload",
}

for row in cfg.collect():

    # Set Cell Variables
    client_id       = row["ClientId"]
    client_secret   = row["Secret"]
    systemId        = row["SystemId"]
    coId            = row["CoId"]

    # Authorization: OAuth2 backend-application (client credentials) flow,
    # authenticating to the token endpoint with HTTP Basic client credentials.
    client = BackendApplicationClient(client_id = client_id)
    oauth = OAuth2Session(client = client)
    token = oauth.fetch_token(token_url = provider_url, auth = HTTPBasicAuth(client_id, client_secret), timeout = requestTimeoutSec)

    params = {}
    headers = {"accept": "application/json", "Authorization": f"Bearer {token['access_token']}"}

    pgCnt = 0
    while True:
        pgCnt += 1

        # Fetch the page. Transport errors and non-JSON bodies end this
        # system's pagination (best-effort, as before) but are reported
        # rather than silently swallowed.
        try:
            response = requests.get(f"https://servicesxxxxxxxx{pgCnt}", headers=headers, params=params, timeout=requestTimeoutSec)
            payload = response.json()
        except (requests.RequestException, ValueError) as err:
            print(f"{systemId}/{coId}: stopped fleet paging at page {pgCnt}: {err}")
            break

        # A page without "Equipment" records (missing, empty, or an error
        # body) marks the end of the data for this system.
        equipment = payload.get("Equipment") if isinstance(payload, dict) else None
        if not equipment:
            break

        f = pd.json_normalize(equipment)

        # Alias columns. json_normalize only creates columns for keys present
        # on this page, so an optional section absent from every record has
        # no column; .get() yields an all-null column instead of a KeyError
        # (which previously ended pagination and dropped the remaining pages).
        for tgtCol, srcCol in fleetColumnMap.items():
            f[tgtCol] = f.get(srcCol)

        # Default columns identifying the load and its source
        f["LoadDt"]   = currentDt
        f["SystemId"] = systemId
        f["CoId"]     = coId
        f["SrcNm"]    = "caterpillar"
        f["SrcItem"]  = "fleet"

        # Write Parquet to Data Lake (All Columns String). Write failures now
        # surface instead of being mistaken for the end of pagination.
        tgtPath = f"{adlsWritePath}/ingest/xxxx{yrTxt}/{moTxt}/{dyTxt}/{coId}.fleet.0{pgCnt}.parquet"
        f = f.astype('string')
        # NOTE(review): ["x","y"] looks like a redacted placeholder for the
        # real output column list — confirm before running.
        fOutput = f[["x","y"]].copy()
        fOutput.to_parquet(tgtPath)

##### End Notebook Execution #####

-------------------------------------------------------------------------------------------------------------------










