In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

# Create the Spark session used by every later cell, or reuse one that is
# already running under the same application.
spark = (
    SparkSession.builder
    .appName("Donation_np")
    .getOrCreate()
)

In [None]:
# Load the donations JSON file (multi-line records) into a DataFrame.
# NOTE(review): only the first 100 records are kept by .limit(100) -- confirm
# this is intentional and not a leftover from development.
from pyspark.sql.types import IntegerType

df = (
    spark.read
    .option('multiline', True)
    .json("donation_np.json")
    .limit(100)
)

# Remove the field9 .. field14 columns.
dropcols = ['field9', 'field10', 'field11', 'field12', 'field13', 'field14']
df = df.drop(*dropcols)

# The schema is printed before the cast below, so Amount still shows the
# type it had when it was read from the file.
df.printSchema()

# Convert Amount to an integer column so it can be aggregated later.
df = df.withColumn("Amount", df["Amount"].cast(IntegerType()))

In [None]:
# Rename the columns whose names contain spaces:
#   Contribution Mode -> mode_of_payment
#   Financial Year    -> fin_year
#   PAN Given         -> pan_given
column_renames = {
    "Contribution Mode": "mode_of_payment",
    "Financial Year": "fin_year",
    "PAN Given": "pan_given",
}
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

df.printSchema()

In [None]:
# Replace the Address column with a SHA-256 hex digest of its value.
#
# Example given with the task:
#   16-B, Ferozeshah Road New Delhi-1 -> aebd8d41127096039df138069fab7630
# NOTE(review): that sample digest is 32 hex characters long, whereas
# sha2(..., 256) produces 64 -- confirm which hash is actually expected.
#
# The star import also brings `when`, `lower` and `col` into scope; the
# cells further down use those names unqualified.
from pyspark.sql.functions import *
from pyspark.sql.functions import sha2, concat_ws

address_text = concat_ws("||", df["Address"])
df = df.withColumn("Address", sha2(address_text, 256))

In [None]:
# Categorize mode_of_payment into one of four categories:
#   - Cheque
#   - Cash
#   - Bank
#   - Others
#
# The checks run in the order listed; the first one that matches wins and
# anything that matches none of them becomes "Others".
payment_text = lower(df.mode_of_payment)

# Cheque is recognised by a run of 5-6 digits (a cheque number), the word
# "cheque", or an abbreviation such as "ch no", "ch. no." or "ch.no.".
# The alternatives are deliberately NOT padded with spaces: inside a regex a
# space is a literal character, so " cheque " would only match the word when
# it is surrounded by spaces and would miss a value that is just "Cheque".
# Dots are escaped so they match a literal ".", and the pattern is a raw
# string so Python does not try to interpret "\d" as a string escape.
cheque_pattern = r"\d{5,6}|cheque|\bch\.?\s*no"

df = df.withColumn(
    "mode_of_payment",
    when(payment_text.rlike(cheque_pattern), "Cheque")
    .when(payment_text.contains("cash"), "Cash")
    .when(payment_text.contains("bank"), "Bank")
    .otherwise("Others"),
)

In [None]:
# Calculate life-to-date (LTD) aggregates per party.
#
# For every party P present in the data, four columns are added:
#   P_SUM_LTD, P_COUNT_LTD, P_AVG_LTD, P_MAX_LTD
# For the parties named in the task this gives, for example:
#   INC_SUM_LTD,   BJP_SUM_LTD,   NCP_SUM_LTD,   CPI_SUM_LTD,   CPI(M)_SUM_LTD,
#   INC_COUNT_LTD, BJP_COUNT_LTD, NCP_COUNT_LTD, CPI_COUNT_LTD, CPI(M)_COUNT_LTD,
#   INC_AVG_LTD,   BJP_AVG_LTD,   NCP_AVG_LTD,   CPI_AVG_LTD,   CPI(M)_AVG_LTD,
#   INC_MAX_LTD,   BJP_MAX_LTD,   NCP_MAX_LTD,   CPI_MAX_LTD,   CPI(M)_MAX_LTD
# A row that belongs to party P carries P's aggregate in P's columns; every
# other row carries 0 there.

from pyspark.sql import functions as F

aggregate_fn = ["SUM", "COUNT", "AVG", "MAX"]

# A single grouped job produces all four aggregates for every party.
# Rows without a party are left out (they cannot form a column name), and the
# result is ordered by party so the generated columns come out in a stable
# order from run to run.
party_stats = (
    df.groupBy("Party")
    .agg(
        F.sum("Amount").alias("SUM"),
        F.count(F.lit(1)).alias("COUNT"),  # counts rows, like groupBy().count()
        F.avg("Amount").alias("AVG"),
        F.max("Amount").alias("MAX"),
    )
    .filter(F.col("Party").isNotNull())
    .orderBy("Party")
    .collect()
)
party_list = [stats["Party"] for stats in party_stats]

for stats in party_stats:
    party_name = stats["Party"]
    # Exact equality, not a substring test: "CPI(M)" contains "CPI", so a
    # substring match would stamp CPI's figures onto CPI(M) rows as well.
    is_party = F.col("Party") == party_name
    for ag_fn in aggregate_fn:
        df = df.withColumn(
            party_name + "_" + ag_fn + "_LTD",
            F.when(is_party, stats[ag_fn]).otherwise(0),
        )

In [None]:
# Calculate the top donor per party.
#
# For every party P a column P_TOP_DONOR is added, e.g.:
#   BJP_TOP_DONOR, CPI_TOP_DONOR, INC_TOP_DONOR, NCP_TOP_DONOR,
#   CPI(M)_TOP_DONOR
# Rows of party P carry the Name from P's largest single donation; every
# other row carries "-".

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Rank the donations inside each party from the largest Amount downwards and
# keep only the first-ranked row of each party.
windowSpec = Window.partitionBy("Party").orderBy(col("Amount").desc())
df_top = (
    df.withColumn("row_number", row_number().over(windowSpec))
    .filter(col("row_number") == 1)
)

# Only Party and Name are needed on the driver. Rows without a party are
# skipped because they cannot be turned into a column name.
top_rows = (
    df_top
    .filter(col("Party").isNotNull())
    .select("Party", "Name")
    .collect()
)
for rows in top_rows:
    # Exact equality, not a substring test: "CPI(M)" contains "CPI", so a
    # substring match would mark CPI(M) rows with CPI's top donor too.
    df = df.withColumn(
        rows["Party"] + "_TOP_DONOR",
        when(col("Party") == rows["Party"], rows["Name"]).otherwise("-"),
    )



In [None]:
# Calculate the sum of donations per financial year per party.
# Each (fin_year, Party) pair gets its own column named like 2011-12_BJP_SUM.
# Rows of that year and party carry the pair's total; all other rows carry 0.
gbo = df.groupBy("Party", "fin_year").sum("Amount").orderBy("fin_year")
for rows in gbo.collect():
    party = rows["Party"]
    year = rows["fin_year"]
    # A pair with a missing party or year cannot be turned into a column name.
    if party is None or year is None:
        continue
    # Exact equality, not a substring test: "CPI(M)" contains "CPI", so a
    # substring match would credit CPI's yearly total to CPI(M) rows as well.
    in_group = (col("Party") == party) & (col("fin_year") == year)
    df = df.withColumn(
        year + "_" + party + "_SUM",
        when(in_group, rows["sum(Amount)"]).otherwise(0),
    )


In [None]:
# Calculate the number of donations to date per mode_of_payment.
# One column per payment mode found in the data, e.g.:
#   Bank_count_LTD, Cheque_count_LTD, Cash_count_LTD, Others_count_LTD
# Rows of a given mode carry that mode's row count; all other rows carry 0.

dfOne = df.groupBy("mode_of_payment").count()
for rows in dfOne.collect():
    payment_mode = rows["mode_of_payment"]
    matches_mode = df.mode_of_payment.contains(payment_mode)
    df = df.withColumn(
        payment_mode + "_count_LTD",
        when(matches_mode, rows["count"]).otherwise(0),
    )

In [None]:
# Print the schema of df as it stands after all the columns added above.
df.printSchema()

In [None]:
# Persist the enriched DataFrame as Parquet under "pyspark_assignment".
# The default save mode raises an error when the target already exists, which
# makes a second "Restart & Run All" fail on this cell; overwrite mode keeps
# the notebook re-runnable by replacing the previous output.
df.write.mode("overwrite").parquet("pyspark_assignment")