In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark import SparkConf

# Spark configuration for the "dmltest" application: pulls in the Delta Lake
# package and registers its SQL extension, catalog and log-store classes.
conf = SparkConf().setAppName("dmltest").setAll([
    ('spark.jars.packages', 'io.delta:delta-core_2.12:2.1.0'),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    ("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore"),
])

In [None]:
# Create the session from `conf`, or reuse one that is already running.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [None]:
# Column names for the two electoral-bond CSV files. Every column is declared
# as a nullable string here; the denomination columns are parsed in a later cell.
redemer_columns = [
    'Sr_No', 'Date_of_Encashment', 'Name_of_the_Political_Party',
    'Account_no._of_Political_Party', 'Prefix', 'Bond_Number',
    'redemer_Denominations', 'Pay_Branch_Code', 'Pay_Teller',
]
purchaser_columns = [
    'Sr_No', 'Reference_No_(URN)', 'Journal_Date', 'Date_of_Purchase',
    'Date_of_Expiry', 'Name_of_the_Purchaser', 'Prefix', 'Bond_Number',
    'purchaser_Denominations', 'Issue_Branch_Code', 'Issue_Teller', 'Status',
]

schema_redemer = T.StructType(
    [T.StructField(column_name, T.StringType(), True) for column_name in redemer_columns]
)
schema_purchaser = T.StructType(
    [T.StructField(column_name, T.StringType(), True) for column_name in purchaser_columns]
)

# NOTE(review): no `header` option is set, so if the files start with a header
# row it is read as an ordinary data row — confirm against the actual CSVs.
# Rows whose last-needed column is NULL are dropped right after loading.
df_redemer = (
    spark.read.format('csv')
    .schema(schema_redemer)
    .load('/home/glue_user/workspace/data-engineering/data/source/src-data-csv/electoral-bonds/bond_redemer_details.csv')
    .filter("Pay_Teller is not null")
)
df_purchaser = (
    spark.read.format('csv')
    .schema(schema_purchaser)
    .load('/home/glue_user/workspace/data-engineering/data/source/src-data-csv/electoral-bonds/bond_purchaser_details.csv')
    .filter("purchaser_Denominations is not null")
)

In [None]:
# Denominations arrive as strings with thousands separators; strip the commas
# and convert to decimal(38, 0) so they can be summed.
df_redemer = df_redemer.withColumn(
    'redemer_Denominations',
    F.regexp_replace(F.col('redemer_Denominations'), ',', '').cast(T.DecimalType(38, 0)),
)
df_purchaser = df_purchaser.withColumn(
    'purchaser_Denominations',
    F.regexp_replace(F.col('purchaser_Denominations'), ',', '').cast(T.DecimalType(38, 0)),
)

# Expose both frames to Spark SQL under the names used by the queries below.
df_purchaser.createOrReplaceTempView('purchaser')
df_redemer.createOrReplaceTempView('redemer')

In [None]:
# Per-purchaser donation totals for bonds encashed by one political party,
# shown largest donor first, with the party-wide total repeated on every row.
#
# Purchases are matched to redemptions on (prefix, bond number). The party
# filter discards purchases with no matching redemption, because their
# Name_of_the_Political_Party is NULL and NULL never equals the literal;
# a separate "is not null" predicate is therefore unnecessary.
spark.sql('''
with
data as (
    select
        *
    from
        purchaser
        left join redemer on redemer.prefix=purchaser.prefix and redemer.bond_number=purchaser.Bond_Number
)
select * from data
''').filter("Name_of_the_Political_Party='ALL INDIA TRINAMOOL CONGRESS'")\
    .groupBy('Name_of_the_Purchaser').agg(
        # purchaser_Denominations is already decimal(38, 0) in the `purchaser` view,
        # so it can be summed directly without re-casting.
        F.sum('purchaser_Denominations').alias('total_donations'),
        F.first(F.col('Name_of_the_Political_Party')).alias('Name_of_the_Political_Party'))\
    .withColumn("total_received", F.expr("sum(total_donations) over(partition by Name_of_the_Political_Party)"))\
    .orderBy(F.col('total_donations').desc())\
    .show(truncate=False, n=900)
# The sort is deliberately the last transformation: evaluating the window
# function repartitions the rows, so an orderBy placed before it would not be
# guaranteed to survive into the displayed output.

In [None]:
# Attach redemption details to every purchase; purchases without a matching
# (Prefix, Bond_Number) redemption are kept with NULL redemption columns.
df = df_purchaser.join(
    df_redemer,
    on=(df_purchaser['Prefix'] == df_redemer['Prefix'])
    & (df_purchaser['Bond_Number'] == df_redemer['Bond_Number']),
    how='left',
)

In [None]:
# Total denomination value per political party, largest first.
# NOTE(review): the alias says "purchased" but the summed column is the
# redemption-side denomination — confirm the intended label.
(
    df.groupBy('Name_of_the_Political_Party')
    .agg(F.sum('redemer_Denominations').alias("total_purchased"))
    .orderBy('total_purchased', ascending=False)
    .show(n=100, truncate=False)
)