<a href="https://colab.research.google.com/github/mayurpatildax/DAX-Trainings/blob/master/Data%20Engineering%5CDonor_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Data Engineering Assignment

#### Assignment - Donations to political parties summary 

##### Problem Statement - Create the columns based on given requirements

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

1. Importing the Donation dataset

In [4]:
df = spark.read.option("multiline", True).json("/content/drive/MyDrive/Colab Notebooks/donation_np.json")

In [5]:
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



In [6]:
df.show()

+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|                Name|PAN Given| Party|  Type|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------------+--------------------+---------+------+------+-------+-------+-------+-------+-------+------+
|16-B, Ferozeshah ...| 3000000|                CASH|       2010-11|          Aziz Pasha|        Y|   CPI|Others|       |       |       |       |       |      |
|No.1, First Floor...|10000000|000037, HDFC Bank...|       2014-15|    V K Ramachandran|        Y|CPI(M)|Others|       |       |       |       |       |      |
|3, Motilal Nehru ...|  108000|Cheque, State Ban...|       2014-15|  Dr. Manmohan Singh|        N|   INC|Others|       |       |       |       |       |      |
|9,Firozshah Road ...|   54000|Through B

2. Update the column names

    a. Contribution Mode to mode_of_payment

    b. Financial Year to fin_year

    c. PAN Given to pan_given

In [7]:
df2 = df.withColumnRenamed("Contribution Mode","mode_of_payment").withColumnRenamed("Financial Year","fin_year").withColumnRenamed("PAN Given", "pan_given")
df2.printSchema()
df2.show()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- mode_of_payment: string (nullable = true)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------+--------------------+---------+------+------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|                Name|pan_given| Party|  Type|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+--------------------+---------+------+------+-------+-------+-------+-------+-

3. Encrypt the address column (implemented here as a one-way SHA-256 hash)

In [8]:
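# Hashing function: returns the SHA-256 hex digest of the address
import hashlib
def encrypt_value(address):
    # A null address has no .encode(), so pass it through unchanged
    if address is None:
        return None
    sha_value = hashlib.sha256(address.encode()).hexdigest()
    return sha_value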

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

spark_udf = udf(encrypt_value, StringType())
df2 = df2.withColumn('Address',spark_udf('Address'))
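
SHA-256 is a one-way hash rather than reversible encryption: equal addresses produce equal digests, and the original text cannot be recovered from a digest. The plain-Python sketch below illustrates that behaviour. `hash_address` and the sample addresses are hypothetical, and returning `None` for a missing address is an assumption about how nulls should be treated.

```python
import hashlib

def hash_address(address):
    """Return the SHA-256 hex digest of an address, or None if it is missing."""
    if address is None:  # assumption: missing addresses stay missing
        return None
    return hashlib.sha256(address.encode("utf-8")).hexdigest()

# Hypothetical sample values, not taken from the dataset.
digest_a = hash_address("1, Example Road, New Delhi")
digest_b = hash_address("1, Example Road, New Delhi")
digest_c = hash_address("2, Example Road, New Delhi")

print(digest_a == digest_b)  # same input gives the same digest
print(digest_a == digest_c)  # different input gives a different digest
print(len(digest_a))         # length of the hex digest
```

PySpark also provides a built-in `pyspark.sql.functions.sha2(col, 256)`, which may be worth considering because it avoids a Python UDF.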

In [10]:
df2.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- mode_of_payment: string (nullable = true)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)



4. Categorize the modes of payment into 4 categories

    a. Cash

    b. Cheque

    c. Bank

    d. Others

In [11]:
modes = ['Cash', 'Cheque', 'Bank', 'Others']

In [12]:
#add column of transformed mode of payment
from pyspark.sql.functions import lit
df2 = df2.withColumn("payment_segregated", lit(''))

In [13]:
from pyspark.sql.functions import when
from pyspark.sql.functions import lower, col
mode = lower(col("mode_of_payment"))
is_cheque = mode.contains('cheque') | mode.contains('ch.') | mode.contains('ch no')
# Conditions are evaluated in order; anything unmatched falls through to 'Others'
df2 = df2.withColumn("payment_segregated", when(mode.contains('cash'), 'Cash').when(is_cheque, 'Cheque').when(mode.contains('bank'), 'Bank').otherwise('Others'))
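
The categorization depends on the order in which the rules are checked: a value that mentions both a cheque and a bank is labelled Cheque because the cheque rule comes first. The plain-Python sketch below mirrors that rule order for quick experimentation. `categorize_mode` is a hypothetical helper and the sample strings are illustrative.

```python
def categorize_mode(mode):
    """Map a free-text payment mode to Cash, Cheque, Bank or Others."""
    text = (mode or "").lower()  # treat a missing value as empty text
    if "cash" in text:
        return "Cash"
    if "cheque" in text or "ch." in text or "ch no" in text:
        return "Cheque"
    if "bank" in text:
        return "Bank"
    return "Others"

# Illustrative inputs; only the last two appear in the output shown in this notebook.
samples = ["CASH", "Cheque drawn on Example Bank", "337224, HDFC Bank", "925343"]
categories = [categorize_mode(s) for s in samples]
print(categories)
```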

In [14]:
for rows in df2.select("mode_of_payment", "payment_segregated").collect():
    print(rows[0]," - ", rows[1])

Streaming output truncated to the last 5000 lines.
12719 Central Bank  -  Bank
999632 Vijaya Bank  -  Bank
208152 State Bank Of India  -  Bank
567491,Bank Of Baroda  -  Bank
925343  -  Others
28031PUNJAB & SINDH BANK, C.P., NEW DELHI-110001  -  Bank
00000 3, Bank of Baroda  -  Bank
1759049 Narmada Malwa Gramin Bank  -  Bank
81, Karur Vyshya Bank  -  Bank
Cash  -  Cash
134457INDIAN BANK, WEST PATEL NAGAR-NEW DELHI-08  -  Bank
989216 Canara Bank  -  Bank
556725, Ing Vysya Bank  -  Bank
169786, Parshawrnath Bank  -  Bank
170133, Parshwaranath Bank  -  Bank
8760, IDBI Bank  -  Bank
790920, HDFC Bank _x000D_
_x000D_
Ltd  -  Bank
270162 State Bank of India  -  Bank
162362 State Bank Of India  -  Bank
725881, Indian Overseas Bank  -  Bank
634160 Central Bank  -  Bank
398300PUNJAB NATIONAL BANK  -  Bank
040877, Cosmos Bank  -  Bank
372958, JK Bank Bari Brahmna Jammu  -  Bank
337224, HDFC Bank  -  Bank
726552, HDFC Bank  -  Bank
726553, HDFC Bank  -  Bank
952344, Indusind Bank  - 

#### 5. Calculate aggregates per party

5.1 Calculate the Sum aggregation per party

In [15]:
df2 = df2.withColumn("Amount", df2["Amount"].cast(IntegerType()))
df2.groupBy("Party").sum("Amount").show()

+------+-----------+
| Party|sum(Amount)|
+------+-----------+
|   INC| 4031487349|
|   BJP| 9295525996|
|   NCP|  647911419|
|   CPI|   68123698|
|CPI(M)|  150622128|
+------+-----------+



In [16]:
#add column for party sum

INC = df2.groupBy("Party").sum("Amount").filter(df2.Party == 'INC').select("sum(Amount)").collect()
BJP = df2.groupBy("Party").sum("Amount").filter(df2.Party == 'BJP').select("sum(Amount)").collect()
NCP = df2.groupBy("Party").sum("Amount").filter(df2.Party == 'NCP').select("sum(Amount)").collect()
CPI = df2.groupBy("Party").sum("Amount").filter(df2.Party == 'CPI').select("sum(Amount)").collect()
CPI_M = df2.groupBy("Party").sum("Amount").filter(df2.Party == 'CPI(M)').select("sum(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
sumList = ['INC_SUM_LTD', 'BJP_SUM_LTD', 'NCP_SUM_LTD', 'CPI_SUM_LTD', 'CPI_M_SUM_LTD']
sumValue = [INC[0][0], BJP[0][0], NCP[0][0], CPI[0][0], CPI_M[0][0]]


for i in range(0, len(sumList)):
    # print(partyList[i], sumList[i], type(sumValue[i]), sumValue[i])
    df2 = df2.withColumn(sumList[i], lit(0))
    # Exact match: contains('CPI') would also match 'CPI(M)' rows
    df2 = df2.withColumn(sumList[i], when(df2["Party"] == partyList[i], sumValue[i]).otherwise(0))
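
Party codes in this dataset overlap: 'CPI' is a substring of 'CPI(M)'. In PySpark, `Column.contains` performs a substring test while `==` compares whole values, so the two select different rows. A plain-Python sketch of the difference, using the party list from this notebook:

```python
parties = ["INC", "BJP", "NCP", "CPI", "CPI(M)"]

# Substring test: which party codes contain "CPI"?
substring_matches = [p for p in parties if "CPI" in p]

# Exact test: which party codes equal "CPI"?
exact_matches = [p for p in parties if p == "CPI"]

print(substring_matches)  # ['CPI', 'CPI(M)']
print(exact_matches)      # ['CPI']
```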

5.2 Calculate the Count aggregation per party

In [17]:
df2.groupBy("Party").count().show()
INC = df2.groupBy("Party").count().filter(df2.Party == 'INC').select("count").collect()
print(INC[0][0])

+------+-----+
| Party|count|
+------+-----+
|   INC| 3785|
|   BJP| 8782|
|   NCP|  107|
|   CPI|  384|
|CPI(M)|  515|
+------+-----+

3785


In [18]:
#add column for party COUNT
INC = df2.groupBy("Party").count().filter(df2.Party == 'INC').select("count").collect()
BJP = df2.groupBy("Party").count().filter(df2.Party == 'BJP').select("count").collect()
NCP = df2.groupBy("Party").count().filter(df2.Party == 'NCP').select("count").collect()
CPI = df2.groupBy("Party").count().filter(df2.Party == 'CPI').select("count").collect()
CPI_M = df2.groupBy("Party").count().filter(df2.Party == 'CPI(M)').select("count").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
countList = ['INC_COUNT_LTD', 'BJP_COUNT_LTD', 'NCP_COUNT_LTD', 'CPI_COUNT_LTD', 'CPI_M_COUNT_LTD']
countValue = [INC[0][0], BJP[0][0], NCP[0][0], CPI[0][0], CPI_M[0][0]]


for i in range(0, len(countList)):
    # print(partyList[i], countList[i], type(countValue[i]), countValue[i])
    df2 = df2.withColumn(countList[i], lit(0))
    # Exact match: contains('CPI') would also match 'CPI(M)' rows
    df2 = df2.withColumn(countList[i], when(df2["Party"] == partyList[i], countValue[i]).otherwise(0))

5.3 Calculate the Average aggregation per party

In [19]:
df2.groupBy("Party").avg('Amount').show()
INC = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'INC').select("avg(Amount)").collect()
print(INC[0][0])

+------+------------------+
| Party|       avg(Amount)|
+------+------------------+
|   INC|1065122.1529722589|
|   BJP|1058474.8344340697|
|   NCP| 6055246.906542056|
|   CPI|177405.46354166666|
|CPI(M)| 292470.1514563107|
+------+------------------+

1065122.1529722589


In [20]:
#add column for party AVG
INC = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'INC').select("avg(Amount)").collect()
BJP = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'BJP').select("avg(Amount)").collect()
NCP = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'NCP').select("avg(Amount)").collect()
CPI = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'CPI').select("avg(Amount)").collect()
CPI_M = df2.groupBy("Party").avg('Amount').filter(df2.Party == 'CPI(M)').select("avg(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
avgList = ['INC_AVG_LTD', 'BJP_AVG_LTD', 'NCP_AVG_LTD', 'CPI_AVG_LTD', 'CPI_M_AVG_LTD']
avgValue = [INC[0][0], BJP[0][0], NCP[0][0], CPI[0][0], CPI_M[0][0]]


for i in range(0, len(avgList)):
    # print(partyList[i], avgList[i], type(avgValue[i]), avgValue[i])
    df2 = df2.withColumn(avgList[i], lit(0))
    # Exact match: contains('CPI') would also match 'CPI(M)' rows
    df2 = df2.withColumn(avgList[i], when(df2["Party"] == partyList[i], avgValue[i]).otherwise(0))

5.4 Calculate the Max aggregation per party

In [21]:
df2.groupBy("Party").max('Amount').show()
INC = df2.groupBy("Party").max('Amount').filter(df2.Party == 'INC').select("max(Amount)").collect()
print(INC[0][0])

+------+-----------+
| Party|max(Amount)|
+------+-----------+
|   INC|  500000000|
|   BJP|  500000000|
|   NCP|   50000000|
|   CPI|    3000000|
|CPI(M)|   10000000|
+------+-----------+

500000000


In [22]:
#add column for party MAX
INC = df2.groupBy("Party").max('Amount').filter(df2.Party == 'INC').select("max(Amount)").collect()
BJP = df2.groupBy("Party").max('Amount').filter(df2.Party == 'BJP').select("max(Amount)").collect()
NCP = df2.groupBy("Party").max('Amount').filter(df2.Party == 'NCP').select("max(Amount)").collect()
CPI = df2.groupBy("Party").max('Amount').filter(df2.Party == 'CPI').select("max(Amount)").collect()
CPI_M = df2.groupBy("Party").max('Amount').filter(df2.Party == 'CPI(M)').select("max(Amount)").collect()

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
maxList = ['INC_MAX_LTD', 'BJP_MAX_LTD', 'NCP_MAX_LTD', 'CPI_MAX_LTD', 'CPI_M_MAX_LTD']
maxValue = [INC[0][0], BJP[0][0], NCP[0][0], CPI[0][0], CPI_M[0][0]]


for i in range(0, len(maxList)):
    # print(partyList[i], maxList[i], type(maxValue[i]), maxValue[i])
    df2 = df2.withColumn(maxList[i], lit(0))
    # Exact match: contains('CPI') would also match 'CPI(M)' rows
    df2 = df2.withColumn(maxList[i], when(df2["Party"] == partyList[i], maxValue[i]).otherwise(0))
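
The four aggregates in section 5 (sum, count, average, max) can be cross-checked on a tiny sample in plain Python. The sketch below groups rows by party once and derives all four statistics from the groups. The rows are hypothetical and the variable names are illustrative.

```python
from collections import defaultdict

# Hypothetical (party, amount) rows, not taken from the dataset.
rows = [("INC", 100), ("INC", 300), ("BJP", 50), ("CPI", 20), ("CPI(M)", 70)]

# Group the amounts by exact party code.
amounts_by_party = defaultdict(list)
for party, amount in rows:
    amounts_by_party[party].append(amount)

# Derive all four statistics from each group.
stats = {
    party: {
        "sum": sum(values),
        "count": len(values),
        "avg": sum(values) / len(values),
        "max": max(values),
    }
    for party, values in amounts_by_party.items()
}

# Attach each row's own party total, the way a per-party column would.
enriched = [(party, amount, stats[party]["sum"]) for party, amount in rows]
print(stats["INC"])
print(enriched[0])
```

In PySpark, a comparable single-pass result could be obtained with `groupBy("Party").agg(...)` or with window functions over `Window.partitionBy("Party")`, instead of one grouped query per party.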

6. Calculate top donor per party

In [23]:
# Top donor
# Filter on Party as well as Amount, so a donor of another party with the same amount is not picked up
INC_amt = df2.groupBy("Party").max('Amount').filter(df2.Party == 'INC').select("max(Amount)").collect()
res = df2.filter((df2['Party'] == 'INC') & (df2['Amount'] == INC_amt[0][0])).select('Name').collect()
print(INC_amt[0][0], res[0][0])

BJP_amt = df2.groupBy("Party").max('Amount').filter(df2.Party == 'BJP').select("max(Amount)").collect()
res = df2.filter((df2['Party'] == 'BJP') & (df2['Amount'] == BJP_amt[0][0])).select('Name').collect()
print(BJP_amt[0][0], res[0][0])

NCP_amt = df2.groupBy("Party").max('Amount').filter(df2.Party == 'NCP').select("max(Amount)").collect()
res = df2.filter((df2['Party'] == 'NCP') & (df2['Amount'] == NCP_amt[0][0])).select('Name').collect()
print(NCP_amt[0][0], res[0][0])

CPI_amt = df2.groupBy("Party").max('Amount').filter(df2.Party == 'CPI').select("max(Amount)").collect()
res = df2.filter((df2['Party'] == 'CPI') & (df2['Amount'] == CPI_amt[0][0])).select('Name').collect()
print(CPI_amt[0][0], res[0][0])

CPI_M_amt = df2.groupBy("Party").max('Amount').filter(df2.Party == 'CPI(M)').select("max(Amount)").collect()
res = df2.filter((df2['Party'] == 'CPI(M)') & (df2['Amount'] == CPI_M_amt[0][0])).select('Name').collect()
print(CPI_M_amt[0][0], res[0][0])



500000000 General Electoral Trust
500000000 General Electoral Trust
50000000 A.V.Patil Foundation
3000000 Aziz Pasha
10000000 V K Ramachandran
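
When two parties share the same maximum amount, looking up the donor by amount alone can return a donor who gave to a different party. The plain-Python sketch below contrasts an amount-only lookup with a party-scoped one. The rows and the `top_donor` helper are hypothetical.

```python
# Hypothetical (party, donor, amount) rows, not taken from the dataset.
rows = [
    ("INC", "Donor A", 500),
    ("BJP", "Donor B", 500),
    ("BJP", "Donor C", 200),
    ("NCP", "Donor D", 50),
]

def top_donor(rows, party):
    """Return (donor, amount) for the largest donation made to one party."""
    party_rows = [r for r in rows if r[0] == party]
    _, donor, amount = max(party_rows, key=lambda r: r[2])
    return donor, amount

# Matching on the amount alone mixes donors of different parties.
amount_only = [donor for _, donor, amount in rows if amount == 500]

print(top_donor(rows, "BJP"))
print(amount_only)
```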


In [24]:

partyList = ['INC', 'BJP', 'NCP', 'CPI', 'CPI(M)']
donorList = ['INC_TOP_DONOR', 'BJP_TOP_DONOR', 'NCP_TOP_DONOR', 'CPI_TOP_DONOR', 'CPI_M_TOP_DONOR']
donorValue = [INC_amt[0][0], BJP_amt[0][0], NCP_amt[0][0], CPI_amt[0][0], CPI_M_amt[0][0]]


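# Note: these columns hold each party's top donation amount, not the donor's name
for i in range(0, len(donorList)):
    df2 = df2.withColumn(donorList[i], lit(0))
    # Exact match: contains('CPI') would also match 'CPI(M)' rows
    df2 = df2.withColumn(donorList[i], when(df2["Party"] == partyList[i], donorValue[i]).otherwise(0))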

7. Calculate the sum of donations per financial year per party; name each column in the form 2011-12_BJP_SUM

In [25]:
df2.groupBy('fin_year').sum('Amount').show(100)

+--------+-----------+
|fin_year|sum(Amount)|
+--------+-----------+
| 2004-05|  661603287|
| 2006-07|  153178304|
| 2010-11|  253982700|
| 2008-09|  647906691|
| 2007-08|  350074020|
| 2012-13|  991430132|
| 2009-10| 1706671272|
| 2014-15| 6223898440|
| 2005-06|   99907293|
| 2003-04|  146162222|
| 2011-12|  480926109|
| 2013-14| 2477930120|
+--------+-----------+



In [26]:
# Per financial year
from pyspark.sql.functions import desc

df2.groupBy('Party', 'fin_year').sum('Amount').sort(col("Party").asc()).show(100)

result = df2.groupBy('Party', 'fin_year').sum('Amount').sort(col("Party").asc()).collect()


+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   BJP| 2010-11|  146253279|
|   BJP| 2014-15| 4373506898|
|   BJP| 2003-04|  116881973|
|   BJP| 2011-12|  334194113|
|   BJP| 2007-08|  249623653|
|   BJP| 2012-13|  831924462|
|   BJP| 2008-09|  306057231|
|   BJP| 2004-05|  339521289|
|   BJP| 2006-07|   29550672|
|   BJP| 2009-10|  823220133|
|   BJP| 2005-06|   36156111|
|   BJP| 2013-14| 1708636182|
|   CPI| 2011-12|    5982675|
|   CPI| 2005-06|    3988690|
|   CPI| 2003-04|     779148|
|   CPI| 2008-09|    2585000|
|   CPI| 2009-10|    8667852|
|   CPI| 2004-05|     630000|
|   CPI| 2014-15|   13346675|
|   CPI| 2006-07|    1229400|
|   CPI| 2010-11|   10811465|
|   CPI| 2013-14|   12281544|
|   CPI| 2012-13|    3695449|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|CPI(M)| 2011-12|   23838657|
|CPI(M)| 2008-09|    4155000|
|CPI(M)| 2014-15|   34197498|
|CPI(M)| 2010-11|   15357072|
|CPI(M)| 2003-04|     200000|
|CPI(M)| 2

In [27]:
financial_yr_data = df2.groupBy("Party","fin_year").sum("Amount").sort("fin_year")
financial_yr_data.show()

+------+--------+-----------+
| Party|fin_year|sum(Amount)|
+------+--------+-----------+
|   INC| 2003-04|   28301101|
|CPI(M)| 2003-04|     200000|
|   BJP| 2003-04|  116881973|
|   CPI| 2003-04|     779148|
|   INC| 2004-05|  320555643|
|CPI(M)| 2004-05|     896355|
|   CPI| 2004-05|     630000|
|   BJP| 2004-05|  339521289|
|   CPI| 2005-06|    3988690|
|   BJP| 2005-06|   36156111|
|CPI(M)| 2005-06|     550000|
|   INC| 2005-06|   59212492|
|CPI(M)| 2006-07|    1124719|
|   BJP| 2006-07|   29550672|
|   INC| 2006-07|  121273513|
|   CPI| 2006-07|    1229400|
|   CPI| 2007-08|    4125800|
|CPI(M)| 2007-08|    7226116|
|   BJP| 2007-08|  249623653|
|   NCP| 2007-08|   10225000|
+------+--------+-----------+
only showing top 20 rows



In [28]:
for row in financial_yr_data.collect():
    name = row[1] + "_" + row[0] + "_" + "SUM"
    df2 = df2.withColumn(name,lit('-'))
    df2 = df2.withColumn(name, when(col("Party") == row[0], row[2]).otherwise('-'))

# df2.show()
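
The column names in this step are built by joining the financial year, the party code, and the suffix `SUM`. The sketch below shows that naming scheme in plain Python, using two totals that appear in the grouped output above. `sum_column_name` is a hypothetical helper.

```python
def sum_column_name(fin_year, party):
    """Build a column name such as '2011-12_BJP_SUM'."""
    return f"{fin_year}_{party}_SUM"

# (party, fin_year) -> total, copied from the grouped output shown in this notebook.
totals = {
    ("BJP", "2011-12"): 334194113,
    ("CPI(M)", "2011-12"): 23838657,
}

# One named column per (financial year, party) pair.
columns = {
    sum_column_name(year, party): total
    for (party, year), total in totals.items()
}
print(columns)
```

Note that the party code is used verbatim, so CPI(M) yields names containing parentheses, unlike the `CPI_M_...` columns created earlier. In PySpark, `groupBy("Party").pivot("fin_year").sum("Amount")` is another way to obtain one column per financial year.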

In [29]:
# check column list
df2.columns

['Address',
 'Amount',
 'mode_of_payment',
 'fin_year',
 'Name',
 'pan_given',
 'Party',
 'Type',
 'field10',
 'field11',
 'field12',
 'field13',
 'field14',
 'field9',
 'payment_segregated',
 'INC_SUM_LTD',
 'BJP_SUM_LTD',
 'NCP_SUM_LTD',
 'CPI_SUM_LTD',
 'CPI_M_SUM_LTD',
 'INC_COUNT_LTD',
 'BJP_COUNT_LTD',
 'NCP_COUNT_LTD',
 'CPI_COUNT_LTD',
 'CPI_M_COUNT_LTD',
 'INC_AVG_LTD',
 'BJP_AVG_LTD',
 'NCP_AVG_LTD',
 'CPI_AVG_LTD',
 'CPI_M_AVG_LTD',
 'INC_MAX_LTD',
 'BJP_MAX_LTD',
 'NCP_MAX_LTD',
 'CPI_MAX_LTD',
 'CPI_M_MAX_LTD',
 'INC_TOP_DONOR',
 'BJP_TOP_DONOR',
 'NCP_TOP_DONOR',
 'CPI_TOP_DONOR',
 'CPI_M_TOP_DONOR',
 '2003-04_BJP_SUM',
 '2003-04_CPI_SUM',
 '2003-04_INC_SUM',
 '2003-04_CPI(M)_SUM',
 '2004-05_INC_SUM',
 '2004-05_CPI(M)_SUM',
 '2004-05_CPI_SUM',
 '2004-05_BJP_SUM',
 '2005-06_CPI(M)_SUM',
 '2005-06_BJP_SUM',
 '2005-06_INC_SUM',
 '2005-06_CPI_SUM',
 '2006-07_INC_SUM',
 '2006-07_CPI(M)_SUM',
 '2006-07_BJP_SUM',
 '2006-07_CPI_SUM',
 '2007-08_CPI_SUM',
 '2007-08_BJP_SUM',
 '2007

8. Calculate the number of donations to date per mode_of_payment

In [30]:
payment_modes_list = df2.groupBy("payment_segregated").count()

payment_modes_list.show()

+------------------+-----+
|payment_segregated|count|
+------------------+-----+
|              Bank| 7251|
|            Cheque| 2918|
|              Cash|  918|
|            Others| 2486|
+------------------+-----+



In [31]:
for row in payment_modes_list.collect():
    name = row[0] + "_COUNT_LTD"
    df2 = df2.withColumn(name,lit('-'))
    df2 = df2.withColumn(name, when(col("payment_segregated") == row[0], row[1]).otherwise('-'))
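
The count columns are named after the category labels themselves. The plain-Python sketch below counts a handful of hypothetical category values and builds upper-cased column names from them. The upper-casing is an assumption made here for consistent naming, not something the loop above does.

```python
from collections import Counter

# Hypothetical category values, not taken from the dataset.
modes = ["Bank", "Cash", "Bank", "Others", "Cheque", "Bank"]

counts = Counter(modes)

# Assumption: upper-case the label so every column follows one naming style.
columns = {f"{mode.upper()}_COUNT_LTD": n for mode, n in counts.items()}
print(columns)
```

By default Spark resolves column names case-insensitively (`spark.sql.caseSensitive` is false), so a column created as `Bank_COUNT_LTD` can still be selected as `BANK_COUNT_LTD`.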

In [32]:
df2.select("payment_segregated","BANK_COUNT_LTD","CHEQUE_COUNT_LTD","CASH_COUNT_LTD","OTHERS_COUNT_LTD").show()

+------------------+--------------+----------------+--------------+----------------+
|payment_segregated|BANK_COUNT_LTD|CHEQUE_COUNT_LTD|CASH_COUNT_LTD|OTHERS_COUNT_LTD|
+------------------+--------------+----------------+--------------+----------------+
|              Cash|             -|               -|           918|               -|
|              Bank|          7251|               -|             -|               -|
|            Cheque|             -|            2918|             -|               -|
|              Bank|          7251|               -|             -|               -|
|              Bank|          7251|               -|             -|               -|
|            Others|             -|               -|             -|            2486|
|            Others|             -|               -|             -|            2486|
|              Bank|          7251|               -|             -|               -|
|            Others|             -|               -|             

9. Write the final data into parquet 

In [33]:
df2.write.parquet("/content/drive/MyDrive/Colab Notebooks/donation-data.parquet")

In [34]:
drive.flush_and_unmount()