<a href="https://colab.research.google.com/github/subashmb1998/Project_HKS/blob/main/HKS_Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!rm spark-3.1.2-bin-hadoop2.7.tgz

# Install findspark
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

# Initialize findspark
import findspark
findspark.init()

# Verify the installation
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("colab").getOrCreate()
spark

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
customers = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/content/drive/MyDrive/Projects/HKS/customers_final/part-00000-3f77b9bb-3b44-4b0c-9668-b6395254a47d-c000.csv")

customers.show()

+---------------+------------------+-----------+--------------------+-------------+---------+-----------------+---------------+-----------+--------------------+--------------------+-------------------+-----------+------------------+--------------------+------------+------------+--------------------+------------------------------+------------------+--------------------+--------+
|CUSTOMER_NUMBER|     CUSTOMER_NAME|    QR_CODE|        ORGANIZATION|SURVEYOR_NAME|     WARD|     TRADING_TYPE|PROPERTY_NUMBER|DOOR_STATUS|       PROPERTY_NAME|       PROPERTY_TYPE|PROPERTY_OWNER_NAME|TOTAL_HOUSE|PROPERTY_OWNERSHIP|             ADDRESS|    LATITUDE|   LONGITUDE|PROPERTY_TENANT_NAME|PROPERTY_TENANT_PHONE_NUMBER28|RATION_CARD_NUMBER|    RATION_CARD_TYPE|FEE_AMNT|
+---------------+------------------+-----------+--------------------+-------------+---------+-----------------+---------------+-----------+--------------------+--------------------+-------------------+-----------+------------------+------

In [None]:
from pyspark.sql.types import *

In [None]:
schema = StructType([
    StructField("CUSTOMER_NUM", StringType(), True),
    StructField("BUILDING_NUMBER", StringType(), True),
    StructField("PENDING_MONTHS", IntegerType(), True)
])

In [None]:
payment = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/content/drive/MyDrive/Projects/HKS/Pending_Payment_final/part-00000-a57da900-ba8f-41c6-adc2-fc63def99072-c000.csv")

payment.show()

+--------------+---------------+--------------+
|  CUSTOMER_NUM|BUILDING_NUMBER|PENDING_MONTHS|
+--------------+---------------+--------------+
|VIM-001-000218|          1/135|             2|
|VIM-003-000447|          3/155|             2|
|VIM-004-000044|         4/1283|             7|
|VIM-004-000429|            4/0|             2|
|VIM-004-000927|            4/0|             1|
|VIM-005-000122|          5/168|             3|
|VIM-005-000431|            5/0|             1|
|VIM-005-000660|          5/814|             2|
|VIM-007-000010|          7/762|             3|
|VIM-007-000120|          7/000|             1|
|VIM-007-000484|          7/670|             1|
|VIM-007-000546|           7/52|             1|
|VIM-007-000696|              7|             1|
|VIM-008-000413|          8/178|             1|
|VIM-013-000052|          13/77|             2|
|VIM-013-000270|         13/420|             5|
|VIM-015-000277|         15/367|             1|
|VIM-015-000449|         15/795|        

In [None]:
payment_pending_customers = customers.join(payment, customers.CUSTOMER_NUMBER == payment.CUSTOMER_NUM)

payment_pending_customers.show()

+---------------+-------------+-----------+--------------------+-------------+---------+-----------------+---------------+-----------+--------------------+--------------------+-------------------+-----------+------------------+--------------------+------------+-----------+--------------------+------------------------------+------------------+--------------------+--------+--------------+---------------+--------------+
|CUSTOMER_NUMBER|CUSTOMER_NAME|    QR_CODE|        ORGANIZATION|SURVEYOR_NAME|     WARD|     TRADING_TYPE|PROPERTY_NUMBER|DOOR_STATUS|       PROPERTY_NAME|       PROPERTY_TYPE|PROPERTY_OWNER_NAME|TOTAL_HOUSE|PROPERTY_OWNERSHIP|             ADDRESS|    LATITUDE|  LONGITUDE|PROPERTY_TENANT_NAME|PROPERTY_TENANT_PHONE_NUMBER28|RATION_CARD_NUMBER|    RATION_CARD_TYPE|FEE_AMNT|  CUSTOMER_NUM|BUILDING_NUMBER|PENDING_MONTHS|
+---------------+-------------+-----------+--------------------+-------------+---------+-----------------+---------------+-----------+--------------------+---

In [None]:
customers.count()

10431

In [None]:
payment.count()

5414

In [None]:
payment_pending_customers.count()

5377

In [None]:
payment_pending_customers.columns

['CUSTOMER_NUMBER',
 'CUSTOMER_NAME',
 'QR_CODE',
 'ORGANIZATION',
 'SURVEYOR_NAME',
 'WARD',
 'TRADING_TYPE',
 'PROPERTY_NUMBER',
 'DOOR_STATUS',
 'PROPERTY_NAME',
 'PROPERTY_TYPE',
 'PROPERTY_OWNER_NAME',
 'TOTAL_HOUSE',
 'PROPERTY_OWNERSHIP',
 'ADDRESS',
 'LATITUDE',
 'LONGITUDE',
 'PROPERTY_TENANT_NAME',
 'PROPERTY_TENANT_PHONE_NUMBER28',
 'RATION_CARD_NUMBER',
 'RATION_CARD_TYPE',
 'FEE_AMNT',
 'CUSTOMER_NUM',
 'BUILDING_NUMBER',
 'PENDING_MONTHS']

In [None]:
cols = ['CUSTOMER_NUMBER',
 'CUSTOMER_NAME',
 'WARD',
 'PROPERTY_NUMBER',
 'DOOR_STATUS',
 'PROPERTY_NAME',
 'PROPERTY_TYPE',
 'ADDRESS',
 'PROPERTY_TENANT_PHONE_NUMBER28',
 'FEE_AMNT',
 'PENDING_MONTHS']

In [None]:
payment_pending = payment_pending_customers.select(cols)

In [None]:
payment_pending.show()

+---------------+-------------+---------+---------------+-----------+--------------------+--------------------+--------------------+------------------------------+--------+--------------+
|CUSTOMER_NUMBER|CUSTOMER_NAME|     WARD|PROPERTY_NUMBER|DOOR_STATUS|       PROPERTY_NAME|       PROPERTY_TYPE|             ADDRESS|PROPERTY_TENANT_PHONE_NUMBER28|FEE_AMNT|PENDING_MONTHS|
+---------------+-------------+---------+---------------+-----------+--------------------+--------------------+--------------------+------------------------------+--------+--------------+
| VIM-001-000550|Vijayalakshmi|1-PANAYOR|          01/00|       Open|          Thattukada|                Shop|Vijayalakshmi ,Pe...|                    953XXXX931|     100|             1|
| VIM-001-000548|        Balan|1-PANAYOR|          1/426|       Open|          M. T store|                Shop|M. T store,Vaniya...|                    900XXXX000|     100|             2|
| VIM-001-000538|        Salvi|1-PANAYOR|          1/771|   

In [None]:
cutomers_pending =  payment_pending.withColumn("Total_amt_pend", payment_pending.FEE_AMNT * payment_pending.PENDING_MONTHS)

In [None]:
cutomers_pending.show()

+---------------+-------------+---------+---------------+-----------+--------------------+--------------------+--------------------+------------------------------+--------+--------------+--------------+
|CUSTOMER_NUMBER|CUSTOMER_NAME|     WARD|PROPERTY_NUMBER|DOOR_STATUS|       PROPERTY_NAME|       PROPERTY_TYPE|             ADDRESS|PROPERTY_TENANT_PHONE_NUMBER28|FEE_AMNT|PENDING_MONTHS|Total_amt_pend|
+---------------+-------------+---------+---------------+-----------+--------------------+--------------------+--------------------+------------------------------+--------+--------------+--------------+
| VIM-001-000550|Vijayalakshmi|1-PANAYOR|          01/00|       Open|          Thattukada|                Shop|Vijayalakshmi ,Pe...|                    953XXXX931|     100|             1|           100|
| VIM-001-000548|        Balan|1-PANAYOR|          1/426|       Open|          M. T store|                Shop|M. T store,Vaniya...|                    900XXXX000|     100|             2| 

In [None]:
Door_open = cutomers_pending.select("*").where("DOOR_STATUS = 'Open'").orderBy("Total_amt_pend", ascending=False)

In [None]:
Door_open.repartition(1) \
    .write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .partitionBy("WARD") \
    .option("path", "/content/drive/MyDrive/Projects/HKS/Pending_payment_ward") \
    .save()