## Reading libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, regexp_replace, sum, lit, max, min, first,to_timestamp, trim, upper,to_date, date_format, when
from pyspark.sql.types import DateType

from pyspark.sql.types import IntegerType, FloatType

In [2]:
spark = SparkSession.builder.appName("read_csv").getOrCreate()

In [3]:
spark

# helper functions

In [4]:
# show and remove duplicate row
def remove_dup_rows(df, show= True):
    df_grou = df.groupby(*df.columns).count()
    dup = df_grou.filter(col("count")>1)
    if show == True:
        print(dup.show())
    df = df.drop_duplicates()
    return df

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from datetime import datetime

# Define a UDF to parse the date string
@udf(DateType())
def parse_date(date_str):
    try:
        return datetime.strptime(date_str, "%m/%d/%Y").date()
    except ValueError:
        return None  # Handle invalid date strings as needed


In [6]:
def change_dtype(dataframe, column_name,change_to):
    return dataframe.withColumn(column_name, col(column_name).cast(change_to))

In [7]:
def remove_illegal_char(dataframe,column_name):
    illegal_character = ['Can','\$',',',"\(","\)"]
    for character in illegal_character:
        dataframe = dataframe.withColumn(column_name, regexp_replace(col(column_name), character, ''))
#                              .withColumn('Open Balance', regexp_replace(col('Open Balance'), 'Can', ''))
    return dataframe

In [8]:
def trim_col(dataframe, column_name):
    return dataframe.withColumn(column_name, trim(col(column_name)))

## Reading CSV

In [9]:
sales_report_kmpg_july=spark.read.options(inferSchema='True').csv('csv_converted\saleReprtKmpg_july - SalesReportKPMGAuditResults.csv', header=True, inferSchema=True, sep=',')

## Cleaning and Transforming

In [10]:
len(sales_report_kmpg_july.columns)

16

In [11]:
sales_report_kmpg_july.columns

['Transaction ID',
 'Document Number',
 'Transaction Type',
 'Project ID',
 'Customer',
 'Terms',
 'Currency',
 'Gross Amount',
 'Payment',
 'Open Balance',
 'Transaction Create Date',
 'Document Date',
 'Due Date',
 'Days Open',
 'Service Dates',
 'Payment Receipt Date']

In [12]:
object_columns = ['Transaction ID','Document Number','Transaction Type', 'Project ID','Customer','Terms','Currency']
numeric_columns = ['Gross Amount','Payment','Open Balance','Days Open']
date_columns = ['Transaction Create Date','Document Date','Due Date','Service Dates','Payment Receipt Date']

### Trim column

In [13]:
# sales_report_kmpg_july.show()

In [14]:
sales_report_kmpg_july = trim_col(sales_report_kmpg_july,column_name='Open Balance')
for column in sales_report_kmpg_july.columns:
    sales_report_kmpg_july = trim_col(sales_report_kmpg_july,column_name=column)
    

### Remove illegal characters from numeric columns

In [15]:
# sales_report_kmpg_july = remove_illegal_char(sales_report_kmpg_july,column_name='Open Balance')
for column in numeric_columns:
#     print(column)
    sales_report_kmpg_july = remove_illegal_char(sales_report_kmpg_july,column_name=column)
#     print(sales_report_kmpg_july.show(1))
    

### Change datatype

In [16]:
# sales_report_kmpg_july = sales_report_kmpg_july.withColumn('Open Balance', col('Open Balance').cast('float'))
# for column in date_columns:
#     sales_report_kmpg_july = sales_report_kmpg_july.withColumn(column, col(column).cast(to_timestamp(column)))
    

In [17]:
sales_report_kmpg_july.dtypes

[('Transaction ID', 'string'),
 ('Document Number', 'string'),
 ('Transaction Type', 'string'),
 ('Project ID', 'string'),
 ('Customer', 'string'),
 ('Terms', 'string'),
 ('Currency', 'string'),
 ('Gross Amount', 'string'),
 ('Payment', 'string'),
 ('Open Balance', 'string'),
 ('Transaction Create Date', 'string'),
 ('Document Date', 'string'),
 ('Due Date', 'string'),
 ('Days Open', 'string'),
 ('Service Dates', 'string'),
 ('Payment Receipt Date', 'string')]

In [18]:
sales_report_kmpg_july.show(1)

+--------------+---------------+----------------+----------+--------------------+-----+--------+------------+-------+------------+-----------------------+-------------+--------+---------+--------------------+--------------------+
|Transaction ID|Document Number|Transaction Type|Project ID|            Customer|Terms|Currency|Gross Amount|Payment|Open Balance|Transaction Create Date|Document Date|Due Date|Days Open|       Service Dates|Payment Receipt Date|
+--------------+---------------+----------------+----------+--------------------+-----+--------+------------+-------+------------+-----------------------+-------------+--------+---------+--------------------+--------------------+
|       3190973|      CUMC15527|     Credit Memo|   1277343|CUMC : Columbia U...| null|     USA|      145.83| 145.83|        0.00|          7/31/23 11:02|    7/31/2023|    null|        0|07/01/2023 to 07/...|           7/30/2023|
+--------------+---------------+----------------+----------+--------------------

## Operation 1:
- 3 lines
    - 1. Open Balance != 0----------------------output ---> temp_df , Sum_Open_Balance
    - 2. Tranasaction Type  == "Credit Memo"----output ----> process_df_result
    - 3. Tranasaction Type  != "Credit Memo"----output-----> temp_df_process_3

### 1. Open Balance != 0 
    - filtering the recored `Open Balance !=0`
    - Adding Column `Outstanding = "Open"`
    - Sum of `Open Balance`

In [19]:
sales_report_kmpg_july.filter(sales_report_kmpg_july['Open Balance'] != '0.00').count()

3948

In [20]:
temp_df = sales_report_kmpg_july.filter(sales_report_kmpg_july['Open Balance'] != '0.00')

In [21]:
print("type of df",type(temp_df), "no of records",temp_df.count())

type of df <class 'pyspark.sql.dataframe.DataFrame'> no of records 3948


#### adding column Outstanding = open

In [22]:
# df_spark.withColumn('year after 2', df_spark['year']+1000).show() refrence
temp_df = temp_df.withColumn('Outstanding', lit('Open'))
temp_df = temp_df.withColumn('Applying Link Amount', temp_df["Open Balance"])
temp_df = temp_df.withColumn("FileName", lit("saleReprtKmpg_july"))

In [23]:
temp_df = change_dtype(temp_df, column_name='Open Balance',change_to=FloatType())

In [24]:
Sum_Open_Balance = temp_df.select(sum(col('Open Balance'))).collect()[0][0]
Sum_Open_Balance

29295611.528552473

In [25]:
sales_report_kmpg_july.filter(sales_report_kmpg_july['Open Balance'] != '0.00').select(sum(col('Open Balance')))

DataFrame[sum(Open Balance): double]

#### Output
Sum_Open_Balance, temp_df

In [26]:
print(temp_df.count(),Sum_Open_Balance)

3948 29295611.528552473


In [27]:
len(temp_df.columns)

19

### 2. Tranasaction Type  = "Credit Memo"

In [28]:
temp_df_process_2=sales_report_kmpg_july.filter(sales_report_kmpg_july['Transaction Type'] == 'Credit Memo')
print("Input Parameters: ", temp_df_process_2.count())

Input Parameters:  15459


In [29]:
temp_df_process_2 = temp_df_process_2.withColumn('Gross Amount', col('Gross Amount').cast('float'))
temp_df_process_2 = temp_df_process_2.withColumn('Payment', col('Payment').cast('float'))
temp_df_process_2 = temp_df_process_2.withColumn('Days Open', col('Days Open').cast('float'))

In [30]:
groupby_df_process_2 =temp_df_process_2.groupBy(['Transaction ID',"Document Number"])

In [31]:
temp_df_process_2.dtypes

[('Transaction ID', 'string'),
 ('Document Number', 'string'),
 ('Transaction Type', 'string'),
 ('Project ID', 'string'),
 ('Customer', 'string'),
 ('Terms', 'string'),
 ('Currency', 'string'),
 ('Gross Amount', 'float'),
 ('Payment', 'float'),
 ('Open Balance', 'string'),
 ('Transaction Create Date', 'string'),
 ('Document Date', 'string'),
 ('Due Date', 'string'),
 ('Days Open', 'float'),
 ('Service Dates', 'string'),
 ('Payment Receipt Date', 'string')]

In [32]:
#     "Transaction Type": first(col("Transaction Type")),  # Pick the first occurrence of "Payment Type"
#     "Project ID": max(col("Profit"))  # Calculate the maximum profit
#     "Customer": first(col("Customer")),
#     "Terms": first(col("Terms")),
#     "Currency": first(col("Currency")),
#     "Gross Amount": min(col("Gross Amount")),
#     "Payment": max(col("Payment")),
#     "Open Balance": max(col("Open Balance")),
#     "Transaction Create Date": max(col("Transaction Create Date")),
#     "Document Date": max(col("Document Date")),
#     "Due Date": max(col("Due Date")),
#     "Days Open": max(col("Days Open")),
#     "Service Dates": max(col("Service Dates")),
#     "Payment Receipt Date": col("Payment Receipt Date")

In [33]:
agg_exprs = {
    "Transaction Type": 'first',  # Pick the first occurrence of "Payment Type"
    "Project ID": 'first',  # Calculate the maximum profit
    "Customer": 'first',
    "Terms": 'first',
    "Currency": 'first',
    "Gross Amount": 'min',
    "Payment": 'max',
    "Open Balance": 'max',
    "Transaction Create Date": 'max',
    "Document Date": 'max',
    "Due Date": 'max',
    "Days Open": 'max',
    "Service Dates": 'max',
    "Payment Receipt Date": 'first'}


In [34]:
type(first(col("Transaction Type")))

pyspark.sql.column.Column

In [35]:
# groupby_df_process_2.agg(agg_exprs).count()

In [36]:
process_df_result = groupby_df_process_2.agg(agg_exprs)

In [37]:
# column got renamed need to look for other way, there is one commented above but causing some error
for column in process_df_result.columns:
#     print(column)
    new_column = column
    remove_list = ["max","min","(",")","first"]
    for char in remove_list:
#         print(char)
        if char in new_column:
            new_column=new_column.replace(char,"")
    process_df_result=process_df_result.withColumnRenamed(column,new_column)
    
    

In [38]:
# process_df_result.columns

In [39]:
process_df_result = process_df_result.withColumn('Outstanding', lit('Cleared'))
process_df_result=process_df_result.withColumn('Applying Link Amount', -col("Payment"))

In [40]:
process_df_result.select(["Transaction ID", "Outstanding","Applying Link Amount"]).show()

+--------------+-----------+--------------------+
|Transaction ID|Outstanding|Applying Link Amount|
+--------------+-----------+--------------------+
|       3191846|    Cleared|              -84.87|
|       3196377|    Cleared|              -243.0|
|       3152869|    Cleared|            -1394.96|
|       3136755|    Cleared|                -0.0|
|       3117490|    Cleared|             -111.29|
|       3119866|    Cleared|            -1397.43|
|       3051990|    Cleared|             -525.98|
|       3034098|    Cleared|              -19.48|
|       2979232|    Cleared|              -119.6|
|       2889570|    Cleared|            -2857.77|
|       2840729|    Cleared|             -1000.0|
|       3195531|    Cleared|                -0.0|
|       3198467|    Cleared|             -446.38|
|       3154622|    Cleared|                -0.0|
|       3114388|    Cleared|               -0.08|
|       3049865|    Cleared|                -0.0|
|       3050589|    Cleared|                -0.0|


In [41]:
print("Output Result :", process_df_result.count())

Output Result : 14286


In [42]:
len(process_df_result.columns)

18

### Output : process_df_result

## 3. Tranasaction Type  != "Credit Memo"

In [43]:
sales_report_kmpg_july.count()

382148

In [44]:
sales_report_kmpg_july.select("Transaction Type").distinct().show()

+----------------+
|Transaction Type|
+----------------+
|     Credit Memo|
|         Invoice|
|            null|
+----------------+



In [45]:
temp_df_process_3=sales_report_kmpg_july.filter(sales_report_kmpg_july['Transaction Type'] != 'Credit Memo')
print("Input :", temp_df_process_3.count())

Input : 366688


In [46]:
len(temp_df_process_3.columns)

16

In [47]:
temp_df_process_3 = temp_df_process_3.withColumn('Customer', upper(trim(col('Customer'))))

In [48]:
temp_df_process_3.select('Customer').count()

366688

In [49]:
temp_df_process_3.filter(col('Transaction Type').like('% %')).select('Transaction Type')

DataFrame[Transaction Type: string]

In [50]:
temp_df_process_3 = temp_df_process_3.withColumn("FileName", lit("saleReprtKmpg_july"))

In [51]:
temp_df_process_3.count()

366688

In [52]:
len(temp_df_process_3.columns)

17

In [53]:
temp_df_process_3.columns

['Transaction ID',
 'Document Number',
 'Transaction Type',
 'Project ID',
 'Customer',
 'Terms',
 'Currency',
 'Gross Amount',
 'Payment',
 'Open Balance',
 'Transaction Create Date',
 'Document Date',
 'Due Date',
 'Days Open',
 'Service Dates',
 'Payment Receipt Date',
 'FileName']

# customer_payment_history 
customer_payment_history =customer_payment_history.withColumn('Date', date_format(to_date(customer_payment_history['Date'],'M/d/yyyy'),'yyyy-MM-dd' ))

In [54]:
# customer_payment_history.filter(col("payment Date").isNull()).count()
# customer_payment_history.filter(col("Payment Date").rlike(r"^\d{1,2}/\d{1,2}/\d{4}$")).count()
# customer_payment_history.filter(col('Payment Date').like('%1/1/1900%')).select('Payment Date').show()
# customer_payment_history =customer_payment_history.withColumn('Date', to_date(customer_payment_history['Date'],'M/d/yyyy'))
# customer_payment_history =customer_payment_history.withColumn('Date', date_format(to_date(customer_payment_history['Date'],'M/d/yyyy'),'yyyy-MM-dd' ))
# customer_payment_history =customer_payment_history.withColumn('Date', date_format(customer_payment_history['Date'], 'MM/dd/yyyy'))
# customer_payment_history =customer_payment_history.withColumn('Date', to_date(customer_payment_history['Date'],'yyyy-MM-dd' ))
# customer_payment_history = customer_payment_history.fillna('1/1/1900',subset=['Payment Date'])
# customer_payment_history =customer_payment_history.withColumn('Payment Date', to_date(date_format(customer_payment_history['Payment Date'],'yyyy-MM-dd'),'M/d/yyyy' ))
# customer_payment_history = customer_payment_history.withColumn('Document Number', upper(trim(col('Document Number'))))

In [55]:
customer_payment_history = spark.read.options(inferSchema='True').csv('csv_converted\Customer_Payment_history_july.csv', header=True, inferSchema=True, sep=',')

In [56]:
len(customer_payment_history.columns)

47

In [57]:
customer_payment_history=customer_payment_history.withColumnRenamed('Currency0','Currency')
customer_payment_history=customer_payment_history.withColumnRenamed('Currency24','Currency2')
customer_payment_history=customer_payment_history.withColumnRenamed('Approved for Email10','Approved for Email')
customer_payment_history=customer_payment_history.withColumnRenamed('Approved for Email44','Approved for Email')

In [58]:
customer_payment_history =customer_payment_history.withColumn('Date', to_date(customer_payment_history['Date'],'M/d/yyyy'))

In [59]:
customer_payment_history =customer_payment_history.withColumn('Payment Date', to_date(customer_payment_history['Payment Date'],'M/d/yyyy'))

In [60]:
customer_payment_history= customer_payment_history.withColumnRenamed("Payment Date", "Payment Date2")

In [61]:
customer_payment_history= customer_payment_history.withColumn("Payment Date", customer_payment_history["Date"])

In [62]:
customer_payment_history.filter((customer_payment_history['Payment Date'] >= "2018-10-01") & (customer_payment_history['Payment Date'] <= "2023-07-31")).count()

315676

In [63]:
len(customer_payment_history.columns)

48

In [65]:
# customer_payment_history.dtypes

# Inner join output of 2 and  customer_payment_history

In [66]:
temp_df_process_3.dtypes

[('Transaction ID', 'string'),
 ('Document Number', 'string'),
 ('Transaction Type', 'string'),
 ('Project ID', 'string'),
 ('Customer', 'string'),
 ('Terms', 'string'),
 ('Currency', 'string'),
 ('Gross Amount', 'string'),
 ('Payment', 'string'),
 ('Open Balance', 'string'),
 ('Transaction Create Date', 'string'),
 ('Document Date', 'string'),
 ('Due Date', 'string'),
 ('Days Open', 'string'),
 ('Service Dates', 'string'),
 ('Payment Receipt Date', 'string'),
 ('FileName', 'string')]

In [67]:
print(len(temp_df_process_3.columns), len(customer_payment_history.columns))

17 48


In [68]:
df_temp_1 =  customer_payment_history.withColumnRenamed("Currency","temp")
df_temp_1 = df_temp_1.withColumnRenamed("Approved for Email","temp2")
joined_df_2_payment_history = temp_df_process_3.join(df_temp_1, on='Document Number', how='inner')

In [69]:
joined_df_2_payment_history = joined_df_2_payment_history.withColumn("Outstanding", lit("Cleared"))

In [70]:
len(joined_df_2_payment_history.columns)

65

In [71]:
# joined_df_2_payment_history.select("Open Balance").tail(10)

In [72]:
# temp= dict()
# for i in customer_payment_history.columns:
#     if i not in temp:
#         temp[i]=1
#     else:
#         temp[i]+=1

In [73]:
# for i in temp:
#     if temp[i]>=2:
#         print(i , temp[i])

# Union

In [74]:
temp_df_copy = temp_df.select("*")
joined_df_2_payment_history_copy = joined_df_2_payment_history.select("*")
process_df_result_copy = process_df_result.select("*")


In [75]:
print("Rows : ",process_df_result.count(),joined_df_2_payment_history.count(), temp_df_copy.count())
print("columns : ",len(process_df_result_copy.columns), len(joined_df_2_payment_history_copy.columns),len(temp_df_copy.columns))
print("columns : ",len(process_df_result.columns), len(joined_df_2_payment_history.columns),len(temp_df.columns))

Rows :  14286 315678 3948
columns :  18 65 19
columns :  18 65 19


In [76]:
column_names=set()
for df in [temp_df, joined_df_2_payment_history,process_df_result]:
    for column in df.columns:
        column_names.add(column)
column_names = list(column_names)
print(len(column_names))

65


In [77]:
print(len(temp_df_copy.columns), end= "-->")
for column in column_names:
    if column not in temp_df_copy.columns:
#         print(column)
        temp_df_copy = temp_df_copy.withColumn(column,lit(None).cast("string"))
print(len(temp_df_copy.columns))

19-->65


In [78]:
print(len(joined_df_2_payment_history_copy.columns), end= "-->")
for column in column_names:
    if column not in joined_df_2_payment_history_copy.columns:
        print(column)
        joined_df_2_payment_history_copy = joined_df_2_payment_history_copy.withColumn(column,lit(None).cast("string"))
print(len(joined_df_2_payment_history_copy.columns))

65-->65


In [79]:
print(len(process_df_result_copy.columns), end= "-->")
for column in column_names:
    if column not in process_df_result_copy.columns:
#         print(column)
        process_df_result_copy = process_df_result_copy.withColumn(column,lit(None).cast("string"))
print(len(process_df_result_copy.columns))

18-->65


In [80]:
# process_df_result_copy.select("Open Balance").show()

In [81]:
union_df = temp_df_copy.unionByName(joined_df_2_payment_history_copy).unionByName(process_df_result_copy)

In [82]:
process_df_result_copy.select("Open Balance").tail(10)
# union_df.select("Open Balance").tail(10)
print(union_df.count(), len(union_df.columns))

333912 65


In [83]:
union_df.select(sum(col('Open Balance'))).collect()[0][0]

43992937.400000066

## Adding column Billing Cycle2


In [84]:
contract_terms_df=spark.read.options(inferSchema='True').csv('csv_converted\Contract Terms per Client (1).csv', header=True, inferSchema=True, sep=',')
contract_terms_df= contract_terms_df.withColumnRenamed("Name","Customer")
contract_terms_df = contract_terms_df.select(["Customer","Billing Cycle"])
contract_terms_df = contract_terms_df.withColumnRenamed("Billing Cycle","Billing Cycle2")
df_block1 = union_df.join(contract_terms_df,on='Customer', how='leftouter')
print(df_block1.count(),len(df_block1.columns))

333912 66


# Adding Column region2

In [93]:
# see dub rows
# df_grou = region_df.groupby(*region_df.columns).count()
# dup = df_grou.filter(col("count")>1)
# dup.show()

In [94]:
"D:\Bluethink\alytrx project\csv_converted\Region (2).csv"
region_df=spark.read.options(inferSchema='True').csv('csv_converted\Region (2).csv', header=True, inferSchema=True, sep=',')
region_df = region_df.select(["Name","region"])
# region_df = region_df.withColumnRenamed("Internal ID0","Internal ID")
region_df = region_df.withColumn('Name',upper(trim(col('Name'))))
region_df = region_df.withColumnRenamed("Name","Customer")
region_df = region_df.withColumnRenamed("region","region2")
region_df = remove_dup_rows(region_df)

+--------+-------+-----+
|Customer|region2|count|
+--------+-------+-----+
|PEACEHEA|   null|    2|
|EMCARENO|   null|    2|
+--------+-------+-----+

None


In [95]:
region_df.count()

11462

In [96]:
c=df_block1.join(region_df, on="Customer", how="left")

In [97]:
c.count()

333987

In [98]:
region_df.filter(~region_df.distinct())

TypeError: bad operand type for unary ~: 'DataFrame'

In [None]:
c.distinct().count()

In [None]:
c.count()

In [None]:
a.select("Customer", coalesce("Region2"))

In [111]:
print(region_df.count(), df_block1.count())

11464 333912


In [112]:
# df_block1.columns

In [115]:
a=df_block1.join(region_df, on="Customer", how="left")

In [116]:
a.count()

333987

In [121]:
b=a.join(region_df, on="customer", how="leftanti")

In [122]:
b.count()

160279

In [120]:
b.show()

+--------+--------------+---------------+----------------+----------+-----+--------+-------------+-------------+------------+-----------------------+-------------+--------+---------+-------------+--------------------+-----------+--------------------+------------------+---------------+-----------+------------------+---------+-------+----------------------------+----+-----+----+--------------------+------------------+----------+-------------+-----------------------+----+------+---------------+-----------+-----+-------+---------------+----+-------------------------+---------------------------------------+---------------+------------+--------------------+------+----------+---------------------+------+------+-----------------------+-------------+--------+-----+-------------+---------------------------------+----+----------------+-------------------+----------------+--------------+-------------+---------+----+--------------+-------+
|Customer|Transaction ID|Document Number|Transaction Type|P

In [None]:
a.join()

In [86]:
a.exceptAll(a.).count()

SyntaxError: invalid syntax (311534364.py, line 1)

In [None]:
len(a.columns)

In [None]:
region_df.show()

In [984]:
len(df_block1.columns)

68

In [978]:
region_df.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|                21CO|
|21CO : BROWARD UR...|
|21CO : BROWARD UR...|
|21CO : GULFSTREAM...|
|21CO : LAUDERDALE...|
|21CO : NAPLES URO...|
|21CO : REGIONAL B...|
|21CO : SPECIALIST...|
|21CO : UNIVERSITY...|
|     21CO : UROMEDIX|
|21CO : WALDEMERE-...|
|                 AAI|
|    AAI : TWIN FALLS|
|AAI : TWIN FALLS ...|
|                AAMD|
|AAMD : TS-COLLINS...|
|                 AAS|
|AAS : ARIZONA ASS...|
|                ABMH|
|ABMH : ABINGTON M...|
+--------------------+
only showing top 20 rows

