In [22]:
import findspark
findspark.init()

import pyspark
import json
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime
from json import JSONEncoder

# Build (or reuse) the Spark session in a single builder chain. Previously a
# bare getOrCreate() ran first, so the session already existed by the time the
# builder carrying appName() ran; appName() can only name the SparkContext
# when the session is actually being created.
spark = (
    SparkSession.builder
    .appName("Convert to Json file")
    .getOrCreate()
)


# Cleanse the csv file
class Cleanse_file:
    """Cleanse a raw transaction DataFrame, export it to JSON and report on it.

    All work happens in ``__init__`` (kept for existing callers). The cleaned
    DataFrame and the helper objects are stored on the instance:
    ``self.cleanfile``, ``self.jsonfile`` and ``self.fastresponse``.

    Args:
        df2: DataFrame read from the transaction CSV with a header row. Its raw
            column names are expected to match the keys of ``COLUMN_RENAMES``
            (note the trailing spaces).
    """

    # Raw CSV header -> cleaned column name. DataFrame.withColumnRenamed is a
    # no-op when the raw header is not present in the schema.
    COLUMN_RENAMES = {
        "Account_ID ": "AccountID",
        "CODE ": "Code",
        "Active Indicator ": "ActiveIndicator",
        "Implemented Date ": "ImplementedDate",
        "Account Type ": "AccountType",
        "Service ": "Service",
        "BU": "BU",
        "Request Date ": "RequestDate",
        "Account status ": "AccountStatus",
        "Status Code ": "StatusCode",
        "$ Amount ": "Amount",
        "Version ": "Version",
        "Agent ID ": "AgentID",
        "FIBRE ": "Fibre",
        "last Updated Date ": "LastUpdatedDate",
        "Property TYPE ": "PropertyType",
        "Post Code ": "PostCode",
    }

    def __init__(self, df2):
        # Bug fix: the input DataFrame is the ``df2`` argument. The original
        # read the module-level ``df_csv`` global and ignored ``df2``.
        df_clean1 = self._standardise(df2)

        print("Unfiltered Account Ids:")
        df_clean1.select("HashKey", "AccountId").show(truncate=False)
        print("Count before removing the Account IDs:", df_clean1.count())

        # Keep only AccountIDs made up purely of digits. This drops decimal
        # ids such as "1175082.667"; NULL ids are dropped as well, because
        # rlike on NULL yields NULL.
        df1 = df_clean1.where(col("AccountID").rlike("^[0-9]*$"))
        print("***********************************************")
        print("Output validation")
        print("Count after removing the non-integer Account IDs:", df1.count())
        print("***********************************************")

        # Drop rows whose Fibre value contains a '+'. The temp view "Data" is
        # still registered as before, in case other cells query it.
        df1.createOrReplaceTempView("Data")
        print("Unfiltered Fibre records:")
        spark.sql("Select HashKey,AccountID,Fibre FROM Data").show(truncate=False)
        # NOTE: NOT LIKE on a NULL Fibre evaluates to NULL, so rows with a NULL
        # Fibre are filtered out too.
        cleanfile = spark.sql("Select * from Data where Fibre  not like '%+%'")
        print("***********************************************")
        print("Count after removing records with '+' in the Fibre column:", cleanfile.count())
        print("***********************************************")

        self.cleanfile = cleanfile
        # Write the cleaned rows out as numbered JSON files.
        self.jsonfile = Create_json_file(cleanfile)
        # Print the fastest-response and top-agent reports.
        self.fastresponse = Fast_response(cleanfile)

    @classmethod
    def _standardise(cls, df):
        """Rename columns, normalise dates and Amount, add HashKey, fix column order."""
        for raw_name, clean_name in cls.COLUMN_RENAMES.items():
            df = df.withColumnRenamed(raw_name, clean_name)

        # Remove the last 5 characters (the time part) from the date strings.
        # Assumes raw values end in a 5-char time such as " 0:00" -- TODO confirm.
        for date_col in ("ImplementedDate", "RequestDate"):
            df = df.withColumn(
                date_col,
                expr("substring({0},1,length({0})-5)".format(date_col)),
            )

        # Presumably a 9-character date has a single-digit day ("1/02/2020");
        # prefix "0" so it parses with the dd/MM/yyyy pattern below.
        for date_col in ("ImplementedDate", "RequestDate"):
            df = df.withColumn(
                date_col,
                when(length(col(date_col)) == 9, concat(lit(0), col(date_col)))
                .otherwise(col(date_col)),
            )

        df = (
            df.withColumn("Amount", col("Amount").cast("float"))
            # HashKey = md5(AccountID || Fibre); concat() yields NULL when
            # either input is NULL, so such rows get a NULL HashKey.
            .withColumn("HashKey", md5(concat(col("AccountID"), col("Fibre"))))
        )

        return df.select(
            "HashKey", "AccountID", "Code", "ActiveIndicator",
            to_date(col("ImplementedDate"), "dd/MM/yyyy").alias("ImplementedDate"),
            "AccountType", "Service", "BU",
            to_date(col("RequestDate"), "dd/MM/yyyy").alias("RequestDate"),
            "AccountStatus", "StatusCode", "Amount", "Version", "AgentID",
            "Fibre", "LastUpdatedDate", "PropertyType", "PostCode",
        )
        

    # class to perform Top agents and fast response    
class Fast_response:
    """Print fastest-response and top-agent reports for a cleansed DataFrame.

    The report DataFrames are kept on the instance as
    ``self.fastest_by_postcode`` and ``self.top_agents_by_postcode``.

    Args:
        cleanfile: DataFrame with ImplementedDate and RequestDate (dates),
            PostCode, AgentID and Amount columns.
        max_days: a post code is reported as a fast responder when its minimum
            DaysToRespond is <= this value. Defaults to 1, the original cut-off.
    """

    def __init__(self, cleanfile, max_days=1):
        # Fastest response per post code: days from RequestDate to ImplementedDate.
        print("Fastest Response based on PostCode")
        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        df3 = cleanfile.select(
            "*",
            datediff(col("ImplementedDate"), col("RequestDate")).alias("DaysToRespond"),
        )
        # ``min`` and ``sum`` below come from the file-level
        # ``from pyspark.sql.functions import *``; they are not the builtins.
        self.fastest_by_postcode = (
            df3.groupby("PostCode")
            .agg(min("DaysToRespond").alias("MinDaysToRespond"))
            .where(col("MinDaysToRespond") <= max_days)
        )
        self.fastest_by_postcode.show()

        # Top agent(s) per post code by total Amount. The join keeps every
        # agent whose total equals the post code's maximum, so ties all appear.
        print("Top Agent based on postcode and Amount")
        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        df4 = cleanfile.groupby("AgentID", "PostCode").agg(sum("Amount").alias("TotalAmount"))
        df4.createOrReplaceTempView("TopAgents")
        # Bug fix: keep the DataFrame. DataFrame.show() returns None, so the
        # original ``... .show()`` assignment bound None.
        self.top_agents_by_postcode = spark.sql(
            "SELECT a.AgentID,a.PostCode,cast(b.maxAmount as float) as AmountCollected "
            "from TopAgents a INNER JOIN "
            "(SELECT PostCode,Max(TotalAmount) as maxAmount from TopAgents "
            "group by PostCode) b "
            "on a.PostCode == b.PostCode and b.maxAmount=a.TotalAmount "
            "order by a.AgentID"
        )
        self.top_agents_by_postcode.show()

        # Class to create Json file
class Create_json_file:
    """Write the rows of a DataFrame to numbered JSON files in batches.

    Each file (``1.json``, ``2.json``, ...) holds a ``metadata`` object and a
    ``data`` list of at most ``batch_size`` records. The metadata holds
    ``start_at`` = now and ``end_at`` = now + 1 day. At least one file is
    always written, even when there are no rows.

    Args:
        cleanfile: DataFrame (or any object with ``toLocalIterator()``) whose
            rows support ``row["ColumnName"]`` lookup.
        batch_size: maximum number of records per file (must be >= 1).
        output_dir: directory to write the files into.
        encoder: ``json.JSONEncoder`` subclass for non-JSON values such as
            dates. Defaults to ``DateTimeEncoder``.

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """

    # (output JSON key, source column) pairs, in output order. Note that the
    # output key "AccountId" is read from the "AccountID" column.
    FIELDS = (
        ("HashKey", "HashKey"),
        ("AccountId", "AccountID"),
        ("Code", "Code"),
        ("ActiveIndicator", "ActiveIndicator"),
        ("ImplementedDate", "ImplementedDate"),
        ("AccountType", "AccountType"),
        ("Service", "Service"),
        ("BU", "BU"),
        ("RequestDate", "RequestDate"),
        ("AccountStatus", "AccountStatus"),
        ("StatusCode", "StatusCode"),
        ("Amount", "Amount"),
        ("Version", "Version"),
        ("AgentID", "AgentID"),
        ("Fibre", "Fibre"),
        ("LastUpdatedDate", "LastUpdatedDate"),
        ("PropertyType", "PropertyType"),
        ("PostCode", "PostCode"),
    )

    def __init__(self, cleanfile, batch_size=1000, output_dir=".", encoder=None):
        import itertools
        import os

        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        if encoder is None:
            encoder = DateTimeEncoder

        # One timestamp shared by every file: a one-day window starting now.
        now = datetime.datetime.now()
        metadata = {"start_at": now, "end_at": now + datetime.timedelta(1)}

        # Stream rows to the driver rather than collect()-ing them all at once.
        rows = iter(cleanfile.toLocalIterator())

        # Pull the next <= batch_size rows and convert them to output records.
        def next_batch():
            return [
                {key: row[column] for key, column in self.FIELDS}
                for row in itertools.islice(rows, batch_size)
            ]

        # Bug fix: the original ``if i <= 1000`` counter put 1001 rows in each
        # file and never appended the row that triggered the flush, losing it.
        file_number = 1
        batch = next_batch()
        while True:
            path = os.path.join(output_dir, str(file_number) + ".json")
            with open(path, "w", encoding="utf-8") as write_file:
                json.dump(
                    {"metadata": metadata, "data": batch},
                    write_file,
                    indent=2,
                    cls=encoder,
                )
            batch = next_batch()
            if not batch:
                break
            file_number += 1
    
# JSON encoder for the exported files: date/datetime values become ISO-8601
# strings; any other unsupported type is reported rather than written as null.
class DateTimeEncoder(JSONEncoder):
    """``json.JSONEncoder`` that serialises ``date``/``datetime`` via ``isoformat()``."""

    def default(self, obj):
        """Return ``obj.isoformat()`` for dates and datetimes.

        Any other type goes to the base implementation, which raises
        ``TypeError``. Previously ``None`` was returned implicitly, which
        silently wrote ``null`` for unserialisable values.
        """
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        return super().default(obj)
        
    
if __name__ == "__main__":
    # Load the raw transactions, treating the first CSV line as the header.
    # The module-level name ``df_csv`` is kept unchanged in case other code
    # in the notebook refers to it.
    df_csv = spark.read.option("header", True).csv("Transaction.csv")

    # Run the cleanse -> JSON export -> report pipeline on the loaded data.
    activity = Cleanse_file(df_csv)
    
    

  


Unfiltered Account Ids:
+--------------------------------+-----------+
|HashKey                         |AccountId  |
+--------------------------------+-----------+
|0f6f2b86bf61c32a4e6a24a21c677655|15368      |
|a233b2e3f632c09b0df252165c112f61|23232      |
|a585cbf86a677cb100929ac2cc2c6ba9|232323     |
|c448e1abaeefbe05a8847d1fbdc7fe4e|307263     |
|0623963d62f3f2d15bc63621e5297d2f|415740     |
|66ca8eb73ba003fc9fb0f61023fcfc39|524218     |
|062b118ef57802ee9674862012d336b8|632695     |
|2fa9ae5b103842545d942aabbdb7aa8d|741173     |
|ed37cdb0bb927f08e08afb9e20f9263c|849650     |
|71e745d0418e495a8e437e2659989bef|958128     |
|df634d91c34119fb67a4fc254a229367|1066605    |
|da05891a363fc2be957c86df9570b199|1175082.667|
|ad80095218c7357f95602cb06bc0547d|1283560.167|
|55eee16de2f48d3d24d74a71e0df6807|1392037.667|
|c782cbdff011c69de5ef501c22c30f56|1500515.167|
|e59dcebc464fd04d0e527acec89436c8|1608992.667|
|b3a987929956ce94e408fe65c7c976b6|1717470.167|
|1fa832e58e2d3307fca12a894152700e|18