In [49]:
pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [50]:
from pyspark.sql import SparkSession,types
spark=SparkSession.builder.master("local").appName('Json File').getOrCreate()
df=spark.read.json('donation_np.json').limit(10)
df.printSchema()
df.show(5)

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Contribution Mode: string (nullable = true)
 |-- Financial Year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PAN Given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------------+-------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|   Contribution Mode|Financial Year|               Name|PAN Given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+-----------

In [51]:
from functools import reduce
oldColumns = ["Contribution Mode","Financial Year","PAN Given"]
newColumns = ["mode_of_payment","fin_year","pan_given"]

df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
df.printSchema()
df.show(5)

root
 |-- Address: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- mode_of_payment: string (nullable = true)
 |-- fin_year: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- pan_given: string (nullable = true)
 |-- Party: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- field10: string (nullable = true)
 |-- field11: string (nullable = true)
 |-- field12: string (nullable = true)
 |-- field13: string (nullable = true)
 |-- field14: string (nullable = true)
 |-- field9: string (nullable = true)

+--------------------+--------+--------------------+--------+-------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|     mode_of_payment|fin_year|               Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+--------------------+--------+-

In [52]:
from pyspark.sql.functions import sha2,concat_ws
df=df.withColumn("Address", sha2(concat_ws("||", df.Address), 256))

In [53]:
from pyspark.sql.functions import *

import re

df = df.withColumn("mode_of_payment", when(lower(df.mode_of_payment).contains("cheque"),"Cheque") \

      .when(lower(df.mode_of_payment).rlike("(\d{5,6})"),"Cheque") \

      .when(lower(df.mode_of_payment).contains("ch no"),"Cheque") \

      .when(lower(df.mode_of_payment).contains("cheque no"),"Cheque") \

      .when(lower(df.mode_of_payment).contains("cash"),"Cash") \

      .when(lower(df.mode_of_payment).contains("bank"),"Bank") \

      .otherwise("Others"))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|_corrupt_record|field10|field11|field12|field13|field14|field9|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+---------------+-------+-------+-------+-------+-------+------+
|e3b0c44298fc1c149...|    null|         Others|    null|                null|     null|  null|  null|              [|   null|   null|   null|   null|   null|  null|
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|           null|       |       |       |       |       |      |
|846539cb21bc9e6c6...|10000000|         Cheque| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|           null|       |       |       |       |       |      |
|869fa3a19

In [54]:
from pyspark.sql.functions import col
df=df.drop('_corrupt_record','field10','field11','field12','field13','field14','field9')

In [55]:
df = df.where(col('Amount').isNotNull())
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|
|846539cb21bc9e6c6...|10000000|         Cheque| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|        N|   INC|Others|
|524b1379d08e4c02f...|   54000|           Bank| 2011-12|    Prof. K.V.Thomas|        N|   INC|Others|
|6db7af0c5dca3b333...|  100000|         Cheque| 2011-12|     Sweta Chyouksey|        Y|   BJP|Others|
|c01158e07376c3778...|  100000|         Cheque| 2011-12|   Uma Shankar Gupta|     

In [56]:
from pyspark.sql.types import IntegerType
df = df.withColumn("Amount",df["Amount"].cast(IntegerType()))

In [57]:
type(df)

pyspark.sql.dataframe.DataFrame

In [68]:
for i in df.groupBy("Party").sum("Amount").collect():
  party,amt=i['Party'],i['sum(Amount)']
  df=df.withColumn(party+"_SUM_LTD", when(df.Party.contains(party),amt).otherwise(0))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|    3000000|             0|          0|          0|
|846539cb21bc9e6c6...|10000000|         Cheque| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|    3000000|      10000000|          0|          0|
|869fa3a19f1c51ad1...|  108000|         Cheque| 2014-15|  Dr. Manmohan Singh|        N|   INC|Others|          0|             0|     216000|          0|
|5f04f40130569ddab...|   54000|           Bank| 2011-12| Dr. Manda Jagnathan|     

In [72]:
for i in df.groupby('Party').agg(count('Party')).collect():
  party,cnt=i['Party'],i['count(Party)']
  df=df.withColumn(party+"_COUNT_LTD", when(df.Party.contains(party),cnt).otherwise(0))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|    3000000|             0|          0|          0|            1|               0|            0|            0|
|846539cb21bc9e6c6...|10000000|         Cheque| 2014-15|    V K Ramachandran|        Y|CPI(M)|Others|    3000000|      10000000|          0|          0|

In [87]:
for i in df.groupby('Party').agg(avg('Amount')).collect():
  party,avgg=i['Party'],i['avg(Amount)']
  df=df.withColumn(party+"_AVG_LTD", when(df.Party.contains(party),avgg).otherwise(0))
df.show()


+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|CPI_AVG_LTD|CPI(M)_AVG_LTD|INC_AVG_LTD|BJP_AVG_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|          Aziz Pasha|        Y|   CPI|Others|    3000000|             0|          0|          0|            1|               0|            0|            0|

In [88]:
for i in df.groupby('Party').agg(max('Amount')).collect():
  party,max=i['Party'],i['max(Amount)']
  df=df.withColumn(party+"_MAX_LTD", when(df.Party.contains(party),max).otherwise(0))
df.show()

+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+-----------+--------------+-----------+-----------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|CPI_AVG_LTD|CPI(M)_AVG_LTD|INC_AVG_LTD|BJP_AVG_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|INC_MAX_LTD|BJP_MAX_LTD|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+-----------+--------------+-----------+-----------+
|5a3058deb6f337958...| 3000000|           Cash| 2010-11|  

In [94]:
for i in df.groupby(['Party','fin_year']).agg(sum('Amount')).collect():
  party,sum,yr=i['Party'],i['sum(Amount)'],i['fin_year']
  df=df.withColumn(yr+'_'+party+"_SUM", when(df.Party.contains(party),sum).otherwise(0))
df.show()


+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+-----------+--------------+-----------+-----------+---------------+------------------+---------------+---------------+---------------+---------------+
|             Address|  Amount|mode_of_payment|fin_year|                Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|CPI_AVG_LTD|CPI(M)_AVG_LTD|INC_AVG_LTD|BJP_AVG_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|INC_MAX_LTD|BJP_MAX_LTD|2010-11_CPI_SUM|2014-15_CPI(M)_SUM|2014-15_INC_SUM|2011-12_INC_SUM|2011-12_BJP_SUM|2013-14_BJP_SUM|
+--------------------+--------+---------------+--------+--------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+--------

In [126]:
for i in df.groupby(['mode_of_payment']).agg(count('mode_of_payment')).collect():
  party,sum=i['mode_of_payment'],i['count(mode_of_payment)']
  df=df.withColumn(party+"_count_LTD", when(df.mode_of_payment.contains(party),sum).otherwise(0))
df.show()

+--------------------+--------+---------------+--------+------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+-----------+--------------+-----------+-----------+---------------+------------------+---------------+---------------+---------------+---------------+--------------+----------------+--------------+----------+
|             Address|  Amount|mode_of_payment|fin_year|              Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|CPI_AVG_LTD|CPI(M)_AVG_LTD|INC_AVG_LTD|BJP_AVG_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|INC_MAX_LTD|BJP_MAX_LTD|2010-11_CPI_SUM|2014-15_CPI(M)_SUM|2014-15_INC_SUM|2011-12_INC_SUM|2011-12_BJP_SUM|2013-14_BJP_SUM|Cash_count_LTD|Cheque_count_LTD|Bank_count_LTD|row_number|
+--------------------+--------+---------------+--------+------

In [127]:
from pyspark.sql.window import Window

from pyspark.sql.functions import row_number

windowSpec  = Window.partitionBy("Party").orderBy(col("Amount").desc())

df=df.withColumn("row_number",row_number().over(windowSpec)).filter(col("row_number").like("1"))

top_donor_count=0

temp_top_df=df

for party_name in df.select('Party').distinct().collect():
    party_name=party_name['Party']
    top_donor_name=df.collect()[top_donor_count][4]

    temp_top_df=temp_top_df.withColumn(party_name+"_TOP_DONOR",when(temp_top_df.Party.contains(party_name),top_donor_name).otherwise("-"))

    top_donor_count +=1



In [128]:
df.show()


+--------------------+--------+---------------+--------+------------------+---------+------+------+-----------+--------------+-----------+-----------+-------------+----------------+-------------+-------------+-----------+--------------+-----------+-----------+-----------+--------------+-----------+-----------+---------------+------------------+---------------+---------------+---------------+---------------+--------------+----------------+--------------+----------+
|             Address|  Amount|mode_of_payment|fin_year|              Name|pan_given| Party|  Type|CPI_SUM_LTD|CPI(M)_SUM_LTD|INC_SUM_LTD|BJP_SUM_LTD|CPI_COUNT_LTD|CPI(M)_COUNT_LTD|INC_COUNT_LTD|BJP_COUNT_LTD|CPI_AVG_LTD|CPI(M)_AVG_LTD|INC_AVG_LTD|BJP_AVG_LTD|CPI_MAX_LTD|CPI(M)_MAX_LTD|INC_MAX_LTD|BJP_MAX_LTD|2010-11_CPI_SUM|2014-15_CPI(M)_SUM|2014-15_INC_SUM|2011-12_INC_SUM|2011-12_BJP_SUM|2013-14_BJP_SUM|Cash_count_LTD|Cheque_count_LTD|Bank_count_LTD|row_number|
+--------------------+--------+---------------+--------+------