In [0]:
# Session-level Spark SQL settings, applied in the order listed.
# Shuffle partitions are sized at four times the context's default parallelism;
# the remaining keys switch on the adaptive-execution options named by each key.
spark_sql_settings = {
    "spark.sql.shuffle.partitions": sc.defaultParallelism*4,
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}

for setting_name, setting_value in spark_sql_settings.items():
    spark.conf.set(setting_name, setting_value)


In [0]:
"""
Deleted Items
"""

from pyspark.sql.functions import col , row_number , year , to_timestamp , to_date
from pyspark.sql import Window


# Projection applied to the raw deleted-items export: identifier columns are
# renamed with a "Key" suffix and DeletedDate is parsed into a date (yyyy-MM-dd).
deleted_item_columns = [
    col('DeletedBy'),
    col('DeletedById').alias("DeletedByKey"),
    to_date(col("DeletedDate"), "yyyy-MM-dd").alias("DeletedDate"),
    col("DeletedItemId").alias("DeletedItemKey"),
    col("ExternalSystemId").alias("ExternalSystemKey"),
    col("ObjectId").alias("ObjectKey"),
    col("Type"),
]

# Read every CSV in the deleted_items folder (first row is the header) and keep
# only the projected columns above.
# Alternative input kept from the original for reference: .csv(deleted_items_list)
DeletedItemDF = (
    spark.read
    .option("header", "true")
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/deleted_items/*.csv")
    .select(*deleted_item_columns)
)

# Rank rows within each DeletedItemKey, most recent DeletedDate first.
window = (
    Window
    .partitionBy("DeletedItemKey")
    .orderBy(DeletedItemDF["DeletedDate"].desc())
)

# Remove exact duplicate rows, then keep only the top-ranked row per key.
latest_deleted_items = (
    DeletedItemDF
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# DeletedYear is derived from DeletedDate and used as the Delta partition column.
deleted_items_by_year = latest_deleted_items.withColumn(
    "DeletedYear", year(col("DeletedDate").cast("date"))
)

# Overwrite the curated Delta table from a single partition.
(
    deleted_items_by_year
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .partitionBy("DeletedYear")
    .save("/mnt/datalake_curated/view_migration/deleted_items_g")
)

In [0]:
"""
Account Curated  
"""



from pyspark.sql.functions import col , row_number , year , to_timestamp 
from pyspark.sql import Window
from delta.tables import *

  
# Datetime pattern for the "Modified: Date" / "Created: Date" columns.
# In Spark/Java datetime patterns "ss" is second-of-minute, while "SS" is
# fraction-of-second. The previous pattern used "SS", so the seconds field was
# read as a fraction and the resulting timestamps (and any ordering on them)
# were wrong.
# NOTE(review): assumes the export holds whole seconds after the minutes
# (e.g. "hh:mm:ss AM") — confirm against a sample file.
CRM_TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"

# Read the full Account export (quoted, multi-line CSV with a header row) and
# project/rename the source columns into the curated naming.
Account = (
    spark.read.format('csv')
    .option("quote", '"')
    .option("escape", '"')
    .option("header", "true")
    .option("multiLine", "true")
    # No schema/inferSchema is supplied here, so this reader-level format appears
    # unused; it is kept in step with the explicit to_timestamp calls below.
    .option("timestampFormat", CRM_TIMESTAMP_FORMAT)
    #.load(account_list)
    .load("/mnt/datalake_raw/batch/sales/oraclecrm/account/full/*.csv")
    .select(
        col("Row Id").alias("AccountKey"),
        col("Address City").alias("AddressCity"),
        col("Address Country").alias("AddressCountry"),
        col("Address Address 1").alias("AddressAddress1"),
        col("Address Address 2").alias("AddressAddress2"),
        col("Address Address 3").alias("AddressAddress3"),
        col("Address County").alias("AddressCounty"),
        col("Web Site").alias("WebSite"),
        col("Location").alias("Location"),
        col("Main Phone #").alias("MainPhoneNumber"),
        col("Account Name").alias("AccountName"),
        col("Parent Account").alias("ParentAccount"),
        col("Account Partner").alias("AccountPartner"),
        col("Tier").alias("Tier"),
        col("Public Company").alias("PublicCompany"),
        col("Account Type").alias("AccountType"),
        col("Address Zip/Post Code").alias("AddressZippostCode"),
        col("Address US State").alias("AddressUsState"),
        col("Parent Account Id").alias("ParentAccountId"),
        col("Parent Account Location").alias("ParentAccountLocation"),
        col("Primary Contact").alias("PrimaryContact"),
        col("Primary Contact Id").alias("PrimaryContactId"),
        col("Branch").alias("Branch"),
        col("Parent Account External Unique Id").alias("ParentAccountExternalUniqueId"),
        col("External Unique ID").alias("ExternalUniqueId"),
        col("Owner Full Name").alias("OwnerFullName"),
        col("Description").alias("Description"),
        col("Billing Location").alias("BillingLocation"),
        # col("Deposit Eligible").alias("DepositEligible"),  # 2022-03-03: column name was changed at some point (note by Jeff Erisman)
        # 2022-03-03 (BS): source column is "Deposit Ineligible"; the alias stays
        # DepositEligible to match the downstream mapping required in View DM.
        col("Deposit Ineligible").alias("DepositEligible"),
        col("Proposal Break Down").alias("ProposalBreakDown"),
        col("Repair Proposal from National Accounts only").alias("RepairProposalFromNationalAccountsOnly"),
        col("Shipping Location").alias("ShippingLocation"),
        col("TKE National Account").alias("TkeNationalAccount"),
        col("EBS Account Name").alias("EbsAccountName"),
        col("EBS Site Location").alias("EbsSiteLocation"),
        col("Party Customer Numbers").alias("PartyCustomerNumbers"),
        col("GPO Rebate paid on").alias("GpoRebatePaidOn"),
        col("GPO Discount %").alias("GpoDiscountPercentage"),
        col("GPO Rebate %").alias("GpoRebatePercentage"),
        col("EBS Party Id").alias("EbsPartyId"),
        col("EBS Site Id").alias("EbsSiteId"),
        col("EBS Status").alias("EbsStatus"),
        col("Oracle Branch Number").alias("OracleBranchNumber"),
        col("Payment Terms").alias("PaymentTerms"),
        col("Union Local").alias("UnionLocal"),
        col("Tax Registration Number").alias("TaxRegistrationNumber"),
        col("Primary Owner Id").alias("OwnerId"),
        # col("Owner (Owner Sign In Id)").alias("OwnerSignInId"),
        # col("Owner External ID").alias("OwnerExternalId"),

        col("Service Contract").alias("ServiceContract"),
        col("Service Contract: External Unique ID").alias("ServiceContractExternalUniqueID"),
        col("Service Contract: Integration ID").alias("ServiceContractIntegrationID"),
        col("IB Unit: External Unique ID").alias("IBUnitExternalUniqueID"),
        col("IB Unit").alias("IBUnit"),
        col("IB Unit: Integration ID").alias("IBUnitIntegrationID"),
        col("Market Segment").alias("MarketSegment"),
        col("Building Type").alias("BuildingType"),
        col("Status").alias("AccountStatus"),
        col("Site Location").alias("SiteLocation"),

        col("Main Fax #").alias("MainFaxNumber"),
        col("Capital Plan Proposed Date").alias("CapitalPlanProposedDate"),
        col("Fiscal Year Start").alias("FiscalYearStart"),
        to_timestamp(col("Modified: Date"), CRM_TIMESTAMP_FORMAT).alias("ModifiedDate"),
        to_timestamp(col("Created: Date"), CRM_TIMESTAMP_FORMAT).alias("CreatedDate"),
    )
)

# Rank rows within each AccountKey, most recent ModifiedDate first.
window = (
    Window
    .partitionBy("AccountKey")
    .orderBy(Account["ModifiedDate"].desc())
)

# Remove exact duplicate rows, keep the top-ranked row per account, and drop it
# if that row has no ModifiedDate.
latest_accounts = (
    Account
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .filter("ModifiedDate is not null")
    .drop("RowNumber")
)

# CreatedYear is derived from CreatedDate and used as the Delta partition column.
accounts_by_year = latest_accounts.withColumn("CreatedYear", year("CreatedDate"))

# Overwrite the curated Delta table from a single partition.
(
    accounts_by_year
    .coalesce(1)
    .write
    .format('delta')
    .option("header", "true")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .partitionBy("CreatedYear")
    .save("/mnt/datalake_curated/view_migration/account_g")
)

# Remove deleted accounts from the curated Delta table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
account_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/account_g")

# Narrow the merge source to one row per deleted Account key.
# - Filtering on Type here, against the source only, avoids an unqualified
#   `type` reference inside the merge condition, which would become ambiguous
#   if the target ever gained a column of that name.
# - De-duplicating ObjectKey ensures at most one source row matches each target
#   row (deleted_items_g is unique per DeletedItemKey, not per ObjectKey).
deleted_account_keys = (
    deleted_items
    .filter("Type = 'Account'")
    .select("ObjectKey")
    .dropDuplicates()
)

# Delete every curated account whose key appears in the deleted-items feed.
(
    account_g.alias("t")
    .merge(
        deleted_account_keys.alias("s"),
        "s.ObjectKey = t.AccountKey",
    )
    .whenMatchedDelete()
    .execute()
)

# process full copy in csv for view migration

# spark.read.format('delta').load('/mnt/datalake_curated/view_migration/account_g')\
# .coalesce(1)\
# .write\
# .format('csv')\
# .option("header","true")\
# .mode("append")\
# .save("/mnt/datalake_curated/view_migration/account")


In [0]:
"""
User Curated  
"""



from pyspark.sql.functions import col , row_number , year , to_timestamp
from pyspark.sql import Window
from delta.tables import *

# Datetime pattern for the "Modified: Date" / "Created: Date" columns.
# In Spark/Java datetime patterns "ss" is second-of-minute, while "SS" is
# fraction-of-second. The previous pattern used "SS", so the seconds field was
# read as a fraction and the resulting timestamps (and any ordering on them)
# were wrong.
# NOTE(review): assumes the export holds whole seconds after the minutes
# (e.g. "hh:mm:ss AM") — confirm against a sample file.
CRM_TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"

# Read the full User export (quoted, multi-line CSV with a header row) and
# project/rename the source columns into the curated naming.
User = (
    spark.read.format('csv')
    .option("quote", '"')
    .option("escape", '"')
    .option("header", "true")
    .option("multiLine", "true")
    # No schema/inferSchema is supplied here, so this reader-level format appears
    # unused; it is kept in step with the explicit to_timestamp calls below.
    .option("timestampFormat", CRM_TIMESTAMP_FORMAT)
    #.csv(user_list)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/user/full/*.csv")
    .select(
        col("Row Id").alias("UserKey"),
        col("Branch").alias("Branch"),
        col("Financial Reporting Area").alias("FinancialReportingArea"),
        # Backticks keep the trailing colon as part of the column name.
        col("`Region:`").alias("Region"),
        col("Primary LOB").alias("PrimaryLOB"),
        col("Global_Employee_Number_8_Digit_ID").alias("Global_Employee_Number_8_Digit_ID"),
        col("Default Book Id for Analytics").alias("DefaultBookIdforAnalytics"),
        col("Email").alias("Email"),
        col("Employee Number").alias("EmployeeNumber"),
        col("External Unique ID").alias("ExternalUniqueID"),
        col("First Name").alias("FirstName"),
        col("Name").alias("Name"),
        # col("UserId").alias("UserId"),
        col("Integration ID").alias("IntegrationID"),
        col("Last Name").alias("LastName"),
        # col("Reports To (Alias)").alias("ReportsTo(Alias)"),
        col("Reports To").alias("ReportsTo"),
        col("Manager Id").alias("ManagerId"),
        col("Status").alias("Status"),
        col('Alias').alias("Alias"),
        col("Supervisor").alias("Supervisor"),
        col("User Sign In ID").alias("UserSignInID"),

        to_timestamp(col("Modified: Date"), CRM_TIMESTAMP_FORMAT).alias("ModifiedDate"),
        to_timestamp(col("Created: Date"), CRM_TIMESTAMP_FORMAT).alias("CreatedDate"),
    )
)



# Rank rows within each UserKey, most recent ModifiedDate first.
window = Window.partitionBy("UserKey").orderBy(User["ModifiedDate"].desc())

USER_DELTA_PATH = "/mnt/datalake_curated/view_migration/user_g"
USER_CSV_PATH = "/mnt/datalake_curated/view_migration/user"

# Keep the latest row per user and overwrite the curated Delta table.
(
    User.dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .save(USER_DELTA_PATH)
)

# Export the CSV copy from the Delta table that was just written instead of
# evaluating the dedupe pipeline a second time. row_number() breaks ties on
# ModifiedDate arbitrarily, so two independent evaluations could pick different
# rows and leave the Delta and CSV copies inconsistent.
# NOTE(review): mode "append" adds a full copy on every run — confirm that the
# accumulation in the CSV folder is intended.
(
    spark.read.format('delta').load(USER_DELTA_PATH)
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("append")
    .save(USER_CSV_PATH)
)



In [0]:
"""
Opportunity Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year , regexp_replace
from pyspark.sql import Window
from delta.tables import *

 
# Datetime pattern for the "Modified: Date" / "Created: Date" columns.
# In Spark/Java datetime patterns "ss" is second-of-minute, while "SS" is
# fraction-of-second. The previous pattern used "SS", so the seconds field was
# read as a fraction and the resulting timestamps (and any ordering on them)
# were wrong.
# NOTE(review): assumes the export holds whole seconds after the minutes
# (e.g. "hh:mm:ss AM") — confirm against a sample file.
CRM_TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"

# Read the full Opportunity export (quoted, multi-line CSV with a header row)
# and project/rename the source columns into the curated naming.
Opportunity = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("parserLib", "univocity")
    .option("multiline", "true")
    .option("comment", None)
    # .csv(opportunity_list)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/opportunity/full/*.csv")
    .select(
        col("Row Id").alias("OpportunityKey"),
        col("Account Id").alias("AccountId"),
        col("Account").alias("Account"),
        col("Award/Closed Date").alias("AwardClosedDate"),
        col("Created").alias("Created"),
        col("Created By").alias("CreatedBy"),
        # col("Created: Date").alias("CreatedDate"),
        col("COVID-19").alias("COVID19"),
        col("Public Bid").alias("PublicBid"),
        col("Safety").alias("Safety"),
        col("Opportunity Closed (Yes/No)").alias("OpportunityClosedYesNo"),
        col("Quick Quote").alias("QuickQuote"),
        col("State Compliance").alias("StateCompliance"),
        col("Eagle Qualified").alias("EagleQualified"),
        col("Eagle Exempt").alias("EagleExempt"),
        col("TKE National Account").alias("TKENationalAccount"),
        col("Validated Down Payment").alias("ValidatedDownPayment"),
        col("Regional Assistance Amount").alias("RegionalAssistanceAmount"),
        col("Expected Eagle Revenue").alias("ExpectedEagleRevenue"),
        col("Competitor Price").alias("CompetitorPrice"),
        col("Existing/Old Contract Value").alias("ExistingOldContractValue"),
        col("TKTrip Amount").alias("TKTripAmount"),
        # Backticks keep the trailing dot as part of the column name.
        col("`Down Payment Amount.`").alias("DownPaymentAmount"),
        col("Margin_Dollars").alias("MarginDollars"),
        col("Repair Selling Price").alias("RepairSellingPrice"),
        col("Status Update Date").alias("StatusUpdateDate"),
        col("Final Acceptance Date").alias("FinalAcceptanceDate"),
        col("Bid Due Date").alias("BidDueDate"),
        col("Contract Start Date").alias("ContractStartDate"),
        col("Contract End Date").alias("ContractEndDate"),
        col("Existing Contract End Date").alias("ExistingContractEndDate"),
        col("Validation Date for Deposit").alias("ValidationDateforDeposit"),
        col("Down Payment Received Date").alias("DownPaymentReceivedDate"),
        col("Existing/Old Contract End Date").alias("ExistingOldContractEndDate"),
        col("Job Completion Date").alias("JobCompletionDate"),
        col("Follow Up Date").alias("FollowUpDate"),
        col("Contract Execution Date").alias("ContractExecutionDate"),
        col("Contract Booked Date").alias("ContractBookedDate"),
        col("Validation Date for IC").alias("ValidationDateforIC"),
        col("Repair Status Date").alias("RepairStatusDate"),
        # Commas are stripped from the mechanic employee id.
        regexp_replace(col("Mechanic Employee ID"), ',', '').alias("MechanicEmployeeID"),
        col("# of Units").alias("NumberOfUnits"),
        col("Price Escalation Cap %").alias("PriceEscalationCapPercent"),
        col("Days to Receive Down Payment").alias("DaystoReceiveDownPayment"),
        col("Power Sponsor Email").alias("PowerSponsorEmail"),
        col("Negotiation Plan").alias("NegotiationPlan"),
        col("Pain Chain").alias("PainChain"),
        col("Collaboration Plan").alias("CollaborationPlan"),
        col("Bill to Customer #").alias("BilltoCustomerNumber"),
        col("Total Labor Hours").alias("TotalLaborHours"),
        col("Repair Gross Margin %").alias("RepairGrossMarginPercent"),
        col("Eagle Manager Alert check").alias("EagleManagerAlertcheck"),
        col("# of MAX Plus units").alias("NumberOfMAXPlusunits"),
        col("# of MAX Premium units").alias("NumberOfMAXPremiumunits"),
        col("# of MAX Pro units").alias("NumberOfMAXProunits"),
        col("EBS Billed Amount").alias("EBSBilledAmount"),
        col("EBS Receipt Amount").alias("EBSReceiptAmount"),
        col("Strength of Sale").alias("StrengthofSale"),
        col("Sponsor Email").alias("SponsorEmail"),
        col("IB Unit: Integration ID").alias("IBUnitIntegrationID"),
        col("IB Unit").alias("IBUnit"),
        col("Existing Quote: Integration ID").alias("ExistingQuoteIntegrationID"),
        col("Base Bid").alias("BaseBid"),
        col("Service Contract").alias("ServiceContract"),
        col("Primary Competitor").alias("PrimaryCompetitor"),
        col("Price Escalation Cap Type").alias("PriceEscalationCapType"),
        col("Basis of Design Product").alias("BasisofDesignProduct"),
        col("Competitor Product").alias("CompetitorProduct"),
        col("Competitor Service Level").alias("CompetitorServiceLevel"),
        col("Lead Sub-Source").alias("LeadSubSource"),
        col("Existing Service Contract").alias("ExistingServiceContract"),
        col("Account Type").alias("AccountType"),
        col("Building Type").alias("BuildingType"),
        col("NIM Branch").alias("NIMBranch"),
        col("NIM Conversion Probability").alias("NIMConversionProbability"),
        col("DD included in Specification").alias("DDincludedinSpecification"),
        col("Type of Contract Paper").alias("TypeofContractPaper"),
        col("Regional Assistance").alias("RegionalAssistance"),
        col("# of Buildings with MAX activation fee").alias("NumberOfBuildingswithMAXactivationfee"),
        col("Basis of Design").alias("BasisofDesign"),
        col("Cancellation Terms").alias("CancellationTerms"),
        col("Cancellation Notice in Days").alias("CancellationNoticeinDays"),
        col("Billing Frequency").alias("BillingFrequency"),
        col("Branch ID").alias("BranchID"),
        col("Job Location").alias("JobLocation"),
        col("Oracle PS Number").alias("OraclePSNumber"),
        col("Existing TKE Service Contract Number").alias("ExistingTKEServiceContractNumber"),
        col("Mechanic Email").alias("MechanicEmail"),
        col("Validated TKE Job Number").alias("ValidatedTKEJobNumber"),
        col("Validated Deposit by").alias("ValidatedDepositby"),
        col("Independent Competitor").alias("IndependentCompetitor"),
        col("Ops Manager Email Ids").alias("OpsManagerEmailIds"),
        col("External_Source").alias("ExternalSource"),
        col("Repair Type").alias("RepairType"),
        col("TKTrip ID").alias("TKTripID"),
        col("Lead Source Description").alias("LeadSourceDescription"),
        col("Mechanic").alias("Mechanic"),
        col("Validated By").alias("ValidatedBy"),
        col("eSignature Status").alias("eSignatureStatus"),
        col("Route").alias("Route"),
        col("Repair Status").alias("RepairStatus"),
        col("Repair Number").alias("RepairNumber"),
        col("Customer Contact").alias("CustomerContact"),
        col("Eagle Identify & Qualify Lead Date").alias("EagleIdentify&QualifyLeadDate"),
        col("Eagle Engage Key Stakeholders Date").alias("EagleEngageKeyStakeholdersDate"),
        col("Eagle Analyze Needs Date").alias("EagleAnalyzeNeedsDate"),
        col("Eagle Design Solution Approach Date").alias("EagleDesignSolutionApproachDate"),
        col("Eagle Present Offer Date").alias("EaglePresentOfferDate"),
        col("MW Timestamp").alias("MWTimestamp"),
        col("Description").alias("Description"),
        col("Expected Revenue").alias("ExpectedRevenue"),
        # The source header contains two spaces between "Validate" and "for".
        col("Validate  for IC").alias("ValidateforIC"),
        col("Proposed Date").alias("ProposedDate"),
        col("Branch").alias("Branch"),
        col("Market Segment").alias("MarketSegment"),
        col("Line of Business").alias("LineofBusiness"),
        col("Region").alias("Region"),
        col("Addendum Type").alias("AddendumType"),
        col("Integration ID").alias("IntegrationID"),
        col("Primary Contact ID").alias("PrimaryContactID"),
        col("Lead Source").alias("LeadSource"),
        col("Next Step").alias("NextStep"),
        col("Building/Project").alias("BuildingProject"),
        col("Type of Sale").alias("TypeofSale"),
        col("Eagle Sales Stage").alias("EagleSalesStage"),
        col("Sales Rep").alias("SalesRep"),
        col("Primary Owner Id").alias("PrimaryOwnerId"),
        col("Tier").alias("Tier"),
        col("Probability %").alias("ProbabilityPercent"),
        col("Reason Won/Lost").alias("ReasonWonLost"),
        col("Revenue").alias("Revenue"),
        col("Sales Method - Translation").alias("SalesMethodTranslation"),
        col("Sales Stage").alias("SalesStage"),
        col("Sales Stage Id").alias("SalesStageId"),
        col("Source Campaign").alias("SourceCampaign"),
        col("Stage Status").alias("StageStatus"),
        col("Status").alias("Status"),

        to_timestamp(col("Modified: Date"), CRM_TIMESTAMP_FORMAT).alias("ModifiedDate"),
        to_timestamp(col("Created: Date"), CRM_TIMESTAMP_FORMAT).alias("CreatedDate"),
    )
)


# Rank rows within each OpportunityKey, most recent ModifiedDate first.
window = (
    Window
    .partitionBy("OpportunityKey")
    .orderBy(Opportunity["ModifiedDate"].desc())
)

# Remove exact duplicate rows, then keep only the top-ranked row per key.
latest_opportunities = (
    Opportunity
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# CreatedYear is derived from CreatedDate and used as the Delta partition column.
opportunities_by_year = latest_opportunities.withColumn(
    "CreatedYear", year(col("CreatedDate").cast("date"))
)

# Overwrite the curated Delta table from a single partition.
(
    opportunities_by_year
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .partitionBy("CreatedYear")
    .save("/mnt/datalake_curated/view_migration/opportunity_g")
)

# Remove deleted opportunities from the curated Delta table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
opportunity_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/opportunity_g")

# Narrow the merge source to one row per deleted Opportunity key.
# - Filtering on Type here, against the source only, avoids an unqualified
#   `type` reference inside the merge condition, which would become ambiguous
#   if the target ever gained a column of that name.
# - De-duplicating ObjectKey ensures at most one source row matches each target
#   row (deleted_items_g is unique per DeletedItemKey, not per ObjectKey).
deleted_opportunity_keys = (
    deleted_items
    .filter("Type = 'Opportunity'")
    .select("ObjectKey")
    .dropDuplicates()
)

# Delete every curated opportunity whose key appears in the deleted-items feed.
(
    opportunity_g.alias("t")
    .merge(
        deleted_opportunity_keys.alias("s"),
        "s.ObjectKey = t.OpportunityKey",
    )
    .whenMatchedDelete()
    .execute()
)

# process full copy in csv for view migration

# spark.read.format('delta').load('/mnt/datalake_curated/view_migration/opportunity_g')\
# .coalesce(1)\
# .write\
# .format('csv')\
# .option("header","true")\
# .mode("append")\
# .save("/mnt/datalake_curated/view_migration/opportunity")


In [0]:
"""
Opportunity Product Revenue Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year
from pyspark.sql import Window
from delta.tables import *

# Datetime pattern for the "Modified: Date" / "Created: Date" columns.
# In Spark/Java datetime patterns "ss" is second-of-minute, while "SS" is
# fraction-of-second. The previous pattern used "SS", so the seconds field was
# read as a fraction and the resulting timestamps (and any ordering on them)
# were wrong.
# NOTE(review): assumes the export holds whole seconds after the minutes
# (e.g. "hh:mm:ss AM") — confirm against a sample file.
CRM_TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"

# Read the full Opportunity Product Revenue export (quoted, multi-line CSV with
# a header row) and project/rename the source columns into the curated naming.
OpportunityProduct = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("parserLib", "univocity")
    .option("multiline", "true")
    .option("comment", None)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/opportunity_product/full/*.csv")
    .select(
        col("Row Id").alias("OpportunityProductRevenueKey"),
        col("Account External Unique Id").alias("AccountExternalUniqueId"),
        col("Account Id").alias("AccountId"),
        col("Location").alias("Location"),
        col("Account").alias("Account"),
        col("Contact").alias("Contact"),
        col("Contact Id").alias("ContactId"),
        col("Contract").alias("Contract"),
        col("Created By").alias("CreatedBy"),
        col("Created By: User Sign In ID").alias("CreatedByUserSignInID"),
        col("Currency").alias("Currency"),
        col("Opportunity Rollup Check").alias("OpportunityRollupCheck"),
        col("Proposed").alias("Proposed"),
        col("Sustainability Products").alias("SustainabilityProducts"),
        col("SPOT Created").alias("SPOTCreated"),
        col("Estimated Rate").alias("EstimatedRate"),
        col("Estimated Expense").alias("EstimatedExpense"),
        col("Estimated Mechanic Rate").alias("EstimatedMechanicRate"),
        col("TKTrip Amount").alias("TKTripAmount"),
        col("Labor Cost").alias("LaborCost"),
        col("Discount Amount").alias("DiscountAmount"),
        col("Factory Material Cost/Unit").alias("FactoryMaterialCostByUnit"),
        # Backticks keep the dot as part of the column name.
        col("`Purch. Material Cost/Unit`").alias("PurchMaterialCostByUnit"),
        col("# of Front Openings").alias("NumberOfFrontOpenings"),
        col("# of Months").alias("NumberOfMonths"),
        col("# of Rear Openings").alias("NumberOfRearOpenings"),
        col("Z_Estimated Hours").alias("Z_EstimatedHours"),
        col("Total Labor Hours").alias("TotalLaborHours"),
        col("Rise").alias("Rise"),
        # NOTE(review): "Percint" looks like a typo for "Percent"; the alias is
        # left unchanged because downstream consumers may depend on it.
        col("GP Margin %").alias("GPMarginPercint"),
        col("Branch Margin").alias("BranchMargin"),
        col("Variance to standard").alias("Variancetostandard"),
        col("Units similarity to base").alias("Unitssimilaritytobase"),
        col("Hours outside of regular").alias("Hoursoutsideofregular"),
        col("TBM score").alias("TBMscore"),
        col("Existing Manufacturer").alias("ExistingManufacturer"),
        col("Existing Machine Model").alias("ExistingMachineModel"),
        col("Hoistway Door Hangers, Tracks, and Rollers").alias("HoistwayDoorHangersTracksAndRollers"),
        col("Installation").alias("Installation"),
        col("Existing Controller Model").alias("ExistingControllerModel"),
        col("Existing Controller Manufacturer").alias("ExistingControllerManufacturer"),
        col("Machine Model").alias("MachineModel"),
        col("AGILE Destination Controls").alias("AGILEDestinationControls"),
        col("Controller Model").alias("ControllerModel"),
        col("Door Operator Manufacturer").alias("DoorOperatorManufacturer"),
        col("Fixture Manufacturer").alias("FixtureManufacturer"),
        col("Step Width").alias("StepWidth"),
        col("TKE Factory Product Code").alias("TKEFactoryProductCode"),
        col("Controller Type").alias("ControllerType"),
        col("Existing Controller Type").alias("ExistingControllerType"),
        col("Balustrade Type").alias("BalustradeType"),
        col("Lead Source Description").alias("LeadSourceDescription"),
        col("Equipment Conditions").alias("EquipmentConditions"),
        col("Unit Designation").alias("UnitDesignation"),
        col("General Repair Type").alias("GeneralRepairType"),
        col("Brand of Repair Material").alias("BrandofRepairMaterial"),
        col("New Opportunity Id").alias("NewOpportunityId"),
        col("Asset Id").alias("AssetId"),
        col("Mechanic").alias("Mechanic"),
        col("Mechanic Email").alias("MechanicEmail"),
        col("Category").alias("Category"),
        col("Environmental Conditions").alias("EnvironmentalConditions"),
        col("Usage").alias("Usage"),
        col("Special Billing Rate").alias("SpecialBillingRate"),
        col("Helper Zone").alias("HelperZone"),
        col("Mechanic Zone").alias("MechanicZone"),
        col("Voltage").alias("Voltage"),
        col("Description").alias("Description"),
        col("Expected Revenue").alias("ExpectedRevenue"),
        col("External Unique ID").alias("ExternalUniqueID"),
        col("Forecast").alias("Forecast"),
        col("Existing/Old Contract Value").alias("Existing_OldContractValue"),
        col("# of Stops").alias("NumOfStops"),
        col("Unit Classification").alias("UnitClassification"),
        col("Capacity").alias("Capacity"),
        col("Speed").alias("Speed"),
        col("Controller Manufacturer").alias("ControllerManufacturer"),
        col("Machine Manufacturer").alias("MachineManufacturer"),
        col("Repair Type").alias("RepairType"),
        col("Opportunity Id").alias("OpportunityId"),
        col("Opportunity").alias("Opportunity"),
        col("Sales Stage").alias("SalesStage"),
        col("Sales Rep").alias("SalesRep"),
        col("Primary Owner Id").alias("PrimaryOwnerId"),
        # NOTE(review): same "Percint" spelling as above; left unchanged.
        col("Probability %").alias("ProbabilityPercint"),
        col("Product Category").alias("ProductCategory"),
        col("Product Category ID").alias("ProductCategoryID"),
        col("Product External ID").alias("ProductExternalID"),
        col("Product Id").alias("ProductId"),
        col("Product").alias("Product"),
        col("Part #").alias("PartNumber"),
        col("Product Status").alias("ProductStatus"),
        col("Purchase Date").alias("PurchaseDate"),
        col("Price Per Unit").alias("PricePerUnit"),
        col("# of Units").alias("NumOfUnits"),
        col("Revenue").alias("Revenue"),
        col("Oracle Serial Number").alias("OracleSerialNumber"),
        col("Ship Date").alias("ShipDate"),
        col("Revenue Start Date").alias("RevenueStartDate"),
        col("Status").alias("Status"),
        col("Line of Business").alias("LineofBusiness"),
        col("Modified By").alias("ModifiedBy"),
        col("Modified By: User Sign In ID").alias("ModifiedByUserSignInID"),
        to_timestamp(col("Modified: Date"), CRM_TIMESTAMP_FORMAT).alias("ModifiedDate"),
        to_timestamp(col("Created: Date"), CRM_TIMESTAMP_FORMAT).alias("CreatedDate"),
    )
)


# Rank rows within each OpportunityProductRevenueKey, most recent ModifiedDate first.
window = (
    Window
    .partitionBy("OpportunityProductRevenueKey")
    .orderBy(OpportunityProduct["ModifiedDate"].desc())
)

# Remove exact duplicate rows, then keep only the top-ranked row per key.
latest_opportunity_products = (
    OpportunityProduct
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# CreatedYear is derived from CreatedDate and used as the Delta partition column.
opportunity_products_by_year = latest_opportunity_products.withColumn(
    "CreatedYear", year(col("CreatedDate").cast("date"))
)

# Overwrite the curated Delta table from a single partition.
(
    opportunity_products_by_year
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .partitionBy("CreatedYear")
    .save("/mnt/datalake_curated/view_migration/opportunity_product_g")
)


# Remove deleted opportunity product revenue rows from the curated Delta table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
opportunity_product_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/opportunity_product_g")

# Narrow the merge source to one row per deleted Revenue key.
# - Filtering on Type here, against the source only, avoids an unqualified
#   `type` reference inside the merge condition, which would become ambiguous
#   if the target ever gained a column of that name.
# - De-duplicating ObjectKey ensures at most one source row matches each target
#   row (deleted_items_g is unique per DeletedItemKey, not per ObjectKey).
deleted_revenue_keys = (
    deleted_items
    .filter("Type = 'Revenue'")
    .select("ObjectKey")
    .dropDuplicates()
)

# Delete every curated revenue row whose key appears in the deleted-items feed.
(
    opportunity_product_g.alias("t")
    .merge(
        deleted_revenue_keys.alias("s"),
        "s.ObjectKey = t.OpportunityProductRevenueKey",
    )
    .whenMatchedDelete()
    .execute()
)

# process full copy in csv for view migration

# spark.read.format('delta').load('/mnt/datalake_curated/view_migration/opportunity_product_g')\
# .coalesce(1)\
# .write\
# .format('csv')\
# .option("header","true")\
# .mode("append")\
# .save("/mnt/datalake_curated/view_migration/opportunity_product")


In [0]:
"""
Contact Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year , lit
from pyspark.sql import Window


  
# Read the full contact CSV export and project it onto the curated column names.
# Column names containing characters such as "." or "/" are back-quoted so they are
# treated as a single column name. Rows whose "Modified: Date" does not parse
# (ModifiedDate is null) are dropped.
Contact = (spark.read
            .option("header","true")
            .option("quote", "\"")
            .option("escape", "\"")
            .option("parserLib", "univocity")
            .option("multiline","true")
            .option("comment",None)
            .csv(f"/mnt/datalake_raw/batch/sales/oraclecrm/contact/full/*.csv")
            .select(
              col("Row Id").alias("ContactKey"),
              col("Email").alias("Email"),
              col("First Name").alias("FirstName"),
              col("Last Name").alias("LastName"),
              col("`Sales Rep.`").alias("SalesRep"),
              col("Account Address City").alias("AccountAddressCity"),
              col("Account Address Country").alias("AccountAddressCountry"),
              col("Account Address Address 1").alias("AccountAddressAddress1"),
              col("Account Address Address 2").alias("AccountAddressAddress2"),
              col("Account Address Address 3").alias("AccountAddressAddress3"),
              col("Account Address County").alias("AccountAddressCounty"),
              col("Account Address US State").alias("AccountAddressUSState"),
              col("`Account Address Zip/Post Code`").alias("AccountAddressZipCode"),
              col("Contact Address City").alias("ContactAddressCity"),
              col("Contact Address Country").alias("ContactAddressCountry"),
              col("Contact Address Address 1").alias("ContactAddressAddress1"),
              col("Contact Address Address 2").alias("ContactAddressAddress2"),
              col("Contact Address Address 3").alias("ContactAddressAddress3"),
              col("Contact Address County").alias("ContactAddressCounty"),
              col("Contact Address US State").alias("ContactAddressUSState"),
              col("`Contact Address Zip/Post Code`").alias("ContactAddressZipCode"),
              col("Account").alias("Account"),
              col("Account Location").alias("AccountLocation"),
              col("Last Assessment Date").alias("LastAssessmentDate"),
              col("Account Id").alias("AccountId"),
              col("Service Contract").alias("ServiceContract"),
              col("`Service Contract: Integration ID`").alias("ServiceContractIntegrationId"),
              col("Branch").alias("Branch"),
              col("Region").alias("Region"),
              col("Financial Reporting Area").alias("FinancialReportingArea"),
              col("`Work Phone #`").alias("WorkPhoneNumber"),
              col("Job Title").alias("JobTitle"),
              col("Cellular Phone #").alias("CellularPhoneNumber"),
              col("Home Phone #").alias("HomePhoneNumber"),
              col("Contact Type").alias("ContactType"),
              col("Lead Source").alias("LeadSource"),
              col("Status").alias("Status"),
              col("Description").alias("Description"),
              col("Job Role").alias("JobRole"),
              col("External Unique ID").alias("ExternalUniqueID"),
              # GoLive is not read from the export; it is emitted as an empty string.
              lit("").alias("GoLive"),
              # "ss" is second-of-minute. The previous pattern used "SS", which is the
              # fraction-of-second letter, so the seconds field was parsed as a sub-second
              # fraction and the stored timestamps carried the wrong seconds.
              to_timestamp(col("Modified: Date"),"MM/dd/yyyy hh:mm:ss a").alias("ModifiedDate"),
              to_timestamp(col("Created: Date"),"MM/dd/yyyy hh:mm:ss a").alias("CreatedDate")
            ).filter("ModifiedDate is not null")
          )

# Rank the rows of each ContactKey from most to least recently modified.
window = Window.partitionBy("ContactKey").orderBy(Contact["ModifiedDate"].desc())

# Drop exact duplicate rows, keep only the newest row per ContactKey, and overwrite the
# curated Delta table (schema included), partitioned by the year of CreatedDate.
(Contact.dropDuplicates().withColumn("RowNumber" , row_number().over(window))
 .filter("RowNumber == 1")
 .drop("RowNumber")
 .withColumn("CreatedYear",year(col("CreatedDate").cast("date")))
 # repartition(1) instead of coalesce(1): the output layout is the same (one writer task),
 # but coalesce(1) is a narrow transformation and would pull the window sort/row_number
 # stage above it into that single task as well.
 .repartition(1)
 .write
 .format('delta')
 .mode("overwrite")
 .option('overwriteSchema',"true")
 .partitionBy("CreatedYear")
 .save("/mnt/datalake_curated/view_migration/contact_g")
)

# (Contact.dropDuplicates().withColumn("RowNumber" , row_number().over(window))
#  .filter("RowNumber == 1")
#  .drop("RowNumber")
#  .coalesce(1)
#  .write
#  .format('csv')
#  .option("header","true")
#  .mode("overwrite")
#  .save("/mnt/datalake_curated/view_migration/contact")
# )    


In [0]:
"""
Service Contract Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year
from pyspark.sql import Window


  
# Read the full service contract CSV export and project it onto the curated column names.
# Commented-out entries are source columns that are currently not carried into the output.
# Rows whose "Modified: Date" does not parse (ModifiedDate is null) are dropped.
ServiceContract = (spark.read
            .option("header","true")
            .option("quote", "\"")
            .option("escape", "\"")
            .option("parserLib", "univocity")
            .option("multiline","true")
            .option("comment",None)
            .csv(f"/mnt/datalake_raw/batch/sales/oraclecrm/service_contract/full/*.csv")
            .select(

              col("Row Id").alias("ServiceContractKey"),
              col("Account: External Unique ID").alias("AccountExternalUniqueID"),
              col("Account Id").alias("AccountId"),
              col("Account: Integration ID").alias("AccountIntegrationID"),
              col("Account Location").alias("AccountLocation"),
              col("Billing Account").alias("BillingAccount"),
              col("Activity: Integration ID").alias("ActivityIntegrationID"),
          #     col("Book: Id").alias("BookId"),
          #     col("Book").alias("Book"),
              col("Contact: External Unique ID").alias("ContactExternalUniqueID"),
              col("Contact First Name").alias("ContactFirstName"),
              col("Contact").alias("Contact"),
              col("Contact Id").alias("ContactId"),
              col("Contact: Integration ID").alias("ContactIntegrationID"),
              col("Contact Last Name").alias("ContactLastName"),
          #     col("Created").alias("Created"),
              col("Created By").alias("CreatedBy"),
          #     col("Created By: Email").alias("CreatedByEmail"),
          #     col("Created By: External Unique ID").alias("CreatedByExternalUniqueID"),
          #     col("Created By: First Name").alias("CreatedByFirstName"),
          #     col("Created By: Name").alias("CreatedByName"),
          #     col("Created By: Integration ID").alias("CreatedByIntegrationID"),
          #     col("Created By: Last Name").alias("CreatedByLastName"),
          #     col("Created By: User Sign In ID").alias("CreatedByUserSignInID"),
              col("Currency").alias("Currency"),
              col("Customer PO # Required?").alias("CustomerPONumRequired?"),
              col("Contract Attached").alias("ContractAttached"),
          #     col("Inactive Rep Notification").alias("InactiveRepNotification"),
              col("TKExtend").alias("TKExtend"),
          #     col("Cancellation_Rep_Filter").alias("Cancellation_Rep_Filter"),
          #     col("Update Books").alias("UpdateBooks"),
          #     col("Duplicate Check Flag").alias("DuplicateCheckFlag"),
              col("Validated").alias("Validated"),
              col("Public Bid").alias("PublicBid"),
              col("CRP Account").alias("CRPAccount"),
              col("1st Level Approval").alias("1stLevelApproval"),
              col("Risk of Loss").alias("RiskofLoss"),
          #     col("Create_Opportunity").alias("Create_Opportunity"),
              col("Project Cancellation Date Required").alias("ProjectCancellationDateRequired"),
              col("Tier 1 Notification").alias("Tier1Notification"),
              col("Current Yearly Contract Price").alias("CurrentYearlyContractPrice"),
              col("Contract Start Date").alias("ContractStartDate"),
              col("Anniversary Date of Contract").alias("AnniversaryDateofContract"),
              col("Original Contract Date").alias("OriginalContractDate"),
              col("TKExtend Start Date").alias("TKExtendStartDate"),
              col("TKExtend End Date").alias("TKExtendEndDate"),
              col("Approval Date").alias("ApprovalDate"),
              col("TKExtend Modified Date").alias("TKExtendModifiedDate"),
              col("Projected Cancellation Date").alias("ProjectedCancellationDate"),
              col("Current Term Start Date").alias("CurrentTermStartDate"),
              col("Status Change Date Stamp").alias("StatusChangeDateStamp"),
              col("CDM Contract Start Date").alias("CDMContractStartDate"),
              col("TKExtend Status Change Date").alias("TKExtendStatusChangeDate"),
              col("Original Contract Note").alias("OriginalContractNote"),
              col("Contract Term (Years)").alias("ContractTermYears"),
              col("# of Units").alias("NumofUnits"),
              col("Renewal Term (Months)").alias("RenewalTermMonths"),
              col("Number of Renewals").alias("NumberofRenewals"),
              col("Active Unit Serial #s").alias("ActiveUnitSerialNo"),
              col("Escalation Cap %").alias("EscalationCapPct"),
              col("Discount Percentage").alias("DiscountPercentage"),
              col("IB Unit: External Unique ID").alias("IBUnitExternalUniqueID"),
              col("IB Unit: Integration ID").alias("IBUnitIntegrationID"),
              col("IB Unit").alias("IBUnit"),
              col("IB Unit: Type").alias("IBUnitType"),
              col("Existing Quote").alias("ExistingQuote"),
              col("Service Contract: External Unique ID").alias("ServiceContractExternalUniqueID"),
              col("Service Contract: Integration ID").alias("ServiceContractIntegrationID"),
              col("Service Contract").alias("ServiceContract"),
              col("Service Contract: Type").alias("ServiceContractType"),
          #     col("Validation Status – Cleansing").alias("ValidationStatus–Cleansing"),
              col("Cancellation Terms").alias("CancellationTerms"),
          #     col("DM Audit").alias("DMAudit"),
              col("Risk Category").alias("RiskCategory"),
              col("Reason at Risk").alias("ReasonatRisk"),
              col("Contract Status Value").alias("ContractStatusValue"),
              col("Cancellation Notification").alias("CancellationNotification"),
              col("Contract Paper").alias("ContractPaper"),
              col("Renewal Notification").alias("RenewalNotification"),
              col("Tier").alias("Tier"),
              col("Validation Status – TKExtend").alias("ValidationStatus–TKExtend"),
              col("Quality Audit").alias("QualityAudit"),
              col("TKExtend Status").alias("TKExtendStatus"),
          #     col("Z_Unused_Pick0").alias("Z_Unused_Pick0"),
              col("Contract Notes").alias("ContractNotes"),
              col("Message Note").alias("MessageNote"),
          #     col("Formula Name").alias("FormulaName"),
              col("TKExtend Data").alias("TKExtendData"),
              col("TKExtend Notes").alias("TKExtendNotes"),
              col("1st Level Approver").alias("1stLevelApprover"),
              col("Resolution Notes").alias("ResolutionNotes"),
              col("Master Contract Line #").alias("MasterContractLineNum"),
          #     col("First Associated Building").alias("FirstAssociatedBuilding"),
              col("Bill to Name").alias("BilltoName"),
              col("Bill to Address").alias("BilltoAddress"),
          #     col("z_Old Row Id").alias("z_OldRowId"),
          #     col("Z_Billing Contact Last Name").alias("Z_BillingContactLastName"),
              col("Price Escalation Period").alias("PriceEscalationPeriod"),
              col("Price Escalation Type").alias("PriceEscalationType"),
          #     col("CONTRACT_EXTENSION").alias("CONTRACT_EXTENSION"),
              col("Month of Escalation").alias("MonthofEscalation"),
              col("Date of Renewal Notifaction").alias("DateofRenewalNotifaction"),
              col("Billing Contact First Name").alias("BillingContactFirstName"),
              col("Branch Number").alias("BranchNumber"),
              col("Customer Specific Pricing").alias("CustomerSpecificPricing"),
              col("Ship to Name").alias("ShiptoName"),
          #     col("Legacy System").alias("LegacySystem"),
              col("Type of Bill").alias("TypeofBill"),
              col("Original Contract Date Text").alias("OriginalContractDateText"),
              col("Special Billing?").alias("SpecialBilling?"),
              col("National Account").alias("NationalAccount"),
              col("Discount Type").alias("DiscountType"),
              col("Alpha Code").alias("AlphaCode"),
              col("Customer Number").alias("CustomerNumber"),
              col("Sales Person Original").alias("SalesPersonOriginal"),
              col("Monitoring Bil lRef").alias("MonitoringBillRef"),
              col("Ship to Address").alias("ShiptoAddress"),
              col("Contract Type").alias("ContractType"),
          #     col("RT_OT").alias("RT_OT"),
              col("Route #").alias("RouteNum"),
              col("Service Charges PO").alias("ServiceChargesPO"),
              col("Cleansed By").alias("CleansedBy"),
              col("CDM Contract Repository").alias("CDMContractRepository"),
              col("TKExtend Status Value").alias("TKExtendStatusValue"),
          #     col("Z_Text").alias("Z_Text"),
              col("Building Name").alias("BuildingName"),
              col("Existing/Legacy Contract Number").alias("Existing/LegacyContractNumber"),
              col("Customer PO #").alias("CustomerPONum"),
              col("Billing Contact Last Name").alias("BillingContactLastName"),
          #     col("Refresh from DC").alias("RefreshfromDC"),
          #     col("Generate Scanning Coversheet").alias("GenerateScanningCoversheet"),
          #     col("Create Case").alias("CreateCase"),
          #     col("SLA email").alias("SLAemail"),
          #     col("TKExtend Data Correction Task").alias("TKExtendDataCorrectionTask"),
              col("Contract Number").alias("ContractNumber"),
          #     col("Indexed Checkbox").alias("IndexedCheckbox"),
              col("Current Contract Monthly Price").alias("CurrentContractMonthlyPrice"),
              col("Contract End Date").alias("ContractEndDate"),
              col("Indexed Number").alias("IndexedNumber"),
              col("Branch").alias("Branch"),
              col("Region").alias("Region"),
              col("Integration ID").alias("IntegrationID"),
          #     col("Modified").alias("Modified"),
          #     col("Modified: Date").alias("ModifiedDate"),
              col("CONTRACT_NUMBER").alias("CONTRACT_NUMBER"),
              col("Opportunity: External Unique ID").alias("OpportunityExternalUniqueID"),
              col("Opportunity Id").alias("OpportunityId"),
              col("Opportunity: Integration ID").alias("OpportunityIntegrationID"),
              col("Opportunity").alias("Opportunity"),
              col("Opportunity: Sales Stage").alias("OpportunitySalesStage"),
              col("EBS Relationship").alias("EBSRelationship"),
              col("Sales Person").alias("SalesPerson"),
              col("Owner: External Unique ID").alias("OwnerExternalUniqueID"),
              col("Owner Full Name").alias("OwnerFullName"),
              col("Primary Owner Id").alias("PrimaryOwnerId"),
              col("Contract Status").alias("ContractStatus"),
              col("Quick Search 2").alias("QuickSearch2"),
              col("Case Id").alias("CaseId"),
              col("Case: Integration ID").alias("CaseIntegrationID"),
              col("Case").alias("Case"),
          #     col("Solution: Integration ID").alias("SolutionIntegrationID"),
              col("Modified By").alias("ModifiedBy"),
          #     col("Modified By: Email").alias("ModifiedByEmail"),
          #     col("Modified By: External Unique ID").alias("ModifiedByExternalUniqueID"),
          #     col("Modified By: First Name").alias("ModifiedByFirstName"),
          #     col("Modified By: Name").alias("ModifiedByName"),
          #     col("Modified By: Integration ID").alias("ModifiedByIntegrationID"),
          #     col("Modified By: Last Name").alias("ModifiedByLastName"),
          #     col("Modified By: User Sign In ID").alias("ModifiedByUserSignInID"),

              # "ss" is second-of-minute. The previous pattern used "SS", which is the
              # fraction-of-second letter, so the seconds field was parsed as a sub-second
              # fraction and the stored timestamps carried the wrong seconds.
              to_timestamp(col("Modified: Date"),"MM/dd/yyyy hh:mm:ss a").alias("ModifiedDate"),
              to_timestamp(col("Created: Date"),"MM/dd/yyyy hh:mm:ss a").alias("CreatedDate")
            ).filter("ModifiedDate is not null")
          )

# Rank the rows of each ServiceContractKey from most to least recently modified.
window = Window.partitionBy("ServiceContractKey").orderBy(ServiceContract["ModifiedDate"].desc())

# Build the de-duplicated frame (exact duplicates dropped, newest row per key kept) once and
# persist it, so the Delta and CSV outputs below are written from the same rows. Previously
# the whole pipeline was defined and evaluated twice; rows tied on ModifiedDate have no
# defined order under row_number(), so the two outputs could pick different rows.
service_contract_latest = (ServiceContract.dropDuplicates()
 .withColumn("RowNumber" , row_number().over(window))
 .filter("RowNumber == 1")
 .drop("RowNumber")
 .persist()
)

try:
    # repartition(1) instead of coalesce(1): the output is still produced by one writer task,
    # but coalesce(1) is a narrow transformation and would pull the window sort/row_number
    # stage above it into that single task as well.
    (service_contract_latest
     .withColumn("CreatedYear",year(col("CreatedDate").cast("date")))
     .repartition(1)
     .write
     .format('delta')
     .mode("overwrite")
     .partitionBy("CreatedYear")
     .save("/mnt/datalake_curated/view_migration/service_contract_g")
    )

    # Full copy as a single CSV file with a header row (no CreatedYear column).
    (service_contract_latest
     .repartition(1)
     .write
     .format('csv')
     .option("header","true")
     .mode("overwrite")
     .save("/mnt/datalake_curated/view_migration/service_contract")
    )
finally:
    # Release the cached blocks whether or not the writes succeeded.
    service_contract_latest.unpersist()


In [0]:
"""
IB Unit Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year
from pyspark.sql import Window


  
# Read the full IB unit CSV export and project it onto the curated column names.
# "Created: Date" is selected twice: once unparsed as "Created:Date" and once parsed into
# the CreatedDate timestamp. Commented-out entries are source columns that are currently
# not carried into the output. Rows whose "Modified: Date" does not parse
# (ModifiedDate is null) are dropped.
IBUnit = (spark.read
            .option("header","true")
            .option("quote", "\"")
            .option("escape", "\"")
            .option("parserLib", "univocity")
            .option("multiline","true")
            .option("comment",None)
            .csv(f"/mnt/datalake_raw/batch/sales/oraclecrm/ib_unit/full/*.csv")
            .select( col("Row Id").alias("IBUnitKey"),
#                    col("Modified: Date").alias("Modified:Date"),
                   col("Account Location").alias("AccountLocation"),
                   col("Owner Alias").alias("OwnerAlias"),
                   col("Site Location").alias("SiteLocation"),
                   col("Account Id").alias("AccountId"),
                   col("IB Unit").alias("IBUnit"),
                   col("Service Contract").alias("ServiceContract"),
                   col("Opportunity").alias("Opportunity"),
                   col("Opportunity Id").alias("OpportunityId"),
                   col("Case").alias("Case"),
                   col("Case Id").alias("CaseId"),
                   col("Created: Date").alias("Created:Date"),
                   col("Owner Full Name").alias("OwnerFullName"),
                   col("Billing Site Number").alias("BillingSiteNumber"),
                   col("Equipment Type").alias("EquipmentType"),
                   col("Product Type").alias("ProductType"),
                   col("Latitude").alias("Latitude"),
                   col("Oracle Serial Number").alias("OracleSerialNumber"),
                   col("Account: Integration ID").alias("AccountIntegrationID"),
                   col("Account: External Unique ID").alias("AccountExternalUniqueID"),
                   col("Opportunity: External Unique ID").alias("OpportunityExternalUniqueID"),
                   col("Opportunity: Integration ID").alias("OpportunityIntegrationID"),
                   col("Case: External Unique ID").alias("CaseExternalUniqueID"),
                   col("Case: Integration ID").alias("CaseIntegrationID"),
                   col("Modified By").alias("ModifiedBy"),
                   col("Created By").alias("CreatedBy"),
                   col("IB Unit: Type").alias("IBUnitType"),
                   col("IB Unit: External Unique ID").alias("IBUnitExternalUniqueID"),
                   col("IB Unit: Integration ID").alias("IBUnitIntegrationID"),
                   col("Service Contract: Integration ID").alias("ServiceContractIntegrationID"),
                   col("Service Contract: External Unique ID").alias("ServiceContractExternalUniqueID"),
                   col("Description").alias("Description"),
                   col("1st Safety Test Covered in Contract").alias("FirstSafetyTestCoveredinContract"),
                   col("1st Safety Test Two Men Required").alias("FirstSafetyTestTwoMenRequired"),
                   col("Critical Unit").alias("CriticalUnit"),
                   col("Fireman Operation").alias("FiremanOperation"),
                   col("Account Contract Status Flag").alias("AccountContractStatusFlag"),
                   col("High Callback Unit").alias("HighCallbackUnit"),
#                    col("IB Account Linkage Flag").alias("IBAccountLinkageFlag"),
                   col("MAX Device Flag").alias("MAXDeviceFlag"),
                   col("2nd Safety Test Two Men Required").alias("SecondSafetyTestTwoMenRequired"),
                   col("2nd Safety Test Covered in Contract").alias("SecondSafetyTestCoveredinContract"),
                   col("Billing Amount").alias("BillingAmount"),
                   col("Contract End Date").alias("ContractEndDate"),
                   col("Expiration Date").alias("ExpirationDate"),
                   col("Factory Warranty End Date").alias("FactoryWarrantyEndDate"),
                   col("Final Acceptance Date").alias("FinalAcceptanceDate"),
                   col("TKExtend End Date").alias("TKExtendEndDate"),
                   col("1st Last Safety Test").alias("FirstLastSafetyTest"),
                   col("2nd Last Safety Test").alias("SecondLastSafetyTest"),
                   col("Last Safety Survey Date").alias("LastSafetySurveyDate"),
                   # The alias spelling ("Firstst...") is kept as-is: it is the column name
                   # written to the curated outputs.
                   col("1st Next Safety Inspection").alias("FirststNextSafetyInspection"),
                   col("2nd Next Safety Inspection").alias("SecondNextSafetyInspection"),
                   col("Contract Start Date").alias("ContractStartDate"),
                   col("Install Date").alias("InstallDate"),
                   col("Contract Line Number").alias("ContractLineNumber"),
                   col("Currently Maintained by").alias("CurrentlyMaintainedby"),
                   col("Unit Name").alias("UnitName"),
                   col("Equipment Condition").alias("EquipmentCondition"),
                   col("IB External reference").alias("IBExternalreference"),
                   col("Wrap").alias("Wrap"),
                   col("Installed Location").alias("InstalledLocation"),
                   col("Jack Type").alias("JackType"),
                   col("Legal Identification #").alias("LegalIdentificationNumber"),
                   col("Machine Room Location").alias("MachineRoomLocation"),
                   col("Rear/Side Door Complexity").alias("Rear_SideDoorComplexity"),
                   col("Sequence2").alias("Sequence2"),
                   col("All Associated Service Contracts").alias("AllAssociatedServiceContracts"),
                   col("Tax Exempt Building Classification").alias("TaxExemptBuildingClassification"),
                   col("Buffer Type").alias("BufferType"),
                   col("Controller Type").alias("ControllerType"),
                   col("Type of Leveling Devices").alias("TypeofLevelingDevices"),
                   col("Hydraulic-Valve Info").alias("HydraulicValveInfo"),
                   col("Unit Telephone Number2").alias("UnitTelephoneNumber2"),
                   col("`Car Door Height (in.)`").alias("CarDoorHeight_in"),
                   col("`Car Door Width (in.)`").alias("CarDoorWidth_in"),
                   col("`1st Safety Frequency (years)`").alias("1stSafetyFrequency_years"),
                   col("Rope Size").alias("RopeSize"),
                   col("Rope Length").alias("RopeLength"),
                   col("Front Openings").alias("FrontOpenings"),
                   col("Number of Landings").alias("NumberofLandings"),
                   col("`Rear/Side Openings`").alias("RearSideOpenings"),
                   col("`# of Ropes`").alias("NumberOfRopes"),
                   col("Number of Steps").alias("NumberofSteps"),
                   col("`2nd Safety Frequency (years)`").alias("SecondSafetyFrequency_years"),
                   col("Sequence").alias("Sequence"),
                   col("Longitude").alias("Longitude"),
                   col("Number of Openings").alias("NumberofOpenings"),
                   col("Contract Status Value").alias("ContractStatusValue"),
                   col("Jack Lift Type").alias("JackLiftType"),
                   col("Jack Orientation").alias("JackOrientation"),
                   col("Machine Manufacturer").alias("MachineManufacturer"),
                   col("Machine Model").alias("MachineModel"),
                   col("Usage").alias("Usage"),
                   col("EBS Relationship").alias("EBSRelationship"),
                   col("Balustrade Type").alias("BalustradeType"),
                   col("CRM Billing Account Row ID").alias("CRMBillingAccountRowID"),
                   col("Branch").alias("Branch"),
                   col("EBS Branch #").alias("EBSBranchNumber"),
                   col("Building Type").alias("BuildingType"),
                   col("Capacity").alias("Capacity"),
                   col("1st Safety Test Duration").alias("1stSafetyTestDuration"),
                   col("2nd Safety Inspection Type").alias("2ndSafetyInspectionType"),
                   col("Contract Line Status").alias("ContractLineStatus"),
                   col("Controller Manufacturer").alias("ControllerManufacturer"),
                   col("Controller Model").alias("ControllerModel"),
                   col("Region").alias("Region"),
                   col("Door Opening Type").alias("DoorOpeningType"),
                   col("Application of Unit").alias("ApplicationofUnit"),
                   col("Front Door Complexity").alias("FrontDoorComplexity"),
                   col("Drive Type").alias("DriveType"),
                   col("Drive - Motor Starter Make").alias("DriveMotorStarterMake"),
                   col("Elevator ID").alias("ElevatorID"),
                   col("Unit Telephone Number").alias("UnitTelephoneNumber"),
                   col("Environment").alias("Environment"),
                   col("Equipment Sub-Type").alias("EquipmentSubType"),
                   col("Jack OEM").alias("JackOEM"),
                   col("Length of Handrail").alias("LengthofHandrail"),
                   col("MAX Device ID").alias("MAXDeviceID"),
                   col("TKExtend Status").alias("TKExtendStatus"),
                   col("Market Segment").alias("MarketSegment"),
                   col("Mechanic Employee #").alias("MechanicEmployeeNumber"),
                   col("Mechanic Person ID").alias("MechanicPersonID"),
                   col("Mechanic Supervisor Employee #").alias("MechanicSupervisorEmployeeNumber"),
                   col("Mechanic Supervisor Name").alias("MechanicSupervisorName"),
                   col("Mechanic Supervisor Person ID").alias("MechanicSupervisorPersonID"),
                   col("Model").alias("Model"),
                   col("Motor Current").alias("MotorCurrent"),
                   col("Number in Group").alias("NumberinGroup"),
                   col("Factory Serial #").alias("FactorySerialNumber"),
                   col("OEM Unit").alias("OEMUnit"),
                   col("Old Unit CRM Row ID").alias("OldUnitCRMRowID"),
                   col("Contract Status").alias("ContractStatus"),
                   col("2nd Safety Test Duration").alias("2ndSafetyTestDuration"),
                   col("Previous Site Id").alias("PreviousSiteId"),
                   col("Pump Motor OEM").alias("PumpMotorOEM"),
                   col("Hydraulic - Pump Unit Type").alias("HydraulicPumpUnitType"),
                   col("Financial Reporting Area").alias("FinancialReportingArea"),
                   col("Rise / Travel").alias("Rise_Travel"),
                   col("Route #").alias("RouteNumber"),
                   col("Route Mechanic").alias("RouteMechanic"),
                   col("1st Safety Inspection Type").alias("1stSafetyInspectionType"),
                   col("Site Number").alias("SiteNumber"),
                   col("Speed").alias("Speed"),
                   col("Unit Status").alias("UnitStatus"),
                   col("Step Width").alias("StepWidth"),
                   col("`Hoistway - Suspension (Roping)`").alias("HoistwaySuspensionRoping"),
                   col("Group Name").alias("GroupName"),
                   col("Year of Modernization").alias("YearofModernization"),
                   col("Optimized Account Id").alias("OptimizedAccountId"),

                   # "ss" is second-of-minute. The previous pattern used "SS", which is the
                   # fraction-of-second letter, so the seconds field was parsed as a
                   # sub-second fraction and the stored timestamps carried the wrong seconds.
                   to_timestamp(col("Modified: Date"),"MM/dd/yyyy hh:mm:ss a").alias("ModifiedDate"),
                   to_timestamp(col("Created: Date"),"MM/dd/yyyy hh:mm:ss a").alias("CreatedDate")
                   ).filter("ModifiedDate is not null")
          )

# Rank the rows of each IBUnitKey from most to least recently modified.
window = Window.partitionBy("IBUnitKey").orderBy(IBUnit["ModifiedDate"].desc())

# Build the de-duplicated frame (exact duplicates dropped, newest row per key kept) once and
# persist it, so the Delta and CSV outputs below are written from the same rows. Previously
# the whole pipeline was defined and evaluated twice; rows tied on ModifiedDate have no
# defined order under row_number(), so the two outputs could pick different rows.
ib_unit_latest = (IBUnit.dropDuplicates()
 .withColumn("RowNumber" , row_number().over(window))
 .filter("RowNumber == 1")
 .drop("RowNumber")
 .persist()
)

try:
    # repartition(1) instead of coalesce(1): the output is still produced by one writer task,
    # but coalesce(1) is a narrow transformation and would pull the window sort/row_number
    # stage above it into that single task as well.
    (ib_unit_latest
     .withColumn("CreatedYear",year(col("CreatedDate").cast("date")))
     .repartition(1)
     .write
     .format('delta')
     .mode("overwrite")
     .partitionBy("CreatedYear")
     .save("/mnt/datalake_curated/view_migration/ib_unit_g")
    )

    # Full copy as a single CSV file with a header row (no CreatedYear column).
    (ib_unit_latest
     .repartition(1)
     .write
     .format('csv')
     .option("header","true")
     .mode("overwrite")
     .save("/mnt/datalake_curated/view_migration/ib_unit")
    )
finally:
    # Release the cached blocks whether or not the writes succeeded.
    ib_unit_latest.unpersist()

In [0]:
"""
Existing Quote Curated  
"""


from pyspark.sql.functions import col , row_number , year , to_timestamp , year
from pyspark.sql import Window
  
# Read the full existing-quote CSV export and project it onto the curated column names.
# Unlike the other entity reads in this notebook, rows with an unparseable "Modified: Date"
# are not filtered out here.
ExistingQuote = (spark.read
            .option("header","true")
            .option("quote", "\"")
            .option("escape", "\"")
            .option("parserLib", "univocity")
            .option("multiline","true")
            .option("comment",None)
            .csv(f"/mnt/datalake_raw/batch/sales/oraclecrm/existing_quote/full/*.csv")
            .select(
                col("Row Id").alias("ExistingQuoteKey"),
                col("Opportunity Id").alias("OpportunityId"),
                col("Quote Description").alias("QuoteDescription"),
                col("Quote Number").alias("QuoteNumber"),
                col("Revenue").alias("Revenue"),
                col("External Unique ID").alias("ExternalUniqueID"),
                col("Status").alias("Status"),
                # "ss" is second-of-minute. The previous pattern used "SS", which is the
                # fraction-of-second letter, so the seconds field was parsed as a sub-second
                # fraction and the stored timestamps carried the wrong seconds.
                to_timestamp(col("Modified: Date"),"MM/dd/yyyy hh:mm:ss a").alias("ModifiedDate"),
                to_timestamp(col("Created: Date"),"MM/dd/yyyy hh:mm:ss a").alias("CreatedDate")
            )
          )

# Rank the rows of each ExistingQuoteKey from most to least recently modified.
window = Window.partitionBy("ExistingQuoteKey").orderBy(ExistingQuote["ModifiedDate"].desc())

# Build the de-duplicated frame (exact duplicates dropped, newest row per key kept) once and
# persist it, so the Delta and CSV outputs below are written from the same rows. Previously
# the whole pipeline was defined and evaluated twice; rows tied on ModifiedDate have no
# defined order under row_number(), so the two outputs could pick different rows.
existing_quote_latest = (ExistingQuote.dropDuplicates()
 .withColumn("RowNumber" , row_number().over(window))
 .filter("RowNumber == 1")
 .drop("RowNumber")
 .persist()
)

try:
    # repartition(1) instead of coalesce(1): the output is still produced by one writer task,
    # but coalesce(1) is a narrow transformation and would pull the window sort/row_number
    # stage above it into that single task as well.
    (existing_quote_latest
     .withColumn("CreatedYear",year(col("CreatedDate").cast("date")))
     .repartition(1)
     .write
     .format('delta')
     .mode("overwrite")
     .partitionBy("CreatedYear")
     .save("/mnt/datalake_curated/view_migration/existing_quote_g")
    )

    # Full copy as a single CSV file with a header row (no CreatedYear column).
    (existing_quote_latest
     .repartition(1)
     .write
     .format('csv')
     .option("header","true")
     .mode("overwrite")
     .save("/mnt/datalake_curated/view_migration/existing_quote")
    )
finally:
    # Release the cached blocks whether or not the writes succeeded.
    existing_quote_latest.unpersist()


In [0]:
"""
Opportunity IB Unit
"""
from pyspark.sql.functions import col , row_number , year , to_timestamp, regexp_replace , lit , to_date
from pyspark.sql.types import DoubleType
from pyspark.sql import Window
from delta.tables import *
# Switch the session to the legacy datetime parser before parsing dates below.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

# (source column, curated column) pairs, in output order.
_ibUnitColumnMap = [
    ("Row Id", "OpportunityIBUnitKey"),
    ("Opportunity: External Unique ID", "OpportunityExternalUniqueId"),
    ("Opportunity: Integration ID", "OpportunityIntegrationId"),
    ("Child Object Name", "IBUnit"),
    ("Parent Record Row Id", "OpportunityId"),
    ("Child Object Id", "ChildObjectId"),
]

# Raw Opportunity IB Unit extract: quoted, multi-line CSV with a header row.
_ibUnitRaw = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("parserLib", "univocity")
    .option("inferSchema", "true")
    .option("timestampFormat", "MM/dd/yyyy hh:mm:SS a")
    .option("multiline", "true")
    .option("comment", None)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/opportunity_ib_unit/full/*.csv")
)

# Rename the business columns and parse the two audit timestamps.
OpportunityIBUnit = _ibUnitRaw.select(
    *[col(source).alias(target) for source, target in _ibUnitColumnMap],
    to_timestamp(col("Modified"), "M/d/yyyy HH:mm").alias("ModifiedDate"),
    to_timestamp(col("Created"), "M/d/yyyy HH:mm").alias("CreatedDate"),
)


# Newest ModifiedDate first within each OpportunityIBUnitKey.
window = (
    Window
    .partitionBy("OpportunityIBUnitKey")
    .orderBy(OpportunityIBUnit["ModifiedDate"].desc())
)

# Keep one row per key (the top-ranked one) and discard rows whose
# ModifiedDate did not parse / is missing.
_ibUnitLatest = (
    OpportunityIBUnit
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .filter("ModifiedDate is not null")
    .drop("RowNumber")
)

# Overwrite the curated Delta table. CreatedYear partitioning is currently
# switched off for this table (no CreatedYear column, no partitionBy).
(
    _ibUnitLatest
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .save("/mnt/datalake_curated/view_migration/opportunity_ib_unit_g")
)

# Remove IB units whose parent Opportunity appears in the deleted-items table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
opportunity_ib_unit_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/opportunity_ib_unit_g")

# Column references use the exact stored case (ObjectKey, Type) and are
# qualified with the source alias: a later cell turns on
# spark.sql.caseSensitive for the session, after which "Objectkey"/"type"
# no longer resolve when this cell is re-run.
(opportunity_ib_unit_g.alias("t")
    .merge(
        deleted_items.alias("s"),
        "s.ObjectKey = t.OpportunityIntegrationId AND s.Type = 'Opportunity'")
    .whenMatchedDelete()
    .execute())

# spark.read.format('delta').load('/mnt/datalake_curated/view_migration/opportunity_ib_unit_g')\
# .coalesce(1)\
# .write\
# .format('csv')\
# .option("header","true")\
# .mode("append")\
# .save("/mnt/datalake_curated/view_migration/opportunity_ib_unit")


In [0]:
"""
Opportunity Partner
"""
from pyspark.sql.functions import col , row_number , year , to_timestamp, regexp_replace , lit , to_date
from pyspark.sql.types import DoubleType
from pyspark.sql import Window
from delta.tables import *
# Switch the session to the legacy datetime parser before parsing dates below.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

# (source column, curated column) pairs, in output order.
_partnerColumnMap = [
    ("Row Id", "opportunityPartnerKey"),
    ("Account Type", "AccountType"),
    ("Partner ID", "PartnerId"),
    ("Account Name", "AccountName"),
    ("Opportunity Id", "OpportunityId"),
]

# Raw Opportunity Partner extract: quoted, multi-line CSV with a header row.
_partnerRaw = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("parserLib", "univocity")
    .option("inferSchema", "true")
    .option("timestampFormat", "MM/dd/yyyy hh:mm:SS a")
    .option("multiline", "true")
    .option("comment", None)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/opportunity_partner/full/*.csv")
)

# Rename the business columns and parse the two audit timestamps.
OpportunityPartner = _partnerRaw.select(
    *[col(source).alias(target) for source, target in _partnerColumnMap],
    to_timestamp(col("Modified: Date"), "M/d/yyyy HH:mm").alias("ModifiedDate"),
    to_timestamp(col("Created: Date"), "M/d/yyyy HH:mm").alias("CreatedDate"),
)


# Newest ModifiedDate first within each opportunityPartnerKey.
window = (
    Window
    .partitionBy("opportunityPartnerKey")
    .orderBy(OpportunityPartner["ModifiedDate"].desc())
)

# Keep one row per key (the top-ranked one), drop rows without a
# ModifiedDate, and derive the partition column from CreatedDate.
_partnerLatest = (
    OpportunityPartner
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .filter("ModifiedDate is not null")
    .drop("RowNumber")
    .withColumn("CreatedYear", year(col("CreatedDate").cast("date")))
)

# Overwrite the curated Delta table, partitioned by CreatedYear.
(
    _partnerLatest
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .partitionBy("CreatedYear")
    .save("/mnt/datalake_curated/view_migration/opportunity_partner_g")
)

# Remove partner rows whose parent Opportunity appears in the deleted-items table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
opportunity_partner_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/opportunity_partner_g")

# Column references use the exact stored case (ObjectKey, Type) and are
# qualified with the source alias: a later cell turns on
# spark.sql.caseSensitive for the session, after which "Objectkey"/"type"
# no longer resolve when this cell is re-run.
(opportunity_partner_g.alias("t")
    .merge(
        deleted_items.alias("s"),
        "s.ObjectKey = t.OpportunityId AND s.Type = 'Opportunity'")
    .whenMatchedDelete()
    .execute())

# Export the cleaned Delta table as a single-partition CSV with a header row.
# NOTE(review): mode("append") adds a full snapshot on every run, while the
# Existing Quote and Product exports use "overwrite" -- confirm append is intended.
(spark.read.format('delta').load('/mnt/datalake_curated/view_migration/opportunity_partner_g')
    .coalesce(1)
    .write
    .format('csv')
    .option("header","true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/opportunity_partner"))

In [0]:
"""
Opportunity Contact Role
"""

from pyspark.sql.functions import col , row_number , year , to_timestamp, regexp_replace , lit , to_date
from pyspark.sql.types import DoubleType
from pyspark.sql import Window
from delta.tables import *
# Switch the session to the legacy datetime parser before parsing dates below.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

# (source column, curated column) pairs, in output order.
_contactRoleColumnMap = [
    ("Row Id", "opportunityContactRoleKey"),
    ("Contact Id", "ContactId"),
    ("Contact Name", "ContactName"),
    ("Primary", "Primary"),
    ("Eagle Roles", "EagleRoles"),
    ("First Name", "FirstName"),
    ("Last Name", "LastName"),
    ("Opportunity Id", "OpportunityId"),
]

# Raw Opportunity Contact Role extract: quoted, multi-line CSV with a header row.
_contactRoleRaw = (
    spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("parserLib", "univocity")
    .option("inferSchema", "true")
    .option("timestampFormat", "MM/dd/yyyy hh:mm:SS a")
    .option("multiline", "true")
    .option("comment", None)
    .csv("/mnt/datalake_raw/batch/sales/oraclecrm/opportunity_contact_role/full/*.csv")
)

# Rename the business columns and parse the two audit timestamps.
OpportunityContactRole = _contactRoleRaw.select(
    *[col(source).alias(target) for source, target in _contactRoleColumnMap],
    to_timestamp(col("Modified: Date"), "M/d/yyyy HH:mm").alias("ModifiedDate"),
    to_timestamp(col("Created: Date"), "M/d/yyyy HH:mm").alias("CreatedDate"),
)


# Newest ModifiedDate first within each opportunityContactRoleKey.
window = (
    Window
    .partitionBy("opportunityContactRoleKey")
    .orderBy(OpportunityContactRole["ModifiedDate"].desc())
)

# Keep one row per key (the top-ranked one) and drop rows without a ModifiedDate.
_contactRoleLatest = (
    OpportunityContactRole
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .filter("ModifiedDate is not null")
    .drop("RowNumber")
)

# Overwrite the curated (unpartitioned) Delta table.
(
    _contactRoleLatest
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .save("/mnt/datalake_curated/view_migration/opportunity_contact_role_g")
)

# Remove contact-role rows whose parent Opportunity appears in the deleted-items table.
deleted_items = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/deleted_items_g')
opportunity_contact_role_g = DeltaTable.forPath(spark, "/mnt/datalake_curated/view_migration/opportunity_contact_role_g")

# Column references use the exact stored case (ObjectKey, Type) and are
# qualified with the source alias: a later cell turns on
# spark.sql.caseSensitive for the session, after which "Objectkey"/"type"
# no longer resolve when this cell is re-run.
(opportunity_contact_role_g.alias("t")
    .merge(
        deleted_items.alias("s"),
        "s.ObjectKey = t.OpportunityId AND s.Type = 'Opportunity'")
    .whenMatchedDelete()
    .execute())

# Export the cleaned Delta table as a single-partition CSV with a header row.
# NOTE(review): mode("append") adds a full snapshot on every run, while the
# Existing Quote and Product exports use "overwrite" -- confirm append is intended.
(spark.read.format('delta').load('/mnt/datalake_curated/view_migration/opportunity_contact_role_g')
    .coalesce(1)
    .write
    .format('csv')
    .option("header","true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/opportunity_contact_role"))

In [0]:

"""
Activity Curated  
"""



from pyspark.sql.functions import col , row_number , year , to_timestamp , lit
from pyspark.sql import Window
from delta.tables import *

def _read_activity(path, completed_date):
    """Read one activity extract and project it onto the shared Activity layout.

    Both extracts are later combined positionally, so the column list lives
    in exactly one place to keep the two frames aligned.

    Args:
        path: glob of raw CSV files to load.
        completed_date: Column expression used for the ``CompletedDate``
            output column (the appointments extract supplies a null literal).

    Returns:
        DataFrame with the curated Activity columns, ``ModifiedDate`` and
        ``CreatedDate`` parsed to timestamps.
    """
    # `ss` is second-of-minute; `SS` would be fraction-of-second and would
    # drop the seconds from the parsed value.
    # NOTE(review): assumes values look like "01/31/2022 04:15:09 PM" -- confirm against a raw file.
    ts_format = "MM/dd/yyyy hh:mm:ss a"
    return (spark.read.format('csv')
            .option("quote", '"')
            .option("escape", '"')
            .option("header", "true")
            .option("multiLine", "true")
            .option("timestampFormat", ts_format)
            .csv(path)
            .select(
                col("Row Id").alias("ActivityKey"),
                col("Account Id").alias("AccountId"),
                col("Account").alias("Account"),
                col("Activity").alias("Activity"),
                col("Alias").alias("Alias"),
                col("Completed").alias("Completed"),
                col("Activity Completed").alias("ActivityCompleted"),
                completed_date.alias("CompletedDate"),
                col("Activity SubType").alias("ActivitySubType"),
                col("Due Date").alias("DueDate"),
                col("Branch").alias("Branch"),
                col("Region").alias("Region"),
                col("Integration ID").alias("IntegrationID"),
                col("Opportunity Id").alias("OpportunityId"),
                col("Owner External Unique Id").alias("OwnerExternalUniqueId"),
                col("Owner Full Name2").alias("OwnerFullName"),
                col("Owner Id").alias("OwnerId"),
                col("Owner Integration Id").alias("OwnerIntegrationId"),
                col("Primary Owner Id").alias("PrimaryOwnerId"),
                col("Case").alias("Case"),
                col("Status").alias("Status"),
                col("Type").alias("Type"),
                col("Description").alias("Description"),
                to_timestamp(col("Modified: Date"), ts_format).alias("ModifiedDate"),
                to_timestamp(col("Created: Date"), ts_format).alias("CreatedDate"),
            ))


# Tasks carry their own "Completed Date" column.
TaskActivity = _read_activity(
    "/mnt/datalake_raw/batch/sales/oraclecrm/activity/full/tasks/*.csv",
    col("Completed Date"),
)

# Appointments get a null placeholder for CompletedDate so both frames have the same layout.
AppointmentActivity = _read_activity(
    "/mnt/datalake_raw/batch/sales/oraclecrm/activity/full/appointments/*.csv",
    lit(None),
)

# Stack tasks and appointments (columns are matched by position).
Activity = TaskActivity.union(AppointmentActivity)

# Newest ModifiedDate first within each ActivityKey.
window = (
    Window
    .partitionBy("ActivityKey")
    .orderBy(Activity["ModifiedDate"].desc())
)

# One row per ActivityKey: exact duplicates removed, then the top-ranked row kept.
# The plan is lazy, so each write below evaluates it.
_activityLatest = (
    Activity
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# Curated Delta table, replaced on every run.
(
    _activityLatest
    .coalesce(1)
    .write
    .format('delta')
    .mode("overwrite")
    .save("/mnt/datalake_curated/view_migration/activity_g")
)

# CSV export with a header row; written in append mode.
(
    _activityLatest
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/activity")
)



In [0]:
"""
Quote Header
"""


from pyspark.sql.types import StringType , TimestampType , DoubleType , StructType , StructField , LongType
from pyspark.sql.functions import col  , year , to_timestamp , to_date , row_number , lit
from pyspark.sql import Window

def _quoteHeaderMoney():
    """Build the nested {value: double, currency: string} struct used by every amount field."""
    return StructType([
        StructField("value", DoubleType(), True),
        StructField("currency", StringType(), True),
    ])


# Plain nullable string attributes, in schema order.
_quoteHeaderStringFields = [
    "crmOpportunityId_quote",
    "quoteNumber_quote",
    "transactionID_quote",
    "crmBranch_quote",
    "_newBillTo_company_name",
    "_newBillTo_last_name",
    "_newBillTo_first_name",
    "_newBillTo_address",
    "_newBillTo_address_2",
    "_newBillTo_city",
    "_newBillTo_state",
    "_newBillTo_zip",
    "_newBillTo_company_name_2",
    "_newBillTo_country",
    "_shippingAddress_company_name",
    "_shippingAddress_last_name",
    "_shippingAddress_first_name",
    "_shippingAddress_address",
    "_shippingAddress_address_2",
    "_shippingAddress_city",
    "_shippingAddress_state",
    "_shippingAddress_zip",
    "_shippingAddress_company_name_2",
    "_shippingAddress_country",
    "siteAccountID_quote",
    "crmLineOfBusiness_quote",
    "sublineOfBusiness_quote",
]

# Amount attributes stored as {value, currency} structs, in schema order.
_quoteHeaderMoneyFields = [
    "totalMaterialCost_quote",
    "mechanicLaborRate_quote",
    # the six labor-rate fields below were added 03/08/2022 BS
    "mechanicLaborOvertimeRate_quote",
    "adjusterLaborRate_quote",
    "adjusterLaborOvertimeRate_quote",
    "teamLaborRate_quote",
    "teamLaborOvertimeRate_quote",
    "totalLaborCost_quote",
    "total_quote",
    "totalCost_quote",
]

# Explicit schema for the quote header JSON documents.
quoteSchema = StructType(
    [StructField(fieldName, StringType(), True) for fieldName in _quoteHeaderStringFields]
    + [StructField(fieldName, _quoteHeaderMoney()) for fieldName in _quoteHeaderMoneyFields]
    + [
        StructField("totalMarginPercent_quote", StringType(), True),
        StructField("createdDate_quote", TimestampType(), True),
        StructField("addendumType_quote", StringType(), True),
        StructField("desiredContractType_quote", StringType(), True),
        StructField("_date_modified", TimestampType(), True),
        StructField("_date_added", TimestampType(), True),
    ]
)

# Quote header JSON documents, parsed with the explicit quoteSchema.
Quote_Header = (
    spark.read
    .schema(quoteSchema)
    .option("multiline", "true")
    .json("/mnt/datalake_raw/batch/sales/bigmachine/commerce_v2/quote_header/*/*.json")
)

# Newest _date_modified first within each transactionID_quote.
window = (
    Window
    .partitionBy("transactionID_quote")
    .orderBy(Quote_Header["_date_modified"].desc())
)

# Columns passed through unchanged, ahead of the amount columns.
_headerLeadingCols = [
    "crmOpportunityId_quote",
    "quoteNumber_quote",
    "transactionID_quote",
    "crmBranch_quote",
    "_newBillTo_company_name",
    "_newBillTo_last_name",
    "_newBillTo_first_name",
    "_newBillTo_address",
    "_newBillTo_address_2",
    "_newBillTo_city",
    "_newBillTo_state",
    "_newBillTo_zip",
    "_newBillTo_company_name_2",
    "_newBillTo_country",
    "_shippingAddress_company_name",
    "_shippingAddress_last_name",
    "_shippingAddress_first_name",
    "_shippingAddress_address",
    "_shippingAddress_address_2",
    "_shippingAddress_city",
    "_shippingAddress_state",
    "_shippingAddress_zip",
    "_shippingAddress_company_name_2",
    "_shippingAddress_country",
    "siteAccountID_quote",
    "crmLineOfBusiness_quote",
    "sublineOfBusiness_quote",
]

# Struct columns flattened to their numeric `.value`, keeping the original name.
_headerAmountCols = [
    "totalMaterialCost_quote",
    "mechanicLaborRate_quote",
    "mechanicLaborOvertimeRate_quote",
    "adjusterLaborRate_quote",
    "adjusterLaborOvertimeRate_quote",
    "teamLaborRate_quote",
    "teamLaborOvertimeRate_quote",
    "totalLaborCost_quote",
    "total_quote",
    "totalCost_quote",
]

# Columns passed through unchanged, after totalMarginPercent_quote.
_headerTrailingCols = [
    "createdDate_quote",
    "addendumType_quote",
    "desiredContractType_quote",
    "_date_modified",
    "_date_added",
]

# One row per transaction: exact duplicates removed, then the top-ranked row kept.
_headerLatest = (
    Quote_Header
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# Flatten the amount structs and add the partition column.
_headerFlat = (
    _headerLatest
    .select(
        *_headerLeadingCols,
        *[col(f"{amount}.value").alias(amount) for amount in _headerAmountCols],
        col("totalMarginPercent_quote"),
        *_headerTrailingCols,
    )
    .withColumn("CreatedYear", year(col("createdDate_quote").cast("date")))
)

# Overwrite the curated Delta table (schema included), partitioned by CreatedYear.
(
    _headerFlat
    .coalesce(1)
    .write
    .partitionBy("CreatedYear")
    .format('delta')
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/datalake_curated/view_migration/quote_header_g")
)


# Re-read the curated quote header table and export it as CSV with a header
# row, written in append mode through a single partition.
_quoteHeaderCurated = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/quote_header_g')

(
    _quoteHeaderCurated
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/quote_header")
)

In [0]:
"""
Quote Line
"""


from pyspark.sql.types import StringType , TimestampType , DoubleType , StructType , StructField
from pyspark.sql.functions import col  , year , to_timestamp , to_date , row_number , lit
from pyspark.sql import Window

def _lineString(fieldName, nullable=True):
    """String-typed field of the quote line schema."""
    return StructField(fieldName, StringType(), nullable)


def _lineMoney(fieldName):
    """Amount field stored as a nested {value: double, currency: string} struct."""
    return StructField(fieldName, StructType([
        StructField("value", DoubleType(), True),
        StructField("currency", StringType(), True),
    ]))


# Explicit schema for the quote line JSON documents.
# (_config_attr_info is deliberately left out here; it is not read by this cell.)
quoteLineSchema = StructType([
    # identifiers, declared non-nullable
    *[
        _lineString(fieldName, False)
        for fieldName in (
            "_bs_id",
            "_id",
            "_sequence_number",
            "lineDocNum_line",
            "_document_number",
            "_parent_doc_number",
        )
    ],
    _lineString("buildingName_line"),
    StructField("lineType_line", StructType([StructField("value", StringType(), True)])),
    _lineString("itemDescription_line"),
    _lineString("unitDesignation_line"),
    _lineString("_part_number"),
    _lineString("oracleSerialNumber_line"),
    _lineString("oemSerialNumber_line"),
    StructField("_date_modified", TimestampType(), True),
    StructField("_date_added", TimestampType(), True),
    _lineString("numOfUnitsOnTheEstimate_line"),
    _lineMoney("proposalPricePerUnit_line"),
    _lineMoney("proposalPrice_line"),
    _lineMoney("totalCost_line"),
    _lineString("totalLaborHours_line"),
    _lineString("totalTeamLaborHours_line"),
    _lineMoney("unitLaborCost_line"),
    _lineMoney("unitMaterialCost_line"),
    _lineString("unitTotalLaborHours_line"),
    _lineMoney("extraExpenses_line"),
    _lineString("grossMarginWithoutOverhead_line"),
    _lineMoney("marginAmount_line"),
    _lineString("marginPercentage_line"),
    _lineString("mechanicHours_line"),
    _lineString("helperHours_line"),
    _lineString("useTax_line"),
    _lineString("contractType_line"),
])

# Struct columns flattened to their numeric `.value`, keeping the original name.
_lineAmountCols = [
    "proposalPricePerUnit_line",
    "proposalPrice_line",
    "totalCost_line",
    "unitLaborCost_line",
    "unitMaterialCost_line",
    "extraExpenses_line",
    "marginAmount_line",
]

# Quote line JSON documents parsed with quoteLineSchema.
# NOTE(review): the timestampFormat mixes a 24-hour `HH` with an am/pm marker
# and uses `SS` (fraction-of-second); left untouched -- confirm it matches the files.
_quoteLineRaw = (
    spark.read
    .schema(quoteLineSchema)
    .option("multiline", "true")
    .option("timestampFormat", "MM/dd/yyyy HH:mm:SS a")
    .json("/mnt/datalake_raw/batch/sales/bigmachine/commerce_v2/quote_line/*/*.json")
)

# Project the curated line columns; _document_number is not carried forward.
Quote_Line = _quoteLineRaw.select(
    "_bs_id",
    "_id",
    "_sequence_number",
    "lineDocNum_line",
    "_parent_doc_number",
    "buildingName_line",
    col("lineType_line.value").alias("lineType_line"),
    "itemDescription_line",
    "unitDesignation_line",
    "_part_number",
    "oracleSerialNumber_line",
    "oemSerialNumber_line",
    "numOfUnitsOnTheEstimate_line",
    *[col(f"{amount}.value").alias(amount) for amount in _lineAmountCols],
    "totalLaborHours_line",
    "totalTeamLaborHours_line",
    "unitTotalLaborHours_line",
    "grossMarginWithoutOverhead_line",
    "marginPercentage_line",
    "mechanicHours_line",
    "helperHours_line",
    "useTax_line",
    "contractType_line",
    "_date_modified",
    "_date_added",
)

      
# Newest _date_modified first within each line _id.
window = (
    Window
    .partitionBy("_id")
    .orderBy(Quote_Line["_date_modified"].desc())
)

# One row per line: exact duplicates removed, top-ranked row kept, and the
# partition column derived from _date_added.
# (A left-semi join against Quote_Header on _bs_id == transactionID_quote is currently disabled.)
_quoteLineLatest = (
    Quote_Line
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
    .withColumn("CreatedYear", year(col("_date_added").cast("date")))
)

# Overwrite the curated Delta table (schema included), partitioned by CreatedYear.
(
    _quoteLineLatest
    .coalesce(1)
    .write
    .format('delta')
    .partitionBy("CreatedYear")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/datalake_curated/view_migration/quote_line_g")
)

# Re-read the curated table and export it as CSV with a header row, in append mode.
_quoteLineCurated = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/quote_line_g')

(
    _quoteLineCurated
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/quote_line")
)

In [0]:
"""
Quote Line Config & Material Summary
"""

from pyspark.sql.types import StringType , StructField , StructType , TimestampType
from pyspark.sql.functions import split , col , lit , explode, from_csv , first , row_number ,  size , arrays_zip , year ,to_timestamp , to_date 
from pyspark.sql.window import Window
# Make column-name resolution case-sensitive for the session.
# NOTE(review): this setting is not reset afterwards, so it also applies to
# any cell executed later in the same session.
spark.conf.set('spark.sql.caseSensitive', True)

# Schema for the config view of the quote line documents.
# NOTE(review): this rebinds the name quoteLineSchema used by the Quote Line cell.
quoteLineSchema = StructType(
    [
        StructField("_bs_id", StringType(), False),
        StructField("_id", StringType(), False),
        StructField("_sequence_number", StringType(), True),
        StructField("lineDocNum_line", StringType(), True),
        StructField("_document_number", StringType(), False),
        StructField("_date_modified", TimestampType(), False),
        StructField("_date_added", TimestampType(), False),
        # packed attribute string lives in the nested `value` field
        StructField(
            "_config_attr_info",
            StructType([StructField("value", StringType(), True)]),
            True,
        ),
    ]
)

# Same quote line files as the Quote Line cell, read with the config schema.
_quoteLineConfigRaw = (
    spark.read
    .schema(quoteLineSchema)
    .option("multiline", "true")
    .option("timestampFormat", "MM/dd/yyyy HH:mm:SS a")
    .json("/mnt/datalake_raw/batch/sales/bigmachine/commerce_v2/quote_line/*/*.json")
)

# Selecting "_config_attr_info.value" yields a top-level column named `value`.
Quote_Line_Config = _quoteLineConfigRaw.select(
    "_bs_id",
    "_id",
    "_sequence_number",
    "lineDocNum_line",
    "_document_number",
    "_date_modified",
    "_date_added",
    "_config_attr_info.value",
)

# Columns that identify a quote line; carried through every step below.
_configKeyCols = ["_bs_id", "_id", "_document_number", "lineDocNum_line", "_date_modified", "_date_added"]

# Configuration attributes to extract, in output column order.
# The list is handed to pivot() so that
#   * Spark does not need an extra pass to discover the distinct attribute names, and
#   * every listed attribute always becomes a column (all-null when no line
#     carries it) instead of the final select failing on a missing column.
_configAttributeNames = [
    "numberOfCarsInGroupDropdown",
    "existingEquipment",
    "equipmentType",
    "equipmentTypeVIEW",
    "equipmentClass",
    "upspeedOfCar",
    "unitCapacityForRepair",
    "numberOfCarsInGroup",
    "repairNumberOfStops",
    "numberOfFrontOpenings",
    "numberOfRearOpenings",
    "totalOpenings",
    "boardName",
    "controllerBrand",
    "specificControllerModels",
    "controllerType",
    "directReplacement",
    "applicationOfUnit",
    "boardType",
    "purification",
    # Added on 9/3
    "bACNet_material",
    "cabHeightSelection",
    "compensationType",
    "controllerLocation",
    "counterweightSafety",
    "flooringThickness",
    "pitDepthInFeet",
    "interimMaintenanceTotalPrice",
    "interimMaintenanceMonths",
    "freeServiceTotalPrice",
    "freeServiceMonths",
    "appliedSRTDiscounts",
    "mAXFactoryLeadTime",
    "batch1LeadTime",
    "batch2LeadTime",
    "batch3LeadTime",
    "equipmentClassification",
    "rearDoorTypeAndHand",
    "driveType",
    "machineRoomLocation",
    "jackType",
    "capacity",
    "frontDoorTypeAndHand",
    "frontOpenings",
    "unitNumberOfStops",
    "rearOpenings",
    "futureSpeed",
    "travelInFeet",
    "totalFactoryMaterialCost",
    "otherMaterialCosts",
    "miscellaneousLaborHours",
    "adjustingInspectionNIMHours",
    "jHAHours",
    "totalCabHours",
    "totalCarDoorEquipmentHours",
    "totalCarFixturesHours",
    "totalCarHours",
    "totalControllerTaskHours",
    "totalGovernorHours",
    "totalHallFixturesHours",
    "totalHoistwayDoorEquipmentHours",
    "totalHoistwayEquipmentHours",
    "totalJackHours",
    "totalMachineHours",
    "totalPitEquipmentHours",
    "totalPowerUnitHours",
    "totalStandardJobTasksHours",
    "totalWorkByOthersHours",
    "aGILEDestinationControls",
    "roping",
    "selectedRoping",
    "ropeSize",
    "finalRopeSize",
    "finalRopeQuantity",
    "existingCarWeight",
    "existingMotorHP",
    "existingMotorRPM",
    "mainlineVoltage",
    "overheadInFeet",
    "futureOverheadInFeet",
    "orderType",
    "existingControllerManufacturer",
    "existingControllerModel",
    "existingControllerType",
    # end
    "nickname_ServiceUnitsSet",
    "legalID_ServiceUnitsSet",
    "numberOfStops_ServiceUnitsSet",
    "frontOpenings_ServiceUnitsSet",
    "customerNumber_ServiceUnitsSet",
    "branch_ServiceUnitsSet",
    "routenumber_ServiceUnitsSet",
    "siteAddressLine1_ServiceUnitsSet",
    "siteAddressLine2_ServiceUnitsSet",
    "city_ServiceUnitsSet",
    "state_ServiceUnitsSet",
    "zipCode_ServiceUnitsSet",
    "rearOpenings_ServiceUnitsSet",
    "productType_ServiceUnitsSet",
    "applicationOfUnit_ServiceUnitsSet",
    "manufacturer_ServiceUnitsSet",
    "oEMSerialNumber_ServiceUnitsSet",
    "controllerManufacturer_ServiceUnitsSet",
    "controllerModel__ServiceUnitsSet",
    "controllerType_ServiceUnitsSet",
    "criticalUnit_ServiceUnitsSet",
    "pumpMotorOEM_ServiceUnitsSet",
    "driveConfiguration_ServiceUnitsSet",
    "speed_ServiceUnitsSet",
    "periodicTestCovered_ServiceUnitsSet",
    "annualTestCovered_ServiceUnitsSet",
    "lastAnnualInspection_ServiceUnitsSet",
    "lastPeriodicInspectionDate_ServiceUnitsSet",
    "nextSafetyInspectionDate_ServiceUnitsSet",
    "lastCategory5InspectionDate_ServiceUnitsSet",
    "capacity_ServiceUnitsSet",
    "machineRoomLocation_ServiceUnitsSet",
    "pumpMotorType_ServiceUnitsSet",
    "jackType_ServiceUnitsSet",
    "accountName_ServiceUnitsSet",
    "doorComplexity_ServiceUnitsSet",
    "equipmentType_ServiceUnitsSet",
    "motorStarter_ServiceUnitsSet",
    "equipmentCondition_ServiceUnitsSet",
    "oracleSerialNumber_ServiceUnitsSet",
    "equipmentTypeVIEW_ServiceUnitsSet",
    "equipmentClass_ServiceUnitsSet",
    "equipmentManufacturer_ServiceUnitsSet",
    "equipmentModel_ServiceUnitsSet",
    "machineManufacturer_ServiceUnitsSet",
    "customerNumber_UnitsSet",
    "unitNickName_UnitsSet",
    "unitNickName_UnitsSet_baseTab",
    "factoryJob_UnitsSet",
    "equipmentTypeVIEW_UnitsSet",
    "siteAddressLine1_UnitsSet",
    "siteAddressLine2_UnitsSet",
    # "siteAddressLine3_UnitsSet",
    "oEMSerialNumber_UnitsSet",
    "city_UnitsSet",
    "zipCode",
    "legalID_UnitsSet",
    "oracleSerialNumber_UnitsSet",
    "state_UnitsSet",
    "buildingTypeName_UnitsSet",
    # "addingOrRemovingOpenings_UnitsSet",
    # "qDU",
    "frontOpenings_UnitsSet",
    "rearOpenings_UnitsSet",
    "unitNicknameReadonly_UnitsSet",
    "accountName_UnitsSet",
    "selectUnit_UnitsSet",
    # add on 11/10/2021
    "additionalOfLandings",
    "additionalFrontOpenings",
    "additionalRearOpenings",
    "futureCapacity",
    "carUpspeed",
    "additionalTravelInFeet",
    "addALanding",  # added on 12/07/21
    "tMMechanicBillingRate",  # added 03/08/2022 BS
    "tMTeamBillingRate",  # added 03/08/2022 BS
    "tMOvertimeMechanicBillingRate",  # added 03/08/2022 BS
    "tMOvertimeTeamBillingRate",  # added 03/08/2022 BS
    "tMPremiumMechanicBillingRate",  # added 03/08/2022 BS
    "tMPremiumTeamBillingRate",  # added 03/08/2022 BS
]

# Drop duplicates and retrieve the most recent version of each line
window = Window.partitionBy("_id").orderBy(Quote_Line_Config["_date_modified"].desc())

Quote_Line_Config = (Quote_Line_Config.dropDuplicates().withColumn("RowNumber" , row_number().over(window))
  .filter("RowNumber == 1")
  .filter("value is not null")   # lines without a packed attribute string have nothing to unpack
  .drop("RowNumber"))

# Unpack the attribute string into one column per attribute:
#   1. split the string on the literal "|^|" separator -> array of entries
#   2. explode the array -> one row per entry (Spark names the column `col`)
#   3. split each entry on "~": element 0 is the attribute name, element 2 its value
#   4. pivot the names into columns, taking the first value seen per line
quote_line_config = (Quote_Line_Config
  .select(split(col("value"), '\\|\\^\\|').alias("ColumnArray"), *_configKeyCols)
  .select(*_configKeyCols, explode("ColumnArray"))
  .select(*_configKeyCols, split("col","~")[0].alias("colName"), split("col","~")[2].alias("value"))
  .groupBy(*_configKeyCols).pivot("colName", _configAttributeNames).agg(first("value"))
)

# Key columns followed by the attribute columns, in the curated order.
quote_line_config_details = quote_line_config.select(*_configKeyCols, *_configAttributeNames)


# Persist the pivoted configuration attributes as a Delta table, replacing
# both data and schema (no coalesce is applied on this write).
(
    quote_line_config_details
    .write
    .format('delta')
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/datalake_curated/view_migration/quote_line_config_g")
)

# Re-read the Delta table and export it as CSV with a header row, in append mode.
_quoteLineConfigCurated = spark.read.format('delta').load('/mnt/datalake_curated/view_migration/quote_line_config_g')

(
    _quoteLineConfigCurated
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("append")
    .save("/mnt/datalake_curated/view_migration/quote_line_config")
)

In [0]:
"""
PRODUCT Curated
"""

from pyspark.sql.functions import col , row_number , year , to_timestamp
from pyspark.sql import Window


  
# Product: load the raw extract and rename source columns to curated names.
#
# Timestamp pattern: `ss` is second-of-minute. `SS` would be fraction-of-second
# in Spark/Java datetime patterns, which parses the seconds digits as a
# fraction and loses them -- that weakens the "latest ModifiedDate wins"
# de-duplication downstream.
# NOTE(review): assumes raw values look like "01/31/2022 04:15:09 PM" -- confirm against a sample file.
_productTsFormat = "MM/dd/yyyy hh:mm:ss a"

Product = (spark.read.format('csv')
            .option("quote",'"')
            .option("escape",'"')
            .option("header","true")
            .option("multiLine","true")
            .option("timestampFormat", _productTsFormat)
            .csv("/mnt/datalake_raw/batch/sales/oraclecrm/product/*.csv")
           .select(
                   col("Row Id").alias("ProductKey"),
                   col("Product Category Id2").alias("ProductCategoryKey"),
                   col("Product Category").alias("ProductCategory"),
                   col("Orderable"),
                   col("Description"),
                   col("Category"),
                   col("Product Type").alias("ProductType"),
                   col("Sub Type").alias("SubType"),
                   col("Class"),
                   col("Product Name").alias("ProductName"),
                   col("Part #").alias("PartNo"),
                   to_timestamp(col("Modified: Date"), _productTsFormat).alias("ModifiedDate"),
                   to_timestamp(col("Created: Date"), _productTsFormat).alias("CreatedDate")
           )
           )

# Newest ModifiedDate first within each ProductKey.
window = (
    Window
    .partitionBy("ProductKey")
    .orderBy(Product["ModifiedDate"].desc())
)

# One row per product: exact duplicates removed, then the top-ranked row kept.
_productLatest = (
    Product
    .dropDuplicates()
    .withColumn("RowNumber", row_number().over(window))
    .filter("RowNumber == 1")
    .drop("RowNumber")
)

# CSV export with a header row, replaced on every run.
(
    _productLatest
    .coalesce(1)
    .write
    .format('csv')
    .option("header", "true")
    .mode("overwrite")
    .save("/mnt/datalake_curated/view_migration/product")
)