In [2]:
# Jar file names for the Salesforce partner API, WSC, Wave API and the
# spark-salesforce connector. The versions match the first four Maven
# coordinates given to "spark.jars.packages" in the %%configure cell.
# NOTE(review): nothing in the visible notebook reads EXTRA_JARS — confirm it
# is still needed.
EXTRA_JARS = [
    "force-partner-api-40.0.0.jar",
    "force-wsc-40.0.0.jar",
    "salesforce-wave-api-1.0.9.jar",
    "spark-salesforce_2.11-1.1.1.jar",
]

# $ wget https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-xml/2.10.3/jackson-dataformat-xml-2.10.3.jar
# $ wget https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.10.3/jackson-core-2.10.3.jar

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.force.api:force-partner-api:40.0.0,com.force.api:force-wsc:40.0.0,com.springml:salesforce-wave-api:1.0.9,com.springml:spark-salesforce_2.11:1.1.1,com.fasterxml.jackson.core:jackson-core:2.10.3,com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.10.3"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


In [38]:
import os
from typing import List, OrderedDict

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from simple_salesforce import Salesforce, SFType

#####################
# --- Constants --- #
#####################

# Credentials are read from environment variables so that real secrets never
# have to be saved in the notebook source. The fallbacks are the original
# placeholder strings, not working credentials.
# NOTE(review): with a remote kernel the variables must be set in the process
# that executes this cell — confirm where that is for this deployment.
username = os.environ.get("SALESFORCE_USERNAME", "some email")
password = os.environ.get("SALESFORCE_PASSWORD", "some password")

# How to generate a security token:
# https://docs.idalko.com/exalate/display/ED/Salesforce%3A+How+to+generate+a+security+token
security_token = os.environ.get("SALESFORCE_SECURITY_TOKEN", "some token")

# Password with the security token appended; the Spark reader below receives
# this combined value as its single "password" option.
password_with_token = password + security_token


############################
# --- Helper Functions --- #
############################

def fetch_all_salesforce_object_fields(username: str, password: str, security_token: str, obj_name: str):
    """Return the names of all fields that Salesforce describes for an object.

    Builds a ``simple_salesforce`` client from the given credentials, looks
    the object up by attribute name on that client, and reads the ``"fields"``
    entry of its describe result.

    Args:
        username: Salesforce user name.
        password: Salesforce password (given separately from the token).
        security_token: Salesforce security token for the user.
        obj_name: Name of the object to describe; resolved with ``getattr``
            on the client.

    Returns:
        A list holding the ``"name"`` value of every described field, in the
        order the describe result lists them.
    """
    client = Salesforce(
        username=username,
        password=password,
        security_token=security_token,
    )
    description = getattr(client, obj_name).describe()
    return [field_info["name"] for field_info in description["fields"]]

def make_select_star_soql_stmt(obj_name: str, fields: List[str]) -> str:
    """Build a SOQL ``SELECT`` statement that names every given field.

    Args:
        obj_name: Object to select from; placed after ``FROM`` unchanged.
        fields: Field names to select; joined with ``", "`` in the given order.

    Returns:
        The statement ``SELECT <field, field, ...> FROM <obj_name>``.

    Raises:
        ValueError: If ``fields`` is empty. An empty selector would produce
            the malformed statement ``SELECT  FROM <obj_name>``, so fail here
            instead of sending it on.
    """
    if not fields:
        raise ValueError(
            f"Cannot build a SELECT statement for {obj_name!r}: no fields given"
        )
    field_selector_stmt = ", ".join(fields)
    return f"SELECT {field_selector_stmt} FROM {obj_name}"

def create_select_star_soql_stmt(username: str, password: str, security_token: str, obj_name: str) -> str:
    """Look up an object's field names and build a SELECT statement over them.

    Combines the two helpers: the field names come from
    ``fetch_all_salesforce_object_fields`` and are then passed to
    ``make_select_star_soql_stmt``.

    Args:
        username: Salesforce user name, forwarded to the field lookup.
        password: Salesforce password, forwarded to the field lookup.
        security_token: Salesforce security token, forwarded to the field lookup.
        obj_name: Object whose fields are fetched and selected from.

    Returns:
        The statement string produced by ``make_select_star_soql_stmt``.
    """
    all_field_names = fetch_all_salesforce_object_fields(
        username=username,
        password=password,
        security_token=security_token,
        obj_name=obj_name,
    )
    return make_select_star_soql_stmt(obj_name=obj_name, fields=all_field_names)


#####################
# --- Spark Job --- #
#####################

OBJECT_NAME = "opportunity"

# Reuse the SparkContext if one already exists, then wrap it for Glue.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Explicit field list used for the query. Previously a statement generated by
# create_select_star_soql_stmt(...) was computed first and then immediately
# overwritten by this literal, which cost a Salesforce login and describe call
# whose result was never used. That dead call has been removed; call
# create_select_star_soql_stmt(username=username, password=password,
# security_token=security_token, obj_name=OBJECT_NAME) here instead if the
# field list should be generated rather than hard-coded.
select_star_soql_stmt = """
SELECT Id, IsDeleted, AccountId, IsPrivate, Name, Description, StageName, Amount, Probability, ExpectedRevenue, TotalOpportunityQuantity, CloseDate, Type, NextStep, LeadSource, IsClosed, IsWon, ForecastCategory, ForecastCategoryName, CampaignId, HasOpportunityLineItem, Pricebook2Id, OwnerId, CreatedDate, CreatedById, LastModifiedDate, LastModifiedById, SystemModstamp, LastActivityDate, LastStageChangeDate, Fiscal, ContactId, LastViewedDate, LastReferencedDate, HasOpenActivity, HasOverdueTask, LastAmountChangedHistoryId, LastCloseDateChangedHistoryId, DeliveryInstallationStatus__c, TrackingNumber__c, OrderNumber__c, CurrentGenerators__c, MainCompetitors__c FROM opportunity
"""

df = (
    spark
        .read
        .format("com.springml.spark.salesforce")
        .option("username", username)
        # The reader takes one secret: the password with the security token appended.
        .option("password", password_with_token)
        .option("soql", select_star_soql_stmt)
        .option("bulk", True)
        # Opportunity.LastStageChangeDate is only available in API v52
        .option("version", 52)
        .option("sfObject", OBJECT_NAME)
        .load()
)

# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" line to the output.
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+---------+------------------+---------+--------------------+-----------+--------------------+--------+-----------+---------------+------------------------+----------+--------------------+--------+-----------------+--------+-----+----------------+--------------------+----------+----------------------+------------+------------------+--------------------+------------------+--------------------+------------------+--------------------+----------------+-------------------+------+---------+--------------+------------------+---------------+--------------+--------------------------+-----------------------------+-----------------------------+-----------------+--------------+--------------------+--------------------+
|                Id|IsDeleted|         AccountId|IsPrivate|                Name|Description|           StageName|  Amount|Probability|ExpectedRevenue|TotalOpportunityQuantity| CloseDate|                Type|NextStep|       LeadSource|IsClosed|IsWon|ForecastCategor