In [0]:
#### INSTALLING REQUIRED LIBRARIES ####
%pip install fastavro
%pip install multipledispatch
#######################################

Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
####################CONFIGURATION###################################
DATA_PATH = '/dbfs/FileStore/shared_uploads/prem.potta@shipt.com/orders_data.json'
INPUT_SCHEMA_PATH = '/dbfs/FileStore/shared_uploads/prem.potta@shipt.com/orders_data_schema.avsc'
OUTPUT_SCHEMA_PATH = '/dbfs/FileStore/shared_uploads/prem.potta@shipt.com/output_schema.avsc'
NO_OF_WORKERS = 10
###################################################################

In [0]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, spark_partition_id, asc, desc
import pyspark.sql.functions as F 
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [0]:
# Configuring spark cluster
conf = SparkConf()
conf.set("spark.kryo.registrationRequired","false")
conf.setMaster("local").setAppName("testapp")

# Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in the builder
sc = SparkContext.getOrCreate(conf=conf)
spark=SparkSession.builder.appName("test").config(
    "spark.driver.extraJavaOptions",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED",
).getOrCreate()

# Creating Spark SQL Context
sqlContext = SQLContext(sc)



In [0]:
import json
from fastavro.validation import validate
from fastavro.schema import parse_schema,load_schema
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, spark_partition_id, asc, desc
import pyspark.sql.functions as F 
from pyspark.sql.types import StringType, StructType, StructField, DoubleType

In [0]:
# Load and parse Avro Schema from its file path
parsed_schema = parse_schema(load_schema(INPUT_SCHEMA_PATH))

# Open the data file and validate it with the corresponding Avro Schema
with open(DATA_PATH) as json_file:
    data = json.load(json_file)
    if validate(data, parsed_schema):
        print("Successfully validated the data with its schema" )
    else:
        raise Exception("Failed to validate the data with its schema")

Successfully validated the data with its schema


In [0]:
import abc
from multipledispatch import dispatch
from itertools import chain
import random

# Creating an abstract base class
class AbstractJob(metaclass=abc.ABCMeta):
    
    # Initialize the job with its input data, input schema and output schema
    def __init__(self, input_json, input_schema, output_schema):
        self.input_json=input_json
        self.input_schema=input_schema
        self.output_schema=output_schema
        self.validateInputData()
    
    # Return the input data for a job
    @property
    def inputData(self):
        return self.input_json
    
    # Return the input data schema for a job
    @property
    def inputDataSchema(self):
        return self.input_schema

    # Validate input data against it's avro schema
    def validateInputData(self):
        if not validate(json.loads(self.input_json), json.loads(self.input_schema)):
            raise Exception("Input data validation failed")
        else:
            print('Input Data Validation Successful')
    
    # Return the output schema for a job
    @property
    def outputDataSchema(self):
        return self.output_schema

    # Run this function to validate the output data obtained after sampling
    def validateOutputData(self,output_data):
        if not validate(output_data, json.loads(self.output_schema)):
            raise Exception("Output data validation failed")
        else:
            print('Output Data Validation Successful')
            
    # Abstract method that can be overrided for different kinds of parallel jobs
    @staticmethod
    @dispatch(object)
    @abc.abstractmethod
    def process(job):
        # translate json to avro objects in java/c++/python
        pass

    # Abstract method that takes input data, input schema and output schema for each worker
    @staticmethod
    @dispatch(str,str,str)
    @abc.abstractmethod
    def process(input_json, input_schema, output_schema):
        # translate json to avro objects in java/c++/python
        pass
    
    # Abstract method to define the parallel implementation of each worker
    @staticmethod
    @abc.abstractmethod
    def parallelProcess(input_json, input_schema, output_schema):
        pass

    # Implementation of distributed filtering based random sampling algorithm
    @staticmethod
    def runParallel(jobs):
        
        # Get the process function for each job
        process = jobs[0].parallelProcess
        
        # Read the input data from the json as a dictionary
        inputData = jobs[0].inputData
        inputDict = json.loads(inputData)
        
        # Get the list of records that need to be sampled from the json
        inputList = inputDict['data']
        
        # Read the sampling parameters defined in the input json
        samplingType = inputDict['samplingType']
        samplingRate = inputDict['samplingRate']
        print("Sampling Rate : " + str(samplingRate))
        
        # Create a spark dataframe from the input data
        inputDataFrame=spark.createDataFrame(inputList)
        
        # Register a temporary table for the data in spark's SQL context
        inputDataFrame.registerTempTable("data")
        
        # Get the filtering query from the input json
        inputFilter = inputDict['filter']
        
        # Run the sparkSQL query on the data frame and map the resulting rdd to a python list of dictionaries
        results = sqlContext.sql(inputFilter)
        filteredInputList = results.rdd.map(lambda row: row.asDict()).collect()
        print('Filtered ' + str(len(filteredInputList)) + ' out of ' + str(len(inputList)) + ' records')
        
        # Split the list of records to multiple lists that can be processed by each worker
        n = len(jobs)
        splitList= [filteredInputList[i:i+n] for i in range(0, len(filteredInputList), n)]
        
        # Convert the list to an RDD to perform parallel processing
        rdd = sc.parallelize(splitList)
        
        # Run the process on each worker parallelly using map function of RDD
        sampledRdd = rdd.map(lambda x : process(x,samplingType,samplingRate)).collect()
        
        # Combine output of each worker into a single list
        output = list(chain.from_iterable(sampledRdd))
        print(str(len(output)) + ' records out of ' + str(len(filteredInputList)) + ' filtered records have been sampled')
        
        # Validate the output data obtained against the output schema defined for the job
        jobs[0].validateOutputData(output)
        
        # Return the output after validation
        return output

In [0]:
class ParallelJob(AbstractJob):
    
    # Perform sampling from the input list for different sampling parameters
    @staticmethod
    def parallelProcess(inputList,samplingType,samplingRate):
        if samplingType == "random":
            return random.sample(inputList,int(samplingRate*len(inputList)))
        return None
    
    # Call the appropriate process function which overrides this function
    @staticmethod
    @dispatch(object)
    def process(job):
        return ParallelJob.process(job.input_json, job.input_schema, job.output_schema)
    
    # Run the logic defined in parallelProcess function for each worker
    @staticmethod
    @dispatch(str,str,str)
    def process(input_json, input_schema, output_schema):
        inputDict = json.loads(input_json)
        return ParallelJob.parallelProcess(inputDict['data'],inputDict['samplingType'],inputDict['samplingRate'])

In [0]:
# Create Parallel jobs
jobs=[]
for i in range(NO_OF_WORKERS):
    a=ParallelJob(open(DATA_PATH).read(), open(INPUT_SCHEMA_PATH).read(), open(OUTPUT_SCHEMA_PATH).read())
    jobs.append(a)

Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful
Input Data Validation Successful


In [0]:
# Run the parallel jobs and measure the time taken
import time
start = time.process_time()
output=ParallelJob.runParallel(jobs)
print("Time taken for sampling : " + str(time.process_time() - start) + " seconds")

Filtered 93320 out of 191759 records
9332 records out of 93320 filtered records have been sampled
Sampling Rate : 0.1
Output Data Validation Successful
Time taken for sampling : 23.826951632000032 seconds
