### Imports

In [0]:
import json
import os
import uuid
from time import sleep
from time import time

import certifi
import numpy as np
import pandas as pd
import urllib3
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic
from urllib3 import request

### Setting up the Producer

In [0]:
def error_cb(err):
    """Generic client-error callback for the Kafka clients.

    Every error is printed. Most client errors are informational because the
    client keeps retrying on its own, so nothing else is done for them. Two
    conditions are treated as fatal here: no broker being reachable
    (_ALL_BROKERS_DOWN) and authentication failures (_AUTHENTICATION).

    Parameters:
        err: Error object passed by the client; it must expose ``code()``.

    Raises:
        KafkaException: For the two fatal codes. An exception raised from this
            callback is re-raised from the flush() or poll() call that
            triggered it.
    """
    print("Client error: {}".format(err))

    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)


def acked(err, msg):
    """Print the outcome of a single produced message.

    The (err, msg) signature matches what a producer delivery callback
    receives, so this is presumably meant to be passed to ``produce()``.

    Parameters:
        err: ``None`` when the message was delivered, otherwise the error.
        msg: The message the report refers to; it is only printed via ``str()``.
    """
    if err is None:
        print("Message produced: {}".format(str(msg)))
        return
    print("Failed to deliver message: {}: {}".format(str(msg), str(err)))

In [0]:
# Kafka / Confluent Cloud connection settings.
# Non-secret identifiers stay inline. Credentials are read from environment
# variables so they are never stored in the notebook itself; any key that was
# previously committed with this notebook should be rotated.
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
confluentTopicName = "commodities-production"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"

confluentApiKey = os.environ.get("CONFLUENT_API_KEY")
confluentSecret = os.environ.get("CONFLUENT_API_SECRET")
# The notebook used the same key pair for the cluster and the Schema Registry,
# so the cluster pair remains the fallback when no registry-specific pair is set.
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", confluentSecret)

# Only warn here (instead of raising) so the cells that do not touch Kafka can
# still be run; the Kafka clients themselves will reject missing credentials.
if not (confluentApiKey and confluentSecret):
    print("WARNING: CONFLUENT_API_KEY / CONFLUENT_API_SECRET are not set; "
          "Kafka clients created later will not be able to authenticate.")

In [0]:
# Azure Data Lake Storage settings used to mount the container.
storageAccount = "gen10datafund2111"
storageContainer = "gold-standard"
# The service-principal secret is read from the environment so it is not stored
# in the notebook; a secret that was previously committed should be rotated.
# (On Databricks, a secret scope read via dbutils.secrets is an alternative.)
clientSecret = os.environ.get("AZURE_CLIENT_SECRET")
clientid = "2ca50102-5717-4373-b796-39d06568588d"
mount_point = "/mnt/gold-standard/"

# Only warn here so cells that do not need the mount can still be run.
if not clientSecret:
    print("WARNING: AZURE_CLIENT_SECRET is not set; mounting the storage container will fail.")

In [0]:
# OAuth (client-credentials) settings handed to the mount call as extra_configs.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientid,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Best-effort unmount so the mount below always starts from a clean state.
# A failure here (presumably because nothing is mounted yet) is reported and
# ignored; `Exception` is caught rather than everything so that a kernel
# interrupt is not swallowed.
try:
    dbutils.fs.unmount(mount_point)
except Exception as unmount_error:
    print("Skipping unmount of {}: {}".format(mount_point, unmount_error))

dbutils.fs.mount(
    source="abfss://" + storageContainer + "@" + storageAccount + ".dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs,
)

### Getting Data from API and Storing it

In [0]:
# Connection pool used for the API requests; certificate verification is
# required and uses the CA bundle shipped with certifi.
http = urllib3.PoolManager(
    cert_reqs='CERT_REQUIRED',
    ca_certs=certifi.where(),
)

In [0]:
# Fetches the latest rates from commodities-api.com; meant to be re-run every
# 1-2 hours (see the next cell) to pick up updates.
def getData(payload=None):
    """Return the latest commodity and currency rates as a pandas DataFrame.

    Parameters:
        payload (dict, optional): An already decoded API response of the form
            ``{"data": {...}}``. When given, no HTTP request is made, which
            allows a stored response to be processed offline. When omitted, a
            live request is sent through the notebook-level ``http`` pool using
            the key in the ``COMMODITIES_API_KEY`` environment variable.

    Returns:
        pandas.DataFrame: One row per response record with the columns
        'Timestamp (Unix)', 'Corn', 'Soybean', 'Wheat', 'Gold', 'China', 'EU',
        'Russia', 'Brazil', 'India' and 'Timestamp (Day Time(cst))'.

    Raises:
        RuntimeError: If the API key is missing, the HTTP status is not 200, or
            the response carries ``success == False``.
    """
    if payload is None:
        apikey = os.environ.get('COMMODITIES_API_KEY')
        if not apikey:
            raise RuntimeError('Set the COMMODITIES_API_KEY environment variable before calling getData().')
        # `fields` is URL-encoded into the query string by urllib3 for GET requests.
        response = http.request(
            'GET',
            'https://www.commodities-api.com/api/latest',
            fields={
                'access_key': apikey,
                'symbols': 'CORN,WHEAT,SOYBEAN,XAU,CNY,EUR,RUB,BRL,INR',
            },
        )
        if response.status != 200:
            raise RuntimeError('commodities-api request failed with HTTP status {}'.format(response.status))
        payload = json.loads(response.data.decode('utf-8'))

    data = payload['data']
    if isinstance(data, dict) and data.get('success') is False:
        raise RuntimeError('commodities-api reported a failure: {}'.format(data.get('error')))

    # Nested keys are flattened with a dot, e.g. data['rates']['CORN'] -> 'rates.CORN'.
    DFcurrent = pd.json_normalize(data)
    # errors='ignore' keeps this working if the API omits one of the optional fields.
    DFcurrent = DFcurrent.drop(columns=['success', 'base', 'unit', 'rates.USD', 'date'], errors='ignore')
    # Rename to the column names used by the rest of the notebook.
    DFcurrent = DFcurrent.rename(columns={
        'timestamp': 'Timestamp (Unix)',
        'rates.CORN': 'Corn',
        'rates.SOYBEAN': 'Soybean',
        'rates.WHEAT': 'Wheat',
        'rates.XAU': 'Gold',
        'rates.CNY': 'China',
        'rates.EUR': 'EU',
        'rates.RUB': 'Russia',
        'rates.BRL': 'Brazil',
        'rates.INR': 'India',
    })
    # Fixed UTC-6 shift (21600 s); NOTE(review): this does not follow daylight saving time.
    DFcurrent['Timestamp (Day Time(cst))'] = pd.to_datetime(
        DFcurrent['Timestamp (Unix)'] - 21600, unit='s', origin='unix')
    # The corn rate is reported off by a factor of 100 (per the original author's
    # research); rescale only the affected rows. A boolean mask with .loc is used
    # instead of chained indexing, which does not reliably write to the frame.
    corn_needs_rescale = DFcurrent['Corn'] < 0.01
    DFcurrent.loc[corn_needs_rescale, 'Corn'] = DFcurrent.loc[corn_needs_rescale, 'Corn'] * 100

    return DFcurrent

In [0]:
# Fetch the current rates and store them in commodities_df.
# Run this cell only when you want to refresh the data: every run calls the API
# again and replaces the previous contents of commodities_df.
commodities_df = getData()

### Transforming Data

In [0]:
# Build cdf as a transformed copy; commodities_df itself is left unchanged.
# Each rate is inverted to obtain a USD price and then rescaled where needed:
#   Corn, Soybean - amount per bushel in USD
#   Wheat         - inverted rate divided by 2204.62 and multiplied by 60 to get USD per bushel
#   Gold          - USD per troy ounce divided by 31.1035 to get USD per gram
cdf = commodities_df.copy().assign(
    Corn=lambda rates: 1 / rates['Corn'],
    Wheat=lambda rates: ((1 / rates['Wheat']) / 2204.62) * 60,
    Soybean=lambda rates: 1 / rates['Soybean'],
    Gold=lambda rates: (1 / rates['Gold']) / 31.1035,
)
#print updated row


Unnamed: 0,Timestamp (Unix),Brazil,China,Corn,EU,India,Russia,Soybean,Wheat,Gold,Timestamp (Day Time(cst))
0,1644533040,5.252516,6.357785,6.411345,0.874979,75.628199,75.02021,15.715228,7.102271,58.096329,2022-02-10 16:44:00


### Convert Pandas Data Frame to Spark Data Frame

In [0]:
# Convert the transformed pandas frame into a Spark DataFrame; the cells below
# read rows from it for Kafka and write it to SQL Server over JDBC.
# NOTE(review): `spark` is not defined in this notebook - presumably the session
# provided by the notebook runtime.
commodities_sparkdf = spark.createDataFrame(cdf)


### Implementing Producer

In [0]:
# Connection settings shared by the admin client and the producer.
# Consumer-only properties (group.id, auto.offset.reset) are intentionally not
# included: neither an AdminClient nor a Producer uses them.
kafka_client_config = {
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,
}

# set up AdminClient to be able to create a topic
admin_client = AdminClient(dict(kafka_client_config))

# use AdminClient to create the topic (1 partition, replication factor 3)
futures = admin_client.create_topics(
    [NewTopic(confluentTopicName, num_partitions=1, replication_factor=3)]
)

# create_topics() is asynchronous: wait for each result so real failures surface
# here. A topic that already exists is expected when the notebook is re-run.
for topic_name, topic_future in futures.items():
    try:
        topic_future.result()
        print("Topic {} created".format(topic_name))
    except KafkaException as topic_error:
        if topic_error.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
            print("Topic {} already exists".format(topic_name))
        else:
            raise

# create kafka producer, p
p = Producer(dict(kafka_client_config))

In [0]:
# Publish every row of the Spark DataFrame to the Kafka topic as a JSON document.
# collect() is called once: calling it inside the loop would re-run the Spark
# job for every row.
collected_rows = commodities_sparkdf.collect()
for spark_row in collected_rows:
    commodities_row = spark_row.asDict()  # one row as a plain dict
    print(commodities_row)
    # default=str serialises values json cannot handle natively (e.g. timestamps).
    p.produce(confluentTopicName, json.dumps(commodities_row, default=str), callback=acked)
    p.poll(0)  # serve delivery callbacks without blocking

# Block until every queued message has been delivered or has failed; the outcome
# of each message is printed by the `acked` delivery callback.
p.flush()
print("Flushed {} message(s)".format(len(collected_rows)))

### Updating table in SQL server

In [0]:
database = "gold-standard-DB"  # database in SQL server
table = "dbo.commodities_api"  # table within database
user = "goldstandard"  # username
# The password is read from the environment so it is not stored in the notebook;
# a password that was previously committed should be rotated.
password = os.environ.get("SQL_SERVER_PASSWORD")
server = "gen10-data-fundamentals-21-11-sql-server.database.windows.net"  # server name

if not password:
    raise RuntimeError("Set the SQL_SERVER_PASSWORD environment variable before writing to SQL Server.")

jdbc_url = f"jdbc:sqlserver://{server}:1433;databaseName={database};"

# Append the rows that were produced to the SQL Server table.
# The chain is wrapped in parentheses: a comment after a backslash line
# continuation is a syntax error.
(
    commodities_sparkdf.write.format('jdbc')
    .option("url", jdbc_url)
    .mode("append")
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .save()
)