In [83]:
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the SparkSession that every later cell
# uses through the `spark` name. The placeholder
# .config("spark.some.config.option", "some-value") copied from the Spark
# docs example set a meaningless key and has been removed.
spark = (
    SparkSession.builder
    .appName("Python Spark Donation usecase")
    .getOrCreate()
)

Reading the JSON file

In [84]:
# Load the raw donation records with an inferred schema, then inspect them.
# NOTE(review): the inferred schema contains `_corrupt_record` and
# field9..field14, which suggests some input lines are malformed — worth
# checking the source file.
df = spark.read.format("json").load("donation_np.json")
df.printSchema()
df.show()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------

Task 1: Change column names

In [85]:
# Replace the space-containing source column names with snake_case names.
renames = {
    "Contribution Mode": "mode_of_payment",
    "Financial Year": "fin_year",
    "PAN Given": "pan_given",
}
for old_name, new_name in renames.items():
    df = df.withColumnRenamed(old_name, new_name)


In [86]:
# Check that mode_of_payment, fin_year and pan_given now appear in the schema.
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- mode_of_payment: string (nullable = true)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



Dropping rows whose `Name` is null (nulls in other columns are kept):

In [87]:
# Remove every row that has no donor Name.
df = df.dropna(subset=["Name"])


In [88]:
df.show(n=5)

+--------------------+--------+--------------------+--------+-------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|               Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+-------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|         Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|   V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15| Dr. Manmohan Singh|        N|   INC|Others|           null|       |       |       |       |  

In [89]:
[field.name for field in df.schema.fields]

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 '_corrupt_record',
 'field10',
 'field11',
 'field12',
 'field13',
 'field14',
 'field9']

Dropping unnecessary columns:

In [90]:
# Drop the parse-error column and the overflow columns field9..field14,
# which appear empty in the rows displayed above.
columns_to_drop = ["_corrupt_record", "field10", "field11", "field12", "field13", "field14", "field9"]
df = df.drop(*columns_to_drop)
   

In [91]:
df.schema.names


['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type']

In [92]:
df.show(5, truncate=True)

+--------------------+--------+--------------------+--------+-------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|               Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+-------------------+---------+------+------+
|16-B, Ferozeshah ...| 3000000|                CASH| 2010-11|         Aziz Pasha|        Y|   CPI|Others|
|No.1, First Floor...|10000000|000037, HDFC Bank...| 2014-15|   V K Ramachandran|        Y|CPI(M)|Others|
|3, Motilal Nehru ...|  108000|Cheque, State Ban...| 2014-15| Dr. Manmohan Singh|        N|   INC|Others|
|9,Firozshah Road ...|   54000|Through Bank Tran...| 2011-12|Dr. Manda Jagnathan|        N|   INC|Others|
|17,Dr.B.R.Mehta L...|   54000|Through Bank Tran...| 2011-12|   Prof. K.V.Thomas|        N|   INC|Others|
+--------------------+--------+--------------------+--------+-------------------+---------+------+------+
only showing top 5 rows



Task 2: Mask the Address column (one-way SHA-256 hash — not reversible encryption)

In [93]:
from pyspark.sql.functions import sha2,concat_ws

# Replace each Address with its SHA-256 hex digest (one-way hashing, not
# reversible encryption). The column is hashed directly so a null Address
# stays null: the previous concat_ws("||", Address) wrapper turned nulls into
# "" and gave every missing address the same non-null digest.
# NOTE(review): an unsalted hash of a guessable value such as an address can
# be brute-forced; use a secret salt/HMAC if the output must be non-identifying.
df = df.withColumn("Address", sha2(df.Address, 256))


In [94]:
df.show(n=10)

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|                CASH| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|000037, HDFC Bank...| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|Cheque, State Ban...| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|Through Bank Tran...| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|Through Bank Tran...| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|          146865 SBI| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|      

Task 3: Categorize mode_of_payment into 4 categories: cash, cheque, bank, others

In [95]:
# NOTE: this wildcard import shadows Python builtins (max, sum, round, ...).
# Later cells rely on it for `max`, `count`, `lit` and `when`, so it stays
# until those cells import what they use explicitly.
from pyspark.sql.functions import *

# Bucket the free-text payment description into cheque / cash / bank / others.
# Matching is case-insensitive: the raw values mix casings ("CASH", "Cheque",
# "HDFC Bank"), and the previous case-sensitive LIKE patterns sent variants
# such as "Cash" or "BANK" to "others".
# Precedence (first match wins): cheque, cash, bank, otherwise others.
# A null description still falls through to "others".
mode_lc = lower(col("mode_of_payment"))
df = df.withColumn(
    "mode_of_payment",
    when(mode_lc.contains("cheque") | mode_lc.startswith("ch."), "cheque")
    .when(mode_lc.contains("cash"), "cash")
    .when(mode_lc.contains("bank"), "bank")
    .otherwise("others"),
)

In [96]:
df.show(n=20)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|           bank| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         others| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         others| 2011-12|   Uma Shankar Gupta|     

Task 4: Calculate aggregates per party

In [97]:
# List each party that appears in the data once.
df.select("Party").dropDuplicates().show()

+------+
| Party|
+------+
|   INC|
|   BJP|
|   NCP|
|   CPI|
|CPI(M)|
+------+



In [98]:
from pyspark.sql.types import IntegerType, LongType

# Convert Amount from string to a numeric type. LongType rather than
# IntegerType: a (non-ANSI) cast silently yields null for values above
# 2,147,483,647, and such rows would then vanish from every aggregate below.
# NOTE(review): strings that are not plain numbers (e.g. containing commas)
# also become null here (or raise under ANSI mode) — worth checking the null count.
df = df.withColumn("Amount", df["Amount"].cast(LongType()))

#function to find sum,max,avg,count of Amount for one party
def get_party_aggregate(party_name, frame=None):
    """Return ``[sum, max, avg, count]`` of ``Amount`` for donations to one party.

    Args:
        party_name: value of the ``Party`` column to aggregate over.
        frame: DataFrame to aggregate; defaults to the notebook-level ``df``
            (read at call time, as before).

    Returns:
        A list ``[sum, max, avg, count]``. ``count`` counts non-null
        ``Amount`` values. If the party has no rows, the first three entries
        are ``None`` and ``count`` is 0 (previously this raised IndexError).
    """
    from pyspark.sql import functions as F

    if frame is None:
        frame = df  # notebook-level DataFrame
    # Filter first, then aggregate once: a single Spark job instead of four
    # full groupBy passes that were each filtered only afterwards.
    row = (
        frame.where(F.col("Party") == party_name)
        .agg(F.sum("Amount"), F.max("Amount"), F.avg("Amount"), F.count("Amount"))
        .collect()[0]
    )
    return list(row)
    
#getting aggregate values ([sum, max, avg, count]) for each party, in order
(INC_result_agg, BJP_result_agg, NCP_result_agg,
 CPI_result_agg, CPIM_result_agg) = [
    get_party_aggregate(party) for party in ("INC", "BJP", "NCP", "CPI", "CPI(M)")
]


#function to create a column holding one aggregate value for one party
def create_agg_col(df, party_name, col_name, agg_val):
    """Return ``df`` with column ``col_name`` added (or replaced).

    The column holds ``agg_val`` on rows whose ``Party`` equals
    ``party_name`` and 0 on all other rows.

    Args:
        df: input DataFrame (must have a ``Party`` column).
        party_name: party whose rows receive ``agg_val``.
        col_name: name of the column to add.
        agg_val: value to store (a number, or None for a missing aggregate).
    """
    from pyspark.sql import functions as F

    # Exact equality instead of LIKE (where `_` and `%` would be wildcards).
    # A numeric 0 default keeps the column numeric; the previous string "0"
    # coerced every aggregate column to string. The previous extra
    # withColumn(col_name, lit(agg_val)) was overwritten immediately and is dropped.
    return df.withColumn(
        col_name,
        F.when(F.col("Party") == party_name, F.lit(agg_val)).otherwise(F.lit(0)),
    )

#function to create columns with customized name for a party with aggregate values
def add_party_agg_cols(df, party_name, party_agg_list):
    """Add <party>_SUM_LTD, _MAX_LTD, _AVG_LTD and _COUNT_LTD columns to ``df``.

    ``party_agg_list`` is ``[sum, max, avg, count]``; entry i fills the
    column with the i-th suffix. Each column is built by ``create_agg_col``.
    """
    suffixes = ("_SUM_LTD", "_MAX_LTD", "_AVG_LTD", "_COUNT_LTD")
    for position, suffix in enumerate(suffixes):
        df = create_agg_col(df, party_name, party_name + suffix, party_agg_list[position])
    return df

#creating aggregate values columns by passing party name and aggregate value list to function
party_results = [
    ("INC", INC_result_agg),
    ("BJP", BJP_result_agg),
    ("NCP", NCP_result_agg),
    ("CPI", CPI_result_agg),
    ("CPI(M)", CPIM_result_agg),
]
for party_label, party_agg in party_results:
    df = add_party_agg_cols(df, party_label, party_agg)


df.show(5)



+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|
+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+---------

Task 5: Top donor name per party

In [99]:
#For Party: BJP — top donor(s) = Name(s) on the row(s) with the largest Amount
from pyspark.sql import functions as F

party_rows = df.where(F.col("Party") == "BJP")
max_amount = party_rows.agg(F.max("Amount")).collect()[0][0]
top_rows = party_rows.where(F.col("Amount") == max_amount).select("Name")
# Keep every donor tied at the maximum, sorted so the value is deterministic
# (collect()[0] picked an arbitrary one of several tied rows). "N.A" if the
# party has no donations.
top_names = sorted({r["Name"] for r in top_rows.collect() if r["Name"] is not None})
bjp_top = ", ".join(top_names) if top_names else "N.A"
df = df.withColumn(
    "BJP_TOP_DONOR",
    F.when(F.col("Party") == "BJP", F.lit(bjp_top)).otherwise("N.A"),
)
df.show(5)


+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|
+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----

In [100]:
# For party: INC — top donor(s) = Name(s) on the row(s) with the largest Amount
from pyspark.sql import functions as F

party_rows = df.where(F.col("Party") == "INC")
max_amount = party_rows.agg(F.max("Amount")).collect()[0][0]
top_rows = party_rows.where(F.col("Amount") == max_amount).select("Name")
top_rows.show()
# Keep every donor tied at the maximum, sorted so the value is deterministic
# (collect()[0] picked an arbitrary one of several tied rows). "N.A" if the
# party has no donations.
top_names = sorted({r["Name"] for r in top_rows.collect() if r["Name"] is not None})
inc_top = ", ".join(top_names) if top_names else "N.A"
df = df.withColumn(
    "INC_TOP_DONOR",
    F.when(F.col("Party") == "INC", F.lit(inc_top)).otherwise("N.A"),
)


+--------------------+
|                Name|
+--------------------+
|General Electoral...|
+--------------------+



In [101]:
# For party: NCP — top donor(s) = Name(s) on the row(s) with the largest Amount
from pyspark.sql import functions as F

party_rows = df.where(F.col("Party") == "NCP")
max_amount = party_rows.agg(F.max("Amount")).collect()[0][0]
top_rows = party_rows.where(F.col("Amount") == max_amount).select("Name")
top_rows.show()
# Keep every donor tied at the maximum, sorted so the value is deterministic.
# NCP has two donors tied at the maximum, and collect()[0] silently kept an
# arbitrary one of them. "N.A" if the party has no donations.
top_names = sorted({r["Name"] for r in top_rows.collect() if r["Name"] is not None})
ncp_top = ", ".join(top_names) if top_names else "N.A"
df = df.withColumn(
    "NCP_TOP_DONOR",
    F.when(F.col("Party") == "NCP", F.lit(ncp_top)).otherwise("N.A"),
)


+--------------------+
|                Name|
+--------------------+
|Lodha Dwellers Pv...|
|Satya Electoral T...|
+--------------------+



In [102]:
# For party: CPI — top donor(s) = Name(s) on the row(s) with the largest Amount
from pyspark.sql import functions as F

party_rows = df.where(F.col("Party") == "CPI")
max_amount = party_rows.agg(F.max("Amount")).collect()[0][0]
top_rows = party_rows.where(F.col("Amount") == max_amount).select("Name")
top_rows.show()
# Keep every donor tied at the maximum, sorted so the value is deterministic
# (collect()[0] picked an arbitrary one of several tied rows). "N.A" if the
# party has no donations.
top_names = sorted({r["Name"] for r in top_rows.collect() if r["Name"] is not None})
cpi_top = ", ".join(top_names) if top_names else "N.A"
# Exact equality: the previous LIKE "CPI" also excluded "CPI(M)" only
# because LIKE has no implicit wildcards.
df = df.withColumn(
    "CPI_TOP_DONOR",
    F.when(F.col("Party") == "CPI", F.lit(cpi_top)).otherwise("N.A"),
)


+----------+
|      Name|
+----------+
|Aziz Pasha|
+----------+



In [103]:
# For party: CPI(M) — top donor(s) = Name(s) on the row(s) with the largest Amount
from pyspark.sql import functions as F

party_rows = df.where(F.col("Party") == "CPI(M)")
max_amount = party_rows.agg(F.max("Amount")).collect()[0][0]
top_rows = party_rows.where(F.col("Amount") == max_amount).select("Name")
top_rows.show()
# Keep every donor tied at the maximum, sorted so the value is deterministic
# (collect()[0] picked an arbitrary one of several tied rows). "N.A" if the
# party has no donations.
top_names = sorted({r["Name"] for r in top_rows.collect() if r["Name"] is not None})
cpim_top = ", ".join(top_names) if top_names else "N.A"
df = df.withColumn(
    "CPI(M)_TOP_DONOR",
    F.when(F.col("Party") == "CPI(M)", F.lit(cpim_top)).otherwise("N.A"),
)


+----------------+
|            Name|
+----------------+
|V K Ramachandran|
+----------------+



In [104]:
df.show(n=5)

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+--------------------+-------------+-------------+----------------+
|             Address|  Amount|mode_of_payment|fin_year|               Name|pan_given| Party|  Type|INC_SUM_LTD|INC_MAX_LTD|       INC_AVG_LTD|INC_COUNT_LTD|BJP_SUM_LTD|BJP_MAX_LTD|BJP_AVG_LTD|BJP_COUNT_LTD|NCP_SUM_LTD|NCP_MAX_LTD|NCP_AVG_LTD|NCP_COUNT_LTD|CPI_SUM_LTD|CPI_MAX_LTD|       CPI_AVG_LTD|CPI_COUNT_LTD|CPI(M)_SUM_LTD|CPI(M)_MAX_LTD|   CPI(M)_AVG_LTD|CPI(M)_COUNT_LTD|BJP_TOP_DONOR|       INC_TOP_DONOR|NCP_TOP_DONOR|CPI_TOP_DONOR|CPI(M)_TOP_DONOR|
+--------------------+--------+---------------+--------+-------------------+----

Task 6: Calculate the sum of donations per financial year per party, naming each column `<fin_year>_<Party>_SUM` (e.g. `2011-12_BJP_SUM`)

In [105]:
# Total Amount per (Party, fin_year) group.
fin = df.groupBy('Party', 'fin_year').sum('Amount')
fin.show()
# Sanity-check one known group by key, not by position: groupBy output order
# is not guaranteed, so collect()[9] was BJP/2011-12 only by chance.
sum_t = (
    fin.where((fin.Party == "BJP") & (fin.fin_year == "2011-12"))
    .collect()[0]["sum(Amount)"]
)
sum_t


+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   BJP| 2010-11|  146253279|
|   CPI| 2012-13|    3695449|
|   INC| 2013-14|  595837728|
|   BJP| 2003-04|  116881973|
|CPI(M)| 2011-12|   23838657|
|   BJP| 2009-10|  823220133|
|   INC| 2014-15| 1414610950|
|CPI(M)| 2005-06|     550000|
|   NCP| 2012-13|     504000|
|   BJP| 2011-12|  334194113|
|   NCP| 2013-14|  140200000|
|   BJP| 2008-09|  306057231|
|CPI(M)| 2009-10|    3962049|
|CPI(M)| 2008-09|    4155000|
|   BJP| 2005-06|   36156111|
|   NCP| 2014-15|  388236419|
|   NCP| 2009-10|   30300000|
|   CPI| 2007-08|    4125800|
|   NCP| 2010-11|    1355000|
|   INC| 2006-07|  121273513|
+------+--------+-----------+
only showing top 20 rows



334194113

In [106]:
# Add one column per (fin_year, Party) group, named e.g. "2011-12_BJP_SUM".
# It holds the group's total Amount on that group's rows and "N.A" elsewhere
# (so, as before, these columns are string-typed).
from pyspark.sql import functions as F

year_party_sums = df.groupBy('Party', 'fin_year').sum('Amount').collect()

sum_cols = []
sum_col_names = set()
for row in year_party_sums:
    party, year = row['Party'], row['fin_year']
    if party is None or year is None:
        # A null key cannot form a column name (str + None raises TypeError).
        continue
    name = year + "_" + party + "_SUM"
    in_group = (F.col("fin_year") == year) & (F.col("Party") == party)
    sum_cols.append(
        F.when(in_group, F.lit(row['sum(Amount)'])).otherwise("N.A").alias(name)
    )
    sum_col_names.add(name)

# Add all columns in a single select(): calling withColumn() in a loop builds
# a deeply nested plan, which the PySpark docs warn can cause performance
# problems or a StackOverflowError. Same-named existing columns are replaced,
# as withColumn() would do.
kept_cols = [df[c] for c in df.columns if c not in sum_col_names]
df = df.select(kept_cols + sum_cols)

df.show(5)

+--------------------+--------+---------------+--------+-------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+-------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------+--------------

Task 7: Calculate the number of donations to date per mode_of_payment

In [107]:

# Count donations per payment category, then attach each count as a
# <mode>_COUNT_LTD column: the count on rows of that mode, 0 elsewhere.
from pyspark.sql import functions as F

df1 = df.groupBy('mode_of_payment').count()
df1.show()
# Look counts up by mode name. The previous code paired payment_modes[i] with
# the i-th collected row, but groupBy output order is not guaranteed, so a
# count could be attached to the wrong mode. It also re-ran the job 8 times.
count_by_mode = {row['mode_of_payment']: row['count'] for row in df1.collect()}

payment_modes = ['cash', 'cheque', 'others', 'bank']
#iterating over each payment mode to create new columns
for mode in payment_modes:
    mode_count = count_by_mode.get(mode, 0)  # 0 if the mode never occurs
    # Numeric 0 (not the string "0") keeps the column numeric.
    df = df.withColumn(
        mode + "_COUNT_LTD",
        F.when(F.col("mode_of_payment") == mode, F.lit(mode_count)).otherwise(F.lit(0)),
    )



+---------------+-----+
|mode_of_payment|count|
+---------------+-----+
|           cash|   29|
|         cheque| 2683|
|         others| 4016|
|           bank| 6845|
+---------------+-----+



In [108]:
# Show the first ten rows of the fully enriched DataFrame.
df.show(n=10)

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+-----------+------------------+-------------+-----------+-----------+------------------+-------------+-----------+-----------+-----------+-------------+-----------+-----------+------------------+-------------+--------------+--------------+-----------------+----------------+--------------------+--------------------+-------------+-------------+----------------+---------------+---------------+---------------+---------------+------------------+---------------+---------------+------------------+---------------+---------------+---------------+---------------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+------------------+---------------

Task 8: Write the data to a Parquet file:

In [109]:
# Write the final DataFrame to Parquet. The write is disabled (commented out)
# in this notebook; per the note below, the Parquet output was produced by
# running the notebook on Colab.
# NOTE(review): several column names contain "(" / ")" (e.g. CPI(M)_SUM_LTD);
# some Spark versions reject these characters in Parquet field names — confirm
# on the target Spark version, or rename the columns before writing.
# df.write.parquet("Donation_data.parquet")
#parquet file was created by running notebook on colab