In [1]:
#Imports
from pyspark.sql.functions import col, udf, unix_timestamp, year, month, dayofmonth
from pyspark.sql.types import DateType, StringType
from pyspark.sql.functions import lpad
from pyspark.sql.functions import concat, lit 
from pyspark.sql.functions import regexp_replace, col
import pyspark.sql.functions as f

# Configure the data-lake storage account URL and access key on the Spark session.
# NOTE(review): RuntimeConfig.set takes both a key and a value; with the single
# argument shown here the call raises TypeError. The key/value look redacted --
# fill them in, and prefer reading the account key from a secret scope
# (dbutils.secrets.get) instead of hardcoding it in the notebook.
spark.conf.set("")

# List the data-lake location to confirm it is reachable.
# NOTE(review): the path is empty here (presumably redacted) -- fill it in.
dbutils.fs.ls('')

### Authorization and Configuration Setup

In [3]:
# Storage/mount configuration entries (values redacted).
# NOTE(review): this is a *set* literal, not a dict. With every entry equal to ""
# it collapses to {""}. Configuration like this is presumably meant to be
# key/value pairs -- confirm and convert to a dict.
# `configs` is not referenced by any visible cell (no dbutils.fs.mount call here).
configs = {"",
           "",
           "",
           "",
           ""}

In [4]:
# List the mounted report directory (displayed as the cell output).
# NOTE(review): this lists '/mnt/comm-report/', but the CSV load reads from
# '/mnt/comm/comm-report/' -- confirm which mount path is correct.
dbutils.fs.ls('/mnt/comm-report/')

### Reading in the data from blob storage so we can work with it

In [6]:
# Load every CSV under the report mount into one DataFrame. The first row of
# each file is the header, and column types are inferred from the data.
# NOTE(review): with inferSchema, a purely numeric ITEM_NUMBER column may be read
# as a number and lose leading zeros before normalisation -- confirm.
df = spark.read.csv(
    '/mnt/comm/comm-report/',
    header=True,
    inferSchema=True,
    sep=',',
)

## connecting to our SQL database

In [8]:
# JDBC connection settings for reading the item master from the SQL database.
# Host, database, URL template and driver are redacted placeholders -- fill them in.
jdbcHostname = ""
jdbcDatabase = ""
# Must be an integer port number. Assigning the built-in type `int` itself would
# be formatted into the URL as "<class 'int'>".
jdbcPort = 1433  # TODO confirm; 1433 is the SQL Server default port

# Read the credentials from a Databricks secret scope instead of hardcoding them
# in the notebook, where they would be saved with the file.
jdbcUsername = dbutils.secrets.get(scope='', key='')
jdbcPassword = dbutils.secrets.get(scope='', key='')

# TODO: fill in the URL template, e.g. 'jdbc:sqlserver://{0}:{1};database={2}'.
jdbcUrl = ''.format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  'user' : jdbcUsername,
  'password' : jdbcPassword,
  'driver' : ''
}

## reading in item master data from sql

In [10]:
# Subquery passed to spark.read.jdbc as the table argument, so the whole Item
# table is fetched. A parenthesised, aliased subquery is accepted wherever a
# table name is.
pushDown_query = "(select* from Item) as s"

## creating our item master data df along with standardizing columns

In [12]:

# Load the item master table through the pushdown query.
gimdf = spark.read.jdbc(url=jdbcUrl, table=pushDown_query, properties=connectionProperties)

# Standardise the column names (remove spaces, lower-case) in ONE projection.
# Calling withColumnRenamed in a loop adds a projection per column, which grows
# the query plan needlessly for wide tables.
gimdf = gimdf.toDF(*[c.replace(' ', '').lower() for c in gimdf.columns])

# Keep only the raw and normalised catalogue numbers needed for the join.
gimdf = gimdf.select('catalognumber', 'normalizedcatalognumber')

## Removing unnecessary characters so item numbers match the normalized item numbers in our master data

In [14]:
# Strip '-', ',', '.' and '/' from ITEM_NUMBER into a new 'replaced' column,
# then drop the raw ITEM_NUMBER column.
_stripped_item = f.regexp_replace(col("ITEM_NUMBER"), "[-,./]", "").alias("replaced")
df = df.select("*", _stripped_item).drop('ITEM_NUMBER')

In [15]:
# Publish the cleaned value under the original name ITEM_NUMBER (used by the
# join), and drop the temporary 'replaced' column, which nothing else uses.
# The previous regexp_replace('replaced', r'^*', '') never changed anything:
# `^*` means "zero or more start anchors", which only ever matches the empty
# string, so replacing it with '' is a no-op.
# TODO(review): if the intent was to strip a leading prefix (e.g. leading zeros
# with r'^0+'), add that pattern explicitly and verify against the master data.
df = df.withColumn('ITEM_NUMBER', col('replaced')).drop('replaced')

## joining on normalized item number

In [17]:
# Inner-join the cleaned report rows to the item master on the normalised item
# number; report rows with no master match are dropped.
# NOTE(review): if normalizedcatalognumber is not unique in the master data,
# matching report rows will be duplicated -- confirm uniqueness.
_join_cond = df['ITEM_NUMBER'] == gimdf['normalizedcatalognumber']
newdf = df.join(gimdf, on=_join_cond, how='inner')

In [18]:
# Replace the normalised report item number with the master catalogue number,
# exposed under the name Item_number.
df = (
    newdf
    .drop('ITEM_NUMBER')
    .withColumnRenamed('catalognumber', 'Item_number')
)

## Filling in the manufacturing date from other date columns when it is empty

In [20]:
from pyspark.sql.functions import coalesce

# When MANUFACTURING_DATE is null, fall back to PO_RECEIPT_DATE, then to
# INTRANSIT_RECEIPT_DATE. coalesce returns its first non-null argument, so one
# three-way call is the same as two nested two-way calls.
df = df.withColumn(
    'MANUFACTURING_DATE',
    coalesce(col('MANUFACTURING_DATE'), col('PO_RECEIPT_DATE'), col('INTRANSIT_RECEIPT_DATE')),
)

### Selecting the columns we need to work with from the joined data

In [22]:
# Keep only the columns needed downstream.
# INTRANSIT_RECEIPT_DATE was previously listed twice. That produced two
# identically named columns, which risks ambiguous-reference errors when later
# cells select it by name. It is now listed once.
df2 = df.select(
    'Item_number',
    'SERIAL_NUMBER',
    'LOT_NUMBER',
    'INTRANSIT_RECEIPT_DATE',
    'LOT_EXPIRATION_DATE',
    'MANUFACTURING_DATE',
    'PO_RECEIPT_DATE',
    'CUSTOMER_NUMBER',
    'ORGANIZATION_ID',
)

### adding in lot_serial column and filling it with lot_number

In [24]:
# Seed LOT_SERIAL with the lot number; missing values are filled from the serial number next.
df3 = df2.withColumn('LOT_SERIAL', col('LOT_NUMBER'))

### Filling in LOT_SERIAL with SERIAL_NUMBER wherever there is no LOT_NUMBER (i.e. it is null)

In [26]:
# Where LOT_SERIAL (the lot number) is null, use the serial number instead.
df4 = df3.withColumn('LOT_SERIAL', coalesce(col('LOT_SERIAL'), col('SERIAL_NUMBER')))

### dropping lot_number and serial_number columns

In [28]:
# LOT_SERIAL now carries this information, so the source columns are dropped.
df4 = df4.drop('LOT_NUMBER').drop('SERIAL_NUMBER')

### selecting our dataframe again

In [30]:
# Fix the working column set and its order for the date handling below.
_lot_columns = [
    'Item_number', 'LOT_SERIAL', 'INTRANSIT_RECEIPT_DATE', 'PO_RECEIPT_DATE',
    'MANUFACTURING_DATE', 'LOT_EXPIRATION_DATE', 'CUSTOMER_NUMBER', 'ORGANIZATION_ID',
]
df4 = df4.select(*_lot_columns)

### Replacing the string date columns with new columns in Date format

In [32]:
# Cast the raw date columns to DateType under new names and drop the originals.
# NOTE(review): casting a string to a date yields null unless the string is in an
# ISO-like (yyyy-MM-dd) form -- confirm the source format.
df5 = (
    df4
    .withColumn('DateOfManufacture', col('MANUFACTURING_DATE').cast('date'))
    .drop('MANUFACTURING_DATE')
)
df6 = (
    df5
    .withColumn('DateOfLotExpiration', col('LOT_EXPIRATION_DATE').cast('date'))
    .drop('LOT_EXPIRATION_DATE')
)

### Splitting the manufacturing and expiration dates into year, month and day, creating 6 new columns

In [34]:
# Split both dates into year/month/day parts in a single projection. Resulting
# order: expiration parts, carried-through columns, manufacture parts, then the
# customer and organisation ids. The 'ManufacturerYear' spelling is kept because
# later cells refer to it.
df7 = df6.select(
    year('DateOfLotExpiration').alias('ExpirationYear'),
    month('DateOfLotExpiration').alias('ExpirationMonth'),
    dayofmonth('DateOfLotExpiration').alias('ExpirationDay'),
    'Item_number',
    'LOT_SERIAL',
    'INTRANSIT_RECEIPT_DATE',
    'PO_RECEIPT_DATE',
    year('DateOfManufacture').alias('ManufacturerYear'),
    month('DateOfManufacture').alias('ManufactureMonth'),
    dayofmonth('DateOfManufacture').alias('ManufactureDay'),
    'CUSTOMER_NUMBER',
    'ORGANIZATION_ID',
)

### Zero-padding the day and month columns so that 1 reads 01

In [36]:
# Left-pad the month and day parts with '0' to two characters (1 -> '01').
# The year columns pass through unchanged; the padded columns are appended last.
_two_digit_parts = ['ExpirationMonth', 'ExpirationDay', 'ManufactureMonth', 'ManufactureDay']
df7a = df7.select(
    'Item_number', 'LOT_SERIAL', 'INTRANSIT_RECEIPT_DATE', 'PO_RECEIPT_DATE',
    'ManufacturerYear', 'ExpirationYear', 'CUSTOMER_NUMBER', 'ORGANIZATION_ID',
    *[lpad(col(part), 2, '0').alias(part) for part in _two_digit_parts]
)

### Concatenating the 6 fields into two day/month/year columns for the manufacture and expiration dates

In [38]:
def _dmy_string(day_col, month_col, year_col):
    """Return a 'day/month/year' string column built from three date-part columns.

    concat returns null if any part is null, so a missing date stays null.
    """
    return concat(col(day_col), lit('/'), col(month_col), lit('/'), col(year_col))


# Combine the parts into the two output date strings, then drop the parts.
df8 = (
    df7a
    .withColumn('Lot_expiration', _dmy_string('ExpirationDay', 'ExpirationMonth', 'ExpirationYear'))
    .withColumn('Date_of_manufacture', _dmy_string('ManufactureDay', 'ManufactureMonth', 'ManufacturerYear'))
    .drop('ManufacturerYear', 'ManufactureMonth', 'ManufactureDay',
          'ExpirationDay', 'ExpirationMonth', 'ExpirationYear')
)

### Replacing nulls (NAs) with empty strings

In [40]:
# Replace nulls with empty strings in the date columns.
# When na.fill's value is a dict, its second (subset) argument is ignored, so
# the previous `na.fill({'Lot_expiration': ''}, {'Date_of_manufacture': ''})`
# silently skipped Date_of_manufacture and needed a follow-up call. Every
# column to fill now goes in the value dict itself.
df9 = df8.na.fill({'Lot_expiration': '', 'Date_of_manufacture': ''})
df10 = df9.na.fill({'INTRANSIT_RECEIPT_DATE': ''})

### Selecting the columns to keep

In [42]:
# Fix the output column order.
_report_columns = (
    'Item_number', 'LOT_SERIAL', 'Lot_expiration', 'INTRANSIT_RECEIPT_DATE',
    'PO_RECEIPT_DATE', 'Date_of_manufacture', 'CUSTOMER_NUMBER', 'ORGANIZATION_ID',
)
df11 = df10.select(*_report_columns)

### Casting ORGANIZATION_ID from integer to string and renaming it to ORG_ID

In [44]:
# Cast ORGANIZATION_ID to string, publish it as ORG_ID (appended as the last
# column), and drop the original.
df11 = (
    df11
    .withColumn('ORG_ID', col('ORGANIZATION_ID').cast('string'))
    .drop('ORGANIZATION_ID')
)

### Adding three constant string columns so the data is formatted correctly for the SQL table

In [46]:
# Append constant string flag columns M='1', O='2', R='3'.
# NOTE(review): only M appears in the final column selection; confirm O and R are needed.
df12 = df11.select('*', lit('1').alias('M'), lit('2').alias('O'), lit('3').alias('R'))

### Selecting the final columns to write

In [48]:
# Columns written to the SQL table, in table order.
_output_columns = (
    'Item_number', 'LOT_SERIAL', 'Lot_expiration', 'INTRANSIT_RECEIPT_DATE',
    'Date_of_manufacture', 'M', 'ORG_ID',
)
df13 = df12.select(*_output_columns)

###  connecting back to our database

In [50]:
# Connection settings for writing the results back to the SQL database.
# Host, database, URL template and driver are redacted placeholders -- fill them in.
jdbcHostname = ""
jdbcDatabase = ""
# Must be an integer port number. Assigning the built-in type `int` itself would
# be formatted into the URL as "<class 'int'>".
jdbcPort = 1433  # TODO confirm; 1433 is the SQL Server default port

# Credentials are read from a Databricks secret scope rather than hardcoded.
jdbcUsername = dbutils.secrets.get(scope='', key='')
jdbcPassword = dbutils.secrets.get(scope='', key='')

# TODO: fill in the URL template, e.g. 'jdbc:sqlserver://{0}:{1};database={2}'.
jdbcUrl = ''.format(jdbcHostname, jdbcPort, jdbcDatabase)

# Wire the fetched secrets into the connection properties. Previously the values
# were missing ('user' : ,), which is a SyntaxError, and the secrets went unused.
connectionProperties = {
  'user': jdbcUsername,
  'password': jdbcPassword,
  'driver': ''
}

###  writing this as an sql table to our database

In [52]:
# Append the final rows to the comm_org_report table over JDBC.
df13.write.mode('append').jdbc(
    url=jdbcUrl,
    table='comm_org_report',
    properties=connectionProperties,
)